diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index fb2daebf418f2..219785fc7d053 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -874,7 +874,7 @@ def execute(self, context, event=None): self.defer(trigger=trigger, method_name="execute") return print("RESUMING") - return self.tfield + 1 + assert self.tfield + 1 == 3 task_one = one() task_two = two(task_one) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 08cbb13335333..449e101f7f565 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1248,6 +1248,8 @@ def _run_task(*, ti, run_triggerer=False): ti.task = taskrun_result.ti.task if ti.state == State.DEFERRED and isinstance(msg, DeferTask) and run_triggerer: + from airflow.utils.session import create_session + # API Server expects the task instance to be in QUEUED state before # resuming from deferral. ti.set_state(State.QUEUED) @@ -1259,7 +1261,10 @@ def _run_task(*, ti, run_triggerer=False): ti.next_kwargs = {"event": event.payload} if event else msg.next_kwargs log.info("[DAG TEST] Trigger completed") - ti.set_state(State.SUCCESS) + # Set the state to SCHEDULED so that the task can be resumed. + with create_session() as session: + ti.state = State.SCHEDULED + session.add(ti) return taskrun_result except Exception: