From bd61588f690603c63b792085efe3bb6059adfaec Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 27 Jun 2022 19:41:42 +0200 Subject: [PATCH 01/28] Ensure client.restart waits for workers to leave --- distributed/scheduler.py | 10 ++++++++-- distributed/tests/test_client.py | 10 ++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9eadaf51288..3dadb3039d7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5083,11 +5083,15 @@ def clear_task_state(self): for collection in self._task_state_collections: collection.clear() + def _get_worker_ids(self) -> set[str]: + return set({ws.server_id for ws in self.workers.values()}) + @log_errors async def restart(self, client=None, timeout=30): """Restart all workers. Reset local state.""" stimulus_id = f"restart-{time()}" - n_workers = len(self.workers) + initial_workers = self._get_worker_ids() + n_workers = len(initial_workers) logger.info("Send lost future signal to clients") for cs in self.clients.values(): @@ -5161,7 +5165,9 @@ async def restart(self, client=None, timeout=30): self.log_event([client, "all"], {"action": "restart", "client": client}) start = time() - while time() < start + 10 and len(self.workers) < n_workers: + while time() < start + 10 and ( + len(self.workers) < n_workers or initial_workers & self._get_worker_ids() + ): await asyncio.sleep(0.01) self.report({"op": "restart"}) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 42f019bb983..6907a703b55 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3493,6 +3493,16 @@ async def test_Client_clears_references_after_restart(c, s, a, b): assert key not in c.refcount +@pytest.mark.slow +@gen_cluster(Worker=Nanny, client=True, nthreads=[("", 1)] * 5) +async def test_restart_waits_for_new_workers(c, s, *workers): + initial_workers = set(s.workers) + await c.restart() + assert len(s.workers) == len(initial_workers) + for w in workers: + assert w.address not in s.workers + + @gen_cluster(Worker=Nanny, client=True) async def test_restart_timeout_is_logged(c, s, a, b): with captured_logger(logging.getLogger("distributed.client")) as logger: From 4f4c5dc64310405547d49c737a19036e4f66af24 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 11 Jul 2022 14:37:56 -0400 Subject: [PATCH 02/28] Expect all workers to come back; timeout otherwise Non-nanny workers no longer go gentle into that good night. This breaks `test_restart_some_nannies_some_not` since it re-orders when plugins run, and causes a TimeoutError there. That test can be simplified a lot. Also, because the client doesn't call restart on the scheduler as an RPC, but rather a strange call-response pattern, errors from the scheduler aren't resurfaced to the client. If `restart` fails quickly on the scheduler, then the cilent will hang until its internal timeout passes as well (2x the defined timeout). This is all a bit silly and should just switch to an RPC. --- distributed/scheduler.py | 89 ++++++++++++++++------------- distributed/tests/test_client.py | 10 ---- distributed/tests/test_scheduler.py | 80 ++++++++++++++------------ 3 files changed, 90 insertions(+), 89 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3dadb3039d7..aade048714a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5083,15 +5083,26 @@ def clear_task_state(self): for collection in self._task_state_collections: collection.clear() - def _get_worker_ids(self) -> set[str]: - return set({ws.server_id for ws in self.workers.values()}) - @log_errors async def restart(self, client=None, timeout=30): - """Restart all workers. Reset local state.""" + """ + Restart all workers. Reset local state. + + Workers without nannies are shut down (assuming an external deployment system + may restart them). Therefore, if not using nannies and your deployment system + does not automatically restart workers, ``restart`` will just shut down all + workers! + + Raises `TimeoutError` if not all workers come back within ``timeout`` seconds. + """ stimulus_id = f"restart-{time()}" - initial_workers = self._get_worker_ids() - n_workers = len(initial_workers) + n_workers = len(self.workers) + + for plugin in list(self.plugins.values()): + try: + plugin.restart(self) + except Exception as e: + logger.exception(e) logger.info("Send lost future signal to clients") for cs in self.clients.values(): @@ -5101,6 +5112,10 @@ async def restart(self, client=None, timeout=30): stimulus_id=stimulus_id, ) + self.clear_task_state() + + start = time() + nanny_workers = { addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny } @@ -5115,13 +5130,11 @@ async def restart(self, client=None, timeout=30): ) ) - self.clear_task_state() - - for plugin in list(self.plugins.values()): - try: - plugin.restart(self) - except Exception as e: - logger.exception(e) + if time() - start > timeout: + raise TimeoutError( + f"Removing non-nanny workers took >{timeout}s. " + "Consider setting a longer `timeout=` in `restart`." + ) logger.debug("Send kill signal to nannies: %s", nanny_workers) async with contextlib.AsyncExitStack() as stack: @@ -5132,43 +5145,37 @@ async def restart(self, client=None, timeout=30): for nanny_address in nanny_workers.values() ] - try: - resps = await asyncio.wait_for( - asyncio.gather( - *( - nanny.restart(close=True, timeout=timeout * 0.8) - for nanny in nannies - ) - ), - timeout, - ) - # NOTE: the `WorkerState` entries for these workers will be removed - # naturally when they disconnect from the scheduler. - except TimeoutError: - logger.error( - "Nannies didn't report back restarted within " - "timeout. Continuing with restart process" - ) - else: - if not all(resp == "OK" for resp in resps): - logger.error( - "Not all workers responded positively: %s", - resps, - exc_info=True, + resps = await asyncio.wait_for( + asyncio.gather( + *( + nanny.restart(close=True, timeout=timeout * 0.8) + for nanny in nannies ) + ), + timeout, + ) + # NOTE: the `WorkerState` entries for these workers will be removed + # naturally when they disconnect from the scheduler. - self.clear_task_state() + if n_failed := sum(resp != "OK" for resp in resps): + raise TimeoutError( + f"{n_failed} worker(s) did not restart within {timeout}s" + ) with suppress(AttributeError): for c in self._worker_coroutines: c.cancel() self.log_event([client, "all"], {"action": "restart", "client": client}) - start = time() - while time() < start + 10 and ( - len(self.workers) < n_workers or initial_workers & self._get_worker_ids() - ): + while time() < start + timeout: + if len(self.workers) >= n_workers: + break await asyncio.sleep(0.01) + else: + raise TimeoutError( + f"Waited for {n_workers} worker(s) to reconnect after restarting, " + f"but after {timeout}s, only {len(self.workers)} have returned." + ) self.report({"op": "restart"}) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6907a703b55..42f019bb983 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3493,16 +3493,6 @@ async def test_Client_clears_references_after_restart(c, s, a, b): assert key not in c.refcount -@pytest.mark.slow -@gen_cluster(Worker=Nanny, client=True, nthreads=[("", 1)] * 5) -async def test_restart_waits_for_new_workers(c, s, *workers): - initial_workers = set(s.workers) - await c.restart() - assert len(s.workers) == len(initial_workers) - for w in workers: - assert w.address not in s.workers - - @gen_cluster(Worker=Nanny, client=True) async def test_restart_timeout_is_logged(c, s, a, b): with captured_logger(logging.getLogger("distributed.client")) as logger: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 34f3a610175..09a4b74b72e 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -629,38 +629,54 @@ async def test_restart(c, s, a, b): assert not s.tasks -@gen_cluster(client=True, Worker=Nanny, timeout=60) +@pytest.mark.slow +@gen_cluster(Worker=Nanny, nthreads=[("", 1)] * 5) +async def test_restart_waits_for_new_workers(s, *workers): + n_initial_workers = len(s.workers) + await s.restart() + assert len(s.workers) == n_initial_workers + for w in workers: + assert w.address not in s.workers + + +class SlowRestartNanny(Nanny): + def __init__(self, *args, **kwargs): + self.restart_proceed = asyncio.Event() + self.restart_called = asyncio.Event() + super().__init__(*args, **kwargs) + + async def restart(self, *, timeout): + self.restart_called.set() + try: + await asyncio.wait_for(self.restart_proceed.wait(), timeout) + except TimeoutError: + return "timed out" + return await super().restart(timeout=timeout) + + +@gen_cluster(Worker=SlowRestartNanny, nthreads=[("", 1)] * 2) +async def test_restart_nanny_timeout_exceeded(s, a, b): + with pytest.raises(TimeoutError, match=r"2 worker\(s\) did not restart within 1s"): + await s.restart(timeout=1) + assert a.restart_called.is_set() + assert b.restart_called.is_set() + + +@gen_cluster(nthreads=[("", 1)] * 2) +async def test_restart_not_all_workers_return(s, a, b): + with pytest.raises(TimeoutError, match=r"after 1s, only 0 have returned"): + await s.restart(timeout=1) + + +@gen_cluster(client=True, Worker=Nanny) async def test_restart_some_nannies_some_not(c, s, a, b): original_procs = {a.process.process, b.process.process} original_workers = dict(s.workers) async with Worker(s.address, nthreads=1) as w: await c.wait_for_workers(3) - # Halfway through `Scheduler.restart`, only the non-Nanny workers should be removed. - # Nanny-based workers should be kept around so we can call their `restart` RPC. - class ValidateRestartPlugin(SchedulerPlugin): - error: Exception | None - - def restart(self, scheduler: Scheduler) -> None: - try: - assert scheduler.workers.keys() == { - a.worker_address, - b.worker_address, - } - assert all(ws.nanny for ws in scheduler.workers.values()) - except Exception as e: - # `Scheduler.restart` swallows exceptions within plugins - self.error = e - raise - else: - self.error = None - - plugin = ValidateRestartPlugin() - s.add_plugin(plugin) - await s.restart() - - if plugin.error: - raise plugin.error + with pytest.raises(TimeoutError, match="after 5s, only 2 have returned"): + await s.restart(timeout=5) assert w.status == Status.closed @@ -675,18 +691,6 @@ def restart(self, scheduler: Scheduler) -> None: assert set(s.workers.values()).isdisjoint(original_workers.values()) -class SlowRestartNanny(Nanny): - def __init__(self, *args, **kwargs): - self.restart_proceed = asyncio.Event() - self.restart_called = asyncio.Event() - super().__init__(*args, **kwargs) - - async def restart(self, **kwargs): - self.restart_called.set() - await self.restart_proceed.wait() - return await super().restart(**kwargs) - - @gen_cluster( client=True, nthreads=[("", 1)], From 010c911604326932e5f80f6142ccc9051365f5d5 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 11 Jul 2022 15:02:21 -0400 Subject: [PATCH 03/28] Call restart as RPC from client This lets us propagate errors, and is simpler anyway. --- distributed/client.py | 20 +++++--------------- distributed/scheduler.py | 4 +--- distributed/tests/test_client.py | 8 -------- distributed/tests/test_scheduler.py | 20 ++++++++++---------- 4 files changed, 16 insertions(+), 36 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index e63a3173502..5b3b49eb0e3 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -908,7 +908,6 @@ def __init__( "cancelled-key": self._handle_cancelled_key, "task-retried": self._handle_retried_key, "task-erred": self._handle_task_erred, - "restart": self._handle_restart, "error": self._handle_error, "event": self._handle_event, } @@ -1464,14 +1463,6 @@ def _handle_task_erred(self, key=None, exception=None, traceback=None): if state is not None: state.set_error(exception, traceback) - def _handle_restart(self): - logger.info("Receive restart signal from scheduler") - for state in self.futures.values(): - state.cancel() - self.futures.clear() - with suppress(AttributeError): - self._restart_event.set() - def _handle_error(self, exception=None): logger.warning("Scheduler exception:") logger.exception(exception) @@ -3325,12 +3316,11 @@ async def _restart(self, timeout=no_default): if timeout is not None: timeout = parse_timedelta(timeout, "s") - self._send_to_scheduler({"op": "restart", "timeout": timeout}) - self._restart_event = asyncio.Event() - try: - await asyncio.wait_for(self._restart_event.wait(), timeout) - except TimeoutError: - logger.error("Restart timed out after %.2f seconds", timeout) + await self.scheduler.restart(timeout=timeout) + + for state in self.futures.values(): + state.cancel() + self.futures.clear() self.generation += 1 with self._refcount_lock: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index aade048714a..eb6a956bfb2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3045,7 +3045,6 @@ def __init__( "client-releases-keys": self.client_releases_keys, "heartbeat-client": self.client_heartbeat, "close-client": self.remove_client, - "restart": self.restart, "subscribe-topic": self.subscribe_topic, "unsubscribe-topic": self.unsubscribe_topic, } @@ -3082,6 +3081,7 @@ def __init__( "rebalance": self.rebalance, "replicate": self.replicate, "run_function": self.run_function, + "restart": self.restart, "update_data": self.update_data, "set_resources": self.add_resources, "retire_workers": self.retire_workers, @@ -5177,8 +5177,6 @@ async def restart(self, client=None, timeout=30): f"but after {timeout}s, only {len(self.workers)} have returned." ) - self.report({"op": "restart"}) - async def broadcast( self, comm=None, diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 42f019bb983..b6b781fd8cf 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3493,14 +3493,6 @@ async def test_Client_clears_references_after_restart(c, s, a, b): assert key not in c.refcount -@gen_cluster(Worker=Nanny, client=True) -async def test_restart_timeout_is_logged(c, s, a, b): - with captured_logger(logging.getLogger("distributed.client")) as logger: - await c.restart(timeout="0.5s") - text = logger.getvalue() - assert "Restart timed out after 0.50 seconds" in text - - def test_get_stops_work_after_error(c): with pytest.raises(RuntimeError): c.get({"x": (throws, 1), "y": (sleep, 1.5)}, ["x", "y"]) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 09a4b74b72e..6925a9cacd8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -630,10 +630,10 @@ async def test_restart(c, s, a, b): @pytest.mark.slow -@gen_cluster(Worker=Nanny, nthreads=[("", 1)] * 5) -async def test_restart_waits_for_new_workers(s, *workers): +@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)] * 5) +async def test_restart_waits_for_new_workers(c, s, *workers): n_initial_workers = len(s.workers) - await s.restart() + await c.restart() assert len(s.workers) == n_initial_workers for w in workers: assert w.address not in s.workers @@ -654,18 +654,18 @@ async def restart(self, *, timeout): return await super().restart(timeout=timeout) -@gen_cluster(Worker=SlowRestartNanny, nthreads=[("", 1)] * 2) -async def test_restart_nanny_timeout_exceeded(s, a, b): +@gen_cluster(client=True, Worker=SlowRestartNanny, nthreads=[("", 1)] * 2) +async def test_restart_nanny_timeout_exceeded(c, s, a, b): with pytest.raises(TimeoutError, match=r"2 worker\(s\) did not restart within 1s"): - await s.restart(timeout=1) + await c.restart(timeout="1s") assert a.restart_called.is_set() assert b.restart_called.is_set() -@gen_cluster(nthreads=[("", 1)] * 2) -async def test_restart_not_all_workers_return(s, a, b): +@gen_cluster(client=True, nthreads=[("", 1)] * 2) +async def test_restart_not_all_workers_return(c, s, a, b): with pytest.raises(TimeoutError, match=r"after 1s, only 0 have returned"): - await s.restart(timeout=1) + await c.restart(timeout="1s") @gen_cluster(client=True, Worker=Nanny) @@ -676,7 +676,7 @@ async def test_restart_some_nannies_some_not(c, s, a, b): await c.wait_for_workers(3) with pytest.raises(TimeoutError, match="after 5s, only 2 have returned"): - await s.restart(timeout=5) + await c.restart(timeout="5s") assert w.status == Status.closed From f0a4938ca0f9abb35719651ce6ead24e4c7c74be Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 11 Jul 2022 15:10:19 -0400 Subject: [PATCH 04/28] Client restart cleanup even if restart fails --- distributed/client.py | 17 +++++++++-------- distributed/tests/test_client.py | 8 ++++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 5b3b49eb0e3..369be1bbe10 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3316,15 +3316,16 @@ async def _restart(self, timeout=no_default): if timeout is not None: timeout = parse_timedelta(timeout, "s") - await self.scheduler.restart(timeout=timeout) - - for state in self.futures.values(): - state.cancel() - self.futures.clear() + try: + await self.scheduler.restart(timeout=timeout) + finally: + for state in self.futures.values(): + state.cancel() + self.futures.clear() - self.generation += 1 - with self._refcount_lock: - self.refcount.clear() + self.generation += 1 + with self._refcount_lock: + self.refcount.clear() return self diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index b6b781fd8cf..b5eb15097f9 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3478,13 +3478,17 @@ def block(ev): @pytest.mark.slow -@gen_cluster(Worker=Nanny, client=True, timeout=60) +@gen_cluster(client=True) async def test_Client_clears_references_after_restart(c, s, a, b): x = c.submit(inc, 1) assert x.key in c.refcount + assert x.key in c.futures + + with pytest.raises(TimeoutError): + await c.restart(timeout=5) - await c.restart() assert x.key not in c.refcount + assert not c.futures key = x.key del x From 5a388d1ec79a4b8dcd94f978ab15d35b864e198c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 14 Jul 2022 14:48:11 -0400 Subject: [PATCH 05/28] logging to track down why restart is hanging --- distributed/nanny.py | 5 +++++ distributed/worker.py | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/distributed/nanny.py b/distributed/nanny.py index 2eae45bc3d8..11157dbd188 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -642,8 +642,10 @@ async def start(self) -> Status: """ enable_proctitle_on_children() if self.status == Status.running: + logger.info(f"{self.worker_address} - Start called when already running") return self.status if self.status == Status.starting: + logger.info(f"{self.worker_address} - Start called when already starting") await self.running.wait() return self.status @@ -676,7 +678,9 @@ async def start(self) -> Status: os.environ.update(self.env) try: + logger.info(f"{self.worker_address} - Starting worker process") await self.process.start() + logger.info(f"{self.worker_address} - Worker process started") except OSError: logger.exception("Nanny failed to start process", exc_info=True) self.process.terminate() @@ -684,6 +688,7 @@ async def start(self) -> Status: return self.status try: msg = await self._wait_until_connected(uid) + logger.info(f"{self.worker_address} - Connected to worker") except Exception: logger.exception("Failed to connect to process") self.status = Status.failed diff --git a/distributed/worker.py b/distributed/worker.py index 221e4fc6c29..51c6ee3f07a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1520,7 +1520,10 @@ async def close( # type: ignore # otherwise c.close() + logger.info(f"{self.address} - Clients closed") + await self.scheduler.close_rpc() + logger.info(f"{self.address} - Scheduler RPC closed") self._workdir.release() self.stop_services() @@ -1538,26 +1541,32 @@ async def close( # type: ignore with suppress(TimeoutError): await self.batched_stream.close(timedelta(seconds=timeout)) + logger.info(f"{self.address} - Batched stream to scheduler closed") + for executor in self.executors.values(): if executor is utils._offload_executor: continue # Never shutdown the offload executor def _close(wait): + logger.info(f"{self.address} - Closing {executor}, {wait=}, {timeout=}") if isinstance(executor, ThreadPoolExecutor): executor._work_queue.queue.clear() executor.shutdown(wait=wait, timeout=timeout) else: executor.shutdown(wait=wait) + logger.info(f"{self.address} - {executor} closed") # Waiting for the shutdown can block the event loop causing # weird deadlocks particularly if the task that is executing in # the thread is waiting for a server reply, e.g. when using # worker clients, semaphores, etc. if is_python_shutting_down(): + logger.info(f"{self.address} - Python is shutting down") # If we're shutting down there is no need to wait for daemon # threads to finish _close(wait=False) else: + logger.info(f"{self.address} - Python is not shutting down") try: await to_thread(_close, wait=executor_wait) except RuntimeError: # Are we shutting down the process? @@ -1569,12 +1578,15 @@ def _close(wait): _close(wait=executor_wait) # Just run it directly self.stop() + logger.info(f"{self.address} - Server stop") await self.rpc.close() + logger.info(f"{self.address} - RPC closed") self.status = Status.closed await ServerNode.close(self) setproctitle("dask-worker [closed]") + logger.info(f"{self.address} - Worker closed successfully") return "OK" async def close_gracefully(self, restart=None): From 85afcbc10f39f9dd48ee7810e27d903d5009be46 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 14 Jul 2022 16:23:46 -0400 Subject: [PATCH 06/28] Add back broadcasting restart message to clients If multiple clients are connected, the ones that didn't call `restart` still need to release their keys. driveby: refcounts were not being reset on clients that didn't call `restart`. So after restart, if a client reused a key that was referenced before restart, it would never be releasable. --- distributed/client.py | 22 +++++++++++----------- distributed/scheduler.py | 1 + distributed/tests/test_failed_workers.py | 18 ++++++++++++++++++ 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 369be1bbe10..f5e8d28abfd 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -908,6 +908,7 @@ def __init__( "cancelled-key": self._handle_cancelled_key, "task-retried": self._handle_retried_key, "task-erred": self._handle_task_erred, + "restart": self._handle_restart, "error": self._handle_error, "event": self._handle_event, } @@ -1463,6 +1464,15 @@ def _handle_task_erred(self, key=None, exception=None, traceback=None): if state is not None: state.set_error(exception, traceback) + def _handle_restart(self): + logger.info("Receive restart signal from scheduler") + for state in self.futures.values(): + state.cancel() + self.futures.clear() + self.generation += 1 + with self._refcount_lock: + self.refcount.clear() + def _handle_error(self, exception=None): logger.warning("Scheduler exception:") logger.exception(exception) @@ -3316,17 +3326,7 @@ async def _restart(self, timeout=no_default): if timeout is not None: timeout = parse_timedelta(timeout, "s") - try: - await self.scheduler.restart(timeout=timeout) - finally: - for state in self.futures.values(): - state.cancel() - self.futures.clear() - - self.generation += 1 - with self._refcount_lock: - self.refcount.clear() - + await self.scheduler.restart(timeout=timeout) return self def restart(self, **kwargs): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8eb3aaae0bd..edd00f9af17 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5146,6 +5146,7 @@ async def restart(self, client=None, timeout=30): ) self.clear_task_state() + self.report({"op": "restart"}) start = time() diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 9aac7d6f4b7..754e2916c0c 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -19,6 +19,7 @@ from distributed.utils import CancelledError, sync from distributed.utils_test import ( BlockedGatherDep, + async_wait_for, captured_logger, cluster, div, @@ -238,6 +239,23 @@ async def test_multiple_clients_restart(s, a, b): await asyncio.sleep(0.01) assert time() < start + 5 + assert not c1.futures + assert not c2.futures + + # Ensure both clients still work after restart. + # Reusing a previous key has no effect. + x2 = c1.submit(inc, 1, key=x.key) + y2 = c2.submit(inc, 2, key=y.key) + + assert x2._generation != x._generation + assert y2._generation != y._generation + + assert await x2 == 2 + assert await y2 == 3 + + del x2, y2 + await async_wait_for(lambda: not s.tasks, timeout=5) + await c1.close() await c2.close() From f7da0d28f84140d89ba90b858959308dfcfd7f8c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 14 Jul 2022 18:22:17 -0400 Subject: [PATCH 07/28] Make restart timeout 2x longer by default Restarting is apparently very slow in CI. See if this actually fixes tests failing. --- distributed/client.py | 2 +- distributed/scheduler.py | 5 +---- distributed/tests/test_scheduler.py | 6 ++++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index f5e8d28abfd..89b00e79eba 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3322,7 +3322,7 @@ def persist( async def _restart(self, timeout=no_default): if timeout == no_default: - timeout = self._timeout * 2 + timeout = self._timeout * 4 if timeout is not None: timeout = parse_timedelta(timeout, "s") diff --git a/distributed/scheduler.py b/distributed/scheduler.py index edd00f9af17..37be7e80d7b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5181,10 +5181,7 @@ async def restart(self, client=None, timeout=30): resps = await asyncio.wait_for( asyncio.gather( - *( - nanny.restart(close=True, timeout=timeout * 0.8) - for nanny in nannies - ) + *(nanny.restart(close=True, timeout=timeout) for nanny in nannies) ), timeout, ) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 07ceebbc84e..dde5e5a02a1 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -668,6 +668,7 @@ async def test_restart_not_all_workers_return(c, s, a, b): await c.restart(timeout="1s") +@pytest.mark.slow @gen_cluster(client=True, Worker=Nanny) async def test_restart_some_nannies_some_not(c, s, a, b): original_procs = {a.process.process, b.process.process} @@ -675,8 +676,9 @@ async def test_restart_some_nannies_some_not(c, s, a, b): async with Worker(s.address, nthreads=1) as w: await c.wait_for_workers(3) - with pytest.raises(TimeoutError, match="after 5s, only 2 have returned"): - await c.restart(timeout="5s") + # FIXME how to make this not always take 20s if the nannies do restart quickly? + with pytest.raises(TimeoutError, match="after 20s, only 2 have returned"): + await c.restart(timeout="20s") assert w.status == Status.closed From fdf8358c82c18867ac36622f452807b858ddaf70 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 15 Jul 2022 11:08:55 -0400 Subject: [PATCH 08/28] Revert "logging to track down why restart is hanging" This reverts commit 5a388d1ec79a4b8dcd94f978ab15d35b864e198c. --- distributed/nanny.py | 5 ----- distributed/worker.py | 12 ------------ 2 files changed, 17 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 11157dbd188..2eae45bc3d8 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -642,10 +642,8 @@ async def start(self) -> Status: """ enable_proctitle_on_children() if self.status == Status.running: - logger.info(f"{self.worker_address} - Start called when already running") return self.status if self.status == Status.starting: - logger.info(f"{self.worker_address} - Start called when already starting") await self.running.wait() return self.status @@ -678,9 +676,7 @@ async def start(self) -> Status: os.environ.update(self.env) try: - logger.info(f"{self.worker_address} - Starting worker process") await self.process.start() - logger.info(f"{self.worker_address} - Worker process started") except OSError: logger.exception("Nanny failed to start process", exc_info=True) self.process.terminate() @@ -688,7 +684,6 @@ async def start(self) -> Status: return self.status try: msg = await self._wait_until_connected(uid) - logger.info(f"{self.worker_address} - Connected to worker") except Exception: logger.exception("Failed to connect to process") self.status = Status.failed diff --git a/distributed/worker.py b/distributed/worker.py index 51c6ee3f07a..221e4fc6c29 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1520,10 +1520,7 @@ async def close( # type: ignore # otherwise c.close() - logger.info(f"{self.address} - Clients closed") - await self.scheduler.close_rpc() - logger.info(f"{self.address} - Scheduler RPC closed") self._workdir.release() self.stop_services() @@ -1541,32 +1538,26 @@ async def close( # type: ignore with suppress(TimeoutError): await self.batched_stream.close(timedelta(seconds=timeout)) - logger.info(f"{self.address} - Batched stream to scheduler closed") - for executor in self.executors.values(): if executor is utils._offload_executor: continue # Never shutdown the offload executor def _close(wait): - logger.info(f"{self.address} - Closing {executor}, {wait=}, {timeout=}") if isinstance(executor, ThreadPoolExecutor): executor._work_queue.queue.clear() executor.shutdown(wait=wait, timeout=timeout) else: executor.shutdown(wait=wait) - logger.info(f"{self.address} - {executor} closed") # Waiting for the shutdown can block the event loop causing # weird deadlocks particularly if the task that is executing in # the thread is waiting for a server reply, e.g. when using # worker clients, semaphores, etc. if is_python_shutting_down(): - logger.info(f"{self.address} - Python is shutting down") # If we're shutting down there is no need to wait for daemon # threads to finish _close(wait=False) else: - logger.info(f"{self.address} - Python is not shutting down") try: await to_thread(_close, wait=executor_wait) except RuntimeError: # Are we shutting down the process? @@ -1578,15 +1569,12 @@ def _close(wait): _close(wait=executor_wait) # Just run it directly self.stop() - logger.info(f"{self.address} - Server stop") await self.rpc.close() - logger.info(f"{self.address} - RPC closed") self.status = Status.closed await ServerNode.close(self) setproctitle("dask-worker [closed]") - logger.info(f"{self.address} - Worker closed successfully") return "OK" async def close_gracefully(self, restart=None): From 659e2d1c6a932e503035785d0a7bf5059be10a38 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 15 Jul 2022 12:28:37 -0400 Subject: [PATCH 09/28] Fix `test_AllProgress`: reorder plugin.restart() --- distributed/scheduler.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 37be7e80d7b..3a76f8ee704 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5131,13 +5131,7 @@ async def restart(self, client=None, timeout=30): stimulus_id = f"restart-{time()}" n_workers = len(self.workers) - for plugin in list(self.plugins.values()): - try: - plugin.restart(self) - except Exception as e: - logger.exception(e) - - logger.info("Send lost future signal to clients") + logger.info("Releasing all requested keys") for cs in self.clients.values(): self.client_releases_keys( keys=[ts.key for ts in cs.wants_what], @@ -5148,6 +5142,12 @@ async def restart(self, client=None, timeout=30): self.clear_task_state() self.report({"op": "restart"}) + for plugin in list(self.plugins.values()): + try: + plugin.restart(self) + except Exception as e: + logger.exception(e) + start = time() nanny_workers = { From b5c6ff0ddceb9a67b655a24958a6c6ee81c52476 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 15 Jul 2022 13:29:21 -0400 Subject: [PATCH 10/28] Inner function for single `wait_for` timeout --- distributed/scheduler.py | 95 ++++++++++++++--------------- distributed/tests/test_scheduler.py | 12 +++- 2 files changed, 54 insertions(+), 53 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3a76f8ee704..5a1db5997df 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5129,7 +5129,6 @@ async def restart(self, client=None, timeout=30): Raises `TimeoutError` if not all workers come back within ``timeout`` seconds. """ stimulus_id = f"restart-{time()}" - n_workers = len(self.workers) logger.info("Releasing all requested keys") for cs in self.clients.values(): @@ -5148,68 +5147,64 @@ async def restart(self, client=None, timeout=30): except Exception as e: logger.exception(e) - start = time() - + n_workers = len(self.workers) nanny_workers = { addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny } - # Close non-Nanny workers. We have no way to restart them, so we just let them go, - # and assume a deployment system is going to restart them for us. - await asyncio.gather( - *( - self.remove_worker(address=addr, stimulus_id=stimulus_id) - for addr in self.workers - if addr not in nanny_workers + async def _restart(): + # Close non-Nanny workers. We have no way to restart them, so we just let them go, + # and assume a deployment system is going to restart them for us. + await asyncio.gather( + *( + self.remove_worker(address=addr, stimulus_id=stimulus_id) + for addr in self.workers + if addr not in nanny_workers + ) ) - ) - if time() - start > timeout: - raise TimeoutError( - f"Removing non-nanny workers took >{timeout}s. " - "Consider setting a longer `timeout=` in `restart`." - ) + logger.debug("Send kill signal to nannies: %s", nanny_workers) + async with contextlib.AsyncExitStack() as stack: + nannies = [ + await stack.enter_async_context( + rpc(nanny_address, connection_args=self.connection_args) + ) + for nanny_address in nanny_workers.values() + ] - logger.debug("Send kill signal to nannies: %s", nanny_workers) - async with contextlib.AsyncExitStack() as stack: - nannies = [ - await stack.enter_async_context( - rpc(nanny_address, connection_args=self.connection_args) + resps = await asyncio.gather( + *(nanny.restart(close=True, timeout=timeout) for nanny in nannies) ) - for nanny_address in nanny_workers.values() - ] + # NOTE: the `WorkerState` entries for these workers will be removed + # naturally when they disconnect from the scheduler. - resps = await asyncio.wait_for( - asyncio.gather( - *(nanny.restart(close=True, timeout=timeout) for nanny in nannies) - ), - timeout, - ) - # NOTE: the `WorkerState` entries for these workers will be removed - # naturally when they disconnect from the scheduler. + if any(resp != "OK" for resp in resps): + raise TimeoutError - if n_failed := sum(resp != "OK" for resp in resps): - raise TimeoutError( - f"{n_failed} worker(s) did not restart within {timeout}s" - ) + with suppress(AttributeError): + for c in self._worker_coroutines: + c.cancel() - with suppress(AttributeError): - for c in self._worker_coroutines: - c.cancel() + self.erred_tasks.clear() + self.computations.clear() - self.erred_tasks.clear() - self.computations.clear() + self.log_event([client, "all"], {"action": "restart", "client": client}) + while len(self.workers) < n_workers: + await asyncio.sleep(0.01) - self.log_event([client, "all"], {"action": "restart", "client": client}) - while time() < start + timeout: - if len(self.workers) >= n_workers: - break - await asyncio.sleep(0.01) - else: - raise TimeoutError( - f"Waited for {n_workers} worker(s) to reconnect after restarting, " - f"but after {timeout}s, only {len(self.workers)} have returned." - ) + try: + await asyncio.wait_for(_restart(), timeout=timeout) + except TimeoutError: + msg = f"Restarting {n_workers} workers did not complete in {timeout}s." + if (n_nanny := len(nanny_workers)) < n_workers: + msg += ( + f" The {n_nanny} worker(s) not using Nannies were just shut down " + "instead of restarted (restart is only possible with Nannies). If " + "your deployment system does not automatically re-launch terminated " + "processes, then those workers will never back, and `Client.restart` " + "will always time out. Do not use `Client.restart` in that case." + ) + raise TimeoutError(msg) from None async def broadcast( self, diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index dde5e5a02a1..404eedc4704 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -656,7 +656,9 @@ async def restart(self, *, timeout): @gen_cluster(client=True, Worker=SlowRestartNanny, nthreads=[("", 1)] * 2) async def test_restart_nanny_timeout_exceeded(c, s, a, b): - with pytest.raises(TimeoutError, match=r"2 worker\(s\) did not restart within 1s"): + with pytest.raises( + TimeoutError, match="Restarting 2 workers did not complete in 1s" + ): await c.restart(timeout="1s") assert a.restart_called.is_set() assert b.restart_called.is_set() @@ -664,7 +666,9 @@ async def test_restart_nanny_timeout_exceeded(c, s, a, b): @gen_cluster(client=True, nthreads=[("", 1)] * 2) async def test_restart_not_all_workers_return(c, s, a, b): - with pytest.raises(TimeoutError, match=r"after 1s, only 0 have returned"): + with pytest.raises( + TimeoutError, match="Restarting 2 workers did not complete in 1s" + ): await c.restart(timeout="1s") @@ -677,7 +681,9 @@ async def test_restart_some_nannies_some_not(c, s, a, b): await c.wait_for_workers(3) # FIXME how to make this not always take 20s if the nannies do restart quickly? - with pytest.raises(TimeoutError, match="after 20s, only 2 have returned"): + with pytest.raises( + TimeoutError, match="Restarting 3 workers did not complete in 20s" + ): await c.restart(timeout="20s") assert w.status == Status.closed From e18ea3759ae1b6ecdfb3c734d48a6eed2a94819e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 15 Jul 2022 13:32:28 -0400 Subject: [PATCH 11/28] docstring on client as well --- distributed/client.py | 11 +++++++++-- distributed/scheduler.py | 6 +++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 89b00e79eba..1656058ca56 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3329,13 +3329,20 @@ async def _restart(self, timeout=no_default): await self.scheduler.restart(timeout=timeout) return self - def restart(self, **kwargs): + def restart(self, timeout=no_default): """Restart the distributed network This kills all active work, deletes all data on the network, and restarts the worker processes. + + Workers without nannies are shut down, hoping an external deployment system + will restart them. Therefore, if not using nannies and your deployment system + does not automatically restart workers, ``restart`` will just shut down all + workers, then time out! + + Raises `TimeoutError` if not all workers come back within ``timeout`` seconds. """ - return self.sync(self._restart, **kwargs) + return self.sync(self._restart, timeout=timeout) async def _upload_large_file(self, local_filename, remote_filename=None): if remote_filename is None: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5a1db5997df..91e2f3d2405 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5121,10 +5121,10 @@ async def restart(self, client=None, timeout=30): """ Restart all workers. Reset local state. - Workers without nannies are shut down (assuming an external deployment system - may restart them). Therefore, if not using nannies and your deployment system + Workers without nannies are shut down, hoping an external deployment system + will restart them. Therefore, if not using nannies and your deployment system does not automatically restart workers, ``restart`` will just shut down all - workers! + workers, then time out! Raises `TimeoutError` if not all workers come back within ``timeout`` seconds. """ From b82e9ffa1a1cb2694ccef504db36a2e6b1fc7f24 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 15 Jul 2022 15:24:00 -0400 Subject: [PATCH 12/28] Clarify restart docstring --- distributed/client.py | 2 +- distributed/scheduler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 1656058ca56..a459a261485 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3330,7 +3330,7 @@ async def _restart(self, timeout=no_default): return self def restart(self, timeout=no_default): - """Restart the distributed network + """Clear all tasks, restart all workers, and wait for them to return. This kills all active work, deletes all data on the network, and restarts the worker processes. diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 91e2f3d2405..2232770cf21 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5119,7 +5119,7 @@ def clear_task_state(self): @log_errors async def restart(self, client=None, timeout=30): """ - Restart all workers. Reset local state. + Restart all workers. Reset local state. Wait for workers to return. Workers without nannies are shut down, hoping an external deployment system will restart them. Therefore, if not using nannies and your deployment system From c22f99c954767661a43f63e01ad55b0ea09bc29c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 10:23:19 -0400 Subject: [PATCH 13/28] Move other clearing ops first --- distributed/scheduler.py | 5 ++--- distributed/tests/test_scheduler.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2232770cf21..5f14e381a43 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5139,6 +5139,8 @@ async def restart(self, client=None, timeout=30): ) self.clear_task_state() + self.erred_tasks.clear() + self.computations.clear() self.report({"op": "restart"}) for plugin in list(self.plugins.values()): @@ -5185,9 +5187,6 @@ async def _restart(): for c in self._worker_coroutines: c.cancel() - self.erred_tasks.clear() - self.computations.clear() - self.log_event([client, "all"], {"action": "restart", "client": client}) while len(self.workers) < n_workers: await asyncio.sleep(0.01) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 404eedc4704..9b75160c51d 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -656,6 +656,14 @@ async def restart(self, *, timeout): @gen_cluster(client=True, Worker=SlowRestartNanny, nthreads=[("", 1)] * 2) async def test_restart_nanny_timeout_exceeded(c, s, a, b): + f = c.submit(div, 1, 0) + fr = c.submit(inc, 1, resources={"FOO": 1}) + await wait(f) + assert s.erred_tasks + assert s.computations + assert s.unrunnable + assert s.tasks + with pytest.raises( TimeoutError, match="Restarting 2 workers did not complete in 1s" ): @@ -663,6 +671,15 @@ async def test_restart_nanny_timeout_exceeded(c, s, a, b): assert a.restart_called.is_set() assert b.restart_called.is_set() + assert not s.erred_tasks + assert not s.computations + assert not s.unrunnable + assert not s.tasks + + assert not c.futures + assert f.status == "cancelled" + assert fr.status == "cancelled" + @gen_cluster(client=True, nthreads=[("", 1)] * 2) async def test_restart_not_all_workers_return(c, s, a, b): From 025d02e9fb387ac5685a602f96002d5d1e684305 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 10:28:56 -0400 Subject: [PATCH 14/28] Don't apply timeout to `remove_worker` --- distributed/scheduler.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5f14e381a43..26786427646 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5126,7 +5126,8 @@ async def restart(self, client=None, timeout=30): does not automatically restart workers, ``restart`` will just shut down all workers, then time out! - Raises `TimeoutError` if not all workers come back within ``timeout`` seconds. + Raises `TimeoutError` if not all workers come back within ``timeout`` seconds + after being shut down. """ stimulus_id = f"restart-{time()}" @@ -5153,18 +5154,18 @@ async def restart(self, client=None, timeout=30): nanny_workers = { addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny } - - async def _restart(): - # Close non-Nanny workers. We have no way to restart them, so we just let them go, - # and assume a deployment system is going to restart them for us. - await asyncio.gather( - *( - self.remove_worker(address=addr, stimulus_id=stimulus_id) - for addr in self.workers - if addr not in nanny_workers - ) + # Close non-Nanny workers. We have no way to restart them, so we just let them go, + # and assume a deployment system is going to restart them for us. + # Don't apply timeout here, since `remove_worker` doesn't block on connections. + await asyncio.gather( + *( + self.remove_worker(address=addr, stimulus_id=stimulus_id) + for addr in self.workers + if addr not in nanny_workers ) + ) + async def _restart(): logger.debug("Send kill signal to nannies: %s", nanny_workers) async with contextlib.AsyncExitStack() as stack: nannies = [ From 648f1e9215f8a8ada5230f08af5fb5fb79766fe1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 10:30:53 -0400 Subject: [PATCH 15/28] Connect to nannies in parallel --- distributed/scheduler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 26786427646..2cbaca2ccdb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5168,12 +5168,14 @@ async def restart(self, client=None, timeout=30): async def _restart(): logger.debug("Send kill signal to nannies: %s", nanny_workers) async with contextlib.AsyncExitStack() as stack: - nannies = [ - await stack.enter_async_context( - rpc(nanny_address, connection_args=self.connection_args) + nannies = await asyncio.gather( + *( + stack.enter_async_context( + rpc(nanny_address, connection_args=self.connection_args) + ) + for nanny_address in nanny_workers.values() ) - for nanny_address in nanny_workers.values() - ] + ) resps = await asyncio.gather( *(nanny.restart(close=True, timeout=timeout) for nanny in nannies) From 868c0c2e06673bcd9e1c4af48925292656d98a65 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 12:46:38 -0400 Subject: [PATCH 16/28] Explain `TimeoutError` contract --- distributed/scheduler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2cbaca2ccdb..a10d1680371 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5126,8 +5126,12 @@ async def restart(self, client=None, timeout=30): does not automatically restart workers, ``restart`` will just shut down all workers, then time out! - Raises `TimeoutError` if not all workers come back within ``timeout`` seconds - after being shut down. + Raises `TimeoutError` if the restart process takes longer than ``timeout`` + seconds. This could mean not all workers came back within ``timeout`` seconds, + or that they didn't shut down in time, or even that connecting to all Nannies + took too long. After a `TimeoutError`, the cluster is still usable, but + non-restarted workers may still be connected to the cluster, and they may + shut themselves down at some point in the future. """ stimulus_id = f"restart-{time()}" From 2864e23e39624a82d88680fb0ddec9e46658027e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 12:53:03 -0400 Subject: [PATCH 17/28] Revert "Explain `TimeoutError` contract" This reverts commit 868c0c2e06673bcd9e1c4af48925292656d98a65. --- distributed/scheduler.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a10d1680371..2cbaca2ccdb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5126,12 +5126,8 @@ async def restart(self, client=None, timeout=30): does not automatically restart workers, ``restart`` will just shut down all workers, then time out! - Raises `TimeoutError` if the restart process takes longer than ``timeout`` - seconds. This could mean not all workers came back within ``timeout`` seconds, - or that they didn't shut down in time, or even that connecting to all Nannies - took too long. After a `TimeoutError`, the cluster is still usable, but - non-restarted workers may still be connected to the cluster, and they may - shut themselves down at some point in the future. + Raises `TimeoutError` if not all workers come back within ``timeout`` seconds + after being shut down. """ stimulus_id = f"restart-{time()}" From 08014e94ddfba46e5f4e999eb98a3ddbc51f3a23 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 13:19:04 -0400 Subject: [PATCH 18/28] Only apply timeout to worker-waiting --- distributed/scheduler.py | 70 ++++++++++++++++------------- distributed/tests/test_scheduler.py | 14 +++--- 2 files changed, 47 insertions(+), 37 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2cbaca2ccdb..016c3f781c5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -73,7 +73,7 @@ from distributed.event import EventExtension from distributed.http import get_handlers from distributed.lock import LockExtension -from distributed.metrics import time +from distributed.metrics import monotonic, time from distributed.multi_lock import MultiLockExtension from distributed.node import ServerNode from distributed.proctitle import setproctitle @@ -5156,7 +5156,6 @@ async def restart(self, client=None, timeout=30): } # Close non-Nanny workers. We have no way to restart them, so we just let them go, # and assume a deployment system is going to restart them for us. - # Don't apply timeout here, since `remove_worker` doesn't block on connections. await asyncio.gather( *( self.remove_worker(address=addr, stimulus_id=stimulus_id) @@ -5165,43 +5164,54 @@ async def restart(self, client=None, timeout=30): ) ) - async def _restart(): - logger.debug("Send kill signal to nannies: %s", nanny_workers) - async with contextlib.AsyncExitStack() as stack: - nannies = await asyncio.gather( - *( - stack.enter_async_context( - rpc(nanny_address, connection_args=self.connection_args) - ) - for nanny_address in nanny_workers.values() + logger.debug("Send kill signal to nannies: %s", nanny_workers) + async with contextlib.AsyncExitStack() as stack: + nannies = await asyncio.gather( + *( + stack.enter_async_context( + rpc(nanny_address, connection_args=self.connection_args) ) + for nanny_address in nanny_workers.values() ) + ) - resps = await asyncio.gather( - *(nanny.restart(close=True, timeout=timeout) for nanny in nannies) - ) - # NOTE: the `WorkerState` entries for these workers will be removed - # naturally when they disconnect from the scheduler. + start = monotonic() + resps = await asyncio.gather( + *( + asyncio.wait_for( + nanny.restart(close=True, timeout=timeout), timeout + ) + for nanny in nannies + ), + return_exceptions=True, + ) + # NOTE: the `WorkerState` entries for these workers will be removed + # naturally when they disconnect from the scheduler. - if any(resp != "OK" for resp in resps): - raise TimeoutError + if n_failed := sum(resp != "OK" for resp in resps): + raise TimeoutError( + f"{n_failed}/{len(nannies)} worker(s) did not restart within {timeout}s" + ) - with suppress(AttributeError): - for c in self._worker_coroutines: - c.cancel() + with suppress(AttributeError): + for c in self._worker_coroutines: + c.cancel() - self.log_event([client, "all"], {"action": "restart", "client": client}) - while len(self.workers) < n_workers: - await asyncio.sleep(0.01) + self.log_event([client, "all"], {"action": "restart", "client": client}) + while monotonic() < start + timeout: + if len(self.workers) >= n_workers: + return + await asyncio.sleep(0.01) + else: + msg = ( + f"Waited for {n_workers} worker(s) to reconnect after restarting, " + f"but after {timeout}s, only {len(self.workers)} have returned." + ) - try: - await asyncio.wait_for(_restart(), timeout=timeout) - except TimeoutError: - msg = f"Restarting {n_workers} workers did not complete in {timeout}s." if (n_nanny := len(nanny_workers)) < n_workers: msg += ( - f" The {n_nanny} worker(s) not using Nannies were just shut down " - "instead of restarted (restart is only possible with Nannies). If " + f" The {n_workers - n_nanny} worker(s) not using Nannies were just shut " + "down instead of restarted (restart is only possible with Nannies). If " "your deployment system does not automatically re-launch terminated " "processes, then those workers will never back, and `Client.restart` " "will always time out. Do not use `Client.restart` in that case." diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 9b75160c51d..a65d9a2677f 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -665,7 +665,7 @@ async def test_restart_nanny_timeout_exceeded(c, s, a, b): assert s.tasks with pytest.raises( - TimeoutError, match="Restarting 2 workers did not complete in 1s" + TimeoutError, match=r"2/2 worker\(s\) did not restart within 1s" ): await c.restart(timeout="1s") assert a.restart_called.is_set() @@ -683,11 +683,13 @@ async def test_restart_nanny_timeout_exceeded(c, s, a, b): @gen_cluster(client=True, nthreads=[("", 1)] * 2) async def test_restart_not_all_workers_return(c, s, a, b): - with pytest.raises( - TimeoutError, match="Restarting 2 workers did not complete in 1s" - ): + with pytest.raises(TimeoutError, match="Waited for 2 worker"): await c.restart(timeout="1s") + assert not s.workers + assert a.status in (Status.closed, Status.closing) + assert b.status in (Status.closed, Status.closing) + @pytest.mark.slow @gen_cluster(client=True, Worker=Nanny) @@ -698,9 +700,7 @@ async def test_restart_some_nannies_some_not(c, s, a, b): await c.wait_for_workers(3) # FIXME how to make this not always take 20s if the nannies do restart quickly? - with pytest.raises( - TimeoutError, match="Restarting 3 workers did not complete in 20s" - ): + with pytest.raises(TimeoutError, match=r"The 1 worker\(s\) not using Nannies"): await c.restart(timeout="20s") assert w.status == Status.closed From 55f16cd458596cff8153e891ae27c40425f20faa Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 13:22:53 -0400 Subject: [PATCH 19/28] move proc restart testing to more appropraite test --- distributed/tests/test_scheduler.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index a65d9a2677f..89433c3f9a8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -632,12 +632,23 @@ async def test_restart(c, s, a, b): @pytest.mark.slow @gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)] * 5) async def test_restart_waits_for_new_workers(c, s, *workers): - n_initial_workers = len(s.workers) + original_procs = {n.process.process for n in workers} + original_workers = dict(s.workers) + await c.restart() - assert len(s.workers) == n_initial_workers + assert len(s.workers) == len(original_workers) for w in workers: assert w.address not in s.workers + # Confirm they restarted + # NOTE: == for `psutil.Process` compares PID and creation time + new_procs = {n.process.process for n in workers} + assert new_procs != original_procs + # The workers should have new addresses + assert s.workers.keys().isdisjoint(original_workers.keys()) + # The old WorkerState instances should be replaced + assert set(s.workers.values()).isdisjoint(original_workers.values()) + class SlowRestartNanny(Nanny): def __init__(self, *args, **kwargs): @@ -694,8 +705,7 @@ async def test_restart_not_all_workers_return(c, s, a, b): @pytest.mark.slow @gen_cluster(client=True, Worker=Nanny) async def test_restart_some_nannies_some_not(c, s, a, b): - original_procs = {a.process.process, b.process.process} - original_workers = dict(s.workers) + original_addrs = set(s.workers) async with Worker(s.address, nthreads=1) as w: await c.wait_for_workers(3) @@ -706,14 +716,8 @@ async def test_restart_some_nannies_some_not(c, s, a, b): assert w.status == Status.closed assert len(s.workers) == 2 - # Confirm they restarted - # NOTE: == for `psutil.Process` compares PID and creation time - new_procs = {a.process.process, b.process.process} - assert new_procs != original_procs - # The workers should have new addresses - assert s.workers.keys().isdisjoint(original_workers.keys()) - # The old WorkerState instances should be replaced - assert set(s.workers.values()).isdisjoint(original_workers.values()) + assert set(s.workers).isdisjoint(original_addrs) + assert w.address not in s.workers @gen_cluster( From 0e3d96dc7c76218b7012fdbea48c77a329289b3c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 13:23:26 -0400 Subject: [PATCH 20/28] remove `_worker_coroutines` --- distributed/scheduler.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 016c3f781c5..ef42aebc4bf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5193,10 +5193,6 @@ async def restart(self, client=None, timeout=30): f"{n_failed}/{len(nannies)} worker(s) did not restart within {timeout}s" ) - with suppress(AttributeError): - for c in self._worker_coroutines: - c.cancel() - self.log_event([client, "all"], {"action": "restart", "client": client}) while monotonic() < start + timeout: if len(self.workers) >= n_workers: From 3d6a938765d0c03c03c5bfa3c883dee6821f0af8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 16:21:00 -0400 Subject: [PATCH 21/28] `kill` nannies instead of `restart` separating whether a worker took too long to shut down vs start up allows us to guarantee all old workers are removed --- distributed/nanny.py | 2 +- distributed/scheduler.py | 25 ++++++++++++++++++--- distributed/tests/test_scheduler.py | 34 ++++++++++++++--------------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 2eae45bc3d8..7818efee85d 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -379,7 +379,7 @@ async def kill(self, timeout=2): informed """ if self.process is None: - return "OK" + return deadline = time() + timeout await self.process.kill(timeout=0.8 * (deadline - time())) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c28115b0a50..e1b9fc7ad78 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5123,6 +5123,9 @@ async def restart(self, client=None, timeout=30): Raises `TimeoutError` if not all workers come back within ``timeout`` seconds after being shut down. + + After `restart`, all connected workers are new, regardless of whether `TimeoutError` + was raised. """ stimulus_id = f"restart-{time()}" @@ -5174,7 +5177,11 @@ async def restart(self, client=None, timeout=30): resps = await asyncio.gather( *( asyncio.wait_for( - nanny.restart(close=True, timeout=timeout), timeout + # FIXME does not raise if the process fails to shut down, + # see https://github.com/dask/distributed/pull/6427/files#r894917424 + # NOTE: Nanny will automatically restart worker process when it's killed + nanny.kill(timeout=timeout), + timeout, ) for nanny in nannies ), @@ -5183,9 +5190,21 @@ async def restart(self, client=None, timeout=30): # NOTE: the `WorkerState` entries for these workers will be removed # naturally when they disconnect from the scheduler. - if n_failed := sum(resp != "OK" for resp in resps): + # Remove any workers that failed to shut down, so we can guarantee + # that after `restart`, there are no old workers around. + bad_nannies = [ + addr for addr, resp in zip(nanny_workers, resps) if resp is not None + ] + if bad_nannies: + await asyncio.gather( + *( + self.remove_worker(addr, stimulus_id=stimulus_id) + for addr in bad_nannies + ) + ) + raise TimeoutError( - f"{n_failed}/{len(nannies)} worker(s) did not restart within {timeout}s" + f"{len(bad_nannies)}/{len(nannies)} worker(s) did not shut down within {timeout}s" ) self.log_event([client, "all"], {"action": "restart", "client": client}) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 89433c3f9a8..a23510f7b26 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -650,22 +650,21 @@ async def test_restart_waits_for_new_workers(c, s, *workers): assert set(s.workers.values()).isdisjoint(original_workers.values()) -class SlowRestartNanny(Nanny): +class SlowKillNanny(Nanny): def __init__(self, *args, **kwargs): - self.restart_proceed = asyncio.Event() - self.restart_called = asyncio.Event() + self.kill_proceed = asyncio.Event() + self.kill_called = asyncio.Event() super().__init__(*args, **kwargs) - async def restart(self, *, timeout): - self.restart_called.set() - try: - await asyncio.wait_for(self.restart_proceed.wait(), timeout) - except TimeoutError: - return "timed out" - return await super().restart(timeout=timeout) + async def kill(self, *, timeout): + self.kill_called.set() + print("kill called") + await asyncio.wait_for(self.kill_proceed.wait(), timeout) + print("kill proceed") + return await super().kill(timeout=timeout) -@gen_cluster(client=True, Worker=SlowRestartNanny, nthreads=[("", 1)] * 2) +@gen_cluster(client=True, Worker=SlowKillNanny, nthreads=[("", 1)] * 2) async def test_restart_nanny_timeout_exceeded(c, s, a, b): f = c.submit(div, 1, 0) fr = c.submit(inc, 1, resources={"FOO": 1}) @@ -676,12 +675,13 @@ async def test_restart_nanny_timeout_exceeded(c, s, a, b): assert s.tasks with pytest.raises( - TimeoutError, match=r"2/2 worker\(s\) did not restart within 1s" + TimeoutError, match=r"2/2 worker\(s\) did not shut down within 1s" ): await c.restart(timeout="1s") - assert a.restart_called.is_set() - assert b.restart_called.is_set() + assert a.kill_called.is_set() + assert b.kill_called.is_set() + assert not s.workers assert not s.erred_tasks assert not s.computations assert not s.unrunnable @@ -723,7 +723,7 @@ async def test_restart_some_nannies_some_not(c, s, a, b): @gen_cluster( client=True, nthreads=[("", 1)], - Worker=SlowRestartNanny, + Worker=SlowKillNanny, worker_kwargs={"heartbeat_interval": "1ms"}, ) async def test_restart_heartbeat_before_closing(c, s, n): @@ -734,13 +734,13 @@ async def test_restart_heartbeat_before_closing(c, s, n): prev_workers = dict(s.workers) restart_task = asyncio.create_task(s.restart()) - await n.restart_called.wait() + await n.kill_called.wait() await asyncio.sleep(0.5) # significantly longer than the heartbeat interval # WorkerState should not be removed yet, because the worker hasn't been told to close assert s.workers - n.restart_proceed.set() + n.kill_proceed.set() # Wait until the worker has left (possibly until it's come back too) while s.workers == prev_workers: await asyncio.sleep(0.01) From b7c5e40e488681eb4e8f4a44850dbcb8f8dd4010 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 18 Jul 2022 16:36:09 -0400 Subject: [PATCH 22/28] Add `wait_for_workers` option --- distributed/client.py | 32 +++++++++++----- distributed/scheduler.py | 59 ++++++++++++++++++----------- distributed/tests/test_scheduler.py | 11 ++++++ 3 files changed, 70 insertions(+), 32 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index a459a261485..6e041bbd55e 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3320,29 +3320,43 @@ def persist( else: return result - async def _restart(self, timeout=no_default): + async def _restart(self, timeout=no_default, wait_for_workers=True): if timeout == no_default: timeout = self._timeout * 4 if timeout is not None: timeout = parse_timedelta(timeout, "s") - await self.scheduler.restart(timeout=timeout) + await self.scheduler.restart(timeout=timeout, wait_for_workers=wait_for_workers) return self - def restart(self, timeout=no_default): - """Clear all tasks, restart all workers, and wait for them to return. - - This kills all active work, deletes all data on the network, and - restarts the worker processes. + def restart(self, timeout=no_default, wait_for_workers=True): + """ + Restart all workers. Reset local state. Optionally wait for workers to return. Workers without nannies are shut down, hoping an external deployment system will restart them. Therefore, if not using nannies and your deployment system does not automatically restart workers, ``restart`` will just shut down all workers, then time out! - Raises `TimeoutError` if not all workers come back within ``timeout`` seconds. + After `restart`, all connected workers are new, regardless of whether `TimeoutError` + was raised. Any workers that failed to shut down in time are removed, and + may or many not shut down on their own in the future. + + Parameters + ---------- + timeout: + How long to wait for workers to shut down and come back, if `wait_for_workers` + is True, otherwise just how long to wait for workers to shut down. + Raises `asyncio.TimeoutError` if this is exceeded. + wait_for_workers: + Whether to wait for all workers to reconnect, or just for them to shut down + (default True). Use ``restart(wait_for_workers=False)`` combined with + `Client.wait_for_workers` for granular control over how many workers to + wait for. """ - return self.sync(self._restart, timeout=timeout) + return self.sync( + self._restart, timeout=timeout, wait_for_workers=wait_for_workers + ) async def _upload_large_file(self, local_filename, remote_filename=None): if remote_filename is None: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e1b9fc7ad78..81b0deceeb6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5112,20 +5112,30 @@ def clear_task_state(self): collection.clear() @log_errors - async def restart(self, client=None, timeout=30): + async def restart(self, client=None, timeout=30, wait_for_workers=True): """ - Restart all workers. Reset local state. Wait for workers to return. + Restart all workers. Reset local state. Optionally wait for workers to return. Workers without nannies are shut down, hoping an external deployment system will restart them. Therefore, if not using nannies and your deployment system does not automatically restart workers, ``restart`` will just shut down all workers, then time out! - Raises `TimeoutError` if not all workers come back within ``timeout`` seconds - after being shut down. - After `restart`, all connected workers are new, regardless of whether `TimeoutError` - was raised. + was raised. Any workers that failed to shut down in time are removed, and + may or many not shut down on their own in the future. + + Parameters + ---------- + timeout: + How long to wait for workers to shut down and come back, if `wait_for_workers` + is True, otherwise just how long to wait for workers to shut down. + Raises `asyncio.TimeoutError` if this is exceeded. + wait_for_workers: + Whether to wait for all workers to reconnect, or just for them to shut down + (default True). Use ``restart(wait_for_workers=False)`` combined with + `Client.wait_for_workers` for granular control over how many workers to + wait for. """ stimulus_id = f"restart-{time()}" @@ -5208,25 +5218,28 @@ async def restart(self, client=None, timeout=30): ) self.log_event([client, "all"], {"action": "restart", "client": client}) - while monotonic() < start + timeout: - if len(self.workers) >= n_workers: - return - await asyncio.sleep(0.01) - else: - msg = ( - f"Waited for {n_workers} worker(s) to reconnect after restarting, " - f"but after {timeout}s, only {len(self.workers)} have returned." - ) - if (n_nanny := len(nanny_workers)) < n_workers: - msg += ( - f" The {n_workers - n_nanny} worker(s) not using Nannies were just shut " - "down instead of restarted (restart is only possible with Nannies). If " - "your deployment system does not automatically re-launch terminated " - "processes, then those workers will never back, and `Client.restart` " - "will always time out. Do not use `Client.restart` in that case." + if wait_for_workers: + while monotonic() < start + timeout: + if len(self.workers) >= n_workers: + return + await asyncio.sleep(0.01) + else: + msg = ( + f"Waited for {n_workers} worker(s) to reconnect after restarting, " + f"but after {timeout}s, only {len(self.workers)} have returned. " + "Consider a longer timeout, or `wait_for_workers=False`." ) - raise TimeoutError(msg) from None + + if (n_nanny := len(nanny_workers)) < n_workers: + msg += ( + f" The {n_workers - n_nanny} worker(s) not using Nannies were just shut " + "down instead of restarted (restart is only possible with Nannies). If " + "your deployment system does not automatically re-launch terminated " + "processes, then those workers will never come back, and `Client.restart` " + "will always time out. Do not use `Client.restart` in that case." + ) + raise TimeoutError(msg) from None async def broadcast( self, diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index a23510f7b26..d2dcbc3f1e6 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -702,6 +702,17 @@ async def test_restart_not_all_workers_return(c, s, a, b): assert b.status in (Status.closed, Status.closing) +@gen_cluster(client=True, nthreads=[("", 1)] * 2) +async def test_restart_no_wait_for_workers(c, s, a, b): + await c.restart(timeout="1s", wait_for_workers=False) + + assert not s.workers + # Workers are not immediately closed because of https://github.com/dask/distributed/issues/6390 + # (the message is still waiting in the BatchedSend) + await a.finished() + await b.finished() + + @pytest.mark.slow @gen_cluster(client=True, Worker=Nanny) async def test_restart_some_nannies_some_not(c, s, a, b): From 1dd9620a58e161932b9b3628880a79884df92bf7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 10:28:21 -0400 Subject: [PATCH 23/28] Fix docstrings & error messages Co-authored-by: Hendrik Makait --- distributed/client.py | 5 ++++- distributed/scheduler.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 6e041bbd55e..115aa1a6be3 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3340,7 +3340,7 @@ def restart(self, timeout=no_default, wait_for_workers=True): After `restart`, all connected workers are new, regardless of whether `TimeoutError` was raised. Any workers that failed to shut down in time are removed, and - may or many not shut down on their own in the future. + may or may not shut down on their own in the future. Parameters ---------- @@ -3353,6 +3353,9 @@ def restart(self, timeout=no_default, wait_for_workers=True): (default True). Use ``restart(wait_for_workers=False)`` combined with `Client.wait_for_workers` for granular control over how many workers to wait for. + See also + ---------- + Scheduler.restart """ return self.sync( self._restart, timeout=timeout, wait_for_workers=wait_for_workers diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 81b0deceeb6..ac316058163 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5136,6 +5136,9 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): (default True). Use ``restart(wait_for_workers=False)`` combined with `Client.wait_for_workers` for granular control over how many workers to wait for. + See also + ---------- + Client.restart """ stimulus_id = f"restart-{time()}" @@ -5214,7 +5217,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): ) raise TimeoutError( - f"{len(bad_nannies)}/{len(nannies)} worker(s) did not shut down within {timeout}s" + f"{len(bad_nannies)}/{len(nannies)} nanny worker(s) did not shut down within {timeout}s" ) self.log_event([client, "all"], {"action": "restart", "client": client}) From 675c425bceafaea7fa195089340da4569f96c159 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 10:32:26 -0400 Subject: [PATCH 24/28] Note new workers may take place of old ones --- distributed/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ac316058163..7ced2e990b5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5224,6 +5224,8 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): if wait_for_workers: while monotonic() < start + timeout: + # NOTE: if new (unrelated) workers join while we're waiting, we may return before + # our shut-down workers have come back up. That's fine; workers are interchangeable. if len(self.workers) >= n_workers: return await asyncio.sleep(0.01) From 8b4fa1a574b3fd512ec32e98c3c8a711771fb53f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 10:34:24 -0400 Subject: [PATCH 25/28] decrease wait_for_workers poll interval 10ms is way too fast --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7ced2e990b5..60ce596a262 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5228,7 +5228,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): # our shut-down workers have come back up. That's fine; workers are interchangeable. if len(self.workers) >= n_workers: return - await asyncio.sleep(0.01) + await asyncio.sleep(0.2) else: msg = ( f"Waited for {n_workers} worker(s) to reconnect after restarting, " From 7eb97ba73cddeb9f2d0f1c7c31fa2a93aa443169 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 10:38:10 -0400 Subject: [PATCH 26/28] Missed one typo Co-authored-by: Hendrik Makait --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 60ce596a262..775732bb8bd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5123,7 +5123,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): After `restart`, all connected workers are new, regardless of whether `TimeoutError` was raised. Any workers that failed to shut down in time are removed, and - may or many not shut down on their own in the future. + may or may not shut down on their own in the future. Parameters ---------- From b4b96051e263b3d16b667cd774521833bfba1903 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 18 Jul 2022 19:34:27 +0200 Subject: [PATCH 27/28] Drop redundant geninc (#6740) --- distributed/tests/test_client.py | 13 ++++++------- distributed/utils_test.py | 5 ----- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 2fc77242ab0..3625caa163c 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -92,7 +92,6 @@ double, gen_cluster, gen_test, - geninc, get_cert, inc, map_varying, @@ -2662,13 +2661,13 @@ def func(x, y=10): @gen_cluster(client=True) async def test_run_coroutine(c, s, a, b): - results = await c.run(geninc, 1, delay=0.05) + results = await c.run(asyncinc, 1, delay=0.05) assert results == {a.address: 2, b.address: 2} - results = await c.run(geninc, 1, delay=0.05, workers=[a.address]) + results = await c.run(asyncinc, 1, delay=0.05, workers=[a.address]) assert results == {a.address: 2} - results = await c.run(geninc, 1, workers=[]) + results = await c.run(asyncinc, 1, workers=[]) assert results == {} with pytest.raises(RuntimeError, match="hello"): @@ -2679,14 +2678,14 @@ async def test_run_coroutine(c, s, a, b): def test_run_coroutine_sync(c, s, a, b): - result = c.run(geninc, 2, delay=0.01) + result = c.run(asyncinc, 2, delay=0.01) assert result == {a["address"]: 3, b["address"]: 3} - result = c.run(geninc, 2, workers=[a["address"]]) + result = c.run(asyncinc, 2, workers=[a["address"]]) assert result == {a["address"]: 3} t1 = time() - result = c.run(geninc, 2, delay=10, wait=False) + result = c.run(asyncinc, 2, delay=10, wait=False) t2 = time() assert result is None assert t2 - t1 <= 1.0 diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 4994ea8c464..fadc6002376 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -430,11 +430,6 @@ def apply(func, *args, **kwargs): return apply, list(map(varying, itemslists)) -async def geninc(x, delay=0.02): - await asyncio.sleep(delay) - return x + 1 - - async def asyncinc(x, delay=0.02): await asyncio.sleep(delay) return x + 1 From 9ead9c400ee40122ebc1529dae1ad53003992030 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 19 Jul 2022 11:17:32 -0400 Subject: [PATCH 28/28] fix test_restart_nanny_timeout_exceeded --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index d2dcbc3f1e6..a0ee5823dda 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -675,7 +675,7 @@ async def test_restart_nanny_timeout_exceeded(c, s, a, b): assert s.tasks with pytest.raises( - TimeoutError, match=r"2/2 worker\(s\) did not shut down within 1s" + TimeoutError, match=r"2/2 nanny worker\(s\) did not shut down within 1s" ): await c.restart(timeout="1s") assert a.kill_called.is_set()