Skip to content

Conversation

@github-actions
Copy link

Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:

from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group


@task
def get_nums() -> list[int]:
    return [1, 2, 4]


@task
def times_2(n: int) -> int:
    return n * 2


@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value


@task
def log_success() -> None:
    print("Processed successful!")


@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()


test()

Before

Screenshot 2025-10-21 at 17 57 20

After

Screenshot 2025-10-21 at 17 56 57 (cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun pierrejbrun@gmail.com

Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
    return [1, 2, 4]

@task
def times_2(n: int) -> int:
    return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value

@task
def log_success() -> None:
    print("Processed successful!")

@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()

test()
```

### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />
(cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun <pierrejbrun@gmail.com>
@github-actions github-actions bot mentioned this pull request Oct 21, 2025
@kaxil kaxil marked this pull request as ready for review October 21, 2025 17:31
@kaxil
Copy link
Member

kaxil commented Oct 21, 2025

I was manually cherry-picking and included this already while I was at it.

ccc33ff

@kaxil kaxil closed this Oct 21, 2025
@kaxil kaxil deleted the backport-c3f53b1-v3-1-test branch October 23, 2025 15:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants