Skip to content

Commit e7bf549

Browse files
committed
Rework the TriggererJobRunner to run triggers in a process without DB access
This uses a similar approach to the DAG Parser -- the subprocess runs the async Triggers (i.e. user code) in a process and sends messages back and forth to the supervisor/parent to perform CRUD operations on the DB. I have also massively re-worked how per-trigger logging works to greatly simplify it. I hope @dstandish will approve. The main way it has been simplified is with the switch to TaskSDK then all (100%! Really) of logs are set as JSON over a socket to the parent process; everything in the subprocess logs to this output, there is no differentiation needed in stdlib, no custom handlers etc. and by making use of structlog's automatic context vars we can include a trigger_id field -- if we find that we route the output to the right trigger specific log file. This is all now so much simpler with structlog in the mix. Logging from the async process works as follows: - stdlib logging is configured to send messages via struct log as json - As part of the stdlib->structlog processing change we include structlog bound contextvars - When a triggerer coro starts it binds trigger_id as a paramter - When the Supervisor receives a log message (which happens as LD JSON over a dedicated socket channel) it parses the JSON, and if it finds trigger_id key in there it redirects it to the trigger file log, else prints it.
1 parent c4091ab commit e7bf549

File tree

19 files changed

+1085
-1541
lines changed

19 files changed

+1085
-1541
lines changed

.github/boring-cyborg.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,6 @@ labelPRBasedOnFilePath:
405405
- tests/cli/commands/local_commands/test_triggerer_command.py
406406
- tests/jobs/test_triggerer_job.py
407407
- tests/models/test_trigger.py
408-
- tests/jobs/test_triggerer_job_logging.py
409408
- providers/standard/tests/unit/standard/triggers/**/*
410409

411410
area:Serialization:

airflow/dag_processing/processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import sys
2121
import traceback
2222
from pathlib import Path
23-
from typing import TYPE_CHECKING, Annotated, Callable, Literal, Union
23+
from typing import TYPE_CHECKING, Annotated, Callable, ClassVar, Literal, Union
2424

2525
import attrs
2626
from pydantic import BaseModel, Field, TypeAdapter
@@ -207,7 +207,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
207207
"""
208208

209209
parsing_result: DagFileParsingResult | None = None
210-
decoder: TypeAdapter[ToParent] = TypeAdapter[ToParent](ToParent)
210+
decoder: ClassVar[TypeAdapter[ToParent]] = TypeAdapter[ToParent](ToParent)
211211

212212
@classmethod
213213
def start( # type: ignore[override]

airflow/executors/local_executor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,14 @@
3636
from setproctitle import setproctitle
3737

3838
from airflow import settings
39+
from airflow.executors import workloads
3940
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
4041
from airflow.utils.session import NEW_SESSION, provide_session
4142
from airflow.utils.state import TaskInstanceState
4243

4344
if TYPE_CHECKING:
4445
from sqlalchemy.orm import Session
4546

46-
from airflow.executors import workloads
47-
4847
TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Optional[Exception]]
4948

5049

@@ -82,6 +81,9 @@ def _run_worker(
8281
# Received poison pill, no more tasks to run
8382
return
8483

84+
if not isinstance(workload, workloads.ExecuteTask):
85+
raise ValueError(f"LocalExecutor does not now how to handle {type(workload)}")
86+
8587
# Decrement this as soon as we pick up a message off the queue
8688
with unread_messages:
8789
unread_messages.value -= 1

airflow/executors/workloads.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818

1919
import os
2020
import uuid
21+
from datetime import datetime
2122
from pathlib import Path
22-
from typing import TYPE_CHECKING, Literal, Union
23+
from typing import TYPE_CHECKING, Annotated, Literal, Union
2324

2425
from pydantic import BaseModel, Field
2526

@@ -106,4 +107,33 @@ def make(cls, ti: TIModel, dag_rel_path: Path | None = None) -> ExecuteTask:
106107
return cls(ti=ser_ti, dag_rel_path=path, token="", log_path=fname, bundle_info=bundle_info)
107108

108109

109-
All = Union[ExecuteTask]
110+
class RunTrigger(BaseModel):
111+
"""Execute an async "trigger" process that yields events."""
112+
113+
id: int
114+
115+
ti: TaskInstance | None
116+
"""
117+
The task instance associated with this trigger.
118+
119+
Could be none for asset-based triggers.
120+
"""
121+
122+
classpath: str
123+
"""
124+
Dot-separated name of the module+fn to import and run this workload.
125+
126+
Consumers of this Workload must perform their own validation of this input.
127+
"""
128+
129+
encrypted_kwargs: str
130+
131+
timeout_after: datetime | None = None
132+
133+
kind: Literal["RunTrigger"] = Field(init=False, default="RunTrigger")
134+
135+
136+
All = Annotated[
137+
Union[ExecuteTask, RunTrigger],
138+
Field(discriminator="kind"),
139+
]

0 commit comments

Comments
 (0)