Skip to content

Commit fff31a5

Browse files
committed
Added sync methods.
Signed-off-by: Pavel Kirilin <win10@list.ru>
1 parent e9c6857 commit fff31a5

File tree

5 files changed

+254
-32
lines changed

5 files changed

+254
-32
lines changed

taskiq/cli/async_task_runner.py

+13-15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from taskiq.cli.log_collector import log_collector
1717
from taskiq.message import TaskiqMessage
1818
from taskiq.result import TaskiqResult
19+
from taskiq.utils import maybe_awaitable
1920

2021
logger = getLogger("taskiq.worker")
2122

@@ -168,13 +169,13 @@ async def run_task( # noqa: C901, WPS210, WPS211
168169
)
169170
if found_exception is not None:
170171
for middleware in middlewares:
171-
err_handler = middleware.on_error(
172-
message,
173-
result,
174-
found_exception,
172+
await maybe_awaitable(
173+
middleware.on_error(
174+
message,
175+
result,
176+
found_exception,
177+
),
175178
)
176-
if inspect.isawaitable(err_handler):
177-
await err_handler
178179

179180
return result
180181

@@ -309,13 +310,12 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
309310
)
310311
continue
311312
for middleware in broker.middlewares:
312-
pre_ex_res = middleware.pre_execute(
313-
taskiq_msg,
313+
taskiq_msg = await maybe_awaitable(
314+
middleware.pre_execute(
315+
taskiq_msg,
316+
),
314317
)
315-
if inspect.isawaitable(pre_ex_res):
316-
taskiq_msg = await pre_ex_res
317-
else:
318-
taskiq_msg = pre_ex_res # type: ignore
318+
319319
result = await run_task(
320320
target=broker.available_tasks[message.task_name].original_func,
321321
signature=task_signatures.get(message.task_name),
@@ -325,9 +325,7 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
325325
middlewares=broker.middlewares,
326326
)
327327
for middleware in broker.middlewares:
328-
post_ex_res = middleware.post_execute(taskiq_msg, result)
329-
if inspect.isawaitable(post_ex_res):
330-
await post_ex_res
328+
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
331329
try:
332330
await broker.result_backend.set_result(message.task_id, result)
333331
except Exception as exc:

taskiq/decor.py

+35-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from typing_extensions import ParamSpec
1313

1414
from taskiq.kicker import AsyncKicker
15-
from taskiq.task import AsyncTaskiqTask
15+
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
1616

1717
if TYPE_CHECKING:
1818
from taskiq.abc.broker import AsyncBroker
@@ -93,6 +93,40 @@ async def kiq(
9393
"""
9494
return await self.kicker().kiq(*args, **kwargs)
9595

96+
@overload
97+
def kiq_sync(
98+
self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
99+
*args: _FuncParams.args,
100+
**kwargs: _FuncParams.kwargs,
101+
) -> SyncTaskiqTask[_T]:
102+
...
103+
104+
@overload
105+
def kiq_sync(
106+
self: "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]",
107+
*args: _FuncParams.args,
108+
**kwargs: _FuncParams.kwargs,
109+
) -> SyncTaskiqTask[_ReturnType]:
110+
...
111+
112+
def kiq_sync(
113+
self,
114+
*args: _FuncParams.args,
115+
**kwargs: _FuncParams.kwargs,
116+
) -> Any:
117+
"""
118+
This method sends function call over the network.
119+
120+
It gets current broker and calls it's kick method,
121+
returning what it returns.
122+
123+
:param args: function's arguments.
124+
:param kwargs: function's key word arguments.
125+
126+
:returns: taskiq task.
127+
"""
128+
return self.kicker().kiq_sync(*args, **kwargs)
129+
96130
def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
97131
"""
98132
This function returns kicker object.

taskiq/kicker.py

+42-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from dataclasses import asdict, is_dataclass
2-
from inspect import isawaitable
32
from logging import getLogger
43
from typing import (
54
TYPE_CHECKING,
@@ -17,7 +16,8 @@
1716

1817
from taskiq.exceptions import SendTaskError
1918
from taskiq.message import TaskiqMessage
20-
from taskiq.task import AsyncTaskiqTask
19+
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
20+
from taskiq.utils import maybe_awaitable, run_sync
2121

2222
if TYPE_CHECKING:
2323
from taskiq.abc.broker import AsyncBroker
@@ -87,7 +87,7 @@ async def kiq( # noqa: D102
8787
) -> AsyncTaskiqTask[_ReturnType]:
8888
...
8989

90-
async def kiq( # noqa: C901
90+
async def kiq(
9191
self,
9292
*args: _FuncParams.args,
9393
**kwargs: _FuncParams.kwargs,
@@ -110,24 +110,55 @@ async def kiq( # noqa: C901
110110
)
111111
message = self._prepare_message(*args, **kwargs)
112112
for middleware in self.broker.middlewares:
113-
pre_send_res = middleware.pre_send(message)
114-
if isawaitable(pre_send_res):
115-
message = await pre_send_res
116-
else:
117-
message = pre_send_res # type: ignore
113+
message = await maybe_awaitable(middleware.pre_send(message))
114+
118115
try:
119116
await self.broker.kick(self.broker.formatter.dumps(message))
120117
except Exception as exc:
121118
raise SendTaskError() from exc
119+
122120
for middleware in self.broker.middlewares:
123-
post_send_res = middleware.post_send(message)
124-
if isawaitable(post_send_res):
125-
await post_send_res
121+
await maybe_awaitable(middleware.post_send(message))
122+
126123
return AsyncTaskiqTask(
127124
task_id=message.task_id,
128125
result_backend=self.broker.result_backend,
129126
)
130127

128+
@overload
129+
def kiq_sync(
130+
self: "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]",
131+
*args: _FuncParams.args,
132+
**kwargs: _FuncParams.kwargs,
133+
) -> SyncTaskiqTask[_T]:
134+
...
135+
136+
@overload
137+
def kiq_sync(
138+
self: "AsyncKicker[_FuncParams, _ReturnType]",
139+
*args: _FuncParams.args,
140+
**kwargs: _FuncParams.kwargs,
141+
) -> SyncTaskiqTask[_ReturnType]:
142+
...
143+
144+
def kiq_sync(
145+
self,
146+
*args: _FuncParams.args,
147+
**kwargs: _FuncParams.kwargs,
148+
) -> Any:
149+
"""
150+
This method sends function call over the network.
151+
152+
It just wraps async kiq call in run_sync
153+
funcion.
154+
155+
:param args: function's arguments.
156+
:param kwargs: function's key word arguments.
157+
158+
:returns: sync taskiq task.
159+
"""
160+
return SyncTaskiqTask(run_sync(self.kiq(*args, **kwargs)))
161+
131162
@classmethod
132163
def _prepare_arg(cls, arg: Any) -> Any:
133164
"""

taskiq/task.py

+111-5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import asyncio
2+
from abc import ABC, abstractmethod
23
from time import time
3-
from typing import TYPE_CHECKING, Generic, TypeVar
4+
from typing import TYPE_CHECKING, Any, Coroutine, Generic, TypeVar, Union
45

56
from taskiq.exceptions import (
67
ResultGetError,
78
ResultIsReadyError,
89
TaskiqResultTimeoutError,
910
)
11+
from taskiq.utils import run_sync
1012

1113
if TYPE_CHECKING:
1214
from taskiq.abc.result_backend import AsyncResultBackend
@@ -15,7 +17,111 @@
1517
_ReturnType = TypeVar("_ReturnType")
1618

1719

18-
class AsyncTaskiqTask(Generic[_ReturnType]):
20+
class _Task(ABC, Generic[_ReturnType]):
21+
"""TaskiqTask interface."""
22+
23+
@abstractmethod
24+
def is_ready(self) -> Union[bool, Coroutine[Any, Any, bool]]:
25+
"""
26+
Method to check wether result is ready.
27+
28+
:return: True if result is ready.
29+
"""
30+
31+
@abstractmethod
32+
def get_result( # noqa: WPS234
33+
self,
34+
with_logs: bool = False,
35+
) -> Union[
36+
"TaskiqResult[_ReturnType]",
37+
Coroutine[Any, Any, "TaskiqResult[_ReturnType]"],
38+
]:
39+
"""
40+
Get actual execution result.
41+
42+
:param with_logs: wether you want to fetch logs.
43+
:return: TaskiqResult.
44+
"""
45+
46+
@abstractmethod
47+
def wait_result( # noqa: WPS234
48+
self,
49+
check_interval: float = 0.2,
50+
timeout: float = -1.0,
51+
with_logs: bool = False,
52+
) -> Union[
53+
"TaskiqResult[_ReturnType]",
54+
Coroutine[Any, Any, "TaskiqResult[_ReturnType]"],
55+
]:
56+
"""
57+
Wait for result to become ready and get it.
58+
59+
This function constantly checks wheter result is ready
60+
and fetches it when it becomes available.
61+
62+
:param check_interval: how ofen availability is checked.
63+
:param timeout: maximum amount of time it will wait
64+
before raising TaskiqResultTimeoutError.
65+
:param with_logs: whether you need to download logs.
66+
:return: TaskiqResult.
67+
"""
68+
69+
70+
class SyncTaskiqTask(_Task[_ReturnType]):
71+
"""Sync wrapper over AsyncTaskiqTask."""
72+
73+
def __init__(self, async_task: "AsyncTaskiqTask[_ReturnType]") -> None:
74+
self.async_task = async_task
75+
76+
def is_ready(self) -> bool:
77+
"""
78+
Checks if task is completed.
79+
80+
:return: True if task is completed.
81+
"""
82+
return run_sync(self.async_task.is_ready())
83+
84+
def get_result(self, with_logs: bool = False) -> "TaskiqResult[_ReturnType]":
85+
"""
86+
Get result of a task from result backend.
87+
88+
:param with_logs: whether you want to fetch logs from worker.
89+
90+
:return: task's return value.
91+
"""
92+
return run_sync(self.async_task.get_result(with_logs=with_logs))
93+
94+
def wait_result(
95+
self,
96+
check_interval: float = 0.2,
97+
timeout: float = -1,
98+
with_logs: bool = False,
99+
) -> "TaskiqResult[_ReturnType]":
100+
"""
101+
Waits until result is ready.
102+
103+
This method just checks whether the task is
104+
ready. And if it is it returns the result.
105+
106+
It may throw TaskiqResultTimeoutError if
107+
task didn't became ready in provided
108+
period of time.
109+
110+
:param check_interval: How often checks are performed.
111+
:param timeout: timeout for the result.
112+
:param with_logs: whether you want to fetch logs from worker.
113+
:return: task's return value.
114+
"""
115+
return run_sync(
116+
self.async_task.wait_result(
117+
check_interval=check_interval,
118+
timeout=timeout,
119+
with_logs=with_logs,
120+
),
121+
)
122+
123+
124+
class AsyncTaskiqTask(_Task[_ReturnType]):
19125
"""AsyncTask for AsyncResultBackend."""
20126

21127
def __init__(
@@ -59,8 +165,8 @@ async def get_result(self, with_logs: bool = False) -> "TaskiqResult[_ReturnType
59165

60166
async def wait_result(
61167
self,
62-
check_interval: float = 1.0,
63-
timeout: float = 5.0,
168+
check_interval: float = 0.2,
169+
timeout: float = -1.0,
64170
with_logs: bool = False,
65171
) -> "TaskiqResult[_ReturnType]":
66172
"""
@@ -83,6 +189,6 @@ async def wait_result(
83189
start_time = time()
84190
while not await self.is_ready():
85191
await asyncio.sleep(check_interval)
86-
if time() - start_time > timeout:
192+
if 0 < timeout < time() - start_time:
87193
raise TaskiqResultTimeoutError()
88194
return await self.get_result(with_logs=with_logs)

taskiq/utils.py

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import asyncio
2+
import inspect
3+
from concurrent.futures import ThreadPoolExecutor
4+
from typing import Any, Awaitable, Coroutine, TypeVar, Union
5+
6+
_T = TypeVar("_T") # noqa: WPS111
7+
8+
9+
def run_sync(coroutine: "Coroutine[Any, Any, _T]") -> _T:
10+
"""
11+
Run the coroutine synchronously.
12+
13+
This function tries to run corouting using asyncio.run.
14+
15+
If it's not possible, it manually creates executor and
16+
runs async function returns it's result.
17+
18+
1. When called within a coroutine.
19+
2. When called from ``python -m asyncio``, or iPython with %autoawait
20+
enabled, which means an event loop may already be running in the
21+
current thread.
22+
23+
:param coroutine: awaitable to execute.
24+
:returns: the same type as if it were awaited.
25+
"""
26+
try:
27+
# We try this first, as in most situations this will work.
28+
return asyncio.run(coroutine)
29+
except RuntimeError:
30+
# An event loop already exists.
31+
with ThreadPoolExecutor(max_workers=1) as executor:
32+
future = executor.submit(asyncio.run, coroutine)
33+
return future.result()
34+
35+
36+
async def maybe_awaitable(
37+
possible_coroutine: "Union[_T, Awaitable[_T]]",
38+
) -> _T:
39+
"""
40+
Awaits coroutine if needed.
41+
42+
This function allows run function
43+
that may return coroutine.
44+
45+
It not awaitable value passed, it
46+
returned immediately.
47+
48+
:param possible_coroutine: some value.
49+
:return: value.
50+
"""
51+
if inspect.isawaitable(possible_coroutine):
52+
return await possible_coroutine
53+
return possible_coroutine # type: ignore

0 commit comments

Comments
 (0)