From 624add30b160ea9dd60cc611d674fd050048d1c3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 3 Nov 2022 17:34:46 -0600 Subject: [PATCH 01/12] test for is_rootish changing on no-worker tasks it's possible for tasks to not be rootish when they go into no-worker, but to be rootish when they come out. --- distributed/tests/test_scheduler.py | 79 +++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f3e21e9d0af..c899809ba0f 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -8,6 +8,7 @@ import pickle import re import sys +from contextlib import AsyncExitStack from itertools import product from textwrap import dedent from time import sleep @@ -481,6 +482,84 @@ async def test_queued_remove_add_worker(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 2)] * 2, + config={ + "distributed.worker.memory.pause": False, + "distributed.worker.memory.target": False, + "distributed.worker.memory.spill": False, + "distributed.scheduler.work-stealing": False, + }, +) +async def test_queued_rootish_changes_while_paused(c, s, a, b): + "Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks." + + root = c.submit(inc, 1, key="root") + await root + + # manually pause the workers + a.status = Status.paused + b.status = Status.paused + + await async_wait_for(lambda: not s.running, 5) + + fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)] + # ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too + # small), then the last one does. So N-1 tasks will go to `no-worker`, and the last + # to `queued`. `is_rootish` is just messed up like that. + + await async_wait_for(lambda: len(s.tasks) > len(fs), 5) + + # un-pause + a.status = Status.running + b.status = Status.running + await async_wait_for(lambda: len(s.running) == len(s.workers), 5) + + await c.gather(fs) + + +@gen_cluster( + client=True, + nthreads=[("", 1)], + config={"distributed.scheduler.work-stealing": False}, +) +async def test_queued_rootish_changes_scale_up(c, s, a): + "Tasks are initially root-ish. After cluster scales, they aren't." + + root = c.submit(inc, 1, key="root") + + event = Event() + clog = c.submit(event.wait, key="clog") + await wait_for_state(clog.key, "processing", s) + + fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)]) + + await async_wait_for(lambda: len(s.tasks) > len(fs), 5) + + if not s.is_rootish(s.tasks[fs[0].key]): + pytest.fail( + "Test assumptions have changed; task is not root-ish. Test may no longer be relevant." + ) + if math.isfinite(s.WORKER_SATURATION): + assert s.queued + + async with AsyncExitStack() as stack: + for _ in range(3): + await stack.enter_async_context(Worker(s.address, nthreads=2)) + + if s.is_rootish(s.tasks[fs[0].key]): + pytest.fail( + "Test assumptions have changed; task is still root-ish. Test may no longer be relevant." + ) + + await event.set() + await clog + + # Just verify it doesn't deadlock + await c.gather(fs) + + @gen_cluster(client=True, nthreads=[("", 1)]) async def test_secede_opens_slot(c, s, a): first = Event() From 70bba668c96105d26e44a847c1c4adcda64724df Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 4 Nov 2022 19:42:36 -0600 Subject: [PATCH 02/12] WIP cache root-ish-ness --- distributed/scheduler.py | 69 +++++++++++++++++++++-------- distributed/tests/test_scheduler.py | 6 +-- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5342d693e91..a581c064e1c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -746,6 +746,9 @@ def remove_from_processing(self, ts: TaskState) -> None: """Remove a task from a workers processing""" if self.scheduler.validate: assert ts in self.processing + assert ts._rootish is not None + + ts._rootish = None if ts in self.long_running: self.long_running.discard(ts) @@ -1311,6 +1314,9 @@ class TaskState: #: Cached hash of :attr:`~TaskState.client_key` _hash: int + #: Cached; set by `SchedulerState.is_rootish` + _rootish: bool | None + # Support for weakrefs to a class with __slots__ __weakref__: Any = None __slots__ = tuple(__annotations__) @@ -1352,6 +1358,7 @@ def __init__(self, key: str, run_spec: object, state: TaskStateState): self.metadata = {} self.annotations = {} self.erred_on = set() + self._rootish = None TaskState._instances.add(self) def __hash__(self) -> int: @@ -1511,10 +1518,15 @@ class SchedulerState: #: All tasks currently known to the scheduler tasks: dict[str, TaskState] - #: Tasks in the "queued" state, ordered by priority + #: Tasks in the "queued" state, ordered by priority. + #: They should generally be root-ish, but in certain cases may not be. + #: They must not have restrictions. + #: Always empty if `worker-saturation` is set to `inf`. queued: HeapSet[TaskState] - #: Tasks in the "no-worker" state + #: Tasks in the "no-worker" state. + #: They may or may not have restrictions. + #: Could contain root-ish tasks even when `worker-saturation` is a finite value. unrunnable: set[TaskState] #: Subset of tasks that exist in memory on more than one worker @@ -2014,11 +2026,19 @@ def transition_no_worker_processing(self, key, stimulus_id): assert not ts.actor, f"Actors can't be in `no-worker`: {ts}" assert ts in self.unrunnable - if ws := self.decide_worker_non_rootish(ts): + decide_worker = ( + self.decide_worker_rootish_queuing_disabled + if self.is_rootish(ts) + else self.decide_worker_non_rootish + ) + if ws := decide_worker(ts): self.unrunnable.discard(ts) worker_msgs = _add_to_processing(self, ts, ws) # If no worker, task just stays in `no-worker` + if self.validate and self.is_rootish(ts): + assert ws is not None + return recommendations, client_msgs, worker_msgs except Exception as e: logger.exception(e) @@ -2052,8 +2072,8 @@ def decide_worker_rootish_queuing_disabled( ``no-worker``. """ if self.validate: - # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled` assert math.isinf(self.WORKER_SATURATION) + assert self.is_rootish(ts) pool = self.idle.values() if self.idle else self.running if not pool: @@ -2113,11 +2133,6 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: """ if self.validate: - # We don't `assert self.is_rootish(ts)` here, because that check is dependent on - # cluster size. It's possible a task looked root-ish when it was queued, but the - # cluster has since scaled up and it no longer does when coming out of the queue. - # If `is_rootish` changes to a static definition, then add that assertion here - # (and actually pass in the task). assert not math.isinf(self.WORKER_SATURATION) if not self.idle: @@ -2154,6 +2169,9 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: ``ts`` or there are no running workers, returns None, in which case the task should be transitioned to ``no-worker``. """ + if self.validate: + assert not self.is_rootish(ts) + if not self.running: return None @@ -2988,15 +3006,28 @@ def is_rootish(self, ts: TaskState) -> bool: Root-ish tasks are part of a group that's much larger than the cluster, and have few or no dependencies. """ - if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: - return False - tg = ts.group - # TODO short-circuit to True if `not ts.dependencies`? - return ( - len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 - ) + # NOTE: we cache `is_rootish` not for performance, but so it can't change if + # `TaskGroup` and cluster size does. That avoids annoying edge cases where a + # task does/doesn't look root-ish when it goes into `queued` or `unrunnable`, + # but that's flipped when it comes out. + if (result := ts._rootish) is None: + if ( + ts.resource_restrictions + or ts.worker_restrictions + or ts.host_restrictions + ): + result = False + else: + tg = ts.group + result = ( + len(tg) > self.total_nthreads * 2 + and len(tg.dependencies) < 5 + and sum(map(len, tg.dependencies)) < 5 + ) + ts._rootish = result + else: + pass + return result def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0): """Update the status of the idle and saturated state @@ -7052,6 +7083,8 @@ def get_metadata(self, keys: list[str], default=no_default): def set_restrictions(self, worker: dict[str, Collection[str] | str]): for key, restrictions in worker.items(): ts = self.tasks[key] + if ts._rootish is not None: + raise ValueError(f"cannot set restrictions on ready {ts}") if isinstance(restrictions, str): restrictions = {restrictions} ts.worker_restrictions = set(restrictions) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index c899809ba0f..b31a4f2a2fe 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -525,7 +525,7 @@ async def test_queued_rootish_changes_while_paused(c, s, a, b): config={"distributed.scheduler.work-stealing": False}, ) async def test_queued_rootish_changes_scale_up(c, s, a): - "Tasks are initially root-ish. After cluster scales, they aren't." + "Tasks are initially root-ish. After cluster scales, they don't meet the definition, but still are." root = c.submit(inc, 1, key="root") @@ -548,9 +548,9 @@ async def test_queued_rootish_changes_scale_up(c, s, a): for _ in range(3): await stack.enter_async_context(Worker(s.address, nthreads=2)) - if s.is_rootish(s.tasks[fs[0].key]): + if not s.is_rootish(s.tasks[fs[0].key]): pytest.fail( - "Test assumptions have changed; task is still root-ish. Test may no longer be relevant." + "Test assumptions have changed; root-ish-ness has flipped. Test may no longer be relevant." ) await event.set() From 711997e02f81493e1cba6a81123f07d4a059cc10 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 4 Nov 2022 19:47:42 -0600 Subject: [PATCH 03/12] update comment --- distributed/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a581c064e1c..c7eb2cb1e8e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1519,14 +1519,13 @@ class SchedulerState: tasks: dict[str, TaskState] #: Tasks in the "queued" state, ordered by priority. - #: They should generally be root-ish, but in certain cases may not be. - #: They must not have restrictions. + #: They are all root-ish. #: Always empty if `worker-saturation` is set to `inf`. queued: HeapSet[TaskState] #: Tasks in the "no-worker" state. #: They may or may not have restrictions. - #: Could contain root-ish tasks even when `worker-saturation` is a finite value. + #: Only contains root-ish tasks if `worker-saturation` is set to `inf`. unrunnable: set[TaskState] #: Subset of tasks that exist in memory on more than one worker From 1806c864e5d52806b2a16d28c48d3fe9193ae2c1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 7 Nov 2022 19:50:32 -0700 Subject: [PATCH 04/12] invalidate `_rootish` when released --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c7eb2cb1e8e..353335beeba 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7945,6 +7945,7 @@ def _propagate_released( recommendations: Recs, ) -> None: ts.state = "released" + ts._rootish = None key = ts.key if ts.has_lost_dependencies: From 49dfa47f9448394eb4f930311c62ec0912852f21 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 11:06:36 -0700 Subject: [PATCH 05/12] also invalidate when entering processing --- distributed/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 353335beeba..63fd8b51981 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7806,6 +7806,7 @@ def _validate_ready(state: SchedulerState, ts: TaskState) -> None: assert ts not in state.unrunnable assert ts not in state.queued assert all(dts.who_has for dts in ts.dependencies) + assert ts._rootish is not None def _add_to_processing( @@ -7817,6 +7818,7 @@ def _add_to_processing( assert ws in state.running, state.running assert (o := state.workers.get(ws.address)) is ws, (ws, o) + ts._rootish = None ws.add_to_processing(ts) ts.processing_on = ws ts.state = "processing" From a4a400232ec5e2e2d9115944724460090a800b0a Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 11:08:30 -0700 Subject: [PATCH 06/12] note when cache is set --- distributed/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 63fd8b51981..643999e27b3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3009,6 +3009,8 @@ def is_rootish(self, ts: TaskState) -> bool: # `TaskGroup` and cluster size does. That avoids annoying edge cases where a # task does/doesn't look root-ish when it goes into `queued` or `unrunnable`, # but that's flipped when it comes out. + # Specifically, `_rootish` is set only when a task in in the `queued` or + # `no-worker` states. if (result := ts._rootish) is None: if ( ts.resource_restrictions From b0474aeffe0aabca3f8c95a6dde2ee49f53cce4f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 11:09:46 -0700 Subject: [PATCH 07/12] add validation --- distributed/scheduler.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 643999e27b3..66ec08a8d7b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4924,6 +4924,7 @@ def validate_released(self, key): assert not any([ts in dts.waiters for dts in ts.dependencies]) assert ts not in self.unrunnable assert ts not in self.queued + assert ts._rootish is None def validate_waiting(self, key): ts: TaskState = self.tasks[key] @@ -4932,6 +4933,7 @@ def validate_waiting(self, key): assert not ts.processing_on assert ts not in self.unrunnable assert ts not in self.queued + assert ts._rootish is None for dts in ts.dependencies: # We are waiting on a dependency iff it's not stored assert bool(dts.who_has) != (dts in ts.waiting_on) @@ -4944,6 +4946,7 @@ def validate_queued(self, key): assert not ts.waiting_on assert not ts.who_has assert not ts.processing_on + assert ts._rootish is True assert not ( ts.worker_restrictions or ts.host_restrictions or ts.resource_restrictions ) @@ -4960,6 +4963,7 @@ def validate_processing(self, key): assert ts in ws.processing assert not ts.who_has assert ts not in self.queued + assert ts._rootish is None for dts in ts.dependencies: assert dts.who_has assert ts in dts.waiters @@ -4973,6 +4977,7 @@ def validate_memory(self, key): assert not ts.waiting_on assert ts not in self.unrunnable assert ts not in self.queued + assert ts._rootish is None for dts in ts.dependents: assert (dts in ts.waiters) == ( dts.state in ("waiting", "queued", "processing", "no-worker") @@ -4987,6 +4992,7 @@ def validate_no_worker(self, key): assert not ts.processing_on assert not ts.who_has assert ts not in self.queued + assert ts._rootish is not None for dts in ts.dependencies: assert dts.who_has @@ -4995,6 +5001,7 @@ def validate_erred(self, key): assert ts.exception_blame assert not ts.who_has assert ts not in self.queued + assert ts._rootish is None def validate_key(self, key, ts: TaskState | None = None): try: From c7578948a0bdaf5c78796c46bfc568f61d7ec413 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 21:16:26 -0700 Subject: [PATCH 08/12] fix incorrect validation --- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 66ec08a8d7b..452a9e4fee9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -746,9 +746,6 @@ def remove_from_processing(self, ts: TaskState) -> None: """Remove a task from a workers processing""" if self.scheduler.validate: assert ts in self.processing - assert ts._rootish is not None - - ts._rootish = None if ts in self.long_running: self.long_running.discard(ts) @@ -7855,6 +7852,9 @@ def _exit_processing_common( -------- Scheduler._set_duration_estimate """ + if state.validate: + assert ts._rootish is None, ts._rootish + ws = ts.processing_on assert ws ts.processing_on = None From c5a19f46ce8d547c708d20fa4c94ae63f95fec85 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 21:29:13 -0700 Subject: [PATCH 09/12] assertion info --- distributed/scheduler.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 452a9e4fee9..9f5e565f6a9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4921,7 +4921,7 @@ def validate_released(self, key): assert not any([ts in dts.waiters for dts in ts.dependencies]) assert ts not in self.unrunnable assert ts not in self.queued - assert ts._rootish is None + assert ts._rootish is None, ts._rootish def validate_waiting(self, key): ts: TaskState = self.tasks[key] @@ -4930,7 +4930,7 @@ def validate_waiting(self, key): assert not ts.processing_on assert ts not in self.unrunnable assert ts not in self.queued - assert ts._rootish is None + assert ts._rootish is None, ts._rootish for dts in ts.dependencies: # We are waiting on a dependency iff it's not stored assert bool(dts.who_has) != (dts in ts.waiting_on) @@ -4943,7 +4943,7 @@ def validate_queued(self, key): assert not ts.waiting_on assert not ts.who_has assert not ts.processing_on - assert ts._rootish is True + assert ts._rootish is True, ts._rootish assert not ( ts.worker_restrictions or ts.host_restrictions or ts.resource_restrictions ) @@ -4960,7 +4960,7 @@ def validate_processing(self, key): assert ts in ws.processing assert not ts.who_has assert ts not in self.queued - assert ts._rootish is None + assert ts._rootish is None, ts._rootish for dts in ts.dependencies: assert dts.who_has assert ts in dts.waiters @@ -4974,7 +4974,7 @@ def validate_memory(self, key): assert not ts.waiting_on assert ts not in self.unrunnable assert ts not in self.queued - assert ts._rootish is None + assert ts._rootish is None, ts._rootish for dts in ts.dependents: assert (dts in ts.waiters) == ( dts.state in ("waiting", "queued", "processing", "no-worker") @@ -4989,7 +4989,7 @@ def validate_no_worker(self, key): assert not ts.processing_on assert not ts.who_has assert ts not in self.queued - assert ts._rootish is not None + assert ts._rootish is not None, ts._rootish for dts in ts.dependencies: assert dts.who_has @@ -4998,7 +4998,7 @@ def validate_erred(self, key): assert ts.exception_blame assert not ts.who_has assert ts not in self.queued - assert ts._rootish is None + assert ts._rootish is None, ts._rootish def validate_key(self, key, ts: TaskState | None = None): try: @@ -7812,7 +7812,7 @@ def _validate_ready(state: SchedulerState, ts: TaskState) -> None: assert ts not in state.unrunnable assert ts not in state.queued assert all(dts.who_has for dts in ts.dependencies) - assert ts._rootish is not None + assert ts._rootish is not None, ts._rootish def _add_to_processing( From d182c2a9313310f27bdb1fa4238bfa3c12c9c08d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 21:31:08 -0700 Subject: [PATCH 10/12] cache explicitly instead of by side effect --- distributed/scheduler.py | 45 +++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9f5e565f6a9..e9f0c5a991c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1311,7 +1311,8 @@ class TaskState: #: Cached hash of :attr:`~TaskState.client_key` _hash: int - #: Cached; set by `SchedulerState.is_rootish` + #: Cached while tasks are in `queued` or `no-worker`; set in + #: `transition_waiting_processing` and `_add_to_processing` _rootish: bool | None # Support for weakrefs to a class with __slots__ @@ -2236,6 +2237,7 @@ def transition_waiting_processing(self, key, stimulus_id): # NOTE: having two root-ish methods is temporary. When the feature flag is removed, # there should only be one, which combines co-assignment and queuing. # Eventually, special-casing root tasks might be removed entirely, with better heuristics. + ts._rootish = True # cached until `processing` if math.isinf(self.WORKER_SATURATION): if not (ws := self.decide_worker_rootish_queuing_disabled(ts)): return {ts.key: "no-worker"}, {}, {} @@ -2243,6 +2245,7 @@ def transition_waiting_processing(self, key, stimulus_id): if not (ws := self.decide_worker_rootish_queuing_enabled()): return {ts.key: "queued"}, {}, {} else: + ts._rootish = False # cached until `processing` if not (ws := self.decide_worker_non_rootish(ts)): return {ts.key: "no-worker"}, {}, {} @@ -3002,30 +3005,24 @@ def is_rootish(self, ts: TaskState) -> bool: Root-ish tasks are part of a group that's much larger than the cluster, and have few or no dependencies. """ - # NOTE: we cache `is_rootish` not for performance, but so it can't change if - # `TaskGroup` and cluster size does. That avoids annoying edge cases where a - # task does/doesn't look root-ish when it goes into `queued` or `unrunnable`, - # but that's flipped when it comes out. - # Specifically, `_rootish` is set only when a task in in the `queued` or - # `no-worker` states. - if (result := ts._rootish) is None: - if ( - ts.resource_restrictions - or ts.worker_restrictions - or ts.host_restrictions - ): - result = False - else: - tg = ts.group - result = ( - len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 - ) - ts._rootish = result + # NOTE: the result of `is_rootish` is cached when putting a task into `queued` + # or `no-worker`, and invalidated when leaving those states. we cache + # `is_rootish` not for performance, but so it can't change if `TaskGroup` and + # cluster size does. That avoids annoying edge cases where a task does/doesn't + # look root-ish when it goes into `queued` or `unrunnable`, but that's flipped + # when it comes out. + if (cached := ts._rootish) is not None: + return cached + + if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: + return False else: - pass - return result + tg = ts.group + return ( + len(tg) > self.total_nthreads * 2 + and len(tg.dependencies) < 5 + and sum(map(len, tg.dependencies)) < 5 + ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0): """Update the status of the idle and saturated state From 1a19cc634fa71103819b0dd5a9005d2e135263b7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 21:33:52 -0700 Subject: [PATCH 11/12] unnecessary diff --- distributed/scheduler.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e9f0c5a991c..21fa5ee6354 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3016,13 +3016,13 @@ def is_rootish(self, ts: TaskState) -> bool: if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: return False - else: - tg = ts.group - return ( - len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 - ) + tg = ts.group + # TODO short-circuit to True if `not ts.dependencies`? + return ( + len(tg) > self.total_nthreads * 2 + and len(tg.dependencies) < 5 + and sum(map(len, tg.dependencies)) < 5 + ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0): """Update the status of the idle and saturated state From 6f188bf21cd379571d38c00e320c8bb5ad2baacb Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 8 Nov 2022 21:35:15 -0700 Subject: [PATCH 12/12] correct comment --- distributed/scheduler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 21fa5ee6354..c0a3308202d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3005,12 +3005,12 @@ def is_rootish(self, ts: TaskState) -> bool: Root-ish tasks are part of a group that's much larger than the cluster, and have few or no dependencies. """ - # NOTE: the result of `is_rootish` is cached when putting a task into `queued` - # or `no-worker`, and invalidated when leaving those states. we cache - # `is_rootish` not for performance, but so it can't change if `TaskGroup` and - # cluster size does. That avoids annoying edge cases where a task does/doesn't - # look root-ish when it goes into `queued` or `unrunnable`, but that's flipped - # when it comes out. + # NOTE: the result of `is_rootish` is cached in `waiting->processing`, and + # invalidated when entering `processing`. This is for the benefit of the + # `queued` and and `no-worker` states. We cache `is_rootish` not for + # performance, but so it can't change if `TaskGroup` and cluster size does. That + # avoids annoying edge cases where a task does/doesn't look root-ish when it + # goes into `queued` or `unrunnable`, but that's flipped when it comes out. if (cached := ts._rootish) is not None: return cached