diff --git a/cortext/dendrite.py b/cortext/dendrite.py index b06d0526..14abe123 100644 --- a/cortext/dendrite.py +++ b/cortext/dendrite.py @@ -1,8 +1,9 @@ +import asyncio from typing import Union, AsyncGenerator, Any import aiohttp import bittensor as bt -from aiohttp import ServerTimeoutError, ClientConnectorError +from aiohttp import ServerTimeoutError, ClientConnectorError, ClientConnectionError from bittensor import dendrite import traceback import time @@ -47,7 +48,7 @@ async def call_stream( # Preprocess synapse for making a request synapse: StreamPrompting = self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore max_try = 0 - timeout = aiohttp.ClientTimeout(total=timeout, connect=10, sock_connect=10, sock_read=10) + timeout = aiohttp.ClientTimeout(total=100, connect=timeout, sock_connect=timeout, sock_read=timeout) connector = aiohttp.TCPConnector(limit=200) session = aiohttp.ClientSession(timeout=timeout, connector=connector) try: @@ -63,16 +64,18 @@ async def call_stream( yield chunk # Yield each chunk as it's processed except aiohttp.client_exceptions.ClientPayloadError: pass - except TimeoutError as err: - bt.logging.error(f"timeout error happens. max_try is {max_try}") - max_try += 1 - continue except ConnectionRefusedError as err: bt.logging.error(f"can not connect to miner for now. connection failed") - break + max_try += 1 + continue except ClientConnectorError as err: - bt.logging.error(f"can not connect to miner for now. connection failed") - break + bt.logging.error(f"can not connect to miner for now. retrying") + max_try += 1 + continue + except ClientConnectionError as err: + bt.logging.error(f"can not connect to miner for now. retrying") + max_try += 1 + continue except ServerTimeoutError as err: bt.logging.error(f"timeout error happens. max_try is {max_try}") max_try += 1 diff --git a/cortext/protocol.py b/cortext/protocol.py index cfb92798..559bcd3e 100644 --- a/cortext/protocol.py +++ b/cortext/protocol.py @@ -1,3 +1,4 @@ +import asyncio from typing import AsyncIterator, Dict, List, Optional, Union import bittensor as bt import pydantic @@ -369,11 +370,17 @@ def to_headers(self) -> dict: async def process_streaming_response(self, response: StreamingResponse, organic=True) -> AsyncIterator[str]: if self.completion is None: self.completion = "" - chunk_size = 100 if organic else 1024 - async for chunk in response.content.iter_chunked(chunk_size): - tokens = chunk.decode("utf-8") - self.completion += tokens - yield tokens + chunk_size = 100 if organic else 1000 + remain_chunk = "" + try: + async for chunk in response.content.iter_chunked(chunk_size): + tokens = chunk.decode("utf-8") + remain_chunk = tokens + self.completion += tokens + yield tokens + except asyncio.TimeoutError as err: + yield remain_chunk + def extract_response_json(self, response: StreamingResponse) -> dict: headers = { diff --git a/miner/services/base.py b/miner/services/base.py index 41b724f9..e3079182 100644 --- a/miner/services/base.py +++ b/miner/services/base.py @@ -57,8 +57,8 @@ def base_blacklist(self, synapse) -> Tuple[bool, str]: # check the stake stake = self.metagraph.S[self.metagraph.hotkeys.index(hotkey)] - if stake < self.blacklist_amt: - return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}" + # if stake < self.blacklist_amt: + # return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}" return False, f"accepting {synapse_type} request from {hotkey}" diff --git a/validators/services/validators/text_validator.py b/validators/services/validators/text_validator.py index 6c12983f..b6e6c9ca 100644 --- a/validators/services/validators/text_validator.py +++ b/validators/services/validators/text_validator.py @@ -177,7 +177,7 @@ async def call_api(self, conversation: List[Dict[str, Optional[str]]], query_syn else: bt.logging.error(f"provider {provider} not found") - @save_or_get_answer_from_cache + # @save_or_get_answer_from_cache async def get_answer_task(self, uid: int, query_syn: StreamPrompting, response): answer = await self.call_api(query_syn.messages, query_syn) return answer diff --git a/validators/utils.py b/validators/utils.py index 3ed0b35d..f1390040 100644 --- a/validators/utils.py +++ b/validators/utils.py @@ -191,7 +191,7 @@ async def fetch(session, url): try: return await response.json() except Exception as err: - bt.logging.error(f"{err} {traceback.format_exc()}") + pass # Asynchronous function to gather multiple HTTP requests async def gather_requests(urls): diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 033dbe3c..aa6260a5 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -190,17 +190,16 @@ async def handle_response(resp): bt.logging.trace(f"Streamed text: {chunk}") # Store the query and response in the shared database - async with self.lock: - self.query_database.append({ - 'uid': uid, - 'synapse': query_syn, - 'response': (response_text, query_syn.dendrite.process_time), - 'query_type': 'organic', - 'timestamp': asyncio.get_event_loop().time(), - 'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, - metagraph=self.metagraph) - }) - query_syn.time_taken = query_syn.dendrite.process_time + self.query_database.append({ + 'uid': uid, + 'synapse': query_syn, + 'response': (response_text, query_syn.dendrite.process_time), + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, + metagraph=self.metagraph) + }) + query_syn.time_taken = query_syn.dendrite.process_time axon = self.metagraph.axons[uid] response = self.dendrite.call_stream( @@ -256,15 +255,14 @@ def is_cycle_end(self): async def perform_synthetic_queries_one_cycle(self): start_time = time.time() # don't process any organic query while processing synthetic queries. - async with self.lock: - synthetic_tasks = [] - # check available bandwidth and send synthetic requests to all miners. - query_synapses = await self.create_query_syns_for_remaining_bandwidth() - for query_syn in query_synapses: - uid = self.task_mgr.assign_task(query_syn) - if uid is None: - bt.logging.debug(f"No available uids for synthetic query process.") - synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False))) + synthetic_tasks = [] + # check available bandwidth and send synthetic requests to all miners. + query_synapses = await self.create_query_syns_for_remaining_bandwidth() + for query_syn in query_synapses: + uid = self.task_mgr.assign_task(query_syn) + if uid is None: + bt.logging.debug(f"No available uids for synthetic query process.") + synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False))) bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.") @@ -364,13 +362,12 @@ async def update_weights(self): avg_scores = {} # Compute average scores per UID - async with self.lock: - for uid in self.total_scores: - count = self.score_counts[uid] - if count > 0: - avg_scores[uid] = self.total_scores[uid] / count - else: - avg_scores[uid] = 0.0 + for uid in self.total_scores: + count = self.score_counts[uid] + if count > 0: + avg_scores[uid] = self.total_scores[uid] / count + else: + avg_scores[uid] = 0.0 bt.logging.info(f"Average scores = {avg_scores}") @@ -444,16 +441,15 @@ async def images(self, synapse: ImageResponse) -> ImageResponse: bt.logging.info(f"New synapse = {synapse_response}") # Store the query and response in the shared database - async with self.lock: - self.query_database.append({ - 'uid': synapse.uid, - 'synapse': synapse, - 'response': synapse_response, - 'query_type': 'organic', - 'timestamp': asyncio.get_event_loop().time(), - 'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config, - metagraph=self.metagraph) - }) + self.query_database.append({ + 'uid': synapse.uid, + 'synapse': synapse, + 'response': synapse_response, + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config, + metagraph=self.metagraph) + }) return synapse_response @@ -466,16 +462,15 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings: bt.logging.info(f"New synapse = {synapse_response}") # Store the query and response in the shared database - async with self.lock: - self.query_database.append({ - 'uid': synapse.uid, - 'synapse': synapse, - 'response': synapse_response, - 'query_type': 'organic', - 'timestamp': asyncio.get_event_loop().time(), - 'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config, - metagraph=self.metagraph) - }) + self.query_database.append({ + 'uid': synapse.uid, + 'synapse': synapse, + 'response': synapse_response, + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config, + metagraph=self.metagraph) + }) return synapse_response @@ -509,17 +504,16 @@ async def handle_response(resp): bt.logging.trace(f"Streamed text: {chunk}") # Store the query and response in the shared database - async with self.lock: - self.query_database.append({ - 'uid': synapse.uid, - 'synapse': synapse, - 'response': (response_text, synapse.dendrite.process_time), - 'query_type': 'organic', - 'timestamp': asyncio.get_event_loop().time(), - 'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, - metagraph=self.metagraph) - }) - synapse.time_taken = self.dendrite.process_time + self.query_database.append({ + 'uid': synapse.uid, + 'synapse': synapse, + 'response': (response_text, synapse.dendrite.process_time), + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, + metagraph=self.metagraph) + }) + synapse.time_taken = synapse.dendrite.process_time await send({"type": "http.response.body", "body": b'', "more_body": False}) @@ -605,9 +599,8 @@ async def process_queries_from_database(self): bt.logging.info(f"start scoring process...") - async with self.lock: - queries_to_process = self.query_database.copy() - self.query_database.clear() + queries_to_process = self.query_database.copy() + self.query_database.clear() self.synthetic_task_done = False bt.logging.info("start scoring process") @@ -619,12 +612,11 @@ async def process_queries_from_database(self): resps = await asyncio.gather(*score_tasks) resps = [item for item in resps if item is not None] # Update total_scores and score_counts - async with self.lock: - for uid_scores_dict, _, _ in resps: - for uid, score in uid_scores_dict.items(): - if self.total_scores.get(uid) is not None: - self.total_scores[uid] += score - self.score_counts[uid] += 1 + for uid_scores_dict, _, _ in resps: + for uid, score in uid_scores_dict.items(): + if self.total_scores.get(uid) is not None: + self.total_scores[uid] += score + self.score_counts[uid] += 1 bt.logging.info( f"current total score are {self.total_scores}. total time of scoring is {time.time() - start_time}") self.saving_datas = queries_to_process.copy()