From 92625248b787eeea9cb38255269097b83bebf067 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 25 Mar 2022 05:38:25 -0500 Subject: [PATCH 1/5] Make test_reconnect async This was flakey due to cleaning up resources. My experience is that making things async helps with this in general. I don't have strong confidence that this will fix the issue, but I do have mild confidence, and strong confidence that it won't hurt. --- distributed/tests/test_client.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index f9558d2dd3f..33f1e7ccf8e 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3701,9 +3701,11 @@ async def test_scatter_raises_if_no_workers(c, s): @pytest.mark.slow -def test_reconnect(loop): - w = Worker("127.0.0.1", 9393, loop=loop) - loop.add_callback(w.start) +@gen_test() +async def test_reconnect(): + futures = [] + w = Worker("127.0.0.1", 9393) + futures.append(asyncio.ensure_future(w.start())) scheduler_cli = [ "dask-scheduler", @@ -3714,46 +3716,46 @@ def test_reconnect(loop): "--no-dashboard", ] with popen(scheduler_cli): - c = Client("127.0.0.1:9393", loop=loop) - c.wait_for_workers(1, timeout=10) + c = await Client("127.0.0.1:9393", asynchronous=True) + await c.wait_for_workers(1, timeout=10) x = c.submit(inc, 1) - assert x.result(timeout=10) == 2 + assert (await x) == 2 start = time() while c.status != "connecting": assert time() < start + 10 - sleep(0.01) + await asyncio.sleep(0.01) assert x.status == "cancelled" with pytest.raises(CancelledError): - x.result(timeout=10) + await x with popen(scheduler_cli): start = time() while c.status != "running": - sleep(0.1) + await asyncio.sleep(0.1) assert time() < start + 10 start = time() - while len(c.nthreads()) != 1: - sleep(0.05) + while len(await c.nthreads()) != 1: + await asyncio.sleep(0.05) assert time() < start + 10 x = c.submit(inc, 1) - assert x.result(timeout=10) == 2 + assert (await x) == 2 start = time() while True: assert time() < start + 10 try: - x.result(timeout=10) + await x assert False except CommClosedError: continue except CancelledError: break - sync(loop, w.close, timeout=1) - c.close() + await w.close() + await c.close() class UnhandledException(Exception): From ec59e9681d037f72560334213abbf8c9f09f9d36 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 25 Mar 2022 06:31:31 -0500 Subject: [PATCH 2/5] extend timeout --- distributed/tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 33f1e7ccf8e..6873fc06220 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3701,7 +3701,7 @@ async def test_scatter_raises_if_no_workers(c, s): @pytest.mark.slow -@gen_test() +@gen_test(timeout=60) async def test_reconnect(): futures = [] w = Worker("127.0.0.1", 9393) From 6fa50edf3f61a184d332f4c40884e76bbdcc256a Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 30 Mar 2022 10:41:39 -0500 Subject: [PATCH 3/5] speed up worker and client close --- distributed/tests/test_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6873fc06220..fc22ee98326 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3754,8 +3754,8 @@ async def test_reconnect(): except CancelledError: break - await w.close() - await c.close() + await w.close(report=False) + await c._close(fast=True) class UnhandledException(Exception): From d08dfaeec8fa2702a8dcf745f8c5d3ad83a25883 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 30 Mar 2022 13:00:57 -0500 Subject: [PATCH 4/5] Go full async --- distributed/tests/test_client.py | 65 ++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index fc22ee98326..3f541bb51ba 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -66,7 +66,7 @@ from distributed.cluster_dump import load_cluster_dump from distributed.comm import CommClosedError from distributed.compatibility import LINUX, WINDOWS -from distributed.core import Status +from distributed.core import Server, Status from distributed.metrics import time from distributed.objects import HasWhat, WhoHas from distributed.scheduler import ( @@ -94,7 +94,6 @@ inc, map_varying, nodebug, - popen, pristine_loop, randominc, save_sys_modules, @@ -3700,26 +3699,33 @@ async def test_scatter_raises_if_no_workers(c, s): await c.scatter(1, timeout=0.5) -@pytest.mark.slow -@gen_test(timeout=60) +@gen_test() async def test_reconnect(): + async def hard_stop(s): + for pc in s.periodic_callbacks.values(): + pc.stop() + + s.stop_services() + for comm in list(s.stream_comms.values()): + comm.abort() + for comm in list(s.client_comms.values()): + comm.abort() + + await s.rpc.close() + s.stop() + await Server.close(s) + + port = 9393 futures = [] - w = Worker("127.0.0.1", 9393) + w = Worker(f"127.0.0.1:{port}") futures.append(asyncio.ensure_future(w.start())) - scheduler_cli = [ - "dask-scheduler", - "--host", - "127.0.0.1", - "--port", - "9393", - "--no-dashboard", - ] - with popen(scheduler_cli): - c = await Client("127.0.0.1:9393", asynchronous=True) - await c.wait_for_workers(1, timeout=10) - x = c.submit(inc, 1) - assert (await x) == 2 + s = await Scheduler(port=port) + c = await Client(f"127.0.0.1:{port}", asynchronous=True) + await c.wait_for_workers(1, timeout=10) + x = c.submit(inc, 1) + assert (await x) == 2 + await hard_stop(s) start = time() while c.status != "connecting": @@ -3730,18 +3736,19 @@ async def test_reconnect(): with pytest.raises(CancelledError): await x - with popen(scheduler_cli): - start = time() - while c.status != "running": - await asyncio.sleep(0.1) - assert time() < start + 10 - start = time() - while len(await c.nthreads()) != 1: - await asyncio.sleep(0.05) - assert time() < start + 10 + s = await Scheduler(port=port) + start = time() + while c.status != "running": + await asyncio.sleep(0.1) + assert time() < start + 10 + start = time() + while len(await c.nthreads()) != 1: + await asyncio.sleep(0.05) + assert time() < start + 10 - x = c.submit(inc, 1) - assert (await x) == 2 + x = c.submit(inc, 1) + assert (await x) == 2 + await hard_stop(s) start = time() while True: From aa8c7d949c5a38942904a3d24446c52d35e3d6d3 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 30 Mar 2022 14:38:04 -0500 Subject: [PATCH 5/5] add gc call in test_cleanup_repeated_tasks --- distributed/tests/test_steal.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index e6469eae8b4..e16269d92f8 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1,5 +1,6 @@ import asyncio import contextlib +import gc import itertools import logging import random @@ -945,6 +946,7 @@ class Foo: assert not s.who_has assert not any(s.has_what.values()) + gc.collect() assert not list(ws)