Summary of this patch (free text before the first "diff --git" header):

* distributed/scheduler.py, decide_worker: in the small-pool branch
  (n_workers < 20), when the least-occupied worker has zero occupancy,
  scan the pool starting at index `_n_tasks % n_workers` and pick the
  first worker whose occupancy is zero, falling back to the `min()`
  result. `worker_pool.values()` is computed once and reused as `wp_vals`.
* distributed/tests/test_client_executor.py, test_retries: changes the
  `retries=` values and passes `match=` to `pytest.raises` instead of
  calling `exc_info.match(...)` afterwards.
* distributed/tests/test_scheduler.py: adds test_quiet_cluster_round_robin,
  which submits three tasks one after another and asserts that both
  workers' `log` is non-empty.

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index dcb2013c194..ccfc3dbf995 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2141,11 +2141,24 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
         else:
             worker_pool = self._idle or self._workers
             worker_pool_dv = cast(dict, worker_pool)
+            wp_vals = worker_pool.values()
             n_workers: Py_ssize_t = len(worker_pool_dv)
             if n_workers < 20:  # smart but linear in small case
-                ws = min(worker_pool.values(), key=operator.attrgetter("occupancy"))
+                ws = min(wp_vals, key=operator.attrgetter("occupancy"))
+                if ws._occupancy == 0:
+                    # special case to use round-robin; linear search
+                    # for next worker with zero occupancy (or just
+                    # land back where we started).
+                    wp_i: WorkerState
+                    start: Py_ssize_t = self._n_tasks % n_workers
+                    i: Py_ssize_t
+                    for i in range(n_workers):
+                        wp_i = wp_vals[(i + start) % n_workers]
+                        if wp_i._occupancy == 0:
+                            ws = wp_i
+                            break
             else:  # dumb but fast in large case
-                ws = worker_pool.values()[self._n_tasks % n_workers]
+                ws = wp_vals[self._n_tasks % n_workers]
 
         if self._validate:
             assert ws is None or isinstance(ws, WorkerState), (
diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py
index 555bcb86fbe..3942a2ca010 100644
--- a/distributed/tests/test_client_executor.py
+++ b/distributed/tests/test_client_executor.py
@@ -210,26 +210,24 @@ def test_unsupported_arguments(client, s, a, b):
 def test_retries(client):
     args = [ZeroDivisionError("one"), ZeroDivisionError("two"), 42]
 
-    with client.get_executor(retries=3, pure=False) as e:
+    with client.get_executor(retries=5, pure=False) as e:
         future = e.submit(varying(args))
         assert future.result() == 42
 
-    with client.get_executor(retries=2) as e:
+    with client.get_executor(retries=4) as e:
         future = e.submit(varying(args))
         result = future.result()
         assert result == 42
 
-    with client.get_executor(retries=1) as e:
+    with client.get_executor(retries=2) as e:
         future = e.submit(varying(args))
-        with pytest.raises(ZeroDivisionError) as exc_info:
+        with pytest.raises(ZeroDivisionError, match="two"):
             res = future.result()
-        exc_info.match("two")
 
     with client.get_executor(retries=0) as e:
         future = e.submit(varying(args))
-        with pytest.raises(ZeroDivisionError) as exc_info:
+        with pytest.raises(ZeroDivisionError, match="one"):
             res = future.result()
-        exc_info.match("one")
 
 
 def test_shutdown(loop):
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index f2fc346d219..c82a4a04839 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -2234,3 +2234,11 @@ async def test_get_worker_monitor_info(s, a, b):
         assert all(res[w.address]["range_query"][m] is not None for m in ms)
         assert res[w.address]["count"] is not None
         assert res[w.address]["last_time"] is not None
+
+
+@gen_cluster(client=True)
+async def test_quiet_cluster_round_robin(c, s, a, b):
+    await c.submit(inc, 1)
+    await c.submit(inc, 2)
+    await c.submit(inc, 3)
+    assert a.log and b.log