diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 44ae2ce34d707..da40d9eea7e77 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1298,7 +1298,8 @@ def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel, total_a def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" - dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session) + # added all() to save runtime, otherwise query is executed more than once + dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all() active_runs_of_dags = Counter( DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session), diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 5be761279006a..8d28e01867bb0 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5141,7 +5141,9 @@ def test_execute_queries_count_with_harvested_dags(self, expected_query_count, d with assert_queries_count(expected_query_count, margin=15): with mock.patch.object(DagRun, "next_dagruns_to_examine") as mock_dagruns: - mock_dagruns.return_value = dagruns + query = MagicMock() + query.all.return_value = dagruns + mock_dagruns.return_value = query self.job_runner._run_scheduler_loop()