From 6ae4ed48bfd0137d9f322f2dff95df9fbc895dc9 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 16 Sep 2021 19:47:26 +0100 Subject: [PATCH 1/6] Merge AMM and WSMR --- distributed/active_memory_manager.py | 35 +++++++++++++------ .../tests/test_active_memory_manager.py | 2 +- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index c2bbe7ccd3c..39cce08c7a2 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio from collections import defaultdict from collections.abc import Generator from typing import TYPE_CHECKING @@ -10,6 +9,7 @@ import dask from dask.utils import parse_timedelta +from .metrics import time from .utils import import_term if TYPE_CHECKING: @@ -131,23 +131,36 @@ def run_once(self, comm=None) -> None: for ts, (pending_repl, pending_drop) in self.pending.items(): if not ts.who_has: continue - who_has = [ws_snd.address for ws_snd in ts.who_has - pending_drop] + who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop} assert who_has # Never drop the last replica for ws_rec in pending_repl: assert ws_rec not in ts.who_has - repl_by_worker[ws_rec.address][ts.key] = who_has + repl_by_worker[ws_rec.address][ts] = who_has for ws in pending_drop: assert ws in ts.who_has - drop_by_worker[ws.address].add(ts.key) + drop_by_worker[ws.address].add(ts) # Fire-and-forget enact recommendations from policies - # This is temporary code, waiting for - # https://github.com/dask/distributed/pull/5046 - for addr, who_has in repl_by_worker.items(): - asyncio.create_task(self.scheduler.gather_on_worker(addr, who_has)) - for addr, keys in drop_by_worker.items(): - asyncio.create_task(self.scheduler.delete_worker_data(addr, keys)) - # End temporary code + stimulus_id = str(time()) + for addr, ts_to_who_has in repl_by_worker.items(): + self.scheduler.stream_comms[addr].send( + { + "op": "acquire-replicas", + "keys": [ts.key for ts in ts_to_who_has], + "stimulus_id": "acquire-replicas-" + stimulus_id, + "priorities": {ts.key: ts.priority for ts in ts_to_who_has}, + "who_has": {ts.key: v for ts, v in ts_to_who_has.items()}, + }, + ) + + for addr, tss in drop_by_worker.items(): + self.scheduler.stream_comms[addr].send( + { + "op": "remove-replicas", + "keys": [ts.key for ts in tss], + "stimulus_id": "remove-replicas-" + stimulus_id, + } + ) finally: del self.workers_memory diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index c7c747b8507..400667e4dc9 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -343,7 +343,7 @@ def run(self): yield "drop", ts, {ws} -@pytest.mark.xfail(reason="distributed#5046, distributed#5265") +@pytest.mark.xfail(reason="distributed#5265") @pytest.mark.slow @gen_cluster( client=True, From 1b755a0618a8cb7bd8204077b3c8ace9dc4869a7 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 29 Sep 2021 08:38:35 +0100 Subject: [PATCH 2/6] Immediately drop scheduler-side replica --- distributed/active_memory_manager.py | 24 ++++++++++++------- .../tests/test_active_memory_manager.py | 2 -- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index 1a4c6853751..d8c35826a85 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -126,8 
+126,11 @@ def run_once(self, comm=None) -> None: # populate self.pending self._run_policies() - drop_by_worker = defaultdict(set) - repl_by_worker = defaultdict(dict) + drop_by_worker: defaultdict[WorkerState, set[TaskState]] = defaultdict(set) + repl_by_worker: defaultdict[ + WorkerState, dict[TaskState, set[WorkerState]] + ] = defaultdict(dict) + for ts, (pending_repl, pending_drop) in self.pending.items(): if not ts.who_has: continue @@ -135,15 +138,15 @@ def run_once(self, comm=None) -> None: assert who_has # Never drop the last replica for ws_rec in pending_repl: assert ws_rec not in ts.who_has - repl_by_worker[ws_rec.address][ts] = who_has + repl_by_worker[ws_rec][ts] = who_has for ws in pending_drop: assert ws in ts.who_has - drop_by_worker[ws.address].add(ts) + drop_by_worker[ws].add(ts) # Fire-and-forget enact recommendations from policies stimulus_id = str(time()) - for addr, ts_to_who_has in repl_by_worker.items(): - self.scheduler.stream_comms[addr].send( + for ws_rec, ts_to_who_has in repl_by_worker.items(): + self.scheduler.stream_comms[ws_rec.address].send( { "op": "acquire-replicas", "keys": [ts.key for ts in ts_to_who_has], @@ -153,8 +156,13 @@ def run_once(self, comm=None) -> None: }, ) - for addr, tss in drop_by_worker.items(): - self.scheduler.stream_comms[addr].send( + for ws, tss in drop_by_worker.items(): + # The scheduler immediately forgets about the replica and suggests the + # worker to drop it. The worker may refuse, at which point it will send + # back an add-keys message to reinstate it. + for ts in tss: + self.scheduler.remove_replica(ts, ws) + self.scheduler.stream_comms[ws.address].send( { "op": "remove-replicas", "keys": [ts.key for ts in tss], diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 400667e4dc9..243f6772fc7 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -209,7 +209,6 @@ async def test_drop_with_waiter(c, s, a, b): assert not y2.done() -@pytest.mark.xfail(reason="distributed#5265") @gen_cluster(client=True, config=NO_AMM_START) async def test_double_drop(c, s, a, b): """An AMM drop policy runs once to drop one of the two replicas of a key. 
@@ -343,7 +342,6 @@ def run(self): yield "drop", ts, {ws} -@pytest.mark.xfail(reason="distributed#5265") @pytest.mark.slow @gen_cluster( client=True, From 5d610014569035da5f35a4269d51f8ecd2a6232f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 29 Sep 2021 13:43:41 +0100 Subject: [PATCH 3/6] fix flaky test_drop_stress --- distributed/active_memory_manager.py | 119 +++++++++--------- .../tests/test_active_memory_manager.py | 118 +++++++++++------ 2 files changed, 140 insertions(+), 97 deletions(-) diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index d8c35826a85..9ce06dac96b 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -10,7 +10,7 @@ from dask.utils import parse_timedelta from .metrics import time -from .utils import import_term +from .utils import import_term, log_errors if TYPE_CHECKING: from .scheduler import SchedulerState, TaskState, WorkerState @@ -115,64 +115,67 @@ def run_once(self, comm=None) -> None: """Run all policies once and asynchronously (fire and forget) enact their recommendations to replicate/drop keys """ - # This should never fail since this is a synchronous method - assert not hasattr(self, "pending") - - self.pending = defaultdict(lambda: (set(), set())) - self.workers_memory = { - w: w.memory.optimistic for w in self.scheduler.workers.values() - } - try: - # populate self.pending - self._run_policies() - - drop_by_worker: defaultdict[WorkerState, set[TaskState]] = defaultdict(set) - repl_by_worker: defaultdict[ - WorkerState, dict[TaskState, set[WorkerState]] - ] = defaultdict(dict) - - for ts, (pending_repl, pending_drop) in self.pending.items(): - if not ts.who_has: - continue - who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop} - assert who_has # Never drop the last replica - for ws_rec in pending_repl: - assert ws_rec not in ts.who_has - repl_by_worker[ws_rec][ts] = who_has - for ws in pending_drop: - assert ws in ts.who_has - drop_by_worker[ws].add(ts) - - # Fire-and-forget enact recommendations from policies - stimulus_id = str(time()) - for ws_rec, ts_to_who_has in repl_by_worker.items(): - self.scheduler.stream_comms[ws_rec.address].send( - { - "op": "acquire-replicas", - "keys": [ts.key for ts in ts_to_who_has], - "stimulus_id": "acquire-replicas-" + stimulus_id, - "priorities": {ts.key: ts.priority for ts in ts_to_who_has}, - "who_has": {ts.key: v for ts, v in ts_to_who_has.items()}, - }, + with log_errors(): + # This should never fail since this is a synchronous method + assert not hasattr(self, "pending") + + self.pending = defaultdict(lambda: (set(), set())) + self.workers_memory = { + w: w.memory.optimistic for w in self.scheduler.workers.values() + } + try: + # populate self.pending + self._run_policies() + + drop_by_worker: defaultdict[WorkerState, set[TaskState]] = defaultdict( + set ) - - for ws, tss in drop_by_worker.items(): - # The scheduler immediately forgets about the replica and suggests the - # worker to drop it. The worker may refuse, at which point it will send - # back an add-keys message to reinstate it. 
- for ts in tss: - self.scheduler.remove_replica(ts, ws) - self.scheduler.stream_comms[ws.address].send( - { - "op": "remove-replicas", - "keys": [ts.key for ts in tss], - "stimulus_id": "remove-replicas-" + stimulus_id, - } - ) - - finally: - del self.workers_memory - del self.pending + repl_by_worker: defaultdict[ + WorkerState, dict[TaskState, set[WorkerState]] + ] = defaultdict(dict) + + for ts, (pending_repl, pending_drop) in self.pending.items(): + if not ts.who_has: + continue + who_has = {ws_snd.address for ws_snd in ts.who_has - pending_drop} + assert who_has # Never drop the last replica + for ws_rec in pending_repl: + assert ws_rec not in ts.who_has + repl_by_worker[ws_rec][ts] = who_has + for ws in pending_drop: + assert ws in ts.who_has + drop_by_worker[ws].add(ts) + + # Fire-and-forget enact recommendations from policies + stimulus_id = str(time()) + for ws_rec, ts_to_who_has in repl_by_worker.items(): + self.scheduler.stream_comms[ws_rec.address].send( + { + "op": "acquire-replicas", + "keys": [ts.key for ts in ts_to_who_has], + "stimulus_id": "acquire-replicas-" + stimulus_id, + "priorities": {ts.key: ts.priority for ts in ts_to_who_has}, + "who_has": {ts.key: v for ts, v in ts_to_who_has.items()}, + }, + ) + + for ws, tss in drop_by_worker.items(): + # The scheduler immediately forgets about the replica and suggests + # the worker to drop it. The worker may refuse, at which point it + # will send back an add-keys message to reinstate it. + for ts in tss: + self.scheduler.remove_replica(ts, ws) + self.scheduler.stream_comms[ws.address].send( + { + "op": "remove-replicas", + "keys": [ts.key for ts in tss], + "stimulus_id": "remove-replicas-" + stimulus_id, + } + ) + + finally: + del self.workers_memory + del self.pending def _run_policies(self) -> None: """Sequentially run ActiveMemoryManagerPolicy.run() for all registered policies, diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 243f6772fc7..af5726bbaaa 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -1,5 +1,6 @@ import asyncio import random +from time import time import pytest @@ -328,45 +329,6 @@ async def test_drop_with_bad_candidates(c, s, a, b): assert s.tasks["x"].who_has == {ws0, ws1} -class DropEverything(ActiveMemoryManagerPolicy): - """Inanely suggest to drop every single key in the cluster""" - - def run(self): - for ts in self.manager.scheduler.tasks.values(): - # Instead of yielding ("drop", ts, None) for each worker, which would result - # in semi-predictable output about which replica survives, randomly choose a - # different survivor at each AMM run. - candidates = list(ts.who_has) - random.shuffle(candidates) - for ws in candidates: - yield "drop", ts, {ws} - - -@pytest.mark.slow -@gen_cluster( - client=True, - nthreads=[("", 1)] * 8, - Worker=Nanny, - config={ - "distributed.scheduler.active-memory-manager.start": True, - "distributed.scheduler.active-memory-manager.interval": 0.1, - "distributed.scheduler.active-memory-manager.policies": [ - {"class": "distributed.tests.test_active_memory_manager.DropEverything"}, - ], - }, -) -async def test_drop_stress(c, s, *nannies): - """A policy which suggests dropping everything won't break a running computation, - but only slow it down. 
- """ - import dask.array as da - - rng = da.random.RandomState(0) - a = rng.random((20, 20), chunks=(1, 1)) - b = (a @ a.T).sum().round(3) - assert await c.compute(b) == 2134.398 - - @gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("replicate", n=2)) async def test_replicate(c, s, *workers): futures = await c.scatter({"x": 123}) @@ -494,3 +456,81 @@ async def test_ReduceReplicas(c, s, *workers): s.extensions["amm"].run_once() while len(s.tasks["x"].who_has) > 1: await asyncio.sleep(0.01) + + +class DropEverything(ActiveMemoryManagerPolicy): + """Inanely suggest to drop every single key in the cluster""" + + def __init__(self): + self.start = None + + def run(self): + # Run for 5s every 0.1s, then let the computation finish + if not self.start: + self.start = time() + elif time() > self.start + 5: + self.manager.policies.remove(self) + return + + for ts in self.manager.scheduler.tasks.values(): + # Instead of yielding ("drop", ts, None) for each worker, which would result + # in semi-predictable output about which replica survives, randomly choose a + # different survivor at each AMM run. + candidates = list(ts.who_has) + random.shuffle(candidates) + for ws in candidates: + yield "drop", ts, {ws} + + +@pytest.mark.slow +@gen_cluster( + client=True, + nthreads=[("", 1)] * 8, + Worker=Nanny, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [ + {"class": "distributed.tests.test_active_memory_manager.DropEverything"}, + ], + }, +) +async def test_drop_stress(c, s, *nannies): + """A policy which suggests dropping everything won't break a running computation, + but only slow it down. + + See also: test_ReduceReplicas_stress + """ + da = pytest.importorskip("dask.array") + + rng = da.random.RandomState(0) + a = rng.random((20, 20), chunks=(1, 1)) + b = (a @ a.T).sum().round(3) + assert await c.compute(b) == 2134.398 + + +@pytest.mark.slow +@gen_cluster( + client=True, + nthreads=[("", 1)] * 8, + Worker=Nanny, + config={ + "distributed.scheduler.active-memory-manager.start": True, + "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.policies": [ + {"class": "distributed.active_memory_manager.ReduceReplicas"}, + ], + }, + timeout=120, +) +async def test_ReduceReplicas_stress(c, s, *nannies): + """Running ReduceReplicas compulsively won't break a running computation. Unlike + test_drop_stress above, this test does not stop running after a few seconds - the + policy must not disrupt the computation too much. 
+ """ + da = pytest.importorskip("dask.array") + + rng = da.random.RandomState(0) + a = rng.random((20, 20), chunks=(1, 1)) + b = (a @ a.T).sum().round(3) + assert await c.compute(b) == 2134.398 From ae9fe588c49e4309c81056c7d546ec6539aa5bee Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 29 Sep 2021 14:24:09 +0100 Subject: [PATCH 4/6] relax timings --- .../tests/test_active_memory_manager.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index af5726bbaaa..750f19a52d7 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -1,6 +1,5 @@ import asyncio import random -from time import time import pytest @@ -462,16 +461,9 @@ class DropEverything(ActiveMemoryManagerPolicy): """Inanely suggest to drop every single key in the cluster""" def __init__(self): - self.start = None + self.i = 0 def run(self): - # Run for 5s every 0.1s, then let the computation finish - if not self.start: - self.start = time() - elif time() > self.start + 5: - self.manager.policies.remove(self) - return - for ts in self.manager.scheduler.tasks.values(): # Instead of yielding ("drop", ts, None) for each worker, which would result # in semi-predictable output about which replica survives, randomly choose a @@ -481,6 +473,10 @@ def run(self): for ws in candidates: yield "drop", ts, {ws} + self.i += 1 + if self.i == 5: + self.manager.policies.remove(self) + @pytest.mark.slow @gen_cluster( @@ -489,7 +485,7 @@ def run(self): Worker=Nanny, config={ "distributed.scheduler.active-memory-manager.start": True, - "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.interval": 0.5, "distributed.scheduler.active-memory-manager.policies": [ {"class": "distributed.tests.test_active_memory_manager.DropEverything"}, ], @@ -516,7 +512,7 @@ async def test_drop_stress(c, s, *nannies): Worker=Nanny, config={ "distributed.scheduler.active-memory-manager.start": True, - "distributed.scheduler.active-memory-manager.interval": 0.1, + "distributed.scheduler.active-memory-manager.interval": 0.5, "distributed.scheduler.active-memory-manager.policies": [ {"class": "distributed.active_memory_manager.ReduceReplicas"}, ], From 1b77b260951d4077cdc1d167e585fbb4de0193de Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 29 Sep 2021 17:50:33 +0100 Subject: [PATCH 5/6] xfail tests --- distributed/tests/test_active_memory_manager.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 750f19a52d7..10b41ce8e38 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -479,13 +479,14 @@ def run(self): @pytest.mark.slow +@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371") @gen_cluster( client=True, - nthreads=[("", 1)] * 8, + nthreads=[("", 1)] * 4, Worker=Nanny, config={ "distributed.scheduler.active-memory-manager.start": True, - "distributed.scheduler.active-memory-manager.interval": 0.5, + "distributed.scheduler.active-memory-manager.interval": 0.1, "distributed.scheduler.active-memory-manager.policies": [ {"class": "distributed.tests.test_active_memory_manager.DropEverything"}, ], @@ -506,13 +507,14 @@ async def test_drop_stress(c, s, *nannies): @pytest.mark.slow 
+@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371") @gen_cluster( client=True, - nthreads=[("", 1)] * 8, + nthreads=[("", 1)] * 4, Worker=Nanny, config={ "distributed.scheduler.active-memory-manager.start": True, - "distributed.scheduler.active-memory-manager.interval": 0.5, + "distributed.scheduler.active-memory-manager.interval": 0.1, "distributed.scheduler.active-memory-manager.policies": [ {"class": "distributed.active_memory_manager.ReduceReplicas"}, ], From b59e65dad4f845bb474ad2c290a84d26a99061ea Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 30 Sep 2021 12:34:13 +0100 Subject: [PATCH 6/6] Review --- .../tests/test_active_memory_manager.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py index 10b41ce8e38..66c29760bae 100644 --- a/distributed/tests/test_active_memory_manager.py +++ b/distributed/tests/test_active_memory_manager.py @@ -473,11 +473,21 @@ def run(self): for ws in candidates: yield "drop", ts, {ws} + # Stop running after ~2s self.i += 1 - if self.i == 5: + if self.i == 20: self.manager.policies.remove(self) +async def _tensordot_stress(c): + da = pytest.importorskip("dask.array") + + rng = da.random.RandomState(0) + a = rng.random((20, 20), chunks=(1, 1)) + b = (a @ a.T).sum().round(3) + assert await c.compute(b) == 2134.398 + + @pytest.mark.slow @pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371") @gen_cluster( @@ -491,6 +501,7 @@ def run(self): {"class": "distributed.tests.test_active_memory_manager.DropEverything"}, ], }, + timeout=120, ) async def test_drop_stress(c, s, *nannies): """A policy which suggests dropping everything won't break a running computation, @@ -498,12 +509,7 @@ async def test_drop_stress(c, s, *nannies): See also: test_ReduceReplicas_stress """ - da = pytest.importorskip("dask.array") - - rng = da.random.RandomState(0) - a = rng.random((20, 20), chunks=(1, 1)) - b = (a @ a.T).sum().round(3) - assert await c.compute(b) == 2134.398 + await _tensordot_stress(c) @pytest.mark.slow @@ -526,9 +532,4 @@ async def test_ReduceReplicas_stress(c, s, *nannies): test_drop_stress above, this test does not stop running after a few seconds - the policy must not disrupt the computation too much. """ - da = pytest.importorskip("dask.array") - - rng = da.random.RandomState(0) - a = rng.random((20, 20), chunks=(1, 1)) - b = (a @ a.T).sum().round(3) - assert await c.compute(b) == 2134.398 + await _tensordot_stress(c)
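
Note on the policy pattern exercised throughout this series: ActiveMemoryManagerPolicy.run() is a generator that yields ("replicate", ts, candidates) or ("drop", ts, candidates) suggestions, and the AMM's run_once() converts the resulting pending replications/drops into "acquire-replicas" / "remove-replicas" messages sent over the worker stream comms (patch 1/6). Below is a minimal sketch of a custom policy along the lines of DropEverything above; the key name "x" and the module path "mymodule" are hypothetical and for illustration only, not part of the patches.

    from distributed.active_memory_manager import ActiveMemoryManagerPolicy


    class DropExtraReplicasOfKey(ActiveMemoryManagerPolicy):
        """Suggest dropping all but one replica of a single, hard-coded key."""

        def __init__(self, key="x"):
            self.key = key

        def run(self):
            ts = self.manager.scheduler.tasks.get(self.key)
            if ts is None or len(ts.who_has) < 2:
                return
            # Yield one suggestion per surplus replica; passing None as the
            # candidates lets the AMM choose which worker drops its copy, and
            # the AMM will never drop the last remaining replica.
            for _ in range(len(ts.who_has) - 1):
                yield "drop", ts, None

Such a policy would be registered through the same config keys used in the tests above, e.g. distributed.scheduler.active-memory-manager.policies: [{"class": "mymodule.DropExtraReplicasOfKey"}], with distributed.scheduler.active-memory-manager.start set to true.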