9 changes: 4 additions & 5 deletions distributed/scheduler.py
@@ -7091,12 +7091,11 @@ async def retire_workers(
      If neither ``workers`` nor ``names`` are provided, we call
      ``workers_to_close`` which finds a good set.
  close_workers: bool (defaults to False)
-     Whether or not to actually close the worker explicitly from here.
-     Otherwise we expect some external job scheduler to finish off the
-     worker.
+     Whether to actually close the worker explicitly from here.
+     Otherwise, we expect some external job scheduler to finish off the worker.
  remove: bool (defaults to True)
-     Whether or not to remove the worker metadata immediately or else
-     wait for the worker to contact us.
+     Whether to remove the worker metadata immediately or else wait for the
+     worker to contact us.

  If close_workers=False and remove=False, this method just flushes the tasks
  in memory out of the workers and then returns.
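As context for the reworded docstring, a minimal sketch of how the two flags combine, assuming a `gen_cluster`-style test (the helpers `gen_cluster` and `inc` come from `distributed.utils_test`; the test name is made up and this is not part of the PR):

```python
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True)
async def test_retire_flush_only(c, s, a, b):
    fut = c.submit(inc, 1, workers=[a.address])
    await fut

    # With close_workers=False and remove=False the scheduler only replicates
    # the worker's in-memory results onto other workers and then returns;
    # actually shutting the worker down (and forgetting its metadata) is left
    # to an external job scheduler.
    await s.retire_workers(workers=[a.address], close_workers=False, remove=False)

    # The result is still reachable because it was copied off of `a`.
    assert await c.gather(fut) == 2
```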
12 changes: 10 additions & 2 deletions distributed/tests/test_cancelled_state.py
@@ -169,7 +169,11 @@ def blockable_compute(x, lock):
          with lock:
              return x + 1

-     async with BrokenWorker(s.address) as a:
+     async with BrokenWorker(
+         s.address,
+         # heartbeat will close a worker after remove_worker(close=False)
+         heartbeat_interval="100s",
+     ) as a:
          await c.wait_for_workers(2)
          fut1 = c.submit(
              blockable_compute,
@@ -267,7 +271,11 @@ def test_flight_cancelled_error(ws):

  @gen_cluster(client=True, nthreads=[("", 1)])
  async def test_in_flight_lost_after_resumed(c, s, b):
-     async with BlockedGetData(s.address) as a:
+     async with BlockedGetData(
+         s.address,
+         # heartbeat will close a worker after remove_worker(close=False)
+         heartbeat_interval="100s",
+     ) as a:
          fut1 = c.submit(inc, 1, workers=[a.address], key="fut1")
          # Ensure fut1 is in memory but block any further execution afterwards to
          # ensure we control when the recomputation happens
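The `heartbeat_interval="100s"` arguments above follow one pattern: with this PR, a heartbeat that lands after the scheduler has called `remove_worker(close=False)` gets `{"status": "missing"}` back and closes the worker, so tests that need the worker to stay alive effectively disable the periodic heartbeat. A rough sketch of the pattern using a plain `Worker` instead of the blockable test subclasses (the test name is made up):

```python
from distributed import Worker
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_keep_worker_alive(c, s, b):
    async with Worker(
        s.address,
        # Effectively infinite: the periodic heartbeat never fires during the
        # test window, so the new "missing" handling cannot close the worker
        # out from under the scenario being exercised.
        heartbeat_interval="100s",
    ) as a:
        await c.wait_for_workers(2)
```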
10 changes: 4 additions & 6 deletions distributed/tests/test_worker.py
@@ -1834,7 +1834,8 @@ async def test_heartbeat_missing_real_cluster(s, a):
      Worker=Nanny,
      worker_kwargs={"heartbeat_interval": "1ms"},
  )
- async def test_heartbeat_missing_restarts(c, s, n):
+ async def test_heartbeat_missing_doesnt_restart(c, s, n):
+     """Read: https://github.com/dask/distributed/pull/8522"""
      old_heartbeat_handler = s.handlers["heartbeat_worker"]
      s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"}

@@ -1843,11 +1844,8 @@ async def test_heartbeat_missing_restarts(c, s, n):

      assert not s.workers
      s.handlers["heartbeat_worker"] = old_heartbeat_handler
-
-     await n.process.running.wait()
-     assert n.status == Status.running
-
-     await c.wait_for_workers(1)
+     assert n.status == Status.closing_gracefully
+     assert not n.process.is_alive()


  @gen_cluster(nthreads=[])
17 changes: 10 additions & 7 deletions distributed/worker.py
@@ -1272,12 +1272,15 @@ async def heartbeat(self) -> None:
          self._update_latency(end - start)

          if response["status"] == "missing":
-             # Scheduler thought we left. Reconnection is not supported, so just shut down.
-             logger.error(
-                 f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
-             )
-             # Something is out of sync; have the nanny restart us if possible.
-             await self.close(nanny=False)
Comment on lines -1279 to -1280
Member:
If I read your description correctly, this is the bug. What does it even mean that "something is out of sync"? In which cases would we want this worker to be restarted like this?

The heartbeats only start once the worker is registered to the scheduler, see

await self._register_with_scheduler()
self.start_periodic_callbacks()

I don't see what kind of "desync" would justify a restart, and adding more complexity to this logic feels like trouble. Your test also passes if we just shut down the nanny as well.

Collaborator (Author):
The test is designed to verify that the worker is not accidentally restarted after it's retired. If something kills off the worker and the nanny, it will not perturb the test.

> In which cases would we want this worker to be restarted like this?

I cannot come up with use cases. I'll remove the branch and see if anything breaks.

+             # Scheduler thought we left.
+             # This is a common race condition when the scheduler calls
+             # remove_worker(); there can be a heartbeat between when the scheduler
+             # removes the worker on its side and when the {"op": "close"} command
+             # arrives through batched comms to the worker.
+             logger.warning("Scheduler was unaware of this worker; shutting down.")
+             # We close here just for safety's sake - the {op: close} should
+             # arrive soon anyway.
+             await self.close(reason="worker-heartbeat-missing")
              return

          self.scheduler_delay = response["time"] - middle
@@ -1290,7 +1293,7 @@ async def heartbeat(self) -> None:
              logger.exception("Failed to communicate with scheduler during heartbeat.")
          except Exception:
              logger.exception("Unexpected exception during heartbeat. Closing worker.")
-             await self.close()
+             await self.close(reason="worker-heartbeat-error")
              raise

@fail_hard
Expand Down