Skip to content

Commit dddfc90

Browse files
authored
Merge pull request #104 from Datura-ai/hotfix-main-bittensor
Fixed the synapse dendrite timeout issue
2 parents 669e4a5 + 9eec907 commit dddfc90

File tree

6 files changed

+87
-85
lines changed

6 files changed

+87
-85
lines changed

cortext/dendrite.py

+12-9
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
import asyncio
12
from typing import Union, AsyncGenerator, Any
23

34
import aiohttp
45
import bittensor as bt
5-
from aiohttp import ServerTimeoutError, ClientConnectorError
6+
from aiohttp import ServerTimeoutError, ClientConnectorError, ClientConnectionError
67
from bittensor import dendrite
78
import traceback
89
import time
@@ -47,7 +48,7 @@ async def call_stream(
4748
# Preprocess synapse for making a request
4849
synapse: StreamPrompting = self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore
4950
max_try = 0
50-
timeout = aiohttp.ClientTimeout(total=timeout, connect=10, sock_connect=10, sock_read=10)
51+
timeout = aiohttp.ClientTimeout(total=100, connect=timeout, sock_connect=timeout, sock_read=timeout)
5152
connector = aiohttp.TCPConnector(limit=200)
5253
session = aiohttp.ClientSession(timeout=timeout, connector=connector)
5354
try:
@@ -63,16 +64,18 @@ async def call_stream(
6364
yield chunk # Yield each chunk as it's processed
6465
except aiohttp.client_exceptions.ClientPayloadError:
6566
pass
66-
except TimeoutError as err:
67-
bt.logging.error(f"timeout error happens. max_try is {max_try}")
68-
max_try += 1
69-
continue
7067
except ConnectionRefusedError as err:
7168
bt.logging.error(f"can not connect to miner for now. connection failed")
72-
break
69+
max_try += 1
70+
continue
7371
except ClientConnectorError as err:
74-
bt.logging.error(f"can not connect to miner for now. connection failed")
75-
break
72+
bt.logging.error(f"can not connect to miner for now. retrying")
73+
max_try += 1
74+
continue
75+
except ClientConnectionError as err:
76+
bt.logging.error(f"can not connect to miner for now. retrying")
77+
max_try += 1
78+
continue
7679
except ServerTimeoutError as err:
7780
bt.logging.error(f"timeout error happens. max_try is {max_try}")
7881
max_try += 1

cortext/protocol.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from typing import AsyncIterator, Dict, List, Optional, Union
23
import bittensor as bt
34
import pydantic
@@ -369,11 +370,17 @@ def to_headers(self) -> dict:
369370
async def process_streaming_response(self, response: StreamingResponse, organic=True) -> AsyncIterator[str]:
370371
if self.completion is None:
371372
self.completion = ""
372-
chunk_size = 100 if organic else 1024
373-
async for chunk in response.content.iter_chunked(chunk_size):
374-
tokens = chunk.decode("utf-8")
375-
self.completion += tokens
376-
yield tokens
373+
chunk_size = 100 if organic else 1000
374+
remain_chunk = ""
375+
try:
376+
async for chunk in response.content.iter_chunked(chunk_size):
377+
tokens = chunk.decode("utf-8")
378+
remain_chunk = tokens
379+
self.completion += tokens
380+
yield tokens
381+
except asyncio.TimeoutError as err:
382+
yield remain_chunk
383+
377384

378385
def extract_response_json(self, response: StreamingResponse) -> dict:
379386
headers = {

miner/services/base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ def base_blacklist(self, synapse) -> Tuple[bool, str]:
5757

5858
# check the stake
5959
stake = self.metagraph.S[self.metagraph.hotkeys.index(hotkey)]
60-
if stake < self.blacklist_amt:
61-
return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}"
60+
# if stake < self.blacklist_amt:
61+
# return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}"
6262

6363
return False, f"accepting {synapse_type} request from {hotkey}"
6464

validators/services/validators/text_validator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ async def call_api(self, conversation: List[Dict[str, Optional[str]]], query_syn
177177
else:
178178
bt.logging.error(f"provider {provider} not found")
179179

180-
@save_or_get_answer_from_cache
180+
# @save_or_get_answer_from_cache
181181
async def get_answer_task(self, uid: int, query_syn: StreamPrompting, response):
182182
answer = await self.call_api(query_syn.messages, query_syn)
183183
return answer

validators/utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async def fetch(session, url):
191191
try:
192192
return await response.json()
193193
except Exception as err:
194-
bt.logging.error(f"{err} {traceback.format_exc()}")
194+
pass
195195

196196
# Asynchronous function to gather multiple HTTP requests
197197
async def gather_requests(urls):

validators/weight_setter.py

+59-67
Original file line numberDiff line numberDiff line change
@@ -190,17 +190,16 @@ async def handle_response(resp):
190190
bt.logging.trace(f"Streamed text: {chunk}")
191191

192192
# Store the query and response in the shared database
193-
async with self.lock:
194-
self.query_database.append({
195-
'uid': uid,
196-
'synapse': query_syn,
197-
'response': (response_text, query_syn.dendrite.process_time),
198-
'query_type': 'organic',
199-
'timestamp': asyncio.get_event_loop().time(),
200-
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
201-
metagraph=self.metagraph)
202-
})
203-
query_syn.time_taken = query_syn.dendrite.process_time
193+
self.query_database.append({
194+
'uid': uid,
195+
'synapse': query_syn,
196+
'response': (response_text, query_syn.dendrite.process_time),
197+
'query_type': 'organic',
198+
'timestamp': asyncio.get_event_loop().time(),
199+
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
200+
metagraph=self.metagraph)
201+
})
202+
query_syn.time_taken = query_syn.dendrite.process_time
204203

205204
axon = self.metagraph.axons[uid]
206205
response = self.dendrite.call_stream(
@@ -256,15 +255,14 @@ def is_cycle_end(self):
256255
async def perform_synthetic_queries_one_cycle(self):
257256
start_time = time.time()
258257
# don't process any organic query while processing synthetic queries.
259-
async with self.lock:
260-
synthetic_tasks = []
261-
# check available bandwidth and send synthetic requests to all miners.
262-
query_synapses = await self.create_query_syns_for_remaining_bandwidth()
263-
for query_syn in query_synapses:
264-
uid = self.task_mgr.assign_task(query_syn)
265-
if uid is None:
266-
bt.logging.debug(f"No available uids for synthetic query process.")
267-
synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False)))
258+
synthetic_tasks = []
259+
# check available bandwidth and send synthetic requests to all miners.
260+
query_synapses = await self.create_query_syns_for_remaining_bandwidth()
261+
for query_syn in query_synapses:
262+
uid = self.task_mgr.assign_task(query_syn)
263+
if uid is None:
264+
bt.logging.debug(f"No available uids for synthetic query process.")
265+
synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False)))
268266

