diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 42fd1117483c7..48c0866f1b722 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -456,4 +456,5 @@ def sync_to_db(self): from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel DAG.bulk_sync_to_db(self.dags.values()) - SerializedDagModel.bulk_sync_to_db(self.dags.values()) + if self.store_serialized_dags: + SerializedDagModel.bulk_sync_to_db(self.dags.values()) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 2e7ac687e45d8..bac2c4ce90c3c 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -30,7 +30,7 @@ from airflow.models.dag import DAG, DagModel from airflow.models.dagcode import DagCode from airflow.serialization.serialized_objects import SerializedDAG -from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, STORE_SERIALIZED_DAGS, json +from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime @@ -205,9 +205,6 @@ def bulk_sync_to_db(dags: List[DAG], session=None): :type dags: List[airflow.models.dag.DAG] :return: None """ - if not STORE_SERIALIZED_DAGS: - return - for dag in dags: if not dag.is_subdag: SerializedDagModel.write_dag( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 644df90e63def..a8bfd698908d2 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -44,6 +44,7 @@ from airflow.models.taskinstance import SimpleTaskInstance from airflow.operators.bash import BashOperator from airflow.operators.dummy_operator import DummyOperator +from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils import timezone from airflow.utils.dag_processing import FailureCallbackRequest, SimpleDag, SimpleDagBag from airflow.utils.dates import days_ago @@ -122,7 +123,11 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed @classmethod def setUpClass(cls): - cls.dagbag = DagBag() + # Ensure the DAGs we are looking at from the DB are up-to-date + non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) + non_serialized_dagbag.store_serialized_dags = True + non_serialized_dagbag.sync_to_db() + cls.dagbag = DagBag(store_serialized_dags=True) def test_dag_file_processor_sla_miss_callback(self): """ @@ -415,6 +420,7 @@ def test_dag_file_processor_dagrun_once(self): schedule_interval="@once") dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) @@ -491,6 +497,8 @@ def test_dag_file_processor_process_task_instances(self, state, start_date, end_ orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) @@ -537,6 +545,8 @@ def test_dag_file_processor_process_task_instances_with_task_concurrency( orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) @@ -587,6 +597,8 @@ def test_dag_file_processor_process_task_instances_depends_on_past(self, state, orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) @@ -621,6 +633,8 @@ def test_dag_file_processor_do_not_schedule_removed_task(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -628,9 +642,11 @@ def test_dag_file_processor_do_not_schedule_removed_task(self): self.assertIsNotNone(dr) dr = DagRun.find(run_id=dr.run_id)[0] + # Re-create the DAG, but remove the task dag = DAG( dag_id='test_scheduler_do_not_schedule_removed_task', start_date=DEFAULT_DATE) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) @@ -651,6 +667,8 @@ def test_dag_file_processor_do_not_schedule_too_early(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -668,6 +686,8 @@ def test_dag_file_processor_do_not_schedule_without_tasks(self): with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear(session=session) dag.start_date = None @@ -688,6 +708,8 @@ def test_dag_file_processor_do_not_run_finished(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -724,6 +746,8 @@ def test_dag_file_processor_add_new_task(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -737,6 +761,7 @@ def test_dag_file_processor_add_new_task(self): task_id='dummy2', dag=dag, owner='airflow') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor._process_task_instances(dag, dag_runs=[dr]) @@ -763,6 +788,7 @@ def test_dag_file_processor_verify_max_active_runs(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -791,6 +817,8 @@ def test_dag_file_processor_fail_dagrun_timeout(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -831,6 +859,8 @@ def test_dag_file_processor_verify_max_active_runs_and_dagrun_timeout(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -868,6 +898,7 @@ def test_dag_file_processor_max_active_runs_respected_after_clear(self): session.merge(orm_dag) session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -892,8 +923,6 @@ def test_dag_file_processor_max_active_runs_respected_after_clear(self): def test_find_dags_to_run_includes_subdags(self): dag = self.dagbag.get_dag('test_subdag_operator') - print(self.dagbag.dag_ids) - print(self.dagbag.dag_folder) self.assertGreater(len(dag.subdags), 0) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values()) @@ -931,7 +960,7 @@ def setup_dag(dag_id, schedule_interval, start_date, catchup): session.commit() session.close() - return dag + return SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) now = timezone.utcnow() six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace( @@ -1002,6 +1031,8 @@ def test_dag_file_processor_auto_align(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dr = dag_file_processor.create_dag_run(dag) @@ -1022,6 +1053,7 @@ def test_dag_file_processor_auto_align(self): orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() @@ -1332,7 +1364,11 @@ def setUp(self): @classmethod def setUpClass(cls): - cls.dagbag = DagBag() + # Ensure the DAGs we are looking at from the DB are up-to-date + non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) + non_serialized_dagbag.store_serialized_dags = True + non_serialized_dagbag.sync_to_db() + cls.dagbag = DagBag(store_serialized_dags=True) def test_is_alive(self): job = SchedulerJob(None, heartrate=10, state=State.RUNNING) @@ -1454,6 +1490,7 @@ def test_execute_task_instances_is_paused_wont_execute(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1485,6 +1522,8 @@ def test_execute_task_instances_no_dagrun_task_will_execute(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1511,6 +1550,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1537,6 +1577,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1577,6 +1618,7 @@ def test_find_executable_task_instances_pool(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1, pool='a') task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1620,6 +1662,7 @@ def test_find_executable_task_instances_in_default_pool(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) op1 = DummyOperator(dag=dag, task_id='dummy1') op2 = DummyOperator(dag=dag, task_id='dummy2') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) executor = MockExecutor(do_update=True) @@ -1661,6 +1704,7 @@ def test_nonexistent_pool(self): task_id = 'dummy_wrong_pool' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist") + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1685,6 +1729,7 @@ def test_find_executable_task_instances_none(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1703,6 +1748,7 @@ def test_find_executable_task_instances_concurrency(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1749,6 +1795,7 @@ def test_find_executable_task_instances_concurrency_queued(self): task1 = DummyOperator(dag=dag, task_id='dummy1') task2 = DummyOperator(dag=dag, task_id='dummy2') task3 = DummyOperator(dag=dag, task_id='dummy3') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1783,6 +1830,7 @@ def test_find_executable_task_instances_task_concurrency(self): # pylint: disab dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1, task_concurrency=2) task2 = DummyOperator(dag=dag, task_id=task_id_2) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) executor = MockExecutor(do_update=True) @@ -1877,6 +1925,7 @@ def test_change_state_for_executable_task_instances_no_tis_with_state(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) self._make_simple_dag_bag([dag]) scheduler = SchedulerJob() @@ -1909,6 +1958,7 @@ def test_enqueue_task_instances_with_queued_state(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1931,6 +1981,7 @@ def test_execute_task_instances_nothing(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = SimpleDagBag([]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1956,6 +2007,7 @@ def test_execute_task_instances(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3) task1 = DummyOperator(dag=dag, task_id=task_id_1) task2 = DummyOperator(dag=dag, task_id=task_id_2) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -2026,6 +2078,7 @@ def test_execute_task_instances_limit(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1) task2 = DummyOperator(dag=dag, task_id=task_id_2) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -2062,15 +2115,20 @@ def test_change_state_for_tis_without_dagrun(self): DummyOperator(task_id='dummy', dag=dag1, owner='airflow') DummyOperator(task_id='dummy_b', dag=dag1, owner='airflow') + dag1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag1)) dag2 = DAG(dag_id='test_change_state_for_tis_without_dagrun_dont_change', start_date=DEFAULT_DATE) DummyOperator(task_id='dummy', dag=dag2, owner='airflow') + dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2)) + dag3 = DAG(dag_id='test_change_state_for_tis_without_dagrun_no_dagrun', start_date=DEFAULT_DATE) DummyOperator(task_id='dummy', dag=dag3, owner='airflow') + dag3 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag3)) + session = settings.Session() dr1 = dag1.create_dagrun(run_id=DagRunType.SCHEDULED.value, state=State.RUNNING, @@ -2154,6 +2212,7 @@ def test_change_state_for_tasks_failed_to_execute(self): task_id='task_id', dag=dag, owner='airflow') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) # If there's no left over task in executor.queued_tasks, nothing happens session = settings.Session() @@ -2202,6 +2261,7 @@ def test_reset_state_for_orphaned_tasks(self): with dag: op1 = DummyOperator(task_id='op1') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() dr = dag.create_dagrun(run_id=f"{DagRunType.SCHEDULED.value}__", @@ -2250,6 +2310,7 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(self, with dag: op1 = DummyOperator(task_id='op1') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) # Create DAG run with FAILED state dag.clear() @@ -2311,6 +2372,7 @@ def evaluate_dagrun( self.null_exec.mock_task_fail(dag_id, tid, ex_date) try: + # This needs a _REAL_ dag, not the serialized version dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs) except AirflowException: pass @@ -2452,7 +2514,7 @@ def test_scheduler_start_date(self): dag_id = 'test_start_date_scheduling' dag = self.dagbag.get_dag(dag_id) dag.clear() - self.assertTrue(dag.start_date > datetime.datetime.utcnow()) + self.assertGreater(dag.start_date, datetime.datetime.utcnow()) scheduler = SchedulerJob(dag_id, executor=self.null_exec, @@ -2592,6 +2654,8 @@ def test_scheduler_verify_pool_full(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) scheduler = SchedulerJob(executor=self.null_exec) @@ -2643,6 +2707,7 @@ def test_scheduler_reschedule(self): bash_command='echo 1', ) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() dag.is_subdag = False diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index a578307ddbebc..fe96d731dcc65 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -19,7 +19,6 @@ """Unit tests for SerializedDagModel.""" import unittest -from unittest import mock from airflow import DAG, example_dags as example_dags_module from airflow.models import DagBag @@ -117,7 +116,6 @@ def test_remove_stale_dags(self): self.assertFalse(SDM.has_dag(stale_dag.dag_id)) self.assertTrue(SDM.has_dag(fresh_dag.dag_id)) - @mock.patch('airflow.models.serialized_dag.STORE_SERIALIZED_DAGS', True) def test_bulk_sync_to_db(self): dags = [ DAG("dag_1"), DAG("dag_2"), DAG("dag_3"),