Skip to content

Initialize worker with arguments #405

Answered by vk496
vk496 asked this question in Q&A
Discussion options

You must be logged in to vote

Just in case someone can find it useful, this is how I manage it right now to integrate Taskiq + Typer without duplicating typer code.

import typer
import asyncio
from taskiq import TaskiqEvents, TaskiqState
from taskiq.cli.worker.args import WorkerArgs
from taskiq.cli.worker.run import run_worker
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")

app = typer.Typer()

secret_password: str | None = "N/A"

@app.command()
def worker_provider(
    password: str,
) -> None:
    # Typer invocation doesn't have a event loop running
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        logger.info("Program START")
        r…

Replies: 1 comment 1 reply

Comment options

You must be logged in to vote
1 reply
@vk496
Comment options

Answer selected by s3rius
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
None yet
2 participants