269267
bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.")
270268

@@ -364,13 +362,12 @@ async def update_weights(self):
364362
avg_scores = {}
365363

366364
# Compute average scores per UID
367-
async with self.lock:
368-
for uid in self.total_scores:
369-
count = self.score_counts[uid]
370-
if count > 0:
371-
avg_scores[uid] = self.total_scores[uid] / count
372-
else:
373-
avg_scores[uid] = 0.0
365+
for uid in self.total_scores:
366+
count = self.score_counts[uid]
367+
if count > 0:
368+
avg_scores[uid] = self.total_scores[uid] / count
369+
else:
370+
avg_scores[uid] = 0.0
374371

375372
bt.logging.info(f"Average scores = {avg_scores}")
376373

@@ -444,16 +441,15 @@ async def images(self, synapse: ImageResponse) -> ImageResponse:
444441

445442
bt.logging.info(f"New synapse = {synapse_response}")
446443
# Store the query and response in the shared database
447-
async with self.lock:
448-
self.query_database.append({
449-
'uid': synapse.uid,
450-
'synapse': synapse,
451-
'response': synapse_response,
452-
'query_type': 'organic',
453-
'timestamp': asyncio.get_event_loop().time(),
454-
'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config,
455-
metagraph=self.metagraph)
456-
})
444+
self.query_database.append({
445+
'uid': synapse.uid,
446+
'synapse': synapse,
447+
'response': synapse_response,
448+
'query_type': 'organic',
449+
'timestamp': asyncio.get_event_loop().time(),
450+
'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config,
451+
metagraph=self.metagraph)
452+
})
457453

