diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index c396e173eaad1..561de4c46edd4 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -795,11 +795,11 @@ def execute(self, context: Context) -> list[str]: orig_end = context["data_interval_end"] except KeyError: orig_start = pendulum.instance(context["execution_date"]) - following_execution_date = context["dag"].following_schedule(context["execution_date"]) - if following_execution_date is None: - orig_end = None + next_dagrun = context["dag"].next_dagrun_info(last_automated_dagrun=None, restricted=False) + if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end: + orig_end = next_dagrun.data_interval.end else: - orig_end = pendulum.instance(following_execution_date) + orig_end = None timespan_start = orig_start if orig_end is None: # Only possible in Airflow before 2.2. diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py index 1a5acd0bf610e..fcb5fcb92514f 100644 --- a/tests/providers/google/cloud/operators/test_gcs.py +++ b/tests/providers/google/cloud/operators/test_gcs.py @@ -21,6 +21,7 @@ from pathlib import Path from unittest import mock +import pendulum import pytest from airflow.providers.common.compat.openlineage.facet import ( @@ -40,6 +41,7 @@ GCSSynchronizeBucketsOperator, GCSTimeSpanFileTransformOperator, ) +from airflow.timetables.base import DagRunInfo, DataInterval TASK_ID = "test-gcs-operator" TEST_BUCKET = "test-bucket" @@ -395,7 +397,15 @@ def test_execute(self, mock_hook, mock_subprocess, mock_tempdir): timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc) timespan_end = timespan_start + timedelta(hours=1) mock_dag = mock.Mock() - mock_dag.following_schedule = lambda x: x + timedelta(hours=1) + mock_dag.next_dagrun_info.side_effect = [ + DagRunInfo( + run_after=pendulum.instance(timespan_start), + data_interval=DataInterval( + start=pendulum.instance(timespan_start), + end=pendulum.instance(timespan_end), + ), + ), + ] mock_ti = mock.Mock() context = dict( execution_date=timespan_start, @@ -575,7 +585,15 @@ def test_get_openlineage_facets_on_complete( timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc) mock_dag = mock.Mock() - mock_dag.following_schedule = lambda x: x + timedelta(hours=1) + mock_dag.next_dagrun_info.side_effect = [ + DagRunInfo( + run_after=pendulum.instance(timespan_start), + data_interval=DataInterval( + start=pendulum.instance(timespan_start), + end=None, + ), + ), + ] context = dict( execution_date=timespan_start, dag=mock_dag,