Conversation

@mrocklin mrocklin commented Dec 3, 2020

Normally we have one thread pool executor for deserializing data
and a separate thread pool executor in the worker for execution.
Sometimes this presents a problem because some data types want to be
used in the thread in which they were created. One example of this is
TensorFlow graphs, but there are others.

One way to resolve this is to reuse the same executor in both
situations, and ensure that it has only one thread. This means that
execution and deserialization will block each other (not great) but that
user data will always be operated on in one thread only.
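
As a rough illustration of the idea (plain asyncio and concurrent.futures only, not the actual dask internals):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Conceptual sketch: one single-threaded executor shared by deserialization
# and execution, so thread-affine objects (e.g. TensorFlow graphs) are always
# touched from the same thread.
shared_executor = ThreadPoolExecutor(max_workers=1)

async def handle(frames, deserialize, execute):
    loop = asyncio.get_running_loop()
    # Deserialize on the shared worker thread ...
    obj = await loop.run_in_executor(shared_executor, deserialize, frames)
    # ... then execute on that very same thread.
    return await loop.run_in_executor(shared_executor, execute, obj)
```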

This commit implements that, and drafts up a small test. However, it is
still broken because Dask can be clever in some situations and
deserialize directly on the event loop. This happens for a few reasons
today:

  1. For small messages we deserialize on the event loop for performance
     reasons. The user can control this with the
     `distributed.comm.offload` configuration value.

     I recommend a value of 1, meaning a single byte (see the sketch after
     this list).

  2. The scheduler-client comms intentionally do not offload today
     (grep for the `allow_offload=False`).
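
As a point of reference, here is a minimal sketch of how that threshold could be set, assuming standard `dask.config` usage (a value of 1 means one byte, so essentially everything gets offloaded):

```python
import dask

# Sketch: lower the offload threshold so that essentially every message is
# deserialized off the event loop rather than on it.
dask.config.set({"distributed.comm.offload": 1})
```

Note that, as discussed further down in this thread, the threshold ends up being read at startup time, so in practice it would need to be set before the worker starts.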

mrocklin and others added 2 commits December 3, 2020 10:18
@jrbourbeau

Nice, this is really interesting. We also don't offload deserialization of tasks on the worker:

```python
def _deserialize(function=None, args=None, kwargs=None, task=no_value):
    """ Deserialize task inputs and regularize to func, args, kwargs """
    if function is not None:
        function = loads_function(function)
    if args:
        args = pickle.loads(args)
    if kwargs:
        kwargs = pickle.loads(kwargs)
    if task is not no_value:
        assert not function and not args and not kwargs
        function = execute_task
        args = (task,)
    return function, args or (), kwargs or {}
```

so for this to work, we'll need to add some additional offloading logic there, for example:

```diff
-function, args, kwargs = _deserialize(*ts.runspec)
+# Offload deserializing large tasks
+offload_threshold = get_offload_threshold()
+if sizeof(ts.runspec) > offload_threshold:
```
Member

I want to think about if there's a better way to do this. This may lead to a slowdown if we start submitting task deserialization to a busy offloading thread pool.

```diff
-if FRAME_OFFLOAD_THRESHOLD and allow_offload:
+# Offload serializing large frames to improve event loop responsiveness.
+offload_threshold = get_offload_threshold()
+if offload_threshold and allow_offload:
```
Member Author


Hrm, this may be expensive to do on every message. Thoughts?

@jrbourbeau jrbourbeau Dec 4, 2020


It looks like this adds ~3µs:

```python
In [1]: import dask

In [2]: from distributed.comm.utils import get_offload_threshold

In [3]: %timeit get_offload_threshold()
2.96 µs ± 44.7 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
```

This change was so we could set `distributed.comm.offload` at runtime (i.e. with `dask.config.set(distributed__comm__offload=1):`) instead of just at startup time. Though I agree that, since this is run for every message, the increased overhead may not be worth the extra flexibility.
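
For context, a hedged sketch of what such a per-message lookup might look like (the actual helper in this PR may differ); the configuration read on every call is what adds the ~3 µs measured above:

```python
import dask
from dask.utils import parse_bytes

def get_offload_threshold() -> int:
    # Re-read the configuration on every call so that dask.config.set(...)
    # at runtime takes effect immediately, at the cost of a few microseconds
    # per message.
    threshold = dask.config.get("distributed.comm.offload")
    if isinstance(threshold, str):
        threshold = parse_bytes(threshold)
    return threshold
```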

Member


I reverted the addition of `get_offload_threshold` and we now just pull in `distributed.comm.offload` at startup time.
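
That is, roughly the following, read once at import time rather than per message (a sketch of the startup-time approach, not necessarily the exact code in the PR):

```python
import dask
from dask.utils import parse_bytes

# Read the threshold once at import/startup time.  Changing the config
# afterwards has no effect, but nothing extra runs on the per-message hot path.
OFFLOAD_THRESHOLD = dask.config.get("distributed.comm.offload")
if isinstance(OFFLOAD_THRESHOLD, str):
    OFFLOAD_THRESHOLD = parse_bytes(OFFLOAD_THRESHOLD)
```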

mrocklin and others added 4 commits December 7, 2020 18:38
Otherwise I think that we open ourselves to some slowdown on the event
loop and the possibility of deadlocks (maybe?)
```diff
-function, args, kwargs = _deserialize(*ts.runspec)
+# Offload deserializing large tasks
+if sizeof(ts.runspec) > OFFLOAD_THRESHOLD:
+    function, args, kwargs = await offload(_deserialize, *ts.runspec)
```
Member


+1 this is definitely nicer


mrocklin commented Dec 9, 2020

OK. This seems ok to me. Merging in.

@mrocklin mrocklin merged commit d54388c into dask:master Dec 9, 2020
@mrocklin mrocklin deleted the worker-offload-executor branch December 9, 2020 00:23
@jakirkham

cc @Carreau (who IIRC had a similar use case in the past)
