From 18201b2c21e5ce5b5deb68928f0c433877f09225 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Mon, 7 Nov 2022 16:45:33 -0700
Subject: [PATCH 1/7] `decide_worker_and_next_state`

---
 distributed/scheduler.py | 87 ++++++++++++++++++++++++----------------
 1 file changed, 53 insertions(+), 34 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index eb5828bf716..f43a96b50cd 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2032,20 +2032,20 @@ def transition_released_waiting(self, key, stimulus_id):
     def transition_no_worker_processing(self, key, stimulus_id):
         try:
             ts: TaskState = self.tasks[key]
-            recommendations: Recs = {}
-            client_msgs: dict = {}
-            worker_msgs: dict = {}

             if self.validate:
                 assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
                 assert ts in self.unrunnable

-            if ws := self.decide_worker_non_rootish(ts):
-                self.unrunnable.discard(ts)
-                worker_msgs = _add_to_processing(self, ts, ws)
-            # If no worker, task just stays in `no-worker`
+            ws, state = self.decide_worker_and_next_state(ts)
+            if not ws:
+                return {key: state}, {}, {}

-            return recommendations, client_msgs, worker_msgs
+            if self.validate:
+                assert state == "processing", state
+
+            self.unrunnable.discard(ts)
+            return {}, {}, _add_to_processing(self, ts, ws)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2054,7 +2054,7 @@ def transition_no_worker_processing(self, key, stimulus_id):
                 pdb.set_trace()
             raise

