Skip to content

Commit bf95417

Browse files
author
µ
committed
bittensor/axon.py: thread and exception handling
Various issues were encountered while trying to run and understand the e2e tests:
- if uvicorn fails to start, an uncaught exception is emitted to stderr
- axon keeps spinning, waiting for self.started, indefinitely
- exceptions are not propagated from threads
- there is no way to (simply) test from the outside whether an axon started and/or runs
- axon creates a thread that only creates another thread, which seems redundant

This patch addresses some of these issues. In FastAPIThreadedServer:
- add thread-safe set_exception()/get_exception() to set/get exceptions
- run_in_thread() yields the created thread, so that the code using it can check whether the thread is alive
- uvicorn.Server.startup() is wrapped to set a thread-safe flag using self.set_started(True) to indicate startup succeeded
- run_in_thread() times out after one second to prevent an infinite loop in case self.get_started() never becomes True
- run_in_thread() raises an exception if it fails to start the thread
- _wrapper_run() tests whether the thread is still alive

In class axon, the following are added:
- @property axon.exception, returning any exception
- axon.is_running(), returning True when the axon is operational

The seemingly redundant thread is left in until feedback is received on the reasons for including it.
1 parent a28963d commit bf95417

File tree

1 file changed

+77
-6
lines changed

1 file changed

+77
-6
lines changed

bittensor/axon.py

Lines changed: 77 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import inspect
2727
import json
2828
import os
29+
import socket
2930
import threading
3031
import time
3132
import traceback
@@ -100,26 +101,72 @@ class FastAPIThreadedServer(uvicorn.Server):
100101
should_exit: bool = False
101102
is_running: bool = False
102103

104+
"""
105+
Provide a channel to signal exceptions from the thread to our caller.
106+
"""
107+
_exception: Exception = None
108+
_lock: threading.Lock = threading.Lock()
109+
_thread: threading.Thread = None
110+
_started: bool = False
111+
112+
def set_exception(self, ex):
113+
with self._lock:
114+
self._exception = ex
115+
116+
def get_exception(self):
117+
with self._lock:
118+
return self._exception
119+
120+
def set_thread(self, thread):
121+
with self._lock:
122+
self._thread = thread
123+
124+
def get_thread(self):
125+
with self._lock:
126+
return self._thread
127+
128+
def set_started(self, started):
129+
with self._lock:
130+
self._started = started
131+
132+
def get_started(self):
133+
with self._lock:
134+
return self._started
135+
103136
def install_signal_handlers(self):
104137
"""
105138
Overrides the default signal handlers provided by ``uvicorn.Server``. This method is essential to ensure that the signal handling in the threaded server does not interfere with the main application's flow, especially in a complex asynchronous environment like the Axon server.
106139
"""
107140
pass
108141

142+
async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None:
143+
"""
144+
Adds a thread-safe call to set a 'started' flag on the object.
145+
"""
146+
ret = await super().startup(sockets)
147+
self.set_started(True)
148+
return ret
149+
109150
@contextlib.contextmanager
110151
def run_in_thread(self):
111152
"""
112153
Manages the execution of the server in a separate thread, allowing the FastAPI application to run asynchronously without blocking the main thread of the Axon server. This method is a key component in enabling concurrent request handling in the Axon server.
113154
114155
Yields:
115-
None: This method yields control back to the caller while the server is running in the background thread.
156+
thread: a running thread
157+
158+
Raises:
159+
Exception: in case the server did not start (as signalled by self.get_started())
116160
"""
117161
thread = threading.Thread(target=self.run, daemon=True)
118162
thread.start()
119163
try:
120-
while not self.started:
164+
t0 = time.time()
165+
while not self.get_started() and time.time()-t0<1:
121166
time.sleep(1e-3)
122-
yield
167+
if not self.get_started():
168+
raise Exception("failed to start server")
169+
yield thread
123170
finally:
124171
self.should_exit = True
125172
thread.join()
@@ -128,9 +175,15 @@ def _wrapper_run(self):
128175
"""
129176
A wrapper method for the :func:`run_in_thread` context manager. This method is used internally by the ``start`` method to initiate the server's execution in a separate thread.
130177
"""
131-
with self.run_in_thread():
132-
while not self.should_exit:
133-
time.sleep(1e-3)
178+
try:
179+
with self.run_in_thread() as thread:
180+
self.set_thread(thread)
181+
while not self.should_exit:
182+
if not thread.is_alive():
183+
raise Exception("worker thread died")
184+
time.sleep(1e-3)
185+
except Exception as e:
186+
self.set_exception(e)
134187

135188
def start(self):
136189
"""
@@ -405,6 +458,24 @@ def info(self) -> "bittensor.AxonInfo":
405458
placeholder2=0,
406459
)
407460

461+
# Our instantiator should be able to test axon.exception to see if any
462+
# exception occurred.
463+
@property
464+
def exception(self):
465+
# for future use: setting self._exception to signal an exception
466+
e = getattr(self,'_exception',None)
467+
if e:
468+
return e
469+
return self.fast_server.get_exception()
470+
471+
# Our instantiator should be able to test axon.is_running() to see if all
472+
# required threads etc are running.
473+
def is_running(self):
474+
t = self.fast_server.get_thread()
475+
if t is None:
476+
return False
477+
return t.is_alive()
478+
408479
def attach(
409480
self,
410481
forward_fn: Callable,

0 commit comments

Comments
 (0)