diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index 070d6d0624d..21f7bad293d 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -399,10 +399,16 @@ async def _():
             if self.status == Status.created:
                 await self._start()
             await self.scheduler
-            await self._correct_state()
-            if self.workers:
-                await asyncio.wait(list(self.workers.values()))  # maybe there are more
-            return self
+            try:
+                await self._correct_state()
+                if self.workers:
+                    await asyncio.wait(
+                        list(self.workers.values())
+                    )  # maybe there are more
+                return self
+            except Exception:
+                await self.scheduler.close()
+                raise

         return _().__await__()

diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py
index ee1b12913dc..076aca3caae 100644
--- a/distributed/deploy/tests/test_local.py
+++ b/distributed/deploy/tests/test_local.py
@@ -1070,12 +1070,12 @@ async def test_local_cluster_redundant_kwarg(nanny):
     # Extra arguments are forwarded to the worker class. Depending on
     # whether we use the nanny or not, the error treatment is quite
     # different and we should assert that an exception is raised
-    async with await LocalCluster(
-        typo_kwarg="foo", processes=nanny, n_workers=1
+    async with LocalCluster(
+        typo_kwarg="foo", processes=nanny, n_workers=1, asynchronous=True
     ) as cluster:

         # This will never work but is a reliable way to block without hard
         # coding any sleep values
-        async with Client(cluster) as c:
+        async with Client(cluster, asynchronous=True) as c:
             f = c.submit(sleep, 0)
             await f
diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py
index ca96104de3b..a9a45c416d6 100644
--- a/distributed/deploy/tests/test_spec_cluster.py
+++ b/distributed/deploy/tests/test_spec_cluster.py
@@ -9,7 +9,6 @@
 import dask
 from dask.distributed import Client, Nanny, Scheduler, SpecCluster, Worker

-from distributed.compatibility import WINDOWS
 from distributed.core import Status
 from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec
 from distributed.metrics import time
@@ -218,7 +217,6 @@ async def test_restart(cleanup):
         assert time() < start + 60


-@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
 @pytest.mark.asyncio
 async def test_broken_worker():
     with pytest.raises(Exception) as info:
@@ -232,8 +230,6 @@ async def test_broken_worker():
     assert "Broken" in str(info.value)


-@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
-@pytest.mark.slow
 def test_spec_close_clusters(loop):
     workers = {0: {"cls": Worker}}
     scheduler = {"cls": Scheduler, "options": {"port": 0}}
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index f1aeef606d2..44141c8d624 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -313,10 +313,15 @@ def func(scheduler):
     await comm.close()


-def test_scheduler_init_pulls_blocked_handlers_from_config():
+@pytest.mark.asyncio
+async def test_scheduler_init_pulls_blocked_handlers_from_config():
+    # This test is async so we can properly close the scheduler, since its
+    # init already opens sockets for the HTTP server. See also
+    # https://github.com/dask/distributed/issues/4806
     with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
         s = Scheduler()
     assert s.blocked_handlers == ["test-handler"]
+    await s.close()


 @gen_cluster()
@@ -673,6 +678,7 @@ async def test_update_graph_culls(s, a, b):
 def test_io_loop(loop):
     s = Scheduler(loop=loop, validate=True)
     assert s.io_loop is loop
+    loop.run_sync(s.close)


 @gen_cluster(client=True)
@@ -1396,7 +1402,11 @@ async def test_get_task_status(c, s, a, b):
     assert result == {future.key: "memory"}


-def test_deque_handler():
+@pytest.mark.asyncio
+async def test_deque_handler():
+    # This test is async so we can properly close the scheduler, since its
+    # init already opens sockets for the HTTP server. See also
+    # https://github.com/dask/distributed/issues/4806
     from distributed.scheduler import logger

     s = Scheduler()
@@ -1406,6 +1416,7 @@ def test_deque_handler():
     msg = deque_handler.deque[-1]
     assert "distributed.scheduler" in deque_handler.format(msg)
     assert any(msg.msg == "foo123" for msg in deque_handler.deque)
+    await s.close()


 @gen_cluster(client=True)
diff --git a/setup.cfg b/setup.cfg
index 136ae4c363f..4e4edaa2f75 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -45,6 +45,9 @@ parentdir_prefix = distributed-
 addopts = -v -rsxfE --durations=20
 filterwarnings =
     error:Since distributed.*:PendingDeprecationWarning
+
+    # See https://github.com/dask/distributed/issues/4806
+    error:Port:UserWarning:distributed.node
 minversion = 4
 markers =
     slow: marks tests as slow (deselect with '-m "not slow"')
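
For context, a rough standalone sketch of the leak this patch addresses and of the cleanup pattern the patched tests now follow (a hypothetical illustration, not part of the diff). Constructing a Scheduler already opens the HTTP server sockets (https://github.com/dask/distributed/issues/4806); if a test never closes it, the ports stay bound, and a later Scheduler() presumably hits the "Port ..." UserWarning from distributed.node that the new filterwarnings entry in setup.cfg escalates to an error:

import asyncio

from dask.distributed import Scheduler


async def main():
    # Merely constructing the scheduler binds the HTTP server sockets,
    # even though we never start or await it.
    s = Scheduler()
    try:
        # [] is the documented default for
        # distributed.scheduler.blocked-handlers
        assert s.blocked_handlers == []
    finally:
        # The pattern the patched tests follow: always close, so the
        # sockets are released before the next test constructs a scheduler.
        await s.close()


asyncio.run(main())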