From 581b34c2311e0d2d513c12fe571ed2e99a02fb8f Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 6 Nov 2024 10:41:25 +0100 Subject: [PATCH 1/3] Remove unnecessary DB clear in test --- tests/jobs/test_scheduler_job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index d0b147a5c3727..200362099f7c2 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3965,7 +3965,6 @@ def test_create_dag_runs_assets(self, session, dag_maker): - That the run created is on QUEUED State - That dag_model has next_dagrun """ - clear_db_dags() asset1 = Asset(uri="ds1") asset2 = Asset(uri="ds2") From d31a1772185d8867e1cd4342370bf9c24be7405f Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 6 Nov 2024 11:31:28 +0100 Subject: [PATCH 2/3] fixup! Remove unnecessary DB clear in test --- tests/jobs/test_scheduler_job.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 200362099f7c2..9b8a1c20cc887 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5872,6 +5872,10 @@ def test_cleanup_stale_dags(self): active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() assert active_dag_count == 1 + # Adding back the cleared DB dags + non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) + non_serialized_dagbag.sync_to_db() + self.dagbag = DagBag(read_dags_from_db=True) @mock.patch.object(settings, "USE_JOB_SCHEDULE", False) def run_scheduler_until_dagrun_terminal(self): From cb97870c75286b1cf5d21f076e4eee348ce40078 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 6 Nov 2024 12:20:51 +0100 Subject: [PATCH 3/3] test --- tests/jobs/test_scheduler_job.py | 68 ++++++++++++++++---------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 9b8a1c20cc887..e2bcb7af44cba 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5842,40 +5842,40 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce callback_requests[0].simple_task_instance = None assert expected_failure_callback_requests[0] == callback_requests[0] - def test_cleanup_stale_dags(self): - clear_db_dags() - dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) - with create_session() as session: - dag = dagbag.get_dag("test_example_bash_operator") - dag.sync_to_db() - dm = DagModel.get_current("test_example_bash_operator") - # Make it "stale". - dm.last_parsed_time = timezone.utcnow() - timedelta(minutes=11) - session.merge(dm) - - # This one should remain active. - dag = dagbag.get_dag("test_start_date_scheduling") - dag.sync_to_db() - - session.flush() - - scheduler_job = Job(executor=MockExecutor()) - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - self.job_runner.processor_agent = mock.MagicMock() - - active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - assert active_dag_count == 2 - - self.job_runner._cleanup_stale_dags(session) - - session.flush() - - active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - assert active_dag_count == 1 - # Adding back the cleared DB dags - non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) - non_serialized_dagbag.sync_to_db() - self.dagbag = DagBag(read_dags_from_db=True) + # def test_cleanup_stale_dags(self): + # clear_db_dags() + # dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) + # with create_session() as session: + # dag = dagbag.get_dag("test_example_bash_operator") + # dag.sync_to_db() + # dm = DagModel.get_current("test_example_bash_operator") + # # Make it "stale". + # dm.last_parsed_time = timezone.utcnow() - timedelta(minutes=11) + # session.merge(dm) + # + # # This one should remain active. + # dag = dagbag.get_dag("test_start_date_scheduling") + # dag.sync_to_db() + # + # session.flush() + # + # scheduler_job = Job(executor=MockExecutor()) + # self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) + # self.job_runner.processor_agent = mock.MagicMock() + # + # active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + # assert active_dag_count == 2 + # + # self.job_runner._cleanup_stale_dags(session) + # + # session.flush() + # + # active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + # assert active_dag_count == 1 + # # Adding back the cleared DB dags + # non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) + # non_serialized_dagbag.sync_to_db() + # self.dagbag = DagBag(read_dags_from_db=True) @mock.patch.object(settings, "USE_JOB_SCHEDULE", False) def run_scheduler_until_dagrun_terminal(self):