diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index b8bd7332f6656..84d50ed1a948a 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -1134,7 +1134,7 @@ def prepare_file_path_queue(self): self._file_stats.pop(file_path, None) file_paths_to_stop_watching.add(file_path) continue - file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path])) + file_modified_time = datetime.fromtimestamp(files_with_mtime[file_path], tz=timezone.utc) else: file_paths.append(file_path) file_modified_time = None diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index cb52b7a4cc30b..a7d9328bbacfa 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -26,6 +26,7 @@ import socket import sys import threading +import time from datetime import datetime, timedelta from logging.config import dictConfig from tempfile import TemporaryDirectory @@ -343,6 +344,24 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host( manager.prepare_file_path_queue() assert manager._file_path_queue == expected_order + @pytest.fixture + def change_platform_timezone(self, monkeypatch): + monkeypatch.setenv("TZ", "Europe/Paris") + + # propagate new timezone to C routines + # this is only needed for Unix. On Windows, exporting the TZ env variable + # is enough (see https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-s-localtime32-s-localtime64-s?view=msvc-170#remarks) + tzset = getattr(time, "tzset", None) + if tzset is not None: + tzset() + + yield + + # reset timezone to platform's default + monkeypatch.delenv("TZ") + if tzset is not None: + tzset() + @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"}) @mock.patch("zipfile.is_zipfile", return_value=True) @mock.patch("airflow.utils.file.might_contain_dag", return_value=True) @@ -350,7 +369,13 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host( @mock.patch("airflow.utils.file.os.path.isfile", return_value=True) @mock.patch("airflow.utils.file.os.path.getmtime") def test_file_paths_in_queue_sorted_by_modified_time( - self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile + self, + mock_getmtime, + mock_isfile, + mock_find_path, + mock_might_contain_dag, + mock_zipfile, + change_platform_timezone, ): """Test files are sorted by modified time""" paths_with_mtime = {"file_3.py": 3.0, "file_2.py": 2.0, "file_4.py": 5.0, "file_1.py": 4.0} @@ -382,7 +407,13 @@ def test_file_paths_in_queue_sorted_by_modified_time( @mock.patch("airflow.utils.file.os.path.isfile", return_value=True) @mock.patch("airflow.utils.file.os.path.getmtime") def test_file_paths_in_queue_excludes_missing_file( - self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile + self, + mock_getmtime, + mock_isfile, + mock_find_path, + mock_might_contain_dag, + mock_zipfile, + change_platform_timezone, ): """Check that a file is not enqueued for processing if it has been deleted""" dag_files = ["file_3.py", "file_2.py", "file_4.py"] @@ -410,7 +441,13 @@ def test_file_paths_in_queue_excludes_missing_file( @mock.patch("airflow.utils.file.os.path.isfile", return_value=True) @mock.patch("airflow.utils.file.os.path.getmtime") def test_add_new_file_to_parsing_queue( - self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile + self, + mock_getmtime, + mock_isfile, + mock_find_path, + mock_might_contain_dag, + mock_zipfile, + change_platform_timezone, ): """Check that new file is added to parsing queue""" dag_files = ["file_1.py", "file_2.py", "file_3.py"] @@ -438,13 +475,20 @@ def test_add_new_file_to_parsing_queue( ) @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"}) + @mock.patch("airflow.settings.TIMEZONE", timezone.utc) @mock.patch("zipfile.is_zipfile", return_value=True) @mock.patch("airflow.utils.file.might_contain_dag", return_value=True) @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True) @mock.patch("airflow.utils.file.os.path.isfile", return_value=True) @mock.patch("airflow.utils.file.os.path.getmtime") def test_recently_modified_file_is_parsed_with_mtime_mode( - self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile + self, + mock_getmtime, + mock_isfile, + mock_find_path, + mock_might_contain_dag, + mock_zipfile, + change_platform_timezone, ): """ Test recently updated files are processed even if min_file_process_interval is not reached @@ -465,7 +509,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( async_mode=True, ) - # let's say the DAG was just parsed 2 seconds before the Freezed time + # let's say the DAG was just parsed 10 seconds before the Freezed time last_finish_time = freezed_base_time - timedelta(seconds=10) manager._file_stats = { "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),