Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the TriggererJobRunner to run triggers in a process without DB access #46677

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,6 @@ labelPRBasedOnFilePath:
- tests/cli/commands/local_commands/test_triggerer_command.py
- tests/jobs/test_triggerer_job.py
- tests/models/test_trigger.py
- tests/jobs/test_triggerer_job_logging.py
- providers/standard/tests/unit/standard/triggers/**/*

area:Serialization:
Expand Down
4 changes: 2 additions & 2 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import sys
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Callable, Literal, Union
from typing import TYPE_CHECKING, Annotated, Callable, ClassVar, Literal, Union

import attrs
from pydantic import BaseModel, Field, TypeAdapter
Expand Down Expand Up @@ -207,7 +207,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
"""

parsing_result: DagFileParsingResult | None = None
decoder: TypeAdapter[ToParent] = TypeAdapter[ToParent](ToParent)
decoder: ClassVar[TypeAdapter[ToParent]] = TypeAdapter[ToParent](ToParent)

@classmethod
def start( # type: ignore[override]
Expand Down
6 changes: 4 additions & 2 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@
from setproctitle import setproctitle

from airflow import settings
from airflow.executors import workloads
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.executors import workloads

TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Optional[Exception]]


Expand Down Expand Up @@ -82,6 +81,9 @@ def _run_worker(
# Received poison pill, no more tasks to run
return

if not isinstance(workload, workloads.ExecuteTask):
raise ValueError(f"LocalExecutor does not now how to handle {type(workload)}")

# Decrement this as soon as we pick up a message off the queue
with unread_messages:
unread_messages.value -= 1
Expand Down
34 changes: 32 additions & 2 deletions airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Union
from typing import TYPE_CHECKING, Annotated, Literal, Union

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -106,4 +107,33 @@ def make(cls, ti: TIModel, dag_rel_path: Path | None = None) -> ExecuteTask:
return cls(ti=ser_ti, dag_rel_path=path, token="", log_path=fname, bundle_info=bundle_info)


All = Union[ExecuteTask]
class RunTrigger(BaseModel):
"""Execute an async "trigger" process that yields events."""

id: int

ti: TaskInstance | None
"""
The task instance associated with this trigger.

Could be none for asset-based triggers.
"""

classpath: str
"""
Dot-separated name of the module+fn to import and run this workload.

Consumers of this Workload must perform their own validation of this input.
"""

encrypted_kwargs: str

timeout_after: datetime | None = None

kind: Literal["RunTrigger"] = Field(init=False, default="RunTrigger")


All = Annotated[
Union[ExecuteTask, RunTrigger],
Field(discriminator="kind"),
]
Loading
Loading