-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Map and Partial DAG authoring interface for Dynamic Task Mapping #19965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This comment has been minimized.
This comment has been minimized.
6eda954 to
0792002
Compare
93bbbf3 to
a2f4bcb
Compare
|
This should now have all of the DAG authoring interface changes for AIP-42, including (some, if not extensive) tests. Still to do:
Docs will come later -- the interface might change yet still as we work out more details. |
a2f4bcb to
f6f242d
Compare
9dc47d6 to
025cfee
Compare
6b268bf to
0823885
Compare
|
I've got another small change coming to better validate mapped/partial argument names (I was missing a few cases) |
uranusjr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not finished, but submitting this batch because my comments already went stale once before I finished.
airflow/decorators/base.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me wonder, what should MyOperator.map(x=something).map(x=another) do? If I understand this correctly, this would currently discard something and just map to another. We should likely add something in to prevent this from happening, perhaps in _validate_arg_names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's an unspecified point of the API. I think I'm leaning towards .map().map() being an error on general grounds.
But yes, in my head updating the params was my intent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think map().map() should be an error. We already agreed on suporting map(arg1, arg2) and:
There should be one-- and preferably only one --obvious way to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm unliekly to be able to make this change before stopping for Christmas, so either someone else can make this, or we can merge it with this and fix it later.
(I think that having a .map() function that returns an error would be clearer than having no map method, similar to how I have .partial() on a Task object throw an error still.)
airflow/decorators/task_group.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What’s the purpose of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the tests, to catch this:
@task_group_decorator
def tg():
...
with pytest.warns(UserWarning, match='was never mapped'):
with DAG("test-dag", start_date=DEFAULT_DATE):
tg.partial()I.e. tg = tg.partial().map() was needed. You doesn't make sense to have a partial TG without also mapping it once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen in this case, nothing? (i.e. this is likely a user mistake)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct -- nothing, and likely a user mistake as the tasks in the task group wouldn't appear in the DAG
airflow/models/baseoperator.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if _validate_arg_names and this have similar logic to be refactored out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar, but the way this gets the possible names (most of the function) is quite differnt, so I couldn't work out a way of really sharing it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test to ensure get_serialized_fields() is up-to-date? Otherwise we may accidentally add a field on BaseOperator but forget to serialize it, and this test would fail to reflect that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't, no, this is it. This will pick up anything added via self.x in the constructor, but won't pick up things like x: int = 0 at the class level.
Think it's worth it?
uranusjr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn’t read the tests.
airflow/models/taskmixin.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is Mypy OK with this? I’d make _set_relatives() only accept the sequence case, and do the coercing outside in the public functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, yes.
airflow/utils/task_group.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not change the parent class definition to be plain class-level variable declarations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, because on BaseOperator these are defined as properties.
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
Also prevent calls like .partial(...).partial(...). It is uncertain whether these kinds of repeated partial/map calls have utility, so let's disable them entirely for now to simplify implementation. We can always add them if they are proven useful.
airflow/decorators/base.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, this one doesn't behave correctly right now, as:
@taks(task_id='x')
def y():
...
This currently stores task_id in the self.kwargs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this check only in deb54df
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
This was missed in the refactor: apache#19965 -- since the `task_decorator_factory` enforces keyword-only parameters for via the `*` in its signature.
Closes #19889
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.