
More flexible dynamic task mapping #23803

@Tehada

Description

The current implementation of dynamic task mapping, as of Airflow 2.3.0, lacks some features needed to support certain workflows. In particular, it lacks "zip-like" functionality for dynamically spawning tasks. Consider this example (a concept, not actual code):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def get_params():
    res = {
        "arg1": [1, 3],
        "arg2": [2, 4],
    }
    return res


@task
def run_task(arg1, arg2):
    print(arg1, arg2)


with DAG(dag_id="airflow_fun", start_date=datetime(2022, 5, 16)) as dag:
    p = get_params()
    # expand_zip does not exist -- this is the proposed API.
    run_task.expand_zip(arg1=p["arg1"], arg2=p["arg2"])

The desired behavior for this code is to run run_task twice: run_task(arg1=1, arg2=2) and run_task(arg1=3, arg2=4). This is not possible with the current expand API: expanding over multiple lists produces their cross product (four tasks in this example), not pairwise combinations. I want such an API to appear in Airflow.
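
In plain Python terms, the proposed pairing matches the built-in zip (a minimal illustration, not Airflow code):

arg1 = [1, 3]
arg2 = [2, 4]
# expand_zip would spawn one mapped task per pair, just as zip pairs here:
for a1, a2 in zip(arg1, arg2):
    print(a1, a2)  # prints "1 2", then "3 4"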

It is possible that creating something similar to the current expand is not the best approach, and maybe there is another, fundamentally different way that would be simpler to use and implement -- I am going to research existing tools with this feature (Prefect and Dagster come to mind first; Luigi seems to be dead).

Also, @ashb noted in Slack:

The first thing to work out and discuss is what is the syntax to the DAG author?

Which leads me to this plan (may be updated later):

  1. Gather existing suggestions in this issue for further comparison.
  2. Research good and bad practices for this kind of API in different tools.
  3. Find the use cases and workflows that would not be covered by the suggested API (ideally, all use cases should be covered).

This plan and description are open to any feedback, suggestions, and criticism.

Use case/motivation

One real-life use case is a task get_params which, when triggered, lists a GCS bucket for yesterday's new files (the number of files varies from day to day) and generates arguments for KubernetesPodOperator to run a processing task in a pod. The processing is quite RAM-hungry, so I also predict the RAM usage of each processing task from the file's size in the bucket. I then want to run the processing for each file in a separate pod, with dynamically generated cmds, arguments, and resources per pod. With "zip-like" dynamic mapping this use case would be trivial to implement, as sketched below.
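
A concept sketch of that DAG, assuming the hypothetical expand_zip API from above (the bucket listing and RAM prediction are stubbed out, and the per-pod resources argument is illustrative):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)


@task
def get_pod_args():
    # In the real DAG this would list yesterday's files in the GCS bucket
    # (e.g. via GCSHook.list) and predict RAM usage from each file's size;
    # hard-coded here to keep the sketch short.
    files = [("gs://bucket/a.csv", "512Mi"), ("gs://bucket/b.csv", "2Gi")]
    return {
        "arguments": [["--input", path] for path, _ in files],
        "memory": [mem for _, mem in files],
    }


with DAG(dag_id="process_new_files", start_date=datetime(2022, 5, 16)) as dag:
    pod_args = get_pod_args()
    # The i-th pod gets the i-th arguments list paired with the i-th
    # memory estimate -- the zip-like pairing this issue asks for.
    KubernetesPodOperator.partial(
        task_id="process_file",
        name="process-file",
        image="my-processing-image:latest",
        cmds=["python", "process.py"],
    ).expand_zip(  # hypothetical API, does not exist in Airflow 2.3
        arguments=pod_args["arguments"],
        # Illustrative only: how per-pod resource requests would be
        # expressed is exactly the kind of API detail to be designed.
        container_resources=pod_args["memory"],
    )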

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
