From bab867f7c60410f1e34a1959a16bf098879dce70 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 26 Aug 2024 21:23:26 +0200 Subject: [PATCH 1/2] Move away from deprecated DAG.following_schedule() method --- airflow/providers/google/cloud/operators/gcs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index c396e173eaad1..0ea68198b46db 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -795,11 +795,11 @@ def execute(self, context: Context) -> list[str]: orig_end = context["data_interval_end"] except KeyError: orig_start = pendulum.instance(context["execution_date"]) - following_execution_date = context["dag"].following_schedule(context["execution_date"]) - if following_execution_date is None: - orig_end = None + next_dagrun = context["dag"].next_dagrun_info(last_automated_dagrun=None, restricted=False) + if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end: + orig_end = pendulum.instance(next_dagrun.data_interval.end) else: - orig_end = pendulum.instance(following_execution_date) + orig_end = None timespan_start = orig_start if orig_end is None: # Only possible in Airflow before 2.2. From b2739b92da044d6bbb64c93609e031c887c5eae7 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 27 Aug 2024 22:50:49 +0200 Subject: [PATCH 2/2] Update mock to changed logic and remove one layer of Pendulum --- .../providers/google/cloud/operators/gcs.py | 2 +- .../google/cloud/operators/test_gcs.py | 22 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index 0ea68198b46db..561de4c46edd4 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -797,7 +797,7 @@ def execute(self, context: Context) -> list[str]: orig_start = pendulum.instance(context["execution_date"]) next_dagrun = context["dag"].next_dagrun_info(last_automated_dagrun=None, restricted=False) if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end: - orig_end = pendulum.instance(next_dagrun.data_interval.end) + orig_end = next_dagrun.data_interval.end else: orig_end = None diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py index 1a5acd0bf610e..fcb5fcb92514f 100644 --- a/tests/providers/google/cloud/operators/test_gcs.py +++ b/tests/providers/google/cloud/operators/test_gcs.py @@ -21,6 +21,7 @@ from pathlib import Path from unittest import mock +import pendulum import pytest from airflow.providers.common.compat.openlineage.facet import ( @@ -40,6 +41,7 @@ GCSSynchronizeBucketsOperator, GCSTimeSpanFileTransformOperator, ) +from airflow.timetables.base import DagRunInfo, DataInterval TASK_ID = "test-gcs-operator" TEST_BUCKET = "test-bucket" @@ -395,7 +397,15 @@ def test_execute(self, mock_hook, mock_subprocess, mock_tempdir): timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc) timespan_end = timespan_start + timedelta(hours=1) mock_dag = mock.Mock() - mock_dag.following_schedule = lambda x: x + timedelta(hours=1) + mock_dag.next_dagrun_info.side_effect = [ + DagRunInfo( + run_after=pendulum.instance(timespan_start), + data_interval=DataInterval( + start=pendulum.instance(timespan_start), + end=pendulum.instance(timespan_end), + ), + ), + ] mock_ti = mock.Mock() context = dict( execution_date=timespan_start, @@ -575,7 +585,15 @@ def test_get_openlineage_facets_on_complete( timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc) mock_dag = mock.Mock() - mock_dag.following_schedule = lambda x: x + timedelta(hours=1) + mock_dag.next_dagrun_info.side_effect = [ + DagRunInfo( + run_after=pendulum.instance(timespan_start), + data_interval=DataInterval( + start=pendulum.instance(timespan_start), + end=None, + ), + ), + ] context = dict( execution_date=timespan_start, dag=mock_dag,