From 8fe94bbe93abed94b1fc11f062ef620424de33dd Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Fri, 22 Nov 2024 20:13:50 +0530 Subject: [PATCH 1/6] Moved external_task sensor to standard provider along with all references --- .pre-commit-config.yaml | 4 ++-- RELEASE_NOTES.rst | 8 ++++---- .../example_dags/example_external_task_marker_dag.py | 2 +- airflow/models/dag.py | 2 +- airflow/reproducible_build.yaml | 4 ++-- airflow/serialization/serialized_objects.py | 8 ++++---- docs/apache-airflow/core-concepts/dags.rst | 2 +- .../howto/operator/external_task_sensor.rst | 4 ++-- docs/apache-airflow/howto/operator/index.rst | 2 -- docs/apache-airflow/operators-and-hooks-ref.rst | 3 --- newsfragments/41391.significant.rst | 4 ++-- providers/src/airflow/providers/standard/provider.yaml | 1 + .../providers/standard}/sensors/external_task.py | 0 .../dags/test_external_task_sensor_check_existense.py | 2 +- tests/sensors/test_external_task_sensor.py | 10 +++++----- tests/serialization/test_dag_serialization.py | 8 ++++---- .../core/example_external_task_parent_deferrable.py | 2 +- 17 files changed, 31 insertions(+), 35 deletions(-) rename {airflow => providers/src/airflow/providers/standard}/sensors/external_task.py (100%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d072d21055cff..ee4d9b317ff57 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -699,7 +699,7 @@ repos: ^airflow/hooks/.*$| ^airflow/operators/.*$| ^providers/src/airflow/providers/.*$| - ^airflow/sensors/.*$| + ^providers/src/airflow/providers/standard/sensors/.*$| ^dev/provider_packages/.*$ - id: check-base-operator-usage language: pygrep @@ -714,7 +714,7 @@ repos: ^airflow/hooks/.*$| ^airflow/operators/.*$| ^providers/src/airflow/providers/.*$| - ^airflow/sensors/.*$| + ^providers/src/airflow/providers/standard/sensors/.*$| ^dev/provider_packages/.*$ - id: check-base-operator-usage language: pygrep diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index a8b5a265f3afa..54f73b4bd8d35 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -6450,10 +6450,10 @@ The following table shows changes in import paths. - ``airflow.sensors.base.BaseSensorOperator`` * - ``airflow.sensors.date_time_sensor.DateTimeSensor`` - ``airflow.sensors.date_time.DateTimeSensor`` - * - ``airflow.sensors.external_task_sensor.ExternalTaskMarker`` - - ``airflow.sensors.external_task.ExternalTaskMarker`` - * - ``airflow.sensors.external_task_sensor.ExternalTaskSensor`` - - ``airflow.sensors.external_task.ExternalTaskSensor`` + * - ``airflow.providers.standard.sensors.external_task_sensor.ExternalTaskMarker`` + - ``airflow.providers.standard.sensors.external_task.ExternalTaskMarker`` + * - ``airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor`` + - ``airflow.providers.standard.sensors.external_task.ExternalTaskSensor`` * - ``airflow.sensors.sql_sensor.SqlSensor`` - ``airflow.sensors.sql.SqlSensor`` * - ``airflow.sensors.time_delta_sensor.TimeDeltaSensor`` diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py index abaf21770523a..0a282a834195c 100644 --- a/airflow/example_dags/example_external_task_marker_dag.py +++ b/airflow/example_dags/example_external_task_marker_dag.py @@ -44,7 +44,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor start_date = pendulum.datetime(2021, 1, 1, tz="UTC") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 8177025c2d8ed..068a32815b1dd 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1089,7 +1089,7 @@ def _get_task_instances( if include_dependent_dags: # Recursively find external tasks indicated by ExternalTaskMarker - from airflow.sensors.external_task import ExternalTaskMarker + from airflow.providers.standard.sensors.external_task import ExternalTaskMarker query = tis if as_pk_tuple: diff --git a/airflow/reproducible_build.yaml b/airflow/reproducible_build.yaml index 1c13b028bb4f0..ef6f2621343e2 100644 --- a/airflow/reproducible_build.yaml +++ b/airflow/reproducible_build.yaml @@ -1,2 +1,2 @@ -release-notes-hash: c68f3fa23f84c7fc270d73baaa2cc18d -source-date-epoch: 1731415143 +release-notes-hash: a04c0a95a5b972498a29924ff4b53216 +source-date-epoch: 1732286028 diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 61d851aaed118..2c63feef2f6b8 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -120,11 +120,11 @@ _OPERATOR_EXTRA_LINKS: set[str] = { "airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink", - "airflow.sensors.external_task.ExternalDagLink", + "airflow.providers.standard.sensors.external_task.ExternalDagLink", # Deprecated names, so that existing serialized dags load straight away. - "airflow.sensors.external_task.ExternalTaskSensorLink", + "airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink", "airflow.operators.dagrun_operator.TriggerDagRunLink", - "airflow.sensors.external_task_sensor.ExternalTaskSensorLink", + "airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensorLink", } @@ -1021,7 +1021,7 @@ class DependencyDetector: def detect_task_dependencies(task: Operator) -> list[DagDependency]: """Detect dependencies caused by tasks.""" from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor deps = [] if isinstance(task, TriggerDagRunOperator): diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst index 98823fbc85ad7..b76b1525be82f 100644 --- a/docs/apache-airflow/core-concepts/dags.rst +++ b/docs/apache-airflow/core-concepts/dags.rst @@ -766,7 +766,7 @@ relationships, dependencies between DAGs are a bit more complex. In general, the in which one DAG can depend on another: - triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator` -- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` +- waiting - :class:`~airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor` Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG with different data intervals. The **Dag Dependencies** view diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst index 7ddbc4b2417b8..812f3b6aab9b7 100644 --- a/docs/apache-airflow/howto/operator/external_task_sensor.rst +++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst @@ -41,7 +41,7 @@ DAGs. ExternalTaskSensor ^^^^^^^^^^^^^^^^^^ -Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG +Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG wait for another task on a different DAG for a specific ``execution_date``. ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed @@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode: ExternalTaskSensor with task_group dependency --------------------------------------------- -In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG +In Addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG wait for another ``task_group`` on a different DAG for a specific ``execution_date``. .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py diff --git a/docs/apache-airflow/howto/operator/index.rst b/docs/apache-airflow/howto/operator/index.rst index 5dcda247522ca..45b538874363e 100644 --- a/docs/apache-airflow/howto/operator/index.rst +++ b/docs/apache-airflow/howto/operator/index.rst @@ -28,5 +28,3 @@ determine what actually executes when your DAG runs. .. toctree:: :maxdepth: 2 - - external_task_sensor diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 99f0bb17f93f0..f5478593e8d08 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -76,9 +76,6 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - Sensors - Guides - * - :mod:`airflow.sensors.external_task` - - :doc:`How to use ` - **Hooks:** diff --git a/newsfragments/41391.significant.rst b/newsfragments/41391.significant.rst index 6ab26848f0657..0513aa3ab93b3 100644 --- a/newsfragments/41391.significant.rst +++ b/newsfragments/41391.significant.rst @@ -1,5 +1,5 @@ **Breaking Change** -The ``airflow.sensors.external_task.ExternalTaskSensorLink`` class has been removed. +The ``airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink`` class has been removed. This class was deprecated and is no longer available. Users should now use -the ``airflow.sensors.external_task.ExternalDagLink`` class directly. +the ``airflow.providers.standard.sensors.external_task.ExternalDagLink`` class directly. diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index d65d944b21953..aa0bfdee23569 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -60,6 +60,7 @@ sensors: - airflow.providers.standard.sensors.bash - airflow.providers.standard.sensors.python - airflow.providers.standard.sensors.filesystem + - airflow.providers.standard.sensors.external_task hooks: - integration-name: Standard python-modules: diff --git a/airflow/sensors/external_task.py b/providers/src/airflow/providers/standard/sensors/external_task.py similarity index 100% rename from airflow/sensors/external_task.py rename to providers/src/airflow/providers/standard/sensors/external_task.py diff --git a/tests/dags/test_external_task_sensor_check_existense.py b/tests/dags/test_external_task_sensor_check_existense.py index 9de992c073b91..656f3760a248c 100644 --- a/tests/dags/test_external_task_sensor_check_existense.py +++ b/tests/dags/test_external_task_sensor_check_existense.py @@ -19,7 +19,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from tests.models import DEFAULT_DATE diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index e03ceeed01960..f38458c0b8fd7 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -38,11 +38,11 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.python import PythonOperator -from airflow.providers.standard.sensors.time import TimeSensor -from airflow.sensors.external_task import ( +from airflow.providers.standard.sensors.external_task import ( ExternalTaskMarker, ExternalTaskSensor, ) +from airflow.providers.standard.sensors.time import TimeSensor from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.triggers.external_task import WorkflowTrigger from airflow.utils.hashlib_wrapper import md5 @@ -934,8 +934,8 @@ def test_external_task_group_when_there_is_no_TIs(self): ), ), ) - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count") - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor.get_count") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter") def test_fail_poke( self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message ): @@ -991,7 +991,7 @@ def test_fail_poke( ), ), ) - @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter") + @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter") @mock.patch("airflow.models.dagbag.DagBag.get_dag") @mock.patch("os.path.exists") @mock.patch("airflow.models.dag.DagModel.get_current") diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index ced00fd65ab62..87073d26522f5 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1506,7 +1506,7 @@ def test_deps_sorted(self): Tests serialize_operator, make sure the deps is in order """ from airflow.operators.empty import EmptyOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor logical_date = datetime(2020, 1, 1) with DAG(dag_id="test_deps_sorted", schedule=None, start_date=logical_date) as dag: @@ -1621,7 +1621,7 @@ def test_derived_dag_deps_sensor(self): Tests DAG dependency detection for sensors, including derived classes """ from airflow.operators.empty import EmptyOperator - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor class DerivedSensor(ExternalTaskSensor): pass @@ -1652,7 +1652,7 @@ def test_dag_deps_assets_with_duplicate_asset(self): """ Check that dag_dependencies node is populated correctly for a DAG with duplicate assets. """ - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor d1 = Asset("d1") d2 = Asset("d2") @@ -1741,7 +1741,7 @@ def test_dag_deps_assets(self): """ Check that dag_dependencies node is populated correctly for a DAG with assets. """ - from airflow.sensors.external_task import ExternalTaskSensor + from airflow.providers.standard.sensors.external_task import ExternalTaskSensor d1 = Asset("d1") d2 = Asset("d2") diff --git a/tests/system/core/example_external_task_parent_deferrable.py b/tests/system/core/example_external_task_parent_deferrable.py index 860cec5d0896f..ff003eee12a48 100644 --- a/tests/system/core/example_external_task_parent_deferrable.py +++ b/tests/system/core/example_external_task_parent_deferrable.py @@ -19,7 +19,7 @@ from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.utils.timezone import datetime with DAG( From b38fac57b0659188c730affe74c66c78e287db9d Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Fri, 22 Nov 2024 20:33:15 +0530 Subject: [PATCH 2/6] Doc fix --- docs/apache-airflow/operators-and-hooks-ref.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index f5478593e8d08..7c238307e587d 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -76,6 +76,8 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - Sensors - Guides + * - :mod:`airflow.sensors.base` + - **Hooks:** From ca28132a812c1e20d594202c93008f4ee0f60839 Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Mon, 25 Nov 2024 16:43:18 +0530 Subject: [PATCH 3/6] Moving external task sensor doc to standard provider doc folder --- .../sensors}/external_task_sensor.rst | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/{apache-airflow/howto/operator => apache-airflow-providers-standard/sensors}/external_task_sensor.rst (100%) diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow-providers-standard/sensors/external_task_sensor.rst similarity index 100% rename from docs/apache-airflow/howto/operator/external_task_sensor.rst rename to docs/apache-airflow-providers-standard/sensors/external_task_sensor.rst From d0b046d0b3ee9134d1e5e06b2391708676d6fe49 Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Mon, 25 Nov 2024 19:52:14 +0530 Subject: [PATCH 4/6] Adding change in pre-commit hook --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4b3315f03268a..5f0bf7f61a91c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -725,7 +725,7 @@ repos: files: > (?x) ^providers/src/airflow/providers/.*\.py$ - exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py + exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py|providers/src/airflow/providers/standard/sensors/external_task.py - id: check-get-lineage-collector-providers language: python name: Check providers import hook lineage code from compat From a8d58a8c440cea139ba1ee5b89e550a93aac4e8a Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Wed, 27 Nov 2024 12:19:58 +0530 Subject: [PATCH 5/6] Reverted changes to RELEASE_NOTES.rst --- RELEASE_NOTES.rst | 8 ++++---- airflow/reproducible_build.yaml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 54f73b4bd8d35..3105f64bb24bd 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -6450,10 +6450,10 @@ The following table shows changes in import paths. - ``airflow.sensors.base.BaseSensorOperator`` * - ``airflow.sensors.date_time_sensor.DateTimeSensor`` - ``airflow.sensors.date_time.DateTimeSensor`` - * - ``airflow.providers.standard.sensors.external_task_sensor.ExternalTaskMarker`` - - ``airflow.providers.standard.sensors.external_task.ExternalTaskMarker`` - * - ``airflow.providers.standard.sensors.external_task_sensor.ExternalTaskSensor`` - - ``airflow.providers.standard.sensors.external_task.ExternalTaskSensor`` + * - ``airflow..sensors.external_task_sensor.ExternalTaskMarker`` + - ``airflow.sensors.external_task.ExternalTaskMarker`` + * - ``airflow.sensors.external_task_sensor.ExternalTaskSensor`` + - ``airflow.sensors.external_task.ExternalTaskSensor`` * - ``airflow.sensors.sql_sensor.SqlSensor`` - ``airflow.sensors.sql.SqlSensor`` * - ``airflow.sensors.time_delta_sensor.TimeDeltaSensor`` diff --git a/airflow/reproducible_build.yaml b/airflow/reproducible_build.yaml index ef6f2621343e2..3b207063ffb42 100644 --- a/airflow/reproducible_build.yaml +++ b/airflow/reproducible_build.yaml @@ -1,2 +1,2 @@ -release-notes-hash: a04c0a95a5b972498a29924ff4b53216 -source-date-epoch: 1732286028 +release-notes-hash: 60432f378a4fe77b54523c235fed3e26 +source-date-epoch: 1732690170 From 90fe71ec97d1b71bb5ec17341bee527f58eac608 Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Wed, 27 Nov 2024 12:21:06 +0530 Subject: [PATCH 6/6] Reverted changes to RELEASE_NOTES.rst --- RELEASE_NOTES.rst | 2 +- airflow/reproducible_build.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 3105f64bb24bd..a8b5a265f3afa 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -6450,7 +6450,7 @@ The following table shows changes in import paths. - ``airflow.sensors.base.BaseSensorOperator`` * - ``airflow.sensors.date_time_sensor.DateTimeSensor`` - ``airflow.sensors.date_time.DateTimeSensor`` - * - ``airflow..sensors.external_task_sensor.ExternalTaskMarker`` + * - ``airflow.sensors.external_task_sensor.ExternalTaskMarker`` - ``airflow.sensors.external_task.ExternalTaskMarker`` * - ``airflow.sensors.external_task_sensor.ExternalTaskSensor`` - ``airflow.sensors.external_task.ExternalTaskSensor`` diff --git a/airflow/reproducible_build.yaml b/airflow/reproducible_build.yaml index 3b207063ffb42..5f2d030fd0b63 100644 --- a/airflow/reproducible_build.yaml +++ b/airflow/reproducible_build.yaml @@ -1,2 +1,2 @@ -release-notes-hash: 60432f378a4fe77b54523c235fed3e26 -source-date-epoch: 1732690170 +release-notes-hash: c68f3fa23f84c7fc270d73baaa2cc18d +source-date-epoch: 1732690252