
Rework the TriggererJobRunner to run triggers in a process without DB access #46677

Merged: 1 commit into main from triggerer-user-code-without-db on Feb 18, 2025

Conversation

@ashb (Member) commented Feb 12, 2025

This uses a similar approach to the DAG Parser -- the subprocess runs the
async Triggers (i.e. user code) in a process and sends messages back and forth
to the supervisor/parent to perform CRUD operations on the DB.
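
For readers unfamiliar with the DAG-parser-style supervisor pattern, here is a minimal, hedged sketch of the idea in Python. The message class, pipe wiring, and handler below are illustrative assumptions, not the actual Task SDK/Airflow classes; the point is only that the child serialises requests over a socket/pipe and the parent is the sole process that touches the DB.

import json
import os
from dataclasses import asdict, dataclass


@dataclass
class TriggerEventMessage:
    # Hypothetical message type; real messages are Task SDK workload/comms classes.
    trigger_id: int
    payload: dict


def child_send(write_fd: int, msg: TriggerEventMessage) -> None:
    # Child (async trigger runner): one JSON object per line, no DB session needed.
    os.write(write_fd, (json.dumps(asdict(msg)) + "\n").encode())


def supervisor_loop(read_fd: int) -> None:
    # Parent (supervisor): decode each line and perform the CRUD on the child's behalf.
    with os.fdopen(read_fd) as reader:
        for line in reader:
            msg = json.loads(line)
            # In real code this would be an ORM call, e.g. recording the fired
            # trigger event or handing the deferred task back to the scheduler.
            print(f"supervisor handling DB op for trigger {msg['trigger_id']}")


if __name__ == "__main__":
    r, w = os.pipe()
    child_send(w, TriggerEventMessage(trigger_id=42, payload={"status": "fired"}))
    os.close(w)
    supervisor_loop(r)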

I have also massively reworked how per-trigger logging works to greatly simplify it. I hope @dstandish will approve.
The main simplification comes from the switch to the Task SDK: all (100%! Really) of the logs are sent as JSON over a socket to the parent process. Everything in the subprocess logs to this one output, so there is no differentiation needed in stdlib logging, no custom handlers, etc., and by making use of structlog's automatic context vars we can include a trigger_id field -- if we find one, we route the output to the right trigger-specific log file.

This is all now so much simpler with structlog in the mix.

Logging from the async process works as follows:

  • stdlib logging is configured to send messages via structlog as JSON
  • As part of the stdlib->structlog processing chain we include structlog's bound contextvars
  • When a triggerer coroutine starts it binds trigger_id as a parameter
  • When the Supervisor receives a log message (which arrives as line-delimited JSON over a dedicated socket channel) it parses the JSON; if it finds a trigger_id key it redirects the message to the trigger-specific log file, otherwise it prints it. (A rough sketch of this flow follows.)
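
To make the flow above concrete, here is a minimal, hedged sketch of the two halves. The bind_contextvars/merge_contextvars/JSONRenderer calls are structlog's public API; the coroutine, routing function, and log file path are illustrative assumptions rather than the code in this PR.

import json
import os

import structlog

# Child side: merge any bound contextvars into each event and render it as one JSON line.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.JSONRenderer(),
    ]
)


async def run_trigger(trigger_id: int) -> None:
    # Bound once when the trigger coroutine starts; every log line emitted while
    # this coroutine runs then carries trigger_id automatically (contextvars are
    # copied per asyncio task, so concurrent triggers don't clobber each other).
    structlog.contextvars.bind_contextvars(trigger_id=trigger_id)
    structlog.get_logger().info("trigger heartbeat")


# Supervisor side: route each line-delimited JSON record by trigger_id.
def route_log_line(line: str, trigger_log_dir: str = "/tmp/triggers") -> None:
    record = json.loads(line)
    trigger_id = record.get("trigger_id")
    if trigger_id is not None:
        os.makedirs(trigger_log_dir, exist_ok=True)
        with open(f"{trigger_log_dir}/trigger_{trigger_id}.log", "a") as out:
            out.write(line + "\n")
    else:
        print(line)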

Of note: I haven't allowed triggers to directly access or set XCom, Variables, etc. We can add that in the future if there is demand.

Testing:

I created a simple test DAG:

from datetime import timedelta

from airflow import DAG
from airflow.providers.standard.sensors.time_delta import WaitSensor


with DAG("trigger_test"):
    WaitSensor(task_id="waiter", time_to_wait=timedelta(seconds=90), deferrable=True)

Then I fired things up with breeze start-airflow.

We can see that the trigger runs and we get logs from the triggerer in the output. (The ordering of the logs isn't correct, but that is a separate issue.)

[Screenshot, 2025-02-16 12:31:23: triggerer logs appearing in the task log output]


@ashb (Member, Author) commented Feb 12, 2025

I need to re-examine the logs to make sure they still get routed to the right place, and fix all the tests.

@ashb force-pushed the triggerer-user-code-without-db branch 2 times, most recently from 62817ba to 3affa2d on February 12, 2025 23:32
@ashb force-pushed the triggerer-user-code-without-db branch 4 times, most recently from c4e86ea to ed561f2 on February 15, 2025 22:28
@ashb marked this pull request as ready for review on February 15, 2025 22:59
@ashb force-pushed the triggerer-user-code-without-db branch 2 times, most recently from b8aabbe to eeeb809 on February 16, 2025 12:51
@amoghrajesh (Contributor) left a comment

Looks good overall, just a few comments/questions.

Review comment threads:
  • airflow/executors/workloads.py (outdated)
  • airflow/models/trigger.py
  • task_sdk/src/airflow/sdk/api/client.py (×2)
  • airflow/jobs/triggerer_job_runner.py (×4, 2 outdated)
@ashb force-pushed the triggerer-user-code-without-db branch from eeeb809 to 9c7a0dc on February 17, 2025 15:12
@amoghrajesh (Contributor) left a comment

Thanks for addressing my comments, LGTM +1.

Let's get this one in; we might discover issues while testing and we can fix those as they come.

@vincbeck (Contributor) left a comment

Looks good! Massive refactoring on the triggerer side! I'd wait for more reviews given the size of the PR but I could not find anything outstanding :)

@ashb force-pushed the triggerer-user-code-without-db branch from f2c5e03 to e7bf549 on February 17, 2025 23:02
@ashb force-pushed the triggerer-user-code-without-db branch from e7bf549 to d1ee9ca on February 18, 2025 07:03
@ashb merged commit 18396ed into main on Feb 18, 2025
149 checks passed
@ashb deleted the triggerer-user-code-without-db branch on February 18, 2025 09:40