From c590029ebf1add85ab74e97940ba10ab3322927e Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 18 Jan 2023 16:07:57 +0100 Subject: [PATCH 1/9] Remove cancelled and resumed from PROCESSING --- distributed/worker_state_machine.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 9ef087f546f..5b893f05045 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -88,8 +88,6 @@ "constrained", "executing", "long-running", - "cancelled", - "resumed", } READY: set[TaskStateState] = {"ready", "constrained"} # Valid states for a task that is found in TaskState.waiting_for_data From 411ddfb34e7f9e4b6e3a2f6e1c3dd307575bb6c4 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 18 Jan 2023 17:37:46 +0100 Subject: [PATCH 2/9] Do not allow for the worker to reject a drop replica request --- distributed/tests/test_worker.py | 105 +--------------------------- distributed/worker_state_machine.py | 36 ++++------ 2 files changed, 13 insertions(+), 128 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 992206abfdf..46b9e9eee81 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -21,7 +21,7 @@ import psutil import pytest -from tlz import first, merge, pluck, sliding_window +from tlz import first, pluck, sliding_window from tornado.ioloop import IOLoop import dask @@ -3026,109 +3026,6 @@ async def test_remove_replicas_simple(c, s, a, b): assert all(s.tasks[f.key].who_has == {s.workers[a.address]} for f in futs) -@gen_cluster( - client=True, - nthreads=[("", 1), ("", 6)], # Up to 5 threads of b will get stuck; read below - config=merge(NO_AMM, {"distributed.comm.recent-messages-log-length": 1_000}), -) -async def test_remove_replicas_while_computing(c, s, a, b): - futs = c.map(inc, range(10), workers=[a.address]) - dependents_event = distributed.Event() - - def some_slow(x, event): - if x % 2: - event.wait() - return x + 1 - - # All interesting things will happen on b - dependents = c.map(some_slow, futs, event=dependents_event, workers=[b.address]) - - while not any(f.key in b.state.tasks for f in dependents): - await asyncio.sleep(0.01) - - # The scheduler removes keys from who_has/has_what immediately - # Make sure the worker responds to the rejection and the scheduler corrects - # the state - ws = s.workers[b.address] - - def ws_has_futs(aggr_func): - nonlocal futs - return aggr_func(s.tasks[fut.key] in ws.has_what for fut in futs) - - # Wait for all futs to transfer over - while not ws_has_futs(all): - await asyncio.sleep(0.01) - - # Wait for some dependent tasks to be done. No more than half of the dependents can - # finish, as the others are blocked on dependents_event. - # Note: for this to work reliably regardless of scheduling order, we need to have 6+ - # threads. At the moment of writing it works with 2 because futures of Client.map - # are always scheduled from left to right, but we'd rather not rely on this - # assumption. - while not any(fut.status == "finished" for fut in dependents): - await asyncio.sleep(0.01) - assert not all(fut.status == "finished" for fut in dependents) - - # Try removing the initial keys - s.request_remove_replicas( - b.address, [fut.key for fut in futs], stimulus_id=f"test-{time()}" - ) - # Scheduler removed all keys immediately... - assert not ws_has_futs(any) - # ... 
but the state is properly restored for all tasks for which the dependent task - # isn't done yet - while not ws_has_futs(any): - await asyncio.sleep(0.01) - - # Let the remaining dependent tasks complete - await dependents_event.set() - await wait(dependents) - assert ws_has_futs(any) and not ws_has_futs(all) - - # If a request is rejected, the worker responds with an add-keys message to - # reenlist the key in the schedulers state system to avoid race conditions, - # see also https://github.com/dask/distributed/issues/5265 - rejections = set() - for msg in b.state.log: - if msg[0] == "remove-replica-rejected": - rejections.update(msg[1]) - assert rejections - - def answer_sent(key): - for batch in b.batched_stream.recent_message_log: - for msg in batch: - if "op" in msg and msg["op"] == "add-keys" and key in msg["keys"]: - return True - return False - - for rejected_key in rejections: - assert answer_sent(rejected_key) - - # Now that all dependent tasks are done, futs replicas may be removed. - # They might be already gone due to the above remove replica calls - s.request_remove_replicas( - b.address, - [fut.key for fut in futs if ws in s.tasks[fut.key].who_has], - stimulus_id=f"test-{time()}", - ) - - while any( - b.state.tasks[f.key].state != "released" for f in futs if f.key in b.state.tasks - ): - await asyncio.sleep(0.01) - - # The scheduler actually gets notified about the removed replica - while ws_has_futs(any): - await asyncio.sleep(0.01) - # A replica is still on workers[0] - assert all(len(s.tasks[f.key].who_has) == 1 for f in futs) - - del dependents, futs - - while any(w.state.tasks for w in (a, b)): - await asyncio.sleep(0.01) - - @gen_cluster(client=True, nthreads=[("", 1)] * 3, config=NO_AMM) async def test_who_has_consistent_remove_replicas(c, s, *workers): a = workers[0] diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 5b893f05045..0996e22b4fc 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -355,11 +355,6 @@ def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict: # Remove all Nones and empty containers return {k: v for k, v in out.items() if v} - def is_protected(self) -> bool: - return self.state in PROCESSING or any( - dep_ts.state in PROCESSING for dep_ts in self.dependents - ) - @dataclass class Instruction: @@ -2835,35 +2830,28 @@ def _handle_remove_replicas(self, ev: RemoveReplicasEvent) -> RecsInstrs: holding this unnecessary data, if the worker hasn't released the data itself, already. - This handler does not guarantee the task nor the data to be actually - released but only asks the worker to release the data on a best effort - guarantee. This protects from race conditions where the given keys may - already have been rescheduled for compute in which case the compute - would win and this handler is ignored. + This handler only releases tasks that are indeed in state memory. 
For stronger guarantees, see handler free_keys """ recommendations: Recs = {} instructions: Instructions = [] - rejected = [] for key in ev.keys: ts = self.tasks.get(key) if ts is None or ts.state != "memory": continue - if not ts.is_protected(): - self.log.append( - (ts.key, "remove-replica-confirmed", ev.stimulus_id, time()) - ) - recommendations[ts] = "released" - else: - rejected.append(key) - - if rejected: - self.log.append( - ("remove-replica-rejected", rejected, ev.stimulus_id, time()) - ) - instructions.append(AddKeysMsg(keys=rejected, stimulus_id=ev.stimulus_id)) + # If the task is still in executing, the scheduler should never have + # asked the worker to drop this key. + # We cannot simply forget it because there is a time window between + # setting the state to executing and preparing/collecting the data + # for the task. + # If a dependency was released during this time, this would pop up + # as a KeyError during execute which is hard to understand + if any(dep.state == "executing" for dep in ts.dependents): + raise RuntimeError("Encountered invalid state") + self.log.append((ts.key, "remove-replica", ev.stimulus_id, time())) + recommendations[ts] = "released" return recommendations, instructions From 20b7231b60d33100a6bad3bb47c1398b1299b4fc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 19 Jan 2023 15:59:52 +0100 Subject: [PATCH 3/9] Fix stories in test_resumed_cancelled_handle_compute --- distributed/tests/test_cancelled_state.py | 27 ++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 6be0a559a4b..7e4f4b4af7a 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -457,7 +457,7 @@ async def test_resumed_cancelled_handle_compute( Given the history of a task executing -> cancelled -> resumed(fetch) - A handle_compute should properly restore executing. + the scheduler should reject the result upon completion and reschedule the task. 
""" # This test is heavily using set_restrictions to simulate certain scheduler # decisions of placing keys @@ -535,12 +535,12 @@ async def release_all_futures(): (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "released", "cancelled", {}), (f3.key, "cancelled", "fetch", "resumed", {}), - (f3.key, "resumed", "memory", "memory", {}), + (f3.key, "resumed", "released", "cancelled", {}), ( f3.key, + "cancelled", "memory", "released", - "released", {f2.key: "released", f3.key: "forgotten"}, ), (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), @@ -558,11 +558,15 @@ async def release_all_futures(): (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "released", "cancelled", {}), (f3.key, "cancelled", "fetch", "resumed", {}), - (f3.key, "resumed", "error", "released", {f3.key: "fetch"}), - (f3.key, "fetch", "flight", "flight", {}), - (f3.key, "flight", "missing", "missing", {}), - (f3.key, "missing", "waiting", "waiting", {f2.key: "fetch"}), - (f3.key, "waiting", "ready", "ready", {f3.key: "executing"}), + (f3.key, "resumed", "released", "cancelled", {}), + ( + f3.key, + "cancelled", + "error", + "released", + {f2.key: "released", f3.key: "forgotten"}, + ), + (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "memory", "memory", {}), ], @@ -577,7 +581,8 @@ async def release_all_futures(): (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "released", "cancelled", {}), (f3.key, "cancelled", "fetch", "resumed", {}), - (f3.key, "resumed", "waiting", "executing", {}), + (f3.key, "resumed", "released", "cancelled", {}), + (f3.key, "cancelled", "waiting", "executing", {}), (f3.key, "executing", "memory", "memory", {}), ( f3.key, @@ -602,8 +607,10 @@ async def release_all_futures(): (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "released", "cancelled", {}), (f3.key, "cancelled", "fetch", "resumed", {}), - (f3.key, "resumed", "waiting", "executing", {}), + (f3.key, "resumed", "released", "cancelled", {}), + (f3.key, "cancelled", "waiting", "executing", {}), (f3.key, "executing", "error", "error", {}), + # FIXME: (distributed#7489) ], ) else: From 23db5870236f44df4d029b649596747867039224 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 20 Jan 2023 13:11:17 +0100 Subject: [PATCH 4/9] Add test case --- distributed/tests/test_cancelled_state.py | 25 +++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 7e4f4b4af7a..9fa83ab963a 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -32,6 +32,8 @@ GatherDepNetworkFailureEvent, GatherDepSuccessEvent, LongRunningMsg, + ReleaseWorkerDataMsg, + RemoveReplicasEvent, RescheduleEvent, SecedeEvent, TaskFinishedMsg, @@ -1172,3 +1174,26 @@ def f(ev1, ev2, ev3, ev4): await ev4.set() assert await x == 2 assert not ws.processing + + +def test_workerstate_remove_replica_of_cancelled_task_dependency(ws): + """If a dependency was fetched, but the task gets freed by the scheduler + before the add-keys message arrives, the scheduler sends a remove-replica + message to the worker, which should then release the dependency. 
+ + See distributed#7487""" + ws2 = "127.0.0.1:2" + instructions = ws.handle_stimulus( + ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"), + GatherDepSuccessEvent( + worker=ws2, total_nbytes=1, data={"x": 123}, stimulus_id="s2" + ), + FreeKeysEvent(keys=["y"], stimulus_id="s3"), + ) + assert ws.tasks["x"].state == "memory" + assert ws.tasks["y"].state == "cancelled" + instructions = ws.handle_stimulus( + RemoveReplicasEvent(keys=["x"], stimulus_id="s4"), + ) + assert ws.tasks["x"].state == "released" + assert instructions == [ReleaseWorkerDataMsg(stimulus_id="s4", key="x")] From 125279045c9c01ffe30c16c6af90fb224521e7ac Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 20 Jan 2023 13:28:17 +0100 Subject: [PATCH 5/9] Comment --- distributed/tests/test_cancelled_state.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 9fa83ab963a..30e9b12aee1 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -1181,7 +1181,8 @@ def test_workerstate_remove_replica_of_cancelled_task_dependency(ws): before the add-keys message arrives, the scheduler sends a remove-replica message to the worker, which should then release the dependency. - See distributed#7487""" + Read: https://github.com/dask/distributed/pull/7487#issuecomment-1387277900 + """ ws2 = "127.0.0.1:2" instructions = ws.handle_stimulus( ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"), @@ -1192,6 +1193,9 @@ def test_workerstate_remove_replica_of_cancelled_task_dependency(ws): ) assert ws.tasks["x"].state == "memory" assert ws.tasks["y"].state == "cancelled" + + # Test that the worker does accepts the RemoveReplicasEvent and + # subsequently releases the data instructions = ws.handle_stimulus( RemoveReplicasEvent(keys=["x"], stimulus_id="s4"), ) From 381ff9a297ed381b3325084b1e8d19c58909bbf3 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 20 Jan 2023 14:59:50 +0100 Subject: [PATCH 6/9] Minor fix --- distributed/worker_state_machine.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 0996e22b4fc..39ec7fff8c2 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2841,14 +2841,14 @@ def _handle_remove_replicas(self, ev: RemoveReplicasEvent) -> RecsInstrs: ts = self.tasks.get(key) if ts is None or ts.state != "memory": continue - # If the task is still in executing, the scheduler should never have - # asked the worker to drop this key. + # If the task is still in executing or long-running, the scheduler + # should never have asked the worker to drop this key. # We cannot simply forget it because there is a time window between - # setting the state to executing and preparing/collecting the data - # for the task. + # setting the state to executing/long-running and + # preparing/collecting the data for the task. 
# If a dependency was released during this time, this would pop up # as a KeyError during execute which is hard to understand - if any(dep.state == "executing" for dep in ts.dependents): + if any(dep.state in ("executing", "long-running") for dep in ts.dependents): raise RuntimeError("Encountered invalid state") self.log.append((ts.key, "remove-replica", ev.stimulus_id, time())) recommendations[ts] = "released" From 2bf4dd559c9dbf9f06e30c20f6040b1e3e0a382b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 20 Jan 2023 16:41:07 +0100 Subject: [PATCH 7/9] Add test for rescheduling case --- distributed/tests/test_cancelled_state.py | 29 +++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 30e9b12aee1..4f7bdd5f518 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -926,11 +926,11 @@ def test_workerstate_flight_failure_to_executing(ws, block_queue): assert ws.tasks["x"].state == "executing" -def test_workerstate_resumed_fetch_to_executing(ws_with_running_task): +def test_workerstate_resumed_fetch_to_cancelled_to_executing(ws_with_running_task): """Test state loops: - executing -> cancelled -> resumed(fetch) -> cancelled -> executing - - executing -> long-running -> cancelled -> resumed(fetch) -> long-running + - executing -> long-running -> cancelled -> resumed(fetch) -> cancelled -> long-running See also: test_workerstate_resumed_waiting_to_flight """ @@ -953,6 +953,31 @@ def test_workerstate_resumed_fetch_to_executing(ws_with_running_task): assert ws.tasks["x"].state == prev_state +def test_workerstate_resumed_fetch_to_executing(ws_with_running_task): + ws = ws_with_running_task + ws2 = "127.0.0.1:2" + + prev_state = ws.tasks["x"].state + + instructions = ws.handle_stimulus( + # x is released for whatever reason (e.g. 
client cancellation) + FreeKeysEvent(keys=["x"], stimulus_id="s1"), + # x was computed somewhere else + ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2"), + # x was lost / no known replicas, therefore y is cancelled + FreeKeysEvent(keys=["y"], stimulus_id="s3"), + ComputeTaskEvent.dummy("x", stimulus_id="s4"), + ) + if prev_state == "executing": + assert not instructions + else: + assert instructions == [ + LongRunningMsg(key="x", compute_duration=None, stimulus_id="s4") + ] + assert len(ws.tasks) == 1 + assert ws.tasks["x"].state == prev_state + + def test_workerstate_resumed_waiting_to_flight(ws): """Test state loop: From a9aace3627185a9ec7a775b3bf62f39e8cc11459 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 20 Jan 2023 17:33:44 +0100 Subject: [PATCH 8/9] Remove _transition_resumed_waiting --- distributed/worker_state_machine.py | 46 ----------------------------- 1 file changed, 46 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 39ec7fff8c2..2bcfa97805f 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2156,7 +2156,6 @@ def _transition_resumed_fetch( -------- _transition_cancelled_fetch _transition_cancelled_waiting - _transition_resumed_waiting _transition_flight_fetch """ if ts.previous == "flight": @@ -2203,47 +2202,6 @@ def _transition_resumed_released( ts.next = None return {}, [] - def _transition_resumed_waiting( - self, ts: TaskState, *, stimulus_id: str - ) -> RecsInstrs: - """ - See also - -------- - _transition_cancelled_fetch - _transition_cancelled_or_resumed_long_running - _transition_cancelled_waiting - _transition_resumed_fetch - """ - # None of the exit events of execute or gather_dep recommend a transition to - # waiting - assert not ts.done - if ts.previous == "executing": - assert ts.next == "fetch" - # We're back where we started. We should forget about the entire - # cancellation attempt - ts.state = "executing" - ts.next = None - ts.previous = None - return {}, [] - - elif ts.previous == "long-running": - assert ts.next == "fetch" - # Same as executing, and in addition send the LongRunningMsg in arrears - # Note that, if the task seceded before it was cancelled, this will cause - # the message to be sent twice. 
- ts.state = "long-running" - ts.next = None - ts.previous = None - smsg = LongRunningMsg( - key=ts.key, compute_duration=None, stimulus_id=stimulus_id - ) - return {}, [smsg] - - else: - assert ts.previous == "flight" - assert ts.next == "waiting" - return {}, [] - def _transition_cancelled_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: @@ -2252,7 +2210,6 @@ def _transition_cancelled_fetch( -------- _transition_cancelled_waiting _transition_resumed_fetch - _transition_resumed_waiting """ if ts.previous == "flight": if ts.done: @@ -2281,7 +2238,6 @@ def _transition_cancelled_waiting( _transition_cancelled_fetch _transition_cancelled_or_resumed_long_running _transition_resumed_fetch - _transition_resumed_waiting """ # None of the exit events of gather_dep or execute recommend a transition to # waiting @@ -2431,7 +2387,6 @@ def _transition_cancelled_or_resumed_long_running( -------- _transition_executing_long_running _transition_cancelled_waiting - _transition_resumed_waiting """ assert ts.previous in ("executing", "long-running") ts.previous = "long-running" @@ -2566,7 +2521,6 @@ def _transition_released_forgotten( ("resumed", "memory"): _transition_resumed_memory, ("resumed", "released"): _transition_resumed_released, ("resumed", "rescheduled"): _transition_resumed_rescheduled, - ("resumed", "waiting"): _transition_resumed_waiting, ("constrained", "executing"): _transition_constrained_executing, ("constrained", "released"): _transition_generic_released, ("error", "released"): _transition_generic_released, From 3a655f143cf7137b7bab06539493bc25f052c374 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 23 Jan 2023 15:55:45 +0100 Subject: [PATCH 9/9] Apply suggestions from code review Co-authored-by: Florian Jetter --- distributed/tests/test_cancelled_state.py | 3 +++ distributed/worker_state_machine.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 4f7bdd5f518..6c9462f0c80 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -954,6 +954,7 @@ def test_workerstate_resumed_fetch_to_cancelled_to_executing(ws_with_running_tas def test_workerstate_resumed_fetch_to_executing(ws_with_running_task): + """See test_resumed_cancelled_handle_compute for end-to-end version""" ws = ws_with_running_task ws2 = "127.0.0.1:2" @@ -1207,6 +1208,8 @@ def test_workerstate_remove_replica_of_cancelled_task_dependency(ws): message to the worker, which should then release the dependency. Read: https://github.com/dask/distributed/pull/7487#issuecomment-1387277900 + + See test_resumed_cancelled_handle_compute for end-to-end version """ ws2 = "127.0.0.1:2" instructions = ws.handle_stimulus( diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 2bcfa97805f..07543f7aef6 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2803,7 +2803,7 @@ def _handle_remove_replicas(self, ev: RemoveReplicasEvent) -> RecsInstrs: # If a dependency was released during this time, this would pop up # as a KeyError during execute which is hard to understand if any(dep.state in ("executing", "long-running") for dep in ts.dependents): - raise RuntimeError("Encountered invalid state") + raise RuntimeError("Encountered invalid state") # pragma: no cover self.log.append((ts.key, "remove-replica", ev.stimulus_id, time())) recommendations[ts] = "released"
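
Reviewer note (not part of the patches): the sketch below is a minimal, standalone model of the invariant the reworked remove-replicas handling enforces — a replica that is in memory is always released on request, and a request that arrives while a dependent is still executing or long-running is treated as an invalid state. The Task class and handle_remove_replicas function are hypothetical simplifications for illustration only; they are not the distributed.worker_state_machine API.

    # Simplified, illustrative model of the remove-replicas rule in this series.
    # Names here are hypothetical; see _handle_remove_replicas in the diff above
    # for the real handler. Requires Python 3.9+ for the builtin generics.
    from dataclasses import dataclass, field


    @dataclass
    class Task:
        key: str
        state: str  # e.g. "memory", "executing", "long-running"
        dependents: list["Task"] = field(default_factory=list)


    def handle_remove_replicas(tasks: dict[str, Task], keys: list[str]) -> list[str]:
        """Return the keys whose in-memory data the worker releases."""
        released = []
        for key in keys:
            ts = tasks.get(key)
            if ts is None or ts.state != "memory":
                continue  # the worker no longer holds the data; nothing to do
            # The scheduler must never ask to drop a key that a running dependent
            # still needs; the real handler raises RuntimeError in this case.
            if any(dep.state in ("executing", "long-running") for dep in ts.dependents):
                raise RuntimeError("Encountered invalid state")
            released.append(key)
        return released


    # Example: x is in memory and its only dependent already finished -> x is released.
    x = Task("x", "memory")
    y = Task("y", "memory")
    x.dependents.append(y)
    assert handle_remove_replicas({"x": x, "y": y}, ["x"]) == ["x"]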