From 49c04082f18098a528e3b46f9a783288e6cd44c8 Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 18 Jul 2024 23:55:25 +0300 Subject: [PATCH] fix: some minor issues (#342) Co-authored-by: Anton --- taskiq/depends/progress_tracker.py | 25 +++++++++++++++---------- taskiq/receiver/receiver.py | 6 ++++-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/taskiq/depends/progress_tracker.py b/taskiq/depends/progress_tracker.py index d6e525e7..fca71bb1 100644 --- a/taskiq/depends/progress_tracker.py +++ b/taskiq/depends/progress_tracker.py @@ -7,12 +7,6 @@ from taskiq.compat import IS_PYDANTIC2 from taskiq.context import Context -if IS_PYDANTIC2: - from pydantic import BaseModel as GenericModel -else: - from pydantic.generics import GenericModel # type: ignore[no-redef] - - _ProgressType = TypeVar("_ProgressType") @@ -25,15 +19,26 @@ class TaskState(str, enum.Enum): RETRY = "RETRY" -class TaskProgress(GenericModel, Generic[_ProgressType]): +if IS_PYDANTIC2: + from pydantic import BaseModel, ConfigDict + + class _TaskProgressConfig(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + +else: + from pydantic.generics import GenericModel + + class _TaskProgressConfig(GenericModel): # type: ignore[no-redef] + class Config: + arbitrary_types_allowed = True + + +class TaskProgress(_TaskProgressConfig, Generic[_ProgressType]): """Progress of task execution.""" state: Union[TaskState, str] meta: Optional[_ProgressType] - class Config: - arbitrary_types_allowed = True - class ProgressTracker(Generic[_ProgressType]): """Task's dependency to set progress.""" diff --git a/taskiq/receiver/receiver.py b/taskiq/receiver/receiver.py index c9b1d668..7d5a4035 100644 --- a/taskiq/receiver/receiver.py +++ b/taskiq/receiver/receiver.py @@ -401,8 +401,10 @@ def task_cb(task: "asyncio.Task[Any]") -> None: self.sem_prefetch.release() message = await queue.get() if message is QUEUE_DONE: - logger.info("Waiting for running tasks to complete.") - await asyncio.wait(tasks, timeout=self.wait_tasks_timeout) + # asyncio.wait will throw an error if there is nothing to wait for + if tasks: + logger.info("Waiting for running tasks to complete.") + await asyncio.wait(tasks, timeout=self.wait_tasks_timeout) break task = asyncio.create_task(