Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,19 +769,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
dag_run=task_instance.dag_run, session=session
)
# If the dag is missing, fail the task and continue to the next task.
# If the dag is transiently missing, skip scheduling it this iteration
# and try again next time instead of bulk-failing all scheduled tasks.
# See: https://github.com/apache/airflow/issues/62050
if not serialized_dag:
self.log.error(
"DAG '%s' for task instance %s not found in serialized_dag table",
self.log.warning(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a batch has multiple TIs for the same DAG, each one will hit get_dag_for_run and log this warning individually before the starved_dags filter kicks in on the next query iteration. Consider checking if dag_id in starved_dags: continue before entering the has_task_concurrency_limits block, both to avoid redundant DB lookups and to reduce log noise.

"DAG '%s' for task instance %s not found in serialized_dag table, "
"skipping scheduling for this iteration and will retry next time",
dag_id,
task_instance,
)
session.execute(
update(TI)
.where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
.values(state=TaskInstanceState.FAILED)
.execution_options(synchronize_session="fetch")
)
starved_dags.add(dag_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a DAG is permanently deleted (not just transiently missing), tasks will stay SCHEDULED forever and this warning will fire every scheduler iteration. Would it be worth tracking consecutive misses per dag_id and escalating to failure after N iterations? The issue thread mentioned this approach too. At minimum, this should probably be documented as a known limitation in the PR description so follow-up work can address it.

continue

task_concurrency_limit: int | None = None
Expand Down
17 changes: 13 additions & 4 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from airflow.models.callback import ExecutorCallback
from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DBDagBag
from airflow.models.dagbundle import DagBundleModel
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
Expand Down Expand Up @@ -1806,8 +1807,14 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker, mock_ex
session.rollback()
session.close()

def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
"""Check that task instances of missing DAGs are failed"""
def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session):
"""Check that task instances of transiently missing DAGs are skipped, not bulk-failed.

When the serialized DAG is transiently missing (e.g. during a DAG file parse cycle),
the scheduler should skip scheduling for that DAG in the current iteration and retry
next time, rather than bulk-failing all SCHEDULED task instances.
See: https://github.com/apache/airflow/issues/62050
"""
dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag"
task_id_1 = "dummy"
task_id_2 = "dummydummy"
Expand All @@ -1819,7 +1826,7 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job)

self.job_runner.scheduler_dag_bag = mock.MagicMock()
self.job_runner.scheduler_dag_bag = mock.MagicMock(spec=DBDagBag)
self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None

dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
Expand All @@ -1831,10 +1838,12 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
# No tasks should be queued
assert len(res) == 0
# Tasks should remain SCHEDULED (not be bulk-failed)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
assert all(ti.state == State.FAILED for ti in tis)
assert all(ti.state == State.SCHEDULED for ti in tis)

def test_nonexistent_pool(self, dag_maker):
dag_id = "SchedulerJobTest.test_nonexistent_pool"
Expand Down
Loading