From 8cfa3a64bbe2ed42a409e5a33f9fede71def0cb7 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 29 May 2025 04:23:51 +0530 Subject: [PATCH] Fix deferred task resumption in ``dag.test()`` When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption! Dag used to test: ```python from datetime import datetime, timedelta, timezone from typing import Any import pendulum from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sdk import Context, task, BaseOperator, DAG class DummyOperator(BaseOperator): def execute(self, context: Context): self.defer( trigger=DateTimeTrigger( moment=datetime.now(timezone.utc) + timedelta(seconds=2), ), method_name="execute_complet", ) def execute_complet(self, context: Context, event: Any = None): assert event is not None return "test" @task def dummy_task(param): print("DEBUG") assert param == "test", "Parameter should be 'test'" with DAG( dag_id="example_debug", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), ) as dag: task1 = DummyOperator(task_id="task1") task2 = dummy_task(task1.output) task1 >> task2 if __name__ == "__main__": dag.test() ``` --- airflow-core/tests/unit/cli/commands/test_dag_command.py | 2 +- task-sdk/src/airflow/sdk/definitions/dag.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index fb2daebf418f2..219785fc7d053 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -874,7 +874,7 @@ def execute(self, context, event=None): self.defer(trigger=trigger, method_name="execute") return print("RESUMING") - return self.tfield + 1 + assert self.tfield + 1 == 3 task_one = one() task_two = two(task_one) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 08cbb13335333..449e101f7f565 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1248,6 +1248,8 @@ def _run_task(*, ti, run_triggerer=False): ti.task = taskrun_result.ti.task if ti.state == State.DEFERRED and isinstance(msg, DeferTask) and run_triggerer: + from airflow.utils.session import create_session + # API Server expects the task instance to be in QUEUED state before # resuming from deferral. ti.set_state(State.QUEUED) @@ -1259,7 +1261,10 @@ def _run_task(*, ti, run_triggerer=False): ti.next_kwargs = {"event": event.payload} if event else msg.next_kwargs log.info("[DAG TEST] Trigger completed") - ti.set_state(State.SUCCESS) + # Set the state to SCHEDULED so that the task can be resumed. + with create_session() as session: + ti.state = State.SCHEDULED + session.add(ti) return taskrun_result except Exception: