Hello, I'm currently trying to build a project with Taskiq and multiple queues, but I have a big issue: messages are delivered to the wrong queue! In my scenario I have 2 workers, and a task on one worker calls a task on the other:

import asyncio
from taskiq import TaskiqEvents, TaskiqState
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend
amqp_url: str = "amqp://guest:guest@localhost:5672"
redis_url: str = "redis://localhost"
broker1 = AioPikaBroker(url=amqp_url, queue_name="q-1").with_result_backend(RedisAsyncResultBackend(redis_url))
broker2 = AioPikaBroker(url=amqp_url, queue_name="q-2").with_result_backend(RedisAsyncResultBackend(redis_url))
@broker1.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(
    state: TaskiqState,
) -> None:
    print("Init broker2, as we will send tasks there")
    await broker2.startup()


@broker2.task
def t_ask_question():
    return "How are you?"


@broker1.task
async def t_greetings(name: str):
    task = await t_ask_question.kiq()
    res = await task.wait_result()
    return f"Hello {name}. {res.return_value}"


async def main():
    await broker1.startup()
    task = await t_greetings.kiq(name="Juan")
    res = await task.wait_result()
    print(res.return_value)
    await broker1.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

I launch the workers with:

$ taskiq worker remove2:broker1 -w 1
$ taskiq worker remove2:broker2 -w 1

And then I execute the main part (which works, by the way):

$ python remove2.py
Hello Juan. How are you?

However, the full logs are:

❯ taskiq worker remove2:broker1 -w 1
[2025-02-12 16:40:27,880][taskiq.worker][INFO ][MainProcess] Pid of a main process: 431322
[2025-02-12 16:40:27,880][taskiq.worker][INFO ][MainProcess] Starting 1 worker processes.
[2025-02-12 16:40:27,883][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 431329
Init broker2, as we will send tasks there
[2025-02-12 16:40:28,014][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2025-02-12 16:40:39,936][taskiq.receiver.receiver][INFO ][worker-0] Executing task remove2:t_greetings with ID: e0d25641f4fc4d57a8ef124d2838c639
[2025-02-12 16:40:39,938][taskiq.receiver.receiver][WARNING][worker-0] task "remove2:t_ask_question" is not found. Maybe you forgot to import it?

❯ taskiq worker remove2:broker2 -w 1
[2025-02-12 16:40:34,249][taskiq.worker][INFO ][MainProcess] Pid of a main process: 431398
[2025-02-12 16:40:34,250][taskiq.worker][INFO ][MainProcess] Starting 1 worker processes.
[2025-02-12 16:40:34,253][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 431411
[2025-02-12 16:40:34,357][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2025-02-12 16:40:39,936][taskiq.receiver.receiver][WARNING][worker-0] task "remove2:t_greetings" is not found. Maybe you forgot to import it?
[2025-02-12 16:40:39,938][taskiq.receiver.receiver][INFO ][worker-0] Executing task remove2:t_ask_question with ID: 34ac0510a7114baab723c2d44784ef67

The key log line is "task X is not found. Maybe you forgot to import it?": each worker also receives the task meant for the other one. In this simple example it is not very important, but at large scale the application gets completely blocked, with messages dying in an unrelated queue. Am I doing something wrong, or is this actually an issue? Thank you in advance.
Replies: 1 comment
I was so focused on Taskiq that I forgot that, by default, it uses topic routing, so without distinct routing keys both queues received every message. Here is the same code working as expected:

import asyncio
from taskiq import TaskiqEvents, TaskiqState
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend
amqp_url: str = "amqp://guest:guest@localhost:5672"
redis_url: str = "redis://localhost"
# Each queue is bound with its own topic pattern instead of the default.
broker1 = AioPikaBroker(url=amqp_url, queue_name="q1", routing_key="ask.#").with_result_backend(
    RedisAsyncResultBackend(redis_url)
)
broker2 = AioPikaBroker(url=amqp_url, queue_name="q2", routing_key="suggest.#").with_result_backend(
    RedisAsyncResultBackend(redis_url)
)


@broker1.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    print("Init broker2, as we will send tasks there")
    await broker2.startup()


# The explicit task name matches the "suggest.#" pattern of q2.
@broker2.task("suggest.question")
def t_ask_question():
    return "How are you?"


# The explicit task name matches the "ask.#" pattern of q1.
@broker1.task("ask.name")
async def t_greetings(name: str):
    task = await t_ask_question.kiq()
    res = await task.wait_result()
    return f"Hello {name}. {res.return_value}"


async def main():
    await broker1.startup()
    await broker2.startup()
    task = await t_greetings.kiq(name="Juan")
    res = await task.wait_result()
    print(res.return_value)
    await broker1.shutdown()
    await broker2.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
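To make the difference between the two setups easier to see, here is a minimal pure-Python sketch of AMQP-style topic matching. It does not use Taskiq or RabbitMQ at all: `topic_matches` and `queues_receiving` are hypothetical helpers written only for this illustration. It also assumes, as I understand the defaults, that AioPikaBroker binds each queue to a shared topic exchange with the pattern `#` and publishes each task with its task name as the routing key.

```python
"""Pure-Python sketch of AMQP topic matching (no Taskiq or RabbitMQ needed)."""


def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style topic pattern matches a routing key.

    Words are separated by dots; "*" matches exactly one word and
    "#" matches zero or more words.
    """
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p_words):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return j == len(k_words)
        if p_words[i] == "#":
            # "#" may consume zero or more of the remaining key words.
            return any(match(i + 1, nxt) for nxt in range(j, len(k_words) + 1))
        if j == len(k_words):
            return False
        if p_words[i] in ("*", k_words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def queues_receiving(bindings: dict[str, str], routing_key: str) -> list[str]:
    """List the queues whose binding pattern matches the routing key."""
    return [q for q, pattern in bindings.items() if topic_matches(pattern, routing_key)]


# Assumed default: both queues bound with "#", task name used as routing key.
original_bindings = {"q-1": "#", "q-2": "#"}
# Bindings from the working version above.
fixed_bindings = {"q1": "ask.#", "q2": "suggest.#"}

if __name__ == "__main__":
    print(queues_receiving(original_bindings, "remove2:t_greetings"))  # ['q-1', 'q-2']
    print(queues_receiving(fixed_bindings, "ask.name"))                # ['q1']
    print(queues_receiving(fixed_bindings, "suggest.question"))        # ['q2']
```

Under those assumptions, the first call shows why both workers logged the same task in the original setup, and the other two show each task name reaching only the queue whose pattern it matches.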