diff --git a/distributed/dashboard/tests/test_components.py b/distributed/dashboard/tests/test_components.py
index bc9f6c74849..e3c21a688d1 100644
--- a/distributed/dashboard/tests/test_components.py
+++ b/distributed/dashboard/tests/test_components.py
@@ -30,7 +30,7 @@ async def test_profile_plot(c, s, a, b):
     assert len(p.source.data["left"]) >= 1
 
 
-@gen_cluster(client=True, clean_kwargs={"threads": False})
+@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
 async def test_profile_time_plot(c, s, a, b):
     from bokeh.io import curdoc
 
diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py
index 0d1649de0d5..762846fc18a 100644
--- a/distributed/dashboard/tests/test_scheduler_bokeh.py
+++ b/distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -55,7 +55,9 @@
 scheduler.PROFILING = False  # type: ignore
 
 
-@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
+@gen_cluster(
+    client=True, scheduler_kwargs={"dashboard": True}, clean_kwargs={"instances": False}
+)
 async def test_simple(c, s, a, b):
     port = s.http_server.port
 
@@ -883,6 +885,7 @@ async def test_lots_of_tasks(c, s, a, b):
 @gen_cluster(
     client=True,
     scheduler_kwargs={"dashboard": True},
+    clean_kwargs={"instances": False},
     config={
         "distributed.scheduler.dashboard.tls.key": get_cert("tls-key.pem"),
         "distributed.scheduler.dashboard.tls.cert": get_cert("tls-cert.pem"),
diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py
index b6f5476f618..b6e72ce56aa 100644
--- a/distributed/dashboard/tests/test_worker_bokeh.py
+++ b/distributed/dashboard/tests/test_worker_bokeh.py
@@ -153,7 +153,9 @@ async def test_CommunicatingStream(c, s, a, b):
 
 
 @gen_cluster(
-    client=True, clean_kwargs={"threads": False}, worker_kwargs={"dashboard": True}
+    client=True,
+    clean_kwargs={"threads": False, "instances": False},
+    worker_kwargs={"dashboard": True},
 )
 async def test_prometheus(c, s, a, b):
     pytest.importorskip("prometheus_client")
diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index 96a66d63aed..5d59eea949f 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -630,6 +630,14 @@ def from_name(cls, name: str):
         """Create an instance of this class to represent an existing cluster by name."""
         raise NotImplementedError()
 
+    def clear_instance(self):
+        del self.scheduler
+
+    @classmethod
+    def clear_instances(cls):
+        for instance in cls._instances:
+            instance.clear_instance()
+
 
 async def run_spec(spec: dict, *args):
     workers = {}
diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py
index fca1fc4c550..63a87e92912 100644
--- a/distributed/deploy/tests/test_local.py
+++ b/distributed/deploy/tests/test_local.py
@@ -371,7 +371,7 @@ async def test_memory_limit_none():
 
 
 def test_cleanup():
-    with clean(threads=False):
+    with clean(threads=False, instances=False):
         c = LocalCluster(n_workers=2, silence_logs=False, dashboard_address=":0")
         port = c.scheduler.port
         c.close()
@@ -882,7 +882,7 @@ def test_starts_up_sync(loop):
 def test_dont_select_closed_worker():
     # Make sure distributed does not try to reuse a client from a
     # closed cluster (https://github.com/dask/distributed/issues/2840).
-    with clean(threads=False):
+    with clean(threads=False, instances=False):
         cluster = LocalCluster(n_workers=0, dashboard_address=":0")
         c = Client(cluster)
         cluster.scale(2)
diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py
index dce785fdce5..91ab1b26ac2 100644
--- a/distributed/http/scheduler/tests/test_scheduler_http.py
+++ b/distributed/http/scheduler/tests/test_scheduler_http.py
@@ -77,7 +77,7 @@ async def test_prefix(c, s, a, b):
         assert is_valid_xml(body)
 
 
-@gen_cluster(client=True, clean_kwargs={"threads": False})
+@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
 async def test_prometheus(c, s, a, b):
     pytest.importorskip("prometheus_client")
     from prometheus_client.parser import text_string_to_metric_families
@@ -103,7 +103,7 @@ async def test_prometheus(c, s, a, b):
     assert client.samples[0].value == 1.0
 
 
-@gen_cluster(client=True, clean_kwargs={"threads": False})
+@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
 async def test_prometheus_collect_task_states(c, s, a, b):
     pytest.importorskip("prometheus_client")
     from prometheus_client.parser import text_string_to_metric_families
diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py
index 67842e141fb..3fa239d4a48 100644
--- a/distributed/http/scheduler/tests/test_semaphore_http.py
+++ b/distributed/http/scheduler/tests/test_semaphore_http.py
@@ -5,7 +5,7 @@
 from distributed.utils_test import gen_cluster
 
 
-@gen_cluster(client=True, clean_kwargs={"threads": False})
+@gen_cluster(client=True, clean_kwargs={"threads": False, "instances": False})
 async def test_prometheus_collect_task_states(c, s, a, b):
     pytest.importorskip("prometheus_client")
     from prometheus_client.parser import text_string_to_metric_families
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index ad37548766b..433aacef8c8 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -6697,7 +6697,7 @@ async def get_story(self, keys=()):
 
     transition_story = story
 
-    def reschedule(self, key=None, worker=None):
+    def reschedule(self, key=None, worker=None, stimulus_id=None):
         """Reschedule a task
 
         Things may have shifted and this task may now be better suited to run
@@ -6715,7 +6715,7 @@ def reschedule(self, key=None, worker=None):
             return
         if worker and ts.processing_on.address != worker:
             return
-        self.transitions({key: "released"}, f"reschedule-{time()}")
+        self.transitions({key: "released"}, stimulus_id or f"reschedule-{time()}")
 
     #####################
     # Utility functions #
@@ -7268,6 +7268,27 @@ def request_remove_replicas(self, addr: str, keys: list, *, stimulus_id: str):
             }
         )
 
+    def clear_instance(self):
+        self.extensions.clear()
+        self.plugins.clear()
+        self.services.clear()
+        self.listeners.clear()
+        self.handlers.clear()
+        self.periodic_callbacks.clear()
+        self.stream_handlers.clear()
+        self.stream_comms.clear()
+        self.transitions_table.clear()
+        self.log.clear()
+        self.transition_log.clear()
+
+        del self.http_application
+        del self.http_server
+
+    @classmethod
+    def clear_instances(cls):
+        for instance in cls._instances:
+            instance.clear_instance()
+
 
 def _remove_from_processing(
     state: SchedulerState, ts: TaskState
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index a0ee538f6fa..01fbe1d9a1e 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -2888,7 +2888,12 @@ async def test_rebalance(c, s, a, b):
     assert len(b.data) == 50
 
 
-@gen_cluster(nthreads=[("", 1)] * 3, client=True, config=REBALANCE_MANAGED_CONFIG)
+@gen_cluster(
+    nthreads=[("", 1)] * 3,
+    client=True,
+    config=REBALANCE_MANAGED_CONFIG,
+    clean_kwargs={"instances": False},
+)
 async def test_rebalance_workers_and_keys(client, s, a, b, c):
     """Test Client.rebalance(). These are just to test the Client wrapper around
     Scheduler.rebalance(); for more thorough tests on the latter see test_scheduler.py.
@@ -4471,9 +4476,6 @@ def assert_no_data_loss(scheduler):
 
 @gen_cluster(client=True)
 async def test_interleave_computations(c, s, a, b):
-    import distributed
-
-    distributed.g = s
     xs = [delayed(slowinc)(i, delay=0.02) for i in range(30)]
     ys = [delayed(slowdec)(x, delay=0.02) for x in xs]
     zs = [delayed(slowadd)(x, y, delay=0.02) for x, y in zip(xs, ys)]
@@ -4919,6 +4921,7 @@ def test_quiet_client_close(loop):
 
 
 @pytest.mark.slow
+@pytest.mark.parametrize("loop", [{"instances": False}], indirect=True)
 def test_quiet_client_close_when_cluster_is_closed_before_client(loop):
     with captured_logger(logging.getLogger("tornado.application")) as logger:
         cluster = LocalCluster(loop=loop, n_workers=1, dashboard_address=":0")
diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py
index 540e0abb4e9..a5c3d525247 100644
--- a/distributed/tests/test_cluster_dump.py
+++ b/distributed/tests/test_cluster_dump.py
@@ -55,7 +55,7 @@ def blocked_inc(x, event):
     return x + 1
 
 
-@gen_cluster(client=True)
+@gen_cluster(client=True, clean_kwargs={"instances": False})
 async def test_cluster_dump_state(c, s, a, b, tmp_path):
     filename = tmp_path / "dump"
     futs = c.map(inc, range(2))
diff --git a/distributed/tests/test_publish.py b/distributed/tests/test_publish.py
index 85d3fc5ff4d..8b8954ac9c1 100644
--- a/distributed/tests/test_publish.py
+++ b/distributed/tests/test_publish.py
@@ -11,7 +11,7 @@
 from distributed.utils_test import gen_cluster, inc
 
 
-@gen_cluster()
+@gen_cluster(clean_kwargs={"instances": False})
 async def test_publish_simple(s, a, b):
     c = Client(s.address, asynchronous=True)
     f = Client(s.address, asynchronous=True)
@@ -259,7 +259,7 @@ async def test_datasets_async(c, s, a, b):
         len(c.datasets)
 
 
-@gen_cluster(client=True)
+@gen_cluster(client=True, clean_kwargs={"instances": False})
 async def test_pickle_safe(c, s, a, b):
     async with Client(s.address, asynchronous=True, serializers=["msgpack"]) as c2:
         await c2.publish_dataset(x=[1, 2, 3])
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 784448c588c..4a3e4dbc8d6 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -2180,7 +2180,11 @@ async def test_gather_no_workers(c, s, a, b):
 @pytest.mark.slow
 @pytest.mark.parametrize("reschedule_different_worker", [True, False])
 @pytest.mark.parametrize("swap_data_insert_order", [True, False])
-@gen_cluster(client=True, client_kwargs={"direct_to_workers": False})
+@gen_cluster(
+    client=True,
+    client_kwargs={"direct_to_workers": False},
+    clean_kwargs={"instances": False},
+)
 async def test_gather_allow_worker_reconnect(
     c, s, a, b, reschedule_different_worker, swap_data_insert_order
 ):
@@ -2806,7 +2810,12 @@ async def test_rebalance_managed_memory(c, s, a, b):
     assert len(b.data) == 50
 
 
-@gen_cluster(nthreads=[("", 1)] * 3, client=True, config=REBALANCE_MANAGED_CONFIG)
+@gen_cluster(
+    nthreads=[("", 1)] * 3,
+    client=True,
+    config=REBALANCE_MANAGED_CONFIG,
+    clean_kwargs={"instances": False},
+)
 async def test_rebalance_workers_and_keys(client, s, a, b, c):
     futures = await client.scatter(range(100), workers=[a.address])
     assert (len(a.data), len(b.data), len(c.data)) == (100, 0, 0)
@@ -3250,7 +3259,7 @@ async def test_worker_heartbeat_after_cancel(c, s, *workers):
     await asyncio.gather(*(w.heartbeat() for w in workers))
 
 
-@gen_cluster(client=True, nthreads=[("", 1)])
+@gen_cluster(client=True, nthreads=[("", 1)], clean_kwargs={"instances": False})
 async def test_worker_reconnect_task_memory(c, s, a):
     a.periodic_callbacks["heartbeat"].stop()
 
@@ -3270,7 +3279,7 @@ async def test_worker_reconnect_task_memory(c, s, a):
     }
 
 
-@gen_cluster(client=True, nthreads=[("", 1)])
+@gen_cluster(client=True, nthreads=[("", 1)], clean_kwargs={"instances": False})
 async def test_worker_reconnect_task_memory_with_resources(c, s, a):
     async with Worker(s.address, resources={"A": 1}) as b:
         while s.workers[b.address].status != Status.running:
diff --git a/distributed/tests/test_semaphore.py b/distributed/tests/test_semaphore.py
index 50ad43dfce8..004d2f30a32 100644
--- a/distributed/tests/test_semaphore.py
+++ b/distributed/tests/test_semaphore.py
@@ -189,7 +189,7 @@ def f(x, release=True):
 
 
 @pytest.mark.slow
-@gen_cluster(client=True, timeout=120)
+@gen_cluster(client=True, timeout=120, clean_kwargs={"instances": False})
 async def test_close_async(c, s, a, b):
     sem = await Semaphore(name="test")
 
diff --git a/distributed/tests/test_stories.py b/distributed/tests/test_stories.py
index 1616422f252..0571daba42b 100644
--- a/distributed/tests/test_stories.py
+++ b/distributed/tests/test_stories.py
@@ -104,7 +104,7 @@ async def get_story(self, *args, **kw):
         raise CommClosedError
 
 
-@gen_cluster(client=True, Worker=WorkerBrokenStory)
+@gen_cluster(client=True, Worker=WorkerBrokenStory, clean_kwargs={"instances": False})
 @pytest.mark.parametrize("on_error", ["ignore", "raise"])
 async def test_client_story_failed_worker(c, s, a, b, on_error):
     f = c.submit(inc, 1)
diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py
index ec76d45740a..c69ecda7a80 100644
--- a/distributed/tests/test_worker_client.py
+++ b/distributed/tests/test_worker_client.py
@@ -256,7 +256,11 @@ def test_secede_without_stealing_issue_1262():
     """
     # run the loop as an inner function so all workers are closed
     # and exceptions can be examined
-    @gen_cluster(client=True, scheduler_kwargs={"extensions": {}})
+    @gen_cluster(
+        client=True,
+        scheduler_kwargs={"extensions": {}},
+        clean_kwargs={"instances": False},
+    )
     async def secede_test(c, s, a, b):
         def func(x):
             with worker_client() as wc:
diff --git a/distributed/utils.py b/distributed/utils.py
index 475ea836aa5..d3f2793ac19 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -1152,7 +1152,7 @@ def clear(self):
         self.deque.clear()
 
     @classmethod
-    def clear_all_instances(cls):
+    def clear_instances(cls):
         """
         Clear the internal storage of all live DequeHandlers.
         """
diff --git a/distributed/utils_test.py b/distributed/utils_test.py
index 044bd662ac0..40b8e1ff4a3 100644
--- a/distributed/utils_test.py
+++ b/distributed/utils_test.py
@@ -28,7 +28,9 @@
 from typing import Any, Generator, Literal
 
 from distributed.compatibility import MACOS
+from distributed.profile import wait_profiler
 from distributed.scheduler import Scheduler
+from distributed.utils import has_keyword
 
 try:
     import ssl
@@ -138,9 +140,11 @@ async def cleanup_global_workers():
         await worker.close(report=False, executor_wait=False)
 
 
-@pytest.fixture
-def loop():
-    with check_instances():
+@pytest.fixture(params=[{"instances": True}])
+def loop(request):
+    instances = request.param["instances"]
+
+    with check_instances() if instances else nullcontext():
         with pristine_loop() as loop:
             # Monkey-patch IOLoop.start to wait for loop stop
             orig_start = loop.start
@@ -1768,6 +1772,8 @@ def check_instances():
 
     _global_clients.clear()
 
+    Scheduler.clear_instances()
+
     for w in Worker._instances:
         with suppress(RuntimeError):  # closed IOLoop
             w.loop.add_callback(w.close, report=False, executor_wait=False)
@@ -1796,22 +1802,39 @@ def check_instances():
         for n in Nanny._instances
     ), {n: n.status for n in Nanny._instances}
 
-    # assert not list(SpecCluster._instances)  # TODO
-    assert all(c.status == Status.closed for c in SpecCluster._instances), list(
-        SpecCluster._instances
-    )
-    SpecCluster._instances.clear()
+    SpecCluster.clear_instances()
+    DequeHandler.clear_instances()
+
+    from _pytest.logging import LogCaptureHandler
+
+    for v in logging.Logger.manager.loggerDict.values():
+        if not isinstance(v, logging.PlaceHolder):
+            for h in v.handlers:
+                if isinstance(h, LogCaptureHandler):
+                    h.reset()
+    has_keyword.cache_clear()
+
+    wait_profiler()
+    gc.collect()
+
+    if Scheduler._instances:
+        s = next(iter(Scheduler._instances))
+        import objgraph
+
+        objgraph.show_backrefs([s], filename="scheduler.png")
+    assert not Scheduler._instances
+
+    SpecCluster._instances.clear()
 
     Nanny._instances.clear()
-    DequeHandler.clear_all_instances()
 
 
 @contextmanager
 def clean(threads=True, instances=True, timeout=1, processes=True):
     with check_thread_leak() if threads else nullcontext():
-        with pristine_loop() as loop:
-            with check_process_leak(check=processes):
-                with check_instances() if instances else nullcontext():
+        with check_instances() if instances else nullcontext():
+            with pristine_loop() as loop:
+                with check_process_leak(check=processes):
                     with check_active_rpc(loop, timeout):
                         reset_config()