Skip to content

axon threads and exception handling [merge conflict fixed] #2459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: staging
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 95 additions & 8 deletions bittensor/core/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import contextlib
import copy
import inspect
import socket
import threading
import time
import traceback
Expand All @@ -14,8 +15,8 @@
from inspect import signature, Signature, Parameter
from typing import Any, Awaitable, Callable, Optional, Tuple

from async_substrate_interface.utils import json
import uvicorn
from async_substrate_interface.utils import json
from bittensor_wallet import Wallet, Keypair
from fastapi import APIRouter, Depends, FastAPI
from fastapi.responses import JSONResponse
Expand Down Expand Up @@ -53,6 +54,12 @@
V_7_2_0 = 7002000


"""
The quantum of time to sleep in waiting loops, in seconds.
"""
TIME_SLEEP_INTERVAL: float = 1e-3


class FastAPIThreadedServer(uvicorn.Server):
"""
The ``FastAPIThreadedServer`` class is a specialized server implementation for the Axon server in the Bittensor
Expand Down Expand Up @@ -111,13 +118,61 @@ class FastAPIThreadedServer(uvicorn.Server):
should_exit: bool = False
is_running: bool = False

"""
Provide a channel to signal exceptions from the thread to our caller.
"""
_exception: Optional[Exception] = None
_lock: threading.Lock = threading.Lock()
_thread: Optional[threading.Thread] = None
_started: bool = False

def set_exception(self, exception: Exception) -> None:
"""
Set self._exception in a thread safe manner, so the worker thread can communicate exceptions to the main thread.
"""
with self._lock:
self._exception = exception

def get_exception(self) -> Optional[Exception]:
with self._lock:
return self._exception

def set_thread(self, thread: threading.Thread):
"""
Set self._thread in a thread safe manner, so the main thread can get the worker thread object.
"""
with self._lock:
self._thread = thread

def get_thread(self) -> Optional[threading.Thread]:
with self._lock:
return self._thread

def set_started(self, started: bool) -> None:
"""
Set self._started in a thread safe manner, so the main thread can get the worker thread status.
"""
with self._lock:
self._started = started

def get_started(self) -> bool:
with self._lock:
return self._started

def install_signal_handlers(self):
"""
Overrides the default signal handlers provided by ``uvicorn.Server``. This method is essential to ensure that
the signal handling in the threaded server does not interfere with the main application's flow, especially in a
complex asynchronous environment like the Axon server.
"""

async def startup(self, sockets: Optional[list[socket.socket]] = None) -> None:
"""
Adds a thread-safe call to set a 'started' flag on the object.
"""
await super().startup(sockets)
self.set_started(True)

@contextlib.contextmanager
def run_in_thread(self):
"""
Expand All @@ -126,14 +181,20 @@ def run_in_thread(self):
request handling in the Axon server.

Yields:
None: This method yields control back to the caller while the server is running in the background thread.
thread: a running thread

Raises:
Exception: in case the server did not start (as signalled by self.get_started())
"""
thread = threading.Thread(target=self.run, daemon=True)
thread.start()
try:
while not self.started:
time.sleep(1e-3)
yield
time_start = time.time()
while not self.get_started() and time.time() - time_start < 1:
time.sleep(TIME_SLEEP_INTERVAL)
if not self.get_started():
raise Exception("failed to start server")
yield thread
finally:
self.should_exit = True
thread.join()
Expand All @@ -143,9 +204,15 @@ def _wrapper_run(self):
A wrapper method for the :func:`run_in_thread` context manager. This method is used internally by the ``start``
method to initiate the server's execution in a separate thread.
"""
with self.run_in_thread():
while not self.should_exit:
time.sleep(1e-3)
try:
with self.run_in_thread() as thread:
self.set_thread(thread)
while not self.should_exit:
if not thread.is_alive():
raise Exception("worker thread died")
time.sleep(TIME_SLEEP_INTERVAL)
except Exception as e:
self.set_exception(e)

def start(self):
"""
Expand Down Expand Up @@ -422,6 +489,26 @@ def info(self) -> "AxonInfo":
placeholder2=0,
)

@property
def exception(self) -> Optional[Exception]:
"""
Axon objects expose exceptions that occurred internally through the .exception property.
"""
# for future use: setting self._exception to signal an exception
exception = getattr(self, "_exception", None)
if exception:
return exception
return self.fast_server.get_exception()

def is_running(self) -> bool:
"""
Axon objects can be queried using .is_running() to test whether worker threads are running.
"""
thread = self.fast_server.get_thread()
if thread is None:
return False
return thread.is_alive()

def attach(
self,
forward_fn: Callable,
Expand Down