From 7031a203192f19a66e3db4fd162da119ac3c1256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marco=20K=C3=BCttelwesch?= Date: Tue, 18 Apr 2023 10:55:27 +0200 Subject: [PATCH 1/5] Function returns list of dagruns and not query --- airflow/models/dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 42845b34bdd0e..2884ba92ca59c 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -345,7 +345,7 @@ def next_dagruns_to_examine( return with_row_locks( query.limit(max_number), of=cls, session=session, **skip_locked(session=session) - ) + ).all() @classmethod @provide_session From ea7eb4ca582e9547c22893f593e3f5760fa4fd3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marco=20K=C3=BCttelwesch?= Date: Wed, 19 Apr 2023 12:48:27 +0000 Subject: [PATCH 2/5] Changed pytests --- tests/models/test_dagrun.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index ffe78b17b6b85..02834feeea0f8 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -833,14 +833,14 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state): session=session, ) - runs = DagRun.next_dagruns_to_examine(state, session).all() + runs = DagRun.next_dagruns_to_examine(state, session) assert runs == [dr] orm_dag.is_paused = True session.flush() - runs = DagRun.next_dagruns_to_examine(state, session).all() + runs = DagRun.next_dagruns_to_examine(state, session) assert runs == [] @mock.patch.object(Stats, "timing") From 1489c4e890a79d066102968c328f7a88561514ca Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 23 May 2023 09:04:01 +0000 Subject: [PATCH 3/5] Changed all to _start_queued_dagruns --- airflow/jobs/scheduler_job_runner.py | 2 +- airflow/models/dagrun.py | 2 +- tests/models/test_dagrun.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 44ae2ce34d707..78604f58d4425 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1298,7 +1298,7 @@ def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel, total_a def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" - dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session) + dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all() active_runs_of_dags = Counter( DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session), diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 2884ba92ca59c..42845b34bdd0e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -345,7 +345,7 @@ def next_dagruns_to_examine( return with_row_locks( query.limit(max_number), of=cls, session=session, **skip_locked(session=session) - ).all() + ) @classmethod @provide_session diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 02834feeea0f8..ffe78b17b6b85 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -833,14 +833,14 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state): session=session, ) - runs = DagRun.next_dagruns_to_examine(state, session) + runs = DagRun.next_dagruns_to_examine(state, session).all() assert runs == [dr] orm_dag.is_paused = True session.flush() - runs = DagRun.next_dagruns_to_examine(state, session) + runs = DagRun.next_dagruns_to_examine(state, session).all() assert runs == [] @mock.patch.object(Stats, "timing") From 93e8bd962ccf2550e6463321f7b17a5feba971f1 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 23 May 2023 11:44:48 +0000 Subject: [PATCH 4/5] Added comment and fixed tests --- airflow/jobs/scheduler_job_runner.py | 1 + tests/jobs/test_scheduler_job.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 78604f58d4425..2ade664aec5fc 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1298,6 +1298,7 @@ def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel, total_a def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" + # added all() to save runtime, otherwise query is executed more then once dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all() active_runs_of_dags = Counter( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 5be761279006a..8d28e01867bb0 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5141,7 +5141,9 @@ def test_execute_queries_count_with_harvested_dags(self, expected_query_count, d with assert_queries_count(expected_query_count, margin=15): with mock.patch.object(DagRun, "next_dagruns_to_examine") as mock_dagruns: - mock_dagruns.return_value = dagruns + query = MagicMock() + query.all.return_value = dagruns + mock_dagruns.return_value = query self.job_runner._run_scheduler_loop() From e5694f064f4f33ff5d3fd2ff0c397bf9dac48b86 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 25 May 2023 12:13:30 +0000 Subject: [PATCH 5/5] Fixed typo --- airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 2ade664aec5fc..da40d9eea7e77 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1298,7 +1298,7 @@ def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel, total_a def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" - # added all() to save runtime, otherwise query is executed more then once + # added all() to save runtime, otherwise query is executed more than once dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all() active_runs_of_dags = Counter(