Skip to content

Commit ca8200a

Browse files
authored
more coverage (#63)
* more coverage * Add stack sampler to tests
1 parent 9efc4e1 commit ca8200a

File tree

10 files changed

+74
-60
lines changed

10 files changed

+74
-60
lines changed

docs/reference/workers.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# Workers
22

3-
Workers are the main building block for asynchronous programming with `aio-fluid`. They are responsible for running tasks and managing their lifecycle.
4-
There are several worker classes which can be imported from `fluid.utils.worker`:
3+
Workers are the main building block for asynchronous programming with `aio-fluid`. They are responsible for running asynchronous tasks and managing their lifecycle.
4+
There are several worker classes which can be imported from `fluid.utils.worker`, and they aall derive from the abstract `fluid.utils.worker.Worker` class.
55

66
```python
7-
from fluid.utils.worker import StoppingWorker
7+
from fluid.utils.worker import Worker
88
```
99

1010
::: fluid.utils.worker.WorkerState

fluid/utils/data.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33
DEFAULT_SKIP_VALUES = frozenset((None,))
44

55

6-
def reversed_dict(d: dict) -> dict: # type: ignore [type-arg]
7-
return {v: k for k, v in d.items()}
8-
9-
106
def compact_dict(
117
*args: Iterable[Any],
128
skip_values: set[Any] | frozenset[Any] = DEFAULT_SKIP_VALUES,

fluid/utils/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ def __str__(self) -> str:
2626

2727
class WorkerStartError(FluidError):
2828
"""Worker start error"""
29+
30+
31+
class FlamegraphError(FluidError):
32+
pass

fluid/utils/executor.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

fluid/utils/stacksampler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Sampler:
2828
"""
2929

3030
interval: float = 0.005
31-
_started = None
31+
_started: float | None = None
3232
_stack_counts: defaultdict[str, int] = field(
3333
default_factory=lambda: defaultdict(int)
3434
)

fluid/utils/text.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import os
32
import uuid
43
from typing import Any
54

@@ -35,28 +34,14 @@ def as_uuid(uid: Any) -> str | None:
3534
return None
3635

3736

38-
def nice_env_str(space: int = 4, trim_length: int = 100) -> str:
39-
lt = max(len(k) for k in os.environ) + space
40-
values = []
41-
for key, value in os.environ.items():
42-
if len(value) > trim_length + 3:
43-
value = f"{value[:trim_length]}..."
44-
k = f"{key}:".ljust(lt)
45-
values.append(f"{k}{value}")
46-
return "\n".join(values)
47-
48-
4937
def nice_json(data: Any) -> str:
5038
if not isinstance(data, str):
5139
return json.dumps(data, indent=4)
5240
return data
5341

5442

5543
def trim_docstring(docstring: str) -> str:
56-
"""Uniformly trims leading/trailing whitespace from docstrings.
57-
Based on
58-
http://www.python.org/peps/pep-0257.html#handling-docstring-indentation
59-
"""
44+
"""Uniformly trims leading/trailing whitespace from doc-strings"""
6045
if not docstring or not docstring.strip():
6146
return ""
6247
# Convert tabs to spaces and split into lines

fluid/utils/worker.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,7 @@ async def shutdown(self) -> None:
7070
async def wait_for_shutdown(self) -> None:
7171
if self.task is None:
7272
return
73-
try:
74-
await self.task
75-
except asyncio.CancelledError:
76-
if self.force_shutdown:
77-
# we are shutting down, this is expected
78-
pass
79-
else:
80-
raise
73+
await self.task
8174

8275
async def gracefully_shutdown(self) -> None:
8376
self.started_shutdown = True
@@ -193,9 +186,6 @@ def is_stopping(self) -> bool:
193186
def is_stopped(self) -> bool:
194187
return self._worker_state in (WorkerState.STOPPED, WorkerState.FORCE_STOPPED)
195188

196-
def is_task(self) -> bool:
197-
return self._worker_task_runner is not None
198-
199189
def gracefully_stop(self) -> None:
200190
if self.is_running():
201191
self._worker_state = WorkerState.STOPPING
@@ -245,6 +235,11 @@ async def shutdown(self) -> None:
245235
await self._worker_task_runner.shutdown()
246236

247237
async def wait_for_shutdown(self) -> None:
238+
"""Wait for the worker to stop
239+
240+
This method will wait for the worker to stop running, but doesn't
241+
try to gracefully stop it nor force shutdown.
242+
"""
248243
if self._worker_task_runner is not None:
249244
await self._worker_task_runner.wait_for_shutdown()
250245

tests/scheduler/conftest.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from contextlib import asynccontextmanager
2-
from typing import AsyncIterator, cast
2+
from typing import AsyncIterator, Iterator, cast
33

44
import pytest
55
from fastapi import FastAPI
@@ -11,6 +11,7 @@
1111
from fluid.scheduler.broker import RedisTaskBroker
1212
from fluid.scheduler.endpoints import get_task_manger, setup_fastapi
1313
from fluid.tools_fastapi import backdoor
14+
from fluid.utils.stacksampler import Sampler
1415
from tests.scheduler.tasks import TaskClient
1516

1617

@@ -25,6 +26,14 @@ def redis_broker(task_manager: TaskManager) -> RedisTaskBroker:
2526
return cast(RedisTaskBroker, task_manager.broker)
2627

2728

29+
@pytest.fixture(scope="module", autouse=True)
30+
def sampler() -> Iterator[Sampler]:
31+
sampler = Sampler()
32+
sampler.start()
33+
yield sampler
34+
sampler.stop()
35+
36+
2837
@pytest.fixture(scope="module")
2938
async def task_app():
3039
task_manager = tasks.task_scheduler(max_concurrent_tasks=2, schedule_tasks=False)

tests/scheduler/test_scheduler.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
TaskState,
1313
)
1414
from fluid.scheduler.errors import UnknownTaskError
15+
from fluid.utils.stacksampler import Sampler
1516
from fluid.utils.waiter import wait_for
1617

1718
pytestmark = pytest.mark.asyncio(loop_scope="module")
@@ -116,3 +117,9 @@ async def test_async_handler(task_scheduler: TaskScheduler) -> None:
116117
assert handler.task_run
117118
assert handler.task_run.state == TaskState.running
118119
assert task_scheduler.unregister_async_handler("running.test") is handler
120+
121+
122+
def test_sampler(sampler: Sampler) -> None:
123+
assert sampler.started
124+
stats = sampler.stats()
125+
assert stats

tests/utils/test_worker.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import asyncio
22
from dataclasses import dataclass, field
33

4+
import pytest
5+
6+
from fluid.utils.errors import WorkerStartError
47
from fluid.utils.worker import QueueConsumerWorker, Worker, Workers, WorkerState
58

69

@@ -10,6 +13,18 @@ async def run(self):
1013
await asyncio.sleep(0.1)
1114

1215

16+
class BadWorker2(Worker):
17+
async def run(self):
18+
await asyncio.sleep(0.1)
19+
raise RuntimeError("I'm so bad")
20+
21+
22+
class BadWorker3(Worker):
23+
async def run(self):
24+
await asyncio.sleep(0.1)
25+
raise asyncio.CancelledError
26+
27+
1328
@dataclass
1429
class Waiter:
1530
waiter: asyncio.Future = field(
@@ -35,10 +50,37 @@ async def test_consumer() -> None:
3550
assert runner.is_stopped()
3651

3752

53+
async def test_metadata() -> None:
54+
worker = BadWorker(stopping_grace_period=2)
55+
assert worker.num_workers == 1
56+
assert list(worker.workers()) == [worker]
57+
await worker.wait_for_shutdown()
58+
assert worker.worker_state is WorkerState.INIT
59+
60+
3861
async def test_force_shutdown() -> None:
3962
worker = BadWorker(stopping_grace_period=2)
4063
await worker.startup()
64+
with pytest.raises(WorkerStartError):
65+
await worker.startup()
4166
assert worker.is_running()
4267
await worker.shutdown()
4368
assert worker.is_stopped()
4469
assert worker.worker_state is WorkerState.FORCE_STOPPED
70+
71+
72+
async def test_exeception2() -> None:
73+
worker = BadWorker2()
74+
await worker.startup()
75+
with pytest.raises(RuntimeError) as exc:
76+
await worker.wait_for_shutdown()
77+
assert str(exc.value) == "I'm so bad"
78+
assert worker.is_stopped()
79+
80+
81+
async def test_exeception3() -> None:
82+
worker = BadWorker3()
83+
await worker.startup()
84+
with pytest.raises(asyncio.CancelledError):
85+
await worker.wait_for_shutdown()
86+
assert worker.is_stopped()

0 commit comments

Comments
 (0)