14 changes: 11 additions & 3 deletions distributed/worker.py
@@ -3082,8 +3082,12 @@ async def gather_dep(
         pdb.set_trace()
     msg = error_message(e)
     for k in self.in_flight_workers[worker]:
-        ts = self.tasks[k]
-        recommendations[ts] = tuple(msg.values())
+        try:
+            ts = self.tasks[k]
+        except KeyError:
+            continue
+        else:
+            recommendations[ts] = tuple(msg.values())
     raise
 finally:
     self.comm_nbytes -= total_nbytes
@@ -3102,7 +3106,11 @@ async def gather_dep(
 refresh_who_has = set()

 for d in self.in_flight_workers.pop(worker):
-    ts = self.tasks[d]
+    try:
+        ts = self.tasks[d]
+    except KeyError:
+        continue
Comment on lines -3105 to +3112
Member

Tasks are not allowed to be released while in flight. This is why I introduced the cancelled state. Releasing at this stage, i.e. forgetting the task, can lead to all sorts of race conditions.

Member Author

What do you make of the attached story? It seems like they are being forgotten in between being placed in in_flight_workers and this stage (there is an await in there, during which I expect pretty much anything can happen). It seems like they're getting cancelled, then resumed, then released and forgotten all in the time span created by the await above.

("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'flight', 'released', 'cancelled', {}, 'processing-released-1650891186.0754924', 1650891186.16295)
("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'compute-task', 'compute-task-1650891186.392905', 1650891186.4060445)
("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'cancelled', 'waiting', 'cancelled', {"('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)": ('resumed', 'waiting')}, 'compute-task-1650891186.392905', 1650891186.40608)
("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'cancelled', 'resumed', 'resumed', {}, 'compute-task-1650891186.392905', 1650891186.406102)
('free-keys', ("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)",), 'processing-released-1650891188.465502', 1650891188.5917683)
("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'release-key', 'processing-released-1650891188.465502', 1650891188.5917764)
("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'resumed', 'released', 'released', {"('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)": 'forgotten'}, 'processing-released-1650891188.465502', 1650891188.5917976)
("('rechunk-split-82117560f6f829a7fa07bfef62cff7d5', 1006)", 'released', 'forgotten', 'forgotten', {}, 'processing-released-1650891188.465502', 1650891188.591806)
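The window described above can be illustrated with a toy reproduction. This is only a sketch under stated assumptions: `gather_dep` and `forget` here are simplified stand-ins written for illustration, not the real `Worker` methods, and the plain dicts stand in for `self.tasks` and `self.in_flight_workers`. It shows how a key can disappear from the task table while the gather coroutine is suspended at its await.

```python
import asyncio


async def gather_dep(tasks, in_flight_workers, worker):
    """Toy gather loop (hypothetical, not the real Worker.gather_dep)."""
    await asyncio.sleep(0.01)  # stands in for the network fetch
    skipped = []
    for key in sorted(in_flight_workers.pop(worker)):
        if key not in tasks:
            # A plain tasks[key] lookup would raise KeyError here.
            skipped.append(key)
    return skipped


async def forget(tasks, key):
    """Stands in for a free-keys handler that runs during the await."""
    tasks.pop(key, None)


async def main():
    tasks = {"x": "flight", "y": "flight"}
    in_flight_workers = {"worker-a": {"x", "y"}}
    gather = asyncio.create_task(gather_dep(tasks, in_flight_workers, "worker-a"))
    await asyncio.sleep(0)  # let the gather coroutine reach its await
    await forget(tasks, "x")  # the task is forgotten while still in flight
    return await gather


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The keys reported as skipped are exactly the ones the `try`/`except KeyError` in the diff would silently pass over, which hides the symptom without explaining why the task was forgotten.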

Member

That story is very helpful. There are a couple of signals:

  • Fetch Task (-> flight; not shown)
  • Release -> Cancel (i.e. safeguard against forgetting)
  • Compute Task (-> resumed task)
  • Free keys

This free-keys signal recommends releasing the task, which in turn triggers a resumed -> released transition. That transition has no safeguard to verify that the task isn't still being used. I.e.

("resumed", "released"): self.transition_generic_released,

should not be using transition_generic_released but a specialized version.

That should even be reasonably easy to reproduce.
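For illustration, a specialized handler could look roughly like the sketch below. Everything here is an assumption made for the example: `TaskState` is a stripped-down stand-in, `transition_resumed_released` is a hypothetical name, and falling back to the cancelled state is just one possible guard, not necessarily the fix that should land.

```python
from dataclasses import dataclass


@dataclass
class TaskState:
    """Minimal stand-in for the worker's task state (hypothetical)."""

    key: str
    state: str = "released"
    done: bool = False  # True once the in-flight fetch or compute has finished


def transition_generic_released(ts):
    # Simplified: release unconditionally and recommend forgetting the task.
    ts.state = "released"
    return {ts.key: "forgotten"}


def transition_resumed_released(ts):
    # Hypothetical specialized handler: while the original coroutine is still
    # running, park the task in "cancelled" instead of releasing it.
    if not ts.done:
        ts.state = "cancelled"
        return {}
    return transition_generic_released(ts)


TRANSITIONS = {
    ("resumed", "released"): transition_resumed_released,
}


def transition(ts, finish):
    """Look up the handler for (current state, target state) and apply it."""
    return TRANSITIONS[(ts.state, finish)](ts)


if __name__ == "__main__":
    ts = TaskState("k", state="resumed", done=False)
    print(transition(ts, "released"), ts.state)
```

With a guard like this, the free-keys signal in the story would leave the task in the table until the in-flight work reports back, so the lookup in `gather_dep` would not need to tolerate a missing key.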

Member Author

Sounds good. As a heads-up, I'm pausing work here to go play with #6206. Any extent to which you can take this over would be welcome.

The full story is in the linked issue if that provides any more insight.


     ts.done = True
     if d in cancelled_keys:
         if ts.state == "cancelled":