Skip to content

Commit 0aea1b3

Browse files
authored
Merge pull request #107 from Datura-ai/hotfix-main-bittensor
Hotfix main bittensor
2 parents 0626321 + 14c4a7f commit 0aea1b3

File tree

1 file changed

+48
-12
lines changed

1 file changed

+48
-12
lines changed

validators/weight_setter.py

+48-12
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
from typing import Tuple
1313
import bittensor as bt
1414
from bittensor import StreamingSynapse
15+
1516
import cortext
1617
from starlette.types import Send
1718

@@ -72,7 +73,7 @@ def __init__(self, config, cache: QueryResponseCache, loop=None):
7273
self.weights_rate_limit = self.node_query('SubtensorModule', 'WeightsSetRateLimit', [self.netuid])
7374

7475
# Set up async-related attributes
75-
self.lock = asyncio.Lock()
76+
self.lock = threading.Lock()
7677
self.loop = loop or asyncio.get_event_loop()
7778

7879
# Initialize shared query database
@@ -95,11 +96,14 @@ def __init__(self, config, cache: QueryResponseCache, loop=None):
9596
daemon_thread = threading.Thread(target=self.saving_resp_answers_from_miners)
9697
daemon_thread.start()
9798

98-
synthetic_thread = threading.Thread(target=self.process_synthetic_tasks)
99-
synthetic_thread.start()
99+
# synthetic_thread = threading.Thread(target=self.process_synthetic_tasks)
100+
# synthetic_thread.start()
101+
self.loop.create_task(self.perform_synthetic_queries())
102+
103+
# organic_thread = threading.Thread(target=self.start_axon_server)
104+
# organic_thread.start()
105+
self.loop.create_task(self.consume_organic_queries())
100106

101-
organic_thread = threading.Thread(target=self.start_axon_server)
102-
organic_thread.start()
103107

104108
def start_axon_server(self):
105109
asyncio.run(self.consume_organic_queries())
@@ -134,6 +138,7 @@ async def refresh_metagraph(self):
134138
await self.run_sync_in_async(lambda: self.metagraph.sync())
135139

136140
async def initialize_uids_and_capacities(self):
141+
bt.logging.info("start initializing uids and capacities")
137142
self.available_uid_to_axons = await self.get_available_uids()
138143
self.uids_to_query = list(self.available_uid_to_axons.keys())
139144
bt.logging.info(f"Available UIDs: {list(self.available_uid_to_axons.keys())}")
@@ -175,7 +180,8 @@ def is_epoch_end(self):
175180
async def update_and_refresh(self):
176181
await self.update_weights()
177182
bt.logging.info("Refreshing metagraph...")
178-
await self.refresh_metagraph()
183+
184+
self.metagraph.sync()
179185
await self.initialize_uids_and_capacities()
180186
bt.logging.info("Metagraph refreshed.")
181187

@@ -300,6 +306,8 @@ async def perform_synthetic_queries(self):
300306
continue
301307
self.set_up_next_block_to_wait()
302308
# await asyncio.sleep(432)
309+
bt.logging.debug("start synthetic queries")
310+
self.loop = asyncio.get_event_loop()
303311
self.loop.create_task(self.perform_synthetic_queries_one_cycle())
304312

305313
def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks):
@@ -399,10 +407,11 @@ async def set_weights(self, scores):
399407
wallet=self.wallet,
400408
uids=self.metagraph.uids,
401409
weights=self.moving_average_scores,
402-
wait_for_inclusion=True,
410+
wait_for_inclusion=False,
403411
version_key=cortext.__weights_version__,
404412
)
405-
bt.logging.info(f"done setting weights: {success}, {msg}. {time.time() - start_time} elaspsed for updating weights.")
413+
bt.logging.info(
414+
f"done setting weights: {success}, {msg}. {time.time() - start_time} elaspsed for updating weights.")
406415

407416
def blacklist_prompt(self, synapse: StreamPrompting) -> Tuple[bool, str]:
408417
blacklist = self.base_blacklist(synapse, cortext.PROMPT_BLACKLIST_STAKE)
@@ -596,24 +605,45 @@ async def process_queries_from_database(self):
596605
if not self.query_database:
597606
bt.logging.debug("no data in query_database. so continue...")
598607
continue
599-
if not self.is_epoch_end():
600-
bt.logging.debug("no end of epoch. so continue...")
601-
continue
602608
if not self.synthetic_task_done:
603609
bt.logging.debug("wait for synthetic tasks to complete.")
604610
continue
611+
if not self.is_epoch_end():
612+
bt.logging.debug("no end of epoch. so continue...")
613+
continue
605614

606615
bt.logging.info(f"start scoring process...")
607616

608617
queries_to_process = self.query_database.copy()
609618
self.query_database.clear()
610619

620+
611621
self.synthetic_task_done = False
612622
bt.logging.info("start scoring process")
613623
start_time = time.time()
614624

625+
# remove query_resps where len of resp is 0
626+
empty_uid_model_items = []
627+
for item in queries_to_process:
628+
uid = item.get("uid")
629+
resp = item.get("response")
630+
model = item.get("synapse").model
631+
if not resp:
632+
empty_uid_model_items.append((uid, model))
633+
634+
items_to_score = []
635+
for item in queries_to_process:
636+
uid = item.get("uid")
637+
model = item.get("synapse").model
638+
if (uid, model) in empty_uid_model_items:
639+
bt.logging.trace(
640+
f"this miner {uid} has at least 1 empty response for model {model}. so being scored as 0.")
641+
continue
642+
items_to_score.append(item)
643+
bt.logging.info(f"total len of datas to score: {len(items_to_score)}")
644+
615645
# with all query_responses, select one per uid, provider, model randomly and score them.
616-
score_tasks = self.get_scoring_tasks_from_query_responses(queries_to_process)
646+
score_tasks = self.get_scoring_tasks_from_query_responses(items_to_score)
617647

618648
resps = await asyncio.gather(*score_tasks, return_exceptions=True)
619649
resps = [item for item in resps if item is not None]
@@ -623,6 +653,12 @@ async def process_queries_from_database(self):
623653
if self.total_scores.get(uid) is not None:
624654
self.total_scores[uid] += score
625655
self.score_counts[uid] += 1
656+
657+
for uid in self.uid_to_capacity:
658+
if self.total_scores.get(uid) is None:
659+
self.total_scores[uid] = 0
660+
self.score_counts[uid] = 1
661+
626662
bt.logging.info(
627663
f"current total score are {self.total_scores}. total time of scoring is {time.time() - start_time}")
628664
self.saving_datas = queries_to_process.copy()

0 commit comments

Comments (0)