Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -165,9 +166,9 @@ def encode_timezone(var: Timezone) -> str | int:
)


def decode_timezone(var: str | int) -> Timezone:
def decode_timezone(var: str | int) -> Timezone | FixedTimezone:
"""Decode a previously serialized Pendulum Timezone."""
return pendulum.tz.timezone(var)
return parse_timezone(var)


def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
Expand Down Expand Up @@ -605,7 +606,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_timezone = pendulum.tz.timezone
_deserialize_timezone = parse_timezone

@classmethod
def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
Expand Down
15 changes: 7 additions & 8 deletions airflow/serialization/serializers/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
serialize as serialize_timezone,
)
from airflow.utils.module_loading import qualname
from airflow.utils.timezone import convert_to_utc, is_naive
from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
import datetime

from pendulum import DateTime
from pendulum.tz import fixed_timezone, timezone

tz: datetime.tzinfo | None = None
if isinstance(data, dict) and TIMEZONE in data:
if version == 1:
# try to deserialize unsupported timezones
timezone_mapping = {
"EDT": fixed_timezone(-4 * 3600),
"CDT": fixed_timezone(-5 * 3600),
"MDT": fixed_timezone(-6 * 3600),
"PDT": fixed_timezone(-7 * 3600),
"CEST": timezone("CET"),
"EDT": parse_timezone(-4 * 3600),
"CDT": parse_timezone(-5 * 3600),
"MDT": parse_timezone(-6 * 3600),
"PDT": parse_timezone(-7 * 3600),
"CEST": parse_timezone("CET"),
}
if data[TIMEZONE] in timezone_mapping:
tz = timezone_mapping[data[TIMEZONE]]
else:
tz = timezone(data[TIMEZONE])
tz = parse_timezone(data[TIMEZONE])
else:
tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])

Expand Down
7 changes: 2 additions & 5 deletions airflow/serialization/serializers/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ def serialize(o: object) -> tuple[U, str, int, bool]:


def deserialize(classname: str, version: int, data: object) -> Any:
from pendulum.tz import fixed_timezone, timezone
from airflow.utils.timezone import parse_timezone

if not isinstance(data, (str, int)):
raise TypeError(f"{data} is not of type int or str but of {type(data)}")

if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if isinstance(data, int):
return fixed_timezone(data)

if "zoneinfo.ZoneInfo" in classname:
try:
from zoneinfo import ZoneInfo
Expand All @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:

return ZoneInfo(data)

return timezone(data)
return parse_timezone(data)


# ported from pendulum.tz.timezone._get_tzinfo_name
Expand Down
5 changes: 3 additions & 2 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.state import State
from airflow.utils.timezone import parse_timezone, utc

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand All @@ -54,9 +55,9 @@
if tz == "system":
TIMEZONE = pendulum.tz.local_timezone()
else:
TIMEZONE = pendulum.tz.timezone(tz)
TIMEZONE = parse_timezone(tz)
except Exception:
TIMEZONE = pendulum.tz.timezone("UTC")
TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)

Expand Down
6 changes: 3 additions & 3 deletions airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter
from pendulum.tz.timezone import Timezone

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone

if TYPE_CHECKING:
from pendulum import DateTime
from pendulum.tz.timezone import Timezone


def _is_schedule_fixed(expression: str) -> bool:
Expand All @@ -56,7 +56,7 @@ def __init__(self, cron: str, timezone: str | Timezone) -> None:
self._expression = cron_presets.get(cron, cron)

if isinstance(timezone, str):
timezone = Timezone(timezone)
timezone = parse_timezone(timezone)
self._timezone = timezone

try:
Expand Down
5 changes: 1 addition & 4 deletions airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import logging
from typing import TYPE_CHECKING, Any, Generator, Iterable, overload

import pendulum
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
Expand All @@ -34,7 +33,7 @@
from airflow import settings
from airflow.configuration import conf
from airflow.serialization.enums import Encoding
from airflow.utils.timezone import make_naive
from airflow.utils.timezone import make_naive, utc

if TYPE_CHECKING:
from kubernetes.client.models.v1_pod import V1Pod
Expand All @@ -46,8 +45,6 @@

log = logging.getLogger(__name__)

utc = pendulum.tz.timezone("UTC")


