From da07181097b98a18d248607452e94bea7eea00a6 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Wed, 6 May 2020 18:33:42 +0100 Subject: [PATCH] Test that DagFileProcessor can operate against a Serialized DAG As part of the scheduler HA work we are going to want to separate the parsing from the scheduling, so this changes the tests to ensure that the important methods of DagFileProcessor can do everything they need to when given a SerializedDAG, not just a DAG. i.e. that we have correctly serialized all the necessary fields. --- airflow/models/dagbag.py | 3 +- airflow/models/serialized_dag.py | 5 +- tests/jobs/test_scheduler_job.py | 77 ++++++++++++++++++++++++++--- tests/models/test_serialized_dag.py | 2 - 4 files changed, 74 insertions(+), 13 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 42fd1117483c7..48c0866f1b722 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -456,4 +456,5 @@ def sync_to_db(self): from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel DAG.bulk_sync_to_db(self.dags.values()) - SerializedDagModel.bulk_sync_to_db(self.dags.values()) + if self.store_serialized_dags: + SerializedDagModel.bulk_sync_to_db(self.dags.values()) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 2e7ac687e45d8..bac2c4ce90c3c 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -30,7 +30,7 @@ from airflow.models.dag import DAG, DagModel from airflow.models.dagcode import DagCode from airflow.serialization.serialized_objects import SerializedDAG -from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, STORE_SERIALIZED_DAGS, json +from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime @@ -205,9 +205,6 @@ def bulk_sync_to_db(dags: List[DAG], 
session=None): :type dags: List[airflow.models.dag.DAG] :return: None """ - if not STORE_SERIALIZED_DAGS: - return - for dag in dags: if not dag.is_subdag: SerializedDagModel.write_dag( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 644df90e63def..a8bfd698908d2 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -44,6 +44,7 @@ from airflow.models.taskinstance import SimpleTaskInstance from airflow.operators.bash import BashOperator from airflow.operators.dummy_operator import DummyOperator +from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils import timezone from airflow.utils.dag_processing import FailureCallbackRequest, SimpleDag, SimpleDagBag from airflow.utils.dates import days_ago @@ -122,7 +123,11 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed @classmethod def setUpClass(cls): - cls.dagbag = DagBag() + # Ensure the DAGs we are looking at from the DB are up-to-date + non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) + non_serialized_dagbag.store_serialized_dags = True + non_serialized_dagbag.sync_to_db() + cls.dagbag = DagBag(store_serialized_dags=True) def test_dag_file_processor_sla_miss_callback(self): """ @@ -415,6 +420,7 @@ def test_dag_file_processor_dagrun_once(self): schedule_interval="@once") dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) @@ -491,6 +497,8 @@ def test_dag_file_processor_process_task_instances(self, state, start_date, end_ orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) @@ -537,6 +545,8 @@ def 
test_dag_file_processor_process_task_instances_with_task_concurrency( orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) @@ -587,6 +597,8 @@ def test_dag_file_processor_process_task_instances_depends_on_past(self, state, orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) @@ -621,6 +633,8 @@ def test_dag_file_processor_do_not_schedule_removed_task(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -628,9 +642,11 @@ def test_dag_file_processor_do_not_schedule_removed_task(self): self.assertIsNotNone(dr) dr = DagRun.find(run_id=dr.run_id)[0] + # Re-create the DAG, but remove the task dag = DAG( dag_id='test_scheduler_do_not_schedule_removed_task', start_date=DEFAULT_DATE) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) @@ -651,6 +667,8 @@ def test_dag_file_processor_do_not_schedule_too_early(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -668,6 +686,8 @@ def test_dag_file_processor_do_not_schedule_without_tasks(self): with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) + + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear(session=session) dag.start_date = None @@ -688,6 +708,8 @@ def 
test_dag_file_processor_do_not_run_finished(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -724,6 +746,8 @@ def test_dag_file_processor_add_new_task(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -737,6 +761,7 @@ def test_dag_file_processor_add_new_task(self): task_id='dummy2', dag=dag, owner='airflow') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor._process_task_instances(dag, dag_runs=[dr]) @@ -763,6 +788,7 @@ def test_dag_file_processor_verify_max_active_runs(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -791,6 +817,8 @@ def test_dag_file_processor_fail_dagrun_timeout(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -831,6 +859,8 @@ def test_dag_file_processor_verify_max_active_runs_and_dagrun_timeout(self): session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -868,6 +898,7 @@ def test_dag_file_processor_max_active_runs_respected_after_clear(self): session.merge(orm_dag) session.commit() session.close() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() @@ -892,8 +923,6 @@ def test_dag_file_processor_max_active_runs_respected_after_clear(self): def test_find_dags_to_run_includes_subdags(self): dag = self.dagbag.get_dag('test_subdag_operator') - 
print(self.dagbag.dag_ids) - print(self.dagbag.dag_folder) self.assertGreater(len(dag.subdags), 0) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values()) @@ -931,7 +960,7 @@ def setup_dag(dag_id, schedule_interval, start_date, catchup): session.commit() session.close() - return dag + return SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) now = timezone.utcnow() six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace( @@ -1002,6 +1031,8 @@ def test_dag_file_processor_auto_align(self): session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dr = dag_file_processor.create_dag_run(dag) @@ -1022,6 +1053,7 @@ def test_dag_file_processor_auto_align(self): orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() @@ -1332,7 +1364,11 @@ def setUp(self): @classmethod def setUpClass(cls): - cls.dagbag = DagBag() + # Ensure the DAGs we are looking at from the DB are up-to-date + non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) + non_serialized_dagbag.store_serialized_dags = True + non_serialized_dagbag.sync_to_db() + cls.dagbag = DagBag(store_serialized_dags=True) def test_is_alive(self): job = SchedulerJob(None, heartrate=10, state=State.RUNNING) @@ -1454,6 +1490,7 @@ def test_execute_task_instances_is_paused_wont_execute(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1485,6 +1522,8 @@ def test_execute_task_instances_no_dagrun_task_will_execute(self): dag = DAG(dag_id=dag_id, 
start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1511,6 +1550,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1537,6 +1577,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1577,6 +1618,7 @@ def test_find_executable_task_instances_pool(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1, pool='a') task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1620,6 +1662,7 @@ def test_find_executable_task_instances_in_default_pool(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) op1 = DummyOperator(dag=dag, task_id='dummy1') op2 = DummyOperator(dag=dag, task_id='dummy2') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) executor = MockExecutor(do_update=True) @@ -1661,6 +1704,7 @@ def test_nonexistent_pool(self): task_id = 'dummy_wrong_pool' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task = DummyOperator(dag=dag, 
task_id=task_id, pool="this_pool_doesnt_exist") + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1685,6 +1729,7 @@ def test_find_executable_task_instances_none(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1703,6 +1748,7 @@ def test_find_executable_task_instances_concurrency(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1749,6 +1795,7 @@ def test_find_executable_task_instances_concurrency_queued(self): task1 = DummyOperator(dag=dag, task_id='dummy1') task2 = DummyOperator(dag=dag, task_id='dummy2') task3 = DummyOperator(dag=dag, task_id='dummy3') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1783,6 +1830,7 @@ def test_find_executable_task_instances_task_concurrency(self): # pylint: disab dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1, task_concurrency=2) task2 = DummyOperator(dag=dag, task_id=task_id_2) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) executor = MockExecutor(do_update=True) @@ -1877,6 +1925,7 @@ def test_change_state_for_executable_task_instances_no_tis_with_state(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, 
concurrency=2) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) self._make_simple_dag_bag([dag]) scheduler = SchedulerJob() @@ -1909,6 +1958,7 @@ def test_enqueue_task_instances_with_queued_state(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1931,6 +1981,7 @@ def test_execute_task_instances_nothing(self): task_id_1 = 'dummy' dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) task1 = DummyOperator(dag=dag, task_id=task_id_1) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = SimpleDagBag([]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -1956,6 +2007,7 @@ def test_execute_task_instances(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3) task1 = DummyOperator(dag=dag, task_id=task_id_1) task2 = DummyOperator(dag=dag, task_id=task_id_2) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -2026,6 +2078,7 @@ def test_execute_task_instances_limit(self): dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) task1 = DummyOperator(dag=dag, task_id=task_id_1) task2 = DummyOperator(dag=dag, task_id=task_id_2) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dagbag = self._make_simple_dag_bag([dag]) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) @@ -2062,15 +2115,20 @@ def test_change_state_for_tis_without_dagrun(self): DummyOperator(task_id='dummy', dag=dag1, owner='airflow') DummyOperator(task_id='dummy_b', dag=dag1, owner='airflow') + dag1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag1)) dag2 = 
DAG(dag_id='test_change_state_for_tis_without_dagrun_dont_change', start_date=DEFAULT_DATE) DummyOperator(task_id='dummy', dag=dag2, owner='airflow') + dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2)) + dag3 = DAG(dag_id='test_change_state_for_tis_without_dagrun_no_dagrun', start_date=DEFAULT_DATE) DummyOperator(task_id='dummy', dag=dag3, owner='airflow') + dag3 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag3)) + session = settings.Session() dr1 = dag1.create_dagrun(run_id=DagRunType.SCHEDULED.value, state=State.RUNNING, @@ -2154,6 +2212,7 @@ def test_change_state_for_tasks_failed_to_execute(self): task_id='task_id', dag=dag, owner='airflow') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) # If there's no left over task in executor.queued_tasks, nothing happens session = settings.Session() @@ -2202,6 +2261,7 @@ def test_reset_state_for_orphaned_tasks(self): with dag: op1 = DummyOperator(task_id='op1') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() dr = dag.create_dagrun(run_id=f"{DagRunType.SCHEDULED.value}__", @@ -2250,6 +2310,7 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(self, with dag: op1 = DummyOperator(task_id='op1') + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) # Create DAG run with FAILED state dag.clear() @@ -2311,6 +2372,7 @@ def evaluate_dagrun( self.null_exec.mock_task_fail(dag_id, tid, ex_date) try: + # This needs a _REAL_ dag, not the serialized version dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs) except AirflowException: pass @@ -2452,7 +2514,7 @@ def test_scheduler_start_date(self): dag_id = 'test_start_date_scheduling' dag = self.dagbag.get_dag(dag_id) dag.clear() - self.assertTrue(dag.start_date > datetime.datetime.utcnow()) + self.assertGreater(dag.start_date, datetime.datetime.utcnow()) scheduler = SchedulerJob(dag_id, executor=self.null_exec, @@ -2592,6 +2654,8 @@ def test_scheduler_verify_pool_full(self): 
session.merge(orm_dag) session.commit() + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) scheduler = SchedulerJob(executor=self.null_exec) @@ -2643,6 +2707,7 @@ def test_scheduler_reschedule(self): bash_command='echo 1', ) + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) dag.clear() dag.is_subdag = False diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index a578307ddbebc..fe96d731dcc65 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -19,7 +19,6 @@ """Unit tests for SerializedDagModel.""" import unittest -from unittest import mock from airflow import DAG, example_dags as example_dags_module from airflow.models import DagBag @@ -117,7 +116,6 @@ def test_remove_stale_dags(self): self.assertFalse(SDM.has_dag(stale_dag.dag_id)) self.assertTrue(SDM.has_dag(fresh_dag.dag_id)) - @mock.patch('airflow.models.serialized_dag.STORE_SERIALIZED_DAGS', True) def test_bulk_sync_to_db(self): dags = [ DAG("dag_1"), DAG("dag_2"), DAG("dag_3"),