5 changes: 4 additions & 1 deletion .pre-commit-config.yaml
@@ -724,7 +724,10 @@ repos:
files: >
(?x)
^providers/src/airflow/providers/.*\.py$
- exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py
+ exclude: |
+   (?x)
+   ^.*/.*_vendor/|
+   ^providers/src/airflow/providers/standard/.*$
- id: check-get-lineage-collector-providers
language: python
name: Check providers import hook lineage code from compat
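The new `exclude` uses pre-commit's verbose-regex form (`(?x)`): whitespace inside the pattern is ignored, so each alternative can sit on its own line, and the whole `standard` provider tree is now skipped instead of just the bash and python operator files. A quick sketch of how the pattern behaves (the test paths are hypothetical):

```python
import re

# The same pattern as the new exclude; (?x) turns on verbose mode,
# so whitespace is ignored and "#" starts an inline comment.
pattern = re.compile(
    r"""(?x)
    ^.*/.*_vendor/                                  # anything under a _vendor directory
    |                                               # or
    ^providers/src/airflow/providers/standard/.*$   # the whole standard provider tree
    """
)

assert pattern.match("providers/src/airflow/providers/standard/operators/bash.py")
assert not pattern.match("providers/src/airflow/providers/http/hooks/http.py")
```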
4 changes: 2 additions & 2 deletions airflow/config_templates/config.yml
@@ -2060,7 +2060,7 @@ email:
version_added: ~
type: string
example: ~
default: "airflow.utils.email.send_email_smtp"
default: "airflow.providers.standard.utils.email.send_email_smtp"
email_conn_id:
description: Email connection to use
version_added: 2.1.0
@@ -2126,7 +2126,7 @@ email:
smtp:
description: |
If you want airflow to send emails on retries, failure, and you want to use
- the airflow.utils.email.send_email_smtp function, you have to configure an
+ the airflow.providers.standard.utils.email.send_email_smtp function, you have to configure an
smtp server here
options:
smtp_host:
2 changes: 1 addition & 1 deletion airflow/example_dags/example_dag_decorator.py
@@ -24,7 +24,7 @@

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
- from airflow.operators.email import EmailOperator
+ from airflow.providers.standard.operators.email import EmailOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
2 changes: 1 addition & 1 deletion airflow/example_dags/example_external_task_marker_dag.py
@@ -44,7 +44,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
- from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
+ from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

start_date = pendulum.datetime(2021, 1, 1, tz="UTC")

2 changes: 1 addition & 1 deletion airflow/example_dags/example_latest_only.py
@@ -23,7 +23,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
- from airflow.operators.latest_only import LatestOnlyOperator
+ from airflow.providers.standard.operators.latest_only import LatestOnlyOperator

with DAG(
dag_id="latest_only",
2 changes: 1 addition & 1 deletion airflow/example_dags/example_latest_only_with_trigger.py
@@ -28,7 +28,7 @@

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
- from airflow.operators.latest_only import LatestOnlyOperator
+ from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
2 changes: 1 addition & 1 deletion airflow/example_dags/example_sensors.py
@@ -24,11 +24,11 @@
from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
+ from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
- from airflow.sensors.filesystem import FileSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weekday import WeekDay

2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_controller_dag.py
@@ -26,7 +26,7 @@
import pendulum

from airflow.models.dag import DAG
- from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
dag_id="example_trigger_controller_dag",
2 changes: 1 addition & 1 deletion airflow/models/dag.py
@@ -1115,7 +1115,7 @@ def _get_task_instances(

if include_dependent_dags:
# Recursively find external tasks indicated by ExternalTaskMarker
- from airflow.sensors.external_task import ExternalTaskMarker
+ from airflow.providers.standard.sensors.external_task import ExternalTaskMarker

query = tis
if as_pk_tuple:
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
@@ -101,6 +101,7 @@
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import LazyXComSelectSequence, XCom
from airflow.plugins_manager import integrate_macros_plugins
+ from airflow.providers.standard.utils.email import send_email
from airflow.sentry import Sentry
from airflow.settings import task_instance_mutation_hook
from airflow.stats import Stats
@@ -118,7 +119,6 @@
context_get_outlet_events,
context_merge,
)
- from airflow.utils.email import send_email
from airflow.utils.helpers import prune_dict, render_template_to_string
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
10 changes: 5 additions & 5 deletions airflow/serialization/serialized_objects.py
@@ -118,10 +118,10 @@
log = logging.getLogger(__name__)

_OPERATOR_EXTRA_LINKS: set[str] = {
"airflow.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.sensors.external_task.ExternalDagLink",
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.providers.standard.sensors.external_task.ExternalDagLink",
# Deprecated names, so that existing serialized dags load straight away.
"airflow.sensors.external_task.ExternalTaskSensorLink",
"airflow.providers.standard.sensors.external_task.ExternalTaskSensorLink",
"airflow.operators.dagrun_operator.TriggerDagRunLink",
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
}
@@ -1015,8 +1015,8 @@ class DependencyDetector:
@staticmethod
def detect_task_dependencies(task: Operator) -> list[DagDependency]:
"""Detect dependencies caused by tasks."""
- from airflow.operators.trigger_dagrun import TriggerDagRunOperator
- from airflow.sensors.external_task import ExternalTaskSensor
+ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

deps = []
if isinstance(task, TriggerDagRunOperator):
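For context on the hunk above: `detect_task_dependencies` works by `isinstance`-checking each task against these relocated classes and emitting `DagDependency` records, roughly along these lines (a simplified sketch of the surrounding logic, not a verbatim excerpt):

```python
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.serialization.serialized_objects import DagDependency


def detect(task):
    """Sketch: map a task to the cross-DAG dependencies it implies."""
    deps = []
    if isinstance(task, TriggerDagRunOperator):
        # This DAG triggers another DAG.
        deps.append(
            DagDependency(
                source=task.dag_id,
                target=task.trigger_dag_id,
                dependency_type="trigger",
                dependency_id=task.task_id,
            )
        )
    elif isinstance(task, ExternalTaskSensor):
        # This DAG waits on a task in another DAG.
        deps.append(
            DagDependency(
                source=task.external_dag_id,
                target=task.dag_id,
                dependency_type="sensor",
                dependency_id=task.task_id,
            )
        )
    return deps
```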
12 changes: 6 additions & 6 deletions docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -68,7 +68,7 @@ When writing a deferrable operators these are the main points to consider:

from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
- from airflow.triggers.temporal import TimeDeltaTrigger
+ from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context


@@ -122,7 +122,7 @@ This example shows the structure of a basic trigger, a very simplified version o
self.moment = moment

def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
return ("airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment})

async def run(self):
while self.moment > timezone.utcnow():
@@ -177,7 +177,7 @@ Here's a basic example of how a sensor might trigger deferral:
from typing import TYPE_CHECKING, Any

from airflow.sensors.base import BaseSensorOperator
- from airflow.triggers.temporal import TimeDeltaTrigger
+ from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -237,7 +237,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``

class WaitOneHourSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
@@ -268,7 +268,7 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``

class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
@@ -307,7 +307,7 @@ After the trigger has finished executing, the task may be sent back to the worke

class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
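Putting the relocated path together, a minimal deferrable sensor that hands off to the triggerer with the provider-hosted `TimeDeltaTrigger` could look like this (a sketch following the patterns shown in this guide):

```python
from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    def execute(self, context: Context) -> None:
        # Free the worker slot; the triggerer resumes us in an hour.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(hours=1)),
            method_name="execute_complete",
        )

    def execute_complete(self, context: Context, event: Any = None) -> None:
        # The trigger fired; the sensor is done.
        return
```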
4 changes: 2 additions & 2 deletions docs/apache-airflow/core-concepts/dags.rst
@@ -359,7 +359,7 @@ The ``@task.branch`` can also be used with XComs allowing branching context to d

start_op >> branch_op >> [continue_op, stop_op]

- If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.
+ If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.providers.standard.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.

.. note::
The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.providers.standard.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator.
@@ -765,7 +765,7 @@ While dependencies between tasks in a DAG are explicitly defined through upstrea
relationships, dependencies between DAGs are a bit more complex. In general, there are two ways
in which one DAG can depend on another:

- - triggering - :class:`~airflow.operators.trigger_dagrun.TriggerDagRunOperator`
+ - triggering - :class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator`
- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`

Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
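As a sketch of the `BaseBranchOperator` subclassing mentioned above (the task ids and the branching rule are illustrative):

```python
from airflow.providers.standard.operators.branch import BaseBranchOperator


class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        # Return the task_id (or list of task_ids) to follow downstream.
        if context["data_interval_start"].day == 1:
            return ["daily_task", "monthly_task"]
        return "daily_task"
```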
2 changes: 1 addition & 1 deletion docs/apache-airflow/core-concepts/operators.rst
@@ -30,7 +30,7 @@ Airflow has a very extensive set of operators available, with some built-in to t

- :class:`~airflow.providers.standard.operators.bash.BashOperator` - executes a bash command
- :class:`~airflow.providers.standard.operators.python.PythonOperator` - calls an arbitrary Python function
- - :class:`~airflow.operators.email.EmailOperator` - sends an email
+ - :class:`~airflow.providers.standard.operators.email.EmailOperator` - sends an email
- Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering jinja templates passed as arguments.

.. note::
2 changes: 1 addition & 1 deletion docs/apache-airflow/core-concepts/taskflow.rst
@@ -25,7 +25,7 @@ If you write most of your DAGs using plain Python code rather than Operators, th
TaskFlow takes care of moving inputs and outputs between your Tasks using XComs for you, as well as automatically calculating dependencies - when you call a TaskFlow function in your DAG file, rather than executing it, you will get an object representing the XCom for the result (an ``XComArg``), that you can then use as inputs to downstream tasks or operators. For example::

from airflow.decorators import task
- from airflow.operators.email import EmailOperator
+ from airflow.providers.standard.operators.email import EmailOperator

@task
def get_ip():
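The hunk cuts the example short; in the TaskFlow docs it continues roughly as below, with the `XComArg` results of decorated tasks fed straight into a traditional operator (the IP value and addresses are placeholders):

```python
from airflow.decorators import task
from airflow.providers.standard.operators.email import EmailOperator


@task
def get_ip():
    return "203.0.113.10"  # placeholder for a real IP lookup


@task(multiple_outputs=True)
def compose_email(external_ip):
    return {
        "subject": f"Server connected from {external_ip}",
        "body": f"Your server executing Airflow is connected from the external IP {external_ip}<br>",
    }


email_info = compose_email(get_ip())

EmailOperator(
    task_id="send_email",
    to="example@example.com",
    subject=email_info["subject"],
    html_content=email_info["body"],
)
```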
6 changes: 3 additions & 3 deletions docs/apache-airflow/howto/email-config.rst
@@ -25,15 +25,15 @@ in the ``[email]`` section.
.. code-block:: ini

[email]
- email_backend = airflow.utils.email.send_email_smtp
+ email_backend = airflow.providers.standard.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file

Equivalent environment variables look like:

.. code-block:: sh

- AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp
+ AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.standard.utils.email.send_email_smtp
AIRFLOW__EMAIL__SUBJECT_TEMPLATE=/path/to/my_subject_template_file
AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE=/path/to/my_html_content_template_file

@@ -63,7 +63,7 @@ the example below.
.. code-block:: bash

$ airflow config get-value email email_backend
- airflow.utils.email.send_email_smtp
+ airflow.providers.standard.utils.email.send_email_smtp

To access the task's information you use `Jinja Templating <http://jinja.pocoo.org/docs/dev/>`_ in your template files.

4 changes: 2 additions & 2 deletions docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -41,7 +41,7 @@ DAGs.
ExternalTaskSensor
^^^^^^^^^^^^^^^^^^

- Use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
+ Use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another task on a different DAG for a specific ``execution_date``.

ExternalTaskSensor also provide options to set if the Task on a remote DAG succeeded or failed
@@ -64,7 +64,7 @@ Also for this action you can use sensor in the deferrable mode:

ExternalTaskSensor with task_group dependency
---------------------------------------------
- In Addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
+ In Addition, we can also use the :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
wait for another ``task_group`` on a different DAG for a specific ``execution_date``.

.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
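A minimal usage sketch with the new import path (the DAG and task ids are hypothetical):

```python
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor

wait_for_parent = ExternalTaskSensor(
    task_id="wait_for_parent",
    external_dag_id="parent_dag",    # DAG to watch
    external_task_id="parent_task",  # task within that DAG
)
```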
2 changes: 1 addition & 1 deletion docs/apache-airflow/howto/operator/file.rst
@@ -22,7 +22,7 @@
FileSensor
==========

- Use the :class:`~airflow.sensors.filesystem.FileSensor` to detect files appearing in your local
+ Use the :class:`~airflow.providers.standard.sensors.filesystem.FileSensor` to detect files appearing in your local
filesystem. You need to have connection defined to use it (pass connection id via ``fs_conn_id``).
Default connection is ``fs_default``.

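For instance (the connection id and path are illustrative; `fs_default` is the documented default connection):

```python
from airflow.providers.standard.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    fs_conn_id="fs_default",              # connection supplying the base path
    filepath="data/incoming/report.csv",  # file to wait for
    poke_interval=60,                     # re-check every minute
)
```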
12 changes: 6 additions & 6 deletions docs/apache-airflow/operators-and-hooks-ref.rst
@@ -53,25 +53,25 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.providers.standard.operators.bash`
- :doc:`How to use <howto/operator/bash>`

- * - :mod:`airflow.operators.branch`
+ * - :mod:`airflow.providers.standard.operators.branch`
-

* - :mod:`airflow.operators.empty`
-

- * - :mod:`airflow.operators.email`
+ * - :mod:`airflow.providers.standard.operators.email`
-

* - :mod:`airflow.operators.generic_transfer`
-

- * - :mod:`airflow.operators.latest_only`
+ * - :mod:`airflow.providers.standard.operators.latest_only`
-

* - :mod:`airflow.providers.standard.operators.python`
- :doc:`How to use <howto/operator/python>`

- * - :mod:`airflow.operators.trigger_dagrun`
+ * - :mod:`airflow.providers.standard.operators.trigger_dagrun`
-

**Sensors:**
@@ -85,10 +85,10 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.providers.standard.sensors.bash`
- :ref:`How to use <howto/operator:BashSensor>`

- * - :mod:`airflow.sensors.external_task`
+ * - :mod:`airflow.providers.standard.sensors.external_task`
- :doc:`How to use <howto/operator/external_task_sensor>`

- * - :mod:`airflow.sensors.filesystem`
+ * - :mod:`airflow.providers.standard.sensors.filesystem`
- :ref:`How to use <howto/operator:FileSensor>`

* - :mod:`airflow.providers.standard.sensors.python`
2 changes: 1 addition & 1 deletion docs/apache-airflow/tutorial/taskflow.rst
@@ -438,7 +438,7 @@ Adding dependencies between decorated and traditional tasks
-----------------------------------------------------------
The above tutorial shows how to create dependencies between TaskFlow functions. However, dependencies can also
be set between traditional tasks (such as :class:`~airflow.providers.standard.operators.bash.BashOperator`
- or :class:`~airflow.sensors.filesystem.FileSensor`) and TaskFlow functions.
+ or :class:`~airflow.providers.standard.sensors.filesystem.FileSensor`) and TaskFlow functions.

Building this dependency is shown in the code below:

5 changes: 4 additions & 1 deletion generated/provider_dependencies.json
@@ -846,6 +846,7 @@
"plugins": [],
"cross-providers-deps": [
"amazon",
"common.compat",
Contributor: why do we need this change here?

Member (Author): We use the compat provider so imports keep working against the old core locations; in this case msgraph uses the timedelta trigger, see https://github.com/apache/airflow/pull/43571/files/a750af6fec2a04ba5a347303fb4397a4e496d56e#diff-af3b61e0f2e76a9841ce87162351100deee4bc736a90d4cb8715acd9fb61473cR23

pre-commit generated this entry with common.compat as a cross-provider dependency.
"google",
"oracle",
"sftp"
@@ -1180,7 +1181,9 @@
],
"devel-deps": [],
"plugins": [],
"cross-providers-deps": [],
"cross-providers-deps": [
"common.compat"
],
"excluded-python-versions": [],
"state": "ready"
},
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/amazon/aws/hooks/ses.py
@@ -21,7 +21,7 @@
from typing import Any, Iterable

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
- from airflow.utils.email import build_mime_message
+ from airflow.providers.common.compat.standard.utils import build_mime_message


class SesHook(AwsBaseHook):
31 changes: 31 additions & 0 deletions providers/src/airflow/providers/common/compat/standard/triggers.py
@@ -0,0 +1,31 @@
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements. See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership. The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License. You may obtain a copy of the License at
+ #
+ #   http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing,
+ # software distributed under the License is distributed on an
+ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ # KIND, either express or implied. See the License for the
+ # specific language governing permissions and limitations
+ # under the License.
+
+ from __future__ import annotations
+
+ from typing import TYPE_CHECKING
+
+ if TYPE_CHECKING:
+     from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
+ else:
+     try:
+         from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
+     except ModuleNotFoundError:
+         from airflow.triggers.temporal import TimeDeltaTrigger
+
+
+ __all__ = ["TimeDeltaTrigger"]