Conversation

@hendrikmakait (Member) commented Dec 15, 2023

Follow-up to #361 and dask-expr's version of dask/distributed#8416

@phofl (Collaborator) commented Dec 15, 2023

Can you explain what this is doing?

@phofl (Collaborator) commented Dec 15, 2023

My understanding: we are no longer reusing shuffles for merges (not a fan of that, fwiw) in order to solve issues when a worker gets killed between the merges (spot replacement, for example). But does this actually solve the underlying problem, in the sense that it is also resilient against workers leaving during the merge?

@hendrikmakait (Member, Author) commented:

Generally speaking, merge relies on coordination between the input shuffles so that they end up putting matching output partitions on the same workers. This coordination is manageable for an individual merge. With reuse, we suddenly have to coordinate all the shuffles used across several merges. If that coordination fails, the merge will deadlock (we could also add a kill switch, but that's only marginally better). I am convinced that I can draw up scenarios where this fails right now. I'm not yet sure whether it is at all possible to coordinate within a reasonable amount of time; at the very least it would add significant complexity.
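For context, here is a minimal sketch (not taken from this PR) of the workload shape under discussion: one frame that is hash-partitioned on the join key and fed into two separate merges. With shuffle reuse, the shuffle of `left` would be computed once and shared by both merges, so every shuffle involved has to agree on which worker holds each output partition; without reuse, each merge only coordinates its own two input shuffles. The config key and cluster setup below are just one assumed way to force P2P shuffling.

```python
import pandas as pd
import dask
import dask.dataframe as dd
from distributed import Client

client = Client()  # P2P shuffling needs a distributed cluster
dask.config.set({"dataframe.shuffle.method": "p2p"})  # select P2P shuffles

left = dd.from_pandas(
    pd.DataFrame({"key": list(range(1000)), "x": [1] * 1000}), npartitions=8
)
right_a = dd.from_pandas(
    pd.DataFrame({"key": list(range(1000)), "a": [2] * 1000}), npartitions=8
)
right_b = dd.from_pandas(
    pd.DataFrame({"key": list(range(0, 2000, 2)), "b": [3] * 1000}), npartitions=8
)

# Both merges hash-partition `left` on "key"; reuse would share that shuffle
# across the two merges, which is what requires cross-merge coordination.
merged_a = left.merge(right_a, on="key")
merged_b = left.merge(right_b, on="key")

res_a, res_b = dask.compute(merged_a, merged_b)
```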

@hendrikmakait (Member, Author) commented:

Note that this is unrelated to workers leaving; it is about workers joining. When workers leave, the usual P2P restart mechanism kicks in.

@phofl (Collaborator) commented Dec 18, 2023

Are there other cases besides workers joining where this could deadlock, then?

If not, can we somehow pass the number of workers we want to operate on into the function that moves the output partitions to the workers?

That's not possible right now, but we should be able to do this when we generate the graph on the scheduler.

@hendrikmakait (Member, Author) commented:

> Are there other cases besides workers joining where this could deadlock, then?

None that I am aware of.

> If not, can we somehow pass the number of workers we want to operate on into the function that moves the output partitions to the workers?
>
> That's not possible right now, but we should be able to do this when we generate the graph on the scheduler.

Possibly, I guess? As you said, it's not possible right now and would depend on the implementation of materialization on the scheduler. I generally expect us to be able to leverage the expression DAG for more optimizations. Note that reuse also doesn't work with diskless P2P (which is still experimental).
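To make the suggestion above concrete, here is a purely hypothetical sketch (these names exist in neither dask nor distributed): if the worker list were captured when the graph is generated on the scheduler and baked into the graph, output partitions could be assigned deterministically against that pinned list, so shuffles created at different times, e.g. before and after new workers join, would still agree on placement.

```python
# Hypothetical illustration only; not an existing API in dask or distributed.
from typing import Sequence


def pick_worker_for_output(output_partition: int, pinned_workers: Sequence[str]) -> str:
    """Deterministically map an output partition to one of the pinned workers.

    `pinned_workers` is assumed to be the worker list captured at
    graph-generation time, rather than whatever workers happen to be
    present when the shuffle actually starts.
    """
    return pinned_workers[output_partition % len(pinned_workers)]


# Two shuffles handed the same pinned list always agree on placement,
# even if the cluster gained workers in between.
pinned = ["tcp://10.0.0.1:40000", "tcp://10.0.0.2:40000"]
assert pick_worker_for_output(5, pinned) == pick_worker_for_output(5, pinned)
```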

@phofl merged commit 60e4cd7 into dask:main on Dec 18, 2023
@phofl (Collaborator) commented Dec 18, 2023

thx
