Hello, and thanks for this amazing project! I'm trying to figure out how to pass arguments to a taskiq worker from "outside". Currently, I set a module-level global from a Typer command before starting the worker. For example:
import typer
from taskiq import TaskiqEvents, TaskiqState
from taskiq.cli.worker.args import WorkerArgs
from taskiq.cli.worker.run import run_worker
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")
app = typer.Typer()
secret_password: str | None = "N/A"


@app.command()
def worker_provider(
    password: str,
) -> None:
    global secret_password
    secret_password = password
    print(f"Typer init context: {secret_password}")
    run_worker(WorkerArgs(broker="remove2:broker", modules=[__name__]))


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    print(f"Worker init context: {secret_password}")


@broker.task
async def my_worker(guess_password: str) -> bool:
    return guess_password == secret_password


if __name__ == "__main__":
    app()

$ python remove2.py 1234
Typer init context: 1234
[2025-02-07 23:34:13,380][taskiq.worker][INFO ][MainProcess] Pid of a main process: 358134
[2025-02-07 23:34:13,380][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2025-02-07 23:34:13,383][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 358153
[2025-02-07 23:34:13,385][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 358154
[2025-02-07 23:34:13,386][taskiq.worker][INFO ][worker-0] Importing tasks from module __main__
Worker init context: N/A
[2025-02-07 23:34:13,387][taskiq.worker][INFO ][worker-1] Importing tasks from module __main__
Worker init context: N/A
[2025-02-07 23:34:13,402][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
[2025-02-07 23:34:13,402][taskiq.receiver.receiver][INFO ][worker-0] Listening started.

As you can see in the logs, the worker processes print N/A instead of the password given on the command line. Even if I use a dependency for the task, I still need to pass arguments to the worker somehow, but I can't find any information about this in the docs or on the Internet. So my question is: how are workers supposed to be initialized?
Yes, that happens because we use multiprocessing to spin up new workers. Since you're setting the variable in the current process, it's only updated in that process. I guess the best way to fix that would be to allow passing a custom context to the run command.
Just in case someone finds it useful, this is how I currently integrate Taskiq + Typer without duplicating Typer code.