From 5eb119a5ddf8fb8c97e7b197c5752d63394c5dc5 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Wed, 4 Oct 2023 07:34:35 +0400 Subject: [PATCH 1/3] Make core compatible with `pendulum` v3 --- airflow/serialization/serialized_objects.py | 7 ++-- airflow/serialization/serializers/datetime.py | 15 ++++--- airflow/serialization/serializers/timezone.py | 7 +--- airflow/settings.py | 5 ++- airflow/timetables/_cron.py | 6 +-- airflow/utils/sqlalchemy.py | 5 +-- airflow/utils/timezone.py | 31 +++++++++++++- tests/models/test_dag.py | 14 +++---- tests/sensors/test_time_sensor.py | 6 +-- tests/timetables/test_trigger_timetable.py | 13 ++++-- tests/triggers/test_temporal.py | 7 ++-- tests/utils/test_timezone.py | 40 +++++++++++++++++-- 12 files changed, 108 insertions(+), 48 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 889177f4c15e7..5c2b9b1873575 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -63,6 +63,7 @@ from airflow.utils.module_loading import import_string, qualname from airflow.utils.operator_resources import Resources from airflow.utils.task_group import MappedTaskGroup, TaskGroup +from airflow.utils.timezone import parse_timezone from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: @@ -165,9 +166,9 @@ def encode_timezone(var: Timezone) -> str | int: ) -def decode_timezone(var: str | int) -> Timezone: +def decode_timezone(var: str | int) -> Timezone | FixedTimezone: """Decode a previously serialized Pendulum Timezone.""" - return pendulum.tz.timezone(var) + return parse_timezone(var) def _get_registered_timetable(importable_string: str) -> type[Timetable] | None: @@ -605,7 +606,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: raise TypeError(f"Invalid type {type_!s} in deserialization.") _deserialize_datetime = pendulum.from_timestamp - _deserialize_timezone = pendulum.tz.timezone + 
_deserialize_timezone = parse_timezone @classmethod def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta: diff --git a/airflow/serialization/serializers/datetime.py b/airflow/serialization/serializers/datetime.py index 49f0899a5918f..ea030a8afcba5 100644 --- a/airflow/serialization/serializers/datetime.py +++ b/airflow/serialization/serializers/datetime.py @@ -24,7 +24,7 @@ serialize as serialize_timezone, ) from airflow.utils.module_loading import qualname -from airflow.utils.timezone import convert_to_utc, is_naive +from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone if TYPE_CHECKING: import datetime @@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date import datetime from pendulum import DateTime - from pendulum.tz import fixed_timezone, timezone tz: datetime.tzinfo | None = None if isinstance(data, dict) and TIMEZONE in data: if version == 1: # try to deserialize unsupported timezones timezone_mapping = { - "EDT": fixed_timezone(-4 * 3600), - "CDT": fixed_timezone(-5 * 3600), - "MDT": fixed_timezone(-6 * 3600), - "PDT": fixed_timezone(-7 * 3600), - "CEST": timezone("CET"), + "EDT": parse_timezone(-4 * 3600), + "CDT": parse_timezone(-5 * 3600), + "MDT": parse_timezone(-6 * 3600), + "PDT": parse_timezone(-7 * 3600), + "CEST": parse_timezone("CET"), } if data[TIMEZONE] in timezone_mapping: tz = timezone_mapping[data[TIMEZONE]] else: - tz = timezone(data[TIMEZONE]) + tz = parse_timezone(data[TIMEZONE]) else: tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0]) diff --git a/airflow/serialization/serializers/timezone.py b/airflow/serialization/serializers/timezone.py index 23901b9d444e8..0f580adef83f5 100644 --- a/airflow/serialization/serializers/timezone.py +++ b/airflow/serialization/serializers/timezone.py @@ -74,7 +74,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]: def deserialize(classname: str, version: int, data: object) -> Any: - 
from pendulum.tz import fixed_timezone, timezone + from airflow.utils.timezone import parse_timezone if not isinstance(data, (str, int)): raise TypeError(f"{data} is not of type int or str but of {type(data)}") @@ -82,9 +82,6 @@ def deserialize(classname: str, version: int, data: object) -> Any: if version > __version__: raise TypeError(f"serialized {version} of {classname} > {__version__}") - if isinstance(data, int): - return fixed_timezone(data) - if "zoneinfo.ZoneInfo" in classname: try: from zoneinfo import ZoneInfo @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any: return ZoneInfo(data) - return timezone(data) + return parse_timezone(data) # ported from pendulum.tz.timezone._get_tzinfo_name diff --git a/airflow/settings.py b/airflow/settings.py index 1a38a59ed301c..1e63466267ea7 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -40,6 +40,7 @@ from airflow.logging_config import configure_logging from airflow.utils.orm_event_handlers import setup_event_handlers from airflow.utils.state import State +from airflow.utils.timezone import parse_timezone, utc if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -54,9 +55,9 @@ if tz == "system": TIMEZONE = pendulum.tz.local_timezone() else: - TIMEZONE = pendulum.tz.timezone(tz) + TIMEZONE = parse_timezone(tz) except Exception: - TIMEZONE = pendulum.tz.timezone("UTC") + TIMEZONE = utc log.info("Configured default timezone %s", TIMEZONE) diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py index f9b8efa465914..f9473ae2db2b8 100644 --- a/airflow/timetables/_cron.py +++ b/airflow/timetables/_cron.py @@ -22,14 +22,14 @@ from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException from croniter import CroniterBadCronError, CroniterBadDateError, croniter -from pendulum.tz.timezone import Timezone from airflow.exceptions import AirflowTimetableInvalid from airflow.utils.dates import cron_presets -from 
airflow.utils.timezone import convert_to_utc, make_aware, make_naive +from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone if TYPE_CHECKING: from pendulum import DateTime + from pendulum.tz.timezone import Timezone def _is_schedule_fixed(expression: str) -> bool: @@ -56,7 +56,7 @@ def __init__(self, cron: str, timezone: str | Timezone) -> None: self._expression = cron_presets.get(cron, cron) if isinstance(timezone, str): - timezone = Timezone(timezone) + timezone = parse_timezone(timezone) self._timezone = timezone try: diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index a042d4e9024de..fb241f482f511 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -24,7 +24,6 @@ import logging from typing import TYPE_CHECKING, Any, Generator, Iterable, overload -import pendulum from dateutil import relativedelta from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_ from sqlalchemy.dialects import mssql, mysql @@ -34,7 +33,7 @@ from airflow import settings from airflow.configuration import conf from airflow.serialization.enums import Encoding -from airflow.utils.timezone import make_naive +from airflow.utils.timezone import make_naive, utc if TYPE_CHECKING: from kubernetes.client.models.v1_pod import V1Pod @@ -46,8 +45,6 @@ log = logging.getLogger(__name__) -utc = pendulum.tz.timezone("UTC") - class UtcDateTime(TypeDecorator): """ diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 12c75bef5976b..3caa3b62be0bb 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -18,14 +18,20 @@ from __future__ import annotations import datetime as dt +from functools import lru_cache from typing import overload import pendulum from dateutil.relativedelta import relativedelta from pendulum.datetime import DateTime +from pendulum.tz import fixed_timezone +from pendulum.tz.timezone import FixedTimezone, Timezone -# UTC time zone 
as a tzinfo instance. -utc = pendulum.tz.timezone("UTC") +# UTC time zone as a FixedTimezone instance (subclass of tzinfo) +# This type uses for compatibility with type provided by pendulum 2.x +# - in pendulum 2.x ``pendulum.tz.timezone`` returns FixedTimezone +# - in pendulum 3.x ``pendulum.timezone`` returns Timezone +utc = FixedTimezone(offset=0, name="UTC") def is_localized(value): @@ -273,3 +279,24 @@ def _format_part(key: str) -> str: if not joined: return "<1s" return joined + + +@lru_cache(maxsize=None) +def parse_timezone(name: str | int) -> Timezone | FixedTimezone: + """ + Parse timezone and return one of the pendulum Timezone. + + Provide the same interface as ``pendulum.tz.timezone(name)`` + + .. note:: + This function exists for compatibility between pendulum 2 and 3. + In pendulum 3 ``pendulum.tz.timezone`` is a module, which can't be used as a parser. + In pendulum 2 ``pendulum.timezone`` fails the mypy static check. + + :meta: private + """ + if isinstance(name, int): + return fixed_timezone(name) + elif name.lower() == "utc": + return utc + return Timezone(name) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f4b872109f626..1b1f776871ca6 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -37,6 +37,7 @@ import pytest import time_machine from dateutil.relativedelta import relativedelta +from pendulum.tz.timezone import Timezone from sqlalchemy import inspect from airflow import settings @@ -96,7 +97,6 @@ pytestmark = pytest.mark.db_test TEST_DATE = datetime_tz(2015, 1, 2, 0, 0) - repo_root = Path(__file__).parents[2] @@ -676,8 +676,8 @@ def test_following_previous_schedule(self): """ Make sure DST transitions are properly observed """ - local_tz = pendulum.timezone("Europe/Zurich") - start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55), dst_rule=pendulum.PRE_TRANSITION) + local_tz = Timezone("Europe/Zurich") + start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55, fold=0)) assert
start.isoformat() == "2018-10-28T02:55:00+02:00", "Pre-condition: start date is in DST" utc = timezone.convert_to_utc(start) @@ -705,8 +705,8 @@ def test_following_previous_schedule_daily_dag_cest_to_cet(self): """ Make sure DST transitions are properly observed """ - local_tz = pendulum.timezone("Europe/Zurich") - start = local_tz.convert(datetime.datetime(2018, 10, 27, 3), dst_rule=pendulum.PRE_TRANSITION) + local_tz = Timezone("Europe/Zurich") + start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0)) utc = timezone.convert_to_utc(start) @@ -734,8 +734,8 @@ def test_following_previous_schedule_daily_dag_cet_to_cest(self): """ Make sure DST transitions are properly observed """ - local_tz = pendulum.timezone("Europe/Zurich") - start = local_tz.convert(datetime.datetime(2018, 3, 25, 2), dst_rule=pendulum.PRE_TRANSITION) + local_tz = Timezone("Europe/Zurich") + start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0)) utc = timezone.convert_to_utc(start) diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py index 935d1cb128176..ad5044e1521db 100644 --- a/tests/sensors/test_time_sensor.py +++ b/tests/sensors/test_time_sensor.py @@ -23,7 +23,7 @@ import pendulum import pytest import time_machine -from pendulum.tz.timezone import UTC +from pendulum.tz.timezone import Timezone, UTC from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG @@ -33,7 +33,7 @@ DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00 DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1) -DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)) +DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=Timezone(DEFAULT_TIMEZONE)) class TestTimeSensor: @@ -47,7 +47,7 @@ class TestTimeSensor: ) @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc)) def test_timezone(self, default_timezone, start_date, expected): - with patch("airflow.settings.TIMEZONE", 
pendulum.timezone(default_timezone)): + with patch("airflow.settings.TIMEZONE", Timezone(default_timezone)): dag = DAG("test", default_args={"start_date": start_date}) op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) assert op.poke(None) == expected diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py index 58e1f49df9058..a17d797646613 100644 --- a/tests/timetables/test_trigger_timetable.py +++ b/tests/timetables/test_trigger_timetable.py @@ -21,15 +21,16 @@ import dateutil.relativedelta import pendulum -import pendulum.tz import pytest import time_machine +from pendulum.tz.timezone import Timezone from airflow.exceptions import AirflowTimetableInvalid from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction from airflow.timetables.trigger import CronTriggerTimetable +from airflow.utils.timezone import utc -TIMEZONE = pendulum.tz.timezone("UTC") +TIMEZONE = utc START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1) @@ -216,7 +217,7 @@ def test_validate_failure() -> None: ( CronTriggerTimetable( "0 0 1 12 0", - timezone=pendulum.tz.timezone("Asia/Taipei"), + timezone=Timezone("Asia/Taipei"), interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO), ), {"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", "interval": {"weekday": [0]}}, @@ -229,5 +230,9 @@ def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.A tt = CronTriggerTimetable.deserialize(data) assert isinstance(tt, CronTriggerTimetable) assert tt._expression == timetable._expression - assert tt._timezone == timetable._timezone + if hasattr(tt._timezone, "offset"): + assert tt._timezone == timetable._timezone + else: + # ``parse_timezone`` use separate cache with pendulum v2 for IANA timezone + assert tt._timezone.name == timetable._timezone.name assert tt._interval == timetable._interval diff --git 
a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 655910394fb8c..bf47f8846a878 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -21,6 +21,7 @@ import pendulum import pytest +from pendulum.tz.timezone import Timezone from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger @@ -64,9 +65,9 @@ def test_timedelta_trigger_serialization(): @pytest.mark.parametrize( "tz", [ - pendulum.tz.timezone("UTC"), - pendulum.tz.timezone("Europe/Paris"), - pendulum.tz.timezone("America/Toronto"), + Timezone("UTC"), + Timezone("Europe/Paris"), + Timezone("America/Toronto"), ], ) @pytest.mark.asyncio diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py index ff5ad26f5a31d..9c6a2ff40532f 100644 --- a/tests/utils/test_timezone.py +++ b/tests/utils/test_timezone.py @@ -21,13 +21,14 @@ import pendulum import pytest +from pendulum.tz.timezone import Timezone from airflow.utils import timezone -from airflow.utils.timezone import coerce_datetime +from airflow.utils.timezone import coerce_datetime, parse_timezone -CET = pendulum.tz.timezone("Europe/Paris") -EAT = pendulum.tz.timezone("Africa/Nairobi") # Africa/Nairobi -ICT = pendulum.tz.timezone("Asia/Bangkok") # Asia/Bangkok +CET = Timezone("Europe/Paris") +EAT = Timezone("Africa/Nairobi") +ICT = Timezone("Asia/Bangkok") UTC = timezone.utc @@ -117,3 +118,34 @@ def test_td_format(self): ) def test_coerce_datetime(input_datetime, output_datetime): assert output_datetime == coerce_datetime(input_datetime) + + +@pytest.mark.parametrize( + "tz_name", + [ + pytest.param("Europe/Paris", id="CET"), + pytest.param("Africa/Nairobi", id="EAT"), + pytest.param("Asia/Bangkok", id="ICT"), + ], +) +def test_parse_timezone_iana(tz_name): + tz = parse_timezone(tz_name) + assert tz.name == tz_name + assert parse_timezone(tz_name) is tz + + +@pytest.mark.parametrize( + "tz_name, expected_offset, expected_name", + [ + 
pytest.param("UTC", 0, "UTC", id="UTC"), + pytest.param("utc", 0, "UTC", id="utc-lower-case"), + pytest.param(0, 0, "+00:00", id="zero-offset"), + pytest.param(-3600, -3600, "-01:00", id="1-hour-behind"), + pytest.param(19800, 19800, "+05:30", id="5.5-hours-ahead"), + ], +) +def test_parse_timezone_offset(tz_name, expected_offset, expected_name): + tz = parse_timezone(tz_name) + assert tz.offset == expected_offset + assert tz.name == expected_name + assert parse_timezone(tz_name) is tz From f3d6a7251244e5e17539fd180a79620352508d95 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Mon, 23 Oct 2023 23:56:47 +0400 Subject: [PATCH 2/3] Add suggestion during code review Co-authored-by: Tzu-ping Chung Co-authored-by: bolkedebruin --- airflow/utils/timezone.py | 12 +++++++----- tests/sensors/test_time_sensor.py | 7 +++---- tests/utils/test_timezone.py | 10 ++++++++-- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 3caa3b62be0bb..61ad95f4b0647 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -18,7 +18,6 @@ from __future__ import annotations import datetime as dt -from functools import lru_cache from typing import overload import pendulum @@ -27,11 +26,14 @@ from pendulum.tz import fixed_timezone from pendulum.tz.timezone import FixedTimezone, Timezone -# UTC time zone as a FixedTimezone instance (subclass of tzinfo) -# This type uses for compatibility with type provided by pendulum 2.x +from airflow.compat.functools import cache + +# UTC time zone as a Timezone instance (subclass of tzinfo) +# This type uses for compatibility between pendulum v2 and v3 # - in pendulum 2.x ``pendulum.tz.timezone`` returns FixedTimezone # - in pendulum 3.x ``pendulum.timezone`` returns Timezone -utc = FixedTimezone(offset=0, name="UTC") +# Same is valid for pendulum.tz.UTC +utc = Timezone("UTC") def is_localized(value): @@ -281,7 +283,7 @@ def _format_part(key: str) -> str: return joined 
-@lru_cache(maxsize=None) +@cache def parse_timezone(name: str | int) -> Timezone | FixedTimezone: """ Parse timezone and return one of the pendulum Timezone. diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py index ad5044e1521db..ae77760313570 100644 --- a/tests/sensors/test_time_sensor.py +++ b/tests/sensors/test_time_sensor.py @@ -23,7 +23,7 @@ import pendulum import pytest import time_machine -from pendulum.tz.timezone import Timezone, UTC +from pendulum.tz.timezone import Timezone from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG @@ -72,8 +72,7 @@ def test_target_time_aware(self): with DAG("test_target_time_aware", start_date=timezone.datetime(2020, 1, 1, 23, 0)): aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone()) op = TimeSensorAsync(task_id="test", target_time=aware_time) - assert hasattr(op.target_datetime.tzinfo, "offset") - assert op.target_datetime.tzinfo.offset == 0 + assert op.target_datetime.tzinfo == timezone.utc def test_target_time_naive_dag_timezone(self): """ @@ -85,4 +84,4 @@ def test_target_time_naive_dag_timezone(self): ): op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0)) assert op.target_datetime.time() == pendulum.time(1, 0) - assert op.target_datetime.tzinfo == UTC + assert op.target_datetime.tzinfo == timezone.utc diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py index 9c6a2ff40532f..879f2d37d84bb 100644 --- a/tests/utils/test_timezone.py +++ b/tests/utils/test_timezone.py @@ -134,11 +134,17 @@ def test_parse_timezone_iana(tz_name): assert parse_timezone(tz_name) is tz +@pytest.mark.parametrize("tz_name", ["utc", "UTC", "uTc"]) +def test_parse_timezone_utc(tz_name): + tz = parse_timezone(tz_name) + assert tz.name == "UTC" + assert parse_timezone(tz_name) is tz + assert tz is timezone.utc, "Expected that UTC timezone is same object as `airflow.utils.timezone.utc`" + + @pytest.mark.parametrize( "tz_name, expected_offset, 
expected_name", [ - pytest.param("UTC", 0, "UTC", id="UTC"), - pytest.param("utc", 0, "UTC", id="utc-lower-case"), pytest.param(0, 0, "+00:00", id="zero-offset"), pytest.param(-3600, -3600, "-01:00", id="1-hour-behind"), pytest.param(19800, 19800, "+05:30", id="5.5-hours-ahead"), From ee273101d38f9e1947b5887e4b4a44bcb5d06a25 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Wed, 22 Nov 2023 12:52:20 +0400 Subject: [PATCH 3/3] Revert internal utc timezone to FixedTimezone type --- airflow/utils/timezone.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 61ad95f4b0647..bb51b0d2c90fe 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -28,12 +28,10 @@ from airflow.compat.functools import cache -# UTC time zone as a Timezone instance (subclass of tzinfo) -# This type uses for compatibility between pendulum v2 and v3 -# - in pendulum 2.x ``pendulum.tz.timezone`` returns FixedTimezone -# - in pendulum 3.x ``pendulum.timezone`` returns Timezone -# Same is valid for pendulum.tz.UTC -utc = Timezone("UTC") +# UTC time zone as a FixedTimezone instance (subclass of tzinfo) +# This type is used for compatibility with the type provided by pendulum 2.x; +# some components might not work correctly with `pendulum.tz.timezone.Timezone` in pendulum 2.x +utc = FixedTimezone(offset=0, name="UTC") def is_localized(value):