diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index eb5828bf716..9c2d36a83eb 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2029,23 +2029,44 @@ def transition_released_waiting(self, key, stimulus_id):
                 pdb.set_trace()
             raise
 
+    def transition_no_worker_queued(self, key, stimulus_id):
+        try:
+            ts: TaskState = self.tasks[key]
+
+            if self.validate:
+                assert self.is_rootish(
+                    ts
+                ), "Non root-ish task should remain in no-worker"
+                assert not ts.actor, f"Actors can't be queued: {ts}"
+                assert ts not in self.queued
+
+            self.unrunnable.remove(ts)
+            self.queued.add(ts)
+            ts.state = "queued"
+
+            return {}, {}, {}
+        except Exception as e:
+            logger.exception(e)
+            if LOG_PDB:
+                import pdb
+
+                pdb.set_trace()
+            raise
+
     def transition_no_worker_processing(self, key, stimulus_id):
         try:
             ts: TaskState = self.tasks[key]
-            recommendations: Recs = {}
-            client_msgs: dict = {}
-            worker_msgs: dict = {}
 
             if self.validate:
                 assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
                 assert ts in self.unrunnable
 
-            if ws := self.decide_worker_non_rootish(ts):
-                self.unrunnable.discard(ts)
-                worker_msgs = _add_to_processing(self, ts, ws)
-            # If no worker, task just stays in `no-worker`
+            ws_or_state = self.decide_worker_or_next_state(ts)
+            if isinstance(ws_or_state, str):
+                return {key: ws_or_state}, {}, {}
 
-            return recommendations, client_msgs, worker_msgs
+            self.unrunnable.discard(ts)
+            return {}, {}, _add_to_processing(self, ts, ws_or_state)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2054,7 +2075,7 @@ def transition_no_worker_processing(self, key, stimulus_id):
                 pdb.set_trace()
             raise
 
