Skip to content

Commit 25ffb83

Browse files
author
Anton
committed
feat: changed approach to dealing with idle tasks
1 parent 49c0408 commit 25ffb83

File tree

3 files changed

+104
-2
lines changed

3 files changed

+104
-2
lines changed

taskiq/context.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import TYPE_CHECKING
1+
from contextlib import _AsyncGeneratorContextManager
2+
from typing import TYPE_CHECKING, Callable, Optional
23

34
from taskiq.abc.broker import AsyncBroker
45
from taskiq.exceptions import NoResultError, TaskRejectedError
@@ -11,11 +12,17 @@
1112
class Context:
1213
"""Context class."""
1314

14-
def __init__(self, message: TaskiqMessage, broker: AsyncBroker) -> None:
15+
def __init__(
16+
self,
17+
message: TaskiqMessage,
18+
broker: AsyncBroker,
19+
idle: "Callable[[Optional[int]], _AsyncGeneratorContextManager[None]]",
20+
) -> None:
1521
self.message = message
1622
self.broker = broker
1723
self.state: "TaskiqState" = None # type: ignore
1824
self.state = broker.state
25+
self.idle = idle
1926

2027
async def requeue(self) -> None:
2128
"""

taskiq/depends/task_idler.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from contextlib import asynccontextmanager
2+
from typing import AsyncIterator, Optional
3+
4+
from taskiq_dependencies import Depends
5+
6+
from taskiq.context import Context
7+
8+
9+
class TaskIdler:
10+
"""Task's dependency to idle task."""
11+
12+
def __init__(self, context: Context = Depends()) -> None:
13+
self.context = context
14+
15+
@asynccontextmanager
16+
async def __call__(self, timeout: Optional[int] = None) -> AsyncIterator[None]:
17+
"""Idle task."""
18+
async with self.context.idle(timeout):
19+
yield

tests/depends/test_task_idler.py

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import asyncio
2+
from asyncio.exceptions import CancelledError
3+
4+
import anyio
5+
import pytest
6+
from taskiq_dependencies import Depends
7+
8+
from taskiq.api.receiver import run_receiver_task
9+
from taskiq.brokers.inmemory_broker import InmemoryResultBackend
10+
from taskiq.depends.task_idler import TaskIdler
11+
from tests.utils import AsyncQueueBroker
12+
13+
14+
@pytest.mark.anyio
15+
async def test_task_idler() -> None:
16+
broker = AsyncQueueBroker().with_result_backend(InmemoryResultBackend())
17+
kicked = 0
18+
desired_kicked = 20
19+
20+
@broker.task(timeout=1)
21+
async def test_func(idle: TaskIdler = Depends()) -> None:
22+
nonlocal kicked
23+
async with idle():
24+
await asyncio.sleep(0.5)
25+
kicked += 1
26+
27+
receiver_task = asyncio.create_task(run_receiver_task(broker, max_async_tasks=1))
28+
29+
tasks = []
30+
for _ in range(desired_kicked):
31+
tasks.append(await test_func.kiq())
32+
33+
with anyio.fail_after(1):
34+
for task in tasks:
35+
await task.wait_result(check_interval=0.01)
36+
37+
receiver_task.cancel()
38+
assert kicked == desired_kicked
39+
40+
41+
@pytest.mark.anyio
42+
async def test_task_idler_task_cancelled() -> None:
43+
broker = AsyncQueueBroker().with_result_backend(InmemoryResultBackend())
44+
kicked = 0
45+
desired_kicked = 20
46+
47+
@broker.task(timeout=0.2)
48+
async def test_func_timeout(idle: TaskIdler = Depends()) -> None:
49+
nonlocal kicked
50+
try:
51+
async with idle():
52+
await asyncio.sleep(2)
53+
except CancelledError:
54+
kicked += 1
55+
raise
56+
57+
@broker.task(timeout=2)
58+
async def test_func(idle: TaskIdler = Depends()) -> None:
59+
nonlocal kicked
60+
async with idle():
61+
await asyncio.sleep(0.5)
62+
kicked += 1
63+
64+
receiver_task = asyncio.create_task(run_receiver_task(broker, max_async_tasks=1))
65+
66+
tasks = []
67+
tasks.append(await test_func_timeout.kiq())
68+
for _ in range(desired_kicked):
69+
tasks.append(await test_func.kiq())
70+
71+
with anyio.fail_after(1):
72+
for task in tasks:
73+
await task.wait_result(check_interval=0.01)
74+
75+
receiver_task.cancel()
76+
assert kicked == desired_kicked + 1

0 commit comments

Comments
 (0)