From b5083acb8fd7b92361e47556a5e12d27d248b23b Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 15 Jun 2021 18:46:20 +0200
Subject: [PATCH 1/3] Ensure test_scheduler does not leak open sockets

---
 distributed/tests/test_scheduler.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index f1aeef606d2..71ef3b250bb 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -313,10 +313,15 @@ def func(scheduler):
     await comm.close()


-def test_scheduler_init_pulls_blocked_handlers_from_config():
+@pytest.mark.asyncio
+async def test_scheduler_init_pulls_blocked_handlers_from_config():
+    # This test is async to allow us to properly close the scheduler since the
+    # init is already opening sockets for the HTTP server. See also
+    # https://github.com/dask/distributed/issues/4806
     with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
         s = Scheduler()
         assert s.blocked_handlers == ["test-handler"]
+    await s.close()


 @gen_cluster()
@@ -1396,7 +1401,11 @@ async def test_get_task_status(c, s, a, b):
     assert result == {future.key: "memory"}


-def test_deque_handler():
+@pytest.mark.asyncio
+async def test_deque_handler():
+    # This test is async to allow us to properly close the scheduler since the
+    # init is already opening sockets for the HTTP server. See also
+    # https://github.com/dask/distributed/issues/4806
     from distributed.scheduler import logger

     s = Scheduler()
@@ -1406,6 +1415,7 @@ def test_deque_handler():
     msg = deque_handler.deque[-1]
     assert "distributed.scheduler" in deque_handler.format(msg)
     assert any(msg.msg == "foo123" for msg in deque_handler.deque)
+    await s.close()


 @gen_cluster(client=True)

From e129015820a6db517141257f8cd68f24f7157fb3 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 17 Jun 2021 12:16:17 +0200
Subject: [PATCH 2/3] Ensure HTTP server is closed if exception during spec cluster startup

---
 distributed/deploy/spec.py                    | 14 ++++++++++----
 distributed/deploy/tests/test_local.py        |  6 +++---
 distributed/deploy/tests/test_spec_cluster.py |  4 ----
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index 070d6d0624d..21f7bad293d 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -399,10 +399,16 @@ async def _():
             if self.status == Status.created:
                 await self._start()
             await self.scheduler
-            await self._correct_state()
-            if self.workers:
-                await asyncio.wait(list(self.workers.values()))  # maybe there are more
-            return self
+            try:
+                await self._correct_state()
+                if self.workers:
+                    await asyncio.wait(
+                        list(self.workers.values())
+                    )  # maybe there are more
+                return self
+            except Exception:
+                await self.scheduler.close()
+                raise

         return _().__await__()

diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py
index ee1b12913dc..076aca3caae 100644
--- a/distributed/deploy/tests/test_local.py
+++ b/distributed/deploy/tests/test_local.py
@@ -1070,12 +1070,12 @@ async def test_local_cluster_redundant_kwarg(nanny):
         # Extra arguments are forwarded to the worker class. Depending on
         # whether we use the nanny or not, the error treatment is quite
         # different and we should assert that an exception is raised
-        async with await LocalCluster(
-            typo_kwarg="foo", processes=nanny, n_workers=1
+        async with LocalCluster(
+            typo_kwarg="foo", processes=nanny, n_workers=1, asynchronous=True
         ) as cluster:
             # This will never work but is a reliable way to block without hard
             # coding any sleep values
-            async with Client(cluster) as c:
+            async with Client(cluster, asynchronous=True) as c:
                 f = c.submit(sleep, 0)
                 await f

diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py
index ca96104de3b..a9a45c416d6 100644
--- a/distributed/deploy/tests/test_spec_cluster.py
+++ b/distributed/deploy/tests/test_spec_cluster.py
@@ -9,7 +9,6 @@
 import dask
 from dask.distributed import Client, Nanny, Scheduler, SpecCluster, Worker

-from distributed.compatibility import WINDOWS
 from distributed.core import Status
 from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec
 from distributed.metrics import time
@@ -218,7 +217,6 @@ async def test_restart(cleanup):
                 assert time() < start + 60


-@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
 @pytest.mark.asyncio
 async def test_broken_worker():
     with pytest.raises(Exception) as info:
@@ -232,8 +230,6 @@ async def test_broken_worker():
     assert "Broken" in str(info.value)


-@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
-@pytest.mark.slow
 def test_spec_close_clusters(loop):
     workers = {0: {"cls": Worker}}
     scheduler = {"cls": Scheduler, "options": {"port": 0}}

From 741da97c51f13a9f141981658cd25055fc69b1b2 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 17 Jun 2021 12:54:29 +0200
Subject: [PATCH 3/3] Close scheduler in test_io_loop

---
 distributed/tests/test_scheduler.py | 1 +
 setup.cfg                           | 3 +++
 2 files changed, 4 insertions(+)

diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 71ef3b250bb..44141c8d624 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -678,6 +678,7 @@ async def test_update_graph_culls(s, a, b):
 def test_io_loop(loop):
     s = Scheduler(loop=loop, validate=True)
     assert s.io_loop is loop
+    loop.run_sync(s.close)


 @gen_cluster(client=True)
diff --git a/setup.cfg b/setup.cfg
index 136ae4c363f..4e4edaa2f75 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -45,6 +45,9 @@ parentdir_prefix = distributed-
 addopts = -v -rsxfE --durations=20
 filterwarnings =
     error:Since distributed.*:PendingDeprecationWarning
+
+    # See https://github.com/dask/distributed/issues/4806
+    error:Port:UserWarning:distributed.node
 minversion = 4
 markers =
     slow: marks tests as slow (deselect with '-m "not slow"')