From 178b9b5d777aa05a5f9d3fe496783fb592e43173 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 20 Oct 2022 15:18:26 +0200 Subject: [PATCH 01/14] Add reason for Nanny.kill and Nanny.restart --- distributed/nanny.py | 22 +++++++++++++--------- distributed/scheduler.py | 5 ++++- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 43523694cc6..9f18f5427b6 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -15,7 +15,7 @@ from inspect import isawaitable from queue import Empty from time import sleep as sync_sleep -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, Literal from toolz import merge from tornado import gen @@ -255,7 +255,7 @@ def __init__( # type: ignore[no-untyped-def] handlers = { "instantiate": self.instantiate, "kill": self.kill, - "restart": self.restart, + "restart": self.restart, # TODO: Is this being used anywhere? # cannot call it 'close' on the rpc side for naming conflict "get_logs": self.get_logs, "terminate": self.close, @@ -373,7 +373,7 @@ async def start_unsafe(self): return self - async def kill(self, timeout=2): + async def kill(self, timeout: float = 2, reason: str = "unknown") -> None: """Kill the local worker process Blocks until both the process is down and the scheduler is properly @@ -383,7 +383,7 @@ async def kill(self, timeout=2): return deadline = time() + timeout - await self.process.kill(timeout=0.8 * (deadline - time())) + await self.process.kill(reason=reason, timeout=0.8 * (deadline - time())) async def instantiate(self) -> Status: """Start a local worker process @@ -464,7 +464,7 @@ async def plugin_add(self, plugin=None, name=None): msg = error_message(e) return msg if getattr(plugin, "restart", False): - await self.restart() + await self.restart(reason=f"Nanny plugin {name} requested restart.") return {"status": "OK"} @@ -483,10 +483,12 @@ async def plugin_remove(self, name=None): return {"status": "OK"} - async def restart(self, timeout=30): + async def restart( + self, timeout: float = 30, reason: str = "unknown" + ) -> Literal["OK", "timed out"]: async def _(): if self.process is not None: - await self.kill() + await self.kill(reason=reason) await self.instantiate() try: @@ -764,7 +766,9 @@ def mark_stopped(self): if self.on_exit is not None: self.on_exit(r) - async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None: + async def kill( + self, reason: str, timeout: float = 2, executor_wait: bool = True + ) -> None: """ Ensure the worker process is stopped, waiting at most ``timeout * 0.8`` seconds before killing it abruptly. @@ -787,7 +791,7 @@ async def kill(self, timeout: float = 2, executor_wait: bool = True) -> None: Status.failed, # process failed to start, but hasn't been joined yet ), self.status self.status = Status.stopping - logger.info("Nanny asking worker to close") + logger.info("Nanny asking worker to close. Reason: %", reason) process = self.process assert process diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9cae2088cce..d28b240dbb5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5751,7 +5751,10 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): # FIXME does not raise if the process fails to shut down, # see https://github.com/dask/distributed/pull/6427/files#r894917424 # NOTE: Nanny will automatically restart worker process when it's killed - nanny.kill(timeout=timeout), + nanny.kill( + reason="Scheduler is restarting all workers.", + timeout=timeout, + ), timeout, ) for nanny in nannies From 819565a521361be4b8f456f76b4808b072b7537d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 20 Oct 2022 15:35:39 +0200 Subject: [PATCH 02/14] Add more reasons --- distributed/nanny.py | 4 ++-- distributed/scheduler.py | 12 ++++++++---- distributed/worker.py | 18 +++++++++++++----- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 9f18f5427b6..8507da39d92 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -464,7 +464,7 @@ async def plugin_add(self, plugin=None, name=None): msg = error_message(e) return msg if getattr(plugin, "restart", False): - await self.restart(reason=f"Nanny plugin {name} requested restart.") + await self.restart(reason=f"Nanny plugin {name} requests restart.") return {"status": "OK"} @@ -594,7 +594,7 @@ async def close(self, timeout=5): self.stop() try: if self.process is not None: - await self.kill(timeout=timeout) + await self.kill(timeout=timeout, reason="Nanny is closing.") except Exception: logger.exception("Error in Nanny killing Worker subprocess") self.process = None diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d28b240dbb5..a819fce994a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3981,7 +3981,7 @@ async def log_errors(func): if not comm.closed(): # This closes the Worker and ensures that if a Nanny is around, # it is closed as well - comm.send({"op": "close"}) + comm.send({"op": "close", "reason": "Scheduler is closing."}) comm.send({"op": "close-stream"}) # ^ TODO remove? `Worker.close` will close the stream anyway. with suppress(AttributeError): @@ -4729,7 +4729,9 @@ def close_worker(self, worker: str) -> None: logger.info("Closing worker %s", worker) self.log_event(worker, {"action": "close-worker"}) - self.worker_send(worker, {"op": "close"}) + self.worker_send( + worker, {"op": "close", "reason": "Scheduler asks worker to close."} + ) @log_errors async def remove_worker( @@ -4768,7 +4770,9 @@ async def remove_worker( logger.info("Remove worker %s", ws) if close: with suppress(AttributeError, CommClosedError): - self.stream_comms[address].send({"op": "close"}) + self.stream_comms[address].send( + {"op": "close", "reason": "Scheduler removes worker."} + ) self.remove_resources(address) @@ -5752,7 +5756,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): # see https://github.com/dask/distributed/pull/6427/files#r894917424 # NOTE: Nanny will automatically restart worker process when it's killed nanny.kill( - reason="Scheduler is restarting all workers.", + reason="Scheduler restarts all workers.", timeout=timeout, ), timeout, diff --git a/distributed/worker.py b/distributed/worker.py index 258447e1091..7001159f68d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1461,6 +1461,7 @@ async def close( # type: ignore timeout: float = 30, executor_wait: bool = True, nanny: bool = True, + reason: str = "unknown", ) -> str | None: """Close the worker @@ -1469,12 +1470,14 @@ async def close( # type: ignore Parameters ---------- - timeout : float, default 30 + timeout Timeout in seconds for shutting down individual instructions - executor_wait : bool, default True + executor_wait If True, shut down executors synchronously, otherwise asynchronously - nanny : bool, default True + nanny If True, close the nanny + reason + Reason for closing the worker Returns ------- @@ -1486,6 +1489,11 @@ async def close( # type: ignore # nanny+worker, the nanny must be notified first. ==> Remove kwarg # nanny, see also Scheduler.retire_workers if self.status in (Status.closed, Status.closing, Status.failed): + logging.debug( + "Attempted to close worker that is already %s. Reason: %s", + self.status, + reason, + ) await self.finished() return None @@ -1503,9 +1511,9 @@ async def close( # type: ignore disable_gc_diagnosis() try: - logger.info("Stopping worker at %s", self.address) + logger.info("Stopping worker at %s. Reason: ", self.address, reason) except ValueError: # address not available if already closed - logger.info("Stopping worker") + logger.info("Stopping worker. Reason: %s", reason) if self.status not in WORKER_ANY_RUNNING: logger.info("Closed worker has not yet started: %s", self.status) if not executor_wait: From 9851ed5280731427b4b8a6723123d2168d0e58ca Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 20 Oct 2022 15:57:08 +0200 Subject: [PATCH 03/14] Log messages --- distributed/nanny.py | 8 ++++---- distributed/worker.py | 7 +++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 8507da39d92..fe8c608823b 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -373,7 +373,7 @@ async def start_unsafe(self): return self - async def kill(self, timeout: float = 2, reason: str = "unknown") -> None: + async def kill(self, timeout: float = 2, reason: str | None = None) -> None: """Kill the local worker process Blocks until both the process is down and the scheduler is properly @@ -484,7 +484,7 @@ async def plugin_remove(self, name=None): return {"status": "OK"} async def restart( - self, timeout: float = 30, reason: str = "unknown" + self, timeout: float = 30, reason: str | None = None ) -> Literal["OK", "timed out"]: async def _(): if self.process is not None: @@ -767,7 +767,7 @@ def mark_stopped(self): self.on_exit(r) async def kill( - self, reason: str, timeout: float = 2, executor_wait: bool = True + self, timeout: float = 2, executor_wait: bool = True, reason: str | None = None ) -> None: """ Ensure the worker process is stopped, waiting at most @@ -791,7 +791,7 @@ async def kill( Status.failed, # process failed to start, but hasn't been joined yet ), self.status self.status = Status.stopping - logger.info("Nanny asking worker to close. Reason: %", reason) + logger.info("Nanny asking worker to close. Reason: %s", reason) process = self.process assert process diff --git a/distributed/worker.py b/distributed/worker.py index e99a37163ef..19579220975 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -207,7 +207,10 @@ async def _force_close(self): 2. If it doesn't, log and kill the process """ try: - await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30) + await asyncio.wait_for( + self.close(nanny=False, executor_wait=False, reason="Worker failed hard."), + 30, + ) except (KeyboardInterrupt, SystemExit): # pragma: nocover raise except BaseException: # pragma: nocover @@ -1455,7 +1458,7 @@ async def close( # type: ignore timeout: float = 30, executor_wait: bool = True, nanny: bool = True, - reason: str = "unknown", + reason: str | None = None, ) -> str | None: """Close the worker From fc5ed42c9d511fd082c46ab8a6d85a296b298b5c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 20 Oct 2022 17:00:16 +0200 Subject: [PATCH 04/14] Typing --- distributed/nanny.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index fe8c608823b..4869329ab1e 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -110,7 +110,7 @@ class Nanny(ServerNode): """ _instances: ClassVar[weakref.WeakSet[Nanny]] = weakref.WeakSet() - process = None + process: WorkerProcess | None memory_manager: NannyMemoryManager env: dict[str, str] @@ -162,6 +162,7 @@ def __init__( # type: ignore[no-untyped-def] stacklevel=2, ) + self.process = None self._setup_logging(logger) self.loop = self.io_loop = IOLoop.current() @@ -617,6 +618,7 @@ class WorkerProcess: running: asyncio.Event stopped: asyncio.Event + process: AsyncProcess | None env: dict[str, str] pre_spawn_env: dict[str, str] From 753d93f51874e39bb6de4ee87652c0fdc2362866 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 20 Oct 2022 18:26:38 +0200 Subject: [PATCH 05/14] Fix error --- distributed/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 19579220975..069181fdd02 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1508,7 +1508,7 @@ async def close( # type: ignore disable_gc_diagnosis() try: - logger.info("Stopping worker at %s. Reason: ", self.address, reason) + logger.info("Stopping worker at %s. Reason: %s", self.address, reason) except ValueError: # address not available if already closed logger.info("Stopping worker. Reason: %s", reason) if self.status not in WORKER_ANY_RUNNING: From d71bbbdd30e90f55a24054e107fc863f2961730d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 08:37:00 +0200 Subject: [PATCH 06/14] Drop TODO and add reasons --- distributed/nanny.py | 6 ++++-- distributed/tests/test_scheduler.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 4869329ab1e..0cb0cad6141 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -256,7 +256,7 @@ def __init__( # type: ignore[no-untyped-def] handlers = { "instantiate": self.instantiate, "kill": self.kill, - "restart": self.restart, # TODO: Is this being used anywhere? + "restart": self.restart, # cannot call it 'close' on the rpc side for naming conflict "get_logs": self.get_logs, "terminate": self.close, @@ -805,6 +805,7 @@ async def kill( "op": "stop", "timeout": wait_timeout, "executor_wait": executor_wait, + "reason": reason, } ) await asyncio.sleep(0) # otherwise we get broken pipe errors @@ -877,12 +878,13 @@ def _run( loop.make_current() worker = Worker(**worker_kwargs) - async def do_stop(timeout=5, executor_wait=True): + async def do_stop(timeout=5, executor_wait=True, reason=None): try: await worker.close( nanny=False, executor_wait=executor_wait, timeout=timeout, + reason=reason, ) finally: loop.stop() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 8a4696aafde..a607467fdd8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -917,12 +917,12 @@ def __init__(self, *args, **kwargs): self.kill_called = asyncio.Event() super().__init__(*args, **kwargs) - async def kill(self, *, timeout): + async def kill(self, *, timeout, reason=None): self.kill_called.set() print("kill called") await asyncio.wait_for(self.kill_proceed.wait(), timeout) print("kill proceed") - return await super().kill(timeout=timeout) + return await super().kill(timeout=timeout, reason=reason) @gen_cluster(client=True, Worker=SlowKillNanny, nthreads=[("", 1)] * 2) From d7ab802aa175fe117b7519ad05035725effa5947 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 09:01:23 +0200 Subject: [PATCH 07/14] Improve tracking on nanny --- distributed/nanny.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 0cb0cad6141..44cd3ceafe9 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -365,7 +365,7 @@ async def start_unsafe(self): response = await self.instantiate() if response != Status.running: - await self.close() + await self.close(reason="nanny-start-failed") return assert self.worker_address @@ -374,7 +374,7 @@ async def start_unsafe(self): return self - async def kill(self, timeout: float = 2, reason: str | None = None) -> None: + async def kill(self, timeout: float = 2, reason: str = "nanny-kill") -> None: """Kill the local worker process Blocks until both the process is down and the scheduler is properly @@ -431,7 +431,9 @@ async def instantiate(self) -> Status: self, self.scheduler_addr, ) - await self.close(timeout=self.death_timeout) + await self.close( + timeout=self.death_timeout, reason="nanny-instantiate-timeout" + ) raise else: @@ -439,7 +441,7 @@ async def instantiate(self) -> Status: result = await self.process.start() except Exception: logger.error("Failed to start process", exc_info=True) - await self.close() + await self.close(reason="nanny-instantiate-failed") raise return result @@ -465,7 +467,7 @@ async def plugin_add(self, plugin=None, name=None): msg = error_message(e) return msg if getattr(plugin, "restart", False): - await self.restart(reason=f"Nanny plugin {name} requests restart.") + await self.restart(reason=f"nanny-plugin-{name}-restart") return {"status": "OK"} @@ -485,7 +487,7 @@ async def plugin_remove(self, name=None): return {"status": "OK"} async def restart( - self, timeout: float = 30, reason: str | None = None + self, timeout: float = 30, reason: str = "nanny-restart" ) -> Literal["OK", "timed out"]: async def _(): if self.process is not None: @@ -528,7 +530,7 @@ async def _on_worker_exit(self, exitcode): except OSError: logger.exception("Failed to unregister") if not self.reconnect: - await self.close() + await self.close(reason="nanny-unregister-failed") return try: @@ -541,7 +543,7 @@ async def _on_worker_exit(self, exitcode): logger.warning("Restarting worker") await self.instantiate() elif self.status == Status.closing_gracefully: - await self.close() + await self.close(reason="nanny-closing-gracefully") except Exception: logger.error( @@ -556,6 +558,7 @@ def _close(self, *args, **kwargs): warnings.warn("Worker._close has moved to Worker.close", stacklevel=2) return self.close(*args, **kwargs) + # TODO: Include reason def close_gracefully(self): """ A signal that we shouldn't try to restart workers if they go away @@ -564,7 +567,9 @@ def close_gracefully(self): """ self.status = Status.closing_gracefully - async def close(self, timeout=5): + async def close( + self, timeout: float = 5, reason: str = "nanny-close" + ) -> Literal["OK"]: """ Close the worker process, stop all comms. """ @@ -576,10 +581,7 @@ async def close(self, timeout=5): return "OK" self.status = Status.closing - logger.info( - "Closing Nanny at %r.", - self.address_safe, - ) + logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason) for preload in self.preloads: await preload.teardown() @@ -595,7 +597,7 @@ async def close(self, timeout=5): self.stop() try: if self.process is not None: - await self.kill(timeout=timeout, reason="Nanny is closing.") + await self.kill(timeout=timeout, reason=reason) except Exception: logger.exception("Error in Nanny killing Worker subprocess") self.process = None @@ -748,6 +750,7 @@ def pid(self): def mark_stopped(self): if self.status != Status.stopped: + assert self.process is not None r = self.process.exitcode assert r is not None if r != 0: @@ -769,7 +772,10 @@ def mark_stopped(self): self.on_exit(r) async def kill( - self, timeout: float = 2, executor_wait: bool = True, reason: str | None = None + self, + timeout: float = 2, + executor_wait: bool = True, + reason: str = "workerprocess-kill", ) -> None: """ Ensure the worker process is stopped, waiting at most @@ -878,7 +884,9 @@ def _run( loop.make_current() worker = Worker(**worker_kwargs) - async def do_stop(timeout=5, executor_wait=True, reason=None): + async def do_stop( + timeout=5, executor_wait=True, reason="workerprocess-stop" + ): try: await worker.close( nanny=False, From d8762a2004e7015b908f615825ae4197da18561b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 09:09:22 +0200 Subject: [PATCH 08/14] Improve worker tracking --- distributed/nanny.py | 2 +- distributed/worker.py | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 44cd3ceafe9..fef592bca47 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -543,7 +543,7 @@ async def _on_worker_exit(self, exitcode): logger.warning("Restarting worker") await self.instantiate() elif self.status == Status.closing_gracefully: - await self.close(reason="nanny-closing-gracefully") + await self.close(reason="nanny-close-gracefully") except Exception: logger.error( diff --git a/distributed/worker.py b/distributed/worker.py index 069181fdd02..5ba0749a84b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -166,6 +166,7 @@ } +# TODO: Parametrize with reason def fail_hard(method: Callable[P, T]) -> Callable[P, T]: """ Decorator to close the worker if this method encounters an exception. @@ -208,7 +209,7 @@ async def _force_close(self): """ try: await asyncio.wait_for( - self.close(nanny=False, executor_wait=False, reason="Worker failed hard."), + self.close(nanny=False, executor_wait=False, reason="worker-fail-hard"), 30, ) except (KeyboardInterrupt, SystemExit): # pragma: nocover @@ -827,7 +828,9 @@ def __init__( if lifetime: lifetime += (random.random() * 2 - 1) * lifetime_stagger - self.io_loop.call_later(lifetime, self.close_gracefully) + self.io_loop.call_later( + lifetime, self.close_gracefully, reason="worker-lifetime-reached" + ) self.lifetime = lifetime Worker._instances.add(self) @@ -1458,7 +1461,7 @@ async def close( # type: ignore timeout: float = 30, executor_wait: bool = True, nanny: bool = True, - reason: str | None = None, + reason: str = "worker-close", ) -> str | None: """Close the worker @@ -1632,7 +1635,7 @@ def _close(executor, wait): setproctitle("dask worker [closed]") return "OK" - async def close_gracefully(self, restart=None): + async def close_gracefully(self, reason="worker-close-gracefully"): """Gracefully shut down a worker This first informs the scheduler that we're shutting down, and asks it @@ -1644,10 +1647,7 @@ async def close_gracefully(self, restart=None): if self.status == Status.closed: return - if restart is None: - restart = self.lifetime_restart - - logger.info("Closing worker gracefully: %s", self.address) + logger.info("Closing worker gracefully: %s. Reason: %s", self.address, reason) # Wait for all tasks to leave the worker and don't accept any new ones. # Scheduler.retire_workers will set the status to closing_gracefully and push it # back to this worker. @@ -1657,7 +1657,7 @@ async def close_gracefully(self, restart=None): remove=False, stimulus_id=f"worker-close-gracefully-{time()}", ) - await self.close(nanny=not restart) + await self.close(nanny=not self.lifetime_restart, reason=reason) async def wait_until_closed(self): warnings.warn("wait_until_closed has moved to finished()") From 11662abb2769ff2f16f7aa69e5c3155503be7241 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 09:40:57 +0200 Subject: [PATCH 09/14] Further improvements --- distributed/nanny.py | 4 ++-- distributed/scheduler.py | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index fef592bca47..8d1aaf3bbef 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -256,9 +256,9 @@ def __init__( # type: ignore[no-untyped-def] handlers = { "instantiate": self.instantiate, "kill": self.kill, - "restart": self.restart, - # cannot call it 'close' on the rpc side for naming conflict + "restart": self.restart, # TODO: Remove since this is not being used anywhere "get_logs": self.get_logs, + # cannot call it 'close' on the rpc side for naming conflict "terminate": self.close, "close_gracefully": self.close_gracefully, "run": self.run, diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a819fce994a..cf240240cfb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3981,7 +3981,7 @@ async def log_errors(func): if not comm.closed(): # This closes the Worker and ensures that if a Nanny is around, # it is closed as well - comm.send({"op": "close", "reason": "Scheduler is closing."}) + comm.send({"op": "close", "reason": "scheduler-close"}) comm.send({"op": "close-stream"}) # ^ TODO remove? `Worker.close` will close the stream anyway. with suppress(AttributeError): @@ -4729,9 +4729,7 @@ def close_worker(self, worker: str) -> None: logger.info("Closing worker %s", worker) self.log_event(worker, {"action": "close-worker"}) - self.worker_send( - worker, {"op": "close", "reason": "Scheduler asks worker to close."} - ) + self.worker_send(worker, {"op": "close", "reason": "scheduler-close-worker"}) @log_errors async def remove_worker( @@ -4771,7 +4769,7 @@ async def remove_worker( if close: with suppress(AttributeError, CommClosedError): self.stream_comms[address].send( - {"op": "close", "reason": "Scheduler removes worker."} + {"op": "close", "reason": "scheduler-remove-worker"} ) self.remove_resources(address) @@ -5756,7 +5754,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): # see https://github.com/dask/distributed/pull/6427/files#r894917424 # NOTE: Nanny will automatically restart worker process when it's killed nanny.kill( - reason="Scheduler restarts all workers.", + reason="scheduler-restart", timeout=timeout, ), timeout, From 61e2068f22531cc5ef8a8fa3d756ed75d7c65f88 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 09:51:26 +0200 Subject: [PATCH 10/14] Add reason to PackageInstall plugin --- distributed/diagnostics/plugin.py | 4 +++- distributed/worker.py | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index a8445feea19..f6c82f4c06b 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -307,7 +307,9 @@ async def setup(self, worker): if self.restart and worker.nanny and not await self._is_restarted(worker): logger.info("Restarting worker to refresh interpreter.") await self._set_restarted(worker) - worker.loop.add_callback(worker.close_gracefully, restart=True) + worker.loop.add_callback( + worker.close_gracefully, restart=True, reason=f"{self.name}-setup" + ) @abc.abstractmethod def install(self) -> None: diff --git a/distributed/worker.py b/distributed/worker.py index 5ba0749a84b..33b2f239834 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1635,7 +1635,9 @@ def _close(executor, wait): setproctitle("dask worker [closed]") return "OK" - async def close_gracefully(self, reason="worker-close-gracefully"): + async def close_gracefully( + self, restart=None, reason: str = "worker-close-gracefully" + ): """Gracefully shut down a worker This first informs the scheduler that we're shutting down, and asks it @@ -1657,7 +1659,9 @@ async def close_gracefully(self, reason="worker-close-gracefully"): remove=False, stimulus_id=f"worker-close-gracefully-{time()}", ) - await self.close(nanny=not self.lifetime_restart, reason=reason) + if restart is None: + restart = self.lifetime_restart + await self.close(nanny=not restart, reason=reason) async def wait_until_closed(self): warnings.warn("wait_until_closed has moved to finished()") From 4bc0c831eda3908b4329741f657847f37caad30f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 10:00:50 +0200 Subject: [PATCH 11/14] Add reason to fail_hard --- distributed/tests/test_utils_test.py | 2 ++ distributed/worker.py | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 270275a9aa5..350985bbc6d 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -839,6 +839,8 @@ async def test(s): while a.status != Status.closed: await asyncio.sleep(0.01) + method_name = "fail_sync" if sync else "fail_async" + assert f"worker-{method_name}-fail-hard" in logger.getvalue() test_done = True diff --git a/distributed/worker.py b/distributed/worker.py index 33b2f239834..8ba729b809a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -166,11 +166,11 @@ } -# TODO: Parametrize with reason def fail_hard(method: Callable[P, T]) -> Callable[P, T]: """ Decorator to close the worker if this method encounters an exception. """ + reason = f"worker-{method.__name__}-fail-hard" if iscoroutinefunction(method): @functools.wraps(method) @@ -181,7 +181,7 @@ async def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> Any: if self.status not in (Status.closed, Status.closing): self.log_event("worker-fail-hard", error_message(e)) logger.exception(e) - await _force_close(self) + await _force_close(self, reason) raise else: @@ -194,13 +194,13 @@ def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> T: if self.status not in (Status.closed, Status.closing): self.log_event("worker-fail-hard", error_message(e)) logger.exception(e) - self.loop.add_callback(_force_close, self) + self.loop.add_callback(_force_close, self, reason) raise return wrapper # type: ignore -async def _force_close(self): +async def _force_close(self, reason: str): """ Used with the fail_hard decorator defined above @@ -209,7 +209,7 @@ async def _force_close(self): """ try: await asyncio.wait_for( - self.close(nanny=False, executor_wait=False, reason="worker-fail-hard"), + self.close(nanny=False, executor_wait=False, reason=reason), 30, ) except (KeyboardInterrupt, SystemExit): # pragma: nocover From 061e21cf3904de35e7f991a2416e47c23ba979e9 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 21 Oct 2022 12:58:52 +0200 Subject: [PATCH 12/14] Add reason --- distributed/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 8ba729b809a..6a91abdf71d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1260,7 +1260,7 @@ async def handle_scheduler(self, comm: Comm) -> None: self.address, self.status, ) - await self.close() + await self.close(reason="worker-handle-scheduler-connection-broken") async def upload_file( self, filename: str, data: str | bytes, load: bool = True From e8c61d5eb703a346eb29c37b19f8a882db01caff Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 24 Oct 2022 19:01:01 +0200 Subject: [PATCH 13/14] Add tests --- distributed/nanny.py | 6 ++++-- distributed/tests/test_nanny.py | 12 +++++++----- distributed/tests/test_scheduler.py | 5 +++-- distributed/tests/test_worker.py | 4 +++- distributed/worker.py | 2 +- 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 8d1aaf3bbef..278779daaeb 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -558,14 +558,16 @@ def _close(self, *args, **kwargs): warnings.warn("Worker._close has moved to Worker.close", stacklevel=2) return self.close(*args, **kwargs) - # TODO: Include reason - def close_gracefully(self): + def close_gracefully(self, reason: str = "nanny-close-gracefully") -> None: """ A signal that we shouldn't try to restart workers if they go away This is used as part of the cluster shutdown process. """ self.status = Status.closing_gracefully + logger.info( + "Closing Nanny gracefully at %r. Reason: %s", self.address_safe, reason + ) async def close( self, timeout: float = 5, reason: str = "nanny-close" diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 1de2d758a18..0fe602c14aa 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -113,12 +113,14 @@ async def test_no_hang_when_scheduler_closes(s, a, b): Worker=Nanny, nthreads=[("127.0.0.1", 1)], worker_kwargs={"reconnect": False} ) async def test_close_on_disconnect(s, w): - await s.close() + with captured_logger("distributed.nanny") as logger: + await s.close() - start = time() - while w.status != Status.closed: - await asyncio.sleep(0.05) - assert time() < start + 9 + start = time() + while w.status != Status.closed: + await asyncio.sleep(0.05) + assert time() < start + 9 + assert "Reason: scheduler-close" in logger.getvalue() class Something(Worker): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index a607467fdd8..b247ac5fba0 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -869,8 +869,9 @@ async def test_restart(c, s, a, b): with captured_logger("distributed.scheduler") as caplog: futures = c.map(inc, range(20)) await wait(futures) - - await s.restart() + with captured_logger("distributed.nanny") as nanny_logger: + await s.restart() + assert "Reason: scheduler-restart" in nanny_logger.getvalue() assert not s.computations assert not s.task_prefixes diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 25e11b9a9ca..aebf3b2b1e1 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1505,7 +1505,9 @@ async def test_close_gracefully(c, s, a, b): assert any(ts for ts in b.state.tasks.values() if ts.state == "executing") - await b.close_gracefully() + with captured_logger("distributed.worker") as logger: + await b.close_gracefully(reason="foo") + assert "Reason: foo" in logger.getvalue() assert b.status == Status.closed assert b.address not in s.workers diff --git a/distributed/worker.py b/distributed/worker.py index 6a91abdf71d..f349527f882 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1542,7 +1542,7 @@ async def close( # type: ignore if nanny and self.nanny: with self.rpc(self.nanny) as r: - await r.close_gracefully() + await r.close_gracefully(reason=reason) setproctitle("dask worker [closing]") From 459f0d3f9b8e582f975d4f4e1931b3a5e2eaa4dd Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 24 Oct 2022 19:57:02 +0200 Subject: [PATCH 14/14] Remove TODO --- distributed/nanny.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 278779daaeb..29e71f14c8e 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -256,7 +256,7 @@ def __init__( # type: ignore[no-untyped-def] handlers = { "instantiate": self.instantiate, "kill": self.kill, - "restart": self.restart, # TODO: Remove since this is not being used anywhere + "restart": self.restart, "get_logs": self.get_logs, # cannot call it 'close' on the rpc side for naming conflict "terminate": self.close,