458454
return synapse_response
459455

@@ -466,16 +462,15 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings:
466462

467463
bt.logging.info(f"New synapse = {synapse_response}")
468464
# Store the query and response in the shared database
469-
async with self.lock:
470-
self.query_database.append({
471-
'uid': synapse.uid,
472-
'synapse': synapse,
473-
'response': synapse_response,
474-
'query_type': 'organic',
475-
'timestamp': asyncio.get_event_loop().time(),
476-
'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config,
477-
metagraph=self.metagraph)
478-
})
465+
self.query_database.append({
466+
'uid': synapse.uid,
467+
'synapse': synapse,
468+
'response': synapse_response,
469+
'query_type': 'organic',
470+
'timestamp': asyncio.get_event_loop().time(),
471+
'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config,
472+
metagraph=self.metagraph)
473+
})
479474

480475
return synapse_response
481476

@@ -509,17 +504,16 @@ async def handle_response(resp):
509504
bt.logging.trace(f"Streamed text: {chunk}")
510505

511506
# Store the query and response in the shared database
512-
async with self.lock:
513-
self.query_database.append({
514-
'uid': synapse.uid,
515-
'synapse': synapse,
516-
'response': (response_text, synapse.dendrite.process_time),
517-
'query_type': 'organic',
518-
'timestamp': asyncio.get_event_loop().time(),
519-
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
520-
metagraph=self.metagraph)
521-
})
522-
synapse.time_taken = self.dendrite.process_time
507+
self.query_database.append({
508+
'uid': synapse.uid,
509+
'synapse': synapse,
510+
'response': (response_text, synapse.dendrite.process_time),
511+
'query_type': 'organic',
512+
'timestamp': asyncio.get_event_loop().time(),
513+
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
514+
metagraph=self.metagraph)
515+
})
516+
synapse.time_taken = synapse.dendrite.process_time
523517

524518
await send({"type": "http.response.body", "body": b'', "more_body": False})
525519

@@ -605,9 +599,8 @@ async def process_queries_from_database(self):
605599

606600
bt.logging.info(f"start scoring process...")
607601

608-
async with self.lock:
609-
queries_to_process = self.query_database.copy()
610-
self.query_database.clear()
602+
queries_to_process = self.query_database.copy()
603+
self.query_database.clear()
611604

612605
self.synthetic_task_done = False
613606
bt.logging.info("start scoring process")
@@ -619,12 +612,11 @@ async def process_queries_from_database(self):
619612
resps = await asyncio.gather(*score_tasks)
620613
resps = [item for item in resps if item is not None]
621614
# Update total_scores and score_counts
622-
async with self.lock:
623-
for uid_scores_dict, _, _ in resps:
624-
for uid, score in uid_scores_dict.items():
625-
if self.total_scores.get(uid) is not None:
626-
self.total_scores[uid] += score
627-
self.score_counts[uid] += 1
615+
for uid_scores_dict, _, _ in resps:
616+
for uid, score in uid_scores_dict.items():
617+
if self.total_scores.get(uid) is not None:
618+
self.total_scores[uid] += score
619+
self.score_counts[uid] += 1
628620
bt.logging.info(
629621
f"current total score are {self.total_scores}. total time of scoring is {time.time() - start_time}")
630622
self.saving_datas = queries_to_process.copy()

0 commit comments

Comments
 (0)