134 changes: 99 additions & 35 deletions distributed/scheduler.py
@@ -2029,23 +2029,44 @@ def transition_released_waiting(self, key, stimulus_id):
pdb.set_trace()
raise

def transition_no_worker_queued(self, key, stimulus_id):
try:
ts: TaskState = self.tasks[key]

if self.validate:
assert self.is_rootish(
ts
), "Non root-ish task should remain in no-worker"
assert not ts.actor, f"Actors can't be queued: {ts}"
assert ts not in self.queued

self.unrunnable.remove(ts)
self.queued.add(ts)
ts.state = "queued"

return {}, {}, {}
except Exception as e:
logger.exception(e)
if LOG_PDB:
import pdb

pdb.set_trace()
raise

def transition_no_worker_processing(self, key, stimulus_id):
try:
ts: TaskState = self.tasks[key]
recommendations: Recs = {}
client_msgs: dict = {}
worker_msgs: dict = {}

if self.validate:
assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
assert ts in self.unrunnable

if ws := self.decide_worker_non_rootish(ts):
self.unrunnable.discard(ts)
worker_msgs = _add_to_processing(self, ts, ws)
# If no worker, task just stays in `no-worker`
ws_or_state = self.decide_worker_or_next_state(ts)
if isinstance(ws_or_state, str):
return {key: ws_or_state}, {}, {}

return recommendations, client_msgs, worker_msgs
self.unrunnable.discard(ts)
return {}, {}, _add_to_processing(self, ts, ws_or_state)
except Exception as e:
logger.exception(e)
if LOG_PDB:
@@ -2054,7 +2075,7 @@ def transition_no_worker_processing(self, key, stimulus_id):
pdb.set_trace()
raise

def decide_worker_rootish_queuing_disabled(
def _decide_worker_rootish_queuing_disabled(
self, ts: TaskState
) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, without queuing.
@@ -2114,7 +2135,7 @@ def decide_worker_rootish_queuing_disabled(

return ws

def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
def _decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
"""Pick a worker for a runnable root-ish task, if not all are busy.

Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
@@ -2166,7 +2187,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:

return ws

def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
def _decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
"""Pick a worker for a runnable non-root task, considering dependencies and restrictions.

Out of eligible workers holding dependencies of ``ts``, selects the worker
@@ -2235,6 +2256,32 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:

return ws

def decide_worker_or_next_state(
self, ts: TaskState
) -> WorkerState | Literal["queued", "no-worker"]:
"""
Pick a worker for a runnable task, or if there is none, which state to hold the task in.

Selects an appropriate worker to run the task, based on whether the task is
root-ish or not and whether queuing is enabled. If no worker is available,
returns the next state the task should go to instead.
"""
if self.is_rootish(ts):
# NOTE: having two root-ish methods is temporary. When the feature flag is removed,
# there should only be one, which combines co-assignment and queuing.
# Eventually, special-casing root tasks might be removed entirely, with better heuristics.
if math.isinf(self.WORKER_SATURATION):
if not (ws := self._decide_worker_rootish_queuing_disabled(ts)):
return "no-worker"
else:
if not (ws := self._decide_worker_rootish_queuing_enabled()):
return "queued"
else:
if not (ws := self._decide_worker_non_rootish(ts)):
return "no-worker"

return ws

def transition_waiting_processing(self, key, stimulus_id):
"""Possibly schedule a ready task. This is the primary dispatch for ready tasks.

@@ -2244,22 +2291,11 @@ def transition_waiting_processing(self, key, stimulus_id):
try:
ts: TaskState = self.tasks[key]

if self.is_rootish(ts):
# NOTE: having two root-ish methods is temporary. When the feature flag is removed,
# there should only be one, which combines co-assignment and queuing.
# Eventually, special-casing root tasks might be removed entirely, with better heuristics.
if math.isinf(self.WORKER_SATURATION):
if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
return {ts.key: "no-worker"}, {}, {}
else:
if not (ws := self.decide_worker_rootish_queuing_enabled()):
return {ts.key: "queued"}, {}, {}
else:
if not (ws := self.decide_worker_non_rootish(ts)):
return {ts.key: "no-worker"}, {}, {}
ws_or_state = self.decide_worker_or_next_state(ts)
if isinstance(ws_or_state, str):
return {key: ws_or_state}, {}, {}

worker_msgs = _add_to_processing(self, ts, ws)
return {}, {}, worker_msgs
return {}, {}, _add_to_processing(self, ts, ws_or_state)
except Exception as e:
logger.exception(e)
if LOG_PDB:
@@ -2851,23 +2887,44 @@ def transition_queued_released(self, key, stimulus_id):
pdb.set_trace()
raise

def transition_queued_no_worker(self, key, stimulus_id):
# FIXME this transition may be unreachable?
try:
ts: TaskState = self.tasks[key]

if self.validate:
assert not self.is_rootish(ts), "Root-ish task should remain in queued"
assert not ts.actor, f"Actors can't be queued: {ts}"
assert ts not in self.unrunnable

self.queued.remove(ts)
self.unrunnable.add(ts)
ts.state = "no-worker"

return {}, {}, {}
except Exception as e:
logger.exception(e)
if LOG_PDB:
import pdb

pdb.set_trace()
raise

def transition_queued_processing(self, key, stimulus_id):
try:
ts: TaskState = self.tasks[key]
recommendations: Recs = {}
client_msgs: dict = {}
worker_msgs: dict = {}

if self.validate:
assert not ts.actor, f"Actors can't be queued: {ts}"
assert ts in self.queued

if ws := self.decide_worker_rootish_queuing_enabled():
self.queued.discard(ts)
worker_msgs = _add_to_processing(self, ts, ws)
# If no worker, task just stays `queued`
ws_or_state = self.decide_worker_or_next_state(ts)
if isinstance(ws_or_state, str):
return {key: ws_or_state}, {}, {}

self.queued.discard(ts)
return {}, {}, _add_to_processing(self, ts, ws_or_state)

return recommendations, client_msgs, worker_msgs
except Exception as e:
logger.exception(e)
if LOG_PDB:
@@ -2988,11 +3045,13 @@ def transition_released_forgotten(self, key, stimulus_id):
("waiting", "queued"): transition_waiting_queued,
("waiting", "memory"): transition_waiting_memory,
("queued", "released"): transition_queued_released,
("queued", "no-worker"): transition_queued_no_worker,
("queued", "processing"): transition_queued_processing,
("processing", "released"): transition_processing_released,
("processing", "memory"): transition_processing_memory,
("processing", "erred"): transition_processing_erred,
("no-worker", "released"): transition_no_worker_released,
("no-worker", "queued"): transition_no_worker_queued,
("no-worker", "processing"): transition_no_worker_processing,
("released", "forgotten"): transition_released_forgotten,
("memory", "forgotten"): transition_memory_forgotten,
@@ -3020,7 +3079,12 @@ def is_rootish(self, ts: TaskState) -> bool:
Root-ish tasks are part of a group that's much larger than the cluster,
and have few or no dependencies.
"""
if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
if (
ts.resource_restrictions
or ts.worker_restrictions
or ts.host_restrictions
or ts.actor
):
return False
tg = ts.group
# TODO short-circuit to True if `not ts.dependencies`?
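For orientation, a condensed sketch of the dispatch path the scheduler.py changes above converge on (simplified from the diff; validation, logging, and pdb handling omitted, names as in the diff):

# Sketch only: every ready-task transition now asks decide_worker_or_next_state().
# A WorkerState return value means "start processing on that worker"; a string
# ("queued" or "no-worker") names the state the task is parked in instead.
def transition_waiting_processing(self, key, stimulus_id):
    ts = self.tasks[key]
    ws_or_state = self.decide_worker_or_next_state(ts)
    if isinstance(ws_or_state, str):
        return {key: ws_or_state}, {}, {}
    return {}, {}, _add_to_processing(self, ts, ws_or_state)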
135 changes: 135 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -8,6 +8,7 @@
import pickle
import re
import sys
from contextlib import AsyncExitStack
from itertools import product
from textwrap import dedent
from time import sleep
@@ -481,6 +482,140 @@ async def test_queued_remove_add_worker(c, s, a, b):
await wait(fs)


@gen_cluster(
client=True,
nthreads=[("", 2)],
config={
"distributed.scheduler.worker-saturation": 1.0,
},
)
async def test_queued_oversaturates_after_group_shrinks(c, s, a):
"""
When tasks switch from root-ish to non-root-ish, even though they're in `queued`,
they're scheduled without regard to worker-saturation.

This isn't really desirable behavior; it's just what happens to occur right now. If
this changes, that's okay (maybe even good).
"""
root = c.submit(inc, 1, key="root")

# Put some tasks in the queue
es = [Event() for _ in range(5)]
fs = c.map(lambda _, e: e.wait(), [root] * len(es), es)
await wait_for_state(fs[0].key, "processing", s)
assert s.queued

# Add a downstream task that depends on fs[0]
de = Event()
downstream = c.submit(lambda _: de.wait(), fs[0])
await wait_for_state(downstream.key, "waiting", s)

# Cancel one task. Group is now too small to be root-ish.
del fs[-1], es[-1]
await async_wait_for(lambda: len(s.tasks) == len(fs) + 2, 5)
if s.is_rootish(s.tasks[fs[0].key]):
pytest.fail(
"Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
)

# Let the downstream task schedule.
# When a slot opens and we try to schedule the next task on the queue,
# it gets scheduled as non-root-ish. So both the downstream task and the next
# queued task get assigned to the worker, exceeding worker-saturation.
await es[0].set()
await wait_for_state(downstream.key, "processing", s)
# KEY ASSERTION:
# the next task on the queue got scheduled, exceeding worker-saturation, because
# even though it was in `queued`, it was no longer root-ish.
assert len(s.workers[a.address].processing) == a.state.nthreads + 1
Review comment (Member): Of all the new tests, this assert is the only one that fails on main.

Review comment (Member): This assertion states that a task that is no longer classified as root-ish is still scheduled from `queued` without regard to worker-saturation.
Is this the only error case we are concerned about? If so, I suggest just dropping the issue. If there is something else, the tests should reproduce the issue first.


# Everything runs
await de.set()
await downstream
await asyncio.gather(*(e.set() for e in es))

await c.gather(fs)


@gen_cluster(
client=True,
nthreads=[("", 2)] * 2,
config={
"distributed.worker.memory.pause": False,
"distributed.worker.memory.target": False,
"distributed.worker.memory.spill": False,
"distributed.scheduler.work-stealing": False,
},
)
async def test_queued_rootish_changes_while_paused(c, s, a, b):
"Some tasks are root-ish, some aren't. So both `unrunnable` and `queued` contain non-restricted tasks."
# NOTE: this tests the `no-worker->queued` transition when queueing is active.

root = c.submit(inc, 1, key="root")
await root

# manually pause the workers
a.status = Status.paused
b.status = Status.paused

await async_wait_for(lambda: not s.running, 5)

fs = [c.submit(inc, root, key=f"inc-{i}") for i in range(s.total_nthreads * 2 + 1)]
# ^ `c.submit` in a for-loop so the first tasks don't look root-ish (`TaskGroup` too
# small), then the last one does. So N-1 tasks will go to `no-worker`, and the last
# to `queued`. `is_rootish` is just messed up like that.
Review comment on lines +563 to +566 (Member): This should verify the assumption of when the tasks are classified as root-ish. This is otherwise incredibly opaque and brittle.
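A hedged sketch of such a check, using only the keys and scheduler attributes this test already touches (illustrative, not part of the PR); it would sit after the `async_wait_for` below:

# Hypothetical assertions verifying the split the comment above relies on:
# the first total_nthreads * 2 submissions arrive while the TaskGroup is still
# small, so they are classified non-root-ish and land in `no-worker`
# (s.unrunnable); the last submission tips the group over the root-ish
# threshold and lands in `queued`.
assert {ts.key for ts in s.unrunnable} == {
    f"inc-{i}" for i in range(s.total_nthreads * 2)
}
assert {ts.key for ts in s.queued} == {f"inc-{s.total_nthreads * 2}"}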


await async_wait_for(lambda: len(s.tasks) > len(fs), 5)

# un-pause
a.status = Status.running
b.status = Status.running
await async_wait_for(lambda: len(s.running) == len(s.workers), 5)

await c.gather(fs)


@gen_cluster(
client=True,
nthreads=[("", 1)],
config={"distributed.scheduler.work-stealing": False},
)
async def test_queued_rootish_changes_scale_up(c, s, a):
"Tasks are initially root-ish. After cluster scales, they aren't."

root = c.submit(inc, 1, key="root")

event = Event()
clog = c.submit(event.wait, key="clog")
await wait_for_state(clog.key, "processing", s)

fs = c.map(inc, [root] * 5, key=[f"inc-{i}" for i in range(5)])

await async_wait_for(lambda: len(s.tasks) > len(fs), 5)

if not s.is_rootish(s.tasks[fs[0].key]):
pytest.fail(
"Test assumptions have changed; task is not root-ish. Test may no longer be relevant."
)
if math.isfinite(s.WORKER_SATURATION):
assert s.queued

async with AsyncExitStack() as stack:
for _ in range(3):
await stack.enter_async_context(Worker(s.address, nthreads=2))

if s.is_rootish(s.tasks[fs[0].key]):
pytest.fail(
"Test assumptions have changed; task is still root-ish. Test may no longer be relevant."
)

await event.set()
await clog

# Just verify it doesn't deadlock
await c.gather(fs)
Review comment on lines +578 to +616 (Member): This test works fine on main.



@gen_cluster(client=True, nthreads=[("", 1)])
async def test_secede_opens_slot(c, s, a):
first = Event()
Expand Down