diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 4773a89d1dd16..423c624f0c297 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1589,6 +1589,7 @@ def schedule_tis( start_date=timezone.utcnow(), end_date=timezone.utcnow(), duration=0, + try_number=TI.try_number + 1, ) .execution_options( synchronize_session=False, diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 93e0611243616..021d09e889420 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -36,6 +36,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_task_instances from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule +from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.operators.python import ShortCircuitOperator from airflow.serialization.serialized_objects import SerializedDAG @@ -2019,6 +2020,29 @@ def execute_complete(self): assert ti.state == TaskInstanceState.DEFERRED +def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session): + """ + When empty operator is not actually run, then we need to increment the try_number, + since ordinarily it's incremented when scheduled, but empty operator is generally not scheduled. + """ + + with dag_maker(session=session): + BashOperator(task_id="real_task", bash_command="echo 1") + EmptyOperator(task_id="empty_task") + + dr: DagRun = dag_maker.create_dagrun(session=session) + session.commit() + tis = dr.task_instances + dr.schedule_tis(tis, session=session) + session.commit() + session.expunge_all() + tis = dr.get_task_instances(session=session) + real_ti = next(x for x in tis if x.task_id == "real_task") + empty_ti = next(x for x in tis if x.task_id == "empty_task") + assert real_ti.try_number == 1 + assert empty_ti.try_number == 1 + + def test_mapped_expand_kwargs(dag_maker): with dag_maker():