Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
# During expansion we may change some tis into non-schedulable
# states, so we need to re-compute.
if expansion_happened:
changed_tis = True
new_unfinished_tis = [t for t in unfinished_tis if t.state in State.unfinished]
finished_tis.extend(t for t in unfinished_tis if t.state in State.finished)
unfinished_tis = new_unfinished_tis
Expand Down
49 changes: 48 additions & 1 deletion tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1943,7 +1943,9 @@ def say_hi():
tis["say_hi"].state = TaskInstanceState.SUCCESS
session.flush()

dr.update_state(session=session)
dr.update_state(session=session) # expands the mapped tasks
dr.update_state(session=session) # marks the task as skipped
dr.update_state(session=session) # marks dagrun as success
assert dr.state == DagRunState.SUCCESS
assert tis["add_one__1"].state == TaskInstanceState.SKIPPED

Expand Down Expand Up @@ -2099,3 +2101,48 @@ def tg(va):
ti.run()
assert len(results) == 1
assert list(results[("t3", -1)]) == [["a", "b"], [4], ["z"]]


def test_mapping_against_empty_list(dag_maker, session):
with dag_maker(session=session):

@task
def add_one(x: int):
return x + 1

@task
def say_hi():
print("Hi")

@task
def say_bye():
print("Bye")

added_values = add_one.expand(x=[])
added_more_values = add_one.expand(x=[])
added_more_more_values = add_one.expand(x=[])
say_hi() >> say_bye() >> added_values
added_values >> added_more_values >> added_more_more_values

dr: DagRun = dag_maker.create_dagrun()

tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
say_hi_ti = tis["say_hi"]
say_bye_ti = tis["say_bye"]
say_hi_ti.state = TaskInstanceState.SUCCESS
say_bye_ti.state = TaskInstanceState.SUCCESS
session.merge(say_hi_ti)
session.merge(say_bye_ti)
session.flush()

dr.update_state(session=session)
dr.update_state(session=session) # marks first empty mapped task as skipped
dr.update_state(session=session) # marks second empty mapped task as skipped
dr.update_state(session=session) # marks the third empty mapped task as skipped and dagrun as success
tis = {ti.task_id: ti.state for ti in dr.get_task_instances(session=session)}
assert tis["say_hi"] == TaskInstanceState.SUCCESS
assert tis["say_bye"] == TaskInstanceState.SUCCESS
assert tis["add_one"] == TaskInstanceState.SKIPPED
assert tis["add_one__1"] == TaskInstanceState.SKIPPED
assert tis["add_one__2"] == TaskInstanceState.SKIPPED
assert dr.state == State.SUCCESS