Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,55 @@ jobs:
uses: ./.github/actions/post_tests_failure
if: failure()

tests-postgres-pre-release-pendulum:
timeout-minutes: 130
name: >
PreReleasePendulum${{needs.build-info.outputs.default-postgres-version}},
Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: "${{needs.build-info.outputs.runs-on}}"
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "postgres"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
PYTHON_VERSION: "${needs.build-info.outputs.default-python-version}}"
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
USE_PENDULUM_PRERELEASE: "3.0.0b1"
JOB_ID: >
postgres-pre-release-pendulum-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v3
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Tests: ${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: breeze testing tests --run-in-parallel
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Pendulum"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:Pendulum"
uses: ./.github/actions/post_tests_failure
if: failure()

tests-mysql:
timeout-minutes: 130
name: >
Expand Down
7 changes: 7 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,13 @@ if [[ ${DOWNGRADE_SQLALCHEMY=} == "true" ]]; then
pip install --root-user-action ignore "sqlalchemy==${min_sqlalchemy_version}"
pip check
fi
if [[ -n ${USE_PENDULUM_PRERELEASE:=} ]]; then
echo
echo "${COLOR_BLUE}Replace pendulum by pre-release version: ${USE_PENDULUM_PRERELEASE}${COLOR_RESET}"
echo
pip install --root-user-action ignore "pendulum==${USE_PENDULUM_PRERELEASE}"
pip check
fi

set +u
if [[ "${RUN_TESTS}" != "true" ]]; then
Expand Down
7 changes: 4 additions & 3 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -165,9 +166,9 @@ def encode_timezone(var: Timezone) -> str | int:
)


def decode_timezone(var: str | int) -> Timezone:
def decode_timezone(var: str | int) -> Timezone | FixedTimezone:
"""Decode a previously serialized Pendulum Timezone."""
return pendulum.tz.timezone(var)
return parse_timezone(var)


def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
Expand Down Expand Up @@ -605,7 +606,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_timezone = pendulum.tz.timezone
_deserialize_timezone = parse_timezone

@classmethod
def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
Expand Down
15 changes: 7 additions & 8 deletions airflow/serialization/serializers/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
serialize as serialize_timezone,
)
from airflow.utils.module_loading import qualname
from airflow.utils.timezone import convert_to_utc, is_naive
from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
import datetime

from pendulum import DateTime
from pendulum.tz import fixed_timezone, timezone

tz: datetime.tzinfo | None = None
if isinstance(data, dict) and TIMEZONE in data:
if version == 1:
# try to deserialize unsupported timezones
timezone_mapping = {
"EDT": fixed_timezone(-4 * 3600),
"CDT": fixed_timezone(-5 * 3600),
"MDT": fixed_timezone(-6 * 3600),
"PDT": fixed_timezone(-7 * 3600),
"CEST": timezone("CET"),
"EDT": parse_timezone(-4 * 3600),
"CDT": parse_timezone(-5 * 3600),
"MDT": parse_timezone(-6 * 3600),
"PDT": parse_timezone(-7 * 3600),
"CEST": parse_timezone("CET"),
}
if data[TIMEZONE] in timezone_mapping:
tz = timezone_mapping[data[TIMEZONE]]
else:
tz = timezone(data[TIMEZONE])
tz = parse_timezone(data[TIMEZONE])
else:
tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])

Expand Down
7 changes: 2 additions & 5 deletions airflow/serialization/serializers/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ def serialize(o: object) -> tuple[U, str, int, bool]:


def deserialize(classname: str, version: int, data: object) -> Any:
from pendulum.tz import fixed_timezone, timezone
from airflow.utils.timezone import parse_timezone

if not isinstance(data, (str, int)):
raise TypeError(f"{data} is not of type int or str but of {type(data)}")

if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if isinstance(data, int):
return fixed_timezone(data)

if "zoneinfo.ZoneInfo" in classname:
try:
from zoneinfo import ZoneInfo
Expand All @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:

return ZoneInfo(data)

return timezone(data)
return parse_timezone(data)


# ported from pendulum.tz.timezone._get_tzinfo_name
Expand Down
5 changes: 3 additions & 2 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.state import State
from airflow.utils.timezone import parse_timezone, utc

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand All @@ -54,9 +55,9 @@
if tz == "system":
TIMEZONE = pendulum.tz.local_timezone()
else:
TIMEZONE = pendulum.tz.timezone(tz)
TIMEZONE = parse_timezone(tz)
except Exception:
TIMEZONE = pendulum.tz.timezone("UTC")
TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)

Expand Down
6 changes: 3 additions & 3 deletions airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter
from pendulum.tz.timezone import Timezone

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone

if TYPE_CHECKING:
from pendulum import DateTime
from pendulum.tz.timezone import Timezone


def _is_schedule_fixed(expression: str) -> bool:
Expand All @@ -56,7 +56,7 @@ def __init__(self, cron: str, timezone: str | Timezone) -> None:
self._expression = cron_presets.get(cron, cron)

if isinstance(timezone, str):
timezone = Timezone(timezone)
timezone = parse_timezone(timezone)
self._timezone = timezone

try:
Expand Down
5 changes: 1 addition & 4 deletions airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import logging
from typing import TYPE_CHECKING, Any, Generator, Iterable, overload

import pendulum
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
Expand All @@ -34,7 +33,7 @@
from airflow import settings
from airflow.configuration import conf
from airflow.serialization.enums import Encoding
from airflow.utils.timezone import make_naive
from airflow.utils.timezone import make_naive, utc

if TYPE_CHECKING:
from kubernetes.client.models.v1_pod import V1Pod
Expand All @@ -46,8 +45,6 @@

