Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions dask_expr/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,18 +535,22 @@ def _layer(self) -> dict:

dsk = {}
token_left = _tokenize_deterministic(
"hash-join",
# Include self._name to ensure that shuffle IDs are unique for individual
# merge operations. Reusing shuffles between merges is dangerous because of
# required coordination and complexity introduced through dynamic clusters.
self._name,
self.left._name,
self.shuffle_left_on,
self.npartitions,
self._partitions,
self.left_index,
)
token_right = _tokenize_deterministic(
"hash-join",
# Include self._name to ensure that shuffle IDs are unique for individual
# merge operations. Reusing shuffles between merges is dangerous because of
# required coordination and complexity introduced through dynamic clusters.
self._name,
self.right._name,
self.shuffle_right_on,
self.npartitions,
self._partitions,
self.right_index,
)
_barrier_key_left = barrier_key(ShuffleId(token_left))
_barrier_key_right = barrier_key(ShuffleId(token_right))
Expand Down
10 changes: 7 additions & 3 deletions dask_expr/tests/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s
ddf2, left_on="b", right_on="x", shuffle_backend="p2p"
)
)
# Generate unique shuffle IDs if the input frame is the same but parameters differ
# Generate unique shuffle IDs if the input frame is the same but
# parameters differ. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
x = await c.compute(out)
expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge(
Expand Down Expand Up @@ -169,8 +171,10 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a,
right_on="b",
shuffle_backend="p2p",
)
# Generate the same shuffle IDs if the input frame is the same and all its parameters match
assert sum(id_from_key(k) is not None for k in out.dask) == 3
# Generate unique shuffle IDs even if the input frame is the same and all
# its parameters match. Reusing shuffles in merges is dangerous because of
# the required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
x = await c.compute(out)
expected = pdf2.merge(
pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"
Expand Down