Skip to content

Commit 9efc4e1

Browse files
authored
Refactor workers (#62)
* Refactor workers * Refactor workers
1 parent 48ddd03 commit 9efc4e1

File tree

21 files changed

+564
-508
lines changed

21 files changed

+564
-508
lines changed

.vscode/launch.json

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"env": {},
1414
"args": [
1515
"-x",
16-
"tests/scheduler/test_scheduler.py::test_disabled_execution"
16+
"tests/scheduler/test_scheduler.py"
1717
],
1818
"debugOptions": [
1919
"RedirectOutput"
@@ -50,5 +50,16 @@
5050
"RedirectOutput"
5151
]
5252
},
53+
{
54+
"name": "AIO-FLUID simple_worker",
55+
"type": "python",
56+
"request": "launch",
57+
"program": "${workspaceFolder}/docs_src/simple_worker.py",
58+
"cwd": "${workspaceFolder}",
59+
"justMyCode": false,
60+
"debugOptions": [
61+
"RedirectOutput"
62+
]
63+
},
5364
]
5465
}

docs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pip install aio-fluid
3030
To install all the dependencies:
3131

3232
```
33-
pip install aio-fluid[cli, db, http, log]
33+
pip install aio-fluid[cli, db, http, log, k8s]
3434
```
3535
this includes the following extra dependencies:
3636

