
Commit 54550a1

Merge pull request #97 from Datura-ai/hotfix-main-bittensor
Hotfix main bittensor
2 parents b26762f + bb924f2 commit 54550a1

16 files changed: +233 -12 lines

.gitignore (-1)

@@ -16,5 +16,4 @@ validators/.ipynb_checkpoints/
 **/Cortex.t.egg-info
 **/test.ipynb
 .env
-**/server/**/*.py
 cache.db

cortext/dendrite.py (+2 -1)

@@ -26,6 +26,7 @@ async def call_stream(
         synapse: bt.StreamingSynapse = bt.Synapse(),  # type: ignore
         timeout: float = 12.0,
         deserialize: bool = True,
+        organic: bool = True
     ) -> AsyncGenerator[Any, Any]:
         start_time = time.time()
         target_axon = (
@@ -61,7 +62,7 @@ async def call_stream(
            ) as response:
                # Use synapse subclass' process_streaming_response method to yield the response chunks
                try:
-                   async for chunk in synapse.process_streaming_response(response):  # type: ignore
+                   async for chunk in synapse.process_streaming_response(response, organic):  # type: ignore
                        yield chunk  # Yield each chunk as it's processed
                except aiohttp.client_exceptions.ClientPayloadError:
                    pass

cortext/protocol.py (+3 -2)

@@ -366,10 +366,11 @@ def to_headers(self) -> dict:

         return headers

-    async def process_streaming_response(self, response: StreamingResponse) -> AsyncIterator[str]:
+    async def process_streaming_response(self, response: StreamingResponse, organic=True) -> AsyncIterator[str]:
         if self.completion is None:
             self.completion = ""
-        async for chunk in response.content.iter_any():
+        chunk_size = 100 if organic else 1024
+        async for chunk in response.content.iter_chunked(chunk_size):
             tokens = chunk.decode("utf-8")
             self.completion += tokens
             yield tokens
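
The switch from iter_any() to iter_chunked() bounds how much data each iteration yields: organic traffic is read in small 100-byte pieces, synthetic traffic in larger 1024-byte pieces. A minimal sketch of the difference using a plain aiohttp client (the URL and endpoint are assumptions for illustration, not part of this PR):

import asyncio

import aiohttp


async def read_stream(url: str, organic: bool = True) -> str:
    # Mirrors the chunking choice in process_streaming_response:
    # small chunks for organic queries, larger ones for synthetic ones.
    chunk_size = 100 if organic else 1024
    completion = ""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # iter_chunked(n) yields at most n bytes per iteration,
            # unlike iter_any(), which yields whatever is currently buffered.
            async for chunk in response.content.iter_chunked(chunk_size):
                completion += chunk.decode("utf-8")
    return completion

# Example against a hypothetical streaming endpoint:
# asyncio.run(read_stream("http://localhost:8000/stream", organic=False))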

server/Dockerfile (+18)

@@ -0,0 +1,18 @@
+# Use official Python image
+FROM python:3.10
+
+# Set working directory
+WORKDIR /app
+
+# Copy and install dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the app files to the container
+COPY . .
+
+# Expose the FastAPI port
+EXPOSE 8000
+
+# Start FastAPI app
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

server/app/__init__.py

Whitespace-only changes.

server/app/curd.py (+38)

@@ -0,0 +1,38 @@
+import os
+from typing import List
+from . import models, schemas
+from .database import cur, TABEL_NAME, conn
+from fastapi import HTTPException
+
+
+def create_item(item: schemas.ItemCreate):
+    query = f"INSERT INTO {TABEL_NAME} (p_key, question, answer, provider, model, timestamp) VALUES (%s, %s, %s, %s, %s, %s)"
+    cur.execute(query, item.p_key, item.question, item.answer, item.provider, item.model, item.timestamp)
+    conn.commit()  # Save changes to the database
+    return item
+
+
+def create_items(items: List[schemas.ItemCreate]):
+    query = f"INSERT INTO {TABEL_NAME} (p_key, question, answer, provider, model, timestamp) VALUES (%s, %s, %s, %s, %s, %s)"
+    datas = []
+    for item in items:
+        datas.append((item.p_key, item.question, item.answer, item.provider, item.model, item.timestamp))
+    try:
+        cur.executemany(query, datas)
+        conn.commit()  # Save changes to the database
+    except Exception as err:
+        raise HTTPException(status_code=500, detail=f"Internal Server Error {err}")
+
+
+def get_items(skip: int = 0, limit: int = 10):
+    query = f"SELECT * FROM {TABEL_NAME} LIMIT {limit} OFFSET {skip};"
+    cur.execute(query)
+    items = cur.fetchall()  # Fetch all results
+    return [dict(item) for item in items]
+
+
+def get_item(p_key: int):
+    query = f"SELECT * FROM {TABEL_NAME} WHERE p_key = %s"
+    cur.execute(query, (p_key,))
+    item = cur.fetchone()  # Fetch one result
+    return dict(item)
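
For reference, psycopg2's cursor.execute() takes its query parameters as a single sequence rather than as separate positional arguments, so a single-row insert against the same table is usually written as in the sketch below (the connection handling and dict-shaped item are illustrative assumptions, not the repository's code):

import os

import psycopg2

TABEL_NAME = "query_resp_data"  # same table name as in server/app/database.py


def insert_item(conn, item: dict) -> None:
    query = (
        f"INSERT INTO {TABEL_NAME} "
        "(p_key, question, answer, provider, model, timestamp) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )
    with conn.cursor() as cur:
        # Parameters are passed as one tuple, not as separate arguments.
        cur.execute(query, (item["p_key"], item["question"], item["answer"],
                            item["provider"], item["model"], item["timestamp"]))
    conn.commit()

# Usage sketch:
# conn = psycopg2.connect(os.getenv("DATABASE_URL"))
# insert_item(conn, {"p_key": "k1", "question": "{}", "answer": "a",
#                    "provider": "p", "model": "m", "timestamp": 0.0})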

server/app/database.py (+54)

@@ -0,0 +1,54 @@
+import psycopg2
+import os
+from contextlib import asynccontextmanager
+
+DATABASE_URL = os.getenv("DATABASE_URL")
+TABEL_NAME = 'query_resp_data'
+# PostgreSQL connection parameters
+conn = psycopg2.connect(DATABASE_URL)
+
+# Create a cursor object to interact with the database
+cur = conn.cursor()
+
+
+async def create_table(app):
+    global conn, cur, TABEL_NAME
+    try:
+        # Connect to the PostgreSQL database
+        conn = psycopg2.connect(DATABASE_URL)
+        cur = conn.cursor()
+
+        # SQL command to create a table
+        create_table_query = f"""
+        CREATE TABLE IF NOT EXISTS {TABEL_NAME} (
+            p_key VARCHAR(100) PRIMARY KEY,
+            question JSON,
+            answer TEXT,
+            provider VARCHAR(100),
+            model VARCHAR(100),
+            timestamp FLOAT
+        );
+        """
+
+        # Execute the SQL command
+        cur.execute(create_table_query)
+        conn.commit()  # Save changes
+        create_index_query = f"""
+        CREATE INDEX IF NOT EXISTS question_answer_index ON {TABEL_NAME} (provider, model);
+        """
+        cur.execute(create_index_query)
+        conn.commit()
+        print("Table created successfully!")
+
+    except Exception as e:
+        print(f"Error creating table: {e}")
+
+    finally:
+        # Close the cursor and connection
+        if cur:
+            cur.close()
+        if conn:
+            conn.close()
+
+
+create_table(None)
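
Because create_table is declared async, the table-creation statements only run when the coroutine is awaited; the FastAPI lifespan in server/app/main.py does this at startup. A minimal sketch of driving it explicitly from a synchronous script instead (the import path is assumed and DATABASE_URL must be set; this is not part of the commit):

import asyncio

from app.database import create_table  # import path assumed to match the server layout

if __name__ == "__main__":
    # create_table is a coroutine function; calling it bare only creates a
    # coroutine object, so it has to be run through an event loop.
    asyncio.run(create_table(None))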

server/app/main.py (+37)

@@ -0,0 +1,37 @@
+from contextlib import asynccontextmanager
+from fastapi import FastAPI, Depends, HTTPException
+from . import curd, models, schemas
+from .database import create_table
+from typing import List
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Load the ML model
+    await create_table(None)
+    yield
+
+
+app = FastAPI(lifespan=lifespan)
+
+
+# Create an item
+@app.post("/items")
+def create_item(items: List[schemas.ItemCreate]):
+    return curd.create_items(items=items)
+
+
+# Read all items
+@app.get("/items", response_model=list)
+def read_items(skip: int = 0, limit: int = 10):
+    items = curd.get_items(skip=skip, limit=limit)
+    return items
+
+
+# Read a single item by ID
+@app.get("/items/{p_key}", response_model=schemas.Item)
+def read_item(p_key: int):
+    db_item = curd.get_item(p_key=p_key)
+    if db_item is None:
+        raise HTTPException(status_code=404, detail="Item not found")
+    return db_item
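
Once the container is running (the Dockerfile exposes port 8000), the endpoints can be exercised with a small client. A sketch using the requests library against an assumed local deployment; the host, port, and sample payload values are illustrative:

import json
import time

import requests

BASE_URL = "http://localhost:8000"  # assumed local deployment

# POST /items accepts a list of records matching schemas.ItemCreate.
payload = [{
    "p_key": "example-key-1",
    "question": json.dumps({"prompt": "What is Bittensor?"}),  # question is a JSON field
    "answer": "A decentralized machine-learning network.",
    "provider": "OpenAI",
    "model": "gpt-4o",
    "timestamp": time.time(),
}]
resp = requests.post(f"{BASE_URL}/items", json=payload)
print(resp.status_code)

# GET /items returns a paginated list.
print(requests.get(f"{BASE_URL}/items", params={"skip": 0, "limit": 10}).json())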

server/app/models.py

Whitespace-only changes.

server/app/schemas.py (+18)

@@ -0,0 +1,18 @@
+from pydantic import BaseModel, Json
+
+
+class ItemBase(BaseModel):
+    p_key: str
+    question: Json
+    answer: str
+    provider: str
+    model: str
+    timestamp: float
+
+
+class ItemCreate(ItemBase):
+    pass
+
+
+class Item(ItemBase):
+    pass

server/app/utils.py

Whitespace-only changes.

server/docker-compose.yml (+30)

@@ -0,0 +1,30 @@
+version: "3.9"
+
+services:
+  db:
+    image: postgres:13
+    restart: always
+    environment:
+      POSTGRES_USER: ${POSTGRES_USER}
+      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+      POSTGRES_DB: ${POSTGRES_DB}
+    volumes:
+      - postgres_data_score:/var/lib/postgresql/data
+    ports:
+      - "5432:5432"
+
+  web:
+    build: .
+    restart: always
+    ports:
+      - "8000:8000"
+    depends_on:
+      - db
+    environment:
+      DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
+      POSTGRES_DB: ${POSTGRES_DB}
+    volumes:
+      - .:/app
+
+volumes:
+  postgres_data_score:

server/requirements.txt (+5)

@@ -0,0 +1,5 @@
+fastapi
+uvicorn[standard]
+psycopg2-binary
+sqlalchemy
+pydantic

validators/services/cache.py (+2 -1)

@@ -1,3 +1,4 @@
+import random
 import sqlite3
 import time
 import hashlib
@@ -54,7 +55,7 @@ def set_cache_in_batch(self, syns: List[StreamPrompting], ttl=3600 * 24, block_n
         datas = []
         last_update_time = time.time()
         for syn in syns:
-            p_key = self.generate_hash(str(time.monotonic_ns()) + str(syn.json()))
+            p_key = self.generate_hash(str(time.monotonic_ns()) + str(syn.json()) + str(random.random()))
            syn.time_taken = syn.dendrite.process_time or 0
            syn.validator_info = {"vali_uid": self.vali_uid, "vali_hotkey": self.vali_hotkey}
            syn.miner_info = {"miner_id": syn.uid, "miner_hotkey": syn.axon.hotkey}
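
The extra random.random() salt guards against two entries in the same batch hashing to the same primary key when their serialized synapses are identical and time.monotonic_ns() returns the same tick. A minimal sketch of the idea, assuming generate_hash is a SHA-256 hex digest (its body is not shown in this diff, so that is an assumption):

import hashlib
import random
import time


def generate_hash(text: str) -> str:
    # Assumed implementation; the repository's generate_hash is not part of this hunk.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def make_p_key(serialized_synapse: str) -> str:
    # Salting with random.random() keeps keys unique even if two identical
    # payloads are hashed within the same monotonic-clock resolution.
    return generate_hash(str(time.monotonic_ns()) + serialized_synapse + str(random.random()))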

validators/utils.py (+1 -1)

@@ -174,7 +174,7 @@ def setup_max_capacity(item):
         if isinstance(value, dict):  # If the value is another dictionary, recurse
             setup_max_capacity(value)
         elif isinstance(value, (int, float)):  # If the value is a number, increment by 5
-            item[key] = min(value, 30)
+            item[key] = min(value, 10)


 def get_bandwidth(data, uid, provider, model):
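
This hunk lowers the per-entry cap applied by setup_max_capacity from 30 to 10. A small sketch of how that recursive cap behaves on a nested bandwidth dictionary; only part of the function appears in the hunk, so the loop structure and sample data here are assumptions:

def setup_max_capacity(item: dict) -> None:
    # Recursively walk a nested dict and clamp every numeric leaf at 10.
    for key, value in item.items():
        if isinstance(value, dict):
            setup_max_capacity(value)
        elif isinstance(value, (int, float)):
            item[key] = min(value, 10)


capacities = {"OpenAI": {"gpt-4o": 25, "gpt-3.5-turbo": 7}, "Anthropic": {"claude-3": 50}}
setup_max_capacity(capacities)
print(capacities)
# {'OpenAI': {'gpt-4o': 10, 'gpt-3.5-turbo': 7}, 'Anthropic': {'claude-3': 10}}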

validators/weight_setter.py (+25 -6)

@@ -1,20 +1,20 @@
 import asyncio
 import concurrent
 import random
-import traceback
 import threading

 import torch
 import time
+import requests

 from black.trans import defaultdict
 from substrateinterface import SubstrateInterface
 from functools import partial
-from typing import Tuple, List
+from typing import Tuple
 import bittensor as bt
 from bittensor import StreamingSynapse
 import cortext
-
+import json
 from starlette.types import Send

 from cortext.protocol import IsAlive, StreamPrompting, ImageResponse, Embeddings
@@ -93,6 +93,7 @@ def __init__(self, config, cache: QueryResponseCache, loop=None):
         self.loop.create_task(self.process_queries_from_database())

         self.saving_datas = []
+        self.url = None
         daemon_thread = threading.Thread(target=self.saving_resp_answers_from_miners)
         daemon_thread.start()

@@ -104,11 +105,28 @@ def saving_resp_answers_from_miners(self):
                 time.sleep(1)
             else:
                 bt.logging.info(f"saving responses...")
+                start_time = time.time()
                 self.cache.set_cache_in_batch([item.get('synapse') for item in self.saving_datas],
                                               block_num=self.current_block,
                                               cycle_num=self.current_block // 36, epoch_num=self.current_block // 360)
                 bt.logging.info(f"total saved responses is {len(self.saving_datas)}")
                 self.saving_datas.clear()
+                if not self.url:
+                    return
+                bt.logging.info("sending datas to central server.")
+                json_data = [item.get('synapse').dict() for item in self.saving_datas]
+                headers = {
+                    'Content-Type': 'application/json'  # Specify that we're sending JSON
+                }
+                response = requests.post(self.url, data=json.dumps(json_data), headers=headers)
+                # Check the response
+                if response.status_code == 200:
+                    bt.logging.info(
+                        f"Successfully sent data to central server. {time.time() - start_time} sec total elapsed for sending to central server.")
+                else:
+                    bt.logging.info(
+                        f"Failed to send data. Status code: {response.status_code} {time.time() - start_time} sec total elapsed for sending to central server.")
+                    bt.logging.info(f"Response:{response.text}")

     async def run_sync_in_async(self, fn):
         return await self.loop.run_in_executor(None, fn)
@@ -162,7 +180,7 @@ async def update_and_refresh(self):
         await self.initialize_uids_and_capacities()
         bt.logging.info("Metagraph refreshed.")

-    async def query_miner(self, uid, query_syn: cortext.ALL_SYNAPSE_TYPE):
+    async def query_miner(self, uid, query_syn: cortext.ALL_SYNAPSE_TYPE, organic=True):
         query_syn.uid = uid
         if query_syn.streaming:
             if uid is None:
@@ -195,6 +213,7 @@ async def handle_response(resp):
                     target_axon=axon,
                     synapse=query_syn,
                     timeout=query_syn.timeout,
+                    organic=organic
                 )
                 await handle_response(response)
             else:
@@ -256,7 +275,7 @@ async def perform_synthetic_queries(self):
             uid = self.task_mgr.assign_task(query_syn)
             if uid is None:
                 bt.logging.debug(f"No available uids for synthetic query process.")
-            synthetic_tasks.append((uid, self.query_miner(uid, query_syn)))
+            synthetic_tasks.append((uid, self.query_miner(uid, query_syn, organic=False)))

         bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.")

@@ -283,7 +302,7 @@ async def perform_synthetic_queries(self):
                 f"synthetic queries and answers has been processed in cache successfully. total times {time.time() - start_time}")

     def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks):
-        batch_size = 10000
+        batch_size = 3000
         max_query_cnt_per_miner = 50
         batch_tasks = []
         remain_tasks = []
