Add example for dynamically mapped tasks #24085

Closed
potiuk wants to merge 1 commit into apache:main from potiuk:add-example-dag-for-mapped-task
@potiuk
Member

@potiuk potiuk commented Jun 1, 2022



@potiuk
Member Author

potiuk commented Jun 1, 2022

One of the reasons I did not find the mapped task bug in the last rc1 is that I usually use a couple of example DAGs to test Airflow. And surprisingly I found we had no dynamically mapped task in our "example_dags" :).

Here is one I used to test rc2

Contributor

@josh-fell josh-fell left a comment

Otherwise LGTM

@potiuk potiuk added this to the Airflow 2.3.3 milestone Jun 1, 2022
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jun 1, 2022
@github-actions

github-actions bot commented Jun 1, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@potiuk
Member Author

potiuk commented Jun 1, 2022

Seems like bitnami chart is failing more and more ...

@potiuk potiuk force-pushed the add-example-dag-for-mapped-task branch 2 times, most recently from d1c03b0 to 4115244 Compare June 2, 2022 14:41
Contributor

@pingzh pingzh left a comment

nice, thanks

@ephraimbuddy
Contributor

Serialization error...

@potiuk
Member Author

potiuk commented Jun 2, 2022

This is strange :)

@potiuk
Member Author

potiuk commented Jun 2, 2022

Hey @uranusjr @ashb @kaxil - I need your expert knowledge here.

It seems that after I added "Mapped tasks" to the example DAGs, the serialization test flagged them as "wrong". I am not 100% sure what the implications are without looking deeper, but it does look like we are not able to use mapped tasks where DAGs are serialized, which might or might not be a problem.

Example here:

https://github.com/apache/airflow/runs/6710772433?check_suite_focus=true#step:10:4202

  >       assert isinstance(serialized_task, SerializedBaseOperator)
  E       assert False
  E        +  where False = isinstance(<Mapped(_PythonDecoratedOperator): add_one>, SerializedBaseOperator)

Any comments before I dive deeper to understand it?

@uranusjr
Member

uranusjr commented Jun 3, 2022

Looks like the test is wrong. MappedOperator instances are deserialised into MappedOperator, not SerializedBaseOperator. The test should be updated to

if serialized_task.is_mapped:
    assert isinstance(serialized_task, MappedOperator)
    assert isinstance(task, MappedOperator)
else:
    assert isinstance(serialized_task, SerializedBaseOperator)
    assert isinstance(task, BaseOperator)
assert not isinstance(task, SerializedBaseOperator)

@potiuk
Member Author

potiuk commented Jun 3, 2022

Thanks @uranusjr !

This change adds quite a few modifications to the mapped operator
implementation to make mapped operators comparable before and after
serialization, as otherwise the serialization tests fail.
@potiuk potiuk force-pushed the add-example-dag-for-mapped-task branch from 4115244 to 508f1da Compare June 3, 2022 13:14
@potiuk potiuk requested review from XD-DENG and kaxil as code owners June 3, 2022 13:14
@potiuk
Member Author

potiuk commented Jun 3, 2022

Hey @uranusjr - actually I needed to make quite a number of other changes to get the tests to pass. However, I am not sure whether some of those changes are how they should be. Some of them were based on intuition rather than facts, as the current hierarchy of operators and how it interacts with serialization is, well, convoluted to say the least (mostly, of course, due to the history of adding MappedOperator on top of the existing BaseOperator).

  • I believe we had quite a bit of inconsistency in how we treated hash/eq of operators for BaseOperator and MappedOperator. I think I fixed it, though when I tried to move it one level up (to AbstractOperator) I got a strange "MappedOperator is not hashable" error even though it derives from AbstractOperator. Likely some magic in serializing/deserializing, which I don't fully understand, caused it.

  • operator_extra_links in BaseOperator was () rather than [], which made the comparison with the serialized version fail (this is the one I have the most reservations about; likely it should be changed elsewhere).

  • XComArg() is recreated when serializing, and since it is passed as a value in a dictionary, the field comparison flagged identical instances as different. I implemented a proper hash there to make it hashable and usable in set comparisons, and had to make the comparison for dicts a bit more complex, to compare values by equality rather than identity.

  • When serializing/deserializing we lose "DecoratedMappedOperator", likely for a good reason. That's why I had to relax the type check in __eq__, as otherwise we would have DecoratedMappedOperator != MappedOperator. I think it might not matter that we are losing Decorated (because I think they are equivalent after deserializing), but maybe it's worth looking at.

  • I excluded some fields from the comparison, because they were wildly different after serialize/deserialize. I guess it does not matter, as those are only used before mapping (I think serializing happens for an already fully mapped task), but it would be great to double-check whether that is the right approach and whether we have lost anything.

For dags:
* 'partial_kwargs',
* 'operator_class',
For tasks:
* 'operator_class',
* 'partial_kwargs',
* 'mapped_op_kwargs',
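
To illustrate the XComArg point above: a minimal sketch, assuming a hypothetical stand-in class (`FakeXComArg` is not Airflow's real `XComArg`, and its fields are made up for illustration). It shows why an object that is rebuilt during a serialize/deserialize round trip needs value-based `__eq__` and `__hash__` before dict and set comparisons treat the rebuilt copy as the same thing.

```python
class FakeXComArg:
    """Hypothetical stand-in for a value object rebuilt on deserialization."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __eq__(self, other):
        # Compare by value, not by identity.
        if not isinstance(other, FakeXComArg):
            return NotImplemented
        return (self.task_id, self.key) == (other.task_id, other.key)

    def __hash__(self):
        # Must agree with __eq__: equal objects produce equal hashes.
        return hash((self.task_id, self.key))


original = {"x": FakeXComArg("make_list")}
rebuilt = {"x": FakeXComArg("make_list")}  # a new instance, as after a round trip

same_identity = original["x"] is rebuilt["x"]   # two distinct objects
same_value = original == rebuilt                # dict values compared with ==
unique = {original["x"], rebuilt["x"]}          # set collapses equal values
```

Without the two methods, Python falls back to identity-based comparison, so the two dicts would compare as different even though they describe the same argument.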

@potiuk
Member Author

potiuk commented Jun 3, 2022

I'd appreciate thorough review :)

@potiuk
Member Author

potiuk commented Jun 3, 2022

For some strange reason DecoratedMappedOperator is not hashable, although it should (in principle) inherit __hash__() and __eq__() from MappedOperator. Does anyone know what's going on here :)? Some Python voodoo we do somewhere?
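
One possible explanation, which this thread does not confirm: in Python, a class that defines `__eq__` without also defining `__hash__` gets `__hash__` set to `None`, even when a parent class defines `__hash__`. Class decorators that generate `__eq__` (the standard library's `dataclass` by default, and attrs is documented to behave similarly) have the same effect. The sketch below uses hypothetical stand-in classes, not Airflow's operators, to show the behaviour and one way to restore the inherited hash.

```python
from dataclasses import dataclass


class Base:
    """Hypothetical parent with value-based equality and hashing."""

    def __init__(self, task_id):
        self.task_id = task_id

    def __eq__(self, other):
        return isinstance(other, Base) and self.task_id == other.task_id

    def __hash__(self):
        return hash(self.task_id)


class PlainChild(Base):
    """Defines neither method, so it inherits both from Base."""


class ChildWithEq(Base):
    """Overrides __eq__ only, so Python sets __hash__ to None."""

    def __eq__(self, other):
        return super().__eq__(other)


@dataclass
class GeneratedChild(Base):
    """dataclass generates __eq__ and, by default, sets __hash__ to None."""

    task_id: str


class FixedChild(Base):
    """Overrides __eq__ but explicitly restores the parent's hash."""

    def __eq__(self, other):
        return super().__eq__(other)

    __hash__ = Base.__hash__


def is_hashable(obj):
    """Return True if hash(obj) succeeds, False if it raises TypeError."""
    try:
        hash(obj)
    except TypeError:
        return False
    return True


results = {
    cls.__name__: is_hashable(cls("t1"))
    for cls in (PlainChild, ChildWithEq, GeneratedChild, FixedChild)
}
```

If the subclass in question is created through such a decorator, re-declaring `__hash__` on the subclass (as in `FixedChild`) or asking the decorator to keep hashing would be the places to look.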

@potiuk potiuk closed this Aug 4, 2022
@potiuk potiuk deleted the add-example-dag-for-mapped-task branch August 8, 2022 08:02
@ephraimbuddy ephraimbuddy removed this from the Airflow 2.3.4 milestone Aug 15, 2022