diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a7be03edde142..326ba2b1cb8f8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -699,7 +699,7 @@ repos: ^airflow/hooks/.*$| ^airflow/operators/.*$| ^providers/src/airflow/providers/.*$| - ^airflow/sensors/.*$| + ^providers/src/airflow/providers/standard/sensors/.*$| ^dev/provider_packages/.*$ - id: check-base-operator-usage language: pygrep @@ -714,7 +714,7 @@ repos: ^airflow/hooks/.*$| ^airflow/operators/.*$| ^providers/src/airflow/providers/.*$| - ^airflow/sensors/.*$| + ^providers/src/airflow/providers/standard/sensors/.*$| ^dev/provider_packages/.*$ - id: check-base-operator-usage language: pygrep @@ -725,7 +725,7 @@ repos: files: > (?x) ^providers/src/airflow/providers/.*\.py$ - exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py + exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py|providers/src/airflow/providers/standard/sensors/external_task.py - id: check-get-lineage-collector-providers language: python name: Check providers import hook lineage code from compat diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py index abaf21770523a..0a282a834195c 100644 --- a/airflow/example_dags/example_external_task_marker_dag.py +++ b/airflow/example_dags/example_external_task_marker_dag.py @@ -44,7 +44,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor start_date = pendulum.datetime(2021, 1, 1, tz="UTC") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index f8829cc6c28f9..34ed6694a0695 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1086,7 +1086,7 @@ def _get_task_instances( if include_dependent_dags: # Recursively find external tasks indicated by ExternalTaskMarker - from airflow.sensors.external_task import ExternalTaskMarker + from airflow.providers.standard.sensors.external_task import ExternalTaskMarker query = tis if as_pk_tuple: diff --git a/airflow/reproducible_build.yaml b/airflow/reproducible_build.yaml index 1c13b028bb4f0..5f2d030fd0b63 100644 --- a/airflow/reproducible_build.yaml +++ b/airflow/reproducible_build.yaml @@ -1,2 +1,2 @@ release-notes-hash: c68f3fa23f84c7fc270d73baaa2cc18d -source-date-epoch: 1731415143 +source-date-epoch: 1732690252 diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index e51836036814d..2e1a5cd14f919 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -121,11 +121,11 @@ _OPERATOR_EXTRA_LINKS: set[str] = { "airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink", - "airflow.sensors.external_task.ExternalDagLink", + "airflow.providers.standard.sensors.external_task.ExternalDagLink", # Deprecated names, so that existing serialized dags load straight away. - "airflow.sensors.external_task.ExternalTaskSensorLink", + "airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink", "airflow.operators.dagrun_operator.TriggerDagRunLink", - "airflow.sensors.external_task_sensor.ExternalTaskSensorLink", + "airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensorLink", } @@ -1022,7 +1022,7 @@ class DependencyDetector: def detect_task_dependencies(task: Operator) -> list[DagDependency]: """Detect dependencies caused by tasks.""" from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor deps = [] if isinstance(task, TriggerDagRunOperator): diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow-providers-standard/sensors/external_task_sensor.rst similarity index 94% rename from docs/apache-airflow/howto/operator/external_task_sensor.rst rename to docs/apache-airflow-providers-standard/sensors/external_task_sensor.rst index 7ddbc4b2417b8..812f3b6aab9b7 100644 --- a/docs/apache-airflow/howto/operator/external_task_sensor.rst +++ b/docs/apache-airflow-providers-standard/sensors/external_task_sensor.rst @@ -41,7 +41,7 @@ DAGs. ExternalTaskSensor ^^^^^^^^^^^^^^^^^^ -Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG +Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG wait for another task on a different DAG for a specific ``execution_date``. ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed @@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode: ExternalTaskSensor with task_group dependency --------------------------------------------- -In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG +In Addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG wait for another ``task_group`` on a different DAG for a specific ``execution_date``. .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst index 98823fbc85ad7..b76b1525be82f 100644 --- a/docs/apache-airflow/core-concepts/dags.rst +++ b/docs/apache-airflow/core-concepts/dags.rst @@ -766,7 +766,7 @@ relationships, dependencies between DAGs are a bit more complex. In general, the in which one DAG can depend on another: - triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator` -- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` +- waiting - :class:`~airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor` Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG with different data intervals. The **Dag Dependencies** view diff --git a/docs/apache-airflow/howto/operator/index.rst b/docs/apache-airflow/howto/operator/index.rst index 5dcda247522ca..45b538874363e 100644 --- a/docs/apache-airflow/howto/operator/index.rst +++ b/docs/apache-airflow/howto/operator/index.rst @@ -28,5 +28,3 @@ determine what actually executes when your DAG runs. .. toctree:: :maxdepth: 2 - - external_task_sensor diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 99f0bb17f93f0..7c238307e587d 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -76,9 +76,8 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - Sensors - Guides - * - :mod:`airflow.sensors.external_task` - - :doc:`How to use ` - + * - :mod:`airflow.sensors.base` + - **Hooks:** diff --git a/newsfragments/41391.significant.rst b/newsfragments/41391.significant.rst index 6ab26848f0657..0513aa3ab93b3 100644 --- a/newsfragments/41391.significant.rst +++ b/newsfragments/41391.significant.rst @@ -1,5 +1,5 @@ **Breaking Change** -The ``airflow.sensors.external_task.ExternalTaskSensorLink`` class has been removed. +The ``airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink`` class has been removed. This class was deprecated and is no longer available. Users should now use -the ``airflow.sensors.external_task.ExternalDagLink`` class directly. +the ``airflow.providers.standard.sensors.external_task.ExternalDagLink`` class directly. diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index eea8991e2526e..a58ef13a3acc6 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -61,6 +61,7 @@ sensors: - airflow.providers.standard.sensors.bash - airflow.providers.standard.sensors.python - airflow.providers.standard.sensors.filesystem + - airflow.providers.standard.sensors.external_task hooks: - integration-name: Standard python-modules: diff --git a/airflow/sensors/external_task.py b/providers/src/airflow/providers/standard/sensors/external_task.py similarity index 100% rename from airflow/sensors/external_task.py rename to providers/src/airflow/providers/standard/sensors/external_task.py diff --git a/tests/dags/test_external_task_sensor_check_existense.py b/tests/dags/test_external_task_sensor_check_existense.py index 9de992c073b91..656f3760a248c 100644 --- a/tests/dags/test_external_task_sensor_check_existense.py +++ b/tests/dags/test_external_task_sensor_check_existense.py @@ -19,7 +19,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from tests.models import DEFAULT_DATE diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index e47e88d62ef70..c67b757eaadb1 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -38,12 +38,12 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.python import PythonOperator -from airflow.providers.standard.sensors.time import TimeSensor -from airflow.providers.standard.triggers.external_task import WorkflowTrigger -from airflow.sensors.external_task import ( +from airflow.providers.standard.sensors.external_task import ( ExternalTaskMarker, ExternalTaskSensor, ) +from airflow.providers.standard.sensors.time import TimeSensor +from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import NEW_SESSION, create_session, provide_session @@ -934,8 +934,8 @@ def test_external_task_group_when_there_is_no_TIs(self): ), ), ) - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count") - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor.get_count") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter") def test_fail_poke( self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message ): @@ -991,7 +991,7 @@ def test_fail_poke( ), ), ) - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter") @mock.patch("airflow.models.dagbag.DagBag.get_dag") @mock.patch("os.path.exists") @mock.patch("airflow.models.dag.DagModel.get_current") diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 555124f65b049..5dbe6c968ab6f 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1511,7 +1511,7 @@ def test_deps_sorted(self): Tests serialize_operator, make sure the deps is in order """ from airflow.operators.empty import EmptyOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor logical_date = datetime(2020, 1, 1) with DAG(dag_id="test_deps_sorted", schedule=None, start_date=logical_date) as dag: @@ -1626,7 +1626,7 @@ def test_derived_dag_deps_sensor(self): Tests DAG dependency detection for sensors, including derived classes """ from airflow.operators.empty import EmptyOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor class DerivedSensor(ExternalTaskSensor): pass @@ -1657,7 +1657,7 @@ def test_dag_deps_assets_with_duplicate_asset(self): """ Check that dag_dependencies node is populated correctly for a DAG with duplicate assets. """ - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor d1 = Asset("d1") d2 = Asset("d2") @@ -1746,7 +1746,7 @@ def test_dag_deps_assets(self): """ Check that dag_dependencies node is populated correctly for a DAG with assets. """ - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor d1 = Asset("d1") d2 = Asset("d2") diff --git a/tests/system/core/example_external_task_parent_deferrable.py b/tests/system/core/example_external_task_parent_deferrable.py index 860cec5d0896f..ff003eee12a48 100644 --- a/tests/system/core/example_external_task_parent_deferrable.py +++ b/tests/system/core/example_external_task_parent_deferrable.py @@ -19,7 +19,7 @@ from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.utils.timezone import datetime with DAG(