diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 20c7a293bff06..2e226e95de4f3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -724,7 +724,10 @@ repos:
files: >
(?x)
^providers/src/airflow/providers/.*\.py$
- exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py
+ exclude: |
+ (?x)
+ ^.*/.*_vendor/|
+ ^providers/src/airflow/providers/standard/.*$
- id: check-get-lineage-collector-providers
language: python
name: Check providers import hook lineage code from compat
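The exclude rule above now uses a YAML block scalar, which works because `(?x)` enables Python's verbose-regex mode: `re` then ignores the literal newlines and indentation the block scalar introduces. A minimal sketch of that behavior (file paths are illustrative):

```python
import re

# Same shape as the new exclude value: "(?x)" comes first, so the embedded
# newlines are treated as insignificant whitespace and the two alternatives
# read as a single pattern.
pattern = re.compile(
    "(?x)\n"
    "^.*/.*_vendor/|\n"
    "^providers/src/airflow/providers/standard/.*$\n"
)

assert pattern.search("providers/src/airflow/providers/standard/operators/bash.py")
assert pattern.search("some/pkg/_vendor/module.py")
```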
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a14567cd00e17..c65485fb2b8e5 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2060,7 +2060,7 @@ email:
version_added: ~
type: string
example: ~
- default: "airflow.utils.email.send_email_smtp"
+ default: "airflow.providers.standard.utils.email.send_email_smtp"
email_conn_id:
description: Email connection to use
version_added: 2.1.0
@@ -2126,7 +2126,7 @@ email:
smtp:
description: |
If you want airflow to send emails on retries, failure, and you want to use
- the airflow.utils.email.send_email_smtp function, you have to configure an
+ the airflow.providers.standard.utils.email.send_email_smtp function, you have to configure an
smtp server here
options:
smtp_host:
diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index 447b4471b97e4..7ec31497a1fe7 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -24,7 +24,7 @@
from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
-from airflow.operators.email import EmailOperator
+from airflow.providers.standard.operators.email import EmailOperator
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index abaf21770523a..0a282a834195c 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -44,7 +44,7 @@
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
+from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
start_date = pendulum.datetime(2021, 1, 1, tz="UTC")
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 1e9906240f5af..2eac0819db18a 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -23,7 +23,7 @@
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.latest_only import LatestOnlyOperator
+from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
with DAG(
dag_id="latest_only",
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index 44a5ed4274005..1b05d5726b644 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -28,7 +28,7 @@
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.latest_only import LatestOnlyOperator
+from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
diff --git a/airflow/example_dags/example_sensors.py b/airflow/example_dags/example_sensors.py
index 52b1b84e223e8..39d7b8d29635f 100644
--- a/airflow/example_dags/example_sensors.py
+++ b/airflow/example_dags/example_sensors.py
@@ -24,11 +24,11 @@
from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
+from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
-from airflow.sensors.filesystem import FileSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weekday import WeekDay
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index e68e60c4b19a3..e546bd0a7e87d 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -26,7 +26,7 @@
import pendulum
from airflow.models.dag import DAG
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id="example_trigger_controller_dag",
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 851d2a5129346..b28f2e0a32f98 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1115,7 +1115,7 @@ def _get_task_instances(
if include_dependent_dags:
# Recursively find external tasks indicated by ExternalTaskMarker
- from airflow.sensors.external_task import ExternalTaskMarker
+ from airflow.providers.standard.sensors.external_task import ExternalTaskMarker
query = tis
if as_pk_tuple:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb07ba6d848af..76480d8445ac7 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -101,6 +101,7 @@
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import LazyXComSelectSequence, XCom
from airflow.plugins_manager import integrate_macros_plugins
+from airflow.providers.standard.utils.email import send_email
from airflow.sentry import Sentry
from airflow.settings import task_instance_mutation_hook
from airflow.stats import Stats
@@ -118,7 +119,6 @@
context_get_outlet_events,
context_merge,
)
-from airflow.utils.email import send_email
from airflow.utils.helpers import prune_dict, render_template_to_string
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 79403860f5fac..936a386e8c092 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -118,10 +118,10 @@
log = logging.getLogger(__name__)
_OPERATOR_EXTRA_LINKS: set[str] = {
- "airflow.operators.trigger_dagrun.TriggerDagRunLink",
- "airflow.sensors.external_task.ExternalDagLink",
+ "airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink",
+ "airflow.providers.standard.sensors.external_task.ExternalDagLink",
# Deprecated names, so that existing serialized dags load straight away.
- "airflow.sensors.external_task.ExternalTaskSensorLink",
+ "airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink",
"airflow.operators.dagrun_operator.TriggerDagRunLink",
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
}
@@ -1015,8 +1015,8 @@ class DependencyDetector:
@staticmethod
def detect_task_dependencies(task: Operator) -> list[DagDependency]:
"""Detect dependencies caused by tasks."""
- from airflow.operators.trigger_dagrun import TriggerDagRunOperator
- from airflow.sensors.external_task import ExternalTaskSensor
+ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
deps = []
if isinstance(task, TriggerDagRunOperator):
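The deprecated entries kept in `_OPERATOR_EXTRA_LINKS` above matter because serialized DAGs reference operator extra links by classpath string, so rows written before the move must keep resolving. A minimal, self-contained sketch of that lookup (the helper name is hypothetical):

```python
# Serialized DAGs store extra links as plain classpath strings, so the
# allow-list must accept legacy paths written before this refactor.
OPERATOR_EXTRA_LINKS = {
    "airflow.providers.standard.sensors.external_task.ExternalDagLink",
    "airflow.sensors.external_task_sensor.ExternalTaskSensorLink",  # legacy name
}


def is_known_extra_link(classpath: str) -> bool:
    # Hypothetical helper mirroring the membership check performed when a
    # serialized DAG declaring extra links is loaded back.
    return classpath in OPERATOR_EXTRA_LINKS


assert is_known_extra_link("airflow.sensors.external_task_sensor.ExternalTaskSensorLink")
```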
diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index 0b477151a9091..c208e8cedcbc9 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -68,7 +68,7 @@ When writing a deferrable operators these are the main points to consider:
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
- from airflow.triggers.temporal import TimeDeltaTrigger
+ from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
@@ -122,7 +122,7 @@ This example shows the structure of a basic trigger, a very simplified version o
self.moment = moment
def serialize(self):
- return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
+ return ("airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
@@ -177,7 +177,7 @@ Here's a basic example of how a sensor might trigger deferral:
from typing import TYPE_CHECKING, Any
from airflow.sensors.base import BaseSensorOperator
- from airflow.triggers.temporal import TimeDeltaTrigger
+ from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -237,7 +237,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``
class WaitOneHourSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+ trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
@@ -268,7 +268,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``
class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+ trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
@@ -307,7 +307,7 @@ After the trigger has finished executing, the task may be sent back to the worke
class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+ trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst
index ae4da4ab18c68..6ac6d874ade56 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -359,7 +359,7 @@ The ``@task.branch`` can also be used with XComs allowing branching context to d
start_op >> branch_op >> [continue_op, stop_op]
-If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.
+If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.providers.standard.operators.branch.BaseBranchOperator`, which behaves similarly to the ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.
.. note::
The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.providers.standard.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator.
@@ -765,7 +765,7 @@ While dependencies between tasks in a DAG are explicitly defined through upstrea
relationships, dependencies between DAGs are a bit more complex. In general, there are two ways
in which one DAG can depend on another:
-- triggering - :class:`~airflow.operators.trigger_dagrun.TriggerDagRunOperator`
+- triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator`
- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`
Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
diff --git a/docs/apache-airflow/core-concepts/operators.rst b/docs/apache-airflow/core-concepts/operators.rst
index e721faa97560f..7812006658eab 100644
--- a/docs/apache-airflow/core-concepts/operators.rst
+++ b/docs/apache-airflow/core-concepts/operators.rst
@@ -30,7 +30,7 @@ Airflow has a very extensive set of operators available, with some built-in to t
- :class:`~airflow.providers.standard.operators.bash.BashOperator` - executes a bash command
- :class:`~airflow.providers.standard.operators.python.PythonOperator` - calls an arbitrary Python function
-- :class:`~airflow.operators.email.EmailOperator` - sends an email
+- :class:`~airflow.providers.standard.operators.email.EmailOperator` - sends an email
- Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering jinja templates passed as arguments.
.. note::
diff --git a/docs/apache-airflow/core-concepts/taskflow.rst b/docs/apache-airflow/core-concepts/taskflow.rst
index b31494c155891..b090302888f3d 100644
--- a/docs/apache-airflow/core-concepts/taskflow.rst
+++ b/docs/apache-airflow/core-concepts/taskflow.rst
@@ -25,7 +25,7 @@ If you write most of your DAGs using plain Python code rather than Operators, th
TaskFlow takes care of moving inputs and outputs between your Tasks using XComs for you, as well as automatically calculating dependencies - when you call a TaskFlow function in your DAG file, rather than executing it, you will get an object representing the XCom for the result (an ``XComArg``), that you can then use as inputs to downstream tasks or operators. For example::
from airflow.decorators import task
- from airflow.operators.email import EmailOperator
+ from airflow.providers.standard.operators.email import EmailOperator
@task
def get_ip():
diff --git a/docs/apache-airflow/howto/email-config.rst b/docs/apache-airflow/howto/email-config.rst
index bc7ea9a1f9d04..5fe39a2ae347e 100644
--- a/docs/apache-airflow/howto/email-config.rst
+++ b/docs/apache-airflow/howto/email-config.rst
@@ -25,7 +25,7 @@ in the ``[email]`` section.
.. code-block:: ini
[email]
- email_backend = airflow.utils.email.send_email_smtp
+ email_backend = airflow.providers.standard.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
@@ -33,7 +33,7 @@ Equivalent environment variables look like:
.. code-block:: sh
- AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp
+ AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.standard.utils.email.send_email_smtp
AIRFLOW__EMAIL__SUBJECT_TEMPLATE=/path/to/my_subject_template_file
AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE=/path/to/my_html_content_template_file
@@ -63,7 +63,7 @@ the example below.
.. code-block:: bash
$ airflow config get-value email email_backend
- airflow.utils.email.send_email_smtp
+ airflow.providers.standard.utils.email.send_email_smtp
To access the task's information you use `Jinja Templating `_ in your template files.
diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst
index 7ddbc4b2417b8..812f3b6aab9b7 100644
--- a/docs/apache-airflow/howto/operator/external_task_sensor.rst
+++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -41,7 +41,7 @@ DAGs.
ExternalTaskSensor
^^^^^^^^^^^^^^^^^^
-Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
+Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another task on a different DAG for a specific ``execution_date``.
ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed
@@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode:
ExternalTaskSensor with task_group dependency
---------------------------------------------
-In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
+In addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another ``task_group`` on a different DAG for a specific ``execution_date``.
.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
diff --git a/docs/apache-airflow/howto/operator/file.rst b/docs/apache-airflow/howto/operator/file.rst
index 49ca1c75f6042..6241cc2e50533 100644
--- a/docs/apache-airflow/howto/operator/file.rst
+++ b/docs/apache-airflow/howto/operator/file.rst
@@ -22,7 +22,7 @@
FileSensor
==========
-Use the :class:`~airflow.sensors.filesystem.FileSensor` to detect files appearing in your local
+Use the :class:`~airflow.providers.standard.sensors.filesystem.FileSensor` to detect files appearing in your local
filesystem. You need to have connection defined to use it (pass connection id via ``fs_conn_id``).
Default connection is ``fs_default``.
diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst
index 026d206ea5f01..775c863b5c813 100644
--- a/docs/apache-airflow/operators-and-hooks-ref.rst
+++ b/docs/apache-airflow/operators-and-hooks-ref.rst
@@ -53,25 +53,25 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.providers.standard.operators.bash`
- :doc:`How to use `
- * - :mod:`airflow.operators.branch`
+ * - :mod:`airflow.providers.standard.operators.branch`
-
* - :mod:`airflow.operators.empty`
-
- * - :mod:`airflow.operators.email`
+ * - :mod:`airflow.providers.standard.operators.email`
-
* - :mod:`airflow.operators.generic_transfer`
-
- * - :mod:`airflow.operators.latest_only`
+ * - :mod:`airflow.providers.standard.operators.latest_only`
-
* - :mod:`airflow.providers.standard.operators.python`
- :doc:`How to use `
- * - :mod:`airflow.operators.trigger_dagrun`
+ * - :mod:`airflow.providers.standard.operators.trigger_dagrun`
-
**Sensors:**
@@ -85,10 +85,10 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.providers.standard.sensors.bash`
- :ref:`How to use `
- * - :mod:`airflow.sensors.external_task`
+ * - :mod:`airflow.providers.standard.sensors.external_task`
- :doc:`How to use `
- * - :mod:`airflow.sensors.filesystem`
+ * - :mod:`airflow.providers.standard.sensors.filesystem`
- :ref:`How to use `
* - :mod:`airflow.providers.standard.sensors.python`
diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst
index 72b5ac82dd049..0b6bf77316fb2 100644
--- a/docs/apache-airflow/tutorial/taskflow.rst
+++ b/docs/apache-airflow/tutorial/taskflow.rst
@@ -438,7 +438,7 @@ Adding dependencies between decorated and traditional tasks
-----------------------------------------------------------
The above tutorial shows how to create dependencies between TaskFlow functions. However, dependencies can also
be set between traditional tasks (such as :class:`~airflow.providers.standard.operators.bash.BashOperator`
-or :class:`~airflow.sensors.filesystem.FileSensor`) and TaskFlow functions.
+or :class:`~airflow.providers.standard.sensors.filesystem.FileSensor`) and TaskFlow functions.
Building this dependency is shown in the code below:
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 058f7c00063e7..6301d4d946bbd 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -846,6 +846,7 @@
"plugins": [],
"cross-providers-deps": [
"amazon",
+ "common.compat",
"google",
"oracle",
"sftp"
@@ -1180,7 +1181,9 @@
],
"devel-deps": [],
"plugins": [],
- "cross-providers-deps": [],
+ "cross-providers-deps": [
+ "common.compat"
+ ],
"excluded-python-versions": [],
"state": "ready"
},
diff --git a/providers/src/airflow/providers/amazon/aws/hooks/ses.py b/providers/src/airflow/providers/amazon/aws/hooks/ses.py
index a5ea06de35171..4396a89fe577b 100644
--- a/providers/src/airflow/providers/amazon/aws/hooks/ses.py
+++ b/providers/src/airflow/providers/amazon/aws/hooks/ses.py
@@ -21,7 +21,7 @@
from typing import Any, Iterable
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.utils.email import build_mime_message
+from airflow.providers.common.compat.standard.utils import build_mime_message
class SesHook(AwsBaseHook):
diff --git a/providers/src/airflow/providers/common/compat/standard/triggers.py b/providers/src/airflow/providers/common/compat/standard/triggers.py
new file mode 100644
index 0000000000000..1f7f524e8867c
--- /dev/null
+++ b/providers/src/airflow/providers/common/compat/standard/triggers.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
+else:
+ try:
+ from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
+ except ModuleNotFoundError:
+ from airflow.triggers.temporal import TimeDeltaTrigger
+
+
+__all__ = ["TimeDeltaTrigger"]
diff --git a/providers/src/airflow/providers/common/compat/standard/utils.py b/providers/src/airflow/providers/common/compat/standard/utils.py
index bfa263d1be946..15ad5a2a2406b 100644
--- a/providers/src/airflow/providers/common/compat/standard/utils.py
+++ b/providers/src/airflow/providers/common/compat/standard/utils.py
@@ -20,12 +20,15 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
+ from airflow.providers.standard.utils.email import build_mime_message, get_email_address_list
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
else:
try:
+ from airflow.providers.standard.utils.email import build_mime_message, get_email_address_list
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
except ModuleNotFoundError:
+ from airflow.utils.email import build_mime_message, get_email_address_list
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
-__all__ = ["write_python_script", "prepare_virtualenv"]
+__all__ = ["write_python_script", "prepare_virtualenv", "build_mime_message", "get_email_address_list"]
diff --git a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
index 6736ea59c918d..23b8b1cde79a9 100644
--- a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
+++ b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
@@ -20,10 +20,10 @@
from typing import TYPE_CHECKING, Any, Callable, Sequence
from airflow.exceptions import AirflowException
+from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.providers.microsoft.azure.triggers.msgraph import MSGraphTrigger, ResponseSerializer
from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.temporal import TimeDeltaTrigger
if TYPE_CHECKING:
from datetime import timedelta
diff --git a/providers/src/airflow/providers/sendgrid/utils/emailer.py b/providers/src/airflow/providers/sendgrid/utils/emailer.py
index b82aad35db9b6..0396b064f028f 100644
--- a/providers/src/airflow/providers/sendgrid/utils/emailer.py
+++ b/providers/src/airflow/providers/sendgrid/utils/emailer.py
@@ -41,7 +41,7 @@
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
-from airflow.utils.email import get_email_address_list
+from airflow.providers.common.compat.standard.utils import get_email_address_list
log = logging.getLogger(__name__)
diff --git a/airflow/operators/branch.py b/providers/src/airflow/providers/standard/operators/branch.py
similarity index 100%
rename from airflow/operators/branch.py
rename to providers/src/airflow/providers/standard/operators/branch.py
diff --git a/providers/src/airflow/providers/standard/operators/datetime.py b/providers/src/airflow/providers/standard/operators/datetime.py
index 4455b84dd3bb9..ac6b01067d1b8 100644
--- a/providers/src/airflow/providers/standard/operators/datetime.py
+++ b/providers/src/airflow/providers/standard/operators/datetime.py
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Iterable
from airflow.exceptions import AirflowException
-from airflow.operators.branch import BaseBranchOperator
+from airflow.providers.standard.operators.branch import BaseBranchOperator
from airflow.utils import timezone
if TYPE_CHECKING:
diff --git a/airflow/operators/email.py b/providers/src/airflow/providers/standard/operators/email.py
similarity index 98%
rename from airflow/operators/email.py
rename to providers/src/airflow/providers/standard/operators/email.py
index 98af60afe4afc..3ed745916ab91 100644
--- a/airflow/operators/email.py
+++ b/providers/src/airflow/providers/standard/operators/email.py
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Any, Sequence
from airflow.models.baseoperator import BaseOperator
-from airflow.utils.email import send_email
+from airflow.providers.standard.utils.email import send_email
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git a/airflow/operators/latest_only.py b/providers/src/airflow/providers/standard/operators/latest_only.py
similarity index 97%
rename from airflow/operators/latest_only.py
rename to providers/src/airflow/providers/standard/operators/latest_only.py
index 6f2c0288d90d6..9983058ea2fe0 100644
--- a/airflow/operators/latest_only.py
+++ b/providers/src/airflow/providers/standard/operators/latest_only.py
@@ -23,7 +23,7 @@
import pendulum
-from airflow.operators.branch import BaseBranchOperator
+from airflow.providers.standard.operators.branch import BaseBranchOperator
if TYPE_CHECKING:
from airflow.models import DAG, DagRun
diff --git a/providers/src/airflow/providers/standard/operators/python.py b/providers/src/airflow/providers/standard/operators/python.py
index 4d908f6ba6809..b5d8aa5615d5b 100644
--- a/providers/src/airflow/providers/standard/operators/python.py
+++ b/providers/src/airflow/providers/standard/operators/python.py
@@ -50,7 +50,7 @@
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
-from airflow.operators.branch import BranchMixIn
+from airflow.providers.standard.operators.branch import BranchMixIn
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import Literal
diff --git a/airflow/operators/trigger_dagrun.py b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
similarity index 91%
rename from airflow/operators/trigger_dagrun.py
rename to providers/src/airflow/providers/standard/operators/trigger_dagrun.py
index bb1eac7c6963f..843b77ef1f3b0 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -22,9 +22,11 @@
import time
from typing import TYPE_CHECKING, Any, Sequence, cast
+from packaging.version import Version
from sqlalchemy import select
from sqlalchemy.orm.exc import NoResultFound
+from airflow import __version__ as AIRFLOW_VERSION
from airflow.api.common.trigger_dag import trigger_dag
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.configuration import conf
@@ -40,17 +42,22 @@
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.xcom import XCom
-from airflow.triggers.external_task import DagStateTrigger
+from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.utils import timezone
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
-from airflow.utils.types import DagRunTriggeredByType, DagRunType
+from airflow.utils.types import DagRunType
XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso"
XCOM_RUN_ID = "trigger_run_id"
+AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.utils.types import DagRunTriggeredByType
+
if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
@@ -191,14 +198,23 @@ def execute(self, context: Context):
run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date)
try:
- dag_run = trigger_dag(
- dag_id=self.trigger_dag_id,
- run_id=run_id,
- conf=self.conf,
- execution_date=parsed_logical_date,
- replace_microseconds=False,
- triggered_by=DagRunTriggeredByType.OPERATOR,
- )
+ if AIRFLOW_V_3_0_PLUS:
+ dag_run = trigger_dag(
+ dag_id=self.trigger_dag_id,
+ run_id=run_id,
+ conf=self.conf,
+ execution_date=parsed_logical_date,
+ replace_microseconds=False,
+ triggered_by=DagRunTriggeredByType.OPERATOR,
+ )
+ else:
+ dag_run = trigger_dag( # type: ignore[call-arg]
+ dag_id=self.trigger_dag_id,
+ run_id=run_id,
+ conf=self.conf,
+ execution_date=parsed_logical_date,
+ replace_microseconds=False,
+ )
except DagRunAlreadyExists as e:
if self.reset_dag_run:
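The `AIRFLOW_V_3_0_PLUS` gate introduced above relies on `base_version`, which strips pre-release and dev suffixes so that development builds of 3.0 already take the new `triggered_by` code path. A small sketch of the idiom (version strings are examples):

```python
from packaging.version import Version


def is_airflow_3_plus(raw_version: str) -> bool:
    # base_version drops ".dev0"/"rc1"-style suffixes, so "3.0.0.dev0"
    # compares as "3.0.0" and satisfies the >= check during pre-releases.
    return Version(Version(raw_version).base_version) >= Version("3.0.0")


assert is_airflow_3_plus("3.0.0.dev0")
assert not is_airflow_3_plus("2.10.2")
```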
diff --git a/providers/src/airflow/providers/standard/operators/weekday.py b/providers/src/airflow/providers/standard/operators/weekday.py
index 0c74b19d795b4..ffa51c0c82c1b 100644
--- a/providers/src/airflow/providers/standard/operators/weekday.py
+++ b/providers/src/airflow/providers/standard/operators/weekday.py
@@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Iterable
-from airflow.operators.branch import BaseBranchOperator
+from airflow.providers.standard.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.weekday import WeekDay
diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml
index 426a3a67871ab..0ce56dde9023c 100644
--- a/providers/src/airflow/providers/standard/provider.yaml
+++ b/providers/src/airflow/providers/standard/provider.yaml
@@ -46,6 +46,10 @@ operators:
- airflow.providers.standard.operators.bash
- airflow.providers.standard.operators.python
- airflow.providers.standard.operators.generic_transfer
+ - airflow.providers.standard.operators.branch
+ - airflow.providers.standard.operators.email
+ - airflow.providers.standard.operators.latest_only
+ - airflow.providers.standard.operators.trigger_dagrun
sensors:
- integration-name: Standard
@@ -56,9 +60,18 @@ sensors:
- airflow.providers.standard.sensors.weekday
- airflow.providers.standard.sensors.bash
- airflow.providers.standard.sensors.python
+ - airflow.providers.standard.sensors.external_task
+ - airflow.providers.standard.sensors.filesystem
hooks:
- integration-name: Standard
python-modules:
- airflow.providers.standard.hooks.filesystem
- airflow.providers.standard.hooks.package_index
- airflow.providers.standard.hooks.subprocess
+
+triggers:
+ - integration-name: Standard
+ python-modules:
+ - airflow.providers.standard.triggers.external_task
+ - airflow.providers.standard.triggers.file
+ - airflow.providers.standard.triggers.temporal
diff --git a/providers/src/airflow/providers/standard/sensors/date_time.py b/providers/src/airflow/providers/standard/sensors/date_time.py
index 20a6a484e05a4..50cc7b6aaaf59 100644
--- a/providers/src/airflow/providers/standard/sensors/date_time.py
+++ b/providers/src/airflow/providers/standard/sensors/date_time.py
@@ -20,9 +20,9 @@
import datetime
from typing import TYPE_CHECKING, Any, NoReturn, Sequence
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
-from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
if TYPE_CHECKING:
@@ -93,7 +93,7 @@ class DateTimeSensorAsync(DateTimeSensor):
"""
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.temporal.DateTimeTrigger",
+ trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
trigger_kwargs={"moment": "", "end_from_trigger": False},
next_method="execute_complete",
next_kwargs=None,
diff --git a/airflow/sensors/external_task.py b/providers/src/airflow/providers/standard/sensors/external_task.py
similarity index 99%
rename from airflow/sensors/external_task.py
rename to providers/src/airflow/providers/standard/sensors/external_task.py
index 331e17168bab7..16b892f90a804 100644
--- a/airflow/sensors/external_task.py
+++ b/providers/src/airflow/providers/standard/sensors/external_task.py
@@ -29,8 +29,8 @@
from airflow.models.dagbag import DagBag
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
+from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.external_task import WorkflowTrigger
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
diff --git a/airflow/sensors/filesystem.py b/providers/src/airflow/providers/standard/sensors/filesystem.py
similarity index 97%
rename from airflow/sensors/filesystem.py
rename to providers/src/airflow/providers/standard/sensors/filesystem.py
index 4496f5d6abfa4..1d499e4435b2d 100644
--- a/airflow/sensors/filesystem.py
+++ b/providers/src/airflow/providers/standard/sensors/filesystem.py
@@ -26,9 +26,9 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.standard.hooks.filesystem import FSHook
+from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
-from airflow.triggers.file import FileTrigger
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -63,7 +63,7 @@ class FileSensor(BaseSensorOperator):
template_fields: Sequence[str] = ("filepath",)
ui_color = "#91818a"
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.file.FileTrigger",
+ trigger_cls="airflow.providers.standard.triggers.file.FileTrigger",
trigger_kwargs={},
next_method="execute_complete",
next_kwargs=None,
diff --git a/providers/src/airflow/providers/standard/sensors/time.py b/providers/src/airflow/providers/standard/sensors/time.py
index 6dba2628fce35..48ea8e6813a1d 100644
--- a/providers/src/airflow/providers/standard/sensors/time.py
+++ b/providers/src/airflow/providers/standard/sensors/time.py
@@ -20,9 +20,9 @@
import datetime
from typing import TYPE_CHECKING, Any, NoReturn
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
-from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
if TYPE_CHECKING:
@@ -68,7 +68,7 @@ class TimeSensorAsync(BaseSensorOperator):
"""
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.temporal.DateTimeTrigger",
+ trigger_cls="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
trigger_kwargs={"moment": "", "end_from_trigger": False},
next_method="execute_complete",
next_kwargs=None,
diff --git a/providers/src/airflow/providers/standard/sensors/time_delta.py b/providers/src/airflow/providers/standard/sensors/time_delta.py
index dc78a0e33bc42..9a51b52dc8dc4 100644
--- a/providers/src/airflow/providers/standard/sensors/time_delta.py
+++ b/providers/src/airflow/providers/standard/sensors/time_delta.py
@@ -23,8 +23,8 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowSkipException
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
if TYPE_CHECKING:
diff --git a/providers/src/airflow/providers/standard/triggers/__init__.py b/providers/src/airflow/providers/standard/triggers/__init__.py
new file mode 100644
index 0000000000000..217e5db960782
--- /dev/null
+++ b/providers/src/airflow/providers/standard/triggers/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/triggers/external_task.py b/providers/src/airflow/providers/standard/triggers/external_task.py
similarity index 97%
rename from airflow/triggers/external_task.py
rename to providers/src/airflow/providers/standard/triggers/external_task.py
index cd43d59876e9e..47c8aa54b6c4c 100644
--- a/airflow/triggers/external_task.py
+++ b/providers/src/airflow/providers/standard/triggers/external_task.py
@@ -78,7 +78,7 @@ def __init__(
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize the trigger param and module path."""
return (
- "airflow.triggers.external_task.WorkflowTrigger",
+ "airflow.providers.standard.triggers.external_task.WorkflowTrigger",
{
"external_dag_id": self.external_dag_id,
"external_task_ids": self.external_task_ids,
@@ -159,7 +159,7 @@ def __init__(
def serialize(self) -> tuple[str, dict[str, typing.Any]]:
"""Serialize DagStateTrigger arguments and classpath."""
return (
- "airflow.triggers.external_task.DagStateTrigger",
+ "airflow.providers.standard.triggers.external_task.DagStateTrigger",
{
"dag_id": self.dag_id,
"states": self.states,
diff --git a/airflow/triggers/file.py b/providers/src/airflow/providers/standard/triggers/file.py
similarity index 85%
rename from airflow/triggers/file.py
rename to providers/src/airflow/providers/standard/triggers/file.py
index 5f40dd4d2de3f..f6a7715a035eb 100644
--- a/airflow/triggers/file.py
+++ b/providers/src/airflow/providers/standard/triggers/file.py
@@ -20,7 +20,6 @@
import datetime
import os
import typing
-import warnings
from glob import glob
from typing import Any
@@ -48,21 +47,12 @@ def __init__(
super().__init__()
self.filepath = filepath
self.recursive = recursive
- if kwargs.get("poll_interval") is not None:
- warnings.warn(
- "`poll_interval` has been deprecated and will be removed in future."
- "Please use `poke_interval` instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- self.poke_interval: float = kwargs["poll_interval"]
- else:
- self.poke_interval = poke_interval
+ self.poke_interval = poke_interval
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize FileTrigger arguments and classpath."""
return (
- "airflow.triggers.file.FileTrigger",
+ "airflow.providers.standard.triggers.file.FileTrigger",
{
"filepath": self.filepath,
"recursive": self.recursive,
diff --git a/airflow/triggers/temporal.py b/providers/src/airflow/providers/standard/triggers/temporal.py
similarity index 98%
rename from airflow/triggers/temporal.py
rename to providers/src/airflow/providers/standard/triggers/temporal.py
index 64c3afe8162c3..44295cf1057dd 100644
--- a/airflow/triggers/temporal.py
+++ b/providers/src/airflow/providers/standard/triggers/temporal.py
@@ -53,7 +53,7 @@ def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False)
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
- "airflow.triggers.temporal.DateTimeTrigger",
+ "airflow.providers.standard.triggers.temporal.DateTimeTrigger",
{"moment": self.moment, "end_from_trigger": self.end_from_trigger},
)
diff --git a/airflow/utils/email.py b/providers/src/airflow/providers/standard/utils/email.py
similarity index 100%
rename from airflow/utils/email.py
rename to providers/src/airflow/providers/standard/utils/email.py
diff --git a/tests/operators/test_branch_operator.py b/providers/tests/standard/operators/test_branch_operator.py
similarity index 99%
rename from tests/operators/test_branch_operator.py
rename to providers/tests/standard/operators/test_branch_operator.py
index 9df1b897b4270..ec8987d776723 100644
--- a/tests/operators/test_branch_operator.py
+++ b/providers/tests/standard/operators/test_branch_operator.py
@@ -22,8 +22,8 @@
import pytest
from airflow.models.taskinstance import TaskInstance as TI
-from airflow.operators.branch import BaseBranchOperator
from airflow.operators.empty import EmptyOperator
+from airflow.providers.standard.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
diff --git a/tests/operators/test_email.py b/providers/tests/standard/operators/test_email.py
similarity index 91%
rename from tests/operators/test_email.py
rename to providers/tests/standard/operators/test_email.py
index df772beb95fdd..dc796a709fd82 100644
--- a/tests/operators/test_email.py
+++ b/providers/tests/standard/operators/test_email.py
@@ -22,7 +22,7 @@
import pytest
-from airflow.operators.email import EmailOperator
+from airflow.providers.standard.operators.email import EmailOperator
from airflow.utils import timezone
from tests_common.test_utils.config import conf_vars
@@ -39,7 +39,9 @@
class TestEmailOperator:
def test_execute(self, dag_maker):
- with conf_vars({("email", "email_backend"): "tests.operators.test_email.send_email_test"}):
+ with conf_vars(
+ {("email", "email_backend"): "providers.tests.standard.operators.test_email.send_email_test"}
+ ):
with dag_maker(
"test_dag",
default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
diff --git a/tests/operators/test_latest_only_operator.py b/providers/tests/standard/operators/test_latest_only_operator.py
similarity index 99%
rename from tests/operators/test_latest_only_operator.py
rename to providers/tests/standard/operators/test_latest_only_operator.py
index fbc2cd5b46d8a..e7572f8ca0e5a 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/providers/tests/standard/operators/test_latest_only_operator.py
@@ -25,7 +25,7 @@
from airflow import settings
from airflow.models import DagRun, TaskInstance
from airflow.operators.empty import EmptyOperator
-from airflow.operators.latest_only import LatestOnlyOperator
+from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
diff --git a/tests/operators/test_trigger_dagrun.py b/providers/tests/standard/operators/test_trigger_dagrun.py
similarity index 98%
rename from tests/operators/test_trigger_dagrun.py
rename to providers/tests/standard/operators/test_trigger_dagrun.py
index 52a11d10e5e33..276eb07798184 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/providers/tests/standard/operators/test_trigger_dagrun.py
@@ -31,14 +31,16 @@
from airflow.models.log import Log
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.settings import TracebackSessionForTests
-from airflow.triggers.external_task import DagStateTrigger
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunType
+from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
+
pytestmark = pytest.mark.db_test
DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc)
@@ -76,7 +78,11 @@ def setup_method(self):
def re_sync_triggered_dag_to_db(self, dag, dag_maker):
TracebackSessionForTests.set_allow_db_access(dag_maker.session, True)
dagbag = DagBag(self.f_name, read_dags_from_db=False, include_examples=False)
- dagbag.bag_dag(dag)
+ if AIRFLOW_V_3_0_PLUS:
+ dagbag.bag_dag(dag)
+ else:
+ dagbag.bag_dag(dag, root_dag=dag)
+
dagbag.sync_to_db(session=dag_maker.session)
TracebackSessionForTests.set_allow_db_access(dag_maker.session, False)
@@ -108,7 +114,9 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session):
)
.one()
)
- with mock.patch("airflow.operators.trigger_dagrun.build_airflow_url_with_query") as mock_build_url:
+ with mock.patch(
+ "airflow.providers.standard.operators.trigger_dagrun.build_airflow_url_with_query"
+ ) as mock_build_url:
triggering_task.get_extra_links(triggering_ti, "Triggered DAG")
assert mock_build_url.called
args, _ = mock_build_url.call_args
diff --git a/tests/sensors/test_external_task_sensor.py b/providers/tests/standard/sensors/test_external_task_sensor.py
similarity index 98%
rename from tests/sensors/test_external_task_sensor.py
rename to providers/tests/standard/sensors/test_external_task_sensor.py
index 43911c1a41d48..69c4ec8963737 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/providers/tests/standard/sensors/test_external_task_sensor.py
@@ -38,13 +38,13 @@
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
-from airflow.providers.standard.sensors.time import TimeSensor
-from airflow.sensors.external_task import (
+from airflow.providers.standard.sensors.external_task import (
ExternalTaskMarker,
ExternalTaskSensor,
)
+from airflow.providers.standard.sensors.time import TimeSensor
+from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.serialization.serialized_objects import SerializedBaseOperator
-from airflow.triggers.external_task import WorkflowTrigger
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -934,8 +934,8 @@ def test_external_task_group_when_there_is_no_TIs(self):
),
),
)
- @mock.patch("airflow.sensors.external_task.ExternalTaskSensor.get_count")
- @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
+ @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor.get_count")
+ @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
def test_fail_poke(
self, _get_dttm_filter, get_count, soft_fail, expected_exception, kwargs, expected_message
):
@@ -991,7 +991,7 @@ def test_fail_poke(
),
),
)
- @mock.patch("airflow.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
+ @mock.patch("airflow.providers.standard.sensors.external_task.ExternalTaskSensor._get_dttm_filter")
@mock.patch("airflow.models.dagbag.DagBag.get_dag")
@mock.patch("os.path.exists")
@mock.patch("airflow.models.dag.DagModel.get_current")
@@ -1218,7 +1218,10 @@ def dag_bag_ext():
task_a_3 >> task_b_3
for dag in [dag_0, dag_1, dag_2, dag_3]:
- dag_bag.bag_dag(dag=dag)
+ if AIRFLOW_V_3_0_PLUS:
+ dag_bag.bag_dag(dag=dag)
+ else:
+ dag_bag.bag_dag(dag=dag, root_dag=dag)
yield dag_bag
@@ -1267,7 +1270,10 @@ def dag_bag_parent_child():
)
for dag in [dag_0, dag_1]:
- dag_bag.bag_dag(dag=dag)
+ if AIRFLOW_V_3_0_PLUS:
+ dag_bag.bag_dag(dag=dag)
+ else:
+ dag_bag.bag_dag(dag=dag, root_dag=dag)
yield dag_bag
@@ -1494,7 +1500,10 @@ def _factory(depth: int) -> DagBag:
task_a >> task_b
for dag in dags:
- dag_bag.bag_dag(dag=dag)
+ if AIRFLOW_V_3_0_PLUS:
+ dag_bag.bag_dag(dag=dag)
+ else:
+ dag_bag.bag_dag(dag=dag, root_dag=dag)
return dag_bag
@@ -1546,8 +1555,13 @@ def dag_bag_multiple():
dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)
daily_dag = DAG("daily_dag", start_date=DEFAULT_DATE, schedule="@daily")
agg_dag = DAG("agg_dag", start_date=DEFAULT_DATE, schedule="@daily")
- dag_bag.bag_dag(dag=daily_dag)
- dag_bag.bag_dag(dag=agg_dag)
+
+ if AIRFLOW_V_3_0_PLUS:
+ dag_bag.bag_dag(dag=daily_dag)
+ dag_bag.bag_dag(dag=agg_dag)
+ else:
+ dag_bag.bag_dag(dag=daily_dag, root_dag=daily_dag)
+ dag_bag.bag_dag(dag=agg_dag, root_dag=agg_dag)
daily_task = EmptyOperator(task_id="daily_tas", dag=daily_dag)
@@ -1618,7 +1632,10 @@ def dag_bag_head_tail():
)
head >> body >> tail
- dag_bag.bag_dag(dag=dag)
+ if AIRFLOW_V_3_0_PLUS:
+ dag_bag.bag_dag(dag=dag)
+ else:
+ dag_bag.bag_dag(dag=dag, root_dag=dag)
return dag_bag
@@ -1702,7 +1719,10 @@ def dummy_task(x: int):
)
head >> body >> tail
- dag_bag.bag_dag(dag=dag)
+ if AIRFLOW_V_3_0_PLUS:
+ dag_bag.bag_dag(dag=dag)
+ else:
+ dag_bag.bag_dag(dag=dag, root_dag=dag)
return dag_bag
diff --git a/tests/sensors/test_filesystem.py b/providers/tests/standard/sensors/test_filesystem.py
similarity index 98%
rename from tests/sensors/test_filesystem.py
rename to providers/tests/standard/sensors/test_filesystem.py
index 641f2f218f2db..22432e90a92f9 100644
--- a/tests/sensors/test_filesystem.py
+++ b/providers/tests/standard/sensors/test_filesystem.py
@@ -26,8 +26,8 @@
from airflow.exceptions import AirflowSensorTimeout, TaskDeferred
from airflow.models.dag import DAG
-from airflow.sensors.filesystem import FileSensor
-from airflow.triggers.file import FileTrigger
+from airflow.providers.standard.sensors.filesystem import FileSensor
+from airflow.providers.standard.triggers.file import FileTrigger
from airflow.utils.timezone import datetime
pytestmark = pytest.mark.db_test
diff --git a/providers/tests/standard/sensors/test_time.py b/providers/tests/standard/sensors/test_time.py
index a144c3dc41de7..017b410eda4df 100644
--- a/providers/tests/standard/sensors/test_time.py
+++ b/providers/tests/standard/sensors/test_time.py
@@ -26,7 +26,7 @@
from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
-from airflow.triggers.temporal import DateTimeTrigger
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
diff --git a/providers/tests/standard/triggers/__init__.py b/providers/tests/standard/triggers/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/providers/tests/standard/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/triggers/test_external_task.py b/providers/tests/standard/triggers/test_external_task.py
similarity index 93%
rename from tests/triggers/test_external_task.py
rename to providers/tests/standard/triggers/test_external_task.py
index 202eafb0b9e3d..792d7ef7262f6 100644
--- a/tests/triggers/test_external_task.py
+++ b/providers/tests/standard/triggers/test_external_task.py
@@ -24,8 +24,8 @@
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
+from airflow.providers.standard.triggers.external_task import DagStateTrigger, WorkflowTrigger
from airflow.triggers.base import TriggerEvent
-from airflow.triggers.external_task import DagStateTrigger, WorkflowTrigger
from airflow.utils import timezone
from airflow.utils.state import DagRunState
@@ -37,7 +37,7 @@ class TestWorkflowTrigger:
STATES = ["success", "fail"]
@pytest.mark.flaky(reruns=5)
- @mock.patch("airflow.triggers.external_task._get_count")
+ @mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_success(self, mock_get_count):
"""check the db count get called correctly."""
@@ -70,7 +70,7 @@ async def test_task_workflow_trigger_success(self, mock_get_count):
await gen.__anext__()
@pytest.mark.flaky(reruns=5)
- @mock.patch("airflow.triggers.external_task._get_count")
+ @mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_failed(self, mock_get_count):
mock_get_count.side_effect = mocked_get_count
@@ -102,7 +102,7 @@ async def test_task_workflow_trigger_failed(self, mock_get_count):
with pytest.raises(StopAsyncIteration):
await gen.__anext__()
- @mock.patch("airflow.triggers.external_task._get_count")
+ @mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
mock_get_count.return_value = 0
@@ -133,7 +133,7 @@ async def test_task_workflow_trigger_fail_count_eq_0(self, mock_get_count):
await gen.__anext__()
@pytest.mark.flaky(reruns=5)
- @mock.patch("airflow.triggers.external_task._get_count")
+ @mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@pytest.mark.asyncio
async def test_task_workflow_trigger_skipped(self, mock_get_count):
mock_get_count.side_effect = mocked_get_count
@@ -162,7 +162,7 @@ async def test_task_workflow_trigger_skipped(self, mock_get_count):
states=["success", "fail"],
)
- @mock.patch("airflow.triggers.external_task._get_count")
+ @mock.patch("airflow.providers.standard.triggers.external_task._get_count")
@mock.patch("asyncio.sleep")
@pytest.mark.asyncio
async def test_task_workflow_trigger_sleep_success(self, mock_sleep, mock_get_count):
@@ -203,7 +203,7 @@ def test_serialization(self):
poke_interval=5,
)
classpath, kwargs = trigger.serialize()
- assert classpath == "airflow.triggers.external_task.WorkflowTrigger"
+ assert classpath == "airflow.providers.standard.triggers.external_task.WorkflowTrigger"
assert kwargs == {
"external_dag_id": self.DAG_ID,
"execution_dates": [timezone.datetime(2022, 1, 1)],
@@ -271,7 +271,7 @@ def test_serialization(self):
poll_interval=5,
)
classpath, kwargs = trigger.serialize()
- assert classpath == "airflow.triggers.external_task.DagStateTrigger"
+ assert classpath == "airflow.providers.standard.triggers.external_task.DagStateTrigger"
assert kwargs == {
"dag_id": self.DAG_ID,
"states": self.STATES,
diff --git a/tests/triggers/test_file.py b/providers/tests/standard/triggers/test_file.py
similarity index 91%
rename from tests/triggers/test_file.py
rename to providers/tests/standard/triggers/test_file.py
index 6fb25dea3f00c..baf0dffa80d0a 100644
--- a/tests/triggers/test_file.py
+++ b/providers/tests/standard/triggers/test_file.py
@@ -20,7 +20,7 @@
import pytest
-from airflow.triggers.file import FileTrigger
+from airflow.providers.standard.triggers.file import FileTrigger
class TestFileTrigger:
@@ -30,7 +30,7 @@ def test_serialization(self):
"""Asserts that the trigger correctly serializes its arguments and classpath."""
trigger = FileTrigger(filepath=self.FILE_PATH, poll_interval=5)
classpath, kwargs = trigger.serialize()
- assert classpath == "airflow.triggers.file.FileTrigger"
+ assert classpath == "airflow.providers.standard.triggers.file.FileTrigger"
assert kwargs == {
"filepath": self.FILE_PATH,
"poke_interval": 5,
@@ -46,7 +46,7 @@ async def test_task_file_trigger(self, tmp_path):
trigger = FileTrigger(
filepath=str(p.resolve()),
- poll_interval=0.2,
+ poke_interval=0.2,
)
task = asyncio.create_task(trigger.run().__anext__())
diff --git a/tests/triggers/test_temporal.py b/providers/tests/standard/triggers/test_temporal.py
similarity index 92%
rename from tests/triggers/test_temporal.py
rename to providers/tests/standard/triggers/test_temporal.py
index 90f00a694e5b3..f94066699a993 100644
--- a/tests/triggers/test_temporal.py
+++ b/providers/tests/standard/triggers/test_temporal.py
@@ -23,8 +23,8 @@
import pendulum
import pytest
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.triggers.base import TriggerEvent
-from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
from airflow.utils.timezone import utcnow
@@ -56,7 +56,7 @@ def test_datetime_trigger_serialization():
moment = pendulum.instance(datetime.datetime(2020, 4, 1, 13, 0), pendulum.UTC)
trigger = DateTimeTrigger(moment)
classpath, kwargs = trigger.serialize()
- assert classpath == "airflow.triggers.temporal.DateTimeTrigger"
+ assert classpath == "airflow.providers.standard.triggers.temporal.DateTimeTrigger"
assert kwargs == {"moment": moment, "end_from_trigger": False}
@@ -68,7 +68,7 @@ def test_timedelta_trigger_serialization():
trigger = TimeDeltaTrigger(datetime.timedelta(seconds=10))
expected_moment = timezone.utcnow() + datetime.timedelta(seconds=10)
classpath, kwargs = trigger.serialize()
- assert classpath == "airflow.triggers.temporal.DateTimeTrigger"
+ assert classpath == "airflow.providers.standard.triggers.temporal.DateTimeTrigger"
# We need to allow for a little time difference to avoid this test being
# flaky if it runs over the boundary of a single second
assert -2 < (kwargs["moment"] - expected_moment).total_seconds() < 2
@@ -113,8 +113,8 @@ async def test_datetime_trigger_timing(tz, end_from_trigger):
assert result.payload == expected_payload
-@mock.patch("airflow.triggers.temporal.timezone.utcnow")
-@mock.patch("airflow.triggers.temporal.asyncio.sleep")
+@mock.patch("airflow.providers.standard.triggers.temporal.timezone.utcnow")
+@mock.patch("airflow.providers.standard.triggers.temporal.asyncio.sleep")
@pytest.mark.asyncio
async def test_datetime_trigger_mocked(mock_sleep, mock_utcnow):
"""
diff --git a/tests/utils/test_email.py b/providers/tests/standard/utils/test_email.py
similarity index 95%
rename from tests/utils/test_email.py
rename to providers/tests/standard/utils/test_email.py
index 625f0f4b1a5da..4a0013f58362c 100644
--- a/tests/utils/test_email.py
+++ b/providers/tests/standard/utils/test_email.py
@@ -28,8 +28,8 @@
import pytest
from airflow.configuration import conf
-from airflow.utils import email
+from tests_common.test_utils.compat import email
from tests_common.test_utils.config import conf_vars
EMAILS = ["test1@example.com", "test2@example.com"]
@@ -81,17 +81,17 @@ def test_get_email_address_invalid_type_in_iterable(self):
with pytest.raises(TypeError):
email.get_email_address_list(emails_list)
- @mock.patch("airflow.utils.email.send_email")
+ @mock.patch("airflow.providers.standard.utils.email.send_email")
def test_default_backend(self, mock_send_email):
res = email.send_email("to", "subject", "content")
mock_send_email.assert_called_once_with("to", "subject", "content")
assert mock_send_email.return_value == res
- @mock.patch("airflow.utils.email.send_email_smtp")
+ @mock.patch("airflow.providers.standard.utils.email.send_email_smtp")
def test_custom_backend(self, mock_send_email):
with conf_vars(
{
- ("email", "email_backend"): "tests.utils.test_email.send_email_test",
+ ("email", "email_backend"): "providers.tests.standard.utils.test_email.send_email_test",
("email", "email_conn_id"): "smtp_default",
}
):
@@ -112,10 +112,10 @@ def test_custom_backend(self, mock_send_email):
)
assert not mock_send_email.called
- @mock.patch("airflow.utils.email.send_email_smtp")
+ @mock.patch("airflow.providers.standard.utils.email.send_email_smtp")
@conf_vars(
{
- ("email", "email_backend"): "tests.utils.test_email.send_email_test",
+ ("email", "email_backend"): "providers.tests.standard.utils.test_email.send_email_test",
("email", "email_conn_id"): "smtp_default",
("email", "from_email"): "from@test.com",
}
@@ -158,7 +158,7 @@ def setup_test_cases(self, monkeypatch):
json.dumps({"conn_type": "smtp", "login": "user", "password": "p@$$word"}),
)
- @mock.patch("airflow.utils.email.send_mime_email")
+ @mock.patch("airflow.providers.standard.utils.email.send_mime_email")
def test_send_smtp(self, mock_send_mime, tmp_path):
path = tmp_path / "testfile"
path.write_text("attachment")
@@ -176,7 +176,7 @@ def test_send_smtp(self, mock_send_mime, tmp_path):
mimeapp = MIMEApplication("attachment")
assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload()
- @mock.patch("airflow.utils.email.send_mime_email")
+ @mock.patch("airflow.providers.standard.utils.email.send_mime_email")
def test_send_smtp_with_multibyte_content(self, mock_send_mime):
email.send_email_smtp("to", "subject", "🔥", mime_charset="utf-8")
assert mock_send_mime.called
@@ -185,7 +185,7 @@ def test_send_smtp_with_multibyte_content(self, mock_send_mime):
mimetext = MIMEText("🔥", "mixed", "utf-8")
assert mimetext.get_payload() == msg.get_payload()[0].get_payload()
- @mock.patch("airflow.utils.email.send_mime_email")
+ @mock.patch("airflow.providers.standard.utils.email.send_mime_email")
def test_send_bcc_smtp(self, mock_send_mime, tmp_path):
path = tmp_path / "testfile"
path.write_text("attachment")
diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py
index dae7733ec5c15..404f5ab6b890e 100644
--- a/scripts/cov/other_coverage.py
+++ b/scripts/cov/other_coverage.py
@@ -58,8 +58,8 @@
"airflow/dag_processing/manager.py",
"airflow/dag_processing/processor.py",
"airflow/triggers/base.py",
- "airflow/triggers/external_task.py",
- "airflow/triggers/file.py",
+ "providers/src/airflow/providers/standard/triggers/external_task.py",
+ "providers/src/airflow/providers/standard/triggers/file.py",
"airflow/triggers/testing.py",
]
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 7acc3a85b9345..c210577fae276 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -40,8 +40,8 @@
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import _run_inline_trigger
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.triggers.base import TriggerEvent
-from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index 1de3f7b68d394..9e3ead771775a 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -397,10 +397,10 @@ def test_cli_test_with_env_vars(self):
assert "foo=bar" in output
assert "AIRFLOW_TEST_MODE=True" in output
- @mock.patch("airflow.triggers.file.os.path.getmtime", return_value=0)
- @mock.patch("airflow.triggers.file.glob", return_value=["/tmp/test"])
- @mock.patch("airflow.triggers.file.os.path.isfile", return_value=True)
- @mock.patch("airflow.sensors.filesystem.FileSensor.poke", return_value=False)
+ @mock.patch("airflow.providers.standard.triggers.file.os.path.getmtime", return_value=0)
+ @mock.patch("airflow.providers.standard.triggers.file.glob", return_value=["/tmp/test"])
+ @mock.patch("airflow.providers.standard.triggers.file.os.path.isfile", return_value=True)
+ @mock.patch("airflow.providers.standard.sensors.filesystem.FileSensor.poke", return_value=False)
def test_cli_test_with_deferrable_operator(
self, mock_poke, mock_is_file, mock_glob, mock_getmtime, caplog
):
diff --git a/tests/dags/test_external_task_sensor_check_existense.py b/tests/dags/test_external_task_sensor_check_existense.py
index 9de992c073b91..656f3760a248c 100644
--- a/tests/dags/test_external_task_sensor_check_existense.py
+++ b/tests/dags/test_external_task_sensor_check_existense.py
@@ -19,7 +19,7 @@
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from tests.models import DEFAULT_DATE
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 939c210dfe75f..6d3d26dcb8dd7 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -37,8 +37,8 @@
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.triggers.base import TriggerEvent
-from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.triggers.testing import FailureTrigger, SuccessTrigger
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import RedirectStdHandler
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 0111cc669cd02..ddcffe8afe24f 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1505,7 +1505,7 @@ def test_deps_sorted(self):
Tests serialize_operator, make sure the deps is in order
"""
from airflow.operators.empty import EmptyOperator
- from airflow.sensors.external_task import ExternalTaskSensor
+ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
execution_date = datetime(2020, 1, 1)
with DAG(dag_id="test_deps_sorted", schedule=None, start_date=execution_date) as dag:
@@ -1620,7 +1620,7 @@ def test_derived_dag_deps_sensor(self):
Tests DAG dependency detection for sensors, including derived classes
"""
from airflow.operators.empty import EmptyOperator
- from airflow.sensors.external_task import ExternalTaskSensor
+ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
class DerivedSensor(ExternalTaskSensor):
pass
@@ -1651,7 +1651,7 @@ def test_dag_deps_assets_with_duplicate_asset(self):
"""
Check that dag_dependencies node is populated correctly for a DAG with duplicate assets.
"""
- from airflow.sensors.external_task import ExternalTaskSensor
+ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
d1 = Asset("d1")
d2 = Asset("d2")
@@ -1740,7 +1740,7 @@ def test_dag_deps_assets(self):
"""
Check that dag_dependencies node is populated correctly for a DAG with assets.
"""
- from airflow.sensors.external_task import ExternalTaskSensor
+ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
d1 = Asset("d1")
d2 = Asset("d2")
@@ -1805,7 +1805,7 @@ def test_derived_dag_deps_operator(self):
Tests DAG dependency detection for operators, including derived classes
"""
from airflow.operators.empty import EmptyOperator
- from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
class DerivedOperator(TriggerDagRunOperator):
pass
@@ -2198,7 +2198,7 @@ def test_start_trigger_args_in_serialized_dag(self):
class TestOperator(BaseOperator):
start_trigger_args = StartTriggerArgs(
- trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+ trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"delta": timedelta(seconds=1)},
next_method="execute_complete",
next_kwargs=None,
@@ -2241,7 +2241,7 @@ def execute_complete(self):
assert tasks[0]["__var"]["start_trigger_args"] == {
"__type": "START_TRIGGER_ARGS",
- "trigger_cls": "airflow.triggers.temporal.TimeDeltaTrigger",
+ "trigger_cls": "airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
# "trigger_kwargs": {"__type": "dict", "__var": {"delta": {"__type": "timedelta", "__var": 2.0}}},
"trigger_kwargs": {"__type": "dict", "__var": {"delta": {"__type": "timedelta", "__var": 2.0}}},
"next_method": "execute_complete",
diff --git a/tests/system/core/example_external_task_parent_deferrable.py b/tests/system/core/example_external_task_parent_deferrable.py
index 62bb1afc18b2e..ff003eee12a48 100644
--- a/tests/system/core/example_external_task_parent_deferrable.py
+++ b/tests/system/core/example_external_task_parent_deferrable.py
@@ -18,8 +18,8 @@
from airflow import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
-from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.utils.timezone import datetime
with DAG(
diff --git a/tests_common/test_utils/compat.py b/tests_common/test_utils/compat.py
index 6f788c30dcca2..09be782de8f5f 100644
--- a/tests_common/test_utils/compat.py
+++ b/tests_common/test_utils/compat.py
@@ -56,6 +56,7 @@
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.providers.standard.sensors.date_time import DateTimeSensor
+ from airflow.providers.standard.utils import email
from airflow.providers.standard.utils.python_virtualenv import write_python_script
except ImportError:
# Compatibility for Airflow < 2.10.*
@@ -64,6 +65,7 @@
from airflow.operators.python import PythonOperator # type: ignore[no-redef,attr-defined]
from airflow.sensors.bash import BashSensor # type: ignore[no-redef,attr-defined]
from airflow.sensors.date_time import DateTimeSensor # type: ignore[no-redef,attr-defined]
+ from airflow.utils import email # type: ignore[no-redef,attr-defined]
from airflow.utils.python_virtualenv import write_python_script # type: ignore[no-redef,attr-defined]
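The compat shim extended in this last hunk follows the usual try/except-ImportError fallback, so the same tests run against both the new provider layout and Airflow < 2.10. A condensed sketch of the pattern as it applies to the email module (taken from the hunk above; the surrounding imports are elided):

    # Prefer the new provider location; fall back to the pre-2.10 core path.
    try:
        from airflow.providers.standard.utils import email
    except ImportError:
        # Compatibility for Airflow < 2.10.*
        from airflow.utils import email  # type: ignore[no-redef]

    # Test modules then depend on the shim rather than either concrete path:
    # from tests_common.test_utils.compat import email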