docs/reference/workers.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ There are several worker classes which can be imported from `fluid.utils.worker`
77
from fluid.utils.worker import StoppingWorker
88
```
99

10-
::: fluid.utils.worker.Worker
11-
12-
::: fluid.utils.worker.RunningWorker
10+
::: fluid.utils.worker.WorkerState
1311

14-
::: fluid.utils.worker.StoppingWorker
12+
::: fluid.utils.worker.Worker
1513

1614
::: fluid.utils.worker.WorkerFunction
1715

@@ -22,5 +20,3 @@ from fluid.utils.worker import StoppingWorker
2220
::: fluid.utils.worker.AsyncConsumer
2321

2422
::: fluid.utils.worker.Workers
25-
26-
::: fluid.utils.worker.DynamicWorkers

docs/tutorials/workers.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Workers
2+
3+
Workers are the main building block for asynchronous programming with `aio-fluid`. They are responsible for running tasks and managing their lifecycle.
4+
All workers implemented derive from the base abstract class [Worker][fluid.utils.worker.Worker] where the main method to implement is the [Worker.run][fluid.utils.worker.Worker.run] method.
5+
6+
## Worker Lifecycle
7+
8+
The lifecycle of a worker is managed by the [WorkerState][fluid.utils.worker.WorkerState] class which provides a set of states that a worker can be in. The worker starts in an inital state and than it can be started and stopped.
9+
10+
### Startup
11+
12+
To start a worker one uses the async method [Worker.startup][fluid.utils.worker.Worker.startup] which create the task running the worker. The task will transition the worker from [WorkerState.INIT][fluid.utils.worker.WorkerState.INIT] to the [WorkerState.RUNNING][fluid.utils.worker.WorkerState.RUNNING] state. The worker will then run the [Worker.on_startup][fluid.utils.worker.Worker.on_startup] coroutine method (which by default is a no-op) follow by the main worker coroutine method [Worker.run][fluid.utils.worker.Worker.run] method until it is stopped.
13+
14+
This is a very simple example of a worker that prints a message every second until it is stopped:
15+
16+
```python
17+
--8<-- "./docs_src/simple_worker.py"
18+
```
19+
20+
### Shutdown
21+
22+
To shut down a worker there are few possibilities.
23+
24+
* Direct call to the async [Worker.shutdown][fluid.utils.worker.Worker.shutdown] method which will trigger the graceful shutdown and wait for the worker to finish its work.
25+
* Call the [Worker.gracefully_stop][fluid.utils.worker.Worker.gracefully_stop] method which will trigger the graceful shutdown. Importantly, this method does not wait for the worker to finish its work, ti simply transition from the [WorkerState.RUNNING][fluid.utils.worker.WorkerState.RUNNING] to [WorkerState.STOPPING][fluid.utils.worker.WorkerState.STOPPING] state. To wait for the worker exit one should call the async [Worker.wait_for_shutdown][fluid.utils.worker.Worker.wait_for_shutdown] method (as in the example above)

docs_src/simple_worker.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import asyncio
2+
3+
from fluid.utils.worker import Worker
4+
5+
6+
class SimpleWorker(Worker):
7+
async def run(self):
8+
while self.is_running():
9+
self.print_message()
10+
await asyncio.sleep(1)
11+
12+
def print_message(self):
13+
print(f"Hello from {self.worker_name} in state {self.worker_state}")
14+
15+
16+
17+
async def main():
18+
worker = SimpleWorker()
19+
worker.print_message()
20+
await worker.startup()
21+
asyncio.get_event_loop().call_later(5, worker.gracefully_stop)
22+
await worker.wait_for_shutdown()
23+
worker.print_message()
24+
25+
26+
if __name__ == "__main__":
27+
asyncio.run(main())

examples/tasks/__init__.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import time
44
from dataclasses import dataclass, field
55
from datetime import timedelta
6-
from typing import Self, cast
6+
from typing import Any, Self, cast
77

88
from fastapi import FastAPI
99
from pydantic import BaseModel, Field
@@ -23,8 +23,10 @@ def get(cls, context: TaskRun) -> Self:
2323
return context.deps
2424

2525

26-
def task_scheduler() -> TaskScheduler:
27-
task_manager = TaskScheduler(deps=Deps())
26+
def task_scheduler(*, deps: Deps | None = None, **kwargs: Any) -> TaskScheduler:
27+
deps = deps or Deps()
28+
task_manager = TaskScheduler(deps=deps, **kwargs)
29+
task_manager.add_async_context_manager(deps.http_client)
2830
task_manager.register_from_dict(globals())
2931
return task_manager
3032

@@ -47,9 +49,10 @@ async def dummy(context: TaskRun[Sleep]) -> None:
4749

4850

4951
@task(schedule=every(timedelta(seconds=2)))
50-
async def scheduled(context: TaskRun) -> None:
51-
"""A simple scheduled task"""
52-
await asyncio.sleep(0.1)
52+
async def ping(context: TaskRun) -> None:
53+
"""A simple scheduled task that ping the broker"""
54+
redis_cli = cast(RedisTaskBroker, context.task_manager.broker).redis_cli
55+
await redis_cli.ping()
5356

5457

5558
class AddValues(BaseModel):

fluid/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""Reusable server side python modules"""
22

3-
__version__ = "1.3.4"
3+
__version__ = "1.4.0"

fluid/scheduler/consumer.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,28 @@ def __init__(
8686
),
8787
] = TaskDispatcher()
8888
self.broker = TaskBroker.from_url(self.config.broker_url)
89+
self._async_contexts: list[Any] = []
8990
self._stack = AsyncExitStack()
9091

9192
async def __aenter__(self) -> Self:
93+
for cm in self._async_contexts:
94+
await self._stack.enter_async_context(cm)
9295
return self
9396

9497
async def __aexit__(self, *exc_info: Any) -> None:
9598
try:
96-
await self._stack.aclose()
99+
await self._stack.__aexit__(*exc_info)
97100
finally:
98-
await self.on_shutdown()
101+
await self.broker.close()
99102

100-
async def enter_async_context(self, cm: Any) -> Any:
101-
return await self._stack.enter_async_context(cm)
103+
async def on_startup(self) -> None:
104+
await self.__aenter__()
105+
106+
async def on_shutdown(self) -> None:
107+
await self.__aexit__(None, None, None)
108+
109+
def add_async_context_manager(self, cm: Any) -> None:
110+
self._async_contexts.append(cm)
102111

103112
@property
104113
def registry(self) -> TaskRegistry:
@@ -115,9 +124,6 @@ async def execute(self, task: Task | str, **params: Any) -> TaskRun:
115124
await task_run.execute()
116125
return task_run
117126

118-
async def on_shutdown(self) -> None:
119-
await self.broker.close()
120-
121127
def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
122128
return asyncio.run(self._execute_and_exit(task, **params))
123129

@@ -263,10 +269,9 @@ async def _queue_task(self) -> None:
263269
try:
264270
task = self._task_to_queue.pop()
265271
except IndexError:
266-
await asyncio.sleep(0.1)
272+
pass
267273
else:
268274
await self.queue(task)
269-
await asyncio.sleep(0)
270275

271276
async def _consume_tasks(self, worker_name: str) -> None:
272277
if not self.config.consume_tasks:

fluid/scheduler/endpoints.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,7 @@ def setup_fastapi(
152152
app.state.task_manager = task_manager
153153
if isinstance(task_manager, Worker):
154154
app_workers(app).add_workers(task_manager)
155+
else:
156+
app.add_event_handler("startup", task_manager.on_startup)
157+
app.add_event_handler("shutdown", task_manager.on_shutdown)
155158
return app

fluid/tools_fastapi/__init__.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,32 @@
11
from __future__ import annotations
22

3-
from fastapi import FastAPI
3+
from typing import Annotated
4+
5+
from fastapi import Depends, FastAPI, Request
46

57
from fluid.utils.worker import Workers
68

79
from .service import FastapiAppWorkers
810

911

12+
def get_workers_from_request(request: Request) -> Workers | None:
13+
"""Get workers from request."""
14+
return get_workers_from_app(request.app)
15+
16+
17+
def get_workers_from_app(app: FastAPI) -> Workers | None:
18+
"""Get workers from request."""
19+
return getattr(app.state, "workers", None)
20+
21+
1022
def app_workers(app: FastAPI) -> Workers:
1123
"""Get workers from app state or create new workers."""
12-
if workers := getattr(app.state, "workers", None):
24+
if workers := get_workers_from_app(app):
1325
return workers
1426
else:
1527
workers = FastapiAppWorkers.setup(app)
1628
app.state.workers = workers
1729
return workers
30+
31+
32+
WorkersDep = Annotated[Workers | None, Depends(get_workers_from_request)]

fluid/tools_fastapi/service.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from fluid import settings
88
from fluid.utils import log
9-
from fluid.utils.worker import Worker, Workers
9+
from fluid.utils.worker import Workers
1010

1111
logger = log.get_logger(__name__)
1212

@@ -21,13 +21,6 @@ def setup(cls, app: FastAPI, **kwargs: Any) -> Self:
2121
app.add_event_handler("shutdown", workers.shutdown)
2222
return workers
2323

24-
def bail_out(self, reason: str, code: int = 1) -> None:
24+
def after_shutdown(self, reason: str, code: int = 1) -> None:
2525
if settings.ENV != "test":
26-
logger.warning("shutting down due to %s", reason)
2726
os.kill(os.getpid(), signal.SIGTERM)
28-
29-
def get_active_worker(self, *, worker_name: str) -> Worker | None:
30-
worker = self._workers.get_worker_by_name(worker_name)
31-
if worker and not worker.is_stopping():
32-
return worker
33-
return None

fluid/utils/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ def __init__(
2222

2323
def __str__(self) -> str:
2424
return self.msg
25+
26+
27+
class WorkerStartError(FluidError):
28+
"""Worker start error"""

0 commit comments

Comments
 (0)