diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py
index 01a1c9e3d..3c9be96a6 100644
--- a/dask_expr/_merge.py
+++ b/dask_expr/_merge.py
@@ -20,7 +20,7 @@
     _contains_index_name,
     _select_columns_or_index,
 )
-from dask_expr._util import DASK_GT_20231000, _convert_to_list
+from dask_expr._util import DASK_GT_20231000, _convert_to_list, _tokenize_deterministic
 
 _HASH_COLUMN_NAME = "__hash_partition"
 _PARTITION_COLUMN = "_partitions"
@@ -428,9 +428,20 @@ def _layer(self) -> dict:
         from distributed.shuffle._shuffle import shuffle_barrier
 
         dsk = {}
-        token = self._name.split("-")[-1]
-        token_left = token + "-left"
-        token_right = token + "-right"
+        token_left = _tokenize_deterministic(
+            "hash-join",
+            self.left._name,
+            self.shuffle_left_on,
+            self.npartitions,
+            self._partitions,
+        )
+        token_right = _tokenize_deterministic(
+            "hash-join",
+            self.right._name,
+            self.shuffle_right_on,
+            self.npartitions,
+            self._partitions,
+        )
 
         _barrier_key_left = barrier_key(ShuffleId(token_left))
         _barrier_key_right = barrier_key(ShuffleId(token_right))
diff --git a/dask_expr/tests/test_distributed.py b/dask_expr/tests/test_distributed.py
index 4bd59860a..d7257cb12 100644
--- a/dask_expr/tests/test_distributed.py
+++ b/dask_expr/tests/test_distributed.py
@@ -73,7 +73,7 @@ async def test_self_merge_p2p_shuffle(c, s, a, b):
 
 
 @gen_cluster(client=True)
-async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b):
+async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
     pdf1 = lib.DataFrame({"a": range(100), "b": range(0, 200, 2)})
     pdf2 = lib.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
     ddf1 = from_pandas(pdf1, npartitions=5)
@@ -81,8 +81,8 @@ async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b):
 
     out = (
         ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p")
-        .repartition(20)
-        .merge(ddf2, left_on="b", right_on="x", shuffle_backend="p2p")
+        # Vary the number of output partitions for the shuffles of ddf2
+        .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle_backend="p2p")
     )
     # Generate unique shuffle IDs if the input frame is the same but parameters differ
     assert sum(id_from_key(k) is not None for k in out.dask) == 4
@@ -96,6 +96,39 @@ async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b):
     )
 
 
+@gen_cluster(client=True)
+async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
+    pdf1 = lib.DataFrame({"a": range(100), "b": range(0, 200, 2)})
+    pdf2 = lib.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
+    ddf1 = from_pandas(pdf1, npartitions=5)
+    ddf2 = from_pandas(pdf2, npartitions=10)
+
+    # This performs two shuffles:
+    # * ddf1 is shuffled on `a`
+    # * ddf2 is shuffled on `x`
+    ddf3 = ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p")
+
+    # This performs one shuffle:
+    # * ddf3 is shuffled on `b`
+    # We can reuse the shuffle of ddf2 on `x` from the previous merge.
+    out = ddf2.merge(
+        ddf3,
+        left_on="x",
+        right_on="b",
+        shuffle_backend="p2p",
+    )
+    # Generate the same shuffle IDs if the input frame is the same and all its parameters match
+    assert sum(id_from_key(k) is not None for k in out.dask) == 3
+    x = await c.compute(out)
+    expected = pdf2.merge(
+        pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"
+    )
+    lib.testing.assert_frame_equal(
+        x.sort_values("a", ignore_index=True),
+        expected.sort_values("a", ignore_index=True),
+    )
+
+
 @pytest.mark.parametrize("npartitions_left", [5, 6])
 @gen_cluster(client=True)
 async def test_index_merge_p2p_shuffle(c, s, a, b, npartitions_left):
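
The core of the change: shuffle tokens are no longer derived from the merge expression's own name, but from the shuffled input's name and the shuffle parameters. Two shuffles of the same frame on the same column(s) into the same number of partitions therefore hash to the same ShuffleId and are deduplicated, while any differing parameter yields a fresh ID. A minimal sketch of that idea, assuming dask.base.tokenize as the underlying deterministic hash (shuffle_token and its arguments are illustrative, not the dask-expr API):

    from dask.base import tokenize

    def shuffle_token(frame_name, shuffle_on, npartitions, partitions):
        # Same inputs -> same token, so an identical shuffle can be reused;
        # any changed input yields a distinct token and a separate shuffle.
        return tokenize("hash-join", frame_name, shuffle_on, npartitions, partitions)

    # ddf2 shuffled on "x" with identical parameters twice: one shared ID,
    # matching the `== 3` assertion in the *_with_same_parameters test.
    assert shuffle_token("ddf2-abc", ["x"], 10, None) == shuffle_token("ddf2-abc", ["x"], 10, None)
    # Same frame, different output partition count: IDs diverge,
    # matching the `== 4` assertion in the *_with_different_parameters test.
    assert shuffle_token("ddf2-abc", ["x"], 10, None) != shuffle_token("ddf2-abc", ["x"], 20, None)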