Skip to content

Commit 00ee6cf

Browse files
committed
Merge branch 'release/0.7.2'
2 parents dc6571a + 052e675 commit 00ee6cf

File tree

5 files changed

+14
-6
lines changed

5 files changed

+14
-6
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.7.1"
3+
version = "0.7.2"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <win10@list.ru>"]
66
maintainers = ["Pavel Kirilin <win10@list.ru>"]

taskiq/context.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from copy import copy
21
from typing import TYPE_CHECKING
32

43
from taskiq.abc.broker import AsyncBroker
@@ -28,10 +27,9 @@ async def requeue(self) -> None:
2827
2928
:raises NoResultError: to not store result for current task.
3029
"""
31-
message = copy(self.message)
32-
requeue_count = int(message.labels.get("X-Taskiq-requeue", 0))
30+
requeue_count = int(self.message.labels.get("X-Taskiq-requeue", 0))
3331
requeue_count += 1
34-
message.labels["X-Taskiq-requeue"] = str(requeue_count)
32+
self.message.labels["X-Taskiq-requeue"] = str(requeue_count)
3533
await self.broker.kick(self.broker.formatter.dumps(self.message))
3634
raise NoResultError()
3735

taskiq/receiver/receiver.py

+1
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ async def run_task( # noqa: C901, WPS210
265265
return_value=returned,
266266
execution_time=round(execution_time, 2),
267267
error=found_exception,
268+
labels=message.labels,
268269
)
269270
# If exception is found we execute middlewares.
270271
if found_exception is not None:

taskiq/result.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from functools import partial
44
from typing import Any, Callable, Dict, Generic, Optional, TypeVar
55

6-
from pydantic import validator
6+
from pydantic import Field, validator
77
from pydantic.generics import GenericModel
88
from typing_extensions import Self
99

@@ -33,6 +33,7 @@ class TaskiqResult(GenericModel, Generic[_ReturnType]):
3333
log: Optional[str] = None
3434
return_value: _ReturnType
3535
execution_time: float
36+
labels: Dict[str, str] = Field(default_factory=dict)
3637

3738
error: Optional[BaseException] = None
3839

tests/test_requeue.py

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ async def task(context: Context = TaskiqDepends()) -> None:
1818

1919
kicked = await task.kiq()
2020
await kicked.wait_result()
21+
assert (
22+
broker.custom_dependency_context[Context].message.labels["X-Taskiq-requeue"]
23+
== "1"
24+
)
2125

2226
assert runs_count == 2
2327

@@ -40,5 +44,9 @@ async def task(_: None = TaskiqDepends(dep_func)) -> None:
4044

4145
kicked = await task.kiq()
4246
await kicked.wait_result()
47+
assert (
48+
broker.custom_dependency_context[Context].message.labels["X-Taskiq-requeue"]
49+
== "1"
50+
)
4351

4452
assert runs_count == 2

0 commit comments

Comments
 (0)