-    def decide_worker_rootish_queuing_disabled(
+    def _decide_worker_rootish_queuing_disabled(
         self, ts: TaskState
     ) -> WorkerState | None:
         """Pick a worker for a runnable root-ish task, without queuing.
@@ -2114,7 +2114,7 @@

         return ws

-    def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
+    def _decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
         """Pick a worker for a runnable root-ish task, if not all are busy.

         Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
@@ -2166,7 +2166,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:

         return ws

-    def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
+    def _decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
         """Pick a worker for a runnable non-root task, considering dependencies and
         restrictions.

         Out of eligible workers holding dependencies of ``ts``, selects the worker
@@ -2235,6 +2235,32 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:

         return ws

+    def decide_worker_and_next_state(
+        self, ts: TaskState
+    ) -> tuple[WorkerState | None, Literal["queued", "no-worker", "processing"]]:
+        """Pick a worker for a runnable task.
+
+        Selects an appropriate worker to run the task (or None), based on whether the
+        task is root-ish or not and whether queuing is enabled. Also returns the next
+        state the task should go to. If the worker is not None, the state will be
+        ``processing``.
+        """
+        if self.is_rootish(ts):
+            # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
+            # there should only be one, which combines co-assignment and queuing.
+            # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
+            if math.isinf(self.WORKER_SATURATION):
+                if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
+                    return None, "no-worker"
+            else:
+                if not (ws := self._decide_worker_rootish_queuing_enabled()):
+                    return None, "queued"
+        else:
+            if not (ws := self._decide_worker_non_rootish(ts)):
+                return None, "no-worker"
+
+        return ws, "processing"
+
     def transition_waiting_processing(self, key, stimulus_id):
         """Possibly schedule a ready task. This is the primary dispatch for ready tasks.

@@ -2244,22 +2270,14 @@
         try:
             ts: TaskState = self.tasks[key]

-            if self.is_rootish(ts):
-                # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
-                # there should only be one, which combines co-assignment and queuing.
-                # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
-                if math.isinf(self.WORKER_SATURATION):
-                    if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
-                        return {ts.key: "no-worker"}, {}, {}
-                else:
-                    if not (ws := self.decide_worker_rootish_queuing_enabled()):
-                        return {ts.key: "queued"}, {}, {}
-            else:
-                if not (ws := self.decide_worker_non_rootish(ts)):
-                    return {ts.key: "no-worker"}, {}, {}
+            ws, state = self.decide_worker_and_next_state(ts)
+            if not ws:
+                return {key: state}, {}, {}

-            worker_msgs = _add_to_processing(self, ts, ws)
-            return {}, {}, worker_msgs
+            if self.validate:
+                assert state == "processing", state
+
+            return {}, {}, _add_to_processing(self, ts, ws)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2854,20 +2872,21 @@ def transition_queued_released(self, key, stimulus_id):
     def transition_queued_processing(self, key, stimulus_id):
         try:
             ts: TaskState = self.tasks[key]
-            recommendations: Recs = {}
-            client_msgs: dict = {}
-            worker_msgs: dict = {}

             if self.validate:
                 assert not ts.actor, f"Actors can't be queued: {ts}"
                 assert ts in self.queued

-            if ws := self.decide_worker_rootish_queuing_enabled():
-                self.queued.discard(ts)
-                worker_msgs = _add_to_processing(self, ts, ws)
-            # If no worker, task just stays `queued`
+            ws, state = self.decide_worker_and_next_state(ts)
+            if not ws:
+                return {key: state}, {}, {}
+
+            if self.validate:
+                assert state == "processing", state
+
+            self.queued.discard(ts)
+            return {}, {}, _add_to_processing(self, ts, ws)

-            return recommendations, client_msgs, worker_msgs
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:

From fac16c2979d4c4c4335e217af5c2a7de6a73ca18 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Mon, 7 Nov 2022 16:53:05 -0700
Subject: [PATCH 2/7] `decide_worker_or_next_state`

---
 distributed/scheduler.py | 55 +++++++++++++++++-----------------------
 1 file changed, 23 insertions(+), 32 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index f43a96b50cd..4bad3e00830 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2037,15 +2037,12 @@ def transition_no_worker_processing(self, key, stimulus_id):
                 assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
                 assert ts in self.unrunnable

-            ws, state = self.decide_worker_and_next_state(ts)
-            if not ws:
-                return {key: state}, {}, {}
-
-            if self.validate:
-                assert state == "processing", state
+            ws_or_state = self.decide_worker_or_next_state(ts)
+            if isinstance(ws_or_state, str):
+                return {key: ws_or_state}, {}, {}

             self.unrunnable.discard(ts)
-            return {}, {}, _add_to_processing(self, ts, ws)
+            return {}, {}, _add_to_processing(self, ts, ws_or_state)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2235,15 +2232,15 @@ def _decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:

         return ws

-    def decide_worker_and_next_state(
+    def decide_worker_or_next_state(
         self, ts: TaskState
-    ) -> tuple[WorkerState | None, Literal["queued", "no-worker", "processing"]]:
-        """Pick a worker for a runnable task.
+    ) -> WorkerState | Literal["queued", "no-worker"]:
+        """
+        Pick a worker for a runnable task, or if there is none, which state to hold the task in.

-        Selects an appropriate worker to run the task (or None), based on whether the
-        task is root-ish or not and whether queuing is enabled. Also returns the next
-        state the task should go to. If the worker is not None, the state will be
-        ``processing``.
+        Selects an appropriate worker to run the task, based on whether the task is
+        root-ish or not and whether queuing is enabled. If there is none, instead
+        returns the next state the task should go to.
         """
         if self.is_rootish(ts):
             # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
@@ -2251,15 +2248,15 @@ def decide_worker_and_next_state(
             # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
             if math.isinf(self.WORKER_SATURATION):
                 if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
-                    return None, "no-worker"
+                    return "no-worker"
             else:
                 if not (ws := self._decide_worker_rootish_queuing_enabled()):
-                    return None, "queued"
+                    return "queued"
         else:
             if not (ws := self._decide_worker_non_rootish(ts)):
-                return None, "no-worker"
+                return "no-worker"

-        return ws, "processing"
+        return ws

     def transition_waiting_processing(self, key, stimulus_id):
         """Possibly schedule a ready task. This is the primary dispatch for ready tasks.
@@ -2270,14 +2267,11 @@ def transition_waiting_processing(self, key, stimulus_id):
         try:
             ts: TaskState = self.tasks[key]

-            ws, state = self.decide_worker_and_next_state(ts)
-            if not ws:
-                return {key: state}, {}, {}
+            ws_or_state = self.decide_worker_or_next_state(ts)
+            if isinstance(ws_or_state, str):
+                return {key: ws_or_state}, {}, {}

-            if self.validate:
-                assert state == "processing", state
-
-            return {}, {}, _add_to_processing(self, ts, ws)
+            return {}, {}, _add_to_processing(self, ts, ws_or_state)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2877,15 +2871,12 @@ def transition_queued_processing(self, key, stimulus_id):
                 assert not ts.actor, f"Actors can't be queued: {ts}"
                 assert ts in self.queued

-            ws, state = self.decide_worker_and_next_state(ts)
-            if not ws:
-                return {key: state}, {}, {}
-
-            if self.validate:
-                assert state == "processing", state
+            ws_or_state = self.decide_worker_or_next_state(ts)
+            if isinstance(ws_or_state, str):
+                return {key: ws_or_state}, {}, {}

             self.queued.discard(ts)
-            return {}, {}, _add_to_processing(self, ts, ws)
+            return {}, {}, _add_to_processing(self, ts, ws_or_state)

         except Exception as e:
             logger.exception(e)

From b5227acee1cae6548f4ea7d3ace1c40cfd340fe1 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Mon, 7 Nov 2022 17:03:56 -0700
Subject: [PATCH 3/7] driveby: actors aren't root-ish

---
 distributed/scheduler.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 4bad3e00830..d45ba901959 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3030,7 +3030,12 @@ def is_rootish(self, ts: TaskState) -> bool:
         Root-ish tasks are part of a group that's much larger than the cluster,
         and have few or no dependencies.
""" - if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: + if ( + ts.resource_restrictions + or ts.worker_restrictions + or ts.host_restrictions + or ts.actor + ): return False tg = ts.group # TODO short-circuit to True if `not ts.dependencies`? From 446cee5c7bcf08818f7e26673f8134ef7555efec Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 7 Nov 2022 17:09:34 -0700 Subject: [PATCH 4/7] queued<->no-worker transitions --- distributed/scheduler.py | 48 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d45ba901959..47f2084ae4b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2029,6 +2029,30 @@ def transition_released_waiting(self, key, stimulus_id): pdb.set_trace() raise + def transition_no_worker_queued(self, key, stimulus_id): + try: + ts: TaskState = self.tasks[key] + + if self.validate: + assert self.is_rootish( + ts + ), "Non root-ish task should remain in no-worker" + assert not ts.actor, f"Actors can't be queued: {ts}" + assert ts not in self.queued + + self.unrunnable.remove(ts) + self.queued.add(ts) + ts.state = "queued" + + return {}, {}, {} + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise + def transition_no_worker_processing(self, key, stimulus_id): try: ts: TaskState = self.tasks[key] @@ -2863,6 +2887,28 @@ def transition_queued_released(self, key, stimulus_id): pdb.set_trace() raise + def transition_queued_no_worker(self, key, stimulus_id): + try: + ts: TaskState = self.tasks[key] + + if self.validate: + assert not self.is_rootish(ts), "Root-ish task should remain in queued" + assert not ts.actor, f"Actors can't be queued: {ts}" + assert ts not in self.unrunnable + + self.queued.remove(ts) + self.unrunnable.add(ts) + ts.state = "no-worker" + + return {}, {}, {} + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise + def transition_queued_processing(self, key, stimulus_id): try: ts: TaskState = self.tasks[key] @@ -2998,11 +3044,13 @@ def transition_released_forgotten(self, key, stimulus_id): ("waiting", "queued"): transition_waiting_queued, ("waiting", "memory"): transition_waiting_memory, ("queued", "released"): transition_queued_released, + ("queued", "no-worker"): transition_queued_no_worker, ("queued", "processing"): transition_queued_processing, ("processing", "released"): transition_processing_released, ("processing", "memory"): transition_processing_memory, ("processing", "erred"): transition_processing_erred, ("no-worker", "released"): transition_no_worker_released, + ("no-worker", "queued"): transition_no_worker_queued, ("no-worker", "processing"): transition_no_worker_processing, ("released", "forgotten"): transition_released_forgotten, ("memory", "forgotten"): transition_memory_forgotten, From af1b9614e77eb9331eaa3877525b436bf62eaf32 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 7 Nov 2022 17:16:22 -0700 Subject: [PATCH 5/7] add tests from #7259 --- distributed/tests/test_scheduler.py | 79 +++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 48ad99db771..061e6b49ebf 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -8,6 +8,7 @@ import pickle import re import sys +from contextlib import AsyncExitStack from itertools import product from textwrap import dedent from time 
@@ -481,6 +482,84 @@ async def test_queued_remove_add_worker(c, s, a, b):
     await wait(fs)


+@gen_cluster(
+    client=True,
+    nthreads=[("", 2)] * 2,
+    config={
+        "distributed.worker.memory.pause": False,
+        "distributed.worker.memory.target": False,
+        "distributed.worker.memory.spill": False,
+        "distributed.scheduler.work-stealing": False,
+    },
+)
+async def test_queued_rootish_changes_while_paused(c, s, a, b):
+    "Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks."
+
+    root = c.submit(inc, 1, key="root")
+    await root
+
+    # manually pause the workers
+    a.status = Status.paused
+    b.status = Status.paused
+
+    await async_wait_for(lambda: not s.running, 5)
+
+    fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)]
+    # ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too
+    # small), then the last one does. So N-1 tasks will go to `no-worker`, and the last
+    # to `queued`. `is_rootish` is just messed up like that.
+
+    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)
+
+    # un-pause
+    a.status = Status.running
+    b.status = Status.running
+    await async_wait_for(lambda: len(s.running) == len(s.workers), 5)
+
+    await c.gather(fs)
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing": False},
+)
+async def test_queued_rootish_changes_scale_up(c, s, a):
+    "Tasks are initially root-ish. After cluster scales, they aren't."
+
+    root = c.submit(inc, 1, key="root")
+
+    event = Event()
+    clog = c.submit(event.wait, key="clog")
+    await wait_for_state(clog.key, "processing", s)
+
+    fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)])
+
+    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)
+
+    if not s.is_rootish(s.tasks[fs[0].key]):
+        pytest.fail(
+            "Test assumptions have changed; task is not root-ish. Test may no longer be relevant."
+        )
+    if math.isfinite(s.WORKER_SATURATION):
+        assert s.queued
+
+    async with AsyncExitStack() as stack:
+        for _ in range(3):
+            await stack.enter_async_context(Worker(s.address, nthreads=2))
+
+    if s.is_rootish(s.tasks[fs[0].key]):
+        pytest.fail(
+            "Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
+        )
+
+    await event.set()
+    await clog
+
+    # Just verify it doesn't deadlock
+    await c.gather(fs)
+
+
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_secede_opens_slot(c, s, a):
     first = Event()

From 71043b22521fffa7cbd1ea4ec86b637c9e19bbd3 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Mon, 7 Nov 2022 18:26:18 -0700
Subject: [PATCH 6/7] `test_queued_oversaturates_after_group_shrinks`

---
 distributed/tests/test_scheduler.py | 56 +++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 061e6b49ebf..a8a6e12a708 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -482,6 +482,61 @@ async def test_queued_remove_add_worker(c, s, a, b):
     await wait(fs)


+@gen_cluster(
+    client=True,
+    nthreads=[("", 2)],
+    config={
+        "distributed.scheduler.worker-saturation": 1.0,
+    },
+)
+async def test_queued_oversaturates_after_group_shrinks(c, s, a):
+    """
+    When tasks switch from root-ish to non-root-ish, even though they're in `queued`,
+    they're scheduled without regard to worker-saturation.
+
+    This isn't really desirable behavior, it's just what happens to occur right now. If
+    this changes, that's okay (maybe even good).
+ """ + root = c.submit(inc, 1, key="root") + + # Put some tasks in the queue + es = [Event() for _ in range(5)] + fs = c.map(lambda _, e: e.wait(), [root] * len(es), es) + await wait_for_state(fs[0].key, "processing", s) + assert s.queued + + # Add a downstream task that depends on fs[0] + de = Event() + downstream = c.submit(lambda _: de.wait(), fs[0]) + await wait_for_state(downstream.key, "waiting", s) + + # Cancel one task. Group is now too small to be root-ish. + del fs[-1], es[-1] + await async_wait_for(lambda: len(s.tasks) == len(fs) + 2, 5) + if s.is_rootish(s.tasks[fs[0].key]): + pytest.fail( + "Test assumptions have changed; task is still root-ish. Test may no longer be relevant." + ) + + # Let the downstream task schedule. + # When a slot opens and we try to schedule the next task on the queue, + # it gets scheduled as non-root-ish. So both the downstream task and the next + # queued task get assigned to the worker, exceeding worker-saturation. + await es[0].set() + await wait_for_state(downstream.key, "processing", s) + # KEY ASSERTION: + # the next task on the queue got scheduled, exceeding worker-saturation, because + # even though it was in `queued`, it was no longer root-ish. + assert len(s.workers[a.address].processing) == a.state.nthreads + 1 + + # Everything runs + await de.set() + await downstream + await asyncio.gather(*(e.set() for e in es)) + + await c.gather(fs) + + @gen_cluster( client=True, nthreads=[("", 2)] * 2, @@ -494,6 +549,7 @@ async def test_queued_remove_add_worker(c, s, a, b): ) async def test_queued_rootish_changes_while_paused(c, s, a, b): "Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks." + # NOTE: this tests the `no-worker->queued` transition when queueing is active. root = c.submit(inc, 1, key="root") await root From 04201ffb1e67d745e0b563482f03d66468b1557b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 7 Nov 2022 18:27:19 -0700 Subject: [PATCH 7/7] note unreachable transition --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 47f2084ae4b..9c2d36a83eb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2888,6 +2888,7 @@ def transition_queued_released(self, key, stimulus_id): raise def transition_queued_no_worker(self, key, stimulus_id): + # FIXME this transition may be unreachable? try: ts: TaskState = self.tasks[key]