diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 5dc3c56d36242..f3872c63a89d1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -364,6 +364,9 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                 starved_pools.add(pool_name)
                 continue
 
+            # Make sure to emit metrics if pool has no starving tasks
+            pool_num_starving_tasks.setdefault(pool_name, 0)
+
             pool_total = pool_stats["total"]
             open_slots = pool_stats["open"]
 
@@ -387,6 +390,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                         pool_name,
                     )
 
+                pool_num_starving_tasks[pool_name] += 1
+
                 num_starving_tasks_total += 1
                 starved_tasks.add((task_instance.dag_id, task_instance.task_id))
                 continue
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e5d077c86d1f4..7a22f0bb21567 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1135,6 +1135,52 @@ def test_find_executable_task_instances_not_enough_task_concurrency_for_first(se
 
         session.rollback()
 
+    @mock.patch('airflow.jobs.scheduler_job.Stats.gauge')
+    def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker):
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_id = 'SchedulerJobTest.test_emit_pool_starving_tasks_metrics'
+        with dag_maker(dag_id=dag_id):
+            op = DummyOperator(task_id='op', pool_slots=2)
+
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ti = dr.get_task_instance(op.task_id, session)
+        ti.state = State.SCHEDULED
+
+        set_default_pool_slots(1)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 0 == len(res)
+
+        mock_stats_gauge.assert_has_calls(
+            [
+                mock.call('scheduler.tasks.starving', 1),
+                mock.call(f'pool.starving_tasks.{Pool.DEFAULT_POOL_NAME}', 1),
+            ],
+            any_order=True,
+        )
+        mock_stats_gauge.reset_mock()
+
+        set_default_pool_slots(2)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        assert 1 == len(res)
+
+        mock_stats_gauge.assert_has_calls(
+            [
+                mock.call('scheduler.tasks.starving', 0),
+                mock.call(f'pool.starving_tasks.{Pool.DEFAULT_POOL_NAME}', 0),
+            ],
+            any_order=True,
+        )
+
+        session.rollback()
+        session.close()
+
     def test_enqueue_task_instances_with_queued_state(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_enqueue_task_instances_with_queued_state'
         task_id_1 = 'dummy'
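
For context, the gauge flush that consumes these counters is outside the hunks shown; the test only pins down the metric names by asserting that `Stats.gauge('scheduler.tasks.starving', ...)` and per-pool `Stats.gauge('pool.starving_tasks.<pool_name>', ...)` calls happen. Below is a minimal standalone sketch of the counting pattern the patch sets up, showing why the `setdefault` seeding matters; the helper name and the shapes of `task_instances` and `pools` are hypothetical, not code from this PR.

```python
def count_starving_tasks(task_instances, pools):
    """Tally starving tasks per pool so every examined pool reports a value.

    Assumes each task instance exposes `.pool` and `.pool_slots`, and that
    `pools` maps pool name -> {"open": <open slots>, "total": <total slots>}.
    """
    pool_num_starving_tasks = {}
    num_starving_tasks_total = 0

    for ti in task_instances:
        stats = pools.get(ti.pool)
        if not stats:
            continue  # unknown pool: skip, nothing to report

        # Seed the per-pool counter so a pool with zero starving tasks still
        # produces an explicit 0 instead of being absent from the result.
        pool_num_starving_tasks.setdefault(ti.pool, 0)

        if ti.pool_slots > stats["open"]:
            # The task needs more slots than the pool currently has free.
            pool_num_starving_tasks[ti.pool] += 1
            num_starving_tasks_total += 1

    return pool_num_starving_tasks, num_starving_tasks_total
```

Seeding with 0 is what the second half of the test exercises: StatsD gauges hold their last value until they are overwritten, so without the `setdefault` a pool that recovers (the default pool bumped from 1 to 2 slots) would keep reporting the stale non-zero `pool.starving_tasks.<pool_name>` reading instead of the expected 0.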