diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index bebfbc2109b..8bf58a3a038 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -7091,12 +7091,11 @@ async def retire_workers(
             If neither ``workers`` nor ``names`` are provided, we call
             ``workers_to_close`` which finds a good set.
         close_workers: bool (defaults to False)
-            Whether or not to actually close the worker explicitly from here.
-            Otherwise we expect some external job scheduler to finish off the
-            worker.
+            Whether to actually close the worker explicitly from here.
+            Otherwise, we expect some external job scheduler to finish off the worker.
         remove: bool (defaults to True)
-            Whether or not to remove the worker metadata immediately or else
-            wait for the worker to contact us.
+            Whether to remove the worker metadata immediately or else wait for the
+            worker to contact us.
 
         If close_workers=False and remove=False, this method just flushes the tasks
         in memory out of the workers and then returns.
diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py
index 8daa83bf871..982947b0bd6 100644
--- a/distributed/tests/test_cancelled_state.py
+++ b/distributed/tests/test_cancelled_state.py
@@ -169,7 +169,11 @@ def blockable_compute(x, lock):
         with lock:
             return x + 1
 
-    async with BrokenWorker(s.address) as a:
+    async with BrokenWorker(
+        s.address,
+        # heartbeat will close a worker after remove_worker(close=False)
+        heartbeat_interval="100s",
+    ) as a:
         await c.wait_for_workers(2)
         fut1 = c.submit(
             blockable_compute,
@@ -267,7 +271,11 @@ def test_flight_cancelled_error(ws):
 
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_in_flight_lost_after_resumed(c, s, b):
-    async with BlockedGetData(s.address) as a:
+    async with BlockedGetData(
+        s.address,
+        # heartbeat will close a worker after remove_worker(close=False)
+        heartbeat_interval="100s",
+    ) as a:
         fut1 = c.submit(inc, 1, workers=[a.address], key="fut1")
         # Ensure fut1 is in memory but block any further execution afterwards to
         # ensure we control when the recomputation happens
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index f67cd4c26f8..cfe6941bb8e 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1834,7 +1834,8 @@ async def test_heartbeat_missing_real_cluster(s, a):
     Worker=Nanny,
     worker_kwargs={"heartbeat_interval": "1ms"},
 )
-async def test_heartbeat_missing_restarts(c, s, n):
+async def test_heartbeat_missing_doesnt_restart(c, s, n):
+    """Read: https://github.com/dask/distributed/pull/8522"""
     old_heartbeat_handler = s.handlers["heartbeat_worker"]
     s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"}
 
@@ -1843,11 +1844,8 @@ async def test_heartbeat_missing_restarts(c, s, n):
     assert not s.workers
 
     s.handlers["heartbeat_worker"] = old_heartbeat_handler
-
-    await n.process.running.wait()
-    assert n.status == Status.running
-
-    await c.wait_for_workers(1)
+    assert n.status == Status.closing_gracefully
+    assert not n.process.is_alive()
 
 
 @gen_cluster(nthreads=[])
diff --git a/distributed/worker.py b/distributed/worker.py
index 55dd5a7724f..396ae203925 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1272,12 +1272,15 @@ async def heartbeat(self) -> None:
             self._update_latency(end - start)
 
             if response["status"] == "missing":
-                # Scheduler thought we left. Reconnection is not supported, so just shut down.
-                logger.error(
-                    f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
-                )
-                # Something is out of sync; have the nanny restart us if possible.
-                await self.close(nanny=False)
+                # Scheduler thought we left.
+                # This is a common race condition when the scheduler calls
+                # remove_worker(); there can be a heartbeat between when the scheduler
+                # removes the worker on its side and when the {"op": "close"} command
+                # arrives through batched comms to the worker.
+                logger.warning("Scheduler was unaware of this worker; shutting down.")
+                # We close here just for safety's sake - the {op: close} should
+                # arrive soon anyway.
+                await self.close(reason="worker-heartbeat-missing")
                 return
 
             self.scheduler_delay = response["time"] - middle
@@ -1290,7 +1293,7 @@ async def heartbeat(self) -> None:
             logger.exception("Failed to communicate with scheduler during heartbeat.")
         except Exception:
             logger.exception("Unexpected exception during heartbeat. Closing worker.")
-            await self.close()
+            await self.close(reason="worker-heartbeat-error")
             raise
 
     @fail_hard
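The comment added to `Worker.heartbeat` describes the race this patch is about: the scheduler drops the worker from its own state first, and the `{"op": "close"}` command only reaches the worker later via batched comms, so an in-flight heartbeat can see `{"status": "missing"}` in between. Below is a minimal, self-contained asyncio sketch of that ordering. It is an illustration only, not the real `distributed` API: the `Scheduler`/`Worker` classes, the `outbox` queue standing in for batched comms, and every method signature here are hypothetical.

```python
# Hypothetical sketch of the remove_worker()/heartbeat race described above.
import asyncio


class Scheduler:
    def __init__(self) -> None:
        self.workers: set[str] = set()
        # Stand-in for batched comms: commands are queued, delivered later.
        self.outbox: asyncio.Queue[dict] = asyncio.Queue()

    async def remove_worker(self, address: str) -> None:
        self.workers.discard(address)  # gone from the scheduler's side now...
        await self.outbox.put({"op": "close"})  # ...but close arrives later

    async def heartbeat_worker(self, address: str) -> dict:
        # A heartbeat racing with remove_worker() sees the worker as missing.
        return {"status": "OK"} if address in self.workers else {"status": "missing"}


class Worker:
    def __init__(self, scheduler: Scheduler, address: str) -> None:
        self.scheduler = scheduler
        self.address = address
        self.status = "running"

    async def heartbeat(self) -> None:
        response = await self.scheduler.heartbeat_worker(self.address)
        if response["status"] == "missing":
            # Close for safety's sake instead of asking the nanny to restart;
            # the {"op": "close"} command is about to arrive anyway.
            await self.close(reason="worker-heartbeat-missing")

    async def close(self, reason: str) -> None:
        self.status = f"closed ({reason})"


async def main() -> None:
    scheduler = Scheduler()
    worker = Worker(scheduler, "tcp://127.0.0.1:1234")
    scheduler.workers.add(worker.address)

    await scheduler.remove_worker(worker.address)  # close queued, not delivered
    await worker.heartbeat()  # heartbeat wins the race; worker shuts itself down
    print(worker.status)  # closed (worker-heartbeat-missing)

    # The late close command is then a no-op on an already-closed worker.
    assert await scheduler.outbox.get() == {"op": "close"}


asyncio.run(main())
```

This is also why the tests pin `heartbeat_interval="100s"` and why `test_heartbeat_missing_doesnt_restart` now expects the nanny to end up in `closing_gracefully` with a dead process: a "missing" heartbeat reply means shut down, not restart.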