
Conversation

@rjzamora (Member)

This PR is motivated by serialization needs for the ongoing Blockwise-IO work in dask/dask (e.g. dask#7417, dask#7415). One critical issue is that serialize is not guaranteed to iterate through the elements of a task (and generate separate header/frames for each element). This is a problem when one or more of those task elements is a Serialized object - we don't want these objects to be grouped together with the other elements in a single blob of bytes; otherwise, the object will remain Serialized when it is passed to the actual function on the worker.

Note that I also think that we should always use serialize(..., iterate_collection=True) when the object is a task. However, I'd rather leave that work for a separate PR.

@rjzamora rjzamora changed the title Add iterate_collection argumet to serialize Add iterate_collection argument to serialize Mar 27, 2021
@rjzamora (Member Author)

rjzamora commented Mar 27, 2021

cc @jrbourbeau @ian-r-rose - This seems to resolve most of the serialization problems we discussed today. If you use serialize(task, iterate_collection=True) while materializing the graph on the scheduler, the task tuple may contain elements of type Serialized, and the worker will handle deserialization without any special logic.


```diff
-def serialize(x, serializers=None, on_error="message", context=None):
+def serialize(
+    x, serializers=None, on_error="message", context=None, iterate_collection=None
```

Contributor


Perhaps, if iterate_collection=None isn't supposed to have a distinct meaning from iterate_collection=False (which seems to be the case), the default value could be set to False?

Member Author


Right - I guess the current logic will use False as the default anyway, so we might as well be explicit. Thanks, good suggestion.

@hristog (Contributor), Mar 29, 2021


Yes, thank you! My thinking was that, this way, the intended value space would be documented more clearly.

@rjzamora (Member Author), Mar 29, 2021


After looking back at the code a bit, I think the iterate_collection=None default actually makes sense. The problem is that we do not need the iterate_collection = iterate_collection or False line below.

The behavior we want:

  • iterate_collection=True: Serialization always dumps the elements of the collection separately
  • iterate_collection=False: Serialization always dumps the entire collection together
  • iterate_collection=None (default): The ideal approach is inferred by the existence/location of "pickle" among the list of available serializers, and the types of data detected in the collection.
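That three-way behavior can be sketched roughly as follows (hypothetical helper names; check_dask_serializable stands in for the inference step discussed later in this thread, and this is not the actual distributed code):

```python
def check_dask_serializable(x):
    # Stand-in for the real inference step: treat a collection as worth
    # iterating if any element carries a dask serialization hook.
    if isinstance(x, (list, set, tuple)):
        return any(hasattr(el, "__dask_serialize__") for el in x)
    return False


def resolve_iterate(x, iterate_collection=None):
    # Explicit True/False always wins; None means "infer from the data".
    if iterate_collection is not None:
        return iterate_collection
    return isinstance(x, (list, set, tuple, dict)) and check_dask_serializable(x)
```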

@hristog (Contributor), Mar 29, 2021


Indeed, that iterate_collection = iterate_collection or False line was redundant (apologies that I didn't spot it myself). Thanks for updating the docstring!

One potential style point about in (list, set, tuple, dict) (further down): I'm not sure if there are any firm guidelines about this (in distributed, or Dask organization-wide), but perhaps that could be considered as well (i.e., in <tuple> vs. in <set>).
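For illustration, the two spellings being weighed (a style choice; both behave identically for membership tests, with a set literal signaling pure membership intent while a tuple literal also preserves order):

```python
# "Is typ one of these container types?" spelled both ways. For four
# elements the performance difference is negligible either way.
typ = type([])
in_tuple = typ in (list, set, tuple, dict)  # tuple-literal membership
in_set = typ in {list, set, tuple, dict}    # set-literal membership
assert in_tuple and in_set
```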

@jakirkham (Member)

cc @madsbk (in case you have thoughts here 🙂)

Comment on lines 3578 to 3579
```python
_dumps = warn_serialize if _serialized_task_arg(task[2:]) else warn_dumps
d = {"function": dumps_function(task[1]), "args": _dumps(task[2])}
```
Member Author


@gjoseph92 @ian-r-rose @jrbourbeau - The changes in this PR seem to address the requirements in #4673 (in fact, nested tasks also work), but I am not happy with the changes in dumps_task.

Note that _serialized_task_arg is checking the type of each (non-function) element in task, and using warn_serialize rather than warn_dumps if there are any Serialized objects. This extra pass over the data partially defeats the purpose of the check (we might as well always use warn_serialize if we are going to take the time to iterate over elements anyway). Perhaps we should introduce a standalone serialize_task function that will do the same thing as dumps_task, but will always call warn_serialize on the args and kwargs? In that case, the decision whether to use dumps_task vs serialize_task would need to be made upstream, but the logic in dumps_task could remain the same.

I guess we could also cache the _serialized_task_arg result based on the function, but that seems like it could be fragile.
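A rough sketch of the standalone serialize_task idea (the helpers below are simplified stand-ins for distributed's dumps_function and warn_serialize, and the task layout mirrors the apply convention in dumps_task; this is not the actual implementation):

```python
import pickle


def dumps_function(func):
    # Stand-in: the real version caches pickled functions.
    return pickle.dumps(func)


def warn_serialize(obj):
    # Stand-in: the real version routes through serialize() and warns on
    # very large objects; here we just tag the payload.
    return ("serialized", obj)


def serialize_task(task):
    # Same shape as dumps_task for (apply, func, args) tuples, but args
    # always go through warn_serialize so nested Serialized objects survive.
    return {"function": dumps_function(task[1]), "args": warn_serialize(task[2])}
```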

Member Author


Okay - I think I'd like to leave dumps_task alone and introduce the new logic for serializing a task on the scheduler in a new serialize_task function (introduced in 093d240)

@jrbourbeau (Member) left a comment


Thanks for pushing on this @rjzamora! I'm still grokking all the changes here.

Reading through the discussion over in #4673, I was wondering what your thoughts were on @gjoseph92's comments, specifically:

Register a dummy serialization family for "task", which caches, then just calls back into deserialize. By marking things as "task" when serializing, we can reduce cache misses. Kinda hackey, could work?

Alternatively, the quickest solution would be to leave everything as it is, and have worker._deserialize just call deserialize on the args/kwargs, to unpack any Serialize/Serialized objects they may contain. But I imagine that is undesirable, especially when we already have this nice machinery for handling nested serialization with to_serialize and Serialized.

@rjzamora (Member Author)

rjzamora commented Apr 7, 2021

Register a dummy serialization family for "task", which caches, then just calls back into deserialize. By marking things as "task" when serializing, we can reduce cache misses. Kinda hackey, could work?

Yeah - I would consider this to be the main alternative to using something like serialize_task (or a modified/improved version of dumps_task), but we would still need the iterate_collection changes for Serialized objects to be captured correctly.

Alternatively, the quickest solution would be to leave everything as it is, and have worker._deserialize just call deserialize on the args/kwargs, to unpack any Serialize/Serialized objects they may contain. But I imagine that is undesirable, especially when we already have this nice machinery for handling nested serialization with to_serialize and Serialized.

This could work for the Serialized-object problem, but my intuition is that we should avoid checking all args/kwargs in the general case. Also, this won't address the inlined-task problem.

@gjoseph92 (Collaborator) left a comment


we would still need the iterate_collection changes for Serialized objects to be captured correctly.

Agreed: no matter what we do for task serialization, we'll need core serialize to be better at descending through collections.

Since we're less sure about the approach for task serialization though, could it make sense to focus this PR on just iterate_collection, and move the serialize_task changes into a different one?

@rjzamora rjzamora marked this pull request as draft April 8, 2021 00:03
@rjzamora (Member Author)

rjzamora commented Apr 8, 2021

Since we're less sure about the approach for task serialization though, could it make sense to focus this PR on just iterate_collection, and move the serialize_task changes into a different one?

My mistake for not (re-)marking this as a draft PR. I am fairly certain that we will need to expose an iterate_collection= argument to serialize for any solution we come up with. All the other components should be stripped from this PR before it is merged.

Confession: I actually thought I was pushing to a separate branch when I first started the serialize_task experiment, but just carried on after I already made a mess.

Some notes: In another branch, I am also experimenting with a serialize(..., task=) approach that is a bit more like your idea to handle function caching within serialize. That may end up being a cleaner solution. However, it looks like dask#7525 is now producing Serialize objects (rather than Serialized) within __dask_distributed_unpack__, so I am now wondering if my understanding of the problem/solution needs to change.

@rjzamora rjzamora marked this pull request as ready for review April 8, 2021 14:26
@rjzamora (Member Author)

rjzamora commented Apr 8, 2021

Okay good(ish) news:

  • I removed the experimental serialize_task and dumps_function changes from this PR
  • It seems that these changes alone unblock HLG/Blockwise progress in Dask. For now, within Blockwise, we will need to do something similar to what the proposed serialize_task was doing, but I have confirmed that this will at least "work".

Comment on lines -3479 to +3481

```diff
-if args:
+if args and isinstance(args, bytes):
     args = pickle.loads(args)
-if kwargs:
+if kwargs and isinstance(kwargs, bytes):
```
Member Author


Note that this is the only change that wasn't in this PR originally. When constructing a task on the scheduler, we will need to have the option of using to_serialize on args/kwargs (so that we can avoid pickling Serialized/Serialize objects while still enabling function caching). Therefore, these dict values may not be pickled bytes when we get to _deserialize.
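The guard can be illustrated in isolation (maybe_loads is a hypothetical name for this sketch, not a function in distributed):

```python
import pickle


def maybe_loads(args):
    # args may arrive as pickled bytes (the classic dumps_task path) or as an
    # already-unpacked object (the to_serialize path); only unpickle bytes.
    if args and isinstance(args, bytes):
        return pickle.loads(args)
    return args
```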

@rjzamora (Member Author)

rjzamora commented Apr 9, 2021

Just a note: I have also experimented with a serialize(..., is_task=) argument that allows us to rely on to_serialize for all tasks (supporting function caching, and possibly large-object warnings): comparison with main

@madsbk (Contributor)

madsbk commented Apr 9, 2021

Any other reason not to always iterate collections besides performance?
I suspect that the overhead of calling check_dask_serializable(x) when iterate_collection=None, which tries to serialize the first item in each nested collection, is greater than always iterating the task args in most cases. I might be wrong though.

@rjzamora (Member Author)

rjzamora commented Apr 9, 2021

Any other reason not to always iterate collections besides performance?

Only for performance reasons (as far as I can tell).

I suspect that the overhead of calling check_dask_serializable(x) when iterate_collection=None, which tries to serialize the first item in each nested collection, is greater than always iterating the task args in most cases. I might be wrong though.

I think the hypothetical case we are guarding against is a many-element collection (much larger than a task tuple) that doesn't contain any special elements. My intuition tells me that we don't want to iterate through these elements in Python and generate separate header/frames that will then need to be iterated through again on the worker. At the same time, it would also be nice to avoid the iterate_collection inference step (which, as you point out, also has real overhead).

I certainly agree that this is probably an appropriate time to question the iterate_collection approach altogether. When a graph is generated on the client, we always use dumps_task to serialize a task, and that function rarely calls into to_serialize (unless one or more of the task elements is also a task). If we decide to always take the to_serialize approach when the graph is constructed on the scheduler, then the iterate_collection overheads (both iteration and inference) may have a much larger effect on overall performance.

@jrbourbeau (Member)

We could also do something in the middle and set iterate_collection=True when we encounter a Serialize object (instead of always iterating through collections). This has the added benefit of not needing to add an iterate_collection keyword to the Serialize class constructor.
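That middle-ground rule might look roughly like this (Serialize here is a bare stand-in for distributed.protocol.Serialize, and the real check would live inside serialize itself):

```python
class Serialize:
    # Stand-in for distributed.protocol.Serialize
    def __init__(self, data):
        self.data = data


def should_iterate(collection):
    # Force element-wise serialization only when a Serialize wrapper is
    # present; otherwise fall back to serializing the collection whole.
    if isinstance(collection, (list, tuple, set)):
        return any(isinstance(el, Serialize) for el in collection)
    return False
```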

@rjzamora (Member Author)

rjzamora commented Apr 13, 2021

We could also do something in the middle and set iterate_collection=True when we encounter a Serialize object (instead of always iterating through collections). This has the added benefit of not needing to add an iterate_collection keyword to the Serialize class constructor.

Thanks @jrbourbeau - I like the idea of avoiding the need to thread iterate_collection into the Serialize class. I made the change you suggested. We can indeed get Blockwise serialization working this way, but it will require us to do something like to_serialize(to_serialize(task)) when we are constructing the graph (I've tested something like this here). Note that doing to_serialize(task) alone will just result in default serialization of task (so we will not iterate through the tuple unless the very first element is dask-serializable).

@jrbourbeau (Member)

I made the change you suggested. We can indeed get Blockwise serialization working this way, but it will require us to do something like to_serialize(to_serialize(task)) when we are constructing the graph (I've tested something like this here).

Hrm that's unfortunate. I wonder if we can avoid the need for double to_serialize calls. Looking at the diff for this commit ea0e54d, it looks like we're unpacking Serialize objects in _encode_default and no longer passing iterate_collection information to serialize_and_split. Since serialize will itself unpack Serialize objects and set iterate_collection=True, can we instead just pass Serialize objects directly to serialize_and_split in _encode_default?

Specifically I'm wondering if we can avoid the need for double to_serialize calls over in dask/dask#7455 if we make the following change here:

```diff
diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py
index fb85d32f..e471ca8b 100644
--- a/distributed/protocol/core.py
+++ b/distributed/protocol/core.py
@@ -48,8 +48,6 @@ def dumps(msg, serializers=None, on_error="message", context=None) -> list:
         def _encode_default(obj):
             typ = type(obj)
             if typ is Serialize or typ is Serialized:
-                if typ is Serialize:
-                    obj = obj.data
                 offset = len(frames)
                 if typ is Serialized:
                     sub_header, sub_frames = obj.header, obj.frames
```

@rjzamora (Member Author)

Ah - Good suggestion @jrbourbeau. That works. Sorry I missed it!

@rjzamora (Member Author)

I assume the test_dashboard failures are unrelated to this PR.

@jrbourbeau (Member)

Yeah, sorry about that. test_dashboard is a known flaky test (#4697) and should be resolved by #4706

@rjzamora (Member Author)

Okay - CI finally passing here :)

Since we probably don't want to rush #4699, and this PR no longer breaks the public API, I think there is still good motivation to get this in.

@jrbourbeau (Member) left a comment


Nice green check marks @rjzamora : )

@jakirkham if you get a moment, your feedback on this PR would be valuable. As Rick pointed out in his original description

One critical issue is that serialize is not guaranteed to iterate through the elements of a task (and generate separate header/frames for each element). This is a problem when one or more of those task elements is a Serialized object - we don't want these objects to be grouped together with the other elements in a single blob of bytes; otherwise, the object will remain Serialized when it is passed to the actual function on the worker.

This PR updates serialize to always iterate through collections when it encounters a Serialize object.

In the future we will probably address task serialization with a more widespread refactor (e.g. something like @madsbk's work over in #4699), but the updates here serve as a smaller set of changes we could make today to unblock moving more operations to HighLevelGraph representations over in dask (e.g. dask/dask#7455, dask/dask#7415).

@rjzamora (Member Author)

Thanks for the summary @jrbourbeau - I couldn't have put it better myself :)

@jrbourbeau (Member) left a comment


Thanks @rjzamora! Planning to merge later today if there are no further comments

@gjoseph92 (Collaborator) left a comment


This LGTM as well

@jrbourbeau jrbourbeau merged commit 74a9aff into dask:main Apr 17, 2021
@rjzamora rjzamora deleted the iterate-collection-serialize branch April 20, 2021 14:56