Skip to content

Conversation

@github-actions
Copy link

When using dag.test() with deferred tasks, tasks that complete their trigger execution were incorrectly being set to SUCCESS state instead of SCHEDULED state. This prevented task resumption!

Dag used to test:

from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()

(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik kaxilnaik@gmail.com

When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@kaxil kaxil marked this pull request as ready for review May 29, 2025 11:20
@kaxil kaxil requested review from amoghrajesh, ashb and kaxil as code owners May 29, 2025 11:20
@kaxil kaxil merged commit 778823a into v3-0-test May 29, 2025
49 checks passed
@kaxil kaxil deleted the backport-1b83f71-v3-0-test branch May 29, 2025 14:01
kaxil added a commit that referenced this pull request Jun 3, 2025
…51199)

When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants