106 changes: 51 additions & 55 deletions tests/dag_processing/test_job_runner.py
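This PR migrates the module from pytest's legacy tmpdir fixture, which yields a py.path.local, to the modern tmp_path fixture, which yields a standard pathlib.Path. As a reading aid, a minimal sketch of the API difference; the file name and test names below are made up, not code from this PR:

    # Sketch only: the same temp-file operations under each fixture.
    def test_with_tmpdir(tmpdir):  # legacy fixture: py.path.local
        dag_file = tmpdir / "temp_dag.py"
        dag_file.write_text("contents", encoding="utf-8")  # py.path requires an encoding
        dag_file.remove()  # py.path deletion method

    def test_with_tmp_path(tmp_path):  # modern fixture: pathlib.Path
        dag_file = tmp_path / "temp_dag.py"
        dag_file.write_text("contents")  # encoding is optional in pathlib
        dag_file.unlink()  # pathlib deletion method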
@@ -29,7 +29,6 @@
 import time
 from datetime import datetime, timedelta
 from logging.config import dictConfig
-from tempfile import TemporaryDirectory
 from textwrap import dedent
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
@@ -145,20 +144,19 @@ def run_processor_manager_one_loop(self, manager, parent_pipe):
         raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")

     @conf_vars({("core", "load_examples"): "False"})
-    def test_remove_file_clears_import_error(self, tmpdir):
-        filename_to_parse = tmpdir / "temp_dag.py"
+    def test_remove_file_clears_import_error(self, tmp_path):
+        path_to_parse = tmp_path / "temp_dag.py"

         # Generate original import error
-        with open(filename_to_parse, "w") as file_to_parse:
-            file_to_parse.writelines("an invalid airflow DAG")
+        path_to_parse.write_text("an invalid airflow DAG")

         child_pipe, parent_pipe = multiprocessing.Pipe()

         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=path_to_parse.parent,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -174,7 +172,7 @@ def test_remove_file_clears_import_error(self, tmpdir):
             import_errors = session.query(errors.ImportError).all()
             assert len(import_errors) == 1

-        filename_to_parse.remove()
+        path_to_parse.unlink()

         # Rerun the scheduler once the dag file has been removed
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -187,25 +185,24 @@ def test_remove_file_clears_import_error(self, tmpdir):
         parent_pipe.close()

     @conf_vars({("core", "load_examples"): "False"})
-    def test_max_runs_when_no_files(self):
+    def test_max_runs_when_no_files(self, tmp_path):
         child_pipe, parent_pipe = multiprocessing.Pipe()

-        with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
-            async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=dags_folder,
-                    max_runs=1,
-                    processor_timeout=timedelta(days=365),
-                    signal_conn=child_pipe,
-                    dag_ids=[],
-                    pickle_dags=False,
-                    async_mode=async_mode,
-                ),
-            )
+        async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
+        manager = DagProcessorJobRunner(
+            job=Job(),
+            processor=DagFileProcessorManager(
+                dag_directory=os.fspath(tmp_path),
+                max_runs=1,
+                processor_timeout=timedelta(days=365),
+                signal_conn=child_pipe,
+                dag_ids=[],
+                pickle_dags=False,
+                async_mode=async_mode,
+            ),
+        )

-            self.run_processor_manager_one_loop(manager, parent_pipe)
+        self.run_processor_manager_one_loop(manager, parent_pipe)
         child_pipe.close()
         parent_pipe.close()
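
Several hunks in this diff convert the fixture with os.fspath() or str() where the surrounding code wants a plain string path rather than a pathlib.Path. A quick illustration of what those conversions do; the example path is made up:

    import os
    from pathlib import Path

    p = Path("/tmp/pytest-0/test_max_runs0")  # made-up example path
    assert os.fspath(p) == str(p)  # os.fspath on a Path returns its string form
    assert os.fspath("/already/a/string") == "/already/a/string"  # plain strings pass through unchanged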

@@ -893,24 +890,23 @@ def fake_processor_(*args, **kwargs):

     @conf_vars({("core", "load_examples"): "False"})
     @mock.patch("airflow.dag_processing.manager.Stats.timing")
-    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
-        filename_to_parse = tmpdir / "temp_dag.py"
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path):
+        path_to_parse = tmp_path / "temp_dag.py"
         dag_code = dedent(
             """
             from airflow import DAG
             dag = DAG(dag_id='temp_dag', schedule='0 0 * * *')
             """
         )
-        with open(filename_to_parse, "w") as file_to_parse:
-            file_to_parse.writelines(dag_code)
+        path_to_parse.write_text(dag_code)

         child_pipe, parent_pipe = multiprocessing.Pipe()

         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=path_to_parse.parent,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -938,7 +934,7 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
             any_order=True,
         )

-    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
+    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir method"""
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -952,7 +948,7 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
                 async_mode=True,
             ),
         )
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
         dagbag.process_file(zipped_dag_path)
         dag = dagbag.get_dag("test_zip_dag")
@@ -967,7 +963,7 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
         # assert dag still active
         assert dag.get_is_active()

-    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
+    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir method"""
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -981,7 +977,7 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
                 async_mode=True,
             ),
         )
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
         dagbag.process_file(zipped_dag_path)
         dag = dagbag.get_dag("test_zip_dag")
@@ -1000,10 +996,10 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
         # assert dag deactivated
         assert not dag.get_is_active()

-    def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmpdir):
+    def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""

-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         dag_path = os.path.join(TEST_DAGS_FOLDER, "test_miscellaneous.py")
         dagbag.process_file(dag_path)
         dag = dagbag.get_dag("miscellaneous_test_dag")
Expand Down Expand Up @@ -1040,28 +1036,28 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t
("scheduler", "standalone_dag_processor"): "True",
}
)
def test_fetch_callbacks_from_database(self, tmpdir):
def test_fetch_callbacks_from_database(self, tmp_path):
"""Test DagProcessorJobRunner._fetch_callbacks method"""
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

callback1 = DagCallbackRequest(
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
is_failure_callback=True,
processor_subdir=str(tmpdir),
processor_subdir=os.fspath(tmp_path),
run_id="123",
)
callback2 = DagCallbackRequest(
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
is_failure_callback=True,
processor_subdir=str(tmpdir),
processor_subdir=os.fspath(tmp_path),
run_id="456",
)
callback3 = SlaCallbackRequest(
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
processor_subdir=str(tmpdir),
processor_subdir=os.fspath(tmp_path),
)

with create_session() as session:
@@ -1073,7 +1069,7 @@ def test_fetch_callbacks_from_database(self, tmpdir):
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(tmpdir),
+                dag_directory=os.fspath(tmp_path),
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1093,15 +1089,15 @@
             ("scheduler", "standalone_dag_processor"): "True",
         }
     )
-    def test_fetch_callbacks_for_current_dag_directory_only(self, tmpdir):
+    def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

         callback1 = DagCallbackRequest(
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="123",
         )
         callback2 = DagCallbackRequest(
@@ -1120,7 +1116,7 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmpdir):
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=tmp_path,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1141,7 +1137,7 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmpdir):
             ("core", "load_examples"): "False",
         }
     )
-    def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
+    def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

@@ -1152,15 +1148,15 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
                     full_filepath=str(dag_filepath),
                     is_failure_callback=True,
                     run_id=str(i),
-                    processor_subdir=str(tmpdir),
+                    processor_subdir=os.fspath(tmp_path),
                 )
                 session.add(DbCallbackRequest(callback=callback, priority_weight=i))

         child_pipe, parent_pipe = multiprocessing.Pipe()
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(tmpdir),
+                dag_directory=str(tmp_path),
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1184,15 +1180,15 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
             ("core", "load_examples"): "False",
         }
     )
-    def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
+    def test_fetch_callbacks_from_database_not_standalone(self, tmp_path):
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

         with create_session() as session:
             callback = DagCallbackRequest(
                 dag_id="test_start_date_scheduling",
                 full_filepath=str(dag_filepath),
                 is_failure_callback=True,
-                processor_subdir=str(tmpdir),
+                processor_subdir=str(tmp_path),
                 run_id="123",
             )
             session.add(DbCallbackRequest(callback=callback, priority_weight=10))
@@ -1201,7 +1197,7 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=tmp_path,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1219,7 +1215,7 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
         with create_session() as session:
             assert session.query(DbCallbackRequest).count() == 1

-    def test_callback_queue(self, tmpdir):
+    def test_callback_queue(self, tmp_path):
         # given
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -1239,41 +1235,41 @@ def test_callback_queue(self, tmpdir):
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag1_req2 = DagCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag1_sla1 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
         dag1_sla2 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )

         dag2_req1 = DagCallbackRequest(
             full_filepath="/green_eggs/ham/file2.py",
             dag_id="dag2",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )

         dag3_sla1 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file3.py",
             dag_id="dag3",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )

         # when
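The remaining lines of this hunk are collapsed in this view. One closing observation: the migration should be behavior-preserving, since pytest documents tmpdir as the py.path.local counterpart of tmp_path and both name the same per-test directory. A sketch under that assumption:

    def test_fixtures_agree(tmpdir, tmp_path):
        # Both fixtures point at the same temporary directory for a given test.
        assert str(tmpdir) == str(tmp_path)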