From a62b1f5a59c44f708052fb5a079cccbbc640c04b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= <6774676+eumiro@users.noreply.github.com>
Date: Fri, 25 Aug 2023 21:07:45 +0200
Subject: [PATCH] Refactor: tmp_path in tests/dag_processing

---
 tests/dag_processing/test_job_runner.py | 106 ++++++++++++------------
 1 file changed, 51 insertions(+), 55 deletions(-)

diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index e300eb9866157..8189c9a219786 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -29,7 +29,6 @@
 import time
 from datetime import datetime, timedelta
 from logging.config import dictConfig
-from tempfile import TemporaryDirectory
 from textwrap import dedent
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
@@ -145,12 +144,11 @@ def run_processor_manager_one_loop(self, manager, parent_pipe):
         raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")
 
     @conf_vars({("core", "load_examples"): "False"})
-    def test_remove_file_clears_import_error(self, tmpdir):
-        filename_to_parse = tmpdir / "temp_dag.py"
+    def test_remove_file_clears_import_error(self, tmp_path):
+        path_to_parse = tmp_path / "temp_dag.py"
 
         # Generate original import error
-        with open(filename_to_parse, "w") as file_to_parse:
-            file_to_parse.writelines("an invalid airflow DAG")
+        path_to_parse.write_text("an invalid airflow DAG")
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
@@ -158,7 +156,7 @@ def test_remove_file_clears_import_error(self, tmpdir):
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=path_to_parse.parent,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -174,7 +172,7 @@
             import_errors = session.query(errors.ImportError).all()
             assert len(import_errors) == 1
 
-        filename_to_parse.remove()
+        path_to_parse.unlink()
 
         # Rerun the scheduler once the dag file has been removed
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -187,25 +185,24 @@
         parent_pipe.close()
 
     @conf_vars({("core", "load_examples"): "False"})
-    def test_max_runs_when_no_files(self):
+    def test_max_runs_when_no_files(self, tmp_path):
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
-        with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
-            async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=dags_folder,
-                    max_runs=1,
-                    processor_timeout=timedelta(days=365),
-                    signal_conn=child_pipe,
-                    dag_ids=[],
-                    pickle_dags=False,
-                    async_mode=async_mode,
-                ),
-            )
+        async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
+        manager = DagProcessorJobRunner(
+            job=Job(),
+            processor=DagFileProcessorManager(
+                dag_directory=os.fspath(tmp_path),
+                max_runs=1,
+                processor_timeout=timedelta(days=365),
+                signal_conn=child_pipe,
+                dag_ids=[],
+                pickle_dags=False,
+                async_mode=async_mode,
+            ),
+        )
 
-            self.run_processor_manager_one_loop(manager, parent_pipe)
+        self.run_processor_manager_one_loop(manager, parent_pipe)
         child_pipe.close()
         parent_pipe.close()
 
@@ -893,16 +890,15 @@ def fake_processor_(*args, **kwargs):
 
     @conf_vars({("core", "load_examples"): "False"})
     @mock.patch("airflow.dag_processing.manager.Stats.timing")
-    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
-        filename_to_parse = tmpdir / "temp_dag.py"
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path):
+        path_to_parse = tmp_path / "temp_dag.py"
         dag_code = dedent(
             """
         from airflow import DAG
         dag = DAG(dag_id='temp_dag', schedule='0 0 * * *')
         """
         )
-        with open(filename_to_parse, "w") as file_to_parse:
-            file_to_parse.writelines(dag_code)
+        path_to_parse.write_text(dag_code)
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
@@ -910,7 +906,7 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=path_to_parse.parent,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -938,7 +934,7 @@
             any_order=True,
         )
 
-    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
+    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir method"""
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -952,7 +948,7 @@
                 async_mode=True,
             ),
         )
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
         dagbag.process_file(zipped_dag_path)
         dag = dagbag.get_dag("test_zip_dag")
@@ -967,7 +963,7 @@
         # assert dag still active
         assert dag.get_is_active()
 
-    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
+    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir method"""
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -981,7 +977,7 @@
                 async_mode=True,
             ),
         )
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
         dagbag.process_file(zipped_dag_path)
         dag = dagbag.get_dag("test_zip_dag")
@@ -1000,10 +996,10 @@
         # assert dag deactivated
         assert not dag.get_is_active()
 
-    def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmpdir):
+    def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""
 
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         dag_path = os.path.join(TEST_DAGS_FOLDER, "test_miscellaneous.py")
         dagbag.process_file(dag_path)
         dag = dagbag.get_dag("miscellaneous_test_dag")
@@ -1040,7 +1036,7 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t
             ("scheduler", "standalone_dag_processor"): "True",
         }
     )
-    def test_fetch_callbacks_from_database(self, tmpdir):
+    def test_fetch_callbacks_from_database(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
@@ -1048,20 +1044,20 @@
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="123",
         )
         callback2 = DagCallbackRequest(
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="456",
         )
         callback3 = SlaCallbackRequest(
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
         )
 
         with create_session() as session:
@@ -1073,7 +1069,7 @@
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(tmpdir),
+                dag_directory=os.fspath(tmp_path),
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1093,7 +1089,7 @@
             ("scheduler", "standalone_dag_processor"): "True",
         }
     )
-    def test_fetch_callbacks_for_current_dag_directory_only(self, tmpdir):
+    def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
@@ -1101,7 +1097,7 @@
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="123",
         )
         callback2 = DagCallbackRequest(
@@ -1120,7 +1116,7 @@
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=tmp_path,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1141,7 +1137,7 @@
             ("core", "load_examples"): "False",
         }
     )
-    def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
+    def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
@@ -1152,7 +1148,7 @@
                 full_filepath=str(dag_filepath),
                 is_failure_callback=True,
                 run_id=str(i),
-                processor_subdir=str(tmpdir),
+                processor_subdir=os.fspath(tmp_path),
             )
             session.add(DbCallbackRequest(callback=callback, priority_weight=i))
 
@@ -1160,7 +1156,7 @@
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(tmpdir),
+                dag_directory=str(tmp_path),
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1184,7 +1180,7 @@
             ("core", "load_examples"): "False",
         }
     )
-    def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
+    def test_fetch_callbacks_from_database_not_standalone(self, tmp_path):
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
         with create_session() as session:
@@ -1192,7 +1188,7 @@
                 dag_id="test_start_date_scheduling",
                 full_filepath=str(dag_filepath),
                 is_failure_callback=True,
-                processor_subdir=str(tmpdir),
+                processor_subdir=str(tmp_path),
                 run_id="123",
             )
             session.add(DbCallbackRequest(callback=callback, priority_weight=10))
@@ -1201,7 +1197,7 @@
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=tmp_path,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1219,7 +1215,7 @@
         with create_session() as session:
             assert session.query(DbCallbackRequest).count() == 1
 
-    def test_callback_queue(self, tmpdir):
+    def test_callback_queue(self, tmp_path):
         # given
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -1239,7 +1235,7 @@
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag1_req2 = DagCallbackRequest(
@@ -1247,18 +1243,18 @@
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag1_sla1 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
         dag1_sla2 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
 
         dag2_req1 = DagCallbackRequest(
@@ -1266,14 +1262,14 @@
             full_filepath="/green_eggs/ham/file2.py",
             dag_id="dag2",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag3_sla1 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file3.py",
             dag_id="dag3",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
 
         # when