1
1
import asyncio
2
- import multiprocessing as mp
3
2
import sys
4
3
5
4
import loguru
6
5
import netaddr
7
6
import requests
8
7
import torch
8
+
9
+ # import multiprocessing as mp
10
+ import torch .multiprocessing as mp
9
11
import wandb
10
12
from bittensor .core .extrinsics .serving import serve_extrinsic
11
13
29
31
NEURON_SAMPLE_SIZE = 100 # TODO: Should add this to constants.py
30
32
31
33
32
- def create_loop_process (task_queue , scoring_queue , reward_events ):
34
+ def create_loop_process (task_queue , scoring_queue , reward_events , miners_dict ):
33
35
settings .shared_settings = settings .SharedSettings .load (mode = "validator" )
34
36
if settings .shared_settings .WANDB_ON :
35
37
init_wandb (neuron = "validator" )
36
38
37
- async def spawn_loops (task_queue , scoring_queue , reward_events ):
39
+ async def spawn_loops (task_queue , scoring_queue , reward_events , miners_dict ):
38
40
# ruff: noqa: E402
39
41
from prompting .llms .model_manager import model_scheduler
40
- from prompting .miner_availability .miner_availability import availability_checking_loop
42
+
43
+ # from prompting.miner_availability.miner_availability import availability_checking_loop
41
44
from prompting .tasks .task_creation import task_loop
42
- from prompting .tasks .task_sending import task_sender
43
- from prompting .weight_setting .weight_setter import weight_setter
44
45
from shared .profiling import profiler
45
46
46
47
logger .info ("Starting Profiler..." )
47
48
asyncio .create_task (profiler .print_stats (), name = "Profiler" ),
48
49
49
- # -------- Duplicate of create_task_loop ----------
50
- logger .info ("Starting AvailabilityCheckingLoop..." )
51
- asyncio .create_task (availability_checking_loop .start ())
52
-
53
- logger .info ("Starting TaskSender..." )
54
- asyncio .create_task (task_sender .start (task_queue , scoring_queue ))
55
-
56
50
logger .info ("Starting TaskLoop..." )
57
- asyncio .create_task (task_loop .start (task_queue , scoring_queue ))
58
- # -------------------------------------------------
51
+ asyncio .create_task (task_loop .start (task_queue , scoring_queue , miners_dict , simultaneous_loops = 4 ))
59
52
60
53
logger .info ("Starting ModelScheduler..." )
61
54
asyncio .create_task (model_scheduler .start (scoring_queue ), name = "ModelScheduler" ),
62
55
logger .info ("Starting TaskScorer..." )
63
- asyncio .create_task (task_scorer .start (scoring_queue , reward_events ), name = "TaskScorer" ),
64
- logger .info ("Starting WeightSetter..." )
65
- asyncio .create_task (weight_setter .start (reward_events ))
56
+ asyncio .create_task (task_scorer .start (scoring_queue , reward_events , simultaneous_loops = 4 ), name = "TaskScorer" ),
66
57
67
58
while True :
68
59
await asyncio .sleep (5 )
@@ -73,9 +64,9 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
73
64
logger .debug (f"Number of tasks in Reward Events: { len (reward_events )} " )
74
65
75
66
try :
76
- asyncio .run (spawn_loops (task_queue , scoring_queue , reward_events ))
67
+ asyncio .run (spawn_loops (task_queue , scoring_queue , reward_events , miners_dict ))
77
68
except Exception as e :
78
- logger .info (f"Terminating loop process: { e } " )
69
+ logger .exception (f"Terminating loop process: { e } " )
79
70
finally :
80
71
logger .info ("Cleaning up resources..." )
81
72
@@ -85,16 +76,10 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
85
76
logger .info ("WandB run finished." )
86
77
87
78
88
- def start_api (scoring_queue , reward_events ):
79
+ def start_api (scoring_queue , reward_events , miners_dict ):
89
80
async def start ():
90
81
from prompting .api .api import start_scoring_api # noqa: F401
91
82
92
- # TODO: We should not use 2 availability loops for each process, in reality
93
- # we should only be sharing the miner availability data between processes.
94
- from prompting .miner_availability .miner_availability import availability_checking_loop
95
-
96
- asyncio .create_task (availability_checking_loop .start ())
97
-
98
83
try :
99
84
external_ip = requests .get ("https://checkip.amazonaws.com" ).text .strip ()
100
85
netaddr .IPAddress (external_ip )
@@ -111,37 +96,121 @@ async def start():
111
96
logger .debug (f"Serve success: { serve_success } " )
112
97
except Exception as e :
113
98
logger .warning (f"Failed to serve scoring api to chain: { e } " )
114
- await start_scoring_api (task_scorer , scoring_queue , reward_events )
99
+ await start_scoring_api (task_scorer , scoring_queue , reward_events , miners_dict )
115
100
116
101
while True :
117
102
await asyncio .sleep (10 )
118
103
119
104
asyncio .run (start ())
120
105
121
106
107
+ def start_task_sending_loop (task_queue , scoring_queue , miners_dict : dict ):
108
+ async def spawn_loops (task_queue , scoring_queue , miners_dict : dict ):
109
+ from prompting .tasks .task_sending import task_sender
110
+
111
+ logger .info ("Starting task sending loop in validator2..." )
112
+ asyncio .create_task (task_sender .start (task_queue , scoring_queue , miners_dict , simultaneous_loops = 10 ))
113
+ while True :
114
+ await asyncio .sleep (5 )
115
+ logger .debug ("Task sending loop is running" )
116
+
117
+ try :
118
+ logger .info ("Starting task sending loop in validator..." )
119
+ asyncio .run (spawn_loops (task_queue , scoring_queue , miners_dict ))
120
+
121
+ except Exception as e :
122
+ logger .exception (f"Task sending loop error: { e } " )
123
+ raise
124
+
125
+
126
+ def start_availability_checking_loop (miners_dict : dict ):
127
+ async def spawn_loops (miners_dict : dict ):
128
+ from prompting .miner_availability .miner_availability import availability_checking_loop
129
+
130
+ logger .info ("Starting availability checking loop in validator2..." )
131
+ asyncio .create_task (availability_checking_loop .start (miners_dict ))
132
+ while True :
133
+ await asyncio .sleep (5 )
134
+ logger .debug ("Availability checking loop is running" )
135
+
136
+ try :
137
+ logger .info ("Starting availability checking loop in validator..." )
138
+ asyncio .run (spawn_loops (miners_dict ))
139
+
140
+ except Exception as e :
141
+ logger .exception (f"Availability checking loop error: { e } " )
142
+ raise
143
+
144
+
145
+ def start_weight_setter_loop (reward_events ):
146
+ async def spawn_loops (reward_events ):
147
+ from prompting .weight_setting .weight_setter import weight_setter
148
+
149
+ logger .info ("Starting weight setter loop in validator2..." )
150
+ asyncio .create_task (weight_setter .start (reward_events ))
151
+ while True :
152
+ await asyncio .sleep (5 )
153
+ logger .debug ("Weight setter loop is running" )
154
+
155
+ try :
156
+ logger .info ("Starting weight setter loop in validator..." )
157
+ asyncio .run (spawn_loops (reward_events ))
158
+
159
+ except Exception as e :
160
+ logger .exception (f"Weight setter loop error: { e } " )
161
+ raise
162
+
163
+
122
164
async def main ():
123
165
# will start checking the availability of miners at regular intervals, needed for API and Validator
124
166
with torch .multiprocessing .Manager () as manager :
125
167
reward_events = manager .list ()
126
168
scoring_queue = manager .list ()
127
169
task_queue = manager .list ()
128
-
129
- # Create process pool for managed processes
170
+ miners_dict = manager .dict ()
130
171
processes = []
131
172
132
173
try :
133
- # # Start checking the availability of miners at regular intervals
174
+ # Start checking the availability of miners at regular intervals
134
175
if settings .shared_settings .DEPLOY_SCORING_API :
135
176
# Use multiprocessing to bypass API blocking issue
136
- api_process = mp .Process (target = start_api , args = (scoring_queue , reward_events ), name = "API_Process" )
177
+ api_process = mp .Process (
178
+ target = start_api , args = (scoring_queue , reward_events , miners_dict ), name = "API_Process"
179
+ )
137
180
api_process .start ()
138
181
processes .append (api_process )
139
182
140
- loop_process = mp .Process (
141
- target = create_loop_process , args = (task_queue , scoring_queue , reward_events ), name = "LoopProcess"
183
+ availability_process = mp .Process (
184
+ target = start_availability_checking_loop ,
185
+ args = (miners_dict ,),
186
+ name = "AvailabilityProcess" ,
142
187
)
188
+ availability_process .start ()
189
+ processes .append (availability_process )
143
190
191
+ loop_process = mp .Process (
192
+ target = create_loop_process ,
193
+ args = (task_queue , scoring_queue , reward_events , miners_dict ),
194
+ name = "LoopProcess" ,
195
+ )
144
196
loop_process .start ()
197
+
198
+ task_sending_process = mp .Process (
199
+ target = start_task_sending_loop ,
200
+ args = (task_queue , scoring_queue , miners_dict ),
201
+ name = "TaskSendingProcess" ,
202
+ )
203
+ task_sending_process .start ()
204
+ processes .append (task_sending_process )
205
+
206
+ weight_setter_process = mp .Process (
207
+ target = start_weight_setter_loop ,
208
+ args = (reward_events ,),
209
+ name = "WeightSetterProcess" ,
210
+ )
211
+ weight_setter_process .start ()
212
+ processes .append (weight_setter_process )
213
+
145
214
processes .append (loop_process )
146
215
GPUInfo .log_gpu_info ()
147
216
0 commit comments