From 87ff98dad325973dd0fd0ef138b0f1afdb438038 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Fri, 28 Oct 2022 13:49:19 -0600
Subject: [PATCH 1/5] test queued task scheduled upon `secede`

---
 distributed/tests/test_scheduler.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 6e84db227d9..391cd554f13 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -64,7 +64,7 @@
     varying,
     wait_for_state,
 )
-from distributed.worker import dumps_function, dumps_task, get_worker
+from distributed.worker import dumps_function, dumps_task, get_worker, secede
 
 pytestmark = pytest.mark.ci1
 
@@ -479,6 +479,29 @@ async def test_queued_remove_add_worker(c, s, a, b):
     await wait(fs)
 
 
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+)
+async def test_secede_opens_slot(c, s, a):
+    first = Event()
+    second = Event()
+
+    def func(first, second):
+        first.wait()
+        secede()
+        second.wait()
+
+    fs = c.map(func, [first] * 5, [second] * 5)
+    await async_wait_for(lambda: a.state.executing, 5)
+
+    await first.set()
+    await async_wait_for(lambda: len(a.state.tasks) == len(fs), 5)
+
+    await second.set()
+    await c.gather(fs)
+
+
 @pytest.mark.parametrize(
     "saturation_config, expected_task_counts",
     [

From c1ed4e05b6b89bd776844218c8c2e7fadc2f249f Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Fri, 28 Oct 2022 13:56:28 -0600
Subject: [PATCH 2/5] pull out logic and use in `handle_long_running`

---
 distributed/scheduler.py | 30 ++++++++++++++++++++++++------
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index d0e31aa131f..4caeffa77bc 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -5338,6 +5338,9 @@ def handle_long_running(
         ws.add_to_long_running(ts)
         self.check_idle_saturated(ws)
 
+        if qts := _next_queued_when_slot_maybe_opened(self, ws):
+            self.transitions({qts.key: "processing"}, stimulus_id)
+
     def handle_worker_status_change(
         self, status: str | Status, worker: str | WorkerState, stimulus_id: str
     ) -> None:
@@ -7886,19 +7889,34 @@ def _exit_processing_common(
     state.check_idle_saturated(ws)
     state.release_resources(ts, ws)
 
-    # If a slot has opened up for a queued task, schedule it.
-    if state.queued and not _worker_full(ws, state.WORKER_SATURATION):
+    if qts := _next_queued_when_slot_maybe_opened(state, ws):
+        if state.validate:
+            assert qts.key not in recommendations, recommendations[qts.key]
+        recommendations[qts.key] = "processing"
+
+    return ws
+
+
+def _next_queued_when_slot_maybe_opened(
+    state: SchedulerState, ws: WorkerState
+) -> TaskState | None:
+    "If a slot has opened up on this worker, return the task at the front of the queue."
+    if (
+        state.queued
+        and ws.status == Status.running
+        and not _worker_full(ws, state.WORKER_SATURATION)
+    ):
         qts = state.queued.peek()
         if state.validate:
             assert qts.state == "queued", qts.state
-            assert qts.key not in recommendations, recommendations[qts.key]
+            assert not qts.processing_on
+            assert not qts.waiting_on
 
         # NOTE: we don't need to schedule more than one task at once here. Since this is
         # called each time 1 task completes, multiple tasks must complete for multiple
         # slots to open up.
-        recommendations[qts.key] = "processing"
-
-    return ws
+        return qts
+    return None
 
 
 def _add_to_memory(
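
For context, `secede` (which the new test exercises) is the public API a task calls to hand its
thread's slot back to the worker, and `rejoin` reclaims one. A minimal user-level sketch, separate
from the patch itself (`long_poll` and the cluster sizing are illustrative):

    from distributed import Client, rejoin, secede

    def long_poll(x):
        secede()      # give this thread's task slot back to the worker; the scheduler
                      # now treats the task as long-running and may start a queued task
        # ... imagine a long blocking wait on an external service here ...
        rejoin()      # take a slot again before doing CPU-bound work
        return x + 1

    if __name__ == "__main__":
        client = Client(n_workers=1, threads_per_worker=1)
        print(client.gather(client.map(long_poll, range(3))))

Before this series, the `secede()` call above freed a slot on the worker, but with queuing enabled
the scheduler never filled it. PATCH 2/5 wires `handle_long_running` up to the same helper that
`_exit_processing_common` uses, so a queued task is promoted onto the freed slot.
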
From b44eb9b734d6532fb0c213cc7f5ec8159faeaccb Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Fri, 28 Oct 2022 14:29:22 -0600
Subject: [PATCH 3/5] iterator and use in bulk_schedule as well

I'm not totally sure about this. Yielding a single value is 3-4x slower than
returning a single value from a function in Python (45ns vs 130ns on my
machine). `_exit_processing_common` is pretty hot, and especially latency
sensitive with fast tasks. Probably not worth worrying about, but let's
profile and see.
---
 distributed/scheduler.py | 59 ++++++++++++++++++++-----------------------------------
 1 file changed, 24 insertions(+), 35 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 4caeffa77bc..fcc8ac8836b 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3288,20 +3288,7 @@ def bulk_schedule_after_adding_worker(self, ws: WorkerState) -> Recs:
 
         Returns priority-ordered recommendations.
         """
-        maybe_runnable: list[TaskState] = []
-        # Schedule any queued tasks onto the new worker
-        if not math.isinf(self.WORKER_SATURATION) and self.queued:
-            for qts in reversed(
-                list(
-                    self.queued.peekn(_task_slots_available(ws, self.WORKER_SATURATION))
-                )
-            ):
-                if self.validate:
-                    assert qts.state == "queued"
-                    assert not qts.processing_on
-                    assert not qts.waiting_on
-
-                maybe_runnable.append(qts)
+        maybe_runnable = list(_next_queued_when_slot_maybe_opened(self, ws))[::-1]
 
         # Schedule any restricted tasks onto the new worker, if the worker can run them
         for ts in self.unrunnable:
@@ -5338,8 +5325,14 @@ def handle_long_running(
         ws.add_to_long_running(ts)
         self.check_idle_saturated(ws)
 
-        if qts := _next_queued_when_slot_maybe_opened(self, ws):
-            self.transitions({qts.key: "processing"}, stimulus_id)
+        recommendations = {
+            qts.key: "processing"
+            for qts in _next_queued_when_slot_maybe_opened(self, ws)
+        }
+        if self.validate:
+            assert len(recommendations) == 1, (ws, recommendations)
+
+        self.transitions(recommendations, stimulus_id)
 
     def handle_worker_status_change(
         self, status: str | Status, worker: str | WorkerState, stimulus_id: str
@@ -7889,7 +7882,7 @@ def _exit_processing_common(
     state.check_idle_saturated(ws)
     state.release_resources(ts, ws)
 
-    if qts := _next_queued_when_slot_maybe_opened(state, ws):
+    for qts in _next_queued_when_slot_maybe_opened(state, ws):
         if state.validate:
             assert qts.key not in recommendations, recommendations[qts.key]
         recommendations[qts.key] = "processing"
@@ -7899,24 +7892,20 @@
 
 def _next_queued_when_slot_maybe_opened(
     state: SchedulerState, ws: WorkerState
-) -> TaskState | None:
-    "If a slot has opened up on this worker, return the task at the front of the queue."
-    if (
-        state.queued
-        and ws.status == Status.running
-        and not _worker_full(ws, state.WORKER_SATURATION)
-    ):
-        qts = state.queued.peek()
-        if state.validate:
-            assert qts.state == "queued", qts.state
-            assert not qts.processing_on
-            assert not qts.waiting_on
-
-        # NOTE: we don't need to schedule more than one task at once here. Since this is
-        # called each time 1 task completes, multiple tasks must complete for multiple
-        # slots to open up.
-        return qts
-    return None
+) -> Iterator[TaskState]:
+    "Queued tasks to run, in priority order, if a slot may have opened up on this worker."
+    if state.queued and ws.status == Status.running:
+        # NOTE: this is called most frequently because a single task has completed, so
+        # there are <= 1 task slots available on the worker. `peekn` has fastpahs for
+        # these cases N<=0 and N==1.
+        for qts in state.queued.peekn(
+            _task_slots_available(ws, state.WORKER_SATURATION)
+        ):
+            if state.validate:
+                assert qts.state == "queued", qts.state
+                assert not qts.processing_on
+                assert not qts.waiting_on
+            yield qts
 
 
 def _add_to_memory(
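
The yield-vs-return overhead mentioned in the PATCH 3/5 commit message is easy to sanity-check. A
rough, machine-dependent sketch using only the standard library (not part of the patch; absolute
numbers will differ from the 45ns/130ns quoted above):

    import timeit

    def return_one():
        return 1

    def yield_one():
        yield 1

    # Cost of calling a function that returns one value...
    t_return = timeit.timeit(return_one, number=1_000_000)
    # ...vs creating a generator and draining its single value, which is what the
    # call sites of the new iterator-style helper effectively do.
    t_yield = timeit.timeit(lambda: list(yield_one()), number=1_000_000)
    print(f"return: {t_return:.3f}s  yield: {t_yield:.3f}s  (per 1e6 calls)")
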
From 24ff570bdbf8af615f3153512c6b14832e9f6941 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Fri, 28 Oct 2022 15:38:29 -0600
Subject: [PATCH 4/5] fix assertion

---
 distributed/scheduler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index fcc8ac8836b..76d60f46ac1 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -5330,7 +5330,7 @@ def handle_long_running(
             for qts in _next_queued_when_slot_maybe_opened(self, ws)
         }
         if self.validate:
-            assert len(recommendations) == 1, (ws, recommendations)
+            assert len(recommendations) <= 1, (ws, recommendations)
 
         self.transitions(recommendations, stimulus_id)
 
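
The relaxed assertion reflects that `_next_queued_when_slot_maybe_opened` can legitimately yield
nothing: the queue may be empty, the worker may not be running, or no slot may actually be free. A
toy illustration of the slot arithmetic (`slots_available` is made up for this note; it mirrors the
idea behind `_task_slots_available`, not its exact implementation):

    import math

    def slots_available(nthreads: int, processing: int, long_running: int,
                        saturation: float = 1.0) -> int:
        # Threads actually occupied: tasks processing minus those that have seceded.
        occupied = processing - long_running
        return max(math.ceil(saturation * nthreads), 1) - occupied

    # One thread, two tasks assigned to the worker, one of them just seceded:
    # no slot is actually free, so zero recommendations is a legitimate outcome.
    print(slots_available(nthreads=1, processing=2, long_running=1))  # -> 0
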
From 9ad1608680224584e9b3b42ea47e742f37ea6075 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Mon, 31 Oct 2022 11:34:28 +0000
Subject: [PATCH 5/5] Mostly cosmetic tweaks

---
 distributed/scheduler.py            | 35 +++++++++++++++++------------------
 distributed/tests/test_scheduler.py |  9 +++------
 2 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 76d60f46ac1..98a4174775d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3288,7 +3288,7 @@ def bulk_schedule_after_adding_worker(self, ws: WorkerState) -> Recs:
 
         Returns priority-ordered recommendations.
         """
-        maybe_runnable = list(_next_queued_when_slot_maybe_opened(self, ws))[::-1]
+        maybe_runnable = list(_next_queued_tasks_for_worker(self, ws))[::-1]
 
         # Schedule any restricted tasks onto the new worker, if the worker can run them
         for ts in self.unrunnable:
@@ -5326,8 +5326,7 @@ def handle_long_running(
         self.check_idle_saturated(ws)
 
         recommendations = {
-            qts.key: "processing"
-            for qts in _next_queued_when_slot_maybe_opened(self, ws)
+            qts.key: "processing" for qts in _next_queued_tasks_for_worker(self, ws)
         }
         if self.validate:
             assert len(recommendations) <= 1, (ws, recommendations)
@@ -7882,7 +7881,7 @@ def _exit_processing_common(
     state.check_idle_saturated(ws)
     state.release_resources(ts, ws)
 
-    for qts in _next_queued_when_slot_maybe_opened(state, ws):
+    for qts in _next_queued_tasks_for_worker(state, ws):
         if state.validate:
             assert qts.key not in recommendations, recommendations[qts.key]
         recommendations[qts.key] = "processing"
@@ -7890,22 +7889,22 @@ def _exit_processing_common(
     return ws
 
 
-def _next_queued_when_slot_maybe_opened(
+def _next_queued_tasks_for_worker(
     state: SchedulerState, ws: WorkerState
 ) -> Iterator[TaskState]:
-    "Queued tasks to run, in priority order, if a slot may have opened up on this worker."
-    if state.queued and ws.status == Status.running:
-        # NOTE: this is called most frequently because a single task has completed, so
-        # there are <= 1 task slots available on the worker. `peekn` has fastpahs for
-        # these cases N<=0 and N==1.
-        for qts in state.queued.peekn(
-            _task_slots_available(ws, state.WORKER_SATURATION)
-        ):
-            if state.validate:
-                assert qts.state == "queued", qts.state
-                assert not qts.processing_on
-                assert not qts.waiting_on
-            yield qts
+    """Queued tasks to run, in priority order, on all open slots on a worker"""
+    if not state.queued or ws.status != Status.running:
+        return
+
+    # NOTE: this is called most frequently because a single task has completed, so there
+    # are <= 1 task slots available on the worker.
+    # `peekn` has fast paths for the cases N<=0 and N==1.
+    for qts in state.queued.peekn(_task_slots_available(ws, state.WORKER_SATURATION)):
+        if state.validate:
+            assert qts.state == "queued", qts.state
+            assert not qts.processing_on
+            assert not qts.waiting_on
+        yield qts
 
 
 def _add_to_memory(
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 391cd554f13..db231c317dd 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -479,10 +479,7 @@ async def test_queued_remove_add_worker(c, s, a, b):
     await wait(fs)
 
 
-@gen_cluster(
-    client=True,
-    nthreads=[("", 1)],
-)
+@gen_cluster(client=True, nthreads=[("", 1)])
 async def test_secede_opens_slot(c, s, a):
     first = Event()
     second = Event()
@@ -493,10 +490,10 @@ def func(first, second):
         second.wait()
 
     fs = c.map(func, [first] * 5, [second] * 5)
-    await async_wait_for(lambda: a.state.executing, 5)
+    await async_wait_for(lambda: a.state.executing, timeout=5)
 
     await first.set()
-    await async_wait_for(lambda: len(a.state.tasks) == len(fs), 5)
+    await async_wait_for(lambda: len(a.state.long_running) == len(fs), timeout=5)
 
     await second.set()
     await c.gather(fs)
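
On the "`peekn` has fast paths for the cases N<=0 and N==1" comment: `peekn` returns up to N
highest-priority queued tasks without removing them. A toy illustration of that kind of fast path
(`ToyQueue` is invented for this note; the real `peekn` on the scheduler's heap-backed queue is
more involved and this is not its implementation):

    import heapq
    from collections.abc import Iterator

    class ToyQueue:
        """Toy stand-in for a priority queue with a non-destructive peekn."""

        def __init__(self, items: list[int]) -> None:
            self._heap = list(items)
            heapq.heapify(self._heap)

        def peek(self) -> int:
            return self._heap[0]

        def peekn(self, n: int) -> Iterator[int]:
            if n <= 0:
                return              # fast path: nothing requested; don't touch the heap
            if n == 1:
                yield self.peek()   # fast path: no sorting or copying needed
                return
            yield from heapq.nsmallest(n, self._heap)  # general case

    q = ToyQueue([5, 1, 3])
    print(list(q.peekn(0)), list(q.peekn(1)), list(q.peekn(2)))  # [] [1] [1, 3]

Since `_next_queued_tasks_for_worker` is most often called after a single task completes or
secedes, the N<=0 and N==1 branches are the ones that matter for the hot path.
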