Skip to content

Commit b64e8ff

Browse files
committed
Added README.md.
Signed-off-by: Pavel Kirilin <win10@list.ru>
1 parent fff31a5 commit b64e8ff

File tree

3 files changed

+296
-7
lines changed

3 files changed

+296
-7
lines changed

README.md

+264
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
# Taskiq
2+
3+
Taskiq is an asynchronous distributed task queue.
4+
This project takes inspiration from big projects such as [Celery](https://docs.celeryq.dev) and [Dramatiq](https://dramatiq.io/).
5+
But taskiq can send and run both the sync and async functions.
6+
Also, we use [PEP-612](https://peps.python.org/pep-0612/) to provide the best autosuggestions possible. But since it's a new PEP, I encourage you to use taskiq with VS code because Pylance understands all types correctly.
7+
8+
# Installation
9+
10+
This project can be installed using pip:
11+
```bash
12+
pip install taskiq
13+
```
14+
15+
Or it can be installed directly from git:
16+
17+
```bash
18+
pip install git+https://github.com/taskiq-python/taskiq
19+
```
20+
21+
# Usage
22+
23+
Let's see the example with the in-memory broker:
24+
25+
```python
26+
import asyncio
27+
28+
from taskiq.brokers.inmemory_broker import InMemoryBroker
29+
30+
31+
# This is broker that can be used for
32+
# development or for demo purpose.
33+
# In production environment consider using
34+
# real distributed brokers, such as taskiq-aio-pika
35+
# for rabbitmq.
36+
broker = InMemoryBroker()
37+
38+
39+
# Or you can optionally
40+
# pass task_name as the parameter in the decorator.
41+
@broker.task
42+
async def my_async_task() -> None:
43+
"""My lovely task."""
44+
await asyncio.sleep(1)
45+
print("Hello")
46+
47+
48+
async def main():
49+
# Kiq is the method that actually
50+
# sends task over the network.
51+
task = await my_async_task.kiq()
52+
# Now we print result of execution.
53+
print(await task.get_result())
54+
55+
56+
asyncio.run(main())
57+
58+
```
59+
60+
61+
You can run it with python without any extra actions,
62+
since this script uses the `InMemoryBroker`.
63+
64+
It won't send any data over the network,
65+
and you cannot use this type of broker in
66+
a real-world scenario, but it's useful for
67+
local development if you do not want to
68+
set up a taskiq worker.
69+
70+
71+
## Brokers
72+
73+
Brokers are simple. They don't execute functions,
74+
but they can send messages and listen to new messages.
75+
76+
Every broker implements the [taskiq.abc.broker.AsyncBroker](https://github.com/taskiq-python/taskiq/blob/master/taskiq/abc/broker.py#L50) abstract class. All tasks are assigned to brokers, so every time you call the `kiq` method, you send this task to the assigned broker. (This behavior can be changed, by using `Kicker` directly).
77+
78+
Also you can add middlewares to brokers using `add_middlewares` method.
79+
80+
Like this:
81+
82+
```python
83+
from taskiq.brokers.inmemory_broker import InMemoryBroker
84+
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
85+
86+
# This is broker that can be used for
87+
# development or for demo purpose.
88+
# In production environment consider using
89+
# real distributed brokers, such as taskiq-aio-pika
90+
# for rabbitmq.
91+
broker = InMemoryBroker()
92+
broker.add_middlewares(
93+
[
94+
SimpleRetryMiddleware(
95+
default_retry_count=4,
96+
)
97+
]
98+
)
99+
```
100+
101+
To run middlewares properly you must add them using the `add_middlewares` method.
102+
It lead to errors if you try to add them by modifying broker directly.
103+
104+
Also brokers have formatters. You can change format
105+
of a message to be compitable with other task execution
106+
systems, so your migration to taskiq can be smoother.
107+
108+
## Result backends
109+
110+
After task is complete it will try to save the results of execution
111+
in result backends. By default brokers
112+
use `DummyResultBackend` wich doesn't do anything. It
113+
won't print the result in logs and it always returns
114+
`None` as the `return_value`, and 0 for `execution_time`.
115+
But some brokers can override it. For example `InMemoryBroker` by default uses `InMemoryResultBackend` and returns correct results.
116+
117+
118+
## CLI
119+
120+
Taskiq has a command line interface to run workers.
121+
It's very simple to get it to work.
122+
123+
You just have to provide path to your broker. As an example, if you want to start listen to new tasks
124+
with broker in module `my_project.broker` you just
125+
have to run:
126+
127+
```
128+
taskiq my_project.broker:broker
129+
```
130+
131+
taskiq can discover tasks modules to import,
132+
if you add the `-fsd` (file system discover) option.
133+
134+
Let's assume we have project with the following structure:
135+
136+
```
137+
test_project
138+
├── broker.py
139+
├── submodule
140+
│ └── tasks.py
141+
└── utils
142+
└── tasks.py
143+
```
144+
145+
You can specify all tasks modules to import manually.
146+
147+
```bash
148+
taskiq test_project.broker:broker test_projec.submodule.tasks test_projec.utils.tasks
149+
```
150+
151+
Or you can let taskiq find all python modules named tasks in current directory recursively.
152+
153+
```bash
154+
taskiq test_project.broker:broker -fsd
155+
```
156+
157+
You can always run `--help` to see all possible options.
158+
159+
160+
## Middlewares
161+
162+
Middlewares are used to modify message, or take
163+
some actions after task is complete.
164+
165+
You can write your own middlewares by subclassing
166+
the `taskiq.abc.middleware.TaskiqMiddleware`.
167+
168+
Every hook can be sync or async. Taskiq will execute it.
169+
170+
For example, this is a valid middleware.
171+
172+
```python
173+
import asyncio
174+
175+
from taskiq.abc.middleware import TaskiqMiddleware
176+
from taskiq.message import TaskiqMessage
177+
178+
179+
class MyMiddleware(TaskiqMiddleware):
180+
async def pre_send(self, message: "TaskiqMessage") -> TaskiqMessage:
181+
await asyncio.sleep(1)
182+
message.labels["my_label"] = "my_value"
183+
return message
184+
185+
def post_send(self, message: "TaskiqMessage") -> None:
186+
print(f"Message {message} was sent.")
187+
188+
```
189+
190+
You can use sync or async hooks without changing aything, but adding async to the hook signature.
191+
192+
Middlewares can store information in message.labels for
193+
later use. For example `SimpleRetryMiddleware` uses labels
194+
to remember number of failed attempts.
195+
196+
## Messages
197+
198+
Every message has labels. You can define labels
199+
using `task` decorator, or you can add them using kicker.
200+
201+
For example:
202+
203+
```python
204+
205+
@broker.task(my_label=1, label2="something")
206+
async def my_async_task() -> None:
207+
"""My lovely task."""
208+
await asyncio.sleep(1)
209+
print("Hello")
210+
211+
async def main():
212+
await my_async_task.kiq()
213+
```
214+
215+
It's equivalent to this
216+
217+
```python
218+
219+
@broker.task
220+
async def my_async_task() -> None:
221+
"""My lovely task."""
222+
await asyncio.sleep(1)
223+
print("Hello")
224+
225+
async def main():
226+
await my_async_task.kicker().with_labels(
227+
my_label=1,
228+
label2="something",
229+
).kiq()
230+
```
231+
232+
## Kicker
233+
234+
The kicker is the object that sends tasks.
235+
When you call kiq it generates a Kicker instance,
236+
remembering current broker and message labels.
237+
You can change the labels you want to use for this particular task or you can even change broker.
238+
239+
For example:
240+
241+
```python
242+
import asyncio
243+
244+
from taskiq.brokers.inmemory_broker import InMemoryBroker
245+
246+
broker = InMemoryBroker()
247+
second_broker = InMemoryBroker()
248+
249+
250+
@broker.task
251+
async def my_async_task() -> None:
252+
"""My lovely task."""
253+
await asyncio.sleep(1)
254+
print("Hello")
255+
256+
257+
async def main():
258+
task = await my_async_task.kicker().with_broker(second_broker).kiq()
259+
print(await task.get_result())
260+
261+
262+
asyncio.run(main())
263+
264+
```

pyproject.toml

+15
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,21 @@ maintainers = ["Pavel Kirilin <win10@list.ru>"]
77
readme = "README.md"
88
repository = "https://github.com/taskiq-python/taskiq"
99
license = "LICENSE"
10+
classifiers = [
11+
"Typing :: Typed",
12+
"Programming Language :: Python",
13+
"Programming Language :: Python :: 3",
14+
"Programming Language :: Python :: 3 :: Only",
15+
"Programming Language :: Python :: 3.7",
16+
"Programming Language :: Python :: 3.8",
17+
"Programming Language :: Python :: 3.9",
18+
"Programming Language :: Python :: 3.10",
19+
"Topic :: Utilities",
20+
"Topic :: System :: Networking",
21+
"Operating System :: OS Independent",
22+
]
23+
homepage = "https://github.com/taskiq-python/taskiq"
24+
keywords = ["taskiq", "tasks", "distributed", "async"]
1025

1126
[tool.poetry.dependencies]
1227
python = "^3.7"

taskiq/brokers/inmemory_broker.py

+17-7
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import inspect
22
from collections import OrderedDict
33
from concurrent.futures import ThreadPoolExecutor
4-
from typing import AsyncGenerator, Optional, TypeVar
4+
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
55

66
from taskiq.abc.broker import AsyncBroker
77
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
88
from taskiq.cli.async_task_runner import run_task
9-
from taskiq.exceptions import TaskiqError
9+
from taskiq.exceptions import ResultSetError, TaskiqError
1010
from taskiq.message import BrokerMessage
1111

1212
_ReturnType = TypeVar("_ReturnType")
@@ -83,17 +83,22 @@ class InMemoryBroker(AsyncBroker):
8383
It's useful for local development, if you don't want to setup real broker.
8484
"""
8585

86-
def __init__(
86+
def __init__( # noqa: WPS211
8787
self,
8888
sync_tasks_pool_size: int = 4,
8989
logs_format: Optional[str] = None,
9090
max_stored_results: int = 100,
9191
cast_types: bool = True,
92+
result_backend: Optional[AsyncResultBackend[Any]] = None,
93+
task_id_generator: Optional[Callable[[], str]] = None,
9294
) -> None:
93-
super().__init__(
94-
InmemoryResultBackend(
95+
if result_backend is None:
96+
result_backend = InmemoryResultBackend(
9597
max_stored_results=max_stored_results,
96-
),
98+
)
99+
super().__init__(
100+
result_backend=result_backend,
101+
task_id_generator=task_id_generator,
97102
)
98103
self.executor = ThreadPoolExecutor(max_workers=sync_tasks_pool_size)
99104
self.cast_types = cast_types
@@ -113,6 +118,8 @@ async def kick(self, message: BrokerMessage) -> None:
113118
This method just executes given task.
114119
115120
:param message: incomming message.
121+
122+
:raises ResultSetError: if cannot save results in result backend.
116123
:raises TaskiqError: if someone wants to kick unknown task.
117124
"""
118125
target_task = self.available_tasks.get(message.task_name)
@@ -127,7 +134,10 @@ async def kick(self, message: BrokerMessage) -> None:
127134
executor=self.executor,
128135
middlewares=self.middlewares,
129136
)
130-
await self.result_backend.set_result(message.task_id, result)
137+
try:
138+
await self.result_backend.set_result(message.task_id, result)
139+
except Exception as exc:
140+
raise ResultSetError("Cannot set result.") from exc
131141

132142
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
133143
"""

0 commit comments

Comments
 (0)