
work stealing seems to not occur fully when launching tasks from tasks #4945

@bolliger32

Description

What happened:

I have a workflow in which long-running tasks are launched from within tasks, following the instructions for use of the worker_client context manager in the docs. In theory, according to the docs, each time the context manager is invoked, it will secede from the thread pool and then signal that the worker is able to accept another task. However, I've run into an issue where some workers will have no more tasks to process (i.e. they will only have these long-running, seceded tasks) while other workers will have a long backlog of "processing" tasks that are not actively being run but could be run on the now idle workers. When I say "long" I mean that it will take many minutes to run through those tasks, while shipping any data necessary to begin these tasks to another worker would take <1s. I mentioned this issue in a comment to #4906 but it was pointed out that this was not relevant to that issue. It's possible this situation has been brought up in another issue and if so, my apologies.
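For context, my understanding from the docs is that worker_client is roughly equivalent to calling secede/rejoin by hand around a nested submit. This is just a sketch of my mental model of the mechanism, not something taken from the distributed source (launcher_manual is my own name):

from distributed import get_client, rejoin, secede

def launcher_manual(time):
    # leave the worker's thread pool so this slot can accept another task ...
    secede()
    client = get_client()
    result = client.submit(take_some_time, time).result()
    # ... and rejoin the thread pool once the nested work is done
    rejoin()
    return result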

What you expected to happen:
In this situation, where working through the backlog on the busy worker will take far longer than shipping any necessary dependencies over to the idle worker, I would expect some work stealing to happen and for the idle worker to start picking up some of those tasks. I cannot find any reason in the work stealing docs why this shouldn't happen.

Minimal Complete Verifiable Example:

from dask.distributed import Client, worker_client
from time import sleep
import numpy as np

client = Client(n_workers=4, threads_per_worker=1)

def take_some_time(time):
    sleep(time)
    return time

def launcher(time):
    # launch a task from within a task; worker_client secedes from the
    # worker's thread pool while waiting on the nested future
    with worker_client() as client:
        return client.submit(take_some_time, time).result()

# 50 short tasks spread across the workers, plus 3 higher-priority
# launcher tasks that each submit a nested long-running take_some_time task
short_running_fut = client.map(take_some_time, np.linspace(0, .5, 50), priority=0)
long_running_fut = client.map(launcher, [1000, 1001, 1002], priority=1)

In this example, the short-running take_some_time tasks are typically dispersed evenly across the workers. Before they can complete, the higher-priority launcher tasks typically get assigned to 3 different workers, which then submit the long-running take_some_time tasks to 2 or 3 workers. Those workers start running a long-running task and are left with a backlog of short-running tasks in their queues. The 1 or 2 workers that are not currently running a long-running take_some_time task spin through their shorter-running tasks and then sit idle.

We end up with an allocation like this:
[dashboard screenshot: per-worker task allocation]

where the worker with just 1 processing task is a worker that has a seceded launcher task but nothing else. In this situation, I would expect it to start stealing tasks from the backlog of Worker 1.
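The same imbalance is also visible without the dashboard. This is just a rough check I've been using (not something from the docs), counting how many tasks the scheduler currently has assigned to each worker:

# rough diagnostic: number of tasks currently assigned to each worker
for addr, keys in client.processing().items():
    print(addr, len(keys))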

If I simply change launcher in the final line to take_some_time, then eventually all of the tasks get dumped onto the 1 worker that's not running a long-running task (as expected). If I then set the distributed.scheduler.work-stealing config setting to False, this does not occur (the short-running tasks stay piled up in the queues of the busy workers while one worker sits idle).
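For completeness, this is how I disabled work stealing for that test. I believe the value needs to be set before the scheduler is created, so:

import dask
from dask.distributed import Client

# turn off work stealing before the cluster/scheduler starts
dask.config.set({"distributed.scheduler.work-stealing": False})
client = Client(n_workers=4, threads_per_worker=1)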

It appears that some work-stealing does occur when launching tasks from tasks. If I run the original code (i.e. mapping the launcher function rather than the take_some_time function in the last line) after having disabled work stealing, I get a distribution something like this:
[dashboard screenshot: per-worker task allocation with work stealing disabled]

which looks categorically different from what I was seeing originally. So it appears that launching tasks from tasks confuses the standard work-stealing algorithm, but it does not disable it entirely. My guess is that, when evaluating work stealing, the scheduler sees the long-running, seceded task as something actively being computed on that worker, rather than treating the worker as idle and ready for more work?
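I don't know the scheduler internals well, but one way I tried to peek at its view was via run_on_scheduler, assuming idle and saturated are the relevant scheduler collections (that assumption may well be wrong):

def scheduler_view(dask_scheduler):
    # which workers the scheduler currently considers idle vs. saturated
    return {
        "idle": [str(w) for w in dask_scheduler.idle],
        "saturated": [str(w) for w in dask_scheduler.saturated],
    }

print(client.run_on_scheduler(scheduler_view))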

Anything else we need to know?:
It's totally possible that I just don't understand work stealing correctly and there is a way to set this up so that I can avoid this behavior. If that's the case, I would be really grateful to hear it.

Environment:

  • Dask version: 2021.06.1
  • Python version: 3.8.10
  • Operating System: Linux
  • Install method (conda, pip, source): conda