log = logging.getLogger(__name__)

utc = pendulum.tz.timezone("UTC")


class UtcDateTime(TypeDecorator):
"""
Expand Down
33 changes: 31 additions & 2 deletions airflow/utils/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@
import pendulum
from dateutil.relativedelta import relativedelta
from pendulum.datetime import DateTime
from pendulum.tz import fixed_timezone
from pendulum.tz.timezone import FixedTimezone, Timezone

# UTC time zone as a tzinfo instance.
utc = pendulum.tz.timezone("UTC")
from airflow.compat.functools import cache

# UTC time zone as a Timezone instance (subclass of tzinfo)
# This type uses for compatibility between pendulum v2 and v3
# - in pendulum 2.x ``pendulum.tz.timezone`` returns FixedTimezone
# - in pendulum 3.x ``pendulum.timezone`` returns Timezone
# Same is valid for pendulum.tz.UTC
utc = Timezone("UTC")


def is_localized(value):
Expand Down Expand Up @@ -273,3 +281,24 @@ def _format_part(key: str) -> str:
if not joined:
return "<1s"
return joined


@cache
def parse_timezone(name: str | int) -> Timezone | FixedTimezone:
"""
Parse timezone and return one of the pendulum Timezone.

Provide the same interface as ``pendulum.tz.timezone(name)``

.. note::
This class for compatibility between pendulum 2 and 3.
In pendulum 3 ``pendulum.tz.timezone`` it is a module, which can't be used as parser
In pendulum 2 ``pendulum.timezone`` mypy failed on static check

:meta: private
"""
if isinstance(name, int):
return fixed_timezone(name)
elif name.lower() == "utc":
return utc
return Timezone(name)
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
option_upgrade_boto,
option_use_airflow_version,
option_use_packages_from_dist,
option_use_pendulum_prerelease,
option_verbose,
)
from airflow_breeze.utils.console import get_console
Expand Down Expand Up @@ -167,6 +168,7 @@ def run(self):
@option_include_mypy_volume
@option_upgrade_boto
@option_downgrade_sqlalchemy
@option_use_pendulum_prerelease
@option_verbose
@option_dry_run
@option_github_repository
Expand Down Expand Up @@ -206,6 +208,7 @@ def shell(
upgrade_boto: bool,
downgrade_sqlalchemy: bool,
standalone_dag_processor: bool,
use_pendulum_prerelease: str,
):
"""Enter breeze environment. this is the default command use when no other is selected."""
if get_verbose() or get_dry_run():
Expand Down Expand Up @@ -246,6 +249,7 @@ def shell(
upgrade_boto=upgrade_boto,
downgrade_sqlalchemy=downgrade_sqlalchemy,
standalone_dag_processor=standalone_dag_processor,
use_pendulum_prerelease=use_pendulum_prerelease,
)
sys.exit(result.returncode)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"options": [
"--upgrade-boto",
"--downgrade-sqlalchemy",
"--use-pendulum-prerelease",
],
},
],
Expand Down
4 changes: 4 additions & 0 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
option_skip_cleanup,
option_upgrade_boto,
option_use_airflow_version,
option_use_pendulum_prerelease,
option_verbose,
)
from airflow_breeze.utils.console import Output, get_console
Expand Down Expand Up @@ -363,6 +364,7 @@ def run_tests_in_parallel(
)
@option_upgrade_boto
@option_downgrade_sqlalchemy
@option_use_pendulum_prerelease
@click.option(
"--collect-only",
help="Collect tests only, do not run them.",
Expand Down Expand Up @@ -407,6 +409,7 @@ def command_for_tests(
extra_pytest_args: tuple,
upgrade_boto: bool,
downgrade_sqlalchemy: bool,
use_pendulum_prerelease: str,
collect_only: bool,
remove_arm_packages: bool,
github_repository: str,
Expand All @@ -428,6 +431,7 @@ def command_for_tests(
test_type=test_type,
upgrade_boto=upgrade_boto,
downgrade_sqlalchemy=downgrade_sqlalchemy,
use_pendulum_prerelease=use_pendulum_prerelease,
collect_only=collect_only,
remove_arm_packages=remove_arm_packages,
github_repository=github_repository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"--mount-sources",
"--upgrade-boto",
"--downgrade-sqlalchemy",
"--use-pendulum-prerelease",
"--remove-arm-packages",
"--skip-docker-compose-down",
],
Expand Down
1 change: 1 addition & 0 deletions dev/breeze/src/airflow_breeze/params/shell_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class ShellParams:
verbose: bool = False
upgrade_boto: bool = False
downgrade_sqlalchemy: bool = False
use_pendulum_prerelease: str = ""
executor: str = START_AIRFLOW_DEFAULT_ALLOWED_EXECUTORS
celery_broker: str = DEFAULT_CELERY_BROKER
celery_flower: bool = False
Expand Down
8 changes: 8 additions & 0 deletions dev/breeze/src/airflow_breeze/utils/common_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
CacheableDefault,
DryRunOption,
MySQLBackendVersionType,
PreReleaseVersion,
UseAirflowVersionType,
VerboseOption,
)
Expand Down Expand Up @@ -642,3 +643,10 @@ def _set_default_from_parent(ctx: click.core.Context, option: click.core.Option,
is_flag=True,
envvar="DOWNGRADE_SQLALCHEMY",
)
option_use_pendulum_prerelease = click.option(
"--use-pendulum-prerelease",
help="Use pre-released version of pendulum.",
required=False,
type=PreReleaseVersion(),
envvar="USE_PENDULUM_PRERELEASE",
)
Loading