Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,8 @@ def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel, total_a

def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session)
# added all() to save runtime, otherwise query is executed more than once
dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all()

active_runs_of_dags = Counter(
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
Expand Down
4 changes: 3 additions & 1 deletion tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5141,7 +5141,9 @@ def test_execute_queries_count_with_harvested_dags(self, expected_query_count, d

with assert_queries_count(expected_query_count, margin=15):
with mock.patch.object(DagRun, "next_dagruns_to_examine") as mock_dagruns:
mock_dagruns.return_value = dagruns
query = MagicMock()
query.all.return_value = dagruns
mock_dagruns.return_value = query

self.job_runner._run_scheduler_loop()

Expand Down