From 0e329ddb366436b6e5f4a9a5d79d68b49c252cbd Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 22 Feb 2024 18:54:07 +0000 Subject: [PATCH 1/9] Race condition on graceful shutdown --- distributed/scheduler.py | 9 ++++----- distributed/tests/test_nanny.py | 30 ++++++++++++++++++++++++++++++ distributed/worker.py | 26 +++++++++++++++++++------- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bebfbc2109b..8bf58a3a038 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7091,12 +7091,11 @@ async def retire_workers( If neither ``workers`` nor ``names`` are provided, we call ``workers_to_close`` which finds a good set. close_workers: bool (defaults to False) - Whether or not to actually close the worker explicitly from here. - Otherwise we expect some external job scheduler to finish off the - worker. + Whether to actually close the worker explicitly from here. + Otherwise, we expect some external job scheduler to finish off the worker. remove: bool (defaults to True) - Whether or not to remove the worker metadata immediately or else - wait for the worker to contact us. + Whether to remove the worker metadata immediately or else wait for the + worker to contact us. If close_workers=False and remove=False, this method just flushes the tasks in memory out of the workers and then returns. diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 74d049a35d4..950c66bc16a 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -956,3 +956,33 @@ async def test_nanny_plugin_register_nanny_killed(c, s, restart): finally: proc.kill() assert await register == {} + + +@pytest.mark.slow +@gen_cluster( + client=True, + Worker=Nanny, + nthreads=[("", 1)], + worker_kwargs={"heartbeat_interval": "10ms"}, +) +async def test_nanny_does_not_restart_worker_on_graceful_retirement(c, s, a): + """Tests https://github.com/dask/distributed/pull/8522 + + Some clusters (e.g. SpecCluster) implement downscaling by calling + `Scheduler.retire_workers()` without arguments, which defaults to + `remove=True, close_workers=False`. + + and then use an external system to tear down the worker and the nanny. In these + cases, make sure that the worker doesn't kill itself and that the nanny doesn't + restart it after the heartbeat to the scheduler fails. + """ + await s.retire_workers([a.worker_address], stimulus_id="test") + # On Linux, it takes ~3.5s for the nanny to resuscitate a worker + await asyncio.sleep(5) + assert not s.workers + events = [ + ev + for _, ev in s.events["all"] + if isinstance(ev, dict) and ev.get("action") == "add-worker" + ] + assert len(events) == 1 diff --git a/distributed/worker.py b/distributed/worker.py index 55dd5a7724f..10aa85908eb 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1272,12 +1272,24 @@ async def heartbeat(self) -> None: self._update_latency(end - start) if response["status"] == "missing": - # Scheduler thought we left. Reconnection is not supported, so just shut down. - logger.error( - f"Scheduler was unaware of this worker {self.address!r}. Shutting down." - ) - # Something is out of sync; have the nanny restart us if possible. - await self.close(nanny=False) + # Scheduler thought we left. + # Reconnection is not supported, so just shut down. 
+ + if self.status == Status.closing_gracefully: + # Called Scheduler.retire_workers(remove=True, close_workers=False) + # The worker will remain indefinitely in this state, unknown to the + # scheduler, until something else shuts it down. + # Stopping the heartbeat is just a nice-to-have to reduce + # unnecessary warnings on the scheduler log. + logger.info("Stopping heartbeat to the scheduler") + self.periodic_callbacks["heartbeat"].stop() + else: + logger.error( + f"Scheduler was unaware of this worker {self.address!r}. " + "Shutting down." + ) + # Have the nanny restart us if possible + await self.close(nanny=False, reason="worker-heartbeat-missing") return self.scheduler_delay = response["time"] - middle @@ -1290,7 +1302,7 @@ async def heartbeat(self) -> None: logger.exception("Failed to communicate with scheduler during heartbeat.") except Exception: logger.exception("Unexpected exception during heartbeat. Closing worker.") - await self.close() + await self.close(reason="worker-heartbeat-error") raise @fail_hard From 1b4e6d246c7f0019075ff80f7189017f4e0779a1 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 12:54:49 +0000 Subject: [PATCH 2/9] Code review --- distributed/tests/test_nanny.py | 30 ----------- distributed/tests/test_worker.py | 89 +++++--------------------------- distributed/worker.py | 37 +++++-------- 3 files changed, 27 insertions(+), 129 deletions(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 950c66bc16a..74d049a35d4 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -956,33 +956,3 @@ async def test_nanny_plugin_register_nanny_killed(c, s, restart): finally: proc.kill() assert await register == {} - - -@pytest.mark.slow -@gen_cluster( - client=True, - Worker=Nanny, - nthreads=[("", 1)], - worker_kwargs={"heartbeat_interval": "10ms"}, -) -async def test_nanny_does_not_restart_worker_on_graceful_retirement(c, s, a): - """Tests https://github.com/dask/distributed/pull/8522 - - Some clusters (e.g. SpecCluster) implement downscaling by calling - `Scheduler.retire_workers()` without arguments, which defaults to - `remove=True, close_workers=False`. - - and then use an external system to tear down the worker and the nanny. In these - cases, make sure that the worker doesn't kill itself and that the nanny doesn't - restart it after the heartbeat to the scheduler fails. 
- """ - await s.retire_workers([a.worker_address], stimulus_id="test") - # On Linux, it takes ~3.5s for the nanny to resuscitate a worker - await asyncio.sleep(5) - assert not s.workers - events = [ - ev - for _, ev in s.events["all"] - if isinstance(ev, dict) and ev.get("action") == "add-worker" - ] - assert len(events) == 1 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index f67cd4c26f8..54ebc59c284 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1766,88 +1766,27 @@ def bad_heartbeat_worker(*args, **kwargs): assert w.status == Status.running logs = logger.getvalue() assert "Failed to communicate with scheduler during heartbeat" in logs - assert "Traceback" in logs -@gen_cluster(nthreads=[("", 1)], worker_kwargs={"heartbeat_interval": "100s"}) -async def test_heartbeat_missing(s, a, monkeypatch): - async def missing_heartbeat_worker(*args, **kwargs): - return {"status": "missing"} - - with captured_logger("distributed.worker", level=logging.WARNING) as wlogger: - monkeypatch.setattr(a.scheduler, "heartbeat_worker", missing_heartbeat_worker) - await a.heartbeat() - assert a.status == Status.closed - assert "Scheduler was unaware of this worker" in wlogger.getvalue() - - while s.workers: - await asyncio.sleep(0.01) - - -@gen_cluster(nthreads=[("", 1)], worker_kwargs={"heartbeat_interval": "100s"}) -async def test_heartbeat_missing_real_cluster(s, a): - # The idea here is to create a situation where `s.workers[a.address]`, - # doesn't exist, but the worker is not yet closed and can still heartbeat. - # Ideally, this state would be impossible, and this test would be removed, - # and the `status: missing` handling would be replaced with an assertion error. - # However, `Scheduler.remove_worker` and `Worker.close` both currently leave things - # in degenerate, half-closed states while they're running (and yielding control - # via `await`). - # When https://github.com/dask/distributed/issues/6390 is fixed, this should no - # longer be possible. - - assumption_msg = "Test assumptions have changed. Race condition may have been fixed; this test may be removable." - - with captured_logger( - "distributed.worker", level=logging.WARNING - ) as wlogger, captured_logger( - "distributed.scheduler", level=logging.WARNING - ) as slogger: - with freeze_batched_send(s.stream_comms[a.address]): - await s.remove_worker(a.address, stimulus_id="foo") - assert not s.workers - - # The scheduler has removed the worker state, but the close message has - # not reached the worker yet. - assert a.status == Status.running, assumption_msg - assert a.periodic_callbacks["heartbeat"].is_running(), assumption_msg - - # The heartbeat PeriodicCallback is still running, so one _could_ fire - # before the `op: close` message reaches the worker. We simulate that explicitly. - await a.heartbeat() - - # The heartbeat receives a `status: missing` from the scheduler, so it - # closes the worker. Heartbeats aren't sent over batched comms, so - # `freeze_batched_send` doesn't affect them. 
- assert a.status == Status.closed - - assert "Scheduler was unaware of this worker" in wlogger.getvalue() - assert "Received heartbeat from unregistered worker" in slogger.getvalue() - - assert not s.workers - - -@pytest.mark.slow @gen_cluster( client=True, nthreads=[("", 1)], - Worker=Nanny, - worker_kwargs={"heartbeat_interval": "1ms"}, + worker_kwargs={"heartbeat_interval": "10ms"}, ) -async def test_heartbeat_missing_restarts(c, s, n): - old_heartbeat_handler = s.handlers["heartbeat_worker"] - s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"} - - assert n.process - await n.process.stopped.wait() - - assert not s.workers - s.handlers["heartbeat_worker"] = old_heartbeat_handler - - await n.process.running.wait() - assert n.status == Status.running +async def test_heartbeat_missing(c, s, a): + with ( + captured_logger("distributed.scheduler", level=logging.WARNING) as slog, + captured_logger("distributed.worker", level=logging.INFO) as wlog, + ): + await s.remove_worker(a.address, close=False, stimulus_id="test") + while "Received heartbeat from unregistered worker" not in slog.getvalue(): + await asyncio.sleep(0.01) + await asyncio.sleep(0.2) - await c.wait_for_workers(1) + assert slog.getvalue().count("Received heartbeat from unregistered worker") == 1 + assert wlog.getvalue().count("Stopping heartbeat") == 1 + assert a.status == Status.running + assert a.address not in s.workers @gen_cluster(nthreads=[]) diff --git a/distributed/worker.py b/distributed/worker.py index 10aa85908eb..52814af16c7 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1265,44 +1265,33 @@ async def heartbeat(self) -> None: if hasattr(extension, "heartbeat") }, ) - end = time() middle = (start + end) / 2 - self._update_latency(end - start) if response["status"] == "missing": # Scheduler thought we left. - # Reconnection is not supported, so just shut down. - - if self.status == Status.closing_gracefully: - # Called Scheduler.retire_workers(remove=True, close_workers=False) - # The worker will remain indefinitely in this state, unknown to the - # scheduler, until something else shuts it down. - # Stopping the heartbeat is just a nice-to-have to reduce - # unnecessary warnings on the scheduler log. - logger.info("Stopping heartbeat to the scheduler") - self.periodic_callbacks["heartbeat"].stop() - else: - logger.error( - f"Scheduler was unaware of this worker {self.address!r}. " - "Shutting down." - ) - # Have the nanny restart us if possible - await self.close(nanny=False, reason="worker-heartbeat-missing") + # This happens after one calls Scheduler.remove_worker(close=False). + # ***DO NOT call close()!*** + # Read: https://github.com/dask/distributed/pull/8522 + logger.info("Stopping heartbeat to the scheduler") + self.periodic_callbacks["heartbeat"].stop() return - self.scheduler_delay = response["time"] - middle self.periodic_callbacks["heartbeat"].callback_time = ( response["heartbeat-interval"] * 1000 ) + self.scheduler_delay = response["time"] - middle self.bandwidth_workers.clear() self.bandwidth_types.clear() + except OSError: - logger.exception("Failed to communicate with scheduler during heartbeat.") - except Exception: - logger.exception("Unexpected exception during heartbeat. Closing worker.") - await self.close(reason="worker-heartbeat-error") + # Hopefully a temporary network failure, or maybe the scheduler' CPU is at + # 100%. DO NOT call close() and try again later. 
Keep the worker alive for + # as long as the kernel keeps the batched comms TCP channel open. + logger.warning("Failed to communicate with scheduler during heartbeat") + except Exception: # pragma: nocover + logger.exception("Unexpected exception during heartbeat") raise @fail_hard From 4a6582b4801995badb27a25af186de480ebccd38 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 13:09:11 +0000 Subject: [PATCH 3/9] lint --- distributed/tests/test_worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 54ebc59c284..b64d3bf8b4f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -62,7 +62,6 @@ captured_logger, dec, div, - freeze_batched_send, freeze_data_fetching, gen_cluster, gen_test, From ad2c3b1a2751e4e9871e55e67ea71fdf645d96ec Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 13:11:05 +0000 Subject: [PATCH 4/9] re-add close on unexpected exception --- distributed/worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/worker.py b/distributed/worker.py index 52814af16c7..8ce4b2d1398 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1292,6 +1292,7 @@ async def heartbeat(self) -> None: logger.warning("Failed to communicate with scheduler during heartbeat") except Exception: # pragma: nocover logger.exception("Unexpected exception during heartbeat") + await self.close(reason="worker-heartbeat-exception") raise @fail_hard From 013f5f98006e0c958518f6922957de30c310fa97 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 16:30:15 +0000 Subject: [PATCH 5/9] Revert "re-add close on unexpected exception" This reverts commit ad2c3b1a2751e4e9871e55e67ea71fdf645d96ec. --- distributed/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 8ce4b2d1398..52814af16c7 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1292,7 +1292,6 @@ async def heartbeat(self) -> None: logger.warning("Failed to communicate with scheduler during heartbeat") except Exception: # pragma: nocover logger.exception("Unexpected exception during heartbeat") - await self.close(reason="worker-heartbeat-exception") raise @fail_hard From efa635a6cb5e5f63400d7e0e57732dd04f062eae Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 16:30:19 +0000 Subject: [PATCH 6/9] Revert "lint" This reverts commit 4a6582b4801995badb27a25af186de480ebccd38. --- distributed/tests/test_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b64d3bf8b4f..54ebc59c284 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -62,6 +62,7 @@ captured_logger, dec, div, + freeze_batched_send, freeze_data_fetching, gen_cluster, gen_test, From 11f91c9084ad99e72658e40b958dfed6d0f7036f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 16:30:24 +0000 Subject: [PATCH 7/9] Revert "Code review" This reverts commit 1b4e6d246c7f0019075ff80f7189017f4e0779a1. 
--- distributed/tests/test_nanny.py | 30 +++++++++++ distributed/tests/test_worker.py | 89 +++++++++++++++++++++++++++----- distributed/worker.py | 37 ++++++++----- 3 files changed, 129 insertions(+), 27 deletions(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 74d049a35d4..950c66bc16a 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -956,3 +956,33 @@ async def test_nanny_plugin_register_nanny_killed(c, s, restart): finally: proc.kill() assert await register == {} + + +@pytest.mark.slow +@gen_cluster( + client=True, + Worker=Nanny, + nthreads=[("", 1)], + worker_kwargs={"heartbeat_interval": "10ms"}, +) +async def test_nanny_does_not_restart_worker_on_graceful_retirement(c, s, a): + """Tests https://github.com/dask/distributed/pull/8522 + + Some clusters (e.g. SpecCluster) implement downscaling by calling + `Scheduler.retire_workers()` without arguments, which defaults to + `remove=True, close_workers=False`. + + and then use an external system to tear down the worker and the nanny. In these + cases, make sure that the worker doesn't kill itself and that the nanny doesn't + restart it after the heartbeat to the scheduler fails. + """ + await s.retire_workers([a.worker_address], stimulus_id="test") + # On Linux, it takes ~3.5s for the nanny to resuscitate a worker + await asyncio.sleep(5) + assert not s.workers + events = [ + ev + for _, ev in s.events["all"] + if isinstance(ev, dict) and ev.get("action") == "add-worker" + ] + assert len(events) == 1 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 54ebc59c284..f67cd4c26f8 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1766,27 +1766,88 @@ def bad_heartbeat_worker(*args, **kwargs): assert w.status == Status.running logs = logger.getvalue() assert "Failed to communicate with scheduler during heartbeat" in logs + assert "Traceback" in logs +@gen_cluster(nthreads=[("", 1)], worker_kwargs={"heartbeat_interval": "100s"}) +async def test_heartbeat_missing(s, a, monkeypatch): + async def missing_heartbeat_worker(*args, **kwargs): + return {"status": "missing"} + + with captured_logger("distributed.worker", level=logging.WARNING) as wlogger: + monkeypatch.setattr(a.scheduler, "heartbeat_worker", missing_heartbeat_worker) + await a.heartbeat() + assert a.status == Status.closed + assert "Scheduler was unaware of this worker" in wlogger.getvalue() + + while s.workers: + await asyncio.sleep(0.01) + + +@gen_cluster(nthreads=[("", 1)], worker_kwargs={"heartbeat_interval": "100s"}) +async def test_heartbeat_missing_real_cluster(s, a): + # The idea here is to create a situation where `s.workers[a.address]`, + # doesn't exist, but the worker is not yet closed and can still heartbeat. + # Ideally, this state would be impossible, and this test would be removed, + # and the `status: missing` handling would be replaced with an assertion error. + # However, `Scheduler.remove_worker` and `Worker.close` both currently leave things + # in degenerate, half-closed states while they're running (and yielding control + # via `await`). + # When https://github.com/dask/distributed/issues/6390 is fixed, this should no + # longer be possible. + + assumption_msg = "Test assumptions have changed. Race condition may have been fixed; this test may be removable." 
+ + with captured_logger( + "distributed.worker", level=logging.WARNING + ) as wlogger, captured_logger( + "distributed.scheduler", level=logging.WARNING + ) as slogger: + with freeze_batched_send(s.stream_comms[a.address]): + await s.remove_worker(a.address, stimulus_id="foo") + assert not s.workers + + # The scheduler has removed the worker state, but the close message has + # not reached the worker yet. + assert a.status == Status.running, assumption_msg + assert a.periodic_callbacks["heartbeat"].is_running(), assumption_msg + + # The heartbeat PeriodicCallback is still running, so one _could_ fire + # before the `op: close` message reaches the worker. We simulate that explicitly. + await a.heartbeat() + + # The heartbeat receives a `status: missing` from the scheduler, so it + # closes the worker. Heartbeats aren't sent over batched comms, so + # `freeze_batched_send` doesn't affect them. + assert a.status == Status.closed + + assert "Scheduler was unaware of this worker" in wlogger.getvalue() + assert "Received heartbeat from unregistered worker" in slogger.getvalue() + + assert not s.workers + + +@pytest.mark.slow @gen_cluster( client=True, nthreads=[("", 1)], - worker_kwargs={"heartbeat_interval": "10ms"}, + Worker=Nanny, + worker_kwargs={"heartbeat_interval": "1ms"}, ) -async def test_heartbeat_missing(c, s, a): - with ( - captured_logger("distributed.scheduler", level=logging.WARNING) as slog, - captured_logger("distributed.worker", level=logging.INFO) as wlog, - ): - await s.remove_worker(a.address, close=False, stimulus_id="test") - while "Received heartbeat from unregistered worker" not in slog.getvalue(): - await asyncio.sleep(0.01) - await asyncio.sleep(0.2) +async def test_heartbeat_missing_restarts(c, s, n): + old_heartbeat_handler = s.handlers["heartbeat_worker"] + s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"} + + assert n.process + await n.process.stopped.wait() + + assert not s.workers + s.handlers["heartbeat_worker"] = old_heartbeat_handler - assert slog.getvalue().count("Received heartbeat from unregistered worker") == 1 - assert wlog.getvalue().count("Stopping heartbeat") == 1 - assert a.status == Status.running - assert a.address not in s.workers + await n.process.running.wait() + assert n.status == Status.running + + await c.wait_for_workers(1) @gen_cluster(nthreads=[]) diff --git a/distributed/worker.py b/distributed/worker.py index 52814af16c7..10aa85908eb 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1265,33 +1265,44 @@ async def heartbeat(self) -> None: if hasattr(extension, "heartbeat") }, ) + end = time() middle = (start + end) / 2 + self._update_latency(end - start) if response["status"] == "missing": # Scheduler thought we left. - # This happens after one calls Scheduler.remove_worker(close=False). - # ***DO NOT call close()!*** - # Read: https://github.com/dask/distributed/pull/8522 - logger.info("Stopping heartbeat to the scheduler") - self.periodic_callbacks["heartbeat"].stop() + # Reconnection is not supported, so just shut down. + + if self.status == Status.closing_gracefully: + # Called Scheduler.retire_workers(remove=True, close_workers=False) + # The worker will remain indefinitely in this state, unknown to the + # scheduler, until something else shuts it down. + # Stopping the heartbeat is just a nice-to-have to reduce + # unnecessary warnings on the scheduler log. 
+ logger.info("Stopping heartbeat to the scheduler") + self.periodic_callbacks["heartbeat"].stop() + else: + logger.error( + f"Scheduler was unaware of this worker {self.address!r}. " + "Shutting down." + ) + # Have the nanny restart us if possible + await self.close(nanny=False, reason="worker-heartbeat-missing") return + self.scheduler_delay = response["time"] - middle self.periodic_callbacks["heartbeat"].callback_time = ( response["heartbeat-interval"] * 1000 ) - self.scheduler_delay = response["time"] - middle self.bandwidth_workers.clear() self.bandwidth_types.clear() - except OSError: - # Hopefully a temporary network failure, or maybe the scheduler' CPU is at - # 100%. DO NOT call close() and try again later. Keep the worker alive for - # as long as the kernel keeps the batched comms TCP channel open. - logger.warning("Failed to communicate with scheduler during heartbeat") - except Exception: # pragma: nocover - logger.exception("Unexpected exception during heartbeat") + logger.exception("Failed to communicate with scheduler during heartbeat.") + except Exception: + logger.exception("Unexpected exception during heartbeat. Closing worker.") + await self.close(reason="worker-heartbeat-error") raise @fail_hard From 3cbde57c8ee15a7bf697dc0c4f9a2b58e93d3fca Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 17:01:13 +0000 Subject: [PATCH 8/9] Call self.close(nanny=True) --- distributed/tests/test_cancelled_state.py | 13 ++++++++-- distributed/tests/test_nanny.py | 30 ----------------------- distributed/tests/test_worker.py | 10 +++----- distributed/worker.py | 25 ++++++------------- 4 files changed, 23 insertions(+), 55 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 8daa83bf871..790a52fc575 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -169,7 +169,11 @@ def blockable_compute(x, lock): with lock: return x + 1 - async with BrokenWorker(s.address) as a: + async with BrokenWorker( + s.address, + # heartbeat will close a worker after remove_worker(close=False) + heartbeat_interval="100s", + ) as a: await c.wait_for_workers(2) fut1 = c.submit( blockable_compute, @@ -265,7 +269,12 @@ def test_flight_cancelled_error(ws): assert not ws.tasks -@gen_cluster(client=True, nthreads=[("", 1)]) +@gen_cluster( + client=True, + nthreads=[("", 1)], + # heartbeat will close a worker after remove_worker(close=False) + worker_kwargs={"heartbeat_interval": "100s"}, +) async def test_in_flight_lost_after_resumed(c, s, b): async with BlockedGetData(s.address) as a: fut1 = c.submit(inc, 1, workers=[a.address], key="fut1") diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 950c66bc16a..74d049a35d4 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -956,33 +956,3 @@ async def test_nanny_plugin_register_nanny_killed(c, s, restart): finally: proc.kill() assert await register == {} - - -@pytest.mark.slow -@gen_cluster( - client=True, - Worker=Nanny, - nthreads=[("", 1)], - worker_kwargs={"heartbeat_interval": "10ms"}, -) -async def test_nanny_does_not_restart_worker_on_graceful_retirement(c, s, a): - """Tests https://github.com/dask/distributed/pull/8522 - - Some clusters (e.g. SpecCluster) implement downscaling by calling - `Scheduler.retire_workers()` without arguments, which defaults to - `remove=True, close_workers=False`. - - and then use an external system to tear down the worker and the nanny. 
In these - cases, make sure that the worker doesn't kill itself and that the nanny doesn't - restart it after the heartbeat to the scheduler fails. - """ - await s.retire_workers([a.worker_address], stimulus_id="test") - # On Linux, it takes ~3.5s for the nanny to resuscitate a worker - await asyncio.sleep(5) - assert not s.workers - events = [ - ev - for _, ev in s.events["all"] - if isinstance(ev, dict) and ev.get("action") == "add-worker" - ] - assert len(events) == 1 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index f67cd4c26f8..cfe6941bb8e 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1834,7 +1834,8 @@ async def test_heartbeat_missing_real_cluster(s, a): Worker=Nanny, worker_kwargs={"heartbeat_interval": "1ms"}, ) -async def test_heartbeat_missing_restarts(c, s, n): +async def test_heartbeat_missing_doesnt_restart(c, s, n): + """Read: https://github.com/dask/distributed/pull/8522""" old_heartbeat_handler = s.handlers["heartbeat_worker"] s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"} @@ -1843,11 +1844,8 @@ async def test_heartbeat_missing_restarts(c, s, n): assert not s.workers s.handlers["heartbeat_worker"] = old_heartbeat_handler - - await n.process.running.wait() - assert n.status == Status.running - - await c.wait_for_workers(1) + assert n.status == Status.closing_gracefully + assert not n.process.is_alive() @gen_cluster(nthreads=[]) diff --git a/distributed/worker.py b/distributed/worker.py index 10aa85908eb..396ae203925 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1273,23 +1273,14 @@ async def heartbeat(self) -> None: if response["status"] == "missing": # Scheduler thought we left. - # Reconnection is not supported, so just shut down. - - if self.status == Status.closing_gracefully: - # Called Scheduler.retire_workers(remove=True, close_workers=False) - # The worker will remain indefinitely in this state, unknown to the - # scheduler, until something else shuts it down. - # Stopping the heartbeat is just a nice-to-have to reduce - # unnecessary warnings on the scheduler log. - logger.info("Stopping heartbeat to the scheduler") - self.periodic_callbacks["heartbeat"].stop() - else: - logger.error( - f"Scheduler was unaware of this worker {self.address!r}. " - "Shutting down." - ) - # Have the nanny restart us if possible - await self.close(nanny=False, reason="worker-heartbeat-missing") + # This is a common race condition when the scheduler calls + # remove_worker(); there can be a heartbeat between when the scheduler + # removes the worker on its side and when the {"op": "close"} command + # arrives through batched comms to the worker. + logger.warning("Scheduler was unaware of this worker; shutting down.") + # We close here just for safety's sake - the {op: close} should + # arrive soon anyway. 
+ await self.close(reason="worker-heartbeat-missing") return self.scheduler_delay = response["time"] - middle From d9673300272ad4dcdbb57606a36b8b521077416c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 23 Feb 2024 17:03:55 +0000 Subject: [PATCH 9/9] tweak test --- distributed/tests/test_cancelled_state.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 790a52fc575..982947b0bd6 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -269,14 +269,13 @@ def test_flight_cancelled_error(ws): assert not ws.tasks -@gen_cluster( - client=True, - nthreads=[("", 1)], - # heartbeat will close a worker after remove_worker(close=False) - worker_kwargs={"heartbeat_interval": "100s"}, -) +@gen_cluster(client=True, nthreads=[("", 1)]) async def test_in_flight_lost_after_resumed(c, s, b): - async with BlockedGetData(s.address) as a: + async with BlockedGetData( + s.address, + # heartbeat will close a worker after remove_worker(close=False) + heartbeat_interval="100s", + ) as a: fut1 = c.submit(inc, 1, workers=[a.address], key="fut1") # Ensure fut1 is in memory but block any further execution afterwards to # ensure we control when the recomputation happens
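
Note on the end state of this series: after patch 8 the `status: "missing"` heartbeat response no longer special-cases `Status.closing_gracefully`; the worker logs a warning and closes itself, and because `Worker.close()` defaults to `nanny=True` the nanny is told not to resuscitate the process, while an `OSError` during the heartbeat is only logged and the worker stays alive until the next periodic callback. The sketch below is a condensed, self-contained toy of that behaviour, not the actual `distributed.worker.Worker` code: `ToyScheduler`, `ToyWorker`, and `main` are illustrative stand-ins; only the `{"status": "missing"}` response shape, the close reasons, and the nanny semantics are taken from the diffs above.

# Toy reproduction of the post-patch-8 heartbeat handling. ToyScheduler and
# ToyWorker are hypothetical stand-ins, not classes from distributed.
import asyncio
import logging

logger = logging.getLogger("heartbeat-sketch")


class ToyScheduler:
    """Always answers as if the worker had already been removed."""

    async def heartbeat_worker(self):
        return {"status": "missing"}


class ToyWorker:
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.closed = False

    async def close(self, nanny=True, reason="unknown"):
        # In the real Worker, nanny=True also shuts down the nanny so it does
        # not restart the process -- the crux of patch 8.
        self.closed = True
        logger.warning("Worker closed (nanny=%s, reason=%s)", nanny, reason)

    async def heartbeat(self):
        try:
            response = await self.scheduler.heartbeat_worker()
        except OSError:
            # Transient network trouble: log and wait for the next periodic
            # callback instead of tearing the worker down.
            logger.exception("Failed to communicate with scheduler during heartbeat.")
            return
        if response["status"] == "missing":
            # Common race: the scheduler called remove_worker() and its
            # {"op": "close"} message has not reached this worker yet.
            # Close here for safety's sake; the nanny must not restart us.
            logger.warning("Scheduler was unaware of this worker; shutting down.")
            await self.close(reason="worker-heartbeat-missing")


async def main():
    logging.basicConfig(level=logging.INFO)
    worker = ToyWorker(ToyScheduler())
    await worker.heartbeat()
    assert worker.closed  # no nanny restart follows


asyncio.run(main())

In the real code the periodic heartbeat callback and the batched comms make this path racy, which is why the tests in patches 8 and 9 pin `heartbeat_interval` (e.g. "100s") to control exactly when a heartbeat can fire relative to `remove_worker(close=False)`.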