diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index a3b631832..87b799da0 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -535,18 +535,22 @@ def _layer(self) -> dict: dsk = {} token_left = _tokenize_deterministic( - "hash-join", + # Include self._name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. + self._name, self.left._name, self.shuffle_left_on, - self.npartitions, - self._partitions, + self.left_index, ) token_right = _tokenize_deterministic( - "hash-join", + # Include self._name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. + self._name, self.right._name, self.shuffle_right_on, - self.npartitions, - self._partitions, + self.right_index, ) _barrier_key_left = barrier_key(ShuffleId(token_left)) _barrier_key_right = barrier_key(ShuffleId(token_right)) diff --git a/dask_expr/tests/test_distributed.py b/dask_expr/tests/test_distributed.py index 9fa02114d..e83a9ec44 100644 --- a/dask_expr/tests/test_distributed.py +++ b/dask_expr/tests/test_distributed.py @@ -136,7 +136,9 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s ddf2, left_on="b", right_on="x", shuffle_backend="p2p" ) ) - # Generate unique shuffle IDs if the input frame is the same but parameters differ + # Generate unique shuffle IDs if the input frame is the same but + # parameters differ. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. assert sum(id_from_key(k) is not None for k in out.dask) == 4 x = await c.compute(out) expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge( @@ -169,8 +171,10 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, right_on="b", shuffle_backend="p2p", ) - # Generate the same shuffle IDs if the input frame is the same and all its parameters match - assert sum(id_from_key(k) is not None for k in out.dask) == 3 + # Generate unique shuffle IDs if the input frame is the same and all its + # parameters match. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. + assert sum(id_from_key(k) is not None for k in out.dask) == 4 x = await c.compute(out) expected = pdf2.merge( pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"