Skip to content
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ repos:
^airflow/hooks/.*$|
^airflow/operators/.*$|
^providers/src/airflow/providers/.*$|
^airflow/sensors/.*$|
^providers/src/airflow/providers/standard/sensors/.*$|
^dev/provider_packages/.*$
- id: check-base-operator-usage
language: pygrep
Expand All @@ -714,7 +714,7 @@ repos:
^airflow/hooks/.*$|
^airflow/operators/.*$|
^providers/src/airflow/providers/.*$|
^airflow/sensors/.*$|
^providers/src/airflow/providers/standard/sensors/.*$|
^dev/provider_packages/.*$
- id: check-base-operator-usage
language: pygrep
Expand All @@ -725,7 +725,7 @@ repos:
files: >
(?x)
^providers/src/airflow/providers/.*\.py$
exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py
exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py|providers/src/airflow/providers/standard/sensors/external_task.py
- id: check-get-lineage-collector-providers
language: python
name: Check providers import hook lineage code from compat
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_external_task_marker_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

start_date = pendulum.datetime(2021, 1, 1, tz="UTC")

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ def _get_task_instances(

if include_dependent_dags:
# Recursively find external tasks indicated by ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.providers.standard.sensors.external_task import ExternalTaskMarker

query = tis
if as_pk_tuple:
Expand Down
2 changes: 1 addition & 1 deletion airflow/reproducible_build.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release-notes-hash: c68f3fa23f84c7fc270d73baaa2cc18d
source-date-epoch: 1731415143
source-date-epoch: 1732690252
8 changes: 4 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@

_OPERATOR_EXTRA_LINKS: set[str] = {
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.sensors.external_task.ExternalDagLink",
"airflow.providers.standard.sensors.external_task.ExternalDagLink",
# Deprecated names, so that existing serialized dags load straight away.
"airflow.sensors.external_task.ExternalTaskSensorLink",
"airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink",
"airflow.operators.dagrun_operator.TriggerDagRunLink",
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
"airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensorLink",
}


Expand Down Expand Up @@ -1022,7 +1022,7 @@ class DependencyDetector:
def detect_task_dependencies(task: Operator) -> list[DagDependency]:
"""Detect dependencies caused by tasks."""
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

deps = []
if isinstance(task, TriggerDagRunOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ DAGs.
ExternalTaskSensor
^^^^^^^^^^^^^^^^^^

Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another task on a different DAG for a specific ``execution_date``.

ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed
Expand All @@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode:

ExternalTaskSensor with task_group dependency
---------------------------------------------
In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
In Addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another ``task_group`` on a different DAG for a specific ``execution_date``.

.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ relationships, dependencies between DAGs are a bit more complex. In general, the
in which one DAG can depend on another:

- triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator`
- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`
- waiting - :class:`~airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor`

Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
with different data intervals. The **Dag Dependencies** view
Expand Down
2 changes: 0 additions & 2 deletions docs/apache-airflow/howto/operator/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,3 @@ determine what actually executes when your DAG runs.

.. toctree::
:maxdepth: 2

external_task_sensor
5 changes: 2 additions & 3 deletions docs/apache-airflow/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - Sensors
- Guides

* - :mod:`airflow.sensors.external_task`
- :doc:`How to use <howto/operator/external_task_sensor>`

* - :mod:`airflow.sensors.base`
-

**Hooks:**

Expand Down
4 changes: 2 additions & 2 deletions newsfragments/41391.significant.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
**Breaking Change**

The ``airflow.sensors.external_task.ExternalTaskSensorLink`` class has been removed.
The ``airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink`` class has been removed.
This class was deprecated and is no longer available. Users should now use
the ``airflow.sensors.external_task.ExternalDagLink`` class directly.
the ``airflow.providers.standard.sensors.external_task.ExternalDagLink`` class directly.
1 change: 1 addition & 0 deletions providers/src/airflow/providers/standard/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ sensors:
- airflow.providers.standard.sensors.bash
- airflow.providers.standard.sensors.python
- airflow.providers.standard.sensors.filesystem
- airflow.providers.standard.sensors.external_task
hooks:
- integration-name: Standard
python-modules:
Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_external_task_sensor_check_existense.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

from tests.models import DEFAULT_DATE

Expand Down
12 changes: 6 additions & 6 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.sensors.external_task import (
from airflow.providers.standard.sensors.external_task import (
ExternalTaskMarker,
ExternalTaskSensor,
)
from airflow.providers.standard.sensors.time import TimeSensor
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import NEW_SESSION, create_session, provide_session
Expand Down Expand Up @@ -934,8 +934,8 @@ def test_external_task_group_when_there_is_no_TIs(self):
),
),
)
@mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count")
@mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor.get_count")
@mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
def test_fail_poke(
self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message
):
Expand Down Expand Up @@ -991,7 +991,7 @@ def test_fail_poke(
),
),
)
@mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.models.dagbag.DagBag.get_dag")
@mock.patch("os.path.exists")
@mock.patch("airflow.models.dag.DagModel.get_current")
Expand Down
8 changes: 4 additions & 4 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ def test_deps_sorted(self):
Tests serialize_operator, make sure the deps is in order
"""
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

logical_date = datetime(2020, 1, 1)
with DAG(dag_id="test_deps_sorted", schedule=None, start_date=logical_date) as dag:
Expand Down Expand Up @@ -1626,7 +1626,7 @@ def test_derived_dag_deps_sensor(self):
Tests DAG dependency detection for sensors, including derived classes
"""
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

class DerivedSensor(ExternalTaskSensor):
pass
Expand Down Expand Up @@ -1657,7 +1657,7 @@ def test_dag_deps_assets_with_duplicate_asset(self):
"""
Check that dag_dependencies node is populated correctly for a DAG with duplicate assets.
"""
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

d1 = Asset("d1")
d2 = Asset("d2")
Expand Down Expand Up @@ -1746,7 +1746,7 @@ def test_dag_deps_assets(self):
"""
Check that dag_dependencies node is populated correctly for a DAG with assets.
"""
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

d1 = Asset("d1")
d2 = Asset("d2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.utils.timezone import datetime

with DAG(
Expand Down