Skip to content

Commit 3f8af45

Browse files
committed
Merge branch 'release/0.6.0'
2 parents 66d685f + f020045 commit 3f8af45

File tree

18 files changed

+507
-81
lines changed

18 files changed

+507
-81
lines changed

docs/examples/extending/broker.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
from typing import AsyncGenerator
1+
from typing import AsyncGenerator, Union
22

3-
from taskiq import AsyncBroker, BrokerMessage
3+
from taskiq import AckableMessage, AsyncBroker, BrokerMessage
44

55

66
class MyBroker(AsyncBroker):
@@ -23,7 +23,7 @@ async def kick(self, message: BrokerMessage) -> None:
2323
# Send a message.message.
2424
pass
2525

26-
async def listen(self) -> AsyncGenerator[bytes, None]:
26+
async def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
2727
while True:
2828
# Get new message.
2929
new_message: bytes = ... # type: ignore

docs/extending-taskiq/broker.md

+22-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,28 @@ As a broker developer, please send only raw bytes from the `message` field of a
2323
:::
2424

2525

26-
The `listen` method should yield raw bytes that were sent over the network.
26+
## Acknowledgement
27+
28+
The `listen` method should yield raw bytes of a message.
29+
But if your broker supports acking or rejecting messages, the broker should return `taskiq.AckableMessage`
30+
with required fields.
31+
32+
For example:
33+
34+
```python
35+
36+
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
37+
for message in self.my_channel:
38+
yield AckableMessage(
39+
data=message.bytes,
40+
# Ack is a function that takes no parameters.
41+
# So you either set here method of a message,
42+
# or you can make a closure.
43+
ack=message.ack
44+
# Can be set to None if broker doesn't support it.
45+
reject=message.reject
46+
)
47+
```
2748

2849
## Conventions
2950

poetry.lock

+44-44
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.5.0"
3+
version = "0.6.0"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <win10@list.ru>"]
66
maintainers = ["Pavel Kirilin <win10@list.ru>"]

taskiq/__init__.py

+30-3
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,22 @@
66
from taskiq.abc.middleware import TaskiqMiddleware
77
from taskiq.abc.result_backend import AsyncResultBackend
88
from taskiq.abc.schedule_source import ScheduleSource
9+
from taskiq.acks import AckableMessage
910
from taskiq.brokers.inmemory_broker import InMemoryBroker
1011
from taskiq.brokers.shared_broker import async_shared_broker
1112
from taskiq.brokers.zmq_broker import ZeroMQBroker
1213
from taskiq.context import Context
1314
from taskiq.events import TaskiqEvents
14-
from taskiq.exceptions import TaskiqError
15+
from taskiq.exceptions import (
16+
NoResultError,
17+
RejectError,
18+
ResultGetError,
19+
ResultIsReadyError,
20+
SecurityError,
21+
SendTaskError,
22+
TaskiqError,
23+
TaskiqResultTimeoutError,
24+
)
1525
from taskiq.funcs import gather
1626
from taskiq.message import BrokerMessage, TaskiqMessage
1727
from taskiq.middlewares.prometheus_middleware import PrometheusMiddleware
@@ -21,28 +31,45 @@
2131
from taskiq.state import TaskiqState
2232
from taskiq.task import AsyncTaskiqTask
2333

34+
try:
35+
# Python 3.8+
36+
from importlib.metadata import version # noqa: WPS433
37+
except ImportError:
38+
# Python 3.7
39+
from importlib_metadata import version # noqa: WPS433
40+
41+
__version__ = version("taskiq")
2442
__all__ = [
43+
"__version__",
2544
"gather",
2645
"Context",
2746
"AsyncBroker",
2847
"TaskiqError",
48+
"RejectError",
2949
"TaskiqState",
3050
"TaskiqResult",
3151
"ZeroMQBroker",
3252
"TaskiqEvents",
53+
"SecurityError",
3354
"TaskiqMessage",
3455
"BrokerMessage",
56+
"ResultGetError",
3557
"ScheduledTask",
3658
"TaskiqDepends",
59+
"NoResultError",
60+
"SendTaskError",
61+
"AckableMessage",
3762
"InMemoryBroker",
3863
"ScheduleSource",
3964
"TaskiqScheduler",
4065
"TaskiqFormatter",
4166
"AsyncTaskiqTask",
4267
"TaskiqMiddleware",
68+
"ResultIsReadyError",
4369
"AsyncResultBackend",
4470
"async_shared_broker",
45-
"AsyncTaskiqDecoratedTask",
46-
"SimpleRetryMiddleware",
4771
"PrometheusMiddleware",
72+
"SimpleRetryMiddleware",
73+
"AsyncTaskiqDecoratedTask",
74+
"TaskiqResultTimeoutError",
4875
]

taskiq/__main__.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import sys
33
from typing import Dict
44

5-
from importlib_metadata import entry_points, version
5+
from importlib_metadata import entry_points
66

7+
from taskiq import __version__
78
from taskiq.abc.cmd import TaskiqCMD
89

910

@@ -60,7 +61,7 @@ def main() -> None: # noqa: WPS210 # pragma: no cover
6061
args, _ = parser.parse_known_args()
6162

6263
if args.version:
63-
print(version("taskiq")) # noqa: WPS421
64+
print(__version__) # noqa: WPS421
6465
return
6566

6667
if args.subcommand is None:

taskiq/abc/broker.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from typing_extensions import ParamSpec, Self, TypeAlias
2525

2626
from taskiq.abc.middleware import TaskiqMiddleware
27+
from taskiq.acks import AckableMessage
2728
from taskiq.decor import AsyncTaskiqDecoratedTask
2829
from taskiq.events import TaskiqEvents
2930
from taskiq.formatters.json_formatter import JSONFormatter
@@ -68,7 +69,10 @@ class AsyncBroker(ABC):
6869
"""
6970

7071
available_tasks: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {}
72+
# True only if broker runs in worker process.
7173
is_worker_process: bool = False
74+
# True only if broker runs in scheduler process.
75+
is_scheduler_process: bool = False
7276

7377
def __init__(
7478
self,
@@ -182,13 +186,19 @@ async def kick(
182186
"""
183187

184188
@abstractmethod
185-
def listen(self) -> AsyncGenerator[bytes, None]:
189+
def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
186190
"""
187191
This function listens to new messages and yields them.
188192
189193
This it the main point for workers.
190194
This function is used to get new tasks from the network.
191195
196+
If your broker support acknowledgement, then you
197+
should wrap your message in AckableMessage dataclass.
198+
199+
If your messages was wrapped in AckableMessage dataclass,
200+
taskiq will call ack when finish processing message.
201+
192202
:yield: incoming messages.
193203
:return: nothing.
194204
"""

taskiq/acks.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import dataclasses
2+
from typing import Awaitable, Callable, Optional, Union
3+
4+
5+
@dataclasses.dataclass
6+
class AckableMessage:
7+
"""
8+
Message that can be acknowledged.
9+
10+
If your broker support message acknowledgement,
11+
please return this type of message, so we'll be
12+
able to mark this message as acknowledged after
13+
the function will be executed.
14+
15+
It adds more reliability to brokers and system
16+
as a whole.
17+
"""
18+
19+
data: bytes
20+
ack: Callable[[], Union[None, Awaitable[None]]]
21+
reject: Optional[Callable[[], Union[None, Awaitable[None]]]] = None

0 commit comments

Comments
 (0)