Delay deserialization of Data in workers until actual usage. #3998
Conversation
In some cases deserialization is not thread safe, and it would be good to only deserialize when no other work is being done at the same time. This change tries to achieve that, in part, by delaying unpickling until the data actually needs to be sent to the Executor. In the case of a single-threaded TPE (ThreadPoolExecutor), that means non-thread-safe code can be unpickled safely. Maybe we even want to move unpickling into the Executor, but then we need the executor to support this, and we might want to make sure we don't unpickle the same data many times. This of course needs a gross hack of looking at frames, but at least I hope this will get some conversation started. This also breaks the computation of per-type bandwidth, unless we delay the bandwidth calculation per type. There are also some issues because clients also indirectly call get_data_from_worker, so for example await client.submit(lambda x: x + 1, 10) will return a Serialized object instead of the expected result...
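To make the delayed-unpickling idea concrete, here is a minimal, hypothetical sketch (not the actual code in this PR) of a wrapper that keeps the pickled bytes and only unpickles right before the task runs on the executor; LazySerialized and submit_with_lazy_args are made-up names, and real distributed messages go through distributed.protocol headers and frames rather than raw pickle.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


class LazySerialized:
    """Hold pickled bytes and only unpickle on first access (illustrative)."""

    def __init__(self, payload: bytes):
        self._payload = payload
        self._obj = None
        self._unpickled = False

    def get(self):
        if not self._unpickled:
            self._obj = pickle.loads(self._payload)
            self._unpickled = True
        return self._obj


def submit_with_lazy_args(executor, fn, *args):
    """Unpickle arguments inside the executor, right before running fn.

    With a single-threaded ThreadPoolExecutor this means unpickling can
    never overlap with user task code, which is the property the PR is after.
    """
    def run():
        real = [a.get() if isinstance(a, LazySerialized) else a for a in args]
        return fn(*real)

    return executor.submit(run)


if __name__ == "__main__":
    data = LazySerialized(pickle.dumps([1, 2, 3]))
    with ThreadPoolExecutor(max_workers=1) as tpe:
        print(submit_with_lazy_args(tpe, sum, data).result())  # 6
```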
My other idea was to use a custom serializer/deserializer that would be lazy, the problems with that being that:
I'm also wondering about keeping the serialized data in the worker in case it's needed somewhere else, as then the worker wouldn't need to re-serialise it to be sent...
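As a rough sketch of that last point (all names made up), the worker-side holder could retain the original bytes even after deserializing, so forwarding the data to another worker reuses them instead of re-serialising:

```python
import pickle


class CachedSerialized:
    """Keep both the serialized bytes and, lazily, the live object."""

    def __init__(self, payload: bytes):
        self.payload = payload      # kept around so it can be forwarded as-is
        self._obj = None
        self._have_obj = False

    def deserialize(self):
        if not self._have_obj:
            self._obj = pickle.loads(self.payload)
            self._have_obj = True
        return self._obj

    def to_wire(self) -> bytes:
        # No pickle.dumps needed: reuse the bytes we originally received.
        return self.payload
```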
Yeah, this seems like a big enough hack that we would probably reject it without a lot of evidence that it was necessary in a variety of situations. In general I would recommend that any group facing things like this look into improving their serialization, or using locks to protect finicky resources. You could also look into using a synchronous executor if you wanted everything to be in one thread. The closest common situation I can recall that sounds like this one is dealing with TensorFlow graphs, which don't like being deserialized in one thread and then used in another. Any fix like this would have, I think, enough unexpected results that I'd be very hesitant to consider it.
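For the lock suggestion, a minimal user-side pattern, assuming the finicky object can tolerate being created and used under one shared lock, might look like the following; FINICKY_LOCK and the process method are illustrative, not anything Dask provides:

```python
import pickle
import threading

# One process-wide lock shared by whatever deserializes the thread-unsafe
# objects and by the tasks that use them.
FINICKY_LOCK = threading.Lock()


def guarded_deserialize(payload: bytes):
    """Deserialize a thread-unsafe object while no task is touching one."""
    with FINICKY_LOCK:
        return pickle.loads(payload)


def guarded_task(finicky_obj, x):
    """A task that only calls into the thread-unsafe object under the lock."""
    with FINICKY_LOCK:
        return finicky_obj.process(x)  # .process() is a placeholder method
```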
Do you mean the inspect-stack hack, or keeping the object serialized until necessary? I can likely find another way to pass the information that data should be lazily deserialized, but mostly I wanted to know whether it's worth looking into. Also yes, the group is looking into improving thread safety and (de)serialisation of objects, is already on a single-threaded TPE to minimize issues, and is looking into locks.
In general, lazy deserialization. Or if we do want to do this that's fine, but we need to survey other use cases first, see how important it is, figure out what a convention might be that makes sense, etc. This feels like tacking on a feature for a single user's use case fairly deep into the guts of Dask. My guess is that there is some more general solution somewhere that lets this group get what they want, and doesn't add an odd corner case into the core.
Alternatively, if this is the right solution, then I think that we need to build a case for it. The best case I can think of that is similar to this is TensorFlow/Keras, as mentioned above.
I thought we could already control serialization/deserialization in a separate thread with the config option distributed.comm.offload. Does that not work?
I don't believe it does. The core of the problem is that some types of objects don't like having work done on them at the same time as other instances are being deserialized. Deactivating offload already forces the deserialisation to be done in the main loop, but the ThreadPool executor is still running, and that's not good. My goal here is to reduce the chance of deserialisation happening while the TPE is doing work. Either the objects need to hold a lock while in use that blocks deserialisation, or I need dask/distributed to not deserialize while computing. Right now this is hard, as unpacking messages unpacks the data as well.
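For reference, this is roughly how offloading can be disabled in configuration, based on my understanding of the distributed.comm.offload setting (normally a byte threshold above which (de)serialization moves to the offload thread); check your distributed version's defaults:

```python
import dask

# Disable offloading of (de)serialization to the separate "offload" thread,
# so frames are deserialized in the worker's main event loop instead.
dask.config.set({"distributed.comm.offload": False})

# Combined with a single-threaded worker, e.g. one started with
#   dask-worker <scheduler-address> --nthreads 1
# this narrows, but does not remove, the window in which deserialization
# can overlap with running task code, which is the problem described above.
```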
There is a shared interest in delaying deserialization, I think. For example, we discussed delaying deserialization to reduce memory usage in rapidsai/dask-cuda#342 (comment). I think how that would look may differ a bit from how it is currently implemented here, but the overall objective seems agreeable.
Yes, in rapidsai/dask-cuda#342 (comment) the plan is to delay the deserialization until the data is accessed by the task. It even enables tasks to coordinate deserializations with other work explicitly. Currently I am on vacation but I will start working on this when I get back next week. |
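A hedged sketch of the proxy direction (illustrative only, not the dask-cuda implementation): an object that holds the serialized payload and only unpickles it on first attribute access.

```python
import pickle


class DeserializeOnAccess:
    """Proxy that unpickles its payload the first time it is actually used."""

    def __init__(self, payload: bytes):
        self._payload = payload
        self._target = None

    def _materialize(self):
        if self._target is None:
            self._target = pickle.loads(self._payload)
        return self._target

    def __getattr__(self, name):
        # Only called for attributes missing on the proxy itself, so normal
        # attribute access on the proxy is forwarded to the real object.
        return getattr(self._materialize(), name)


if __name__ == "__main__":
    proxy = DeserializeOnAccess(pickle.dumps([3, 1, 2]))
    print(proxy.count(1))  # first attribute access triggers unpickling -> 1
```

Special (dunder) methods are not routed through __getattr__, which is one reason a truly transparent proxy takes quite a bit more work than this sketch, and why exposing such proxies to users is the concern raised below.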
The problem with Dask not being aware of this is that users then get exposed to the proxy objects you suggest in rapidsai/dask-cuda#342 (comment); I think it would be good to have a solution that works regardless of the types or the serializer/deserializer involved. I completely agree that the implementation here, with its stack inspection, is horrible. Please ping me on any work you are doing on this in dask-cuda; I'm happy to help make it more generic.
I wonder if we can push deserialization into |
Does this have the potential to stall some tasks or duplicate work if the same deserialized data is required by two tasks?
I think that deserializing data when we get it is more sensible in the common case. I want to make sure that we're not mucking about with sensible behaviors because of a few odd cases.
Agreed, these are disadvantages of the approach. We can mitigate them by making the proxy object as transparent as possible, but you are right, it shouldn't be on by default.
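On the stall/duplicate-work question above, one mitigation (sketched here with made-up names) is to deserialize each key at most once and share the result between tasks, accepting that concurrent requests briefly queue behind a lock:

```python
import pickle
import threading

_cache = {}                    # key -> deserialized object
_cache_lock = threading.Lock()


def deserialize_once(key, payload: bytes):
    """Return the deserialized value for key, unpickling at most once.

    Two tasks asking for the same key share one object instead of each
    paying for, and possibly racing on, the deserialization.
    """
    with _cache_lock:
        if key not in _cache:
            _cache[key] = pickle.loads(payload)
        return _cache[key]
```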
#4307 should allow users to not deserialize and run tasks at the same time |
Closing as stale. |