diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 9e02f4775f549..f3fc068b04317 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -716,6 +716,7 @@ def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]: # During expansion we may change some tis into non-schedulable # states, so we need to re-compute. if expansion_happened: + changed_tis = True new_unfinished_tis = [t for t in unfinished_tis if t.state in State.unfinished] finished_tis.extend(t for t in unfinished_tis if t.state in State.finished) unfinished_tis = new_unfinished_tis diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index ffee25f5e89a9..a3e2a5065259f 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -1943,7 +1943,9 @@ def say_hi(): tis["say_hi"].state = TaskInstanceState.SUCCESS session.flush() - dr.update_state(session=session) + dr.update_state(session=session) # expands the mapped tasks + dr.update_state(session=session) # marks the task as skipped + dr.update_state(session=session) # marks dagrun as success assert dr.state == DagRunState.SUCCESS assert tis["add_one__1"].state == TaskInstanceState.SKIPPED @@ -2099,3 +2101,48 @@ def tg(va): ti.run() assert len(results) == 1 assert list(results[("t3", -1)]) == [["a", "b"], [4], ["z"]] + + +def test_mapping_against_empty_list(dag_maker, session): + with dag_maker(session=session): + + @task + def add_one(x: int): + return x + 1 + + @task + def say_hi(): + print("Hi") + + @task + def say_bye(): + print("Bye") + + added_values = add_one.expand(x=[]) + added_more_values = add_one.expand(x=[]) + added_more_more_values = add_one.expand(x=[]) + say_hi() >> say_bye() >> added_values + added_values >> added_more_values >> added_more_more_values + + dr: DagRun = dag_maker.create_dagrun() + + tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} + say_hi_ti = tis["say_hi"] + say_bye_ti = tis["say_bye"] + say_hi_ti.state = TaskInstanceState.SUCCESS + say_bye_ti.state = TaskInstanceState.SUCCESS + session.merge(say_hi_ti) + session.merge(say_bye_ti) + session.flush() + + dr.update_state(session=session) + dr.update_state(session=session) # marks first empty mapped task as skipped + dr.update_state(session=session) # marks second empty mapped task as skipped + dr.update_state(session=session) # marks the third empty mapped task as skipped and dagrun as success + tis = {ti.task_id: ti.state for ti in dr.get_task_instances(session=session)} + assert tis["say_hi"] == TaskInstanceState.SUCCESS + assert tis["say_bye"] == TaskInstanceState.SUCCESS + assert tis["add_one"] == TaskInstanceState.SKIPPED + assert tis["add_one__1"] == TaskInstanceState.SKIPPED + assert tis["add_one__2"] == TaskInstanceState.SKIPPED + assert dr.state == State.SUCCESS