-    def decide_worker_rootish_queuing_disabled(
+    def _decide_worker_rootish_queuing_disabled(
         self, ts: TaskState
     ) -> WorkerState | None:
         """Pick a worker for a runnable root-ish task, without queuing.
@@ -2114,7 +2135,7 @@ def decide_worker_rootish_queuing_disabled(
 
         return ws
 
-    def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
+    def _decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
         """Pick a worker for a runnable root-ish task, if not all are busy.
 
         Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
@@ -2166,7 +2187,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
 
         return ws
 
-    def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
+    def _decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
         """Pick a worker for a runnable non-root task, considering dependencies and
         restrictions.
 
         Out of eligible workers holding dependencies of ``ts``, selects the worker
@@ -2235,6 +2256,32 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
 
         return ws
 
+    def decide_worker_or_next_state(
+        self, ts: TaskState
+    ) -> WorkerState | Literal["queued", "no-worker"]:
+        """
+        Pick a worker for a runnable task, or if there is none, which state to hold the task in.
+
+        Selects an appropriate worker to run the task, based on whether the task is
+        root-ish or not and whether queuing is enabled. If there is none, instead
+        returns the next state the task should go to.
+        """
+        if self.is_rootish(ts):
+            # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
+            # there should only be one, which combines co-assignment and queuing.
+            # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
+            if math.isinf(self.WORKER_SATURATION):
+                if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
+                    return "no-worker"
+            else:
+                if not (ws := self._decide_worker_rootish_queuing_enabled()):
+                    return "queued"
+        else:
+            if not (ws := self._decide_worker_non_rootish(ts)):
+                return "no-worker"
+
+        return ws
+
     def transition_waiting_processing(self, key, stimulus_id):
         """Possibly schedule a ready task. This is the primary dispatch for ready tasks.
 
@@ -2244,22 +2291,11 @@ def transition_waiting_processing(self, key, stimulus_id):
         try:
             ts: TaskState = self.tasks[key]
 
-            if self.is_rootish(ts):
-                # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
-                # there should only be one, which combines co-assignment and queuing.
-                # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
-                if math.isinf(self.WORKER_SATURATION):
-                    if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
-                        return {ts.key: "no-worker"}, {}, {}
-                else:
-                    if not (ws := self.decide_worker_rootish_queuing_enabled()):
-                        return {ts.key: "queued"}, {}, {}
-            else:
-                if not (ws := self.decide_worker_non_rootish(ts)):
-                    return {ts.key: "no-worker"}, {}, {}
+            ws_or_state = self.decide_worker_or_next_state(ts)
+            if isinstance(ws_or_state, str):
+                return {key: ws_or_state}, {}, {}
 
-            worker_msgs = _add_to_processing(self, ts, ws)
-            return {}, {}, worker_msgs
+            return {}, {}, _add_to_processing(self, ts, ws_or_state)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2851,23 +2887,44 @@ def transition_queued_released(self, key, stimulus_id):
                 pdb.set_trace()
             raise
 
+    def transition_queued_no_worker(self, key, stimulus_id):
+        # FIXME this transition may be unreachable?
+        try:
+            ts: TaskState = self.tasks[key]
+
+            if self.validate:
+                assert not self.is_rootish(ts), "Root-ish task should remain in queued"
+                assert not ts.actor, f"Actors can't be queued: {ts}"
+                assert ts not in self.unrunnable
+
+            self.queued.remove(ts)
+            self.unrunnable.add(ts)
+            ts.state = "no-worker"
+
+            return {}, {}, {}
+        except Exception as e:
+            logger.exception(e)
+            if LOG_PDB:
+                import pdb
+
+                pdb.set_trace()
+            raise
+
     def transition_queued_processing(self, key, stimulus_id):
         try:
             ts: TaskState = self.tasks[key]
-            recommendations: Recs = {}
-            client_msgs: dict = {}
-            worker_msgs: dict = {}
 
            if self.validate:
                 assert not ts.actor, f"Actors can't be queued: {ts}"
                 assert ts in self.queued
 
-            if ws := self.decide_worker_rootish_queuing_enabled():
-                self.queued.discard(ts)
-                worker_msgs = _add_to_processing(self, ts, ws)
-            # If no worker, task just stays `queued`
+            ws_or_state = self.decide_worker_or_next_state(ts)
+            if isinstance(ws_or_state, str):
+                return {key: ws_or_state}, {}, {}
+
+            self.queued.discard(ts)
+            return {}, {}, _add_to_processing(self, ts, ws_or_state)
 
-            return recommendations, client_msgs, worker_msgs
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -2988,11 +3045,13 @@ def transition_released_forgotten(self, key, stimulus_id):
         ("waiting", "queued"): transition_waiting_queued,
         ("waiting", "memory"): transition_waiting_memory,
         ("queued", "released"): transition_queued_released,
+        ("queued", "no-worker"): transition_queued_no_worker,
         ("queued", "processing"): transition_queued_processing,
         ("processing", "released"): transition_processing_released,
         ("processing", "memory"): transition_processing_memory,
         ("processing", "erred"): transition_processing_erred,
         ("no-worker", "released"): transition_no_worker_released,
+        ("no-worker", "queued"): transition_no_worker_queued,
("no-worker", "processing"): transition_no_worker_processing, ("released", "forgotten"): transition_released_forgotten, ("memory", "forgotten"): transition_memory_forgotten, @@ -3020,7 +3079,12 @@ def is_rootish(self, ts: TaskState) -> bool: Root-ish tasks are part of a group that's much larger than the cluster, and have few or no dependencies. """ - if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: + if ( + ts.resource_restrictions + or ts.worker_restrictions + or ts.host_restrictions + or ts.actor + ): return False tg = ts.group # TODO short-circuit to True if `not ts.dependencies`? diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 48ad99db771..a8a6e12a708 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -8,6 +8,7 @@ import pickle import re import sys +from contextlib import AsyncExitStack from itertools import product from textwrap import dedent from time import sleep @@ -481,6 +482,140 @@ async def test_queued_remove_add_worker(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 2)], + config={ + "distributed.scheduler.worker-saturation": 1.0, + }, +) +async def test_queued_oversaturates_after_group_shrinks(c, s, a): + """ + When tasks switch from root-ish to non-root-ish, even though they're in `queued`, + they're scheduled without regard to worker-saturation. + + This isn't really desireable behavior, it's just what happens to occur right now. If + this changes, that's okay (maybe even good). + """ + root = c.submit(inc, 1, key="root") + + # Put some tasks in the queue + es = [Event() for _ in range(5)] + fs = c.map(lambda _, e: e.wait(), [root] * len(es), es) + await wait_for_state(fs[0].key, "processing", s) + assert s.queued + + # Add a downstream task that depends on fs[0] + de = Event() + downstream = c.submit(lambda _: de.wait(), fs[0]) + await wait_for_state(downstream.key, "waiting", s) + + # Cancel one task. Group is now too small to be root-ish. + del fs[-1], es[-1] + await async_wait_for(lambda: len(s.tasks) == len(fs) + 2, 5) + if s.is_rootish(s.tasks[fs[0].key]): + pytest.fail( + "Test assumptions have changed; task is still root-ish. Test may no longer be relevant." + ) + + # Let the downstream task schedule. + # When a slot opens and we try to schedule the next task on the queue, + # it gets scheduled as non-root-ish. So both the downstream task and the next + # queued task get assigned to the worker, exceeding worker-saturation. + await es[0].set() + await wait_for_state(downstream.key, "processing", s) + # KEY ASSERTION: + # the next task on the queue got scheduled, exceeding worker-saturation, because + # even though it was in `queued`, it was no longer root-ish. + assert len(s.workers[a.address].processing) == a.state.nthreads + 1 + + # Everything runs + await de.set() + await downstream + await asyncio.gather(*(e.set() for e in es)) + + await c.gather(fs) + + +@gen_cluster( + client=True, + nthreads=[("", 2)] * 2, + config={ + "distributed.worker.memory.pause": False, + "distributed.worker.memory.target": False, + "distributed.worker.memory.spill": False, + "distributed.scheduler.work-stealing": False, + }, +) +async def test_queued_rootish_changes_while_paused(c, s, a, b): + "Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks." + # NOTE: this tests the `no-worker->queued` transition when queueing is active. 
+
+    root = c.submit(inc, 1, key="root")
+    await root
+
+    # manually pause the workers
+    a.status = Status.paused
+    b.status = Status.paused
+
+    await async_wait_for(lambda: not s.running, 5)
+
+    fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)]
+    # ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too
+    # small), then the last one does. So N-1 tasks will go to `no-worker`, and the last
+    # to `queued`. `is_rootish` is just messed up like that.
+
+    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)
+
+    # un-pause
+    a.status = Status.running
+    b.status = Status.running
+    await async_wait_for(lambda: len(s.running) == len(s.workers), 5)
+
+    await c.gather(fs)
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing": False},
+)
+async def test_queued_rootish_changes_scale_up(c, s, a):
+    "Tasks are initially root-ish. After the cluster scales, they aren't."
+
+    root = c.submit(inc, 1, key="root")
+
+    event = Event()
+    clog = c.submit(event.wait, key="clog")
+    await wait_for_state(clog.key, "processing", s)
+
+    fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)])
+
+    await async_wait_for(lambda: len(s.tasks) > len(fs), 5)
+
+    if not s.is_rootish(s.tasks[fs[0].key]):
+        pytest.fail(
+            "Test assumptions have changed; task is not root-ish. Test may no longer be relevant."
+        )
+    if math.isfinite(s.WORKER_SATURATION):
+        assert s.queued
+
+    async with AsyncExitStack() as stack:
+        for _ in range(3):
+            await stack.enter_async_context(Worker(s.address, nthreads=2))
+
+    if s.is_rootish(s.tasks[fs[0].key]):
+        pytest.fail(
+            "Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
+        )
+
+    await event.set()
+    await clog
+
+    # Just verify it doesn't deadlock
+    await c.gather(fs)
+
+
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_secede_opens_slot(c, s, a):
     first = Event()