From b21f094b4ea39361405ccf0941e8f1ebff83e3b6 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 26 Oct 2023 11:52:10 +0200 Subject: [PATCH 1/4] Reuse shuffle in P2P hash join if it's exactly the same --- dask_expr/_merge.py | 11 ++++++---- dask_expr/tests/test_distributed.py | 31 ++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index 01a1c9e3d..f259dfca0 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -20,7 +20,7 @@ _contains_index_name, _select_columns_or_index, ) -from dask_expr._util import DASK_GT_20231000, _convert_to_list +from dask_expr._util import DASK_GT_20231000, _convert_to_list, _tokenize_deterministic _HASH_COLUMN_NAME = "__hash_partition" _PARTITION_COLUMN = "_partitions" @@ -428,9 +428,12 @@ def _layer(self) -> dict: from distributed.shuffle._shuffle import shuffle_barrier dsk = {} - token = self._name.split("-")[-1] - token_left = token + "-left" - token_right = token + "-right" + token_left = _tokenize_deterministic( + self.left._name, self.shuffle_left_on, self.npartitions, self._partitions + ) + token_right = _tokenize_deterministic( + self.right._name, self.shuffle_right_on, self.npartitions, self._partitions + ) _barrier_key_left = barrier_key(ShuffleId(token_left)) _barrier_key_right = barrier_key(ShuffleId(token_right)) diff --git a/dask_expr/tests/test_distributed.py b/dask_expr/tests/test_distributed.py index 4bd59860a..8b301c401 100644 --- a/dask_expr/tests/test_distributed.py +++ b/dask_expr/tests/test_distributed.py @@ -73,7 +73,7 @@ async def test_self_merge_p2p_shuffle(c, s, a, b): @gen_cluster(client=True) -async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b): +async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b): pdf1 = lib.DataFrame({"a": range(100), "b": range(0, 200, 2)}) pdf2 = lib.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50}) ddf1 = from_pandas(pdf1, npartitions=5) @@ -81,8 
+81,8 @@ async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b): out = ( ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p") - .repartition(20) - .merge(ddf2, left_on="b", right_on="x", shuffle_backend="p2p") + # Vary the number of output partitions for the shuffles of ddf2 + .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle_backend="p2p") ) # Generate unique shuffle IDs if the input frame is the same but parameters differ assert sum(id_from_key(k) is not None for k in out.dask) == 4 @@ -96,6 +96,31 @@ async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b): ) +@gen_cluster(client=True) +async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b): + pdf1 = lib.DataFrame({"a": range(100), "b": range(0, 200, 2)}) + pdf2 = lib.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50}) + ddf1 = from_pandas(pdf1, npartitions=5) + ddf2 = from_pandas(pdf2, npartitions=10) + + out = ddf2.merge( + ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p"), + left_on="x", + right_on="b", + shuffle_backend="p2p", + ) + # Generate the same shuffle IDs if the input frame is the same and all its parameters match + assert sum(id_from_key(k) is not None for k in out.dask) == 3 + x = await c.compute(out) + expected = pdf2.merge( + pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b" + ) + lib.testing.assert_frame_equal( + x.sort_values("a", ignore_index=True), + expected.sort_values("a", ignore_index=True), + ) + + @pytest.mark.parametrize("npartitions_left", [5, 6]) @gen_cluster(client=True) async def test_index_merge_p2p_shuffle(c, s, a, b, npartitions_left): From 7def6a40440b2eac33649b2948032ee851749282 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 26 Oct 2023 13:12:21 +0200 Subject: [PATCH 2/4] Better safe than sorry --- dask_expr/_merge.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index 
f259dfca0..3c9be96a6 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -429,10 +429,18 @@ def _layer(self) -> dict: dsk = {} token_left = _tokenize_deterministic( - self.left._name, self.shuffle_left_on, self.npartitions, self._partitions + "hash-join", + self.left._name, + self.shuffle_left_on, + self.npartitions, + self._partitions, ) token_right = _tokenize_deterministic( - self.right._name, self.shuffle_right_on, self.npartitions, self._partitions + "hash-join", + self.right._name, + self.shuffle_right_on, + self.npartitions, + self._partitions, ) _barrier_key_left = barrier_key(ShuffleId(token_left)) _barrier_key_right = barrier_key(ShuffleId(token_right)) From 149fc46d1e9682b837ae9f4df7dcd4368dd50415 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 26 Oct 2023 15:17:59 +0200 Subject: [PATCH 3/4] Comment --- dask_expr/tests/test_distributed.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dask_expr/tests/test_distributed.py b/dask_expr/tests/test_distributed.py index 8b301c401..4963da45c 100644 --- a/dask_expr/tests/test_distributed.py +++ b/dask_expr/tests/test_distributed.py @@ -103,8 +103,16 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, ddf1 = from_pandas(pdf1, npartitions=5) ddf2 = from_pandas(pdf2, npartitions=10) + # This performs two shuffles: + # * ddf1 is shuffled on `a` + # * ddf2 is shuffled on `x` + ddf3 = (ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p"),) + + # This performs one shuffle: + # * ddf3 is shuffled on `b` + # We can reuse the shuffle of ddf2 on `x` from the previous merge. 
out = ddf2.merge( - ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p"), + ddf3, left_on="x", right_on="b", shuffle_backend="p2p", From 2475f966157df8cd944893e0cc74e2d11e86ed26 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Fri, 27 Oct 2023 16:29:37 +0200 Subject: [PATCH 4/4] Update test_distributed.py --- dask_expr/tests/test_distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/tests/test_distributed.py b/dask_expr/tests/test_distributed.py index 4963da45c..d7257cb12 100644 --- a/dask_expr/tests/test_distributed.py +++ b/dask_expr/tests/test_distributed.py @@ -106,7 +106,7 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, # This performs two shuffles: # * ddf1 is shuffled on `a` # * ddf2 is shuffled on `x` - ddf3 = (ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p"),) + ddf3 = ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p") # This performs one shuffle: # * ddf3 is shuffled on `b`