From 7e9bc1c7b70c85c5dfd41ba0f88476d57253ac33 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 26 Mar 2021 14:47:12 -0400 Subject: [PATCH 01/17] Add protection against repeated use of one worker in a quiet cluster ref dask/distributed#4637 --- distributed/scheduler.py | 8 +++++++- distributed/tests/test_scheduler.py | 8 ++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6d0da803bac..46d9ef6512b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2124,7 +2124,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState: worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) n_workers: Py_ssize_t = len(worker_pool_dv) - if n_workers < 20: # smart but linear in small case + # if all occupancies in worker pool of size less than 20 + # sum to under 0.1 of 1ms; go to the else branch (a round + # robin) because the cluster is considered quiet. + if ( + n_workers < 20 + and sum(w.occupancy for w in worker_pool.values()) > 1.0e-04 + ): # smart but linear in small case ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 930239e5c9e..b4905dd5311 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1691,6 +1691,14 @@ async def test_result_type(c, s, a, b): assert "int" in s.tasks[x.key].type +@gen_cluster(client=True) +async def test_round_robin_dd(c, s, a, b): + await c.submit(inc, 1) + await c.submit(inc, 2) + await c.submit(inc, 3) + assert a.log and b.log + + @gen_cluster() async def test_close_workers(s, a, b): await s.close(close_workers=True) From 19c5f403aa4fcbe547b0d521768be128668a2c51 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 26 Mar 2021 14:54:35 -0400 Subject: [PATCH 02/17] move and rename test --- distributed/tests/test_scheduler.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index b4905dd5311..03d7b6d7509 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1691,14 +1691,6 @@ async def test_result_type(c, s, a, b): assert "int" in s.tasks[x.key].type -@gen_cluster(client=True) -async def test_round_robin_dd(c, s, a, b): - await c.submit(inc, 1) - await c.submit(inc, 2) - await c.submit(inc, 3) - assert a.log and b.log - - @gen_cluster() async def test_close_workers(s, a, b): await s.close(close_workers=True) @@ -2209,3 +2201,11 @@ async def test_configurable_events_log_length(c, s, a, b): assert s.events["test"][0][1] == "dummy message 2" assert s.events["test"][1][1] == "dummy message 3" assert s.events["test"][2][1] == "dummy message 4" + + +@gen_cluster(client=True) +async def test_quiet_cluster_round_robin(c, s, a, b): + await c.submit(inc, 1) + await c.submit(inc, 2) + await c.submit(inc, 3) + assert a.log and b.log From 59508e690e23de0deda9a9c964b49b7619c4f163 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 26 Mar 2021 16:11:25 -0400 Subject: [PATCH 03/17] implement matt's first suggestion; passes cythonization --- distributed/scheduler.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 46d9ef6512b..593aaeefddd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2124,14 +2124,10 @@ def decide_worker(self, ts: TaskState) -> WorkerState: worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) n_workers: Py_ssize_t = len(worker_pool_dv) - # if all occupancies in worker pool of size less than 20 - # sum to under 0.1 of 1ms; go to the else branch (a round - # robin) because the cluster is considered quiet. - if ( - n_workers < 20 - and sum(w.occupancy for w in worker_pool.values()) > 1.0e-04 - ): # smart but linear in small case + if n_workers < 20: ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) + if ws.occupancy == 0: + ws = worker_pool.values()[self._n_tasks % n_workers] else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] From af580fa1260d7895b948c7b2c168acfcf30e61c2 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 26 Mar 2021 16:12:35 -0400 Subject: [PATCH 04/17] add comments to current implementation --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 593aaeefddd..e939392f1b0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2124,9 +2124,9 @@ def decide_worker(self, ts: TaskState) -> WorkerState: worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) n_workers: Py_ssize_t = len(worker_pool_dv) - if n_workers < 20: + if n_workers < 20: # smart but linear in small case ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) - if ws.occupancy == 0: + if ws.occupancy == 0: # special case to use round-robin ws = worker_pool.values()[self._n_tasks % n_workers] else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] From cb1bdc74b256577fc39ba7585d26d3d759660025 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 31 Mar 2021 20:57:36 -0400 Subject: [PATCH 05/17] zoom into self._n_task; then linear search for 0 occupancy --- distributed/scheduler.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e939392f1b0..08fa3ad8d91 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2127,7 +2127,17 @@ def decide_worker(self, ts: TaskState) -> WorkerState: if n_workers < 20: # smart but linear in small case ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) if ws.occupancy == 0: # special case to use round-robin - ws = worker_pool.values()[self._n_tasks % n_workers] + wp_vals = worker_pool.values() + start = self._n_tasks % n_workers + for i in range(start, n_workers): + if wp_vals[i].occupancy == 0: + ws = wp_vals[i] + break + else: + for i in range(start): + if wp_vals[i].occupancy == 0: + ws = wp_vals[i] + break else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] From 78dd9e97fedb5834b2595f2c7531fa90b7368b4f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 1 Apr 2021 23:08:02 -0400 Subject: [PATCH 06/17] Update distributed/scheduler.py use type annotated attribute Co-authored-by: jakirkham --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 08fa3ad8d91..e2f7fec7937 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2126,7 +2126,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: n_workers: Py_ssize_t = len(worker_pool_dv) if n_workers < 20: # smart but linear in small case ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) - if ws.occupancy == 0: # special case to use round-robin + if ws._occupancy == 0: # special case to use round-robin wp_vals = worker_pool.values() start = self._n_tasks % n_workers for i in range(start, n_workers): From a111b8d7733fcf96679f8ea2a63209159efd971e Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 1 Apr 2021 23:08:31 -0400 Subject: [PATCH 07/17] Update distributed/scheduler.py type annotations Co-authored-by: jakirkham --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e2f7fec7937..05006f198f9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2128,7 +2128,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) if ws._occupancy == 0: # special case to use round-robin wp_vals = worker_pool.values() - start = self._n_tasks % n_workers + start: Py_ssize_t = self._n_tasks % n_workers + i: Py_ssize_t for i in range(start, n_workers): if wp_vals[i].occupancy == 0: ws = wp_vals[i] From 5c8f00445a128f592e120fefbf5f29d05d8065a5 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 1 Apr 2021 23:08:43 -0400 Subject: [PATCH 08/17] Update distributed/scheduler.py type annotations Co-authored-by: jakirkham --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 05006f198f9..0d897da4dc5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2128,6 +2128,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) if ws._occupancy == 0: # special case to use round-robin wp_vals = worker_pool.values() + wp_i: WorkerState start: Py_ssize_t = self._n_tasks % n_workers i: Py_ssize_t for i in range(start, n_workers): From 4a0332ec50d31e5e3abdeb4e0d763f54084f4f5c Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 1 Apr 2021 23:08:55 -0400 Subject: [PATCH 09/17] Update distributed/scheduler.py type annotations Co-authored-by: jakirkham --- distributed/scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0d897da4dc5..2a3ce57a463 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2132,8 +2132,9 @@ def decide_worker(self, ts: TaskState) -> WorkerState: start: Py_ssize_t = self._n_tasks % n_workers i: Py_ssize_t for i in range(start, n_workers): - if wp_vals[i].occupancy == 0: - ws = wp_vals[i] + wp_i = wp_vals[i] + if wp_i._occupancy == 0: + ws = wp_i break else: for i in range(start): From d4ce4a5fb772c4ecb7d6eecaf314b9037e7e3e78 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 1 Apr 2021 23:09:16 -0400 Subject: [PATCH 10/17] type annotations Co-authored-by: jakirkham --- distributed/scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2a3ce57a463..7ad981b8ffd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2138,8 +2138,9 @@ def decide_worker(self, ts: TaskState) -> WorkerState: break else: for i in range(start): - if wp_vals[i].occupancy == 0: - ws = wp_vals[i] + wp_i = wp_vals[i] + if wp_i._occupancy == 0: + ws = wp_i break else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] From 309415cfde18d09c41374bd8daaba8b5b553a332 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 6 Apr 2021 17:38:06 -0400 Subject: [PATCH 11/17] simplify loop looking for next zero occupancy worker; inline comment --- distributed/scheduler.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7ad981b8ffd..c54085598fc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2126,22 +2126,19 @@ def decide_worker(self, ts: TaskState) -> WorkerState: n_workers: Py_ssize_t = len(worker_pool_dv) if n_workers < 20: # smart but linear in small case ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) - if ws._occupancy == 0: # special case to use round-robin + if ws._occupancy == 0: + # special case to use round-robin; linear search + # for next worker with zero occupancy (or just + # land back where we started). wp_vals = worker_pool.values() wp_i: WorkerState start: Py_ssize_t = self._n_tasks % n_workers i: Py_ssize_t - for i in range(start, n_workers): - wp_i = wp_vals[i] + for i in range(0, n_workers): + wp_i = wp_vals[i + start % n_workers] if wp_i._occupancy == 0: ws = wp_i break - else: - for i in range(start): - wp_i = wp_vals[i] - if wp_i._occupancy == 0: - ws = wp_i - break else: # dumb but fast in large case ws = worker_pool.values()[self._n_tasks % n_workers] From dd2a99a3f20d384a0f384caedb7b856f587ba531 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 6 Apr 2021 17:41:18 -0400 Subject: [PATCH 12/17] bring back test after conflict fix. --- distributed/tests/test_scheduler.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 6ae9eabdaaa..e35079e40f1 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2212,4 +2212,12 @@ async def test_get_worker_monitor_info(s, a, b): for w in (a, b): assert all(res[w.address]["range_query"][m] is not None for m in ms) assert res[w.address]["count"] is not None - assert res[w.address]["last_time"] is not None \ No newline at end of file + assert res[w.address]["last_time"] is not None + + +@gen_cluster(client=True) +async def test_quiet_cluster_round_robin(c, s, a, b): + await c.submit(inc, 1) + await c.submit(inc, 2) + await c.submit(inc, 3) + assert a.log and b.log From 03de0d4134039caf3000002c88ac3daef070be8e Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 6 Apr 2021 17:47:20 -0400 Subject: [PATCH 13/17] Update distributed/scheduler.py Co-authored-by: jakirkham --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4276dd4e3cf..ed55aecefdf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2134,7 +2134,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: wp_i: WorkerState start: Py_ssize_t = self._n_tasks % n_workers i: Py_ssize_t - for i in range(0, n_workers): + for i in range(n_workers): wp_i = wp_vals[i + start % n_workers] if wp_i._occupancy == 0: ws = wp_i From 74a81dc650eb9b58c8ea85d6d4f4f00b68701022 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 8 Apr 2021 14:55:39 -0400 Subject: [PATCH 14/17] trigger CI From 0fbbc9ce1d2246843f93e7b32b48d6527450f02f Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 8 Apr 2021 22:12:38 -0500 Subject: [PATCH 15/17] Update distributed/scheduler.py --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ed55aecefdf..3318c4ce18c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2135,7 +2135,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: start: Py_ssize_t = self._n_tasks % n_workers i: Py_ssize_t for i in range(n_workers): - wp_i = wp_vals[i + start % n_workers] + wp_i = wp_vals[(i + start) % n_workers] if wp_i._occupancy == 0: ws = wp_i break From 1dd5622800e28a5c873a721f8c6eb6dc4532bf14 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 9 Apr 2021 12:30:17 -0400 Subject: [PATCH 16/17] reuse worker_pool.values() via wp_vals value --- distributed/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3318c4ce18c..814f4f99eda 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2123,14 +2123,14 @@ def decide_worker(self, ts: TaskState) -> WorkerState: else: worker_pool = self._idle or self._workers worker_pool_dv = cast(dict, worker_pool) + wp_vals = worker_pool.values() n_workers: Py_ssize_t = len(worker_pool_dv) if n_workers < 20: # smart but linear in small case - ws = min(worker_pool.values(), key=operator.attrgetter("occupancy")) + ws = min(wp_vals, key=operator.attrgetter("occupancy")) if ws._occupancy == 0: # special case to use round-robin; linear search # for next worker with zero occupancy (or just # land back where we started). - wp_vals = worker_pool.values() wp_i: WorkerState start: Py_ssize_t = self._n_tasks % n_workers i: Py_ssize_t @@ -2140,7 +2140,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = wp_i break else: # dumb but fast in large case - ws = worker_pool.values()[self._n_tasks % n_workers] + ws = wp_vals[self._n_tasks % n_workers] if self._validate: assert ws is None or isinstance(ws, WorkerState), ( From 6a768e128781573e6ba524b75d1d0a07e5b6b014 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 13 Apr 2021 17:32:27 -0400 Subject: [PATCH 17/17] implement suggestions from James related to test_retries (GH#4638) --- distributed/tests/test_client_executor.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py index 555bcb86fbe..3942a2ca010 100644 --- a/distributed/tests/test_client_executor.py +++ b/distributed/tests/test_client_executor.py @@ -210,26 +210,24 @@ def test_unsupported_arguments(client, s, a, b): def test_retries(client): args = [ZeroDivisionError("one"), ZeroDivisionError("two"), 42] - with client.get_executor(retries=3, pure=False) as e: + with client.get_executor(retries=5, pure=False) as e: future = e.submit(varying(args)) assert future.result() == 42 - with client.get_executor(retries=2) as e: + with client.get_executor(retries=4) as e: future = e.submit(varying(args)) result = future.result() assert result == 42 - with client.get_executor(retries=1) as e: + with client.get_executor(retries=2) as e: future = e.submit(varying(args)) - with pytest.raises(ZeroDivisionError) as exc_info: + with pytest.raises(ZeroDivisionError, match="two"): res = future.result() - exc_info.match("two") with client.get_executor(retries=0) as e: future = e.submit(varying(args)) - with pytest.raises(ZeroDivisionError) as exc_info: + with pytest.raises(ZeroDivisionError, match="one"): res = future.result() - exc_info.match("one") def test_shutdown(loop):