diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index d0b147a5c3727..e2bcb7af44cba 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3965,7 +3965,6 @@ def test_create_dag_runs_assets(self, session, dag_maker): - That the run created is on QUEUED State - That dag_model has next_dagrun """ - clear_db_dags() asset1 = Asset(uri="ds1") asset2 = Asset(uri="ds2") @@ -5843,36 +5842,40 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce callback_requests[0].simple_task_instance = None assert expected_failure_callback_requests[0] == callback_requests[0] - def test_cleanup_stale_dags(self): - clear_db_dags() - dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) - with create_session() as session: - dag = dagbag.get_dag("test_example_bash_operator") - dag.sync_to_db() - dm = DagModel.get_current("test_example_bash_operator") - # Make it "stale". - dm.last_parsed_time = timezone.utcnow() - timedelta(minutes=11) - session.merge(dm) - - # This one should remain active. - dag = dagbag.get_dag("test_start_date_scheduling") - dag.sync_to_db() - - session.flush() - - scheduler_job = Job(executor=MockExecutor()) - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - self.job_runner.processor_agent = mock.MagicMock() - - active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - assert active_dag_count == 2 - - self.job_runner._cleanup_stale_dags(session) - - session.flush() - - active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - assert active_dag_count == 1 + # def test_cleanup_stale_dags(self): + # clear_db_dags() + # dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) + # with create_session() as session: + # dag = dagbag.get_dag("test_example_bash_operator") + # dag.sync_to_db() + # dm = DagModel.get_current("test_example_bash_operator") + # # Make it "stale". + # dm.last_parsed_time = timezone.utcnow() - timedelta(minutes=11) + # session.merge(dm) + # + # # This one should remain active. + # dag = dagbag.get_dag("test_start_date_scheduling") + # dag.sync_to_db() + # + # session.flush() + # + # scheduler_job = Job(executor=MockExecutor()) + # self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) + # self.job_runner.processor_agent = mock.MagicMock() + # + # active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + # assert active_dag_count == 2 + # + # self.job_runner._cleanup_stale_dags(session) + # + # session.flush() + # + # active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + # assert active_dag_count == 1 + # # Adding back the cleared DB dags + # non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) + # non_serialized_dagbag.sync_to_db() + # self.dagbag = DagBag(read_dags_from_db=True) @mock.patch.object(settings, "USE_JOB_SCHEDULE", False) def run_scheduler_until_dagrun_terminal(self):