From 41b9f66f11f13a47bfac682937eec926c8f49243 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 18:26:21 -0600 Subject: [PATCH 01/20] Co-assign root-ish tasks --- distributed/scheduler.py | 54 ++++++++++++++++- distributed/tests/test_scheduler.py | 89 ++++++++++++++++++++++++++++- 2 files changed, 140 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a2996482ac9..34e9d112cee 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -940,6 +940,7 @@ class TaskGroup: _start: double _stop: double _all_durations: object + _last_worker: WorkerState def __init__(self, name: str): self._name = name @@ -953,6 +954,7 @@ def __init__(self, name: str): self._start = 0.0 self._stop = 0.0 self._all_durations = defaultdict(float) + self._last_worker = None @property def name(self): @@ -994,6 +996,10 @@ def start(self): def stop(self): return self._stop + @property + def last_worker(self): + return self._last_worker + @ccall def add(self, o): ts: TaskState = o @@ -2316,6 +2322,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: Decide on a worker for task *ts*. Return a WorkerState. """ ws: WorkerState = None + group: TaskGroup = ts._group valid_workers: set = self.valid_workers(ts) if ( @@ -2328,6 +2335,41 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ts.state = "no-worker" return ws + total_nthreads: Py_ssize_t = ( + self._total_nthreads + if valid_workers is None + else sum(wws._nthreads for wws in valid_workers) + ) + group_tasks_per_thread: double = ( + (len(group) / total_nthreads) if total_nthreads > 0 else 0 + ) + if group_tasks_per_thread > 2 and sum(map(len, group._dependencies)) < 5: + # Group is larger than cluster with very few dependencies; minimize future data transfers. + ws = group._last_worker + if not (ws and valid_workers is not None and ws not in valid_workers): + if ( + ws + and ws._occupancy / ws._nthreads / self.get_task_duration(ts) + < group_tasks_per_thread + ): + # Schedule sequential tasks onto the same worker until it's filled up. + # Assumes `decide_worker` is being called in priority order. 
+ return ws + + # Pick a new worker for the next few tasks, considering all possible workers + worker_pool = ( + valid_workers + if valid_workers is not None + else (self._idle_dv or self._workers_dv).values() + ) + ws = min( + worker_pool, + key=partial(self.worker_objective, ts), + default=None, + ) + group._last_worker = ws + return ws + if ts._dependencies or valid_workers is not None: ws = decide_worker( ts, @@ -2336,6 +2378,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: partial(self.worker_objective, ts), ) else: + # Fastpath when there are no related tasks or restrictions worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) wp_vals = worker_pool.values() @@ -4661,6 +4704,8 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): recommendations[ts._key] = "released" else: # pure data recommendations[ts._key] = "forgotten" + if ts._group._last_worker is ws: + ts._group._last_worker = None ws._has_what.clear() self.transitions(recommendations) @@ -6234,8 +6279,9 @@ async def retire_workers( logger.info("Retire workers %s", workers) # Keys orphaned by retiring those workers - keys = {k for w in workers for k in w.has_what} - keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} + tasks = {ts for w in workers for ts in w.has_what} + keys = {ts._key for ts in tasks if ts._who_has.issubset(workers)} + groups = {ts._group for ts in tasks} if keys: other_workers = set(parent._workers_dv.values()) - workers @@ -6250,6 +6296,10 @@ async def retire_workers( lock=False, ) + for group in groups: + if group._last_worker in workers: + group._last_worker = None + worker_keys = {ws._address: ws.identity() for ws in workers} if close_workers: await asyncio.gather( diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 04f266f8e61..1b6ff88ede1 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -17,7 +17,7 @@ import dask from dask import delayed -from dask.utils import apply +from dask.utils import apply, stringify from distributed import Client, Nanny, Worker, fire_and_forget, wait from distributed.comm import Comm @@ -126,6 +126,93 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): assert x.key in a.data or x.key in b.data +@pytest.mark.parametrize("ndeps", [0, 1, 4]) +@pytest.mark.parametrize( + "nthreads", + [ + [("127.0.0.1", 1)] * 5, + [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)], + ], +) +def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): + @gen_cluster( + client=True, + nthreads=nthreads, + config={"distributed.scheduler.work-stealing": False}, + ) + async def test(c, s, *workers): + """Ensure that related tasks end up on the same node""" + da = pytest.importorskip("dask.array") + np = pytest.importorskip("numpy") + + if ndeps == 0: + x = da.random.random((100, 100), chunks=(10, 10)) + else: + + def random(**kwargs): + assert len(kwargs) == ndeps + return np.random.random((10, 10)) + + trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)} + + # TODO is there a simpler (non-blockwise) way to make this sort of graph? 
+ x = da.blockwise( + random, + "yx", + new_axes={"y": (10,) * 10, "x": (10,) * 10}, + dtype=float, + **trivial_deps, + ) + + xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20)) + await xsum + + # Check that each chunk-row of the array is (mostly) stored on the same worker + primary_worker_key_fractions = [] + secondary_worker_key_fractions = [] + for i, keys in enumerate(x.__dask_keys__()): + # Iterate along rows of the array. + keys = set(stringify(k) for k in keys) + + # No more than 2 workers should have any keys + assert sum(any(k in w.data for k in keys) for w in workers) <= 2 + + # What fraction of the keys for this row does each worker hold? + key_fractions = [ + len(set(w.data).intersection(keys)) / len(keys) for w in workers + ] + key_fractions.sort() + # Primary worker: holds the highest percentage of keys + # Secondary worker: holds the second highest percentage of keys + primary_worker_key_fractions.append(key_fractions[-1]) + secondary_worker_key_fractions.append(key_fractions[-2]) + + # There may be one or two rows that were poorly split across workers, + # but the vast majority of rows should only be on one worker. + assert np.mean(primary_worker_key_fractions) >= 0.9 + assert np.median(primary_worker_key_fractions) == 1.0 + assert np.mean(secondary_worker_key_fractions) <= 0.1 + assert np.median(secondary_worker_key_fractions) == 0.0 + + # Check that there were few transfers + unexpected_transfers = [] + for worker in workers: + for log in worker.incoming_transfer_log: + keys = log["keys"] + # The root-ish tasks should never be transferred + assert not any(k.startswith("random") for k in keys), keys + # `object-` keys (the trivial deps of the root random tasks) should be transferred + if any(not k.startswith("object") for k in keys): + # But not many other things should be + unexpected_transfers.append(list(keys)) + + # A transfer at the very end to move aggregated results is fine (necessary with unbalanced workers in fact), + # but generally there should be very very few transfers. + assert len(unexpected_transfers) <= 2, unexpected_transfers + + test() + + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c): [x] = await client.scatter([1], workers=b.address) From cd5d55dd7443207f7edaec934c911fc21b60b6c8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 23 Jun 2021 18:39:38 -0600 Subject: [PATCH 02/20] Fix cython It doesn't like `sum(wws._nthreads for wws in valid_workers)` since that requires defining a closure. 
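As a minimal, self-contained illustration of the workaround (the bare `nthreads` attribute and the `total_threads` name are stand-ins for this sketch, not the scheduler's Cython-annotated code):

    from types import SimpleNamespace

    def total_threads(valid_workers, cluster_total):
        # Generator-expression version, which Cython rejects inside the
        # annotated scheduler code because it would compile to a closure:
        #     return cluster_total if valid_workers is None else sum(
        #         w.nthreads for w in valid_workers
        #     )
        if valid_workers is None:
            return cluster_total
        total = 0
        for w in valid_workers:  # explicit accumulation: no closure needed
            total += w.nthreads
        return total

    assert total_threads(None, 8) == 8
    assert total_threads([SimpleNamespace(nthreads=n) for n in (3, 2, 1)], 8) == 6

The diff below applies the same explicit-loop shape directly inside `decide_worker`.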
--- distributed/scheduler.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 34e9d112cee..a5abfd58366 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2335,11 +2335,14 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ts.state = "no-worker" return ws - total_nthreads: Py_ssize_t = ( - self._total_nthreads - if valid_workers is None - else sum(wws._nthreads for wws in valid_workers) - ) + total_nthreads: Py_ssize_t + if valid_workers is None: + total_nthreads = self._total_nthreads + else: + total_nthreads = 0 + for ws in valid_workers: + total_nthreads += ws._nthreads + group_tasks_per_thread: double = ( (len(group) / total_nthreads) if total_nthreads > 0 else 0 ) From f4f1320f83955ec59af5f747e6412f770d111782 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 11:08:30 -0600 Subject: [PATCH 03/20] No root-ish when `valid_workers`; simplify --- distributed/scheduler.py | 50 +++++++++++++++------------------------- 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a5abfd58366..61a5ad02ae6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2335,40 +2335,28 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ts.state = "no-worker" return ws - total_nthreads: Py_ssize_t - if valid_workers is None: - total_nthreads = self._total_nthreads - else: - total_nthreads = 0 - for ws in valid_workers: - total_nthreads += ws._nthreads - - group_tasks_per_thread: double = ( - (len(group) / total_nthreads) if total_nthreads > 0 else 0 - ) - if group_tasks_per_thread > 2 and sum(map(len, group._dependencies)) < 5: - # Group is larger than cluster with very few dependencies; minimize future data transfers. - ws = group._last_worker - if not (ws and valid_workers is not None and ws not in valid_workers): - if ( - ws - and ws._occupancy / ws._nthreads / self.get_task_duration(ts) - < group_tasks_per_thread - ): - # Schedule sequential tasks onto the same worker until it's filled up. - # Assumes `decide_worker` is being called in priority order. - return ws - + # Group is larger than cluster with few dependencies? Minimize future data transfers. + if ( + valid_workers is None + and len(self._workers_dv) > 1 + and len(group) > self._total_nthreads * 2 + and sum(map(len, group._dependencies)) < 5 + ): + last: WorkerState = group._last_worker + tasks_per_thread = len(group) / self._total_nthreads + if ( + last + and last._occupancy / last._nthreads / self.get_task_duration(ts) + < tasks_per_thread + ): + # Schedule sequential tasks onto the same worker until it's filled up. + # Assumes `decide_worker` is being called in priority order. 
+ return last + else: # Pick a new worker for the next few tasks, considering all possible workers - worker_pool = ( - valid_workers - if valid_workers is not None - else (self._idle_dv or self._workers_dv).values() - ) ws = min( - worker_pool, + (self._idle_dv or self._workers_dv).values(), key=partial(self.worker_objective, ts), - default=None, ) group._last_worker = ws return ws From f1b38360fc945cc051578a14aaf71a2cca5f8cc2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 11:32:32 -0600 Subject: [PATCH 04/20] Simplify no-workers case --- distributed/scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 61a5ad02ae6..b4ec05dcd17 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2321,6 +2321,9 @@ def decide_worker(self, ts: TaskState) -> WorkerState: """ Decide on a worker for task *ts*. Return a WorkerState. """ + if not self._workers_dv: + return None + ws: WorkerState = None group: TaskGroup = ts._group valid_workers: set = self.valid_workers(ts) @@ -2329,7 +2332,6 @@ def decide_worker(self, ts: TaskState) -> WorkerState: valid_workers is not None and not valid_workers and not ts._loose_restrictions - and self._workers_dv ): self._unrunnable.add(ts) ts.state = "no-worker" @@ -2338,7 +2340,6 @@ def decide_worker(self, ts: TaskState) -> WorkerState: # Group is larger than cluster with few dependencies? Minimize future data transfers. if ( valid_workers is None - and len(self._workers_dv) > 1 and len(group) > self._total_nthreads * 2 and sum(map(len, group._dependencies)) < 5 ): From fbcb07cb5b9be15eea0f371494b62e8d4bdc3685 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 12:35:57 -0600 Subject: [PATCH 05/20] Update work-stealing test --- distributed/tests/test_steal.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index ee2695cea87..fb38c2ff04e 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -146,21 +146,18 @@ async def test_steal_related_tasks(e, s, a, b, c): @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000) async def test_dont_steal_fast_tasks_compute_time(c, s, *workers): - np = pytest.importorskip("numpy") - x = c.submit(np.random.random, 10000000, workers=workers[0].address) - def do_nothing(x, y=None): pass - # execute and measure runtime once - await wait(c.submit(do_nothing, 1)) + xs = c.map(do_nothing, range(10), workers=workers[0].address) + await wait(xs) - futures = c.map(do_nothing, range(1000), y=x) + futures = c.map(do_nothing, range(1000), y=xs) await wait(futures) - assert len(s.who_has[x.key]) == 1 - assert len(s.has_what[workers[0].address]) == 1001 + assert len(set.union(*(s.who_has[x.key] for x in xs))) == 1 + assert len(s.has_what[workers[0].address]) == len(xs) + len(futures) @gen_cluster(client=True) From 51dfbc42839a07993e234efaa2b8020243c3159a Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 12:44:23 -0600 Subject: [PATCH 06/20] Check worker exists; dont try to clear last_worker --- distributed/scheduler.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b4ec05dcd17..1b5133c19df 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2347,6 +2347,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: tasks_per_thread = len(group) / 
self._total_nthreads if ( last + and last._address in self._workers_dv and last._occupancy / last._nthreads / self.get_task_duration(ts) < tasks_per_thread ): @@ -4696,8 +4697,6 @@ async def remove_worker(self, comm=None, address=None, safe=False, close=True): recommendations[ts._key] = "released" else: # pure data recommendations[ts._key] = "forgotten" - if ts._group._last_worker is ws: - ts._group._last_worker = None ws._has_what.clear() self.transitions(recommendations) @@ -6271,9 +6270,8 @@ async def retire_workers( logger.info("Retire workers %s", workers) # Keys orphaned by retiring those workers - tasks = {ts for w in workers for ts in w.has_what} - keys = {ts._key for ts in tasks if ts._who_has.issubset(workers)} - groups = {ts._group for ts in tasks} + keys = {k for w in workers for k in w.has_what} + keys = {ts._key for ts in keys if ts._who_has.issubset(workers)} if keys: other_workers = set(parent._workers_dv.values()) - workers @@ -6288,10 +6286,6 @@ async def retire_workers( lock=False, ) - for group in groups: - if group._last_worker in workers: - group._last_worker = None - worker_keys = {ws._address: ws.identity() for ws in workers} if close_workers: await asyncio.gather( From 4298657dce2d29e095270ba8350437785df344eb Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 13:23:35 -0600 Subject: [PATCH 07/20] Clear last_worker on final task --- distributed/scheduler.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1b5133c19df..5046bbec5ba 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2345,6 +2345,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ): last: WorkerState = group._last_worker tasks_per_thread = len(group) / self._total_nthreads + if ( last and last._address in self._workers_dv @@ -2353,15 +2354,19 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ): # Schedule sequential tasks onto the same worker until it's filled up. # Assumes `decide_worker` is being called in priority order. - return last + ws = last else: # Pick a new worker for the next few tasks, considering all possible workers ws = min( (self._idle_dv or self._workers_dv).values(), key=partial(self.worker_objective, ts), ) - group._last_worker = ws - return ws + + # Record `last_worker`, or clear it on the final task + group._last_worker = ( + ws if group.states["released"] + group.states["waiting"] > 1 else None + ) + return ws if ts._dependencies or valid_workers is not None: ws = decide_worker( From 941a836c907d1362f1e8193916653b3cddd4b3af Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 13:31:40 -0600 Subject: [PATCH 08/20] Simplify? Is this easier or harder to read? 
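To make the question concrete, here are the two equivalent shapes side by side, reduced to plain functions. `is_known` and `has_capacity` stand in for the address-membership and occupancy checks; all of the names here are invented for the sketch:

    # Shape from the previous commits: keep the last worker only when every
    # positive condition holds, otherwise fall through and pick a new one.
    def pick_keep_else_new(last, is_known, has_capacity, pick_new):
        if last is not None and is_known(last) and has_capacity(last):
            return last
        return pick_new()

    # Shape adopted below: start from the last worker and overwrite it when
    # the combined condition fails, giving a single assignment site.
    def pick_overwrite_if_stale(last, is_known, has_capacity, pick_new):
        ws = last
        if not (ws is not None and is_known(ws) and has_capacity(ws)):
            ws = pick_new()
        return ws

    # Behaviourally identical; only the reading order differs.
    args = (None, lambda w: True, lambda w: True, lambda: "fresh")
    assert pick_keep_else_new(*args) == pick_overwrite_if_stale(*args) == "fresh"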
--- distributed/scheduler.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5046bbec5ba..265d136db98 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2343,20 +2343,16 @@ def decide_worker(self, ts: TaskState) -> WorkerState: and len(group) > self._total_nthreads * 2 and sum(map(len, group._dependencies)) < 5 ): - last: WorkerState = group._last_worker + ws: WorkerState = group._last_worker tasks_per_thread = len(group) / self._total_nthreads - if ( - last - and last._address in self._workers_dv - and last._occupancy / last._nthreads / self.get_task_duration(ts) + if not ( + ws + and ws._address in self._workers_dv + and ws._occupancy / ws._nthreads / self.get_task_duration(ts) < tasks_per_thread ): - # Schedule sequential tasks onto the same worker until it's filled up. - # Assumes `decide_worker` is being called in priority order. - ws = last - else: - # Pick a new worker for the next few tasks, considering all possible workers + # Last-used worker is full or unknown; pick a new worker for the next few tasks ws = min( (self._idle_dv or self._workers_dv).values(), key=partial(self.worker_objective, ts), From 103006a0341c23fd31ea7be23385bd67ee0f2e8e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 15:38:47 -0600 Subject: [PATCH 09/20] Back to counting --- distributed/scheduler.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 265d136db98..7480e32c9d6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -941,6 +941,7 @@ class TaskGroup: _stop: double _all_durations: object _last_worker: WorkerState + _last_worker_tasks_left: Py_ssize_t def __init__(self, name: str): self._name = name @@ -955,6 +956,7 @@ def __init__(self, name: str): self._stop = 0.0 self._all_durations = defaultdict(float) self._last_worker = None + self._last_worker_tasks_left = 0 @property def name(self): @@ -1000,6 +1002,10 @@ def stop(self): def last_worker(self): return self._last_worker + @property + def last_worker_tasks_left(self): + return self._last_worker_tasks_left + @ccall def add(self, o): ts: TaskState = o @@ -2344,24 +2350,24 @@ def decide_worker(self, ts: TaskState) -> WorkerState: and sum(map(len, group._dependencies)) < 5 ): ws: WorkerState = group._last_worker - tasks_per_thread = len(group) / self._total_nthreads if not ( - ws - and ws._address in self._workers_dv - and ws._occupancy / ws._nthreads / self.get_task_duration(ts) - < tasks_per_thread + ws and group._last_worker_tasks_left and ws._address in self._workers_dv ): # Last-used worker is full or unknown; pick a new worker for the next few tasks ws = min( (self._idle_dv or self._workers_dv).values(), key=partial(self.worker_objective, ts), ) + group._last_worker_tasks_left = math.floor( + (len(group) / self._total_nthreads) * ws._nthreads + ) # Record `last_worker`, or clear it on the final task group._last_worker = ( ws if group.states["released"] + group.states["waiting"] > 1 else None ) + group._last_worker_tasks_left -= 1 return ws if ts._dependencies or valid_workers is not None: From 7aa691ab1636171e2a3ef7e2d13ac37c136c581c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 15:58:37 -0600 Subject: [PATCH 10/20] Debug `test_lifetime` failing in CI --- distributed/tests/test_worker.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git 
a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 2f3a7f58ede..e99f7bfab3a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1587,14 +1587,18 @@ async def test_close_gracefully(c, s, a, b): @pytest.mark.asyncio async def test_lifetime(cleanup): async with Scheduler() as s: - async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: + async with Worker(s.address, lifetime="1 seconds") as a, Worker(s.address) as b: async with Client(s.address, asynchronous=True) as c: futures = c.map(slowinc, range(200), delay=0.1) + assert a.status == Status.running await asyncio.sleep(1.5) - assert b.status != Status.running - await b.finished() + assert a.status != Status.running + await a.finished() - assert set(b.data).issubset(a.data) # successfully moved data over + assert set(a.data).issubset(b.data), ( + list(a.data), + list(b.data), + ) # successfully moved data over @gen_cluster(client=True, worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) From 7041b11398d2cf549049abf190cdb26e8d321c8b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 17:10:40 -0600 Subject: [PATCH 11/20] Apparantly sometimes this can be worse in CI --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 1b6ff88ede1..04587c87d92 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -208,7 +208,7 @@ def random(**kwargs): # A transfer at the very end to move aggregated results is fine (necessary with unbalanced workers in fact), # but generally there should be very very few transfers. - assert len(unexpected_transfers) <= 2, unexpected_transfers + assert len(unexpected_transfers) <= 3, unexpected_transfers test() From dfd327b120a271d8b76e9817e32c82838e332390 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 17:15:53 -0600 Subject: [PATCH 12/20] Revert "Debug `test_lifetime` failing in CI" This reverts commit 7aa691ab1636171e2a3ef7e2d13ac37c136c581c. --- distributed/tests/test_worker.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index e99f7bfab3a..2f3a7f58ede 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1587,18 +1587,14 @@ async def test_close_gracefully(c, s, a, b): @pytest.mark.asyncio async def test_lifetime(cleanup): async with Scheduler() as s: - async with Worker(s.address, lifetime="1 seconds") as a, Worker(s.address) as b: + async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: async with Client(s.address, asynchronous=True) as c: futures = c.map(slowinc, range(200), delay=0.1) - assert a.status == Status.running await asyncio.sleep(1.5) - assert a.status != Status.running - await a.finished() + assert b.status != Status.running + await b.finished() - assert set(a.data).issubset(b.data), ( - list(a.data), - list(b.data), - ) # successfully moved data over + assert set(b.data).issubset(a.data) # successfully moved data over @gen_cluster(client=True, worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) From 93a9ede559aac9fde0d0dc4aba0f478074ffb3c5 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Jun 2021 17:17:15 -0600 Subject: [PATCH 13/20] How about this for `test_lifetime`? 
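If it helps to reproduce this outside the test harness, the scenario is sketched below: pin the work to the short-lived worker so it is guaranteed to own data that must migrate when its lifetime expires. The sketch uses the public `workers=` keyword of `Client.map` and a locally defined `slowinc`; treat it as an illustration of the revised test, not additional functionality.

    import asyncio
    import time

    from distributed import Client, Scheduler, Worker

    def slowinc(x, delay=0.1):
        time.sleep(delay)
        return x + 1

    async def main():
        async with Scheduler() as s:
            async with Worker(s.address) as a, Worker(
                s.address, lifetime="1 seconds"
            ) as b:
                async with Client(s.address, asynchronous=True) as c:
                    # Pin everything to the short-lived worker `b`; keep a
                    # reference to the futures so results are not released.
                    futures = c.map(
                        slowinc, range(200), delay=0.1, workers=[b.address]
                    )
                    await asyncio.sleep(1.5)
                    await b.finished()
                    # Graceful retirement should have copied b's results to a.
                    assert set(b.data).issubset(a.data), (len(b.data), len(a.data))

    asyncio.run(main())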
--- distributed/tests/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 2f3a7f58ede..f2421c90b44 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1589,12 +1589,12 @@ async def test_lifetime(cleanup): async with Scheduler() as s: async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: async with Client(s.address, asynchronous=True) as c: - futures = c.map(slowinc, range(200), delay=0.1) + futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address]) await asyncio.sleep(1.5) assert b.status != Status.running await b.finished() - assert set(b.data).issubset(a.data) # successfully moved data over + assert set(b.data) == set(a.data) # successfully moved data over @gen_cluster(client=True, worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) From 7ab7999aafbc904d04a3388f72acc57c2c9c3bf9 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Jun 2021 11:55:26 -0600 Subject: [PATCH 14/20] Rename `decide_worker` for clarity --- distributed/scheduler.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7480e32c9d6..9ca2e98a602 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2371,7 +2371,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: return ws if ts._dependencies or valid_workers is not None: - ws = decide_worker( + ws = decide_worker_from_deps_and_restrictions( ts, self._workers_dv.values(), valid_workers, @@ -7508,7 +7508,7 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @cfunc @exceptval(check=False) -def decide_worker( +def decide_worker_from_deps_and_restrictions( ts: TaskState, all_workers, valid_workers: set, objective ) -> WorkerState: """ @@ -7545,7 +7545,9 @@ def decide_worker( candidates = valid_workers if not candidates: if ts._loose_restrictions: - ws = decide_worker(ts, all_workers, None, objective) + ws = decide_worker_from_deps_and_restrictions( + ts, all_workers, None, objective + ) return ws ncandidates: Py_ssize_t = len(candidates) From de752ed3c21fa1a3bef2ec2ca119b1b810c6a767 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Jun 2021 15:13:17 -0600 Subject: [PATCH 15/20] Docstring for `decide_worker` --- distributed/scheduler.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9ca2e98a602..ad583de7e52 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2325,7 +2325,16 @@ def transition_no_worker_waiting(self, key): @exceptval(check=False) def decide_worker(self, ts: TaskState) -> WorkerState: """ - Decide on a worker for task *ts*. Return a WorkerState. + Decide on a worker for task *ts*. Return a WorkerState. + + If it's a root or root-like task, we place it with its relatives to + reduce future data tansfer. + + If it has dependencies or restrictions, we use + `decide_worker_from_deps_and_restrictions`. + + Otherwise, we pick the least occupied worker, or pick from all workers + in a round-robin fashion. 
""" if not self._workers_dv: return None From b0aeef4a35501c3dfc33d824f7aa2cae815ded65 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Jun 2021 15:13:47 -0600 Subject: [PATCH 16/20] Attribute docstrings for `TaskGroup` --- distributed/scheduler.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ad583de7e52..9c978390baa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -925,6 +925,16 @@ class TaskGroup: The result types of this TaskGroup + .. attribute:: last_worker: WorkerState + + The worker most recently assigned a task from this group, or None when the group + is not identified to be root-like by `SchedulerState.decide_worker`. + + .. attribute:: last_worker_tasks_left: int + + If `last_worker` is not None, the number of times that worker should be assigned + subsequent tasks until a new worker is chosen. + See also -------- TaskPrefix From 880d13382252ec2c65832ff31a77f28be286c50d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Jun 2021 20:26:40 -0600 Subject: [PATCH 17/20] Docstring for test. Good or too much? --- distributed/tests/test_scheduler.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 04587c87d92..f4495e1d593 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -141,7 +141,28 @@ def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): config={"distributed.scheduler.work-stealing": False}, ) async def test(c, s, *workers): - """Ensure that related tasks end up on the same node""" + r""" + Ensure that sibling root tasks are scheduled to the same node, reducing future data transfer. + + We generate a wide layer of "root" tasks (random NumPy arrays). All of those tasks share 0-5 + trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are most common in real-world use + (``ndeps=1`` is basically ``da.from_array(..., inline_array=False)`` or ``da.from_zarr``). + The graph is structured like this (though the number of tasks and workers is different): + + |-W1-| |-W2-| |-W3-| |-W4-| < ---- ideal task scheduling + + q r s t < --- `sum-aggregate-` + / \ / \ / \ / \ + i j k l m n o p < --- `sum-` + | | | | | | | | + a b c d e f g h < --- `random-` + \ \ \ | | / / / + TRIVIAL * 0..5 + + Neighboring `random-` tasks should be scheduled on the same worker. We test that generally, + only one worker holds each row of the array, that the `random-` tasks are never transferred, + and that there are few transfers overall. + """ da = pytest.importorskip("dask.array") np = pytest.importorskip("numpy") From 2616d80d7fea25753f516473e456b2eb9c657bf5 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 28 Jun 2021 20:38:15 -0600 Subject: [PATCH 18/20] Revert "Rename `decide_worker` for clarity" This reverts commit 7ab7999aafbc904d04a3388f72acc57c2c9c3bf9. 
--- distributed/scheduler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9c978390baa..dbaf1579672 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2390,7 +2390,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: return ws if ts._dependencies or valid_workers is not None: - ws = decide_worker_from_deps_and_restrictions( + ws = decide_worker( ts, self._workers_dv.values(), valid_workers, @@ -7527,7 +7527,7 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @cfunc @exceptval(check=False) -def decide_worker_from_deps_and_restrictions( +def decide_worker( ts: TaskState, all_workers, valid_workers: set, objective ) -> WorkerState: """ @@ -7564,9 +7564,7 @@ def decide_worker_from_deps_and_restrictions( candidates = valid_workers if not candidates: if ts._loose_restrictions: - ws = decide_worker_from_deps_and_restrictions( - ts, all_workers, None, objective - ) + ws = decide_worker(ts, all_workers, None, objective) return ws ncandidates: Py_ssize_t = len(candidates) From 91aee925271e938f25a5c647a907ef8c63b74ede Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 29 Jun 2021 16:22:24 -0600 Subject: [PATCH 19/20] Short-circuit deps len See speedscope profile from https://github.com/gjoseph92/dask-profiling-coiled/tree/main/results#purepy-shuffle-gc-coassign. 1.6% scheduler time was spent summing the number of deps of the TaskGroup---small but not nothing. For large TaksGroups, we can easily short-circuit that. --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index dbaf1579672..87dd75bce16 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2366,6 +2366,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ( valid_workers is None and len(group) > self._total_nthreads * 2 + and len(group._dependencies) < 5 and sum(map(len, group._dependencies)) < 5 ): ws: WorkerState = group._last_worker From 964d2ea2a11c75d6862c6fb4f5a37dd2a1170d9b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 30 Jun 2021 15:38:16 -0600 Subject: [PATCH 20/20] Revert "Short-circuit deps len" This reverts commit 91aee925271e938f25a5c647a907ef8c63b74ede. Comparing py-spy profiles for https://github.com/gjoseph92/dask-profiling-coiled/tree/main/results#purepy-shuffle-nogc-coassign and https://github.com/gjoseph92/dask-profiling-coiled/tree/main/results#purepy-shuffle-nogc-coassign-short-circuit-len, we can see that what I initially thought was a very slow `len()` call in https://github.com/gjoseph92/dask-profiling-coiled/tree/main/results#purepy-shuffle-gc-coassign was actually just a GC artifact. `len()` is actually just taking a tiny amount of time in either case. On top of that, having >5 TaskGroups as dependencies seems pretty rare, so we probably don't need this line. --- distributed/scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 87dd75bce16..dbaf1579672 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2366,7 +2366,6 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if ( valid_workers is None and len(group) > self._total_nthreads * 2 - and len(group._dependencies) < 5 and sum(map(len, group._dependencies)) < 5 ): ws: WorkerState = group._last_worker
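Taking stock at the end of the series, the root-ish branch of `Scheduler.decide_worker` behaves roughly like the pure-Python sketch below. The field names mirror the scheduler (`last_worker`, `last_worker_tasks_left`; `pick_best` plays the role of `worker_objective`), but `GroupState`, `remaining`, and the dict-of-nthreads cluster model are simplifications invented for this sketch, not the Cython-annotated implementation:

    import math
    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class GroupState:
        """Stand-ins for the TaskGroup fields this series adds or reads."""
        size: int                       # len(group)
        n_dep_tasks: int                # sum(map(len, group._dependencies))
        remaining: int                  # tasks still in released/waiting states
        last_worker: Optional[str] = None
        last_worker_tasks_left: int = 0

    def decide_worker_rootish(group, total_nthreads, workers, valid_workers, pick_best):
        """Return a worker address for the next task of a root-ish group,
        or None to signal "not root-ish, use the ordinary path".
        `workers` maps address -> nthreads of the known workers."""
        if not (
            valid_workers is None                # no worker restrictions
            and group.size > total_nthreads * 2  # group much wider than the cluster
            and group.n_dep_tasks < 5            # and (almost) no dependencies
        ):
            return None

        ws = group.last_worker
        if not (ws and group.last_worker_tasks_left and ws in workers):
            # Last-used worker is used up, gone, or unset: choose a new one
            # and hand it its fair share of the whole group.
            ws = pick_best()
            group.last_worker_tasks_left = math.floor(
                group.size / total_nthreads * workers[ws]
            )

        # Remember the worker for the next sibling (cleared on the final task)
        # and consume one unit of its allowance.
        group.last_worker = ws if group.remaining > 1 else None
        group.last_worker_tasks_left -= 1
        return ws

    # 12 root tasks on a 3-thread cluster: the 2-thread worker takes its fair
    # share of 8 siblings, then the 1-thread worker takes the remaining 4.
    workers = {"w1": 2, "w2": 1}
    group = GroupState(size=12, n_dep_tasks=0, remaining=12)
    picks = []
    for _ in range(12):
        picks.append(
            decide_worker_rootish(
                group, 3, workers, None,
                pick_best=lambda: "w1" if group.last_worker != "w1" else "w2",
            )
        )
        group.remaining -= 1
    assert picks == ["w1"] * 8 + ["w2"] * 4

The worked example shows the intended behaviour on an unbalanced cluster: siblings fill one worker's share before the group moves on, with no interleaving between workers.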