From d5eec32d122da245cf1073891eb8ca293215ecab Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 25 Apr 2022 14:24:49 -0500 Subject: [PATCH] Pass on in-flight transfers if they are already released A task that is released while still in flight will stick around in the in_flight_workers state. This can cause a KeyError when gather_dep goes to look up this now-missing task in self.tasks. The correct thing to do in this case is to just ignore the key, which is what this commit does. We could also keep in_flight_workers up-to-date on a release transition. I'm not sure how valuable this would be apart from passing. I'm open to either approach. This was the easiest. Fixes https://github.com/dask/distributed/issues/6194 --- distributed/worker.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index c935be8ef17..81cda8d6142 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3082,8 +3082,12 @@ async def gather_dep( pdb.set_trace() msg = error_message(e) for k in self.in_flight_workers[worker]: - ts = self.tasks[k] - recommendations[ts] = tuple(msg.values()) + try: + ts = self.tasks[k] + except KeyError: + continue + else: + recommendations[ts] = tuple(msg.values()) raise finally: self.comm_nbytes -= total_nbytes @@ -3102,7 +3106,11 @@ async def gather_dep( refresh_who_has = set() for d in self.in_flight_workers.pop(worker): - ts = self.tasks[d] + try: + ts = self.tasks[d] + except KeyError: + continue + ts.done = True if d in cancelled_keys: if ts.state == "cancelled":