Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ def prepare_file_path_queue(self):
self._file_stats.pop(file_path, None)
file_paths_to_stop_watching.add(file_path)
continue
file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
file_modified_time = datetime.fromtimestamp(files_with_mtime[file_path], tz=timezone.utc)
else:
file_paths.append(file_path)
file_modified_time = None
Expand Down
54 changes: 49 additions & 5 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import socket
import sys
import threading
import time
from datetime import datetime, timedelta
from logging.config import dictConfig
from tempfile import TemporaryDirectory
Expand Down Expand Up @@ -343,14 +344,38 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host(
manager.prepare_file_path_queue()
assert manager._file_path_queue == expected_order

@pytest.fixture
def change_platform_timezone(self, monkeypatch):
    """Run the test with the platform timezone switched to Europe/Paris.

    The ``TZ`` environment variable is overridden for the duration of the
    test and restored to its previous state (set or unset) afterwards.
    """
    # propagate timezone changes to C routines
    # this is only needed for Unix. On Windows, exporting the TZ env variable
    # is enough (see https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-s-localtime32-s-localtime64-s?view=msvc-170#remarks)
    tzset = getattr(time, "tzset", None)

    try:
        # Use a dedicated patch context so the original TZ value is put back
        # *before* the final tzset() call below. Deleting TZ and relying on
        # the outer monkeypatch teardown would restore the variable only
        # after tzset() has run, leaving the C runtime out of sync with the
        # environment whenever TZ was already set.
        with monkeypatch.context() as patched_env:
            patched_env.setenv("TZ", "Europe/Paris")
            if tzset is not None:
                tzset()

            yield
    finally:
        # reset timezone to whatever the restored environment dictates
        if tzset is not None:
            tzset()

@conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_file_paths_in_queue_sorted_by_modified_time(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
self,
mock_getmtime,
mock_isfile,
mock_find_path,
mock_might_contain_dag,
mock_zipfile,
change_platform_timezone,
):
"""Test files are sorted by modified time"""
paths_with_mtime = {"file_3.py": 3.0, "file_2.py": 2.0, "file_4.py": 5.0, "file_1.py": 4.0}
Expand Down Expand Up @@ -382,7 +407,13 @@ def test_file_paths_in_queue_sorted_by_modified_time(
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_file_paths_in_queue_excludes_missing_file(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
self,
mock_getmtime,
mock_isfile,
mock_find_path,
mock_might_contain_dag,
mock_zipfile,
change_platform_timezone,
):
"""Check that a file is not enqueued for processing if it has been deleted"""
dag_files = ["file_3.py", "file_2.py", "file_4.py"]
Expand Down Expand Up @@ -410,7 +441,13 @@ def test_file_paths_in_queue_excludes_missing_file(
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_add_new_file_to_parsing_queue(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
self,
mock_getmtime,
mock_isfile,
mock_find_path,
mock_might_contain_dag,
mock_zipfile,
change_platform_timezone,
):
"""Check that new file is added to parsing queue"""
dag_files = ["file_1.py", "file_2.py", "file_3.py"]
Expand Down Expand Up @@ -438,13 +475,20 @@ def test_add_new_file_to_parsing_queue(
)

@conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.settings.TIMEZONE", timezone.utc)
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_recently_modified_file_is_parsed_with_mtime_mode(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
self,
mock_getmtime,
mock_isfile,
mock_find_path,
mock_might_contain_dag,
mock_zipfile,
change_platform_timezone,
):
"""
Test recently updated files are processed even if min_file_process_interval is not reached
Expand All @@ -465,7 +509,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
async_mode=True,
)

# let's say the DAG was just parsed 2 seconds before the Freezed time
# let's say the DAG was just parsed 10 seconds before the frozen time
last_finish_time = freezed_base_time - timedelta(seconds=10)
manager._file_stats = {
"file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
Expand Down