Skip to content

Conversation

@avkirilishin
Copy link
Contributor

closes: #34023

Redo of the #34337
Relates to #35541 (comment)

@uranusjr Can you please take a look at the approach? I haven't checked the performance yet, but if the approach is okay, I will check it if needed. Perhaps it can be optimized.

And I found that something goes wrong with the example from the get_relevant_upstream_map_indexes method. When I checked the dag:

import pendulum

from airflow import DAG
from airflow.decorators import task, task_group


with DAG(
    'mul-in-tg',
    schedule='@daily',
    start_date=pendulum.DateTime(2023, 12, 26),
) as dag:
    @task
    def upstream(inp):
        return inp

    @task
    def this_task(v):  # This is self.task.
        return v * 2

    @task_group
    def tg1(inp):
        val = upstream(inp)  # This is the upstream task.
        this_task(val)  # When inp is 1, val here should resolve to 2.
        return val

    val = tg1.expand(inp=[1, 2, 3])  # This val is the same object returned by tg1.

    @task
    def another_task(inp, val):
        print("(inp, val)", (inp, val))

    @task_group
    def tg2(inp):
        another_task(inp, val)  # val here should resolve to [2, 4, 6].

    tg2.expand(inp=["a", "b"])

I observed:
image
Is this the expected behavior?

@eladkal eladkal added this to the Airflow 2.8.1 milestone Dec 31, 2023
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Dec 31, 2023
@potiuk
Copy link
Member

potiuk commented Jan 25, 2024

cc: @uranusjr - following discussion in #35541 - do you think that one should fix the problem ?

@eladkal
Copy link
Contributor

eladkal commented Mar 16, 2024

@avkirilishin can you rebase and resolve conflics?

@ephraimbuddy
Copy link
Contributor

Can you add this test:

def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_maker, session):
    """Test that one failed trigger rule works well in mapped task group"""
    with dag_maker() as dag:

        @dag.task
        def t1():
            return [1, 2, 3]

        @task_group("tg1")
        def tg1(a):
            @dag.task()
            def t2(a):
                return a

            @dag.task(trigger_rule=TriggerRule.ONE_FAILED)
            def t3(a):
                return a

            t2(a) >> t3(a)

        t = t1()
        tg1.expand(a=t)

    dr = dag_maker.create_dagrun()
    ti = dr.get_task_instance(task_id="t1")
    ti.run()
    dr.task_instance_scheduling_decisions()
    ti3 = dr.get_task_instance(task_id="tg1.t3")
    assert not ti3.state

Comment on lines +3374 to +3376
# The task has not been expanded yet. Let's help it.
if self.map_index == -1 and ti_count > 1:
return range(0, ancestor_ti_count)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I’d prefer we don’t call this function for this case at all instead (i.e. try to ensure the task is expanded).

Also this check is not correct. If ti_count is exactly 1 if the task is mapped (against a list of length 1).

Comment on lines +453 to +454
if ti_needs_expansion and success > 0:
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this outside of the blocks instead of repeating it everywhere? This is both cumbersome and error-prone.

@eladkal
Copy link
Contributor

eladkal commented Apr 26, 2024

@avkirilishin are you still working on this PR?

@utkarsharma2 utkarsharma2 removed this from the Airflow 2.9.2 milestone Jun 4, 2024
@utkarsharma2 utkarsharma2 added this to the Airflow 2.9.3 milestone Jun 4, 2024
@eladkal eladkal removed this from the Airflow 2.9.3 milestone Jun 13, 2024
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 31, 2024
@github-actions github-actions bot closed this Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

stale Stale PRs per the .github/workflows/stale.yml policy file type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Trigger Rule ONE_FAILED does not work in task group with mapped tasks

6 participants