Skip to content

Conversation

@ashb
Copy link
Member

@ashb ashb commented Jan 7, 2022

This is needed for the scheduler to be able to expand the task instances at runtime

@ashb ashb force-pushed the mapped-operator-serialization branch from 5461a41 to 30afd2e Compare January 7, 2022 15:31
@ashb ashb changed the title Serilze mapped tasks and task groups Serialize mapped tasks and task groups Jan 7, 2022
@ashb ashb requested a review from kaxil January 7, 2022 15:31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t get what this implies.

Copy link
Member Author

@ashb ashb Jan 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete the task_id from the kwargs (so it isn't stored in the mapped data structure) -- so del kwargs['task_id'] but without the try/except for when it's not there? I'll double check if I can just do this with a del.

Copy link
Member

@uranusjr uranusjr Jan 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my question is more like why do we need to treat task_id specially (and why we didn’t need to previously, but only when we add operator mapping)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task ID was getting stored in partial_kwargs which I hadn't noticed until I looked at the serialized representation of a MappedOperator.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about storing task_id separately instead and not in partial_kwargs?

Copy link
Member Author

@ashb ashb Jan 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dealing with it in partial_kwargs --

            # Store the args passed to init -- we need them to support task.map serialzation!
            kwargs.pop('task_id', None)
            self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore

(The only use of __init_kwargs is for building partial_kwargs when mapping a task)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the other option:

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 752b8a6cf..71170320d 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -136,6 +136,7 @@ class BaseOperatorMeta(abc.ABCMeta):
         non_optional_args = {
             name for (name, param) in non_variadic_params.items() if param.default == param.empty
         }
+        non_optional_args -= {'task_id'}
 
         class autostacklevel_warn:
             def __init__(self):
@@ -158,7 +159,7 @@ class BaseOperatorMeta(abc.ABCMeta):
             func.__globals__['warnings'] = autostacklevel_warn()
 
         @functools.wraps(func)
-        def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
+        def apply_defaults(self: "BaseOperator", *args: Any, task_id: str, **kwargs: Any) -> Any:
             from airflow.models.dag import DagContext
             from airflow.utils.task_group import TaskGroupContext
 
@@ -201,15 +202,15 @@ class BaseOperatorMeta(abc.ABCMeta):
 
             hook = getattr(self, '_hook_apply_defaults', None)
             if hook:
-                args, kwargs = hook(**kwargs, default_args=default_args)
+                args, kwargs = hook(task_id=task_id, **kwargs, default_args=default_args)
+                task_id = kwargs.pop('task_id')
                 default_args = kwargs.pop('default_args', {})
 
             if not hasattr(self, '_BaseOperator__init_kwargs'):
                 self._BaseOperator__init_kwargs = {}
 
-            result = func(self, **kwargs, default_args=default_args)
+            result = func(self, **kwargs, task_id=task_id, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
-            kwargs.pop('task_id', None)
             self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
 
             # Here we set upstream task defined by XComArgs passed to template fields of the operator

Which do you think is better @uranusjr?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The “other” one looks better to me, it feels like we are intentionally treating task_id different from all other arguments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 9439378 -- and a bit of a justification/reason in the commit message

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out this has an unfortunate side effect:

>>> FooOperator()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: apply_defaults() missing 1 required keyword-only argument: 'task_id'

and error message seems to be hard-coded in the interpreter and cannot be patched. I’m trying yet another approach (see new commits)

@ashb ashb force-pushed the mapped-operator-serialization branch 2 times, most recently from 5644640 to 3ef6268 Compare January 13, 2022 16:18
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't partial_kwargs={} be a default on the MappedOperator itself?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, cos in all other situations where this is constructor we need to pass a value.

Comment on lines +619 to +621
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for these three, shouldn't we have a default on MappedOperator itself?

@ashb ashb force-pushed the mapped-operator-serialization branch from 211ae54 to 08c4236 Compare January 14, 2022 14:02
@ashb ashb marked this pull request as ready for review January 14, 2022 14:02
@ashb ashb requested a review from XD-DENG as a code owner January 14, 2022 14:02
@ashb ashb requested review from kaxil and uranusjr January 14, 2022 14:21
@ashb
Copy link
Member Author

ashb commented Jan 14, 2022

I'm now saying this is "good enough" -- we can make changes later if we want to

ashb added 2 commits January 17, 2022 16:01
It simplifies a few things.

We also deal with (and test) the old name when deserializing
Since `task_id` is handled speically in the serialization of
MappedOperators, we don't want it duplicated in to the partial_kwargs.
@ashb ashb force-pushed the mapped-operator-serialization branch from 702407b to 9439378 Compare January 17, 2022 17:11
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Comment on lines +1743 to +1745
@deps.default
def _deps_from_class(self):
return self.operator_class.deps
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m assuming many of these defaults (this, _is_dummy, and template_fields, I believe) don’t need to consider when operator_class is a str because in that case these values would’ve been supplied explicitly instead. (It’s kind of bad it’s designed this way but I guess that can be said for many things regarding the current serialisation implementation...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly that. I've already started thinking in my head about how to refactor/rearchitect the serialization and deserialization.

@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jan 17, 2022
@uranusjr uranusjr removed the full tests needed We need to run full set of tests for this PR to merge label Jan 18, 2022
@uranusjr
Copy link
Member

Seems to be working. If the latest two commits look good to you @ashb this is ready to go in.

@ashb
Copy link
Member Author

ashb commented Jan 18, 2022

Looks good -- thanks for picking up those fiixes TP.

@ashb ashb merged commit 077bacd into apache:main Jan 18, 2022
@ashb ashb deleted the mapped-operator-serialization branch January 18, 2022 09:21
@uranusjr
Copy link
Member

I’ll start working on attribute parity between BaseOperator and MappedOperator.

@ashb
Copy link
Member Author

ashb commented Jan 18, 2022

I'm going to work on adding --map-index to task execution side.

@jedcunningham jedcunningham added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Mar 1, 2022
@jedcunningham jedcunningham added this to the Airflow 2.3.0 milestone Apr 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:dynamic-task-mapping AIP-42 area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants