Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

closes: #44361

Why?

DAG Params are needed to provide runtime information to tasks: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html. They are generally useful when you want to provide a per DAG Run level configuration for a dag run, lets say some dynamic information.

These work with legacy Airflow 2 and should be ported over to task sdk.

  • The dag params classes from models have been ported over to the task sdk under definitions along with changing tests, references, documentation (atleast most of it), and general refs.
  • The existing tests continue to run

Testing

Basic Testing

DAG used for testing:

from airflow import DAG
from airflow.decorators import task
from airflow.sdk import Param

with DAG(
    "param_dag",
    params={
        "x": Param(5, type="integer", minimum=3),
        "my_int_param": 6
    },
) as dag:

    @task
    def example_task():
        # This will print the default value, 6:
        print(dag.params['my_int_param'])


    example_task()

image

Params Trigger UI from example dags

from __future__ import annotations

import datetime
from pathlib import Path

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.utils.trigger_rule import TriggerRule

# [START params_trigger]
with DAG(
    dag_id=Path(__file__).stem,
    dag_display_name="Params Trigger UI",
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example", "params"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", task_display_name="Get names")
    def get_names(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        if "names" not in params:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return params["names"]

    @task.branch(task_id="select_languages", task_display_name="Select languages")
    def select_languages(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        selected_languages = []
        for lang in ["english", "german", "french"]:
            if params[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
    def generate_english_greeting(name: str) -> str:
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
    def generate_german_greeting(name: str) -> str:
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
    def generate_french_greeting(name: str) -> str:
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)
# [END params_trigger]
image image image

The failures above are because mappedtasks dont work yet

Params UI tutorial DAG demonstrating various options for a trigger form generated by DAG params

Run config:

{
    "a_simple_list": [
        "one",
        "two",
        "three",
        "actually one value is made per line"
    ],
    "array_of_numbers": [
        1,
        2,
        3
    ],
    "array_of_objects": [
        {
            "country": "country_name",
            "name": "account_name"
        }
    ],
    "bool": true,
    "checked_number": 100,
    "checked_text": "length-checked-field",
    "date": "2025-01-27",
    "date_time": "2025-01-27T12:17:00+00:00",
    "flag": false,
    "hidden_secret_field": "constant value",
    "most_loved_number": 42,
    "multi_select": [
        "two",
        "three"
    ],
    "multi_select_with_label": [
        "2",
        "3"
    ],
    "multiline_text": "A multiline text Param\nthat will keep the newline\ncharacters in its value.",
    "object": {
        "key": "value"
    },
    "optional_field": "optional text, you can trigger also w/o text",
    "pick_one": "value 42",
    "pick_with_label": 3,
    "proposals": "Alpha",
    "required_field": "dummy dummy",
    "text": "Hello World!",
    "time": "12:13:14",
    "x": 3
}
image

Logs:

beef66d4770b
 ▶ Log message source details
{"logger":"airflow.dag_processing.bundles.manager.DagBundlesManager","timestamp":"2025-01-27T15:26:19.910440","event":"DAG bundles loaded: dags-folder, example_dags","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T15:26:19.910732","event":"Filling up the DagBag from /opt/airflow/airflow/example_dags/example_params_ui_tutorial.py","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T15:26:19.911277","event":"Importing /opt/airflow/airflow/example_dags/example_params_ui_tutorial.py","level":"debug"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T15:26:19.946419","event":"Loaded DAG <DAG: example_params_ui_tutorial>","level":"debug"}
{"file":"/opt/airflow/airflow/example_dags/example_params_ui_tutorial.py","timestamp":"2025-01-27T15:26:19.946663","logger":"task","event":"DAG file parsed","level":"debug"}
{"json":"{\"rendered_fields\":{\"templates_dict\":null,\"op_args\":\"()\",\"op_kwargs\":{}},\"type\":\"SetRenderedFields\"}\n","timestamp":"2025-01-27T15:26:19.946922","logger":"task","event":"Sending request","level":"debug"}
{"logger":"airflow.sdk.definitions.param","timestamp":"2025-01-27T15:26:19.956861","event":"Updating task params ({'x': 3, 'text': 'Hello World!', 'flag': False, 'a_simple_list': ['one', 'two', 'three', 'actually one value is made per line'], 'most_loved_number': 42, 'pick_one': 'value 42', 'pick_with_label': 3, 'proposals': 'Alpha', 'multi_select': ['two', 'three'], 'multi_select_with_label': ['2', '3'], 'array_of_numbers': [1, 2, 3], 'bool': True, 'date_time': '2025-01-27T12:17:00+00:00', 'date': '2025-01-27', 'time': '12:13:14', 'multiline_text': 'A multiline text Param\\nthat will keep the newline\\ncharacters in its value.', 'required_field': None, 'optional_field': 'optional text, you can trigger also w/o text', 'checked_text': 'length-checked-field', 'checked_number': 100, 'object': {'key': 'value'}, 'array_of_objects': [{'name': 'account_name', 'country': 'country_name'}], 'hidden_secret_field': 'constant value'}) with DagRun.conf ({'x': 3, 'bool': True, 'date': '2025-01-27', 'flag': False, 'text': 'Hello World!', 'time': '12:13:14', 'object': {'key': 'value'}, 'pick_one': 'value 42', 'date_time': '2025-01-27T12:17:00+00:00', 'proposals': 'Alpha', 'checked_text': 'length-checked-field', 'multi_select': ['two', 'three'], 'a_simple_list': ['one', 'two', 'three', 'actually one value is made per line'], 'checked_number': 100, 'multiline_text': 'A multiline text Param\\nthat will keep the newline\\ncharacters in its value.', 'optional_field': 'optional text, you can trigger also w/o text', 'required_field': 'dummy dummy', 'pick_with_label': 3, 'array_of_numbers': [1, 2, 3], 'array_of_objects': [{'name': 'account_name', 'country': 'country_name'}], 'most_loved_number': 42, 'hidden_secret_field': 'constant value', 'multi_select_with_label': ['2', '3']})","level":"debug"}
{"chan":"stdout","event":"Validates params aere {'x': 3, 'text': 'Hello World!', 'flag': False, 'a_simple_list': ['one', 'two', 'three', 'actually one value is made per line'], 'most_loved_number': 42, 'pick_one': 'value 42', 'pick_with_label': 3, 'proposals': 'Alpha', 'multi_select': ['two', 'three'], 'multi_select_with_label': ['2', '3'], 'array_of_numbers': [1, 2, 3], 'bool': True, 'date_time': '2025-01-27T12:17:00+00:00', 'date': '2025-01-27', 'time': '12:13:14', 'multiline_text': 'A multiline text Param\\nthat will keep the newline\\ncharacters in its value.', 'required_field': 'dummy dummy', 'optional_field': 'optional text, you can trigger also w/o text', 'checked_text': 'length-checked-field', 'checked_number': 100, 'object': {'key': 'value'}, 'array_of_objects': [{'name': 'account_name', 'country': 'country_name'}], 'hidden_secret_field': 'constant value'}","timestamp":"2025-01-27T15:26:19.967251Z","level":"info","logger":"task"}
{"chan":"stdout","event":"This DAG was triggered with the following parameters:","timestamp":"2025-01-27T15:26:19.971408Z","level":"info","logger":"task"}
{"chan":"stdout","event":"","timestamp":"2025-01-27T15:26:19.971505Z","level":"info","logger":"task"}
{"chan":"stdout","event":"{","timestamp":"2025-01-27T15:26:19.971657Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"x\": 3,","timestamp":"2025-01-27T15:26:19.971733Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"text\": \"Hello World!\",","timestamp":"2025-01-27T15:26:19.971813Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"flag\": false,","timestamp":"2025-01-27T15:26:19.971885Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"a_simple_list\": [","timestamp":"2025-01-27T15:26:19.971956Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"one\",","timestamp":"2025-01-27T15:26:19.972028Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"two\",","timestamp":"2025-01-27T15:26:19.972097Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"three\",","timestamp":"2025-01-27T15:26:19.972166Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"actually one value is made per line\"","timestamp":"2025-01-27T15:26:19.972238Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.972313Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"most_loved_number\": 42,","timestamp":"2025-01-27T15:26:19.972383Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"pick_one\": \"value 42\",","timestamp":"2025-01-27T15:26:19.972453Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"pick_with_label\": 3,","timestamp":"2025-01-27T15:26:19.972524Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"proposals\": \"Alpha\",","timestamp":"2025-01-27T15:26:19.972593Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"multi_select\": [","timestamp":"2025-01-27T15:26:19.972663Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"two\",","timestamp":"2025-01-27T15:26:19.972734Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"three\"","timestamp":"2025-01-27T15:26:19.972806Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.972875Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"multi_select_with_label\": [","timestamp":"2025-01-27T15:26:19.972948Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"2\",","timestamp":"2025-01-27T15:26:19.973017Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"3\"","timestamp":"2025-01-27T15:26:19.973086Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.973165Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"array_of_numbers\": [","timestamp":"2025-01-27T15:26:19.973251Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        1,","timestamp":"2025-01-27T15:26:19.973324Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        2,","timestamp":"2025-01-27T15:26:19.973396Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        3","timestamp":"2025-01-27T15:26:19.973464Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.973534Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"bool\": true,","timestamp":"2025-01-27T15:26:19.973603Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"date_time\": \"2025-01-27T12:17:00+00:00\",","timestamp":"2025-01-27T15:26:19.973671Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"date\": \"2025-01-27\",","timestamp":"2025-01-27T15:26:19.973740Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"time\": \"12:13:14\",","timestamp":"2025-01-27T15:26:19.973807Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"multiline_text\": \"A multiline text Param\\nthat will keep the newline\\ncharacters in its value.\",","timestamp":"2025-01-27T15:26:19.973876Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"required_field\": \"dummy dummy\",","timestamp":"2025-01-27T15:26:19.973946Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"optional_field\": \"optional text, you can trigger also w/o text\",","timestamp":"2025-01-27T15:26:19.974077Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"checked_text\": \"length-checked-field\",","timestamp":"2025-01-27T15:26:19.974145Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"checked_number\": 100,","timestamp":"2025-01-27T15:26:19.974212Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"object\": {","timestamp":"2025-01-27T15:26:19.974278Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"key\": \"value\"","timestamp":"2025-01-27T15:26:19.974350Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    },","timestamp":"2025-01-27T15:26:19.974411Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"array_of_objects\": [","timestamp":"2025-01-27T15:26:19.974478Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        {","timestamp":"2025-01-27T15:26:19.974559Z","level":"info","logger":"task"}
{"chan":"stdout","event":"            \"name\": \"account_name\",","timestamp":"2025-01-27T15:26:19.974625Z","level":"info","logger":"task"}
{"chan":"stdout","event":"            \"country\": \"country_name\"","timestamp":"2025-01-27T15:26:19.974692Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        }","timestamp":"2025-01-27T15:26:19.974777Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.974844Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"hidden_secret_field\": \"constant value\"","timestamp":"2025-01-27T15:26:19.974911Z","level":"info","logger":"task"}
{"chan":"stdout","event":"}","timestamp":"2025-01-27T15:26:19.974980Z","level":"info","logger":"task"}
{"chan":"stdout","event":"","timestamp":"2025-01-27T15:26:19.975050Z","level":"info","logger":"task"}
{"logger":"airflow.task.operators.airflow.decorators.python._PythonDecoratedOperator","timestamp":"2025-01-27T15:26:19.971434","event":"Done. Returned value was: None","level":"info"}
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-27T15:26:19.971547Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-27T15:26:19.971629","logger":"task","event":"Sending request","level":"debug"}

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you missed an import in the tests:

task_sdk/tests/definitions/test_mappedoperator.py:25: in <module>
    from airflow.models.param import ParamsDict
E   ModuleNotFoundError: No module named 'airflow.models.param'

Which also probably means that we should keep the old module around, but just have it import+"reexport" it from the Task SDK

@amoghrajesh
Copy link
Contributor Author

Looks like you missed an import in the tests:

task_sdk/tests/definitions/test_mappedoperator.py:25: in <module>
    from airflow.models.param import ParamsDict
E   ModuleNotFoundError: No module named 'airflow.models.param'

Which also probably means that we should keep the old module around, but just have it import+"reexport" it from the Task SDK

Yeah, doesn't sound like a bad idea. Let me try that

@amoghrajesh amoghrajesh requested a review from ashb January 29, 2025 07:36
@amoghrajesh
Copy link
Contributor Author

amoghrajesh commented Jan 29, 2025

@ashb moved some more missing references and did some work on moving tests too. Can you take a look when you have some time?

@amoghrajesh amoghrajesh requested a review from ashb January 29, 2025 15:23
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good.

A few nits and I think we need to move some more of the tests over too

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-approving for once we've got the tests moved over.

@amoghrajesh
Copy link
Contributor Author

Phew, just managed to add the tests, instead of relying on XCOMs, wrote the CustomOperator in such a way that we assert expected vs actual params per test case!

@amoghrajesh
Copy link
Contributor Author

Very interesting pattern really:

    def test_dag_param_resolves_from_task(self, create_runtime_ti, mock_supervisor_comms, time_machine):
        """Test dagparam resolves on operator execution"""
        instant = timezone.datetime(2024, 12, 3, 10, 0)
        time_machine.move_to(instant, tick=False)

        dag = DAG(dag_id="dag_with_dag_params", start_date=timezone.datetime(2024, 12, 3))
        dag.param("value", default="NOTSET")

        class CustomOperator(BaseOperator):
            def execute(self, context):
                # important to use self.dag here
                assert self.dag.params["value"] == "NOTSET"

        task = CustomOperator(task_id="task_with_dag_params")
        runtime_ti = create_runtime_ti(task=task, dag_id="dag_with_dag_params")

        run(runtime_ti, log=mock.MagicMock())

        mock_supervisor_comms.send_request.assert_called_once_with(
            msg=SucceedTask(
                state=TerminalTIState.SUCCESS, end_date=instant, task_outlets=[], outlet_events=[]
            ),
            log=mock.ANY,
        )

At runtime, we parse the ti which will trigger reparsing our dag as per the fixtures and will add params to it if needed. At runtime, self.dag.params should have the required info for a task :)

@amoghrajesh amoghrajesh merged commit 03c4966 into apache:main Jan 31, 2025
63 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-dag-params branch January 31, 2025 06:32
Prab-27 pushed a commit to Prab-27/airflow that referenced this pull request Jan 31, 2025
niklasr22 pushed a commit to niklasr22/airflow that referenced this pull request Feb 8, 2025
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 17, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Move DAG Params to Task SDK

2 participants