@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Apr 2, 2025

related: #45436

Why?

Now that the operators and hooks have moved to the standard provider, the decorators should ideally move there too, or to wherever they truly belong. This gets them out of core and allows a more flexible release and bug-fix cadence.

What has been moved so far

  1. bash
  2. branch_external_python
  3. branch_python
  4. branch_virtualenv
  5. external_python
  6. python
  7. python_virtualenv

What I am not sure needs moving to standard:

  1. condition
  2. sensor
  3. setup_teardown
  4. short_circuit
  5. task_group

Points to note

  1. I still import python_task inside airflow-core/src/airflow/decorators/__init__.py so that it can remain the default decorator; do comment if you know of a better way :)
  2. The __init__.pyi file has also been updated
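
To illustrate the first point, here is a minimal, self-contained sketch of how a decorator collection can keep one decorator as its default while resolving the others on demand. It is not the code from this PR: `python_task` below is a stand-in function, and `PROVIDER_DECORATORS` and the `decorator_name` attribute are hypothetical names invented for the example.

```python
from typing import Any, Callable


# Hypothetical stand-in for the decorator that now lives in the standard provider.
def python_task(func: Callable[..., Any]) -> Callable[..., Any]:
    func.decorator_name = "python"  # tag the function so the effect is observable
    return func


# Hypothetical registry of decorators contributed by installed providers.
PROVIDER_DECORATORS: dict[str, Callable[..., Any]] = {}


class TaskDecoratorCollection:
    """Simplified sketch of the object exposed as ``task``."""

    # Bound eagerly so that ``@task.python`` and bare ``@task`` always resolve.
    python = staticmethod(python_task)

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        # Bare ``@task`` falls back to the default (python) decorator.
        return self.python(func)

    def __getattr__(self, name: str) -> Callable[..., Any]:
        # Only called when normal lookup fails: resolve provider decorators lazily.
        try:
            return PROVIDER_DECORATORS[name]
        except KeyError:
            raise AttributeError(f"no @task.{name} decorator is registered") from None


task = TaskDecoratorCollection()


@task
def hello() -> str:
    return "Hello!"
```

In this sketch only the default is bound eagerly; every other name goes through the registry, so a decorator can be added or replaced without touching the class.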

Testing:

Created a large DAG that uses all of these decorators:

import sys
import tempfile

from airflow.decorators import dag, task
from airflow.providers.standard.operators.empty import EmptyOperator


@dag()
def all_decorators():
    # python
    @task
    def python():
        return "Hello!"

    python()

    # bash
    @task.bash
    def bash() -> str:
        return "echo https://airflow.apache.org/"

    bash()

    # external-python
    @task.external_python(python=sys.executable)
    def external_python():
        print("doing something in external Python")

    external_python()

    empty_task_1 = EmptyOperator(task_id="empty_task_1")
    empty_task_2 = EmptyOperator(task_id="empty_task_2")

    # branch
    @task.branch()
    def branch_task(**kwargs) -> str:
        """
        Pick the branch to follow; this test always selects empty_task_2.

        :param dict kwargs: Context
        :return: Id of the task to run
        """
        return "empty_task_2"

    cond = branch_task()
    cond >> [empty_task_1, empty_task_2]

    venv_a = EmptyOperator(task_id="venv_a")
    venv_b = EmptyOperator(task_id="venv_b")
    venv_c = EmptyOperator(task_id="venv_c")
    venv_d = EmptyOperator(task_id="venv_d")

    # branch virtual env
    @task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=tempfile.gettempdir())
    def branching_virtualenv(choices) -> str:
        import random

        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")
        return f"venv_{random.choice(choices)}"

    options = ["a", "b", "c", "d"]
    branching_virtualenv(choices=options)

    # external_python
    @task.external_python(python=sys.executable)
    def external_python():
        print("doing something in external Python")

    external_python()

    @task.virtualenv(
        requirements=["numpy~=1.26.0"], venv_cache_path=tempfile.gettempdir()
    )
    def virtualenv():
        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")

    virtualenv()

all_decorators()

Works as expected: [screenshot]



Member

@kaxil kaxil left a comment

We should move the tests too: airflow-core/tests/unit/decorators

@amoghrajesh
Contributor Author

Yeah, I am looking into it.

@amoghrajesh amoghrajesh changed the title from "House most core decorators inside standard providers" to "Migrate standard decorators to standard provider" Apr 3, 2025
@amoghrajesh
Contributor Author

Added more tasks to the DAG to test short_circuit and sensor too.

DAG:

import sys
import tempfile

from airflow.decorators import dag, task
from airflow.providers.standard.operators.empty import EmptyOperator


@dag()
def all_decorators():
    # python
    @task
    def python():
        return "Hello!"

    python()

    # bash
    @task.bash
    def bash() -> str:
        return "echo https://airflow.apache.org/"

    bash()

    # external-python
    @task.external_python(python=sys.executable)
    def external_python():
        print("doing something in external Python")

    external_python()

    empty_task_1 = EmptyOperator(task_id="empty_task_1")
    empty_task_2 = EmptyOperator(task_id="empty_task_2")

    # branch
    @task.branch()
    def branch_task(**kwargs) -> str:
        """
        Pick the branch to follow; this test always selects empty_task_2.

        :param dict kwargs: Context
        :return: Id of the task to run
        """
        return "empty_task_2"

    cond = branch_task()
    cond >> [empty_task_1, empty_task_2]

    venv_a = EmptyOperator(task_id="venv_a")
    venv_b = EmptyOperator(task_id="venv_b")
    venv_c = EmptyOperator(task_id="venv_c")
    venv_d = EmptyOperator(task_id="venv_d")

    # branch virtual env
    @task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=tempfile.gettempdir())
    def branching_virtualenv(choices) -> str:
        import random

        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")
        return f"venv_{random.choice(choices)}"

    options = ["a", "b", "c", "d"]
    branching_virtualenv(choices=options)

    # external_python
    @task.external_python(python=sys.executable)
    def external_python():
        print("doing something in external Python")

    external_python()

    @task.short_circuit()
    def short_circuit():
        return True

    @task.virtualenv(
        requirements=["numpy~=1.26.0"], venv_cache_path=tempfile.gettempdir()
    )
    def virtualenv():
        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")

    short_circuit() >> virtualenv()

all_decorators()

[image: all_decorators-graph]

@amoghrajesh amoghrajesh force-pushed the move-decorators-to-standard-provider branch from 564d810 to 42542b5 Compare April 3, 2025 14:14
@amoghrajesh
Contributor Author

Just squashed it all together. Hopefully the compat and mypy checks are fixed now! 🤞🏽

Comment on lines +46 to +49
@pytest.mark.skipif(
    not AIRFLOW_V_3_0_PLUS,
    reason="Decorators were part of core not providers, so this test doesnt make sense for < AF3.",
)
Member

Just want to highlight this for others.

The reason we did this is that on Airflow 2.x, task.python and the decorators for all the other previously "core" operators were hard-coded in https://github.com/apache/airflow/blob/2.10.5/airflow/decorators/__init__.py#L61-L71

This means users on Airflow 2 cannot use the standard provider's versions of these decorators, so we should skip these tests in the compat tests: the decorators will never come from the standard provider, even if it is installed.
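
For readers unfamiliar with this kind of gate, here is a rough sketch of how a version flag can drive a skip. It uses `unittest.skipIf` from the standard library in place of `pytest.mark.skipif` so that it runs without extra dependencies. The helper `is_airflow_3_plus`, the `INSTALLED_VERSION` constant, and the test body are assumptions made for illustration; the real `AIRFLOW_V_3_0_PLUS` flag is defined elsewhere in the Airflow test utilities and may be computed differently.

```python
import unittest


def is_airflow_3_plus(version: str) -> bool:
    """Return True when the major component of ``version`` is 3 or higher."""
    return int(version.split(".")[0]) >= 3


# Hypothetical installed version; a real suite would read it from the installed package.
INSTALLED_VERSION = "2.10.5"
AIRFLOW_V_3_0_PLUS = is_airflow_3_plus(INSTALLED_VERSION)


@unittest.skipIf(
    not AIRFLOW_V_3_0_PLUS,
    "Decorators come from core on Airflow 2, so the provider copy is never used.",
)
def test_provider_decorator() -> str:
    # Placeholder body: on Airflow 2 the wrapper raises SkipTest before this runs.
    return "ran"
```

With the assumed version string the flag is false, so calling the test raises `unittest.SkipTest` instead of running the body; with a 3.x version string the function would be left untouched.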

Contributor Author

Resolving this comment, but yes, that's right.

@amoghrajesh amoghrajesh force-pushed the move-decorators-to-standard-provider branch from be10686 to be7502e Compare April 4, 2025 16:00
@ashb ashb merged commit f01e5e2 into apache:main Apr 4, 2025
62 checks passed
@ashb ashb deleted the move-decorators-to-standard-provider branch April 4, 2025 18:49
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
diogotrodrigues pushed a commit to diogotrodrigues/airflow that referenced this pull request Apr 6, 2025
simonprydden pushed a commit to simonprydden/airflow that referenced this pull request Apr 8, 2025