Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 45 additions & 36 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from distributed.cluster_dump import load_cluster_dump
from distributed.comm import CommClosedError
from distributed.compatibility import LINUX, WINDOWS
from distributed.core import Status
from distributed.core import Server, Status
from distributed.metrics import time
from distributed.objects import HasWhat, WhoHas
from distributed.scheduler import (
Expand Down Expand Up @@ -94,7 +94,6 @@
inc,
map_varying,
nodebug,
popen,
pristine_loop,
randominc,
save_sys_modules,
Expand Down Expand Up @@ -3700,60 +3699,70 @@ async def test_scatter_raises_if_no_workers(c, s):
await c.scatter(1, timeout=0.5)


@pytest.mark.slow
def test_reconnect(loop):
w = Worker("127.0.0.1", 9393, loop=loop)
loop.add_callback(w.start)

scheduler_cli = [
"dask-scheduler",
"--host",
"127.0.0.1",
"--port",
"9393",
"--no-dashboard",
]
with popen(scheduler_cli):
c = Client("127.0.0.1:9393", loop=loop)
c.wait_for_workers(1, timeout=10)
x = c.submit(inc, 1)
assert x.result(timeout=10) == 2
@gen_test()
async def test_reconnect():
async def hard_stop(s):
for pc in s.periodic_callbacks.values():
pc.stop()

s.stop_services()
for comm in list(s.stream_comms.values()):
comm.abort()
for comm in list(s.client_comms.values()):
comm.abort()

await s.rpc.close()
s.stop()
await Server.close(s)

port = 9393
futures = []
w = Worker(f"127.0.0.1:{port}")
futures.append(asyncio.ensure_future(w.start()))

s = await Scheduler(port=port)
c = await Client(f"127.0.0.1:{port}", asynchronous=True)
await c.wait_for_workers(1, timeout=10)
x = c.submit(inc, 1)
assert (await x) == 2
await hard_stop(s)

start = time()
while c.status != "connecting":
assert time() < start + 10
sleep(0.01)
await asyncio.sleep(0.01)

assert x.status == "cancelled"
with pytest.raises(CancelledError):
x.result(timeout=10)
await x

with popen(scheduler_cli):
start = time()
while c.status != "running":
sleep(0.1)
assert time() < start + 10
start = time()
while len(c.nthreads()) != 1:
sleep(0.05)
assert time() < start + 10
s = await Scheduler(port=port)
start = time()
while c.status != "running":
await asyncio.sleep(0.1)
assert time() < start + 10
start = time()
while len(await c.nthreads()) != 1:
await asyncio.sleep(0.05)
assert time() < start + 10

x = c.submit(inc, 1)
assert x.result(timeout=10) == 2
x = c.submit(inc, 1)
assert (await x) == 2
await hard_stop(s)

start = time()
while True:
assert time() < start + 10
try:
x.result(timeout=10)
await x
assert False
except CommClosedError:
continue
except CancelledError:
break

sync(loop, w.close, timeout=1)
c.close()
await w.close(report=False)
await c._close(fast=True)


class UnhandledException(Exception):
Expand Down
2 changes: 2 additions & 0 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import contextlib
import gc
import itertools
import logging
import random
Expand Down Expand Up @@ -945,6 +946,7 @@ class Foo:
assert not s.who_has
assert not any(s.has_what.values())

gc.collect()
assert not list(ws)


Expand Down