From 7e98b3d55fbfeb56004cfcf8d0d4960b97e1dfb4 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Thu, 31 Oct 2024 16:35:54 +0000 Subject: [PATCH 1/8] move triggers and left over operators and sensors into standard provider --- .../example_external_task_marker_dag.py | 2 +- airflow/models/dag.py | 4 ++-- airflow/serialization/serialized_objects.py | 2 +- .../authoring-and-scheduling/deferring.rst | 12 ++++++------ .../providers/standard}/operators/branch.py | 0 .../providers/standard}/operators/email.py | 0 .../standard}/operators/latest_only.py | 0 .../standard}/operators/trigger_dagrun.py | 2 +- .../standard}/sensors/external_task.py | 2 +- .../providers/standard}/sensors/filesystem.py | 4 ++-- .../providers/standard/triggers/__init__.py | 17 +++++++++++++++++ .../standard}/triggers/external_task.py | 4 ++-- .../providers/standard}/triggers/file.py | 14 ++------------ .../providers/standard}/triggers/temporal.py | 0 providers/tests/standard/triggers/__init__.py | 16 ++++++++++++++++ .../standard}/triggers/test_external_task.py | 16 ++++++++-------- .../tests/standard}/triggers/test_file.py | 4 ++-- .../tests/standard}/triggers/test_temporal.py | 10 +++++----- scripts/cov/other_coverage.py | 4 ++-- tests/cli/commands/test_dag_command.py | 2 +- tests/cli/commands/test_task_command.py | 6 +++--- ...test_external_task_sensor_check_existense.py | 2 +- tests/jobs/test_triggerer_job.py | 2 +- tests/operators/test_trigger_dagrun.py | 2 +- tests/sensors/test_external_task_sensor.py | 4 ++-- tests/sensors/test_filesystem.py | 2 +- tests/serialization/test_dag_serialization.py | 10 +++++----- .../example_external_task_parent_deferrable.py | 2 +- 28 files changed, 84 insertions(+), 61 deletions(-) rename {airflow => providers/src/airflow/providers/standard}/operators/branch.py (100%) rename {airflow => providers/src/airflow/providers/standard}/operators/email.py (100%) rename {airflow => providers/src/airflow/providers/standard}/operators/latest_only.py (100%) rename {airflow => providers/src/airflow/providers/standard}/operators/trigger_dagrun.py (99%) rename {airflow => providers/src/airflow/providers/standard}/sensors/external_task.py (99%) rename {airflow => providers/src/airflow/providers/standard}/sensors/filesystem.py (97%) create mode 100644 providers/src/airflow/providers/standard/triggers/__init__.py rename {airflow => providers/src/airflow/providers/standard}/triggers/external_task.py (97%) rename {airflow => providers/src/airflow/providers/standard}/triggers/file.py (85%) rename {airflow => providers/src/airflow/providers/standard}/triggers/temporal.py (100%) create mode 100644 providers/tests/standard/triggers/__init__.py rename {tests => providers/tests/standard}/triggers/test_external_task.py (93%) rename {tests => providers/tests/standard}/triggers/test_file.py (93%) rename {tests => providers/tests/standard}/triggers/test_temporal.py (92%) diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py index abaf21770523a..0a282a834195c 100644 --- a/airflow/example_dags/example_external_task_marker_dag.py +++ b/airflow/example_dags/example_external_task_marker_dag.py @@ -44,7 +44,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor start_date = pendulum.datetime(2021, 1, 1, tz="UTC") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 851d2a5129346..f2d5ff74b62b8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -73,7 +73,7 @@ from airflow import settings, utils from airflow.api_internal.internal_api_call import internal_api_call -from airflow.assets import Asset, AssetAlias, BaseAsset +from airflow.assets import BaseAsset from airflow.configuration import conf as airflow_conf, secrets_backend_list from airflow.exceptions import ( AirflowException, @@ -1115,7 +1115,7 @@ def _get_task_instances( if include_dependent_dags: # Recursively find external tasks indicated by ExternalTaskMarker - from airflow.sensors.external_task import ExternalTaskMarker + from airflow.providers.standard.sensors.external_task import ExternalTaskMarker query = tis if as_pk_tuple: diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 79403860f5fac..ea2f225de36da 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1016,7 +1016,7 @@ class DependencyDetector: def detect_task_dependencies(task: Operator) -> list[DagDependency]: """Detect dependencies caused by tasks.""" from airflow.operators.trigger_dagrun import TriggerDagRunOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor deps = [] if isinstance(task, TriggerDagRunOperator): diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index 0b477151a9091..c208e8cedcbc9 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -68,7 +68,7 @@ When writing a deferrable operators these are the main points to consider: from airflow.configuration import conf from airflow.sensors.base import BaseSensorOperator - from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger from airflow.utils.context import Context @@ -122,7 +122,7 @@ This example shows the structure of a basic trigger, a very simplified version o self.moment = moment def serialize(self): - return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment}) + return ("airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment}) async def run(self): while self.moment > timezone.utcnow(): @@ -177,7 +177,7 @@ Here's a basic example of how a sensor might trigger deferral: from typing import TYPE_CHECKING, Any from airflow.sensors.base import BaseSensorOperator - from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger if TYPE_CHECKING: from airflow.utils.context import Context @@ -237,7 +237,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as `` class WaitOneHourSensor(BaseSensorOperator): start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger", + trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", trigger_kwargs={"moment": timedelta(hours=1)}, next_method="execute_complete", next_kwargs=None, @@ -268,7 +268,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as `` class WaitHoursSensor(BaseSensorOperator): start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger", + trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", trigger_kwargs={"moment": timedelta(hours=1)}, next_method="execute_complete", next_kwargs=None, @@ -307,7 +307,7 @@ After the trigger has finished executing, the task may be sent back to the worke class WaitHoursSensor(BaseSensorOperator): start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger", + trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", trigger_kwargs={"moment": timedelta(hours=1)}, next_method="execute_complete", next_kwargs=None, diff --git a/airflow/operators/branch.py b/providers/src/airflow/providers/standard/operators/branch.py similarity index 100% rename from airflow/operators/branch.py rename to providers/src/airflow/providers/standard/operators/branch.py diff --git a/airflow/operators/email.py b/providers/src/airflow/providers/standard/operators/email.py similarity index 100% rename from airflow/operators/email.py rename to providers/src/airflow/providers/standard/operators/email.py diff --git a/airflow/operators/latest_only.py b/providers/src/airflow/providers/standard/operators/latest_only.py similarity index 100% rename from airflow/operators/latest_only.py rename to providers/src/airflow/providers/standard/operators/latest_only.py diff --git a/airflow/operators/trigger_dagrun.py b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py similarity index 99% rename from airflow/operators/trigger_dagrun.py rename to providers/src/airflow/providers/standard/operators/trigger_dagrun.py index bb1eac7c6963f..3e63311d227a7 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -40,7 +40,7 @@ from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun from airflow.models.xcom import XCom -from airflow.triggers.external_task import DagStateTrigger +from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.utils import timezone from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session diff --git a/airflow/sensors/external_task.py b/providers/src/airflow/providers/standard/sensors/external_task.py similarity index 99% rename from airflow/sensors/external_task.py rename to providers/src/airflow/providers/standard/sensors/external_task.py index 331e17168bab7..16b892f90a804 100644 --- a/airflow/sensors/external_task.py +++ b/providers/src/airflow/providers/standard/sensors/external_task.py @@ -29,8 +29,8 @@ from airflow.models.dagbag import DagBag from airflow.models.taskinstance import TaskInstance from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.external_task import WorkflowTrigger from airflow.utils.file import correct_maybe_zipped from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.sensor_helper import _get_count, _get_external_task_group_task_ids diff --git a/airflow/sensors/filesystem.py b/providers/src/airflow/providers/standard/sensors/filesystem.py similarity index 97% rename from airflow/sensors/filesystem.py rename to providers/src/airflow/providers/standard/sensors/filesystem.py index 4496f5d6abfa4..1d499e4435b2d 100644 --- a/airflow/sensors/filesystem.py +++ b/providers/src/airflow/providers/standard/sensors/filesystem.py @@ -26,9 +26,9 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.providers.standard.hooks.filesystem import FSHook +from airflow.providers.standard.triggers.file import FileTrigger from airflow.sensors.base import BaseSensorOperator from airflow.triggers.base import StartTriggerArgs -from airflow.triggers.file import FileTrigger if TYPE_CHECKING: from airflow.utils.context import Context @@ -63,7 +63,7 @@ class FileSensor(BaseSensorOperator): template_fields: Sequence[str] = ("filepath",) ui_color = "#91818a" start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.file.FileTrigger", + trigger_cls="airflow.providers.standard.triggers.file.FileTrigger", trigger_kwargs={}, next_method="execute_complete", next_kwargs=None, diff --git a/providers/src/airflow/providers/standard/triggers/__init__.py b/providers/src/airflow/providers/standard/triggers/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/providers/src/airflow/providers/standard/triggers/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/triggers/external_task.py b/providers/src/airflow/providers/standard/triggers/external_task.py similarity index 97% rename from airflow/triggers/external_task.py rename to providers/src/airflow/providers/standard/triggers/external_task.py index cd43d59876e9e..47c8aa54b6c4c 100644 --- a/airflow/triggers/external_task.py +++ b/providers/src/airflow/providers/standard/triggers/external_task.py @@ -78,7 +78,7 @@ def __init__( def serialize(self) -> tuple[str, dict[str, Any]]: """Serialize the trigger param and module path.""" return ( - "airflow.triggers.external_task.WorkflowTrigger", + "airflow.providers.standard.triggers.external_task.WorkflowTrigger", { "external_dag_id": self.external_dag_id, "external_task_ids": self.external_task_ids, @@ -159,7 +159,7 @@ def __init__( def serialize(self) -> tuple[str, dict[str, typing.Any]]: """Serialize DagStateTrigger arguments and classpath.""" return ( - "airflow.triggers.external_task.DagStateTrigger", + "airflow.providers.standard.triggers.external_task.DagStateTrigger", { "dag_id": self.dag_id, "states": self.states, diff --git a/airflow/triggers/file.py b/providers/src/airflow/providers/standard/triggers/file.py similarity index 85% rename from airflow/triggers/file.py rename to providers/src/airflow/providers/standard/triggers/file.py index 5f40dd4d2de3f..f6a7715a035eb 100644 --- a/airflow/triggers/file.py +++ b/providers/src/airflow/providers/standard/triggers/file.py @@ -20,7 +20,6 @@ import datetime import os import typing -import warnings from glob import glob from typing import Any @@ -48,21 +47,12 @@ def __init__( super().__init__() self.filepath = filepath self.recursive = recursive - if kwargs.get("poll_interval") is not None: - warnings.warn( - "`poll_interval` has been deprecated and will be removed in future." - "Please use `poke_interval` instead.", - DeprecationWarning, - stacklevel=2, - ) - self.poke_interval: float = kwargs["poll_interval"] - else: - self.poke_interval = poke_interval + self.poke_interval = poke_interval def serialize(self) -> tuple[str, dict[str, Any]]: """Serialize FileTrigger arguments and classpath.""" return ( - "airflow.triggers.file.FileTrigger", + "airflow.providers.standard.triggers.file.FileTrigger", { "filepath": self.filepath, "recursive": self.recursive, diff --git a/airflow/triggers/temporal.py b/providers/src/airflow/providers/standard/triggers/temporal.py similarity index 100% rename from airflow/triggers/temporal.py rename to providers/src/airflow/providers/standard/triggers/temporal.py diff --git a/providers/tests/standard/triggers/__init__.py b/providers/tests/standard/triggers/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/tests/standard/triggers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/triggers/test_external_task.py b/providers/tests/standard/triggers/test_external_task.py similarity index 93% rename from tests/triggers/test_external_task.py rename to providers/tests/standard/triggers/test_external_task.py index 202eafb0b9e3d..792d7ef7262f6 100644 --- a/tests/triggers/test_external_task.py +++ b/providers/tests/standard/triggers/test_external_task.py @@ -24,8 +24,8 @@ from airflow.models.dag import DAG from airflow.models.dagrun import DagRun +from airflow.providers.standard.triggers.external_task import DagStateTrigger, WorkflowTrigger from airflow.triggers.base import TriggerEvent -from airflow.triggers.external_task import DagStateTrigger, WorkflowTrigger from airflow.utils import timezone from airflow.utils.state import DagRunState @@ -37,7 +37,7 @@ class TestWorkflowTrigger: STATES = ["success", "fail"] @pytest.mark.flaky(reruns=5) - @mock.patch("airflow.triggers.external_task._get_count") + @mock.patch("airflow.providers.standard.triggers.external_task._get_count") @pytest.mark.asyncio async def test_task_workflow_trigger_success(self, mock_get_count): """check the db count get called correctly.""" @@ -70,7 +70,7 @@ async def test_task_workflow_trigger_success(self, mock_get_count): await gen.__anext__() @pytest.mark.flaky(reruns=5) - @mock.patch("airflow.triggers.external_task._get_count") + @mock.patch("airflow.providers.standard.triggers.external_task._get_count") @pytest.mark.asyncio async def test_task_workflow_trigger_failed(self, mock_get_count): mock_get_count.side_effect = mocked_get_count @@ -102,7 +102,7 @@ async def test_task_workflow_trigger_failed(self, mock_get_count): with pytest.raises(StopAsyncIteration): await gen.__anext__() - @mock.patch("airflow.triggers.external_task._get_count") + @mock.patch("airflow.providers.standard.triggers.external_task._get_count") @pytest.mark.asyncio async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count): mock_get_count.return_value = 0 @@ -133,7 +133,7 @@ async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count): await gen.__anext__() @pytest.mark.flaky(reruns=5) - @mock.patch("airflow.triggers.external_task._get_count") + @mock.patch("airflow.providers.standard.triggers.external_task._get_count") @pytest.mark.asyncio async def test_task_workflow_trigger_skipped(self, mock_get_count): mock_get_count.side_effect = mocked_get_count @@ -162,7 +162,7 @@ async def test_task_workflow_trigger_skipped(self, mock_get_count): states=["success", "fail"], ) - @mock.patch("airflow.triggers.external_task._get_count") + @mock.patch("airflow.providers.standard.triggers.external_task._get_count") @mock.patch("asyncio.sleep") @pytest.mark.asyncio async def test_task_workflow_trigger_sleep_success(self, mock_sleep, mock_get_count): @@ -203,7 +203,7 @@ def test_serialization(self): poke_interval=5, ) classpath, kwargs = trigger.serialize() - assert classpath == "airflow.triggers.external_task.WorkflowTrigger" + assert classpath == "airflow.providers.standard.triggers.external_task.WorkflowTrigger" assert kwargs == { "external_dag_id": self.DAG_ID, "execution_dates": [timezone.datetime(2022, 1, 1)], @@ -271,7 +271,7 @@ def test_serialization(self): poll_interval=5, ) classpath, kwargs = trigger.serialize() - assert classpath == "airflow.triggers.external_task.DagStateTrigger" + assert classpath == "airflow.providers.standard.triggers.external_task.DagStateTrigger" assert kwargs == { "dag_id": self.DAG_ID, "states": self.STATES, diff --git a/tests/triggers/test_file.py b/providers/tests/standard/triggers/test_file.py similarity index 93% rename from tests/triggers/test_file.py rename to providers/tests/standard/triggers/test_file.py index 6fb25dea3f00c..b73e16d764ff3 100644 --- a/tests/triggers/test_file.py +++ b/providers/tests/standard/triggers/test_file.py @@ -20,7 +20,7 @@ import pytest -from airflow.triggers.file import FileTrigger +from airflow.providers.standard.triggers.file import FileTrigger class TestFileTrigger: @@ -30,7 +30,7 @@ def test_serialization(self): """Asserts that the trigger correctly serializes its arguments and classpath.""" trigger = FileTrigger(filepath=self.FILE_PATH, poll_interval=5) classpath, kwargs = trigger.serialize() - assert classpath == "airflow.triggers.file.FileTrigger" + assert classpath == "airflow.providers.standard.triggers.file.FileTrigger" assert kwargs == { "filepath": self.FILE_PATH, "poke_interval": 5, diff --git a/tests/triggers/test_temporal.py b/providers/tests/standard/triggers/test_temporal.py similarity index 92% rename from tests/triggers/test_temporal.py rename to providers/tests/standard/triggers/test_temporal.py index 90f00a694e5b3..f94066699a993 100644 --- a/tests/triggers/test_temporal.py +++ b/providers/tests/standard/triggers/test_temporal.py @@ -23,8 +23,8 @@ import pendulum import pytest +from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.triggers.base import TriggerEvent -from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import utcnow @@ -56,7 +56,7 @@ def test_datetime_trigger_serialization(): moment = pendulum.instance(datetime.datetime(2020, 4, 1, 13, 0), pendulum.UTC) trigger = DateTimeTrigger(moment) classpath, kwargs = trigger.serialize() - assert classpath == "airflow.triggers.temporal.DateTimeTrigger" + assert classpath == "airflow.providers.standard.triggers.temporal.DateTimeTrigger" assert kwargs == {"moment": moment, "end_from_trigger": False} @@ -68,7 +68,7 @@ def test_timedelta_trigger_serialization(): trigger = TimeDeltaTrigger(datetime.timedelta(seconds=10)) expected_moment = timezone.utcnow() + datetime.timedelta(seconds=10) classpath, kwargs = trigger.serialize() - assert classpath == "airflow.triggers.temporal.DateTimeTrigger" + assert classpath == "airflow.providers.standard.triggers.temporal.DateTimeTrigger" # We need to allow for a little time difference to avoid this test being # flaky if it runs over the boundary of a single second assert -2 < (kwargs["moment"] - expected_moment).total_seconds() < 2 @@ -113,8 +113,8 @@ async def test_datetime_trigger_timing(tz, end_from_trigger): assert result.payload == expected_payload -@mock.patch("airflow.triggers.temporal.timezone.utcnow") -@mock.patch("airflow.triggers.temporal.asyncio.sleep") +@mock.patch("airflow.providers.standard.triggers.temporal.timezone.utcnow") +@mock.patch("airflow.providers.standard.triggers.temporal.asyncio.sleep") @pytest.mark.asyncio async def test_datetime_trigger_mocked(mock_sleep, mock_utcnow): """ diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py index dae7733ec5c15..404f5ab6b890e 100644 --- a/scripts/cov/other_coverage.py +++ b/scripts/cov/other_coverage.py @@ -58,8 +58,8 @@ "airflow/dag_processing/manager.py", "airflow/dag_processing/processor.py", "airflow/triggers/base.py", - "airflow/triggers/external_task.py", - "airflow/triggers/file.py", + "providers/src/airflow/providers/standard/triggers/external_task.py", + "providers/src/airflow/providers/standard/triggers/file.py", "airflow/triggers/testing.py", ] diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 7acc3a85b9345..c210577fae276 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -40,8 +40,8 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.dag import _run_inline_trigger from airflow.models.serialized_dag import SerializedDagModel +from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.triggers.base import TriggerEvent -from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 1de3f7b68d394..8e513abbd3502 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -397,9 +397,9 @@ def test_cli_test_with_env_vars(self): assert "foo=bar" in output assert "AIRFLOW_TEST_MODE=True" in output - @mock.patch("airflow.triggers.file.os.path.getmtime", return_value=0) - @mock.patch("airflow.triggers.file.glob", return_value=["/tmp/test"]) - @mock.patch("airflow.triggers.file.os.path.isfile", return_value=True) + @mock.patch("airflow.providers.standard.triggers.file.os.path.getmtime", return_value=0) + @mock.patch("airflow.providers.standard.triggers.file.glob", return_value=["/tmp/test"]) + @mock.patch("airflow.providers.standard.triggers.file.os.path.isfile", return_value=True) @mock.patch("airflow.sensors.filesystem.FileSensor.poke", return_value=False) def test_cli_test_with_deferrable_operator( self, mock_pock, mock_is_file, mock_glob, mock_getmtime, caplog diff --git a/tests/dags/test_external_task_sensor_check_existense.py b/tests/dags/test_external_task_sensor_check_existense.py index 9de992c073b91..656f3760a248c 100644 --- a/tests/dags/test_external_task_sensor_check_existense.py +++ b/tests/dags/test_external_task_sensor_check_existense.py @@ -19,7 +19,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from tests.models import DEFAULT_DATE diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 939c210dfe75f..6d3d26dcb8dd7 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -37,8 +37,8 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator +from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.triggers.base import TriggerEvent -from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.triggers.testing import FailureTrigger, SuccessTrigger from airflow.utils import timezone from airflow.utils.log.logging_mixin import RedirectStdHandler diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 52a11d10e5e33..1836534be3b47 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -32,8 +32,8 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import TaskInstance from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.settings import TracebackSessionForTests -from airflow.triggers.external_task import DagStateTrigger from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State, TaskInstanceState diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 43911c1a41d48..bec8e52508bff 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -39,12 +39,12 @@ from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.time import TimeSensor -from airflow.sensors.external_task import ( +from airflow.providers.standard.triggers.external_task import WorkflowTrigger +from airflow.providers.standard.sensors.external_task import ( ExternalTaskMarker, ExternalTaskSensor, ) from airflow.serialization.serialized_objects import SerializedBaseOperator -from airflow.triggers.external_task import WorkflowTrigger from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import create_session, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py index 641f2f218f2db..ce1742e42ee94 100644 --- a/tests/sensors/test_filesystem.py +++ b/tests/sensors/test_filesystem.py @@ -26,8 +26,8 @@ from airflow.exceptions import AirflowSensorTimeout, TaskDeferred from airflow.models.dag import DAG +from airflow.providers.standard.triggers.file import FileTrigger from airflow.sensors.filesystem import FileSensor -from airflow.triggers.file import FileTrigger from airflow.utils.timezone import datetime pytestmark = pytest.mark.db_test diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 0111cc669cd02..aeb6bc82ff6f5 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1505,7 +1505,7 @@ def test_deps_sorted(self): Tests serialize_operator, make sure the deps is in order """ from airflow.operators.empty import EmptyOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor execution_date = datetime(2020, 1, 1) with DAG(dag_id="test_deps_sorted", schedule=None, start_date=execution_date) as dag: @@ -1620,7 +1620,7 @@ def test_derived_dag_deps_sensor(self): Tests DAG dependency detection for sensors, including derived classes """ from airflow.operators.empty import EmptyOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor class DerivedSensor(ExternalTaskSensor): pass @@ -1651,7 +1651,7 @@ def test_dag_deps_assets_with_duplicate_asset(self): """ Check that dag_dependencies node is populated correctly for a DAG with duplicate assets. """ - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor d1 = Asset("d1") d2 = Asset("d2") @@ -1740,7 +1740,7 @@ def test_dag_deps_assets(self): """ Check that dag_dependencies node is populated correctly for a DAG with assets. """ - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor d1 = Asset("d1") d2 = Asset("d2") @@ -2198,7 +2198,7 @@ def test_start_trigger_args_in_serialized_dag(self): class TestOperator(BaseOperator): start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger", + trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", trigger_kwargs={"delta": timedelta(seconds=1)}, next_method="execute_complete", next_kwargs=None, diff --git a/tests/system/core/example_external_task_parent_deferrable.py b/tests/system/core/example_external_task_parent_deferrable.py index 62bb1afc18b2e..3aa1b2c33b939 100644 --- a/tests/system/core/example_external_task_parent_deferrable.py +++ b/tests/system/core/example_external_task_parent_deferrable.py @@ -19,7 +19,7 @@ from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.utils.timezone import datetime with DAG( From a750af6fec2a04ba5a347303fb4397a4e496d56e Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 1 Nov 2024 06:04:09 +0000 Subject: [PATCH 2/8] move left over operators/sensors tests inside standard provider --- .pre-commit-config.yaml | 5 ++- airflow/example_dags/example_dag_decorator.py | 2 +- airflow/example_dags/example_sensors.py | 2 +- airflow/serialization/serialized_objects.py | 6 ++-- docs/apache-airflow/core-concepts/dags.rst | 4 +-- .../core-concepts/operators.rst | 2 +- .../apache-airflow/core-concepts/taskflow.rst | 2 +- .../howto/operator/external_task_sensor.rst | 4 +-- docs/apache-airflow/howto/operator/file.rst | 2 +- .../operators-and-hooks-ref.rst | 10 +++--- docs/apache-airflow/tutorial/taskflow.rst | 2 +- generated/provider_dependencies.json | 1 + .../common/compat/standard/triggers.py | 31 +++++++++++++++++++ .../microsoft/azure/sensors/msgraph.py | 2 +- .../providers/standard/operators/datetime.py | 2 +- .../standard/operators/latest_only.py | 2 +- .../providers/standard/operators/python.py | 2 +- .../providers/standard/operators/weekday.py | 2 +- .../airflow/providers/standard/provider.yaml | 13 ++++++++ .../providers/standard/sensors/date_time.py | 2 +- .../providers/standard/sensors/time.py | 2 +- .../providers/standard/sensors/time_delta.py | 2 +- .../operators/test_branch_operator.py | 2 +- .../tests/standard}/operators/test_email.py | 2 +- .../operators/test_latest_only_operator.py | 0 .../operators/test_trigger_dagrun.py | 0 .../sensors/test_external_task_sensor.py | 10 +++--- .../standard}/sensors/test_filesystem.py | 2 +- providers/tests/standard/sensors/test_time.py | 2 +- tests/cli/commands/test_task_command.py | 2 +- tests/serialization/test_dag_serialization.py | 2 +- 31 files changed, 86 insertions(+), 38 deletions(-) create mode 100644 providers/src/airflow/providers/common/compat/standard/triggers.py rename {tests => providers/tests/standard}/operators/test_branch_operator.py (99%) rename {tests => providers/tests/standard}/operators/test_email.py (97%) rename {tests => providers/tests/standard}/operators/test_latest_only_operator.py (100%) rename {tests => providers/tests/standard}/operators/test_trigger_dagrun.py (100%) rename {tests => providers/tests/standard}/sensors/test_external_task_sensor.py (99%) rename {tests => providers/tests/standard}/sensors/test_filesystem.py (99%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 20c7a293bff06..2e226e95de4f3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -724,7 +724,10 @@ repos: files: > (?x) ^providers/src/airflow/providers/.*\.py$ - exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py + exclude: | + (?x) + ^.*/.*_vendor/| + ^providers/src/airflow/providers/standard/.*$ - id: check-get-lineage-collector-providers language: python name: Check providers import hook lineage code from compat diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index 447b4471b97e4..7ec31497a1fe7 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -24,7 +24,7 @@ from airflow.decorators import dag, task from airflow.models.baseoperator import BaseOperator -from airflow.operators.email import EmailOperator +from airflow.providers.standard.operators.email import EmailOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/airflow/example_dags/example_sensors.py b/airflow/example_dags/example_sensors.py index 52b1b84e223e8..39d7b8d29635f 100644 --- a/airflow/example_dags/example_sensors.py +++ b/airflow/example_dags/example_sensors.py @@ -24,11 +24,11 @@ from airflow.models.dag import DAG from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor +from airflow.providers.standard.sensors.filesystem import FileSensor from airflow.providers.standard.sensors.python import PythonSensor from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync from airflow.providers.standard.sensors.weekday import DayOfWeekSensor -from airflow.sensors.filesystem import FileSensor from airflow.utils.trigger_rule import TriggerRule from airflow.utils.weekday import WeekDay diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index ea2f225de36da..20e029621d0b3 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -118,10 +118,10 @@ log = logging.getLogger(__name__) _OPERATOR_EXTRA_LINKS: set[str] = { - "airflow.operators.trigger_dagrun.TriggerDagRunLink", - "airflow.sensors.external_task.ExternalDagLink", + "airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink", + "airflow.providers.standard.sensors.external_task.ExternalDagLink", # Deprecated names, so that existing serialized dags load straight away. - "airflow.sensors.external_task.ExternalTaskSensorLink", + "airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink", "airflow.operators.dagrun_operator.TriggerDagRunLink", "airflow.sensors.external_task_sensor.ExternalTaskSensorLink", } diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst index ae4da4ab18c68..6ac6d874ade56 100644 --- a/docs/apache-airflow/core-concepts/dags.rst +++ b/docs/apache-airflow/core-concepts/dags.rst @@ -359,7 +359,7 @@ The ``@task.branch`` can also be used with XComs allowing branching context to d start_op >> branch_op >> [continue_op, stop_op] -If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``. +If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.providers.standard.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``. .. note:: The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.providers.standard.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator. @@ -765,7 +765,7 @@ While dependencies between tasks in a DAG are explicitly defined through upstrea relationships, dependencies between DAGs are a bit more complex. In general, there are two ways in which one DAG can depend on another: -- triggering - :class:`~airflow.operators.trigger_dagrun.TriggerDagRunOperator` +- triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator` - waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG diff --git a/docs/apache-airflow/core-concepts/operators.rst b/docs/apache-airflow/core-concepts/operators.rst index e721faa97560f..7812006658eab 100644 --- a/docs/apache-airflow/core-concepts/operators.rst +++ b/docs/apache-airflow/core-concepts/operators.rst @@ -30,7 +30,7 @@ Airflow has a very extensive set of operators available, with some built-in to t - :class:`~airflow.providers.standard.operators.bash.BashOperator` - executes a bash command - :class:`~airflow.providers.standard.operators.python.PythonOperator` - calls an arbitrary Python function -- :class:`~airflow.operators.email.EmailOperator` - sends an email +- :class:`~airflow.providers.standard.operators.email.EmailOperator` - sends an email - Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering jinja templates passed as arguments. .. note:: diff --git a/docs/apache-airflow/core-concepts/taskflow.rst b/docs/apache-airflow/core-concepts/taskflow.rst index b31494c155891..b090302888f3d 100644 --- a/docs/apache-airflow/core-concepts/taskflow.rst +++ b/docs/apache-airflow/core-concepts/taskflow.rst @@ -25,7 +25,7 @@ If you write most of your DAGs using plain Python code rather than Operators, th TaskFlow takes care of moving inputs and outputs between your Tasks using XComs for you, as well as automatically calculating dependencies - when you call a TaskFlow function in your DAG file, rather than executing it, you will get an object representing the XCom for the result (an ``XComArg``), that you can then use as inputs to downstream tasks or operators. For example:: from airflow.decorators import task - from airflow.operators.email import EmailOperator + from airflow.providers.standard.operators.email import EmailOperator @task def get_ip(): diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst index 7ddbc4b2417b8..812f3b6aab9b7 100644 --- a/docs/apache-airflow/howto/operator/external_task_sensor.rst +++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst @@ -41,7 +41,7 @@ DAGs. ExternalTaskSensor ^^^^^^^^^^^^^^^^^^ -Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG +Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG wait for another task on a different DAG for a specific ``execution_date``. ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed @@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode: ExternalTaskSensor with task_group dependency --------------------------------------------- -In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG +In Addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG wait for another ``task_group`` on a different DAG for a specific ``execution_date``. .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py diff --git a/docs/apache-airflow/howto/operator/file.rst b/docs/apache-airflow/howto/operator/file.rst index 49ca1c75f6042..6241cc2e50533 100644 --- a/docs/apache-airflow/howto/operator/file.rst +++ b/docs/apache-airflow/howto/operator/file.rst @@ -22,7 +22,7 @@ FileSensor ========== -Use the :class:`~airflow.sensors.filesystem.FileSensor` to detect files appearing in your local +Use the :class:`~airflow.providers.standard.sensors.filesystem.FileSensor` to detect files appearing in your local filesystem. You need to have connection defined to use it (pass connection id via ``fs_conn_id``). Default connection is ``fs_default``. diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 026d206ea5f01..bf5ce84a82c9a 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -53,13 +53,13 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.providers.standard.operators.bash` - :doc:`How to use ` - * - :mod:`airflow.operators.branch` + * - :mod:`airflow.providers.standard.operators.branch` - * - :mod:`airflow.operators.empty` - - * - :mod:`airflow.operators.email` + * - :mod:`airflow.providers.standard.operators.email` - * - :mod:`airflow.operators.generic_transfer` @@ -71,7 +71,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.providers.standard.operators.python` - :doc:`How to use ` - * - :mod:`airflow.operators.trigger_dagrun` + * - :mod:`airflow.providers.standard.operators.trigger_dagrun` - **Sensors:** @@ -85,10 +85,10 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.providers.standard.sensors.bash` - :ref:`How to use ` - * - :mod:`airflow.sensors.external_task` + * - :mod:`airflow.providers.standard.sensors.external_task` - :doc:`How to use ` - * - :mod:`airflow.sensors.filesystem` + * - :mod:`airflow.providers.standard.sensors.filesystem` - :ref:`How to use ` * - :mod:`airflow.providers.standard.sensors.python` diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst index 72b5ac82dd049..0b6bf77316fb2 100644 --- a/docs/apache-airflow/tutorial/taskflow.rst +++ b/docs/apache-airflow/tutorial/taskflow.rst @@ -438,7 +438,7 @@ Adding dependencies between decorated and traditional tasks ----------------------------------------------------------- The above tutorial shows how to create dependencies between TaskFlow functions. However, dependencies can also be set between traditional tasks (such as :class:`~airflow.providers.standard.operators.bash.BashOperator` -or :class:`~airflow.sensors.filesystem.FileSensor`) and TaskFlow functions. +or :class:`~airflow.providers.standard.sensors.filesystem.FileSensor`) and TaskFlow functions. Building this dependency is shown in the code below: diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 058f7c00063e7..89c13386e2b71 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -846,6 +846,7 @@ "plugins": [], "cross-providers-deps": [ "amazon", + "common.compat", "google", "oracle", "sftp" diff --git a/providers/src/airflow/providers/common/compat/standard/triggers.py b/providers/src/airflow/providers/common/compat/standard/triggers.py new file mode 100644 index 0000000000000..1f7f524e8867c --- /dev/null +++ b/providers/src/airflow/providers/common/compat/standard/triggers.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger +else: + try: + from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger + except ModuleNotFoundError: + from airflow.triggers.temporal import TimeDeltaTrigger + + +__all__ = ["TimeDeltaTrigger"] diff --git a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py index 6736ea59c918d..23b8b1cde79a9 100644 --- a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py +++ b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py @@ -20,10 +20,10 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence from airflow.exceptions import AirflowException +from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook from airflow.providers.microsoft.azure.triggers.msgraph import MSGraphTrigger, ResponseSerializer from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.temporal import TimeDeltaTrigger if TYPE_CHECKING: from datetime import timedelta diff --git a/providers/src/airflow/providers/standard/operators/datetime.py b/providers/src/airflow/providers/standard/operators/datetime.py index 4455b84dd3bb9..ac6b01067d1b8 100644 --- a/providers/src/airflow/providers/standard/operators/datetime.py +++ b/providers/src/airflow/providers/standard/operators/datetime.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Iterable from airflow.exceptions import AirflowException -from airflow.operators.branch import BaseBranchOperator +from airflow.providers.standard.operators.branch import BaseBranchOperator from airflow.utils import timezone if TYPE_CHECKING: diff --git a/providers/src/airflow/providers/standard/operators/latest_only.py b/providers/src/airflow/providers/standard/operators/latest_only.py index 6f2c0288d90d6..9983058ea2fe0 100644 --- a/providers/src/airflow/providers/standard/operators/latest_only.py +++ b/providers/src/airflow/providers/standard/operators/latest_only.py @@ -23,7 +23,7 @@ import pendulum -from airflow.operators.branch import BaseBranchOperator +from airflow.providers.standard.operators.branch import BaseBranchOperator if TYPE_CHECKING: from airflow.models import DAG, DagRun diff --git a/providers/src/airflow/providers/standard/operators/python.py b/providers/src/airflow/providers/standard/operators/python.py index 4d908f6ba6809..b5d8aa5615d5b 100644 --- a/providers/src/airflow/providers/standard/operators/python.py +++ b/providers/src/airflow/providers/standard/operators/python.py @@ -50,7 +50,7 @@ from airflow.models.skipmixin import SkipMixin from airflow.models.taskinstance import _CURRENT_CONTEXT from airflow.models.variable import Variable -from airflow.operators.branch import BranchMixIn +from airflow.providers.standard.operators.branch import BranchMixIn from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script from airflow.settings import _ENABLE_AIP_44 from airflow.typing_compat import Literal diff --git a/providers/src/airflow/providers/standard/operators/weekday.py b/providers/src/airflow/providers/standard/operators/weekday.py index 0c74b19d795b4..ffa51c0c82c1b 100644 --- a/providers/src/airflow/providers/standard/operators/weekday.py +++ b/providers/src/airflow/providers/standard/operators/weekday.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Iterable -from airflow.operators.branch import BaseBranchOperator +from airflow.providers.standard.operators.branch import BaseBranchOperator from airflow.utils import timezone from airflow.utils.weekday import WeekDay diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index 426a3a67871ab..0ce56dde9023c 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -46,6 +46,10 @@ operators: - airflow.providers.standard.operators.bash - airflow.providers.standard.operators.python - airflow.providers.standard.operators.generic_transfer + - airflow.providers.standard.operators.branch + - airflow.providers.standard.operators.email + - airflow.providers.standard.operators.latest_only + - airflow.providers.standard.operators.trigger_dagrun sensors: - integration-name: Standard @@ -56,9 +60,18 @@ sensors: - airflow.providers.standard.sensors.weekday - airflow.providers.standard.sensors.bash - airflow.providers.standard.sensors.python + - airflow.providers.standard.sensors.external_task + - airflow.providers.standard.sensors.filesystem hooks: - integration-name: Standard python-modules: - airflow.providers.standard.hooks.filesystem - airflow.providers.standard.hooks.package_index - airflow.providers.standard.hooks.subprocess + +triggers: + - integration-name: Standard + python-modules: + - airflow.providers.standard.triggers.external_task + - airflow.providers.standard.triggers.file + - airflow.providers.standard.triggers.temporal diff --git a/providers/src/airflow/providers/standard/sensors/date_time.py b/providers/src/airflow/providers/standard/sensors/date_time.py index 20a6a484e05a4..ba02678d03167 100644 --- a/providers/src/airflow/providers/standard/sensors/date_time.py +++ b/providers/src/airflow/providers/standard/sensors/date_time.py @@ -20,9 +20,9 @@ import datetime from typing import TYPE_CHECKING, Any, NoReturn, Sequence +from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sensors.base import BaseSensorOperator from airflow.triggers.base import StartTriggerArgs -from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone if TYPE_CHECKING: diff --git a/providers/src/airflow/providers/standard/sensors/time.py b/providers/src/airflow/providers/standard/sensors/time.py index 6dba2628fce35..684a5316be76c 100644 --- a/providers/src/airflow/providers/standard/sensors/time.py +++ b/providers/src/airflow/providers/standard/sensors/time.py @@ -20,9 +20,9 @@ import datetime from typing import TYPE_CHECKING, Any, NoReturn +from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sensors.base import BaseSensorOperator from airflow.triggers.base import StartTriggerArgs -from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone if TYPE_CHECKING: diff --git a/providers/src/airflow/providers/standard/sensors/time_delta.py b/providers/src/airflow/providers/standard/sensors/time_delta.py index dc78a0e33bc42..9a51b52dc8dc4 100644 --- a/providers/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/src/airflow/providers/standard/sensors/time_delta.py @@ -23,8 +23,8 @@ from airflow.configuration import conf from airflow.exceptions import AirflowSkipException +from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone if TYPE_CHECKING: diff --git a/tests/operators/test_branch_operator.py b/providers/tests/standard/operators/test_branch_operator.py similarity index 99% rename from tests/operators/test_branch_operator.py rename to providers/tests/standard/operators/test_branch_operator.py index 9df1b897b4270..ec8987d776723 100644 --- a/tests/operators/test_branch_operator.py +++ b/providers/tests/standard/operators/test_branch_operator.py @@ -22,8 +22,8 @@ import pytest from airflow.models.taskinstance import TaskInstance as TI -from airflow.operators.branch import BaseBranchOperator from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.branch import BaseBranchOperator from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.task_group import TaskGroup diff --git a/tests/operators/test_email.py b/providers/tests/standard/operators/test_email.py similarity index 97% rename from tests/operators/test_email.py rename to providers/tests/standard/operators/test_email.py index df772beb95fdd..79c250c4773d2 100644 --- a/tests/operators/test_email.py +++ b/providers/tests/standard/operators/test_email.py @@ -22,7 +22,7 @@ import pytest -from airflow.operators.email import EmailOperator +from airflow.providers.standard.operators.email import EmailOperator from airflow.utils import timezone from tests_common.test_utils.config import conf_vars diff --git a/tests/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py similarity index 100% rename from tests/operators/test_latest_only_operator.py rename to providers/tests/standard/operators/test_latest_only_operator.py diff --git a/tests/operators/test_trigger_dagrun.py b/providers/tests/standard/operators/test_trigger_dagrun.py similarity index 100% rename from tests/operators/test_trigger_dagrun.py rename to providers/tests/standard/operators/test_trigger_dagrun.py diff --git a/tests/sensors/test_external_task_sensor.py b/providers/tests/standard/sensors/test_external_task_sensor.py similarity index 99% rename from tests/sensors/test_external_task_sensor.py rename to providers/tests/standard/sensors/test_external_task_sensor.py index bec8e52508bff..182decfea91b5 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/providers/tests/standard/sensors/test_external_task_sensor.py @@ -38,12 +38,12 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.python import PythonOperator -from airflow.providers.standard.sensors.time import TimeSensor -from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.providers.standard.sensors.external_task import ( ExternalTaskMarker, ExternalTaskSensor, ) +from airflow.providers.standard.sensors.time import TimeSensor +from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import create_session, provide_session @@ -934,8 +934,8 @@ def test_external_task_group_when_there_is_no_TIs(self): ), ), ) - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count") - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor.get_count") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter") def test_fail_poke( self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message ): @@ -991,7 +991,7 @@ def test_fail_poke( ), ), ) - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter") @mock.patch("airflow.models.dagbag.DagBag.get_dag") @mock.patch("os.path.exists") @mock.patch("airflow.models.dag.DagModel.get_current") diff --git a/tests/sensors/test_filesystem.py b/providers/tests/standard/sensors/test_filesystem.py similarity index 99% rename from tests/sensors/test_filesystem.py rename to providers/tests/standard/sensors/test_filesystem.py index ce1742e42ee94..22432e90a92f9 100644 --- a/tests/sensors/test_filesystem.py +++ b/providers/tests/standard/sensors/test_filesystem.py @@ -26,8 +26,8 @@ from airflow.exceptions import AirflowSensorTimeout, TaskDeferred from airflow.models.dag import DAG +from airflow.providers.standard.sensors.filesystem import FileSensor from airflow.providers.standard.triggers.file import FileTrigger -from airflow.sensors.filesystem import FileSensor from airflow.utils.timezone import datetime pytestmark = pytest.mark.db_test diff --git a/providers/tests/standard/sensors/test_time.py b/providers/tests/standard/sensors/test_time.py index a144c3dc41de7..017b410eda4df 100644 --- a/providers/tests/standard/sensors/test_time.py +++ b/providers/tests/standard/sensors/test_time.py @@ -26,7 +26,7 @@ from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync -from airflow.triggers.temporal import DateTimeTrigger +from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.utils import timezone DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00 diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 8e513abbd3502..9e3ead771775a 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -400,7 +400,7 @@ def test_cli_test_with_env_vars(self): @mock.patch("airflow.providers.standard.triggers.file.os.path.getmtime", return_value=0) @mock.patch("airflow.providers.standard.triggers.file.glob", return_value=["/tmp/test"]) @mock.patch("airflow.providers.standard.triggers.file.os.path.isfile", return_value=True) - @mock.patch("airflow.sensors.filesystem.FileSensor.poke", return_value=False) + @mock.patch("airflow.providers.standard.sensors.filesystem.FileSensor.poke", return_value=False) def test_cli_test_with_deferrable_operator( self, mock_pock, mock_is_file, mock_glob, mock_getmtime, caplog ): diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index aeb6bc82ff6f5..4ed6f1e4df16d 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -2241,7 +2241,7 @@ def execute_complete(self): assert tasks[0]["__var"]["start_trigger_args"] == { "__type": "START_TRIGGER_ARGS", - "trigger_cls": "airflow.triggers.temporal.TimeDeltaTrigger", + "trigger_cls": "airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", # "trigger_kwargs": {"__type": "dict", "__var": {"delta": {"__type": "timedelta", "__var": 2.0}}}, "trigger_kwargs": {"__type": "dict", "__var": {"delta": {"__type": "timedelta", "__var": 2.0}}}, "next_method": "execute_complete", From 11bb47298005a8ce5514fbdbeb67d8b5473d545d Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 1 Nov 2024 16:04:37 +0000 Subject: [PATCH 3/8] fix imports in tests standard provider tests --- .../example_trigger_controller_dag.py | 2 +- airflow/models/dag.py | 2 +- airflow/serialization/serialized_objects.py | 2 +- .../providers/standard/sensors/date_time.py | 2 +- .../providers/standard/sensors/time.py | 2 +- .../providers/standard/triggers/temporal.py | 2 +- .../standard/operators/test_trigger_dagrun.py | 13 +++++++-- .../sensors/test_external_task_sensor.py | 29 +++++++++++++++---- tests/serialization/test_dag_serialization.py | 2 +- ...example_external_task_parent_deferrable.py | 2 +- 10 files changed, 41 insertions(+), 17 deletions(-) diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index e68e60c4b19a3..e546bd0a7e87d 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -26,7 +26,7 @@ import pendulum from airflow.models.dag import DAG -from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator with DAG( dag_id="example_trigger_controller_dag", diff --git a/airflow/models/dag.py b/airflow/models/dag.py index f2d5ff74b62b8..b28f2e0a32f98 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -73,7 +73,7 @@ from airflow import settings, utils from airflow.api_internal.internal_api_call import internal_api_call -from airflow.assets import BaseAsset +from airflow.assets import Asset, AssetAlias, BaseAsset from airflow.configuration import conf as airflow_conf, secrets_backend_list from airflow.exceptions import ( AirflowException, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 20e029621d0b3..936a386e8c092 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1015,7 +1015,7 @@ class DependencyDetector: @staticmethod def detect_task_dependencies(task: Operator) -> list[DagDependency]: """Detect dependencies caused by tasks.""" - from airflow.operators.trigger_dagrun import TriggerDagRunOperator + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor deps = [] diff --git a/providers/src/airflow/providers/standard/sensors/date_time.py b/providers/src/airflow/providers/standard/sensors/date_time.py index ba02678d03167..50cc7b6aaaf59 100644 --- a/providers/src/airflow/providers/standard/sensors/date_time.py +++ b/providers/src/airflow/providers/standard/sensors/date_time.py @@ -93,7 +93,7 @@ class DateTimeSensorAsync(DateTimeSensor): """ start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.temporal.DateTimeTrigger", + trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger", trigger_kwargs={"moment": "", "end_from_trigger": False}, next_method="execute_complete", next_kwargs=None, diff --git a/providers/src/airflow/providers/standard/sensors/time.py b/providers/src/airflow/providers/standard/sensors/time.py index 684a5316be76c..48ea8e6813a1d 100644 --- a/providers/src/airflow/providers/standard/sensors/time.py +++ b/providers/src/airflow/providers/standard/sensors/time.py @@ -68,7 +68,7 @@ class TimeSensorAsync(BaseSensorOperator): """ start_trigger_args = StartTriggerArgs( - trigger_cls="airflow.triggers.temporal.DateTimeTrigger", + trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger", trigger_kwargs={"moment": "", "end_from_trigger": False}, next_method="execute_complete", next_kwargs=None, diff --git a/providers/src/airflow/providers/standard/triggers/temporal.py b/providers/src/airflow/providers/standard/triggers/temporal.py index 64c3afe8162c3..44295cf1057dd 100644 --- a/providers/src/airflow/providers/standard/triggers/temporal.py +++ b/providers/src/airflow/providers/standard/triggers/temporal.py @@ -53,7 +53,7 @@ def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False) def serialize(self) -> tuple[str, dict[str, Any]]: return ( - "airflow.triggers.temporal.DateTimeTrigger", + "airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment, "end_from_trigger": self.end_from_trigger}, ) diff --git a/providers/tests/standard/operators/test_trigger_dagrun.py b/providers/tests/standard/operators/test_trigger_dagrun.py index 1836534be3b47..2ddbe0b93570a 100644 --- a/providers/tests/standard/operators/test_trigger_dagrun.py +++ b/providers/tests/standard/operators/test_trigger_dagrun.py @@ -31,7 +31,7 @@ from airflow.models.log import Log from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import TaskInstance -from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.settings import TracebackSessionForTests from airflow.utils import timezone @@ -39,6 +39,8 @@ from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.types import DagRunType +from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS + pytestmark = pytest.mark.db_test DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc) @@ -76,7 +78,10 @@ def setup_method(self): def re_sync_triggered_dag_to_db(self, dag, dag_maker): TracebackSessionForTests.set_allow_db_access(dag_maker.session, True) dagbag = DagBag(self.f_name, read_dags_from_db=False, include_examples=False) - dagbag.bag_dag(dag) + if AIRFLOW_V_3_0_PLUS: + dagbag.bag_dag(dag, root_dag=dag) + else: + dagbag.bag_dag(dag) dagbag.sync_to_db(session=dag_maker.session) TracebackSessionForTests.set_allow_db_access(dag_maker.session, False) @@ -108,7 +113,9 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session): ) .one() ) - with mock.patch("airflow.operators.trigger_dagrun.build_airflow_url_with_query") as mock_build_url: + with mock.patch( + "airflow.providers.standard.operators.trigger_dagrun.build_airflow_url_with_query" + ) as mock_build_url: triggering_task.get_extra_links(triggering_ti, "Triggered DAG") assert mock_build_url.called args, _ = mock_build_url.call_args diff --git a/providers/tests/standard/sensors/test_external_task_sensor.py b/providers/tests/standard/sensors/test_external_task_sensor.py index 182decfea91b5..039ec7a0e569f 100644 --- a/providers/tests/standard/sensors/test_external_task_sensor.py +++ b/providers/tests/standard/sensors/test_external_task_sensor.py @@ -1218,7 +1218,10 @@ def dag_bag_ext(): task_a_3 >> task_b_3 for dag in [dag_0, dag_1, dag_2, dag_3]: - dag_bag.bag_dag(dag=dag) + if AIRFLOW_V_3_0_PLUS: + dag_bag.bag_dag(dag=dag) + else: + dag_bag.bag_dag(dag=dag, root_dag=dag) yield dag_bag @@ -1267,7 +1270,10 @@ def dag_bag_parent_child(): ) for dag in [dag_0, dag_1]: - dag_bag.bag_dag(dag=dag) + if AIRFLOW_V_3_0_PLUS: + dag_bag.bag_dag(dag=dag) + else: + dag_bag.bag_dag(dag=dag, root_dag=dag) yield dag_bag @@ -1546,8 +1552,13 @@ def dag_bag_multiple(): dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False) daily_dag = DAG("daily_dag", start_date=DEFAULT_DATE, schedule="@daily") agg_dag = DAG("agg_dag", start_date=DEFAULT_DATE, schedule="@daily") - dag_bag.bag_dag(dag=daily_dag) - dag_bag.bag_dag(dag=agg_dag) + + if AIRFLOW_V_3_0_PLUS: + dag_bag.bag_dag(dag=daily_dag) + dag_bag.bag_dag(dag=agg_dag) + else: + dag_bag.bag_dag(dag=daily_dag, root_dag=daily_dag) + dag_bag.bag_dag(dag=agg_dag, root_dag=agg_dag) daily_task = EmptyOperator(task_id="daily_tas", dag=daily_dag) @@ -1618,7 +1629,10 @@ def dag_bag_head_tail(): ) head >> body >> tail - dag_bag.bag_dag(dag=dag) + if AIRFLOW_V_3_0_PLUS: + dag_bag.bag_dag(dag=dag) + else: + dag_bag.bag_dag(dag=dag, root_dag=dag) return dag_bag @@ -1702,7 +1716,10 @@ def dummy_task(x: int): ) head >> body >> tail - dag_bag.bag_dag(dag=dag) + if AIRFLOW_V_3_0_PLUS: + dag_bag.bag_dag(dag=dag) + else: + dag_bag.bag_dag(dag=dag, root_dag=dag) return dag_bag diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 4ed6f1e4df16d..ddcffe8afe24f 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1805,7 +1805,7 @@ def test_derived_dag_deps_operator(self): Tests DAG dependency detection for operators, including derived classes """ from airflow.operators.empty import EmptyOperator - from airflow.operators.trigger_dagrun import TriggerDagRunOperator + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator class DerivedOperator(TriggerDagRunOperator): pass diff --git a/tests/system/core/example_external_task_parent_deferrable.py b/tests/system/core/example_external_task_parent_deferrable.py index 3aa1b2c33b939..ff003eee12a48 100644 --- a/tests/system/core/example_external_task_parent_deferrable.py +++ b/tests/system/core/example_external_task_parent_deferrable.py @@ -18,7 +18,7 @@ from airflow import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.utils.timezone import datetime From f2b867ef0e83f2c099c30c967297b832c3d9b926 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 1 Nov 2024 23:22:29 +0000 Subject: [PATCH 4/8] fix latest only operator imports --- airflow/example_dags/example_latest_only.py | 2 +- airflow/example_dags/example_latest_only_with_trigger.py | 2 +- docs/apache-airflow/operators-and-hooks-ref.rst | 2 +- providers/tests/standard/operators/test_latest_only_operator.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index 1e9906240f5af..2eac0819db18a 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -23,7 +23,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator with DAG( dag_id="latest_only", diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 44a5ed4274005..1b05d5726b644 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -28,7 +28,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule with DAG( diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index bf5ce84a82c9a..775c863b5c813 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -65,7 +65,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.operators.generic_transfer` - - * - :mod:`airflow.operators.latest_only` + * - :mod:`airflow.providers.standard.operators.latest_only` - * - :mod:`airflow.providers.standard.operators.python` diff --git a/providers/tests/standard/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py index fbc2cd5b46d8a..e7572f8ca0e5a 100644 --- a/providers/tests/standard/operators/test_latest_only_operator.py +++ b/providers/tests/standard/operators/test_latest_only_operator.py @@ -25,7 +25,7 @@ from airflow import settings from airflow.models import DagRun, TaskInstance from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule From 937724f32c9b52386218af73ded3fad673720c26 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 1 Nov 2024 23:27:55 +0000 Subject: [PATCH 5/8] remove depricated poll_interval parameter --- providers/tests/standard/triggers/test_file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/tests/standard/triggers/test_file.py b/providers/tests/standard/triggers/test_file.py index b73e16d764ff3..baf0dffa80d0a 100644 --- a/providers/tests/standard/triggers/test_file.py +++ b/providers/tests/standard/triggers/test_file.py @@ -46,7 +46,7 @@ async def test_task_file_trigger(self, tmp_path): trigger = FileTrigger( filepath=str(p.resolve()), - poll_interval=0.2, + poke_interval=0.2, ) task = asyncio.create_task(trigger.run().__anext__()) From 6fc3b15b9dc25928c256224c15189984cdd06fb4 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Sat, 2 Nov 2024 05:13:50 +0000 Subject: [PATCH 6/8] fix email imports and trigger_dagrun operators imports --- .../src/airflow/providers/standard}/utils/email.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {airflow => providers/src/airflow/providers/standard}/utils/email.py (100%) diff --git a/airflow/utils/email.py b/providers/src/airflow/providers/standard/utils/email.py similarity index 100% rename from airflow/utils/email.py rename to providers/src/airflow/providers/standard/utils/email.py From 9cc841666991769bd327922a71d470342613c2fc Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Sat, 2 Nov 2024 05:30:00 +0000 Subject: [PATCH 7/8] fix email imports and trigger_dagrun operators imports --- airflow/models/taskinstance.py | 2 +- generated/provider_dependencies.json | 4 ++- .../airflow/providers/amazon/aws/hooks/ses.py | 2 +- .../providers/common/compat/standard/utils.py | 5 ++- .../providers/sendgrid/utils/emailer.py | 2 +- .../providers/standard/operators/email.py | 2 +- .../standard/operators/trigger_dagrun.py | 33 ++++++++++++++----- .../tests/standard/operators/test_email.py | 4 ++- .../standard/operators/test_trigger_dagrun.py | 5 +-- tests/utils/test_email.py | 2 +- 10 files changed, 42 insertions(+), 19 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index bb07ba6d848af..76480d8445ac7 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -101,6 +101,7 @@ from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import LazyXComSelectSequence, XCom from airflow.plugins_manager import integrate_macros_plugins +from airflow.providers.standard.utils.email import send_email from airflow.sentry import Sentry from airflow.settings import task_instance_mutation_hook from airflow.stats import Stats @@ -118,7 +119,6 @@ context_get_outlet_events, context_merge, ) -from airflow.utils.email import send_email from airflow.utils.helpers import prune_dict, render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 89c13386e2b71..6301d4d946bbd 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1181,7 +1181,9 @@ ], "devel-deps": [], "plugins": [], - "cross-providers-deps": [], + "cross-providers-deps": [ + "common.compat" + ], "excluded-python-versions": [], "state": "ready" }, diff --git a/providers/src/airflow/providers/amazon/aws/hooks/ses.py b/providers/src/airflow/providers/amazon/aws/hooks/ses.py index a5ea06de35171..4396a89fe577b 100644 --- a/providers/src/airflow/providers/amazon/aws/hooks/ses.py +++ b/providers/src/airflow/providers/amazon/aws/hooks/ses.py @@ -21,7 +21,7 @@ from typing import Any, Iterable from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.utils.email import build_mime_message +from airflow.providers.common.compat.standard.utils import build_mime_message class SesHook(AwsBaseHook): diff --git a/providers/src/airflow/providers/common/compat/standard/utils.py b/providers/src/airflow/providers/common/compat/standard/utils.py index bfa263d1be946..15ad5a2a2406b 100644 --- a/providers/src/airflow/providers/common/compat/standard/utils.py +++ b/providers/src/airflow/providers/common/compat/standard/utils.py @@ -20,12 +20,15 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: + from airflow.providers.standard.utils.email import build_mime_message, get_email_address_list from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script else: try: + from airflow.providers.standard.utils.email import build_mime_message, get_email_address_list from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script except ModuleNotFoundError: + from airflow.utils.email import build_mime_message, get_email_address_list from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script -__all__ = ["write_python_script", "prepare_virtualenv"] +__all__ = ["write_python_script", "prepare_virtualenv", "build_mime_message", "get_email_address_list"] diff --git a/providers/src/airflow/providers/sendgrid/utils/emailer.py b/providers/src/airflow/providers/sendgrid/utils/emailer.py index b82aad35db9b6..0396b064f028f 100644 --- a/providers/src/airflow/providers/sendgrid/utils/emailer.py +++ b/providers/src/airflow/providers/sendgrid/utils/emailer.py @@ -41,7 +41,7 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.hooks.base import BaseHook -from airflow.utils.email import get_email_address_list +from airflow.providers.common.compat.standard.utils import get_email_address_list log = logging.getLogger(__name__) diff --git a/providers/src/airflow/providers/standard/operators/email.py b/providers/src/airflow/providers/standard/operators/email.py index 98af60afe4afc..3ed745916ab91 100644 --- a/providers/src/airflow/providers/standard/operators/email.py +++ b/providers/src/airflow/providers/standard/operators/email.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Sequence from airflow.models.baseoperator import BaseOperator -from airflow.utils.email import send_email +from airflow.providers.standard.utils.email import send_email if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py index 3e63311d227a7..920e2dda2c46e 100644 --- a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -22,9 +22,11 @@ import time from typing import TYPE_CHECKING, Any, Sequence, cast +from packaging.version import Version from sqlalchemy import select from sqlalchemy.orm.exc import NoResultFound +from airflow import __version__ as AIRFLOW_VERSION from airflow.api.common.trigger_dag import trigger_dag from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.configuration import conf @@ -45,12 +47,16 @@ from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session from airflow.utils.state import DagRunState -from airflow.utils.types import DagRunTriggeredByType, DagRunType XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso" XCOM_RUN_ID = "trigger_run_id" +AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0") + +if AIRFLOW_V_3_0_PLUS: + from airflow.utils.types import DagRunTriggeredByType, DagRunType + if TYPE_CHECKING: from sqlalchemy.orm.session import Session @@ -191,14 +197,23 @@ def execute(self, context: Context): run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date) try: - dag_run = trigger_dag( - dag_id=self.trigger_dag_id, - run_id=run_id, - conf=self.conf, - execution_date=parsed_logical_date, - replace_microseconds=False, - triggered_by=DagRunTriggeredByType.OPERATOR, - ) + if AIRFLOW_V_3_0_PLUS: + dag_run = trigger_dag( + dag_id=self.trigger_dag_id, + run_id=run_id, + conf=self.conf, + execution_date=parsed_logical_date, + replace_microseconds=False, + triggered_by=DagRunTriggeredByType.OPERATOR, + ) + else: + dag_run = trigger_dag( # type: ignore[call-arg] + dag_id=self.trigger_dag_id, + run_id=run_id, + conf=self.conf, + execution_date=parsed_logical_date, + replace_microseconds=False, + ) except DagRunAlreadyExists as e: if self.reset_dag_run: diff --git a/providers/tests/standard/operators/test_email.py b/providers/tests/standard/operators/test_email.py index 79c250c4773d2..dc796a709fd82 100644 --- a/providers/tests/standard/operators/test_email.py +++ b/providers/tests/standard/operators/test_email.py @@ -39,7 +39,9 @@ class TestEmailOperator: def test_execute(self, dag_maker): - with conf_vars({("email", "email_backend"): "tests.operators.test_email.send_email_test"}): + with conf_vars( + {("email", "email_backend"): "providers.tests.standard.operators.test_email.send_email_test"} + ): with dag_maker( "test_dag", default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, diff --git a/providers/tests/standard/operators/test_trigger_dagrun.py b/providers/tests/standard/operators/test_trigger_dagrun.py index 2ddbe0b93570a..276eb07798184 100644 --- a/providers/tests/standard/operators/test_trigger_dagrun.py +++ b/providers/tests/standard/operators/test_trigger_dagrun.py @@ -79,9 +79,10 @@ def re_sync_triggered_dag_to_db(self, dag, dag_maker): TracebackSessionForTests.set_allow_db_access(dag_maker.session, True) dagbag = DagBag(self.f_name, read_dags_from_db=False, include_examples=False) if AIRFLOW_V_3_0_PLUS: - dagbag.bag_dag(dag, root_dag=dag) - else: dagbag.bag_dag(dag) + else: + dagbag.bag_dag(dag, root_dag=dag) + dagbag.sync_to_db(session=dag_maker.session) TracebackSessionForTests.set_allow_db_access(dag_maker.session, False) diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py index 625f0f4b1a5da..2dc39ee4e4200 100644 --- a/tests/utils/test_email.py +++ b/tests/utils/test_email.py @@ -28,7 +28,7 @@ import pytest from airflow.configuration import conf -from airflow.utils import email +from airflow.providers.standard.utils import email from tests_common.test_utils.config import conf_vars From cef9b4dec3cfca685bdfd16f3a172988891ef14c Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Sat, 2 Nov 2024 07:01:17 +0000 Subject: [PATCH 8/8] fix test_email imports --- airflow/config_templates/config.yml | 4 ++-- docs/apache-airflow/howto/email-config.rst | 6 +++--- .../standard/operators/trigger_dagrun.py | 3 ++- .../sensors/test_external_task_sensor.py | 5 ++++- .../tests/standard}/utils/test_email.py | 18 +++++++++--------- tests_common/test_utils/compat.py | 2 ++ 6 files changed, 22 insertions(+), 16 deletions(-) rename {tests => providers/tests/standard}/utils/test_email.py (95%) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a14567cd00e17..c65485fb2b8e5 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2060,7 +2060,7 @@ email: version_added: ~ type: string example: ~ - default: "airflow.utils.email.send_email_smtp" + default: "airflow.providers.standard.utils.email.send_email_smtp" email_conn_id: description: Email connection to use version_added: 2.1.0 @@ -2126,7 +2126,7 @@ email: smtp: description: | If you want airflow to send emails on retries, failure, and you want to use - the airflow.utils.email.send_email_smtp function, you have to configure an + the airflow.providers.standard.utils.email.send_email_smtp function, you have to configure an smtp server here options: smtp_host: diff --git a/docs/apache-airflow/howto/email-config.rst b/docs/apache-airflow/howto/email-config.rst index bc7ea9a1f9d04..5fe39a2ae347e 100644 --- a/docs/apache-airflow/howto/email-config.rst +++ b/docs/apache-airflow/howto/email-config.rst @@ -25,7 +25,7 @@ in the ``[email]`` section. .. code-block:: ini [email] - email_backend = airflow.utils.email.send_email_smtp + email_backend = airflow.providers.standard.utils.email.send_email_smtp subject_template = /path/to/my_subject_template_file html_content_template = /path/to/my_html_content_template_file @@ -33,7 +33,7 @@ Equivalent environment variables look like: .. code-block:: sh - AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp + AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.standard.utils.email.send_email_smtp AIRFLOW__EMAIL__SUBJECT_TEMPLATE=/path/to/my_subject_template_file AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE=/path/to/my_html_content_template_file @@ -63,7 +63,7 @@ the example below. .. code-block:: bash $ airflow config get-value email email_backend - airflow.utils.email.send_email_smtp + airflow.providers.standard.utils.email.send_email_smtp To access the task's information you use `Jinja Templating `_ in your template files. diff --git a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py index 920e2dda2c46e..843b77ef1f3b0 100644 --- a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -47,6 +47,7 @@ from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunType XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso" XCOM_RUN_ID = "trigger_run_id" @@ -55,7 +56,7 @@ AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0") if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType, DagRunType + from airflow.utils.types import DagRunTriggeredByType if TYPE_CHECKING: from sqlalchemy.orm.session import Session diff --git a/providers/tests/standard/sensors/test_external_task_sensor.py b/providers/tests/standard/sensors/test_external_task_sensor.py index 039ec7a0e569f..69c4ec8963737 100644 --- a/providers/tests/standard/sensors/test_external_task_sensor.py +++ b/providers/tests/standard/sensors/test_external_task_sensor.py @@ -1500,7 +1500,10 @@ def _factory(depth: int) -> DagBag: task_a >> task_b for dag in dags: - dag_bag.bag_dag(dag=dag) + if AIRFLOW_V_3_0_PLUS: + dag_bag.bag_dag(dag=dag) + else: + dag_bag.bag_dag(dag=dag, root_dag=dag) return dag_bag diff --git a/tests/utils/test_email.py b/providers/tests/standard/utils/test_email.py similarity index 95% rename from tests/utils/test_email.py rename to providers/tests/standard/utils/test_email.py index 2dc39ee4e4200..4a0013f58362c 100644 --- a/tests/utils/test_email.py +++ b/providers/tests/standard/utils/test_email.py @@ -28,8 +28,8 @@ import pytest from airflow.configuration import conf -from airflow.providers.standard.utils import email +from tests_common.test_utils.compat import email from tests_common.test_utils.config import conf_vars EMAILS = ["test1@example.com", "test2@example.com"] @@ -81,17 +81,17 @@ def test_get_email_address_invalid_type_in_iterable(self): with pytest.raises(TypeError): email.get_email_address_list(emails_list) - @mock.patch("airflow.utils.email.send_email") + @mock.patch("airflow.providers.standard.utils.email.send_email") def test_default_backend(self, mock_send_email): res = email.send_email("to", "subject", "content") mock_send_email.assert_called_once_with("to", "subject", "content") assert mock_send_email.return_value == res - @mock.patch("airflow.utils.email.send_email_smtp") + @mock.patch("airflow.providers.standard.utils.email.send_email_smtp") def test_custom_backend(self, mock_send_email): with conf_vars( { - ("email", "email_backend"): "tests.utils.test_email.send_email_test", + ("email", "email_backend"): "providers.tests.standard.utils.test_email.send_email_test", ("email", "email_conn_id"): "smtp_default", } ): @@ -112,10 +112,10 @@ def test_custom_backend(self, mock_send_email): ) assert not mock_send_email.called - @mock.patch("airflow.utils.email.send_email_smtp") + @mock.patch("airflow.providers.standard.utils.email.send_email_smtp") @conf_vars( { - ("email", "email_backend"): "tests.utils.test_email.send_email_test", + ("email", "email_backend"): "providers.tests.standard.utils.test_email.send_email_test", ("email", "email_conn_id"): "smtp_default", ("email", "from_email"): "from@test.com", } @@ -158,7 +158,7 @@ def setup_test_cases(self, monkeypatch): json.dumps({"conn_type": "smtp", "login": "user", "password": "p@$$word"}), ) - @mock.patch("airflow.utils.email.send_mime_email") + @mock.patch("airflow.providers.standard.utils.email.send_mime_email") def test_send_smtp(self, mock_send_mime, tmp_path): path = tmp_path / "testfile" path.write_text("attachment") @@ -176,7 +176,7 @@ def test_send_smtp(self, mock_send_mime, tmp_path): mimeapp = MIMEApplication("attachment") assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload() - @mock.patch("airflow.utils.email.send_mime_email") + @mock.patch("airflow.providers.standard.utils.email.send_mime_email") def test_send_smtp_with_multibyte_content(self, mock_send_mime): email.send_email_smtp("to", "subject", "🔥", mime_charset="utf-8") assert mock_send_mime.called @@ -185,7 +185,7 @@ def test_send_smtp_with_multibyte_content(self, mock_send_mime): mimetext = MIMEText("🔥", "mixed", "utf-8") assert mimetext.get_payload() == msg.get_payload()[0].get_payload() - @mock.patch("airflow.utils.email.send_mime_email") + @mock.patch("airflow.providers.standard.utils.email.send_mime_email") def test_send_bcc_smtp(self, mock_send_mime, tmp_path): path = tmp_path / "testfile" path.write_text("attachment") diff --git a/tests_common/test_utils/compat.py b/tests_common/test_utils/compat.py index 6f788c30dcca2..09be782de8f5f 100644 --- a/tests_common/test_utils/compat.py +++ b/tests_common/test_utils/compat.py @@ -56,6 +56,7 @@ from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.bash import BashSensor from airflow.providers.standard.sensors.date_time import DateTimeSensor + from airflow.providers.standard.utils import email from airflow.providers.standard.utils.python_virtualenv import write_python_script except ImportError: # Compatibility for Airflow < 2.10.* @@ -64,6 +65,7 @@ from airflow.operators.python import PythonOperator # type: ignore[no-redef,attr-defined] from airflow.sensors.bash import BashSensor # type: ignore[no-redef,attr-defined] from airflow.sensors.date_time import DateTimeSensor # type: ignore[no-redef,attr-defined] + from airflow.utils import email # type: ignore[no-redef,attr-defined] from airflow.utils.python_virtualenv import write_python_script # type: ignore[no-redef,attr-defined]