From 5181bbf71899818eb644d604f15973e31d930e03 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 5 May 2022 18:32:18 +0200 Subject: [PATCH 01/19] Initial attempt --- distributed/utils_test.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 4dce47c3bea..12923ce6c60 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -28,6 +28,7 @@ from typing import Any, Literal from distributed.compatibility import MACOS +from distributed.profile import wait_profiler from distributed.scheduler import Scheduler try: @@ -1769,6 +1770,14 @@ def check_instances(): _global_clients.clear() + for s in Scheduler._instances: + s.extensions.clear() + s.plugins.clear() + s.services.clear() + # No close methods, destroy them + del s.http_application + del s.http_server + for w in Worker._instances: with suppress(RuntimeError): # closed IOLoop w.loop.add_callback(w.close, report=False, executor_wait=False) @@ -1800,8 +1809,22 @@ def check_instances(): assert all(c.status == Status.closed for c in SpecCluster._instances), list( SpecCluster._instances ) - SpecCluster._instances.clear() + wait_profiler() + gc.collect() + if Scheduler._instances: + s = next(iter(Scheduler._instances)) + import objgraph + + def ignore(obj): + return ( + # ignore bound methods temporarily + not (inspect.ismethod(obj) and obj.__self__ is s)) + objgraph.show_backrefs([s], filename="scheduler.png", filter=ignore) + assert not Scheduler._instances + + + SpecCluster._instances.clear() Nanny._instances.clear() DequeHandler.clear_all_instances() From 0221a0f9d574c8d65749cde2552ad7189558ac8e Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Thu, 5 May 2022 18:45:28 +0200 Subject: [PATCH 02/19] Clear out handlers and comms --- distributed/utils_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 12923ce6c60..6f27ca134ad 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1774,6 +1774,9 @@ def check_instances(): s.extensions.clear() s.plugins.clear() s.services.clear() + s.handlers.clear() + s.stream_handlers.clear() + s.stream_comms.clear() # No close methods, destroy them del s.http_application del s.http_server From 8f39d6d29e4a76a8a2dbd1b31964b260195613d7 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 6 May 2022 08:51:20 +0200 Subject: [PATCH 03/19] Clear transitions_table --- distributed/utils_test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 6f27ca134ad..1899f952ccd 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1777,6 +1777,7 @@ def check_instances(): s.handlers.clear() s.stream_handlers.clear() s.stream_comms.clear() + s.transitions_table.clear() # No close methods, destroy them del s.http_application del s.http_server @@ -1818,12 +1819,7 @@ def check_instances(): s = next(iter(Scheduler._instances)) import objgraph - def ignore(obj): - return ( - # ignore bound methods temporarily - not (inspect.ismethod(obj) and obj.__self__ is s)) - - objgraph.show_backrefs([s], filename="scheduler.png", filter=ignore) + objgraph.show_backrefs([s], filename="scheduler.png") assert not Scheduler._instances From 98fbb38854d8e922ed89a92a72eea6f4c66c42c4 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 6 May 2022 10:31:57 +0200 Subject: [PATCH 04/19] Clean IOLoops before instance checking so that any callbacks retaining instance references are removed --- distributed/utils_test.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 1899f952ccd..c207e29a2b1 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1778,7 +1778,7 @@ def check_instances(): s.stream_handlers.clear() s.stream_comms.clear() s.transitions_table.clear() - # No close methods, destroy them + # No close method, cut the loop del s.http_application del s.http_server @@ -1822,7 +1822,6 @@ def check_instances(): objgraph.show_backrefs([s], filename="scheduler.png") assert not Scheduler._instances - SpecCluster._instances.clear() Nanny._instances.clear() DequeHandler.clear_all_instances() @@ -1831,9 +1830,9 @@ def check_instances(): @contextmanager def clean(threads=True, instances=True, timeout=1, processes=True): with check_thread_leak() if threads else nullcontext(): - with pristine_loop() as loop: - with check_process_leak(check=processes): - with check_instances() if instances else nullcontext(): + with check_instances() if instances else nullcontext(): + with pristine_loop() as loop: + with check_process_leak(check=processes): with check_active_rpc(loop, timeout): reset_config() From 72276d599a51abb99e669fb448eaa5126ed92679 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 6 May 2022 10:32:28 +0200 Subject: [PATCH 05/19] Comment out debug code --- distributed/utils_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index c207e29a2b1..23f1d60c153 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1815,11 +1815,11 @@ def check_instances(): ) wait_profiler() gc.collect() - if Scheduler._instances: - s = next(iter(Scheduler._instances)) - import objgraph + # if Scheduler._instances: + # s = next(iter(Scheduler._instances)) + # import objgraph - objgraph.show_backrefs([s], filename="scheduler.png") + # objgraph.show_backrefs([s], filename="scheduler.png") assert not Scheduler._instances SpecCluster._instances.clear() From 0bca52a59c63705ec4c581ad2f6c59ce0cf29433 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Fri, 6 May 2022 11:15:11 +0200 Subject: [PATCH 06/19] Clear out listeners and delete scheduler instances --- distributed/utils_test.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 23f1d60c153..4ee198a5297 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1774,6 +1774,7 @@ def check_instances(): s.extensions.clear() s.plugins.clear() s.services.clear() + s.listeners.clear() s.handlers.clear() s.stream_handlers.clear() s.stream_comms.clear() @@ -1781,6 +1782,7 @@ def check_instances(): # No close method, cut the loop del s.http_application del s.http_server + del s for w in Worker._instances: with suppress(RuntimeError): # closed IOLoop @@ -1815,11 +1817,11 @@ def check_instances(): ) wait_profiler() gc.collect() - # if Scheduler._instances: - # s = next(iter(Scheduler._instances)) - # import objgraph + if Scheduler._instances: + s = next(iter(Scheduler._instances)) + import objgraph - # objgraph.show_backrefs([s], filename="scheduler.png") + objgraph.show_backrefs([s], filename="scheduler.png") assert not Scheduler._instances SpecCluster._instances.clear() From 79e43bba24c84411b3b68be34f95f096d4368902 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 09:31:50 +0200 Subject: [PATCH 07/19] Move instance clearing logic into Scheduler.clear_instances --- distributed/scheduler.py | 21 +++++++++++++++++++++ distributed/utils_test.py | 14 +------------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3708ef3fc06..01cbe26905b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7259,6 +7259,27 @@ def request_remove_replicas(self, addr: str, keys: list, *, stimulus_id: str): } ) + def clear_instance(self): + self.extensions.clear() + self.plugins.clear() + self.services.clear() + self.listeners.clear() + self.handlers.clear() + self.periodic_callbacks.clear() + self.stream_handlers.clear() + self.stream_comms.clear() + self.transitions_table.clear() + self.log.clear() + self.transition_log.clear() + + del self.http_application + del self.http_server + + @classmethod + def clear_instances(cls): + for instance in cls._instances: + instance.clear_instance() + def _remove_from_processing( state: SchedulerState, ts: TaskState diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 4ee198a5297..8db0a79eade 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1770,19 +1770,7 @@ def check_instances(): _global_clients.clear() - for s in Scheduler._instances: - s.extensions.clear() - s.plugins.clear() - s.services.clear() - s.listeners.clear() - s.handlers.clear() - s.stream_handlers.clear() - s.stream_comms.clear() - s.transitions_table.clear() - # No close method, cut the loop - del s.http_application - del s.http_server - del s + Scheduler.clear_instances() for w in Worker._instances: with suppress(RuntimeError): # closed IOLoop From 5279f2a146cab1a1ffc23bb20505967948fa834c Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 09:32:43 +0200 Subject: [PATCH 08/19] Clear log handlers prior to garbage collection --- distributed/utils_test.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 8db0a79eade..5dbfca84c75 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1803,8 +1803,20 @@ def check_instances(): assert all(c.status == Status.closed for c in SpecCluster._instances), list( SpecCluster._instances ) + + DequeHandler.clear_all_instances() + + from _pytest.logging import LogCaptureHandler + + for v in logging.Logger.manager.loggerDict.values(): + if not isinstance(v, logging.PlaceHolder): + for h in v.handlers: + if isinstance(h, LogCaptureHandler): + h.reset() + wait_profiler() gc.collect() + if Scheduler._instances: s = next(iter(Scheduler._instances)) import objgraph @@ -1814,7 +1826,6 @@ def check_instances(): SpecCluster._instances.clear() Nanny._instances.clear() - DequeHandler.clear_all_instances() @contextmanager From f7340381499fb576bebe106bad300e2d84a854e9 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 09:45:08 +0200 Subject: [PATCH 09/19] Don't check instances in secession test case --- distributed/tests/test_worker_client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index ec76d45740a..c69ecda7a80 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -256,7 +256,11 @@ def test_secede_without_stealing_issue_1262(): """ # run the loop as an inner function so all workers are closed # and exceptions can be examined - @gen_cluster(client=True, scheduler_kwargs={"extensions": {}}) + @gen_cluster( + client=True, + scheduler_kwargs={"extensions": {}}, + clean_kwargs={"instances": False}, + ) async def secede_test(c, s, a, b): def func(x): with worker_client() as wc: From d7549b0a3a98130a825936edaf98893ec9a6c0c9 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 10:13:12 +0200 Subject: [PATCH 10/19] Clear has_keyword LRU cache --- distributed/utils_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 5dbfca84c75..e4d7b9ff558 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -30,6 +30,7 @@ from distributed.compatibility import MACOS from distributed.profile import wait_profiler from distributed.scheduler import Scheduler +from distributed.utils import has_keyword try: import ssl @@ -1814,6 +1815,8 @@ def check_instances(): if isinstance(h, LogCaptureHandler): h.reset() + has_keyword.cache_clear() + wait_profiler() gc.collect() From 6d7ab9ade9b04f7e9da92262ad87bc55063ddc51 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 11:28:13 +0200 Subject: [PATCH 11/19] Disable instance checking in test_rebalance_workers_and_keys --- distributed/tests/test_client.py | 7 ++++++- distributed/tests/test_scheduler.py | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 2c458b0ea65..cd811f4b393 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -2888,7 +2888,12 @@ async def test_rebalance(c, s, a, b): assert len(b.data) == 50 -@gen_cluster(nthreads=[("", 1)] * 3, client=True, config=REBALANCE_MANAGED_CONFIG) +@gen_cluster( + nthreads=[("", 1)] * 3, + client=True, + config=REBALANCE_MANAGED_CONFIG, + clean_kwargs={"instances": False}, +) async def test_rebalance_workers_and_keys(client, s, a, b, c): """Test Client.rebalance(). These are just to test the Client wrapper around Scheduler.rebalance(); for more thorough tests on the latter see test_scheduler.py. diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 6401a873e60..2eba108ca44 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2794,7 +2794,12 @@ async def test_rebalance_managed_memory(c, s, a, b): assert len(b.data) == 50 -@gen_cluster(nthreads=[("", 1)] * 3, client=True, config=REBALANCE_MANAGED_CONFIG) +@gen_cluster( + nthreads=[("", 1)] * 3, + client=True, + config=REBALANCE_MANAGED_CONFIG, + clean_kwargs={"instances": False}, +) async def test_rebalance_workers_and_keys(client, s, a, b, c): futures = await client.scatter(range(100), workers=[a.address]) assert (len(a.data), len(b.data), len(c.data)) == (100, 0, 0) From 2a9e17d289340f3c1f9b906729e62c4724a8fa8c Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 11:52:55 +0200 Subject: [PATCH 12/19] Disable instance checks in test_quiet_client_close_when_cluster_is_closed_before_client --- distributed/tests/test_client.py | 1 + distributed/utils_test.py | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index cd811f4b393..045f353218f 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -4923,6 +4923,7 @@ def test_quiet_client_close(loop): @pytest.mark.slow +@pytest.mark.parametrize("loop", [{"instances": False}], indirect=True) def test_quiet_client_close_when_cluster_is_closed_before_client(loop): with captured_logger(logging.getLogger("tornado.application")) as logger: cluster = LocalCluster(loop=loop, n_workers=1, dashboard_address=":0") diff --git a/distributed/utils_test.py b/distributed/utils_test.py index e4d7b9ff558..6b06669e66e 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -141,9 +141,11 @@ async def cleanup_global_workers(): await worker.close(report=False, executor_wait=False) -@pytest.fixture -def loop(): - with check_instances(): +@pytest.fixture(params={"instances": True}) +def loop(request): + instances = request.param["instances"] + + with check_instances() if instances else nullcontext(): with pristine_loop() as loop: # Monkey-patch IOLoop.start to wait for loop stop orig_start = loop.start From a5a82b9dac2cfedd57bf6c725732b2eebf56df47 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 11:53:29 +0200 Subject: [PATCH 13/19] Disable instance checks in further tests --- distributed/tests/test_scheduler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 2eba108ca44..cb9e3ed4128 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2170,7 +2170,11 @@ async def test_gather_no_workers(c, s, a, b): @pytest.mark.slow @pytest.mark.parametrize("reschedule_different_worker", [True, False]) @pytest.mark.parametrize("swap_data_insert_order", [True, False]) -@gen_cluster(client=True, client_kwargs={"direct_to_workers": False}) +@gen_cluster( + client=True, + client_kwargs={"direct_to_workers": False}, + clean_kwargs={"instances": False}, +) async def test_gather_allow_worker_reconnect( c, s, a, b, reschedule_different_worker, swap_data_insert_order ): @@ -3243,7 +3247,7 @@ async def test_worker_heartbeat_after_cancel(c, s, *workers): await asyncio.gather(*(w.heartbeat() for w in workers)) -@gen_cluster(client=True, nthreads=[("", 1)]) +@gen_cluster(client=True, nthreads=[("", 1)], client_kwargs={"instances": False}) async def test_worker_reconnect_task_memory(c, s, a): a.periodic_callbacks["heartbeat"].stop() From 7b68b22cb34a651814e39930463e18a7f13bb387 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 12:18:37 +0200 Subject: [PATCH 14/19] Add stimulus_id kwarg to reschedule --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 01cbe26905b..b481c72c0d0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6688,7 +6688,7 @@ async def get_story(self, keys=()): transition_story = story - def reschedule(self, key=None, worker=None): + def reschedule(self, key=None, worker=None, stimulus_id=None): """Reschedule a task Things may have shifted and this task may now be better suited to run @@ -6706,7 +6706,7 @@ def reschedule(self, key=None, worker=None): return if worker and ts.processing_on.address != worker: return - self.transitions({key: "released"}, f"reschedule-{time()}") + self.transitions({key: "released"}, stimulus_id or f"reschedule-{time()}") ##################### # Utility functions # From decefdddba18b6fd339100940cfe2cfe3f9684c7 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 13:51:44 +0200 Subject: [PATCH 15/19] Clear scheduler from SpecCluster instances --- distributed/deploy/spec.py | 8 ++++++++ distributed/utils_test.py | 8 ++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 96a66d63aed..5d59eea949f 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -630,6 +630,14 @@ def from_name(cls, name: str): """Create an instance of this class to represent an existing cluster by name.""" raise NotImplementedError() + def clear_instance(self): + del self.scheduler + + @classmethod + def clear_instances(cls): + for instance in cls._instances: + instance.clear_instance() + async def run_spec(spec: dict, *args): workers = {} diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 06db5290b1e..ad861959723 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1802,12 +1802,8 @@ def check_instances(): for n in Nanny._instances ), {n: n.status for n in Nanny._instances} - # assert not list(SpecCluster._instances) # TODO - assert all(c.status == Status.closed for c in SpecCluster._instances), list( - SpecCluster._instances - ) - - DequeHandler.clear_all_instances() + SpecCluster.clear_instances() + DequeHandler.clear_instances() from _pytest.logging import LogCaptureHandler From 30956cbcc913ff649ad22cf930756aeb9a2435ee Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 13:52:01 +0200 Subject: [PATCH 16/19] Fix loop fixture params --- distributed/utils_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index ad861959723..40b8e1ff4a3 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -140,7 +140,7 @@ async def cleanup_global_workers(): await worker.close(report=False, executor_wait=False) -@pytest.fixture(params={"instances": True}) +@pytest.fixture(params=[{"instances": True}]) def loop(request): instances = request.param["instances"] From cbe992d5cd32632955ed1a2fa20a5afb81803a4e Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 13:54:38 +0200 Subject: [PATCH 17/19] Make clear_instances the default across different types --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index 475ea836aa5..d3f2793ac19 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1152,7 +1152,7 @@ def clear(self): self.deque.clear() @classmethod - def clear_all_instances(cls): + def clear_instances(cls): """ Clear the internal storage of all live DequeHandlers. """ From 26eae508d1b6b51d10a73c33e3ec57544dbe5895 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 13:55:13 +0200 Subject: [PATCH 18/19] Disable instance checking in some test cases --- distributed/tests/test_client.py | 3 --- distributed/tests/test_cluster_dump.py | 2 +- distributed/tests/test_publish.py | 4 ++-- distributed/tests/test_scheduler.py | 4 ++-- distributed/tests/test_stories.py | 2 +- 5 files changed, 6 insertions(+), 9 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 948f582d32f..01fbe1d9a1e 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -4476,9 +4476,6 @@ def assert_no_data_loss(scheduler): @gen_cluster(client=True) async def test_interleave_computations(c, s, a, b): - import distributed - - distributed.g = s xs = [delayed(slowinc)(i, delay=0.02) for i in range(30)] ys = [delayed(slowdec)(x, delay=0.02) for x in xs] zs = [delayed(slowadd)(x, y, delay=0.02) for x, y in zip(xs, ys)] diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index 540e0abb4e9..a5c3d525247 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -55,7 +55,7 @@ def blocked_inc(x, event): return x + 1 -@gen_cluster(client=True) +@gen_cluster(client=True, clean_kwargs={"instances": False}) async def test_cluster_dump_state(c, s, a, b, tmp_path): filename = tmp_path / "dump" futs = c.map(inc, range(2)) diff --git a/distributed/tests/test_publish.py b/distributed/tests/test_publish.py index 85d3fc5ff4d..8b8954ac9c1 100644 --- a/distributed/tests/test_publish.py +++ b/distributed/tests/test_publish.py @@ -11,7 +11,7 @@ from distributed.utils_test import gen_cluster, inc -@gen_cluster() +@gen_cluster(clean_kwargs={"instances": False}) async def test_publish_simple(s, a, b): c = Client(s.address, asynchronous=True) f = Client(s.address, asynchronous=True) @@ -259,7 +259,7 @@ async def test_datasets_async(c, s, a, b): len(c.datasets) -@gen_cluster(client=True) +@gen_cluster(client=True, clean_kwargs={"instances": False}) async def test_pickle_safe(c, s, a, b): async with Client(s.address, asynchronous=True, serializers=["msgpack"]) as c2: await c2.publish_dataset(x=[1, 2, 3]) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index d8bc818c7a4..4a3e4dbc8d6 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3259,7 +3259,7 @@ async def test_worker_heartbeat_after_cancel(c, s, *workers): await asyncio.gather(*(w.heartbeat() for w in workers)) -@gen_cluster(client=True, nthreads=[("", 1)], client_kwargs={"instances": False}) +@gen_cluster(client=True, nthreads=[("", 1)], clean_kwargs={"instances": False}) async def test_worker_reconnect_task_memory(c, s, a): a.periodic_callbacks["heartbeat"].stop() @@ -3279,7 +3279,7 @@ async def test_worker_reconnect_task_memory(c, s, a): } -@gen_cluster(client=True, nthreads=[("", 1)]) +@gen_cluster(client=True, nthreads=[("", 1)], clean_kwargs={"instances": False}) async def test_worker_reconnect_task_memory_with_resources(c, s, a): async with Worker(s.address, resources={"A": 1}) as b: while s.workers[b.address].status != Status.running: diff --git a/distributed/tests/test_stories.py b/distributed/tests/test_stories.py index 1616422f252..0571daba42b 100644 --- a/distributed/tests/test_stories.py +++ b/distributed/tests/test_stories.py @@ -104,7 +104,7 @@ async def get_story(self, *args, **kw): raise CommClosedError -@gen_cluster(client=True, Worker=WorkerBrokenStory) +@gen_cluster(client=True, Worker=WorkerBrokenStory, clean_kwargs={"instances": False}) @pytest.mark.parametrize("on_error", ["ignore", "raise"]) async def test_client_story_failed_worker(c, s, a, b, on_error): f = c.submit(inc, 1) From 11de7b77413e2d21adf4e7dd490efc4ef1591f70 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 9 May 2022 14:42:43 +0200 Subject: [PATCH 19/19] Disable instance checks in dashboards and deploy --- distributed/dashboard/tests/test_components.py | 2 +- distributed/dashboard/tests/test_scheduler_bokeh.py | 5 ++++- distributed/dashboard/tests/test_worker_bokeh.py | 4 +++- distributed/deploy/tests/test_local.py | 4 ++-- distributed/http/scheduler/tests/test_scheduler_http.py | 4 ++-- distributed/http/scheduler/tests/test_semaphore_http.py | 2 +- distributed/tests/test_semaphore.py | 2 +- 7 files changed, 14 insertions(+), 9 deletions(-) diff --git a/distributed/dashboard/tests/test_components.py b/distributed/dashboard/tests/test_components.py index bc9f6c74849..e3c21a688d1 100644 --- a/distributed/dashboard/tests/test_components.py +++ b/distributed/dashboard/tests/test_components.py @@ -30,7 +30,7 @@ async def test_profile_plot(c, s, a, b): assert len(p.source.data["left"]) >= 1 -@gen_cluster(client=True, clean_kwargs={"threads": False}) +@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False}) async def test_profile_time_plot(c, s, a, b): from bokeh.io import curdoc diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 0d1649de0d5..762846fc18a 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -55,7 +55,9 @@ scheduler.PROFILING = False # type: ignore -@gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) +@gen_cluster( + client=True, scheduler_kwargs={"dashboard": True}, clean_kwargs={"instances": False} +) async def test_simple(c, s, a, b): port = s.http_server.port @@ -883,6 +885,7 @@ async def test_lots_of_tasks(c, s, a, b): @gen_cluster( client=True, scheduler_kwargs={"dashboard": True}, + clean_kwargs={"instances": False}, config={ "distributed.scheduler.dashboard.tls.key": get_cert("tls-key.pem"), "distributed.scheduler.dashboard.tls.cert": get_cert("tls-cert.pem"), diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py index b6f5476f618..b6e72ce56aa 100644 --- a/distributed/dashboard/tests/test_worker_bokeh.py +++ b/distributed/dashboard/tests/test_worker_bokeh.py @@ -153,7 +153,9 @@ async def test_CommunicatingStream(c, s, a, b): @gen_cluster( - client=True, clean_kwargs={"threads": False}, worker_kwargs={"dashboard": True} + client=True, + clean_kwargs={"threads": False, "instances": False}, + worker_kwargs={"dashboard": True}, ) async def test_prometheus(c, s, a, b): pytest.importorskip("prometheus_client") diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index fca1fc4c550..63a87e92912 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -371,7 +371,7 @@ async def test_memory_limit_none(): def test_cleanup(): - with clean(threads=False): + with clean(threads=False, instances=False): c = LocalCluster(n_workers=2, silence_logs=False, dashboard_address=":0") port = c.scheduler.port c.close() @@ -882,7 +882,7 @@ def test_starts_up_sync(loop): def test_dont_select_closed_worker(): # Make sure distributed does not try to reuse a client from a # closed cluster (https://github.com/dask/distributed/issues/2840). - with clean(threads=False): + with clean(threads=False, instances=False): cluster = LocalCluster(n_workers=0, dashboard_address=":0") c = Client(cluster) cluster.scale(2) diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py index dce785fdce5..91ab1b26ac2 100644 --- a/distributed/http/scheduler/tests/test_scheduler_http.py +++ b/distributed/http/scheduler/tests/test_scheduler_http.py @@ -77,7 +77,7 @@ async def test_prefix(c, s, a, b): assert is_valid_xml(body) -@gen_cluster(client=True, clean_kwargs={"threads": False}) +@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False}) async def test_prometheus(c, s, a, b): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families @@ -103,7 +103,7 @@ async def test_prometheus(c, s, a, b): assert client.samples[0].value == 1.0 -@gen_cluster(client=True, clean_kwargs={"threads": False}) +@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False}) async def test_prometheus_collect_task_states(c, s, a, b): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index 67842e141fb..3fa239d4a48 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -5,7 +5,7 @@ from distributed.utils_test import gen_cluster -@gen_cluster(client=True, clean_kwargs={"threads": False}) +@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False}) async def test_prometheus_collect_task_states(c, s, a, b): pytest.importorskip("prometheus_client") from prometheus_client.parser import text_string_to_metric_families diff --git a/distributed/tests/test_semaphore.py b/distributed/tests/test_semaphore.py index 50ad43dfce8..004d2f30a32 100644 --- a/distributed/tests/test_semaphore.py +++ b/distributed/tests/test_semaphore.py @@ -189,7 +189,7 @@ def f(x, release=True): @pytest.mark.slow -@gen_cluster(client=True, timeout=120) +@gen_cluster(client=True, timeout=120, clean_kwargs={"instances": False}) async def test_close_async(c, s, a, b): sem = await Semaphore(name="test")