From 713ca94664e865a05a376702cf8ff919b56620fd Mon Sep 17 00:00:00 2001 From: Idris Adebisi Date: Sat, 23 Nov 2024 14:17:49 +0000 Subject: [PATCH 1/5] Moved latest_only operator to standard provider along with all references --- airflow/example_dags/example_latest_only.py | 2 +- airflow/example_dags/example_latest_only_with_trigger.py | 2 +- docs/apache-airflow/operators-and-hooks-ref.rst | 2 +- .../src/airflow/providers/standard}/operators/latest_only.py | 0 tests/operators/test_latest_only_operator.py | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) rename {airflow => providers/src/airflow/providers/standard}/operators/latest_only.py (100%) diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index 1e9906240f5af..2eac0819db18a 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -23,7 +23,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator with DAG( dag_id="latest_only", diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 44a5ed4274005..1b05d5726b644 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -28,7 +28,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule with DAG( diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 7c238307e587d..0a2abcc841057 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -62,7 +62,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.operators.generic_transfer` - - * - :mod:`airflow.operators.latest_only` + * - :mod:`airflow.providers.standard.operators.latest_only` - * - :mod:`airflow.providers.standard.operators.trigger_dagrun` diff --git a/airflow/operators/latest_only.py b/providers/src/airflow/providers/standard/operators/latest_only.py similarity index 100% rename from airflow/operators/latest_only.py rename to providers/src/airflow/providers/standard/operators/latest_only.py diff --git a/tests/operators/test_latest_only_operator.py b/tests/operators/test_latest_only_operator.py index 7c6b086994d7c..8c5af55f92c92 100644 --- a/tests/operators/test_latest_only_operator.py +++ b/tests/operators/test_latest_only_operator.py @@ -25,7 +25,7 @@ from airflow import settings from airflow.models import DagRun, TaskInstance from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule From 3b2be355deac06da751fa6e7271fdf477860ce87 Mon Sep 17 00:00:00 2001 From: Idris Adebisi Date: Sat, 23 Nov 2024 14:51:22 +0000 Subject: [PATCH 2/5] Add latest_only operator to standard provider.yaml file. --- providers/src/airflow/providers/standard/provider.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index a58ef13a3acc6..6e5d7ba9e732d 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -50,6 +50,7 @@ operators: - airflow.providers.standard.operators.python - airflow.providers.standard.operators.generic_transfer - airflow.providers.standard.operators.trigger_dagrun + - airflow.providers.standard.operators.latest_only sensors: - integration-name: Standard From 3d93454be246c4d554351399138771c4b52b483d Mon Sep 17 00:00:00 2001 From: Idris Adebisi Date: Sun, 24 Nov 2024 12:52:07 +0000 Subject: [PATCH 3/5] Move latest_only operator test file to Standard provider --- .../tests/standard}/operators/test_latest_only_operator.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {tests => providers/tests/standard}/operators/test_latest_only_operator.py (100%) diff --git a/tests/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py similarity index 100% rename from tests/operators/test_latest_only_operator.py rename to providers/tests/standard/operators/test_latest_only_operator.py From 4af8172b66f75e05b4f6f4e71056319bf63eb380 Mon Sep 17 00:00:00 2001 From: Idris Adebisi Date: Fri, 29 Nov 2024 09:04:40 +0000 Subject: [PATCH 4/5] Add skip condition to latest_only_operator tests that are not backward compatible --- .../tests/standard/operators/test_latest_only_operator.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/providers/tests/standard/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py index 8c5af55f92c92..262f36d2f4fed 100644 --- a/providers/tests/standard/operators/test_latest_only_operator.py +++ b/providers/tests/standard/operators/test_latest_only_operator.py @@ -82,6 +82,9 @@ def test_run(self, dag_maker): dag_maker.create_dagrun() task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + @pytest.mark.skipif( + not AIRFLOW_V_3_0_PLUS, reason="execution_date is renamed to logical_date in Airflow 3.0" + ) def test_skipping_non_latest(self, dag_maker): with dag_maker( default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, serialized=True @@ -161,6 +164,9 @@ def test_skipping_non_latest(self, dag_maker): timezone.datetime(2016, 1, 2): "success", } + @pytest.mark.skipif( + not AIRFLOW_V_3_0_PLUS, reason="execution_date is renamed to logical_date in Airflow 3.0" + ) def test_not_skipping_external(self, dag_maker): with dag_maker( default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, serialized=True From 3c2ee36c10d03fa033553c6cabcd5f6d8552c8f7 Mon Sep 17 00:00:00 2001 From: Idris Adebisi Date: Fri, 29 Nov 2024 12:49:31 +0000 Subject: [PATCH 5/5] Revert skipping test to using a conditional code depending on airflow version --- .../operators/test_latest_only_operator.py | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/providers/tests/standard/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py index 262f36d2f4fed..fa20f294fd019 100644 --- a/providers/tests/standard/operators/test_latest_only_operator.py +++ b/providers/tests/standard/operators/test_latest_only_operator.py @@ -48,11 +48,12 @@ def get_task_instances(task_id): session = settings.Session() + logical_date = DagRun.logical_date if AIRFLOW_V_3_0_PLUS else DagRun.execution_date return ( session.query(TaskInstance) .join(TaskInstance.dag_run) .filter(TaskInstance.task_id == task_id) - .order_by(DagRun.logical_date) + .order_by(logical_date) .all() ) @@ -82,9 +83,6 @@ def test_run(self, dag_maker): dag_maker.create_dagrun() task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.skipif( - not AIRFLOW_V_3_0_PLUS, reason="execution_date is renamed to logical_date in Airflow 3.0" - ) def test_skipping_non_latest(self, dag_maker): with dag_maker( default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, serialized=True @@ -133,7 +131,10 @@ def test_skipping_non_latest(self, dag_maker): downstream_task3.run(start_date=DEFAULT_DATE, end_date=END_DATE) latest_instances = get_task_instances("latest") - exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + else: + exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances} assert exec_date_to_latest_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -141,7 +142,10 @@ def test_skipping_non_latest(self, dag_maker): } downstream_instances = get_task_instances("downstream") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "skipped", timezone.datetime(2016, 1, 1, 12): "skipped", @@ -149,7 +153,10 @@ def test_skipping_non_latest(self, dag_maker): } downstream_instances = get_task_instances("downstream_2") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): None, timezone.datetime(2016, 1, 1, 12): None, @@ -157,16 +164,16 @@ def test_skipping_non_latest(self, dag_maker): } downstream_instances = get_task_instances("downstream_3") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", timezone.datetime(2016, 1, 2): "success", } - @pytest.mark.skipif( - not AIRFLOW_V_3_0_PLUS, reason="execution_date is renamed to logical_date in Airflow 3.0" - ) def test_not_skipping_external(self, dag_maker): with dag_maker( default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, serialized=True @@ -216,7 +223,10 @@ def test_not_skipping_external(self, dag_maker): downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE) latest_instances = get_task_instances("latest") - exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + else: + exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances} assert exec_date_to_latest_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -224,7 +234,10 @@ def test_not_skipping_external(self, dag_maker): } downstream_instances = get_task_instances("downstream") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -232,7 +245,10 @@ def test_not_skipping_external(self, dag_maker): } downstream_instances = get_task_instances("downstream_2") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success",