Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

synapse dendrite timeout issue fixed #104

Merged
merged 8 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions cortext/dendrite.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
from typing import Union, AsyncGenerator, Any

import aiohttp
import bittensor as bt
from aiohttp import ServerTimeoutError, ClientConnectorError
from aiohttp import ServerTimeoutError, ClientConnectorError, ClientConnectionError
from bittensor import dendrite
import traceback
import time
Expand Down Expand Up @@ -47,7 +48,7 @@ async def call_stream(
# Preprocess synapse for making a request
synapse: StreamPrompting = self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore
max_try = 0
timeout = aiohttp.ClientTimeout(total=timeout, connect=10, sock_connect=10, sock_read=10)
timeout = aiohttp.ClientTimeout(total=100, connect=timeout, sock_connect=timeout, sock_read=timeout)
connector = aiohttp.TCPConnector(limit=200)
session = aiohttp.ClientSession(timeout=timeout, connector=connector)
try:
Expand All @@ -63,16 +64,18 @@ async def call_stream(
yield chunk # Yield each chunk as it's processed
except aiohttp.client_exceptions.ClientPayloadError:
pass
except TimeoutError as err:
bt.logging.error(f"timeout error happens. max_try is {max_try}")
max_try += 1
continue
except ConnectionRefusedError as err:
bt.logging.error(f"can not connect to miner for now. connection failed")
break
max_try += 1
continue
except ClientConnectorError as err:
bt.logging.error(f"can not connect to miner for now. connection failed")
break
bt.logging.error(f"can not connect to miner for now. retrying")
max_try += 1
continue
except ClientConnectionError as err:
bt.logging.error(f"can not connect to miner for now. retrying")
max_try += 1
continue
except ServerTimeoutError as err:
bt.logging.error(f"timeout error happens. max_try is {max_try}")
max_try += 1
Expand Down
17 changes: 12 additions & 5 deletions cortext/protocol.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import AsyncIterator, Dict, List, Optional, Union
import bittensor as bt
import pydantic
Expand Down Expand Up @@ -369,11 +370,17 @@ def to_headers(self) -> dict:
async def process_streaming_response(self, response: StreamingResponse, organic=True) -> AsyncIterator[str]:
if self.completion is None:
self.completion = ""
chunk_size = 100 if organic else 1024
async for chunk in response.content.iter_chunked(chunk_size):
tokens = chunk.decode("utf-8")
self.completion += tokens
yield tokens
chunk_size = 100 if organic else 1000
remain_chunk = ""
try:
async for chunk in response.content.iter_chunked(chunk_size):
tokens = chunk.decode("utf-8")
remain_chunk = tokens
self.completion += tokens
yield tokens
except asyncio.TimeoutError as err:
yield remain_chunk


def extract_response_json(self, response: StreamingResponse) -> dict:
headers = {
Expand Down
4 changes: 2 additions & 2 deletions miner/services/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ def base_blacklist(self, synapse) -> Tuple[bool, str]:

# check the stake
stake = self.metagraph.S[self.metagraph.hotkeys.index(hotkey)]
if stake < self.blacklist_amt:
return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}"
# if stake < self.blacklist_amt:
# return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}"

return False, f"accepting {synapse_type} request from {hotkey}"

Expand Down
2 changes: 1 addition & 1 deletion validators/services/validators/text_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def call_api(self, conversation: List[Dict[str, Optional[str]]], query_syn
else:
bt.logging.error(f"provider {provider} not found")

@save_or_get_answer_from_cache
# @save_or_get_answer_from_cache
async def get_answer_task(self, uid: int, query_syn: StreamPrompting, response):
answer = await self.call_api(query_syn.messages, query_syn)
return answer
Expand Down
2 changes: 1 addition & 1 deletion validators/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def fetch(session, url):
try:
return await response.json()
except Exception as err:
bt.logging.error(f"{err} {traceback.format_exc()}")
pass

# Asynchronous function to gather multiple HTTP requests
async def gather_requests(urls):
Expand Down
126 changes: 59 additions & 67 deletions validators/weight_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,16 @@ async def handle_response(resp):
bt.logging.trace(f"Streamed text: {chunk}")

# Store the query and response in the shared database
async with self.lock:
self.query_database.append({
'uid': uid,
'synapse': query_syn,
'response': (response_text, query_syn.dendrite.process_time),
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
metagraph=self.metagraph)
})
query_syn.time_taken = query_syn.dendrite.process_time
self.query_database.append({
'uid': uid,
'synapse': query_syn,
'response': (response_text, query_syn.dendrite.process_time),
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
metagraph=self.metagraph)
})
query_syn.time_taken = query_syn.dendrite.process_time

axon = self.metagraph.axons[uid]
response = self.dendrite.call_stream(
Expand Down Expand Up @@ -256,15 +255,14 @@ def is_cycle_end(self):
async def perform_synthetic_queries_one_cycle(self):
start_time = time.time()
# don't process any organic query while processing synthetic queries.
async with self.lock:
synthetic_tasks = []
# check available bandwidth and send synthetic requests to all miners.
query_synapses = await self.create_query_syns_for_remaining_bandwidth()
for query_syn in query_synapses:
uid = self.task_mgr.assign_task(query_syn)
if uid is None:
bt.logging.debug(f"No available uids for synthetic query process.")
synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False)))
synthetic_tasks = []
# check available bandwidth and send synthetic requests to all miners.
query_synapses = await self.create_query_syns_for_remaining_bandwidth()
for query_syn in query_synapses:
uid = self.task_mgr.assign_task(query_syn)
if uid is None:
bt.logging.debug(f"No available uids for synthetic query process.")
synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False)))

bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.")

Expand Down Expand Up @@ -364,13 +362,12 @@ async def update_weights(self):
avg_scores = {}

# Compute average scores per UID
async with self.lock:
for uid in self.total_scores:
count = self.score_counts[uid]
if count > 0:
avg_scores[uid] = self.total_scores[uid] / count
else:
avg_scores[uid] = 0.0
for uid in self.total_scores:
count = self.score_counts[uid]
if count > 0:
avg_scores[uid] = self.total_scores[uid] / count
else:
avg_scores[uid] = 0.0

bt.logging.info(f"Average scores = {avg_scores}")

Expand Down Expand Up @@ -444,16 +441,15 @@ async def images(self, synapse: ImageResponse) -> ImageResponse:

bt.logging.info(f"New synapse = {synapse_response}")
# Store the query and response in the shared database
async with self.lock:
self.query_database.append({
'uid': synapse.uid,
'synapse': synapse,
'response': synapse_response,
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config,
metagraph=self.metagraph)
})
self.query_database.append({
'uid': synapse.uid,
'synapse': synapse,
'response': synapse_response,
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config,
metagraph=self.metagraph)
})

return synapse_response

Expand All @@ -466,16 +462,15 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings:

bt.logging.info(f"New synapse = {synapse_response}")
# Store the query and response in the shared database
async with self.lock:
self.query_database.append({
'uid': synapse.uid,
'synapse': synapse,
'response': synapse_response,
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config,
metagraph=self.metagraph)
})
self.query_database.append({
'uid': synapse.uid,
'synapse': synapse,
'response': synapse_response,
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config,
metagraph=self.metagraph)
})

return synapse_response

Expand Down Expand Up @@ -509,17 +504,16 @@ async def handle_response(resp):
bt.logging.trace(f"Streamed text: {chunk}")

# Store the query and response in the shared database
async with self.lock:
self.query_database.append({
'uid': synapse.uid,
'synapse': synapse,
'response': (response_text, synapse.dendrite.process_time),
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
metagraph=self.metagraph)
})
synapse.time_taken = self.dendrite.process_time
self.query_database.append({
'uid': synapse.uid,
'synapse': synapse,
'response': (response_text, synapse.dendrite.process_time),
'query_type': 'organic',
'timestamp': asyncio.get_event_loop().time(),
'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config,
metagraph=self.metagraph)
})
synapse.time_taken = synapse.dendrite.process_time

await send({"type": "http.response.body", "body": b'', "more_body": False})

Expand Down Expand Up @@ -605,9 +599,8 @@ async def process_queries_from_database(self):

bt.logging.info(f"start scoring process...")

async with self.lock:
queries_to_process = self.query_database.copy()
self.query_database.clear()
queries_to_process = self.query_database.copy()
self.query_database.clear()

self.synthetic_task_done = False
bt.logging.info("start scoring process")
Expand All @@ -619,12 +612,11 @@ async def process_queries_from_database(self):
resps = await asyncio.gather(*score_tasks)
resps = [item for item in resps if item is not None]
# Update total_scores and score_counts
async with self.lock:
for uid_scores_dict, _, _ in resps:
for uid, score in uid_scores_dict.items():
if self.total_scores.get(uid) is not None:
self.total_scores[uid] += score
self.score_counts[uid] += 1
for uid_scores_dict, _, _ in resps:
for uid, score in uid_scores_dict.items():
if self.total_scores.get(uid) is not None:
self.total_scores[uid] += score
self.score_counts[uid] += 1
bt.logging.info(
f"current total score are {self.total_scores}. total time of scoring is {time.time() - start_time}")
self.saving_datas = queries_to_process.copy()
Expand Down
Loading