14 changes: 10 additions & 4 deletions distributed/deploy/spec.py
@@ -399,10 +399,16 @@ async def _():
             if self.status == Status.created:
                 await self._start()
             await self.scheduler
-            await self._correct_state()
-            if self.workers:
-                await asyncio.wait(list(self.workers.values()))  # maybe there are more
-            return self
+            try:
+                await self._correct_state()
+                if self.workers:
+                    await asyncio.wait(
+                        list(self.workers.values())
+                    )  # maybe there are more
+                return self
+            except Exception:
+                await self.scheduler.close()
+                raise
 
         return _().__await__()
 
Comment on lines +402 to +411
Member Author:

This is new and was also a cause of the warning message. If worker startup raised an exception, the scheduler was not cleaned up properly. This should have been implemented regardless of the warning, I believe.
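As a minimal sketch of the failure mode this except branch guards against (FakeScheduler and start_cluster are invented stand-ins, not SpecCluster APIs): if worker startup raises and nothing closes the scheduler, its sockets stay open and later runs can see the port warning.

```python
import asyncio


class FakeScheduler:
    """Invented stand-in for a scheduler that owns a listening socket."""

    def __init__(self):
        self.closed = False

    async def close(self):
        # In the real scheduler this releases ports and the HTTP server.
        self.closed = True


async def start_cluster(fail_workers: bool) -> FakeScheduler:
    scheduler = FakeScheduler()
    try:
        if fail_workers:
            # Simulate a worker failing during startup.
            raise RuntimeError("Broken worker")
        return scheduler
    except Exception:
        # Without this branch the scheduler would be left running after a
        # failed startup, which is the leak the diff above closes.
        await scheduler.close()
        raise


async def main():
    try:
        await start_cluster(fail_workers=True)
    except RuntimeError:
        pass


asyncio.run(main())
```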

Member:
+1


6 changes: 3 additions & 3 deletions distributed/deploy/tests/test_local.py
@@ -1070,12 +1070,12 @@ async def test_local_cluster_redundant_kwarg(nanny):
     # Extra arguments are forwarded to the worker class. Depending on
     # whether we use the nanny or not, the error treatment is quite
     # different and we should assert that an exception is raised
-    async with await LocalCluster(
-        typo_kwarg="foo", processes=nanny, n_workers=1
+    async with LocalCluster(
+        typo_kwarg="foo", processes=nanny, n_workers=1, asynchronous=True
     ) as cluster:
 
         # This will never work but is a reliable way to block without hard
         # coding any sleep values
-        async with Client(cluster) as c:
+        async with Client(cluster, asynchronous=True) as c:
             f = c.submit(sleep, 0)
             await f
4 changes: 0 additions & 4 deletions distributed/deploy/tests/test_spec_cluster.py
@@ -9,7 +9,6 @@
 import dask
 from dask.distributed import Client, Nanny, Scheduler, SpecCluster, Worker
 
-from distributed.compatibility import WINDOWS
 from distributed.core import Status
 from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec
 from distributed.metrics import time
@@ -218,7 +217,6 @@ async def test_restart(cleanup):
         assert time() < start + 60
 
 
-@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
 @pytest.mark.asyncio
 async def test_broken_worker():
     with pytest.raises(Exception) as info:
@@ -232,8 +230,6 @@ async def test_broken_worker():
     assert "Broken" in str(info.value)
 
 
-@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
 @pytest.mark.slow
 def test_spec_close_clusters(loop):
     workers = {0: {"cls": Worker}}
     scheduler = {"cls": Scheduler, "options": {"port": 0}}
15 changes: 13 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -313,10 +313,15 @@ def func(scheduler):
     await comm.close()
 
 
-def test_scheduler_init_pulls_blocked_handlers_from_config():
+@pytest.mark.asyncio
+async def test_scheduler_init_pulls_blocked_handlers_from_config():
+    # This test is async to allow us to properly close the scheduler since the
+    # init is already opening sockets for the HTTP server See also
+    # https://github.com/dask/distributed/issues/4806
     with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
         s = Scheduler()
         assert s.blocked_handlers == ["test-handler"]
+    await s.close()
 
 
 @gen_cluster()
Comment on lines 321 to +324
Member:

Suggested change
-    with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
-        s = Scheduler()
-        assert s.blocked_handlers == ["test-handler"]
-    await s.close()
+    with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
+        async with Scheduler() as s:
+            assert s.blocked_handlers == ["test-handler"]

Member:

Based on the comment earlier in this test, it looks like we want to check that Scheduler.__init__ handles blocked handlers. Using async with Scheduler() as s: will also call start(), which makes it difficult to isolate the Scheduler.__init__ behavior.
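As a rough sketch of the two test shapes being weighed here, assuming pytest-asyncio as used elsewhere in this PR (the test names and assertions are placeholders):

```python
import pytest

from distributed import Scheduler


@pytest.mark.asyncio
async def test_init_only():
    # Keeps Scheduler.__init__ isolated: nothing beyond construction runs,
    # but __init__ already opens sockets, so an explicit close is needed.
    s = Scheduler()
    try:
        ...  # assertions about attributes set in __init__
    finally:
        await s.close()


@pytest.mark.asyncio
async def test_started():
    # The suggested async-with form also awaits start() and close(); it is
    # tidier, but it exercises more than __init__ alone.
    async with Scheduler() as s:
        ...  # assertions against a started scheduler
```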

Member:

OK



@@ -673,6 +678,7 @@ async def test_update_graph_culls(s, a, b):
 def test_io_loop(loop):
     s = Scheduler(loop=loop, validate=True)
     assert s.io_loop is loop
+    loop.run_sync(s.close)
 
 
 @gen_cluster(client=True)
@@ -1396,7 +1402,11 @@ async def test_get_task_status(c, s, a, b):
     assert result == {future.key: "memory"}
 
 
-def test_deque_handler():
+@pytest.mark.asyncio
+async def test_deque_handler():
+    # This test is async to allow us to properly close the scheduler since the
+    # init is already opening sockets for the HTTP server See also
+    # https://github.com/dask/distributed/issues/4806
     from distributed.scheduler import logger
 
     s = Scheduler()
@@ -1406,6 +1416,7 @@ def test_deque_handler():
     msg = deque_handler.deque[-1]
     assert "distributed.scheduler" in deque_handler.format(msg)
     assert any(msg.msg == "foo123" for msg in deque_handler.deque)
+    await s.close()
 
 
 @gen_cluster(client=True)
Member:

We can probably use async with Scheduler() as s here?



3 changes: 3 additions & 0 deletions setup.cfg
@@ -45,6 +45,9 @@ parentdir_prefix = distributed-
 addopts = -v -rsxfE --durations=20
 filterwarnings =
     error:Since distributed.*:PendingDeprecationWarning
+
+    # See https://github.com/dask/distributed/issues/4806
+    error:Port:UserWarning:distributed.node
 minversion = 4
 markers =
     slow: marks tests as slow (deselect with '-m "not slow"')
Member Author:

This would raise and fail the tests whenever the warning is emitted during CI. AFAIU, we should not hit this condition accidentally if we close all servers properly.
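The entry uses pytest's action:message:category:module filter syntax. Roughly the same filter expressed with the standard library, as a sketch (only plain warnings usage is assumed; message and module are taken from the diff above):

```python
import warnings

# Escalate the "Port ... is already in use" UserWarning emitted by
# distributed.node into an exception so the test run fails loudly.
warnings.filterwarnings(
    "error",                    # action: turn matching warnings into errors
    message="Port",             # regex matched against the start of the message
    category=UserWarning,
    module="distributed.node",  # module that issues the warning
)
```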