class UtcDateTime(TypeDecorator):
"""
Expand Down
31 changes: 29 additions & 2 deletions airflow/utils/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@
import pendulum
from dateutil.relativedelta import relativedelta
from pendulum.datetime import DateTime
from pendulum.tz import fixed_timezone
from pendulum.tz.timezone import FixedTimezone, Timezone

# UTC time zone as a tzinfo instance.
utc = pendulum.tz.timezone("UTC")
from airflow.compat.functools import cache

# UTC time zone as a FixedTimezone instance (subclass of tzinfo)
# This type is used for compatibility with the type provided by pendulum 2.x;
# some components might not work correctly with `pendulum.tz.timezone.Timezone` in pendulum 2.x.
utc = FixedTimezone(offset=0, name="UTC")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there an issue moving to Pendulum 3 UTC? So moving from FixedTimezone to Timezone? FixedTimezone derives from Timezone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an interesting question. Generally speaking, right now we use two different versions of the UTC timezone in the Airflow codebase.

  1. FixedTimezone created by pendulum.tz.timezone("UTC") in most parts of the codebase
  2. Timezone created by pendulum.tz.timezone.Timezone("UTC") in timetables

These are two different objects with different attributes; however, they should work exactly the same. So it might be a good idea to try to unify the different implementations of the UTC timezone in the Airflow codebase. I just chose to keep it the same as it is now in most parts, but I am happy to change it to pendulum.tz.timezone.Timezone("UTC").

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bolkedebruin
Well, as we could see on our CI, Timezone("UTC") in pendulum 2.x might not work in some specific cases, such as

else:
holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()

This code works fine in pendulum 3; however, it fails (or spawns a lot of warnings) in 2.1.2:

# Reproduction script: run the same pandas holiday lookup with a datetime
# whose tzinfo is each of the two pendulum UTC representations.
from pendulum.tz.timezone import FixedTimezone, Timezone
from datetime import datetime
from pandas.tseries.holiday import USFederalHolidayCalendar

holiday_calendar = USFederalHolidayCalendar()

# First iteration uses the fixed-offset UTC object, second the named "UTC" Timezone.
for utc in [FixedTimezone(offset=0, name="UTC"), Timezone("UTC")]:
    # Banner line identifying which timezone object and class is being exercised.
    print(f" {utc} {type(utc).__name__} ".center(72, "="))
    next_start = datetime(2021, 9, 5, tzinfo=utc)
    # This is the call under test; start and end are the same aware datetime.
    result = holiday_calendar.holidays(start=next_start, end=next_start)
    # NOTE(review): this prints next_start, not result; result is otherwise unused.
    # Presumably only the call's warnings/errors matter here - confirm.
    print(f"Result: {next_start}")

Copy link
Contributor Author

@Taragolis Taragolis Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And revert it back to FixedTimezone

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and what about using

from datetime import timezone

utc = timezone.utc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check this one; some pendulum 2 methods have a problem with non-pendulum timezones, e.g. ZoneInfo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then leave it for now, we can switch later



def is_localized(value):
Expand Down Expand Up @@ -273,3 +279,24 @@ def _format_part(key: str) -> str:
if not joined:
return "<1s"
return joined


@cache
def parse_timezone(name: str | int) -> Timezone | FixedTimezone:
    """
    Resolve a timezone specification into a pendulum timezone object.

    Offers the same call interface as ``pendulum.tz.timezone(name)``. Results
    are memoized per argument through the ``cache`` decorator.

    .. note::
        This helper exists for compatibility between pendulum 2 and 3.
        In pendulum 3 ``pendulum.tz.timezone`` is a module, so it can't be used as a parser.
        In pendulum 2 ``pendulum.timezone`` fails the mypy static check.

    :param name: Either an integer, which is handed to ``fixed_timezone`` as a
        fixed UTC offset, or a timezone name. The name "UTC" is matched
        case-insensitively.
    :return: The module-level ``utc`` object for the name "UTC", the result of
        ``fixed_timezone`` for integers, otherwise ``Timezone(name)``.

    :meta: private
    """
    if not isinstance(name, int):
        # Every spelling of "UTC" maps onto the single module-level instance.
        return utc if name.lower() == "utc" else Timezone(name)
    return fixed_timezone(name)
14 changes: 7 additions & 7 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import pytest
import time_machine
from dateutil.relativedelta import relativedelta
from pendulum.tz.timezone import Timezone
from sqlalchemy import inspect

from airflow import settings
Expand Down Expand Up @@ -96,7 +97,6 @@
pytestmark = pytest.mark.db_test

TEST_DATE = datetime_tz(2015, 1, 2, 0, 0)

repo_root = Path(__file__).parents[2]


Expand Down Expand Up @@ -676,8 +676,8 @@ def test_following_previous_schedule(self):
"""
Make sure DST transitions are properly observed
"""
local_tz = pendulum.timezone("Europe/Zurich")
start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55), dst_rule=pendulum.PRE_TRANSITION)
local_tz = Timezone("Europe/Zurich")
start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55, fold=0))
assert start.isoformat() == "2018-10-28T02:55:00+02:00", "Pre-condition: start date is in DST"

utc = timezone.convert_to_utc(start)
Expand Down Expand Up @@ -705,8 +705,8 @@ def test_following_previous_schedule_daily_dag_cest_to_cet(self):
"""
Make sure DST transitions are properly observed
"""
local_tz = pendulum.timezone("Europe/Zurich")
start = local_tz.convert(datetime.datetime(2018, 10, 27, 3), dst_rule=pendulum.PRE_TRANSITION)
local_tz = Timezone("Europe/Zurich")
start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0))

utc = timezone.convert_to_utc(start)

Expand Down Expand Up @@ -734,8 +734,8 @@ def test_following_previous_schedule_daily_dag_cet_to_cest(self):
"""
Make sure DST transitions are properly observed
"""
local_tz = pendulum.timezone("Europe/Zurich")
start = local_tz.convert(datetime.datetime(2018, 3, 25, 2), dst_rule=pendulum.PRE_TRANSITION)
local_tz = Timezone("Europe/Zurich")
start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0))

utc = timezone.convert_to_utc(start)

Expand Down
11 changes: 5 additions & 6 deletions tests/sensors/test_time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pendulum
import pytest
import time_machine
from pendulum.tz.timezone import UTC
from pendulum.tz.timezone import Timezone

from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
Expand All @@ -33,7 +33,7 @@

DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE))
DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=Timezone(DEFAULT_TIMEZONE))


class TestTimeSensor:
Expand All @@ -47,7 +47,7 @@ class TestTimeSensor:
)
@time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc))
def test_timezone(self, default_timezone, start_date, expected):
with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
with patch("airflow.settings.TIMEZONE", Timezone(default_timezone)):
dag = DAG("test", default_args={"start_date": start_date})
op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
assert op.poke(None) == expected
Expand All @@ -72,8 +72,7 @@ def test_target_time_aware(self):
with DAG("test_target_time_aware", start_date=timezone.datetime(2020, 1, 1, 23, 0)):
aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone())
op = TimeSensorAsync(task_id="test", target_time=aware_time)
assert hasattr(op.target_datetime.tzinfo, "offset")
assert op.target_datetime.tzinfo.offset == 0
assert op.target_datetime.tzinfo == timezone.utc

def test_target_time_naive_dag_timezone(self):
"""
Expand All @@ -85,4 +84,4 @@ def test_target_time_naive_dag_timezone(self):
):
op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0))
assert op.target_datetime.time() == pendulum.time(1, 0)
assert op.target_datetime.tzinfo == UTC
assert op.target_datetime.tzinfo == timezone.utc
13 changes: 9 additions & 4 deletions tests/timetables/test_trigger_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@

