diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index 1e9906240f5af..2eac0819db18a 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -23,7 +23,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator with DAG( dag_id="latest_only", diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 44a5ed4274005..1b05d5726b644 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -28,7 +28,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule with DAG( diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 7c238307e587d..0a2abcc841057 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -62,7 +62,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.operators.generic_transfer` - - * - :mod:`airflow.operators.latest_only` + * - :mod:`airflow.providers.standard.operators.latest_only` - * - :mod:`airflow.providers.standard.operators.trigger_dagrun` diff --git a/airflow/operators/latest_only.py b/providers/src/airflow/providers/standard/operators/latest_only.py similarity index 100% rename from airflow/operators/latest_only.py rename to providers/src/airflow/providers/standard/operators/latest_only.py diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index a58ef13a3acc6..6e5d7ba9e732d 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -50,6 +50,7 @@ operators: - airflow.providers.standard.operators.python - airflow.providers.standard.operators.generic_transfer - airflow.providers.standard.operators.trigger_dagrun + - airflow.providers.standard.operators.latest_only sensors: - integration-name: Standard diff --git a/tests/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py similarity index 81% rename from tests/operators/test_latest_only_operator.py rename to providers/tests/standard/operators/test_latest_only_operator.py index 7c6b086994d7c..fa20f294fd019 100644 --- a/tests/operators/test_latest_only_operator.py +++ b/providers/tests/standard/operators/test_latest_only_operator.py @@ -25,7 +25,7 @@ from airflow import settings from airflow.models import DagRun, TaskInstance from airflow.operators.empty import EmptyOperator -from airflow.operators.latest_only import LatestOnlyOperator +from airflow.providers.standard.operators.latest_only import LatestOnlyOperator from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule @@ -48,11 +48,12 @@ def get_task_instances(task_id): session = settings.Session() + logical_date = DagRun.logical_date if AIRFLOW_V_3_0_PLUS else DagRun.execution_date return ( session.query(TaskInstance) .join(TaskInstance.dag_run) .filter(TaskInstance.task_id == task_id) - .order_by(DagRun.logical_date) + .order_by(logical_date) .all() ) @@ -130,7 +131,10 @@ def test_skipping_non_latest(self, dag_maker): downstream_task3.run(start_date=DEFAULT_DATE, end_date=END_DATE) latest_instances = get_task_instances("latest") - exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + else: + exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances} assert exec_date_to_latest_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -138,7 +142,10 @@ def test_skipping_non_latest(self, dag_maker): } downstream_instances = get_task_instances("downstream") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "skipped", timezone.datetime(2016, 1, 1, 12): "skipped", @@ -146,7 +153,10 @@ def test_skipping_non_latest(self, dag_maker): } downstream_instances = get_task_instances("downstream_2") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): None, timezone.datetime(2016, 1, 1, 12): None, @@ -154,7 +164,10 @@ def test_skipping_non_latest(self, dag_maker): } downstream_instances = get_task_instances("downstream_3") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -210,7 +223,10 @@ def test_not_skipping_external(self, dag_maker): downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE) latest_instances = get_task_instances("latest") - exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances} + else: + exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances} assert exec_date_to_latest_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -218,7 +234,10 @@ def test_not_skipping_external(self, dag_maker): } downstream_instances = get_task_instances("downstream") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success", @@ -226,7 +245,10 @@ def test_not_skipping_external(self, dag_maker): } downstream_instances = get_task_instances("downstream_2") - exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + if AIRFLOW_V_3_0_PLUS: + exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances} + else: + exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances} assert exec_date_to_downstream_state == { timezone.datetime(2016, 1, 1): "success", timezone.datetime(2016, 1, 1, 12): "success",