
Conversation

@rjzamora (Member) commented Apr 9, 2020

Some dask.dataframe/dask_cudf shuffling algorithms utilize tasks that produce a dictionary of pandas/cudf DataFrame objects. Since these dictionaries are typically larger than five elements, they are usually pickled when the output needs to be transferred between workers. For the cudf-backed shuffle algorithm, this can seriously degrade performance (changing the dominant communication mechanism from IPC to TCP on systems with NVLink support).

This PR increases the size threshold below which each element of a list/set/dict will be separately serialized (from 5 to 64). If the length is greater than 5, the element-wise serialization is only performed if the "first" element is dask-serializable. This check clearly doesn't guarantee that all items in the list/set/dict are dask-serializable, but it always captures the case when all elements are.
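A minimal sketch of the heuristic described above, under one plausible reading of the thresholds. The names here (`SERIALIZABLE_TYPES`, `FakeDataFrame`, `should_serialize_elementwise`) are hypothetical stand-ins; the real implementation dispatches on dask's `dask_serialize` registry rather than a set of types:

```python
# Hypothetical sketch of the heuristic described above. SERIALIZABLE_TYPES
# and FakeDataFrame stand in for dask's real dask_serialize registry; the
# thresholds follow one reading of the PR description.

class FakeDataFrame:
    """Stand-in for a pandas/cudf DataFrame with custom serialization."""

SERIALIZABLE_TYPES = {FakeDataFrame}

def first_element(x):
    # Inspect values for dicts, an arbitrary element otherwise.
    return next(iter(x.values() if isinstance(x, dict) else x))

def should_serialize_elementwise(x):
    # Small collections: always serialize element-by-element (old behavior).
    if len(x) <= 5:
        return True
    # Larger (but bounded) collections: only iterate if the first element
    # looks dask-serializable -- this avoids pickling dicts of DataFrames.
    if len(x) < 64:
        return type(first_element(x)) in SERIALIZABLE_TYPES
    return False
```

Note that only one element is inspected, so the check stays cheap even for large collections.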

@rjzamora (Member, Author) commented Apr 9, 2020

@pentschev - Any thoughts on using logic like this to avoid pickling of the group-shuffle dictionary output?

cc @beckernick @VibhuJawa

@jakirkham (Member)

cc @mrocklin

@mrocklin (Member) commented Apr 9, 2020

                 dask_serialize.dispatch(type(next(iter(x))))

This is a fun approach. Really, what we want to say is "if this dictionary is a bunch of big, complex things that Dask knows how to handle, then please serialize each of them independently. If it's just a big JSON-like blob, then please don't bother". The previous check of if len(d) > 5 was a very poor approximation of this. The check here of "does dask_serialize grok these objects in a special way" seems a lot nicer to me. If we can trust this approach, it would be great to remove the len(d) < 64 part of the check as well.

How well does the dask_serialize approach work? If I had a dict like {"x": 1} does this reliably not dive through the dict? It might be worth pulling this functionality into a separate function, testing it with a bunch of cases that we know about

{"x": 1} -> False
{"x": np.ones(5)} -> True
{"a": 1, "x": np.ones(5)} -> True (maybe xfails today?)
{"x": [np.ones(5)]} -> True
{("x", i): np.ones(5) for i in range(100)} -> True
...

And then make sure that it runs in constant time relative to the size of the dict.

In short, I like this approach if it works. I'd love to see us drop the 64 length limit if we can. I think that being able to do that would be easier if we had a function that we could all agree did the right thing all (most?) of the time, which would be easy to show through a nice test like this.
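The suggested test could look roughly like the sketch below. Since neither dask's registry nor NumPy is imported here, a hypothetical `Registered` class and an explicit `registry` argument stand in for a type (such as `np.ndarray`) that `dask_serialize` knows about:

```python
def check_dask_serializable(x, registry=frozenset()):
    """Return True if x (or, recursively, its first element) has a
    registered serializer. Only the first element of a collection is
    inspected, so the check runs in constant time relative to its size."""
    if isinstance(x, (list, set, tuple)) and x:
        return check_dask_serializable(next(iter(x)), registry)
    if isinstance(x, dict) and x:
        return check_dask_serializable(next(iter(x.values())), registry)
    return type(x) in registry

class Registered:
    """Stand-in for a dask-serializable type such as a NumPy array."""

REGISTRY = frozenset({Registered})
```

With this first-element-only design, the `{"a": 1, "x": np.ones(5)}` case above returns False (the first value is a plain int), which matches the "maybe xfails today?" caveat.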

@rjzamora (Member, Author) commented Apr 9, 2020

In short, I like this approach if it works. I'd love to see us drop the 64 length limit if we can. I think that being able to do that would be easier if we had a function that we could all agree did the right thing all (most?) of the time, which would be easy to show through a nice test like this.

Thanks for the feedback @mrocklin! The approach will indeed not dive through the dict for the case you mentioned. I will work on the test you suggested and confirm that the check runs in constant time.

@mrocklin (Member) commented Apr 9, 2020 via email

@rjzamora rjzamora marked this pull request as ready for review April 10, 2020 02:36
@rjzamora rjzamora changed the title [WIP] Dask-serialize dicts longer than five elements Dask-serialize dicts longer than five elements Apr 10, 2020
@rjzamora (Member, Author) commented Apr 10, 2020

Thanks again for your advice here @mrocklin - I moved the check into a standalone function (check_dask_serializable_collection), and added a test for the behavior you mentioned. One difference from your example, however, is that I could not use a numpy array as a "dask-serializable" object. As far as I can tell, there are no custom serialization mechanisms registered for numpy types. [EDIT: Probably my mistake here - will follow up soon]

@rjzamora (Member, Author)

Okay - It seems that the removal of the len <= 5 criterion is leading to two new test failures...

  • test_nested_types is serializing [[[x]]], and expecting to recursively iterate through the list (but no longer will).
  • test_pickle_safe is serializing [1, 2, 3], but only offering the "msgpack" serializer. Since we are no longer iterating through the list and recording the "collection" datatype, we erroneously convert from list to tuple during the dumps-loads round trip.

@rjzamora (Member, Author)

Note: I am adding back the len <= 5 criterion for now (to allow tests to pass) - I'll need to follow up later

@mrocklin (Member)

For nested types it seems like the check_dask_serializable function might have to recurse if the subtype is list/dict/...

@rjzamora (Member, Author)

For nested types it seems like the check_dask_serializable function might have to recurse if the subtype is list/dict/...

Right - Not sure how else a "dask-serializable" object can be detected within a nested collection... Having a bit of trouble finding a balance here (between functionality and simplicity)

@mrocklin (Member)

Maybe something like this? (I don't really know, I haven't thought too much about this.)

def is_serializable(x):
    if isinstance(x, (tuple, list)) and x:
        return is_serializable(x[0])
    if isinstance(x, dict) and x:
        return is_serializable(toolz.first(x.values()))
    # e.g. consult the dask_serialize registry for this type
    try:
        dask_serialize.dispatch(type(x))
        return True
    except TypeError:
        return False

@rjzamora (Member, Author)

Sorry - I'm not seeing how that is very different from check_dask_serializable. It will still need to recurse, right?

@mrocklin (Member)

Oh I see, yes. I apologize I hadn't looked at the recent code (pushing quickly through issues this morning)

test_nested_types is serializing [[[x]]], and expecting to recursively iterate through the list (but no longer will).

I guess I'm surprised by this, then.

@rjzamora (Member, Author)

I guess I'm surprised by this, then.

Sorry - I was using github/CI a bit too much for debugging :)

That test failure is no longer a problem (the recursion fixes it). The only remaining issue is that I still need the len <= 5 criterion to get all tests to pass. (There also seems to be a recent CI problem unrelated to these changes.)

@mrocklin (Member)

What tests are failing due to the length check? Is it possible to fix whatever is causing those tests to fail?

@rjzamora (Member, Author)

What tests are failing due to the length check? Is it possible to fix whatever is causing those tests to fail?

On my local system, it is only distributed/tests/test_publish.py::test_pickle_safe - The problem seems to be that a msgpack round-trip converts a list to a tuple. I removed the len <= 5 check and added a special case for when the "pickle" serializer is unavailable -- I will try to find time to look into the root cause/extent of the msgpack problem.

@pentschev (Member)

Sorry for the very late reply here @rjzamora. I think your approach is great - nice that you managed to improve my very naive implementation, thanks for doing that! Also, it seems that it passed now! :)

@rjzamora (Member, Author)

@mrocklin - Do you think the current state of this PR is reasonable? (thanks again for your attention on this)

@jakirkham (Member) left a comment:

Looks good. Thanks Rick! 😄

Made a small observation below. Don't think anything needs to be done for it though. Just something for us to keep in mind 😉

        return check_dask_serializable(next(iter(x.items()))[1])
    else:
        try:
            dask_serialize.dispatch(type(x))
@jakirkham (Member):

Just a note, this requires that objects be registered with dask_serialize (as we don't check cuda_serialize for example). However we do register all CUDA objects with dask_serialize now and use this in other contexts (like Dask-CUDA's spilling). So this seems like a good enough check.

@rjzamora (Member, Author):

Thanks for the note @jakirkham - The fact that you have cleaned up serialization and registered everything as dask_serialize definitely made my life easier here - Thanks! :)

@jakirkham (Member)

Planning on merging tomorrow if no comments.

    # Check for "dask"-serializable data in dict/list/set
    supported = (
        isinstance(x, list) and "pickle" not in serializers
    ) or check_dask_serializable(x)
(Member):

What's going on here? I know that you added this because a particular test was failing, but what was the underlying cause?

@rjzamora (Member, Author):

Sorry - Didn't have time to look into this carefully yet. A list is being converted to a tuple during a round trip. I originally thought it was a msgpack limitation (since the failing test only provides the "msgpack" serializer). However, the problem may be something else in client.publish_dataset or get_dataset (a simple msgpack dumps-loads test works fine).

@rjzamora (Member, Author):

The problem seems to be that the use_list=False argument is passed to msgpack.loads in msgpack_loads - Not sure if there is a reason for this?
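For reference, the effect of that argument can be reproduced directly with the msgpack package (a small demonstration, assuming `msgpack` is installed; `distributed`'s own wrapper is not needed here):

```python
import msgpack

packed = msgpack.packb([1, 2, 3])

# Default behavior: msgpack arrays come back as Python lists.
assert msgpack.unpackb(packed) == [1, 2, 3]

# With use_list=False (as passed in distributed's msgpack_loads wrapper),
# the same data comes back as a tuple -- the round-trip change seen here.
assert msgpack.unpackb(packed, use_list=False) == (1, 2, 3)
```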

@jakirkham (Member):

Looks like that might be intentional. Appears to have been added in PR ( #2000 ). @mrocklin, how would you like us to proceed here?

@rjzamora (Member, Author):

Agreed @jakirkham - Removing that option allows tests to pass here, but may have unintended consequences.

(Member):

But doesn't [an_unsupported_object] pass this check if the msgpack serializer is included?

@rjzamora (Member, Author):

Yes - You are right. supported just means “we should iterate through the collection”

@rjzamora (Member, Author):

@mrocklin - I just pushed some changes to (hopefully) make the logic a bit more clear. For example, I am using the term iterate_collection instead of supported. I am also now considering the order of the serializers list, because "msgpack" is only a problem for lists if it comes before "pickle" (I added test_serialize_lists to show/check this).
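A rough sketch of that ordering-sensitive decision (hypothetical; the function name and signature are illustrative, not the actual code in distributed.protocol.serialize):

```python
def iterate_collection_sketch(x, serializers, contains_dask_serializable):
    """Decide whether to serialize a collection element-by-element.

    Iterate when the collection holds dask-serializable objects, or when
    x is a list and "msgpack" would be tried before "pickle" -- otherwise
    a msgpack round trip would silently turn the list into a tuple.
    """
    if contains_dask_serializable:
        return True
    if isinstance(x, list):
        if "pickle" not in serializers:
            return True
        if "msgpack" in serializers:
            return serializers.index("msgpack") < serializers.index("pickle")
    return False
```

The key point is that the serializer list is ordered, so "msgpack" is only a hazard for lists when it would be chosen ahead of "pickle".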

(Member):

Ah, I see what's going on now. OK, thanks @rjzamora !

(Member):

With PR ( #4575 ), I think we are able to work around this MsgPack oddity. So I dropped this code there.

@jakirkham (Member)

Outside of that small comment, I think this is about as good for now. Hopefully issue ( #3716 ) will garner some attention and we can determine how to improve things more in the future.

Co-Authored-By: jakirkham <jakirkham@gmail.com>