import dateutil.relativedelta
import pendulum
import pendulum.tz
import pytest
import time_machine
from pendulum.tz.timezone import Timezone

from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils.timezone import utc

TIMEZONE = pendulum.tz.timezone("UTC")
TIMEZONE = utc
START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE)

PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
Expand Down Expand Up @@ -216,7 +217,7 @@ def test_validate_failure() -> None:
(
CronTriggerTimetable(
"0 0 1 12 0",
timezone=pendulum.tz.timezone("Asia/Taipei"),
timezone=Timezone("Asia/Taipei"),
interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
),
{"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", "interval": {"weekday": [0]}},
Expand All @@ -229,5 +230,9 @@ def test_serialization(timetable: CronTriggerTimetable, data: dict[str, typing.A
tt = CronTriggerTimetable.deserialize(data)
assert isinstance(tt, CronTriggerTimetable)
assert tt._expression == timetable._expression
assert tt._timezone == timetable._timezone
if hasattr(tt._timezone, "offset"):
assert tt._timezone == timetable._timezone
else:
# ``parse_timezone`` uses a separate cache with pendulum v2 for IANA timezones
assert tt._timezone.name == timetable._timezone.name
assert tt._interval == timetable._interval
7 changes: 4 additions & 3 deletions tests/triggers/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import pendulum
import pytest
from pendulum.tz.timezone import Timezone

from airflow.triggers.base import TriggerEvent
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
Expand Down Expand Up @@ -64,9 +65,9 @@ def test_timedelta_trigger_serialization():
@pytest.mark.parametrize(
"tz",
[
pendulum.tz.timezone("UTC"),
pendulum.tz.timezone("Europe/Paris"),
pendulum.tz.timezone("America/Toronto"),
Timezone("UTC"),
Timezone("Europe/Paris"),
Timezone("America/Toronto"),
],
)
@pytest.mark.asyncio
Expand Down
Loading