Skip to content

Conversation

@AndersonReyes
Copy link
Contributor


closes: #8996

using type hints to infer multiple outputs when using task decorator
^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@boring-cyborg
Copy link

boring-cyborg bot commented Aug 16, 2020

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://apache-airflow-slack.herokuapp.com/

@AndersonReyes
Copy link
Contributor Author

AndersonReyes commented Aug 16, 2020

TODO

  • forgot to handle functions without type annotations for return type. Need to check if sig is set to Signature.empty

Copy link
Member

@turbaszek turbaszek Aug 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support also set? Probably it would be awesome to support any iterable... WDYT? @casassg @evgenyshulman @jonathanshir

@turbaszek
Copy link
Member

turbaszek commented Aug 16, 2020

Thanks @AndersonReyes for taking interest in this issue! This looks good but we need to adjust this logic of XCom storage:

if not self.multiple_outputs:
return return_value
if isinstance(return_value, dict):
for key in return_value.keys():
if not isinstance(key, str):
raise AirflowException('Returned dictionary keys must be strings when using '
f'multiple_outputs, found {key} ({type(key)}) instead')
for key, value in return_value.items():
self.xcom_push(context, key, value)
else:
raise AirflowException(f'Returned output was type {type(return_value)} expected dictionary '
'for multiple_outputs')

Also the crucial part of this change is allowing users to reference those returned values before executing a task. For example this should work:

@task
def task1() -> Tuple[str, int]:
    return "magic", 42


@task 
def task2(a: str, b: int) -> None:
    pass


a, b = task1()
task2(a, b)

And this means that we have to implement some smart __iter__ for XComArg class because that's the return type of task1() invocation. Otherwise we get this:

    a, b = task2()
ValueError: too many values to unpack (expected 2)

But I think we can limit the scope of this PR to just resolve the multiple_output.

@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label Aug 16, 2020
@AndersonReyes
Copy link
Contributor Author

AndersonReyes commented Aug 16, 2020

I'm thinking For handling tuples in execute, the easiest thing would be to make the key the position of each iterable item. And be able to access by index Maybe? Hmmm or just add entire tuple to default XCOM key and using that .

I think to stay consistent if it's a tuple just store each item with key as position , and to get item to pass the index in XcomArg but can also retrieve all of them at once using the default XCOM what ya think

@turbaszek
Copy link
Member

I agree that's the simplest approach to use is something like return_value_0, return_value_1, ... as keys for tuples.

@AndersonReyes
Copy link
Contributor Author

I can't quite figure a clean unpacking of the xcomargs without knowing the size of the output in advance. Right now i have this which is not clean, infer the number of outputs from the typing and pass that to the _PythonOperator but still brainstorming on how to do that unpacking or if something else got a solution will prob leave this for another pr.

def _infer_multiple_outputs(
    python_callable: Optional[Callable] = None,
    n_outputs: Optional[int] = None,
    multiple_outputs: bool = False,
) -> Tuple[bool, Union[None, int]]:
    """
    Try to infer multiple outputs and number of outputs from typing.
    This a hack really and only works for tuples.
    """
    if not python_callable:
        return multiple_outputs, n_outputs

    sig = signature(python_callable).return_annotation
    ttype = getattr(sig, "__origin__", None)

    if (
        sig != inspect.Signature.empty
        and is_container(ttype)
    ):
        multiple_outputs = True

        # see if we can infer the number of outputs
        type_args = sig.__args__
        if (not n_outputs )and (ttype in (Tuple, tuple)) and (Ellipsis not in type_args):
            n_outputs = len(type_args)

    return multiple_outputs, n_outputs


def task(
    python_callable: Optional[Callable] = None,
    multiple_outputs: bool = False,
    n_outputs: Optional[int] = None,
    **kwargs
) -> Callable[[T], T]:
    """
    Python operator decorator. Wraps a function into an Airflow operator.
    Accepts kwargs for operator kwarg. Can be reused in a single DAG.

    :param python_callable: Function to decorate
    :type python_callable: Optional[Callable]
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
        with index as key. Dict will unroll to xcom values with keys as XCom keys.
        Defaults to False.
    :type multiple_outputs: bool

    """

    multiple_outputs, n_outputs = _infer_multiple_outputs(
        python_callable=python_callable, n_outputs=n_outputs, multiple_outputs=multiple_outputs)

    def wrapper(f: T):
        """
        Python wrapper to generate PythonFunctionalOperator out of simple python functions.
        Used for Airflow functional interface
        """
        _PythonFunctionalOperator.validate_python_callable(f)
        kwargs.setdefault('task_id', f.__name__)

        @functools.wraps(f)
        def factory(*args, **f_kwargs):
            op = _PythonFunctionalOperator(python_callable=f, op_args=args, op_kwargs=f_kwargs,
                                           multiple_outputs=multiple_outputs, n_outputs=n_outputs,
                                           **kwargs)
            return XComArg(op)
        return cast(T, factory)
    if callable(python_callable):
        return wrapper(python_callable)
    elif python_callable is not None:
        raise AirflowException('No args allowed while using @task, use kwargs instead')
    return wrapper

