From 5625b69cd29192919759ae3ee360da7e18efa17b Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Tue, 12 Apr 2022 10:45:17 +0300 Subject: [PATCH 1/4] Fix regression in pool metrics --- airflow/jobs/scheduler_job.py | 5 ++++ tests/jobs/test_scheduler_job.py | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 628207df6b487..5a5959c601814 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -369,6 +369,9 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = starved_pools.add(pool_name) continue + # Make sure to emit metrics if pool has no starving tasks + pool_num_starving_tasks[pool_name] += 0 + pool_total = pools[pool]["total"] open_slots = pools[pool]["open"] @@ -407,6 +410,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = pool, ) + pool_num_starving_tasks[pool_name] += 1 + num_starving_tasks_total += 1 starved_tasks.add((task_instance.dag_id, task_instance.task_id)) continue diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 80e3cb1f347b2..6e4fea9b5d681 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1096,6 +1096,46 @@ def test_find_executable_task_instances_not_enough_task_concurrency_for_first(se session.rollback() + @mock.patch('airflow.jobs.scheduler_job.Stats.gauge') + def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker): + self.scheduler_job = SchedulerJob(subdir=os.devnull) + session = settings.Session() + + dag_id = 'SchedulerJobTest.test_emit_pool_starving_tasks_metrics' + with dag_maker(dag_id=dag_id): + op = DummyOperator(task_id='op', pool_slots=2) + + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + + ti = dr.get_task_instance(op.task_id, session) + ti.state = State.SCHEDULED + + set_default_pool_slots(1) + session.flush() + + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) + assert 0 == len(res) + + mock_stats_gauge.assert_has_calls( + [mock.call('scheduler.tasks.starving', 1), mock.call('pool.starving_tasks.default_pool', 1)], + any_order=True, + ) + mock_stats_gauge.reset_mock() + + set_default_pool_slots(2) + session.flush() + + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) + assert 1 == len(res) + + mock_stats_gauge.assert_has_calls( + [mock.call('scheduler.tasks.starving', 0), mock.call('pool.starving_tasks.default_pool', 0)], + any_order=True, + ) + + session.rollback() + session.close() + def test_enqueue_task_instances_with_queued_state(self, dag_maker): dag_id = 'SchedulerJobTest.test_enqueue_task_instances_with_queued_state' task_id_1 = 'dummy' From ac2d12f30a14e92b08ddec19281c1df6417beb99 Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Tue, 12 Apr 2022 11:11:58 +0300 Subject: [PATCH 2/4] Fix regression in pool metrics --- tests/jobs/test_scheduler_job.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 6e4fea9b5d681..4aab46825eb12 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1117,7 +1117,10 @@ def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker): assert 0 == len(res) mock_stats_gauge.assert_has_calls( - [mock.call('scheduler.tasks.starving', 1), mock.call('pool.starving_tasks.default_pool', 1)], + [ + mock.call('scheduler.tasks.starving', 1), + mock.call(f'pool.starving_tasks.{Pool.DEFAULT_POOL_NAME}', 1), + ], any_order=True, ) mock_stats_gauge.reset_mock() @@ -1129,7 +1132,10 @@ def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker): assert 1 == len(res) mock_stats_gauge.assert_has_calls( - [mock.call('scheduler.tasks.starving', 0), mock.call('pool.starving_tasks.default_pool', 0)], + [ + mock.call('scheduler.tasks.starving', 0), + mock.call(f'pool.starving_tasks.{Pool.DEFAULT_POOL_NAME}', 0), + ], any_order=True, ) From fc493cab609e934107a6eeb394fee2df3223899d Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 12 Apr 2022 14:35:21 +0100 Subject: [PATCH 3/4] Update airflow/jobs/scheduler_job.py --- airflow/jobs/scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 5a5959c601814..90bfed308e7d2 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -370,7 +370,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = continue # Make sure to emit metrics if pool has no starving tasks - pool_num_starving_tasks[pool_name] += 0 + pool_num_starving_tasks.setdefault(pool_name, 0) pool_total = pools[pool]["total"] open_slots = pools[pool]["open"] From 99df1cc33f5bdb41ad0b9cf857e527c08f10bebf Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Tue, 12 Apr 2022 19:13:02 +0300 Subject: [PATCH 4/4] Merge --- airflow/jobs/scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 51449a55e5418..f3872c63a89d1 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -389,7 +389,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = pool_total, pool_name, ) - + pool_num_starving_tasks[pool_name] += 1 num_starving_tasks_total += 1 starved_tasks.add((task_instance.dag_id, task_instance.task_id))