From fa26c7e2a8416f12b96fae7707619f0a46182a0d Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Tue, 20 Dec 2022 13:48:03 -0500 Subject: [PATCH 1/7] Migrate DagFileProcessor.manage_slas to Internal API --- .../endpoints/rpc_api_endpoint.py | 1 + airflow/dag_processing/processor.py | 35 ++++-- tests/dag_processing/test_processor.py | 118 +++++++++++------- 3 files changed, 95 insertions(+), 59 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index bdf124345b7f9..b07a7dbfeaa36 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -36,6 +36,7 @@ def _initialize_map() -> dict[str, Callable]: functions: list[Callable] = [ DagFileProcessor.update_import_errors, + DagFileProcessor.manage_slas, ] return {f"{func.__module__}.{func.__name__}": func for func in functions} diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index d577c93ee791f..a3956f8a6322c 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -55,6 +55,8 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import State +log = logging.getLogger(__name__) + if TYPE_CHECKING: from airflow.models.operator import Operator @@ -365,8 +367,10 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L self._dag_directory = dag_directory self.dag_warnings: set[tuple[str, str]] = set() + @staticmethod + @internal_api_call @provide_session - def manage_slas(self, dag: DAG, session: Session = None) -> None: + def manage_slas(dag_folder, dag_id: str, session: Session = None) -> None: """ Finding all tasks that have SLAs defined, and sending alert emails when needed. @@ -375,9 +379,11 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: We are assuming that the scheduler runs often, so we only check for tasks that should have succeeded in the past hour. 
""" - self.log.info("Running SLA Checks for %s", dag.dag_id) + dagbag = DagFileProcessor._get_dagbag(dag_folder, log) + dag = dagbag.get_dag(dag_id) + log.info("Running SLA Checks for %s", dag.dag_id) if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks): - self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) + log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) return qry = ( @@ -479,13 +485,13 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: else [dag.sla_miss_callback] ) for callback in callbacks: - self.log.info("Calling SLA miss callback %s", callback) + log.info("Calling SLA miss callback %s", callback) try: callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True except Exception: Stats.incr("sla_callback_notification_failure") - self.log.exception( + log.exception( "Could not call sla_miss_callback(%s) for DAG %s", callback.func_name, # type: ignore[attr-defined] dag.dag_id, @@ -504,7 +510,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: task = dag.get_task(sla.task_id) except TaskNotFound: # task already deleted from DAG, skip it - self.log.warning( + log.warning( "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id ) continue @@ -524,7 +530,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: notification_sent = True except Exception: Stats.incr("sla_email_notification_failure") - self.log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id) + log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id) # If we sent any notification, update the sla_miss table if notification_sent: for sla in slas: @@ -644,7 +650,7 @@ def execute_callbacks( if isinstance(request, TaskCallbackRequest): self._execute_task_callbacks(dagbag, request, session=session) elif isinstance(request, SlaCallbackRequest): - self.manage_slas(dagbag.get_dag(request.dag_id), session=session) + DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session) elif isinstance(request, DagCallbackRequest): self._execute_dag_callbacks(dagbag, request, session) except Exception: @@ -728,6 +734,15 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe self.log.info("Executed failure callback for %s in state %s", ti, ti.state) session.flush() + @staticmethod + def _get_dagbag(file_path: str, logger: logging.Logger): + try: + return DagBag(file_path, include_examples=False) + except Exception: + logger.exception("Failed at reloading the DAG file %s", file_path) + Stats.incr("dag_file_refresh_error", 1, 1) + raise + @provide_session def process_file( self, @@ -758,10 +773,8 @@ def process_file( self.log.info("Processing file %s for tasks to queue", file_path) try: - dagbag = DagBag(file_path, include_examples=False) + dagbag = DagFileProcessor._get_dagbag(file_path, self.log) except Exception: - self.log.exception("Failed at reloading the DAG file %s", file_path) - Stats.incr("dag_file_refresh_error", 1, 1) return 0, 0 if len(dagbag.dags) > 0: diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 05e1a6ab42105..aa2a4f71459a2 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -105,16 +105,15 @@ def _process_file(self, file_path, dag_directory, session): dag_file_processor.process_file(file_path, [], False, session) - def 
test_dag_file_processor_sla_miss_callback(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, create_dummy_dag, get_test_dag): """ Test that the dag file processor calls the sla miss callback """ session = settings.Session() - sla_callback = MagicMock() - # Create dag with a start of 1 day ago, but an sla of 0 - # so we'll already have an sla_miss on the books. + # Create dag with a start of 1 day ago, but a sla of 0, so we'll already have a sla_miss on the books. test_start_date = timezone.utcnow() - datetime.timedelta(days=1) dag, task = create_dummy_dag( dag_id="test_sla_miss", @@ -124,17 +123,18 @@ def test_dag_file_processor_sla_miss_callback(self, create_dummy_dag): ) session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) - session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert sla_callback.called - def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback_invalid_sla(self, mock_get_dagbag, create_dummy_dag): """ Test that the dag file processor does not call the sla miss callback when given an invalid sla @@ -155,16 +155,17 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, create_dummy_dag ) session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) - session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_callback.assert_not_called() - def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback_sent_notification(self, mock_get_dagbag, create_dummy_dag): """ Test that the dag file processor does not call the sla_miss_callback when a notification has already been sent @@ -198,16 +199,20 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, create_dum ) ) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + # Now call manage_slas and see if the sla_miss callback gets called - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_callback.assert_not_called() @mock.patch("airflow.dag_processing.processor.Stats.incr") - def 
test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_stats_incr, dag_maker): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( + self, mock_get_dagbag, mock_stats_incr, dag_maker + ): """ Test that the dag file processor does not try to insert already existing item into the database """ @@ -229,10 +234,11 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_st session.merge(ti) session.flush() - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -247,11 +253,12 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_st # because of existing SlaMiss above. # Since this is run often, it's possible that it runs before another # ti is successful thereby trying to insert a duplicate record. - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) @mock.patch("airflow.dag_processing.processor.Stats.incr") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla( - self, mock_stats_incr, dag_maker + self, mock_get_dagbag, mock_stats_incr, dag_maker ): """ Test that the dag file processor continue checking subsequent task instances @@ -277,10 +284,11 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ ) session.flush() - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -293,7 +301,11 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ mock_stats_incr.assert_called_with("sla_missed") @mock.patch("airflow.dag_processing.processor.Stats.incr") - def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.log") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_callback_exception( + self, mock_get_dagbag, mock_log, mock_stats_incr, create_dummy_dag + ): """ Test that the dag file processor gracefully logs an exception if there is a problem calling the sla_miss_callback @@ -321,9 +333,12 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c ) # Now call manage_slas and see if the sla_miss callback gets called - mock_log = mock.MagicMock() - dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock_log) - dag_file_processor.manage_slas(dag=dag, session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + 
mock_log.reset_mock() + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert sla_callback.called mock_log.exception.assert_called_once_with( "Could not call sla_miss_callback(%s) for DAG %s", @@ -333,8 +348,9 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c mock_stats_incr.assert_called_once_with("sla_callback_notification_failure") @mock.patch("airflow.dag_processing.processor.send_email") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( - self, mock_send_email, create_dummy_dag + self, mock_get_dagbag, mock_send_email, create_dummy_dag ): session = settings.Session() @@ -354,11 +370,11 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( session.merge(SlaMiss(task_id="sla_missed", dag_id="test_sla_miss", execution_date=test_start_date)) - dag_file_processor = DagFileProcessor( - dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock() - ) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert len(mock_send_email.call_args_list) == 1 @@ -368,8 +384,10 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.utils.email.send_email") + @mock.patch("airflow.dag_processing.processor.log") + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_email_exception( - self, mock_send_email, mock_stats_incr, create_dummy_dag + self, mock_get_dagbag, mock_log, mock_send_email, mock_stats_incr, create_dummy_dag ): """ Test that the dag file processor gracefully logs an exception if there is a problem @@ -394,16 +412,18 @@ def test_dag_file_processor_sla_miss_email_exception( # Create an SlaMiss where notification was sent, but email was not session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - mock_log = mock.MagicMock() - dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock_log) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag - dag_file_processor.manage_slas(dag=dag, session=session) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) mock_log.exception.assert_called_once_with( "Could not send SLA Miss email notification for DAG %s", "test_sla_miss" ) mock_stats_incr.assert_called_once_with("sla_email_notification_failure") - def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag): + @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") + def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_dummy_dag): """ Test that the dag file processor will not crash when trying to send sla miss notification for a deleted task @@ -425,9 +445,11 @@ def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag): SlaMiss(task_id="dummy_deleted", dag_id="test_sla_miss", execution_date=test_start_date) ) - mock_log = mock.MagicMock() - dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock_log) - dag_file_processor.manage_slas(dag=dag, 
session=session) + mock_dagbag = mock.Mock() + mock_dagbag.get_dag.return_value = dag + mock_get_dagbag.return_value = mock_dagbag + + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) @patch.object(TaskInstance, "handle_failure") def test_execute_on_failure_callbacks(self, mock_ti_handle_failure): From a2fbf5b79fc17b9303a52de0066f1eba10ff6b25 Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Thu, 22 Dec 2022 14:43:40 -0500 Subject: [PATCH 2/7] Pass logger as parameter --- airflow/dag_processing/processor.py | 8 ++-- tests/dag_processing/test_processor.py | 56 +++++++++++++++++++------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index a3956f8a6322c..ac63c5c7d0b19 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -55,8 +55,6 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import State -log = logging.getLogger(__name__) - if TYPE_CHECKING: from airflow.models.operator import Operator @@ -370,7 +368,7 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L @staticmethod @internal_api_call @provide_session - def manage_slas(dag_folder, dag_id: str, session: Session = None) -> None: + def manage_slas(dag_folder, dag_id: str, log: logging.Logger, session: Session = NEW_SESSION) -> None: """ Finding all tasks that have SLAs defined, and sending alert emails when needed. @@ -650,7 +648,9 @@ def execute_callbacks( if isinstance(request, TaskCallbackRequest): self._execute_task_callbacks(dagbag, request, session=session) elif isinstance(request, SlaCallbackRequest): - DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session) + DagFileProcessor.manage_slas( + dagbag.dag_folder, request.dag_id, log=self.log, session=session + ) elif isinstance(request, DagCallbackRequest): self._execute_dag_callbacks(dagbag, request, session) except Exception: diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index aa2a4f71459a2..4a0351fd5b7af 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -125,11 +125,14 @@ def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, create_dumm session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) assert sla_callback.called @@ -157,11 +160,14 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, mock_get_dagbag, session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) 
sla_callback.assert_not_called() @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") @@ -199,12 +205,15 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, mock_get_d ) ) + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag # Now call manage_slas and see if the sla_miss callback gets called - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) sla_callback.assert_not_called() @@ -234,11 +243,14 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( session.merge(ti) session.flush() + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -253,7 +265,9 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( # because of existing SlaMiss above. # Since this is run often, it's possible that it runs before another # ti is successful thereby trying to insert a duplicate record. - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") @@ -284,11 +298,14 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ ) session.flush() + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -301,10 +318,9 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ mock_stats_incr.assert_called_with("sla_missed") @mock.patch("airflow.dag_processing.processor.Stats.incr") - @mock.patch("airflow.dag_processing.processor.log") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_callback_exception( - self, mock_get_dagbag, mock_log, mock_stats_incr, create_dummy_dag + self, mock_get_dagbag, mock_stats_incr, create_dummy_dag ): """ Test that the dag file processor gracefully logs an exception if there is a problem @@ -333,12 +349,14 @@ def test_dag_file_processor_sla_miss_callback_exception( ) # Now call manage_slas and see if the sla_miss callback gets called + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - mock_log.reset_mock() - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) assert sla_callback.called mock_log.exception.assert_called_once_with( "Could not call sla_miss_callback(%s) for 
DAG %s", @@ -370,11 +388,14 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( session.merge(SlaMiss(task_id="sla_missed", dag_id="test_sla_miss", execution_date=test_start_date)) + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) assert len(mock_send_email.call_args_list) == 1 @@ -384,10 +405,9 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.utils.email.send_email") - @mock.patch("airflow.dag_processing.processor.log") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_email_exception( - self, mock_get_dagbag, mock_log, mock_send_email, mock_stats_incr, create_dummy_dag + self, mock_get_dagbag, mock_send_email, mock_stats_incr, create_dummy_dag ): """ Test that the dag file processor gracefully logs an exception if there is a problem @@ -412,11 +432,14 @@ def test_dag_file_processor_sla_miss_email_exception( # Create an SlaMiss where notification was sent, but email was not session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) mock_log.exception.assert_called_once_with( "Could not send SLA Miss email notification for DAG %s", "test_sla_miss" ) @@ -445,11 +468,14 @@ def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_ SlaMiss(task_id="dummy_deleted", dag_id="test_sla_miss", execution_date=test_start_date) ) + mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) + DagFileProcessor.manage_slas( + dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session + ) @patch.object(TaskInstance, "handle_failure") def test_execute_on_failure_callbacks(self, mock_ti_handle_failure): From eb2a9bf079b1c05546bc53b43c5569a8dfe65f92 Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Mon, 16 Jan 2023 11:46:45 -0500 Subject: [PATCH 3/7] Use classmethod instead of staticmethod --- airflow/dag_processing/processor.py | 32 ++++++++++++++--------------- airflow/utils/log/logging_mixin.py | 7 +++++++ 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index ac63c5c7d0b19..5a89f66eab60c 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -365,10 +365,10 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L self._dag_directory = dag_directory self.dag_warnings: set[tuple[str, str]] = set() - @staticmethod + @classmethod @internal_api_call @provide_session - def manage_slas(dag_folder, dag_id: str, log: logging.Logger, session: Session = NEW_SESSION) -> None: + def manage_slas(cls, 
dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None: """ Finding all tasks that have SLAs defined, and sending alert emails when needed. @@ -377,11 +377,11 @@ def manage_slas(dag_folder, dag_id: str, log: logging.Logger, session: Session = We are assuming that the scheduler runs often, so we only check for tasks that should have succeeded in the past hour. """ - dagbag = DagFileProcessor._get_dagbag(dag_folder, log) + dagbag = DagFileProcessor._get_dagbag(dag_folder) dag = dagbag.get_dag(dag_id) - log.info("Running SLA Checks for %s", dag.dag_id) + cls.get_log().info("Running SLA Checks for %s", dag.dag_id) if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks): - log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) + cls.get_log().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) return qry = ( @@ -483,13 +483,13 @@ def manage_slas(dag_folder, dag_id: str, log: logging.Logger, session: Session = else [dag.sla_miss_callback] ) for callback in callbacks: - log.info("Calling SLA miss callback %s", callback) + cls.get_log().info("Calling SLA miss callback %s", callback) try: callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True except Exception: Stats.incr("sla_callback_notification_failure") - log.exception( + cls.get_log().exception( "Could not call sla_miss_callback(%s) for DAG %s", callback.func_name, # type: ignore[attr-defined] dag.dag_id, @@ -508,7 +508,7 @@ def manage_slas(dag_folder, dag_id: str, log: logging.Logger, session: Session = task = dag.get_task(sla.task_id) except TaskNotFound: # task already deleted from DAG, skip it - log.warning( + cls.get_log().warning( "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id ) continue @@ -528,7 +528,9 @@ def manage_slas(dag_folder, dag_id: str, log: logging.Logger, session: Session = notification_sent = True except Exception: Stats.incr("sla_email_notification_failure") - log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id) + cls.get_log().exception( + "Could not send SLA Miss email notification for DAG %s", dag.dag_id + ) # If we sent any notification, update the sla_miss table if notification_sent: for sla in slas: @@ -648,9 +650,7 @@ def execute_callbacks( if isinstance(request, TaskCallbackRequest): self._execute_task_callbacks(dagbag, request, session=session) elif isinstance(request, SlaCallbackRequest): - DagFileProcessor.manage_slas( - dagbag.dag_folder, request.dag_id, log=self.log, session=session - ) + DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session) elif isinstance(request, DagCallbackRequest): self._execute_dag_callbacks(dagbag, request, session) except Exception: @@ -734,12 +734,12 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe self.log.info("Executed failure callback for %s in state %s", ti, ti.state) session.flush() - @staticmethod - def _get_dagbag(file_path: str, logger: logging.Logger): + @classmethod + def _get_dagbag(cls, file_path: str): try: return DagBag(file_path, include_examples=False) except Exception: - logger.exception("Failed at reloading the DAG file %s", file_path) + cls.get_log().exception("Failed at reloading the DAG file %s", file_path) Stats.incr("dag_file_refresh_error", 1, 1) raise @@ -773,7 +773,7 @@ def process_file( self.log.info("Processing file %s for tasks to queue", file_path) try: - dagbag = DagFileProcessor._get_dagbag(file_path, self.log) + dagbag = 
DagFileProcessor._get_dagbag(file_path) except Exception: return 0, 0 diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 85ff71a94f1bf..2b9b4345f2040 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -67,6 +67,13 @@ class LoggingMixin: def __init__(self, context=None): self._set_context(context) + @classmethod + def get_log(cls): + """Returns a logger.""" + if cls._log is None: + cls._log = logging.getLogger(cls.__module__ + "." + cls.__name__) + return cls._log + @property def log(self) -> Logger: """Returns a logger.""" From 63c1f8c9aa649f3fcd21f33efccde18dc5bb2235 Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Fri, 20 Jan 2023 13:50:09 -0500 Subject: [PATCH 4/7] Refactore code --- airflow/utils/log/logging_mixin.py | 21 ++++++---- tests/dag_processing/test_processor.py | 55 ++++++++------------------ 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 2b9b4345f2040..911a30c8e1c9b 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -24,7 +24,7 @@ import sys from io import IOBase from logging import Handler, Logger, StreamHandler -from typing import IO, cast +from typing import IO, Any, TypeVar, cast from airflow.settings import IS_K8S_EXECUTOR_POD @@ -59,6 +59,9 @@ def remove_escape_codes(text: str) -> str: return ANSI_ESCAPE.sub("", text) +_T = TypeVar("_T") + + class LoggingMixin: """Convenience super-class to have a logger configured with the class name""" @@ -67,19 +70,21 @@ class LoggingMixin: def __init__(self, context=None): self._set_context(context) + @staticmethod + def _get_log(obj: Any, clazz: type[_T]) -> Logger: + if obj._log is None: + obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}") + return obj._log + @classmethod - def get_log(cls): + def get_log(cls) -> Logger: """Returns a logger.""" - if cls._log is None: - cls._log = logging.getLogger(cls.__module__ + "." + cls.__name__) - return cls._log + return LoggingMixin._get_log(cls, cls) @property def log(self) -> Logger: """Returns a logger.""" - if self._log is None: - self._log = logging.getLogger(self.__class__.__module__ + "." 
+ self.__class__.__name__) - return self._log + return LoggingMixin._get_log(self, self.__class__) def _set_context(self, context): if context is not None: diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 4a0351fd5b7af..0421ea2665de0 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -125,14 +125,11 @@ def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, create_dumm session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert sla_callback.called @@ -160,14 +157,11 @@ def test_dag_file_processor_sla_miss_callback_invalid_sla(self, mock_get_dagbag, session.merge(TaskInstance(task=task, execution_date=test_start_date, state="success")) session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_callback.assert_not_called() @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") @@ -205,15 +199,12 @@ def test_dag_file_processor_sla_miss_callback_sent_notification(self, mock_get_d ) ) - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag # Now call manage_slas and see if the sla_miss callback gets called - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_callback.assert_not_called() @@ -243,14 +234,11 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( session.merge(ti) session.flush() - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -265,9 +253,7 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( # because of existing SlaMiss above. # Since this is run often, it's possible that it runs before another # ti is successful thereby trying to insert a duplicate record. 
- DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") @@ -298,14 +284,11 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ ) session.flush() - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) sla_miss_count = ( session.query(SlaMiss) .filter( @@ -317,10 +300,11 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ assert sla_miss_count == 2 mock_stats_incr.assert_called_with("sla_missed") + @patch.object(DagFileProcessor, "get_log") @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_callback_exception( - self, mock_get_dagbag, mock_stats_incr, create_dummy_dag + self, mock_get_dagbag, mock_stats_incr, mock_get_log, create_dummy_dag ): """ Test that the dag file processor gracefully logs an exception if there is a problem @@ -350,13 +334,12 @@ def test_dag_file_processor_sla_miss_callback_exception( # Now call manage_slas and see if the sla_miss callback gets called mock_log = mock.Mock() + mock_get_log.return_value = mock_log mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert sla_callback.called mock_log.exception.assert_called_once_with( "Could not call sla_miss_callback(%s) for DAG %s", @@ -388,14 +371,11 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( session.merge(SlaMiss(task_id="sla_missed", dag_id="test_sla_miss", execution_date=test_start_date)) - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) assert len(mock_send_email.call_args_list) == 1 @@ -403,11 +383,12 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( assert email1 in send_email_to assert email2 not in send_email_to + @patch.object(DagFileProcessor, "get_log") @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.utils.email.send_email") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_email_exception( - self, mock_get_dagbag, mock_send_email, mock_stats_incr, create_dummy_dag + self, mock_get_dagbag, mock_send_email, mock_stats_incr, mock_get_log, create_dummy_dag ): """ Test that the dag file processor gracefully logs an exception if there is a problem @@ -433,13 +414,12 @@ def test_dag_file_processor_sla_miss_email_exception( session.merge(SlaMiss(task_id="dummy", 
dag_id="test_sla_miss", execution_date=test_start_date)) mock_log = mock.Mock() + mock_get_log.return_value = mock_log mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) mock_log.exception.assert_called_once_with( "Could not send SLA Miss email notification for DAG %s", "test_sla_miss" ) @@ -468,14 +448,11 @@ def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_ SlaMiss(task_id="dummy_deleted", dag_id="test_sla_miss", execution_date=test_start_date) ) - mock_log = mock.Mock() mock_dagbag = mock.Mock() mock_dagbag.get_dag.return_value = dag mock_get_dagbag.return_value = mock_dagbag - DagFileProcessor.manage_slas( - dag_folder=dag.fileloc, dag_id="test_sla_miss", log=mock_log, session=session - ) + DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) @patch.object(TaskInstance, "handle_failure") def test_execute_on_failure_callbacks(self, mock_ti_handle_failure): From 451088556f322a0269d511fbcef916c4484dbaa2 Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Fri, 20 Jan 2023 15:21:29 -0500 Subject: [PATCH 5/7] Rename get_log to logger --- airflow/dag_processing/processor.py | 14 +++++++------- airflow/utils/log/logging_mixin.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 5a89f66eab60c..108d71a86fd8a 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -379,9 +379,9 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> """ dagbag = DagFileProcessor._get_dagbag(dag_folder) dag = dagbag.get_dag(dag_id) - cls.get_log().info("Running SLA Checks for %s", dag.dag_id) + cls.logger().info("Running SLA Checks for %s", dag.dag_id) if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks): - cls.get_log().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) + cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) return qry = ( @@ -483,13 +483,13 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> else [dag.sla_miss_callback] ) for callback in callbacks: - cls.get_log().info("Calling SLA miss callback %s", callback) + cls.logger().info("Calling SLA miss callback %s", callback) try: callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True except Exception: Stats.incr("sla_callback_notification_failure") - cls.get_log().exception( + cls.logger().exception( "Could not call sla_miss_callback(%s) for DAG %s", callback.func_name, # type: ignore[attr-defined] dag.dag_id, @@ -508,7 +508,7 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> task = dag.get_task(sla.task_id) except TaskNotFound: # task already deleted from DAG, skip it - cls.get_log().warning( + cls.logger().warning( "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id ) continue @@ -528,7 +528,7 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> notification_sent = True except Exception: Stats.incr("sla_email_notification_failure") - cls.get_log().exception( + cls.logger().exception( "Could not send SLA Miss email notification for 
DAG %s", dag.dag_id ) # If we sent any notification, update the sla_miss table @@ -739,7 +739,7 @@ def _get_dagbag(cls, file_path: str): try: return DagBag(file_path, include_examples=False) except Exception: - cls.get_log().exception("Failed at reloading the DAG file %s", file_path) + cls.logger().exception("Failed at reloading the DAG file %s", file_path) Stats.incr("dag_file_refresh_error", 1, 1) raise diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 911a30c8e1c9b..79746c8492396 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -77,7 +77,7 @@ def _get_log(obj: Any, clazz: type[_T]) -> Logger: return obj._log @classmethod - def get_log(cls) -> Logger: + def logger(cls) -> Logger: """Returns a logger.""" return LoggingMixin._get_log(cls, cls) From 7f25d1f023b8ca0b47bf8f8a274049791a47172f Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Mon, 23 Jan 2023 11:21:28 -0500 Subject: [PATCH 6/7] Fix static checks --- airflow/dag_processing/processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index af279e6d27093..a50ca933dc8e0 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -536,7 +536,9 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> notification_sent = True except Exception: Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id}) - cls.logger().exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id) + cls.logger().exception( + "Could not send SLA Miss email notification for DAG %s", dag.dag_id + ) # If we sent any notification, update the sla_miss table if notification_sent: for sla in slas: From d130514acb65c1db7416cc9d4db8b0e430118ac0 Mon Sep 17 00:00:00 2001 From: Vincent Beck Date: Mon, 23 Jan 2023 13:26:18 -0500 Subject: [PATCH 7/7] Fix unit tests --- tests/dag_processing/test_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 29d3667c5a1aa..12528ae462c93 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -304,7 +304,7 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"} ) - @patch.object(DagFileProcessor, "get_log") + @patch.object(DagFileProcessor, "logger") @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") def test_dag_file_processor_sla_miss_callback_exception( @@ -392,7 +392,7 @@ def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( assert email1 in send_email_to assert email2 not in send_email_to - @patch.object(DagFileProcessor, "get_log") + @patch.object(DagFileProcessor, "logger") @mock.patch("airflow.dag_processing.processor.Stats.incr") @mock.patch("airflow.utils.email.send_email") @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")