and the iter for XcomArg

    def __iter__(self):
        return iter(XComArg(operator=self.operator, key=str(i)) for i in range(self.operator._n_outputs))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Implements xcomresult['some_result_key'
Implements xcomresult['some_result_key']

Copy link
Member

@turbaszek turbaszek Aug 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.xcom_push(context, str(i), value)
self.xcom_push(context, f"return_value_{i}", value)

How about something more informative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha fixed that, but wouldn't you want to index xcomarg though? like output[0] vs output["return_value_0"]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But when you think about it anywhere you have to index each item to pass to another task is better to take a generic iterable instead and just pass entire container output instead of each individual item so I see your point

@casassg
Copy link
Contributor

casassg commented Aug 18, 2020

Regarding supporting any iter. We discussed that on the previous PR: #8962 (comment)

My 2 cents on adding tuple support: Makes code quite more complex over not enough high value.

Finally, should we add a custom class to make this more explicit? aka you need to import from airflow.ttypes import RetDict or something like that.

Comment on lines 288 to 291
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about using this parenthesis here. Does this not fit in a single line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah that's just left over noise, originally had complex check before I found the is_container func existed. I'll clean it up

@AndersonReyes
Copy link
Contributor Author

not sure whats up with the one test case getting exit 137 but cant tell if its me or github actions

@potiuk
Copy link
Member

potiuk commented Aug 19, 2020

Quarantined tests do timoeout from time to time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
multiple_outputs: Union[None, bool] = None,
multiple_outputs: Optional[bool] = None,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh almost missed this good catch

@kaxil kaxil added this to the Airflow 2.0.0 milestone Aug 24, 2020
@turbaszek
Copy link
Member

My 2 cents on adding tuple support: Makes code quite more complex over not enough high value.

I agree. Let's just add __iter__ that will raise a meaningful error just in case someone tries unpacking.

@casassg
Copy link
Contributor

casassg commented Aug 26, 2020

Note: You will need to update tests. Now only Dict annotated return should be multiple_output. Since tuple, set and such should not work.

@AndersonReyes
Copy link
Contributor Author

makes sense ill make the updates 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add back one of these tests to make sure it's not inferred?

@stale
Copy link

stale bot commented Oct 12, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Oct 12, 2020
@AndersonReyes
Copy link
Contributor Author

@casassg @dimberman @turbaszek I pretty much forgot about this since it's crazy at work and I'm sure ya got your own stuff happening also but is there anything else to do for this PR? Or has it been implemented elsewhere and I can just close this one?

@stale stale bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Oct 29, 2020
@turbaszek
Copy link
Member

@AndersonReyes we will need a note in documentation expelling what types will be treated as multiple_outpus and how users should use type hinting

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Not, If you manually set the ``multiple_outputs`` parameter the inference is disabled and
Note, if you manually set the ``multiple_outputs`` parameter the inference is disabled and

Or did you have something else in your mind?

Copy link
Contributor Author

@AndersonReyes AndersonReyes Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nah definitely a typo

Comment on lines 260 to 264
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sig = signature(python_callable).return_annotation
ttype = getattr(sig, "__origin__", None)
if sig != inspect.Signature.empty and ttype in (dict, Dict):
multiple_outputs = True
sig = signature(python_callable).return_annotation
ttype = getattr(sig, "__origin__", None)
multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines 266 to 267
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be able to skip this line? None has boolean value of False

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do the patch suggestion you have above, removing the if statement would set it to false anyways so yeah the line is not needed

Copy link
Member

@turbaszek turbaszek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @AndersonReyes ! 👏

@github-actions
Copy link

The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 12, 2020
@turbaszek
Copy link
Member

@AndersonReyes could you rebase onto latest master please?

@AndersonReyes
Copy link
Contributor Author

@AndersonReyes could you rebase onto latest master please?

yessir, squashed first to avoid merge conflicts 14 times for each commit lol

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@turbaszek
Copy link
Member

squashed first to avoid merge conflicts 14 times for each commit lol

Good thinking 😄

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@AndersonReyes
Copy link
Contributor Author

AndersonReyes commented Nov 14, 2020

nvm rebased one more time the static check readme error was fixed on master

@potiuk
Copy link
Member

potiuk commented Dec 7, 2020

Hello. Is this something we really want for 2.0.0rc1? If not - can someone set the right milestone? Or maybe rebase and merge since it is approved already :) ?

@ashb ashb modified the milestones: Airflow 2.0.0rc1, Airflow 2.1 Dec 7, 2020
@ashb ashb merged commit 1d91ca7 into apache:master Dec 9, 2020
@boring-cyborg
Copy link

boring-cyborg bot commented Dec 9, 2020

Awesome work, congrats on your first merged pull request!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AIP-31 Task Flow API for nicer DAG definition full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Use type hints to indicate multiple return values when using @task

6 participants