From 8f847448b8a0ae69bc943c623772a5037de107f8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 12:40:31 +0100 Subject: [PATCH 1/4] Do not reuse --- dask_expr/_merge.py | 10 ++-------- dask_expr/tests/test_distributed.py | 10 +++++++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index a3b631832..fa1aa29a8 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -535,18 +535,12 @@ def _layer(self) -> dict: dsk = {} token_left = _tokenize_deterministic( - "hash-join", + self._name, self.left._name, - self.shuffle_left_on, - self.npartitions, - self._partitions, ) token_right = _tokenize_deterministic( - "hash-join", + self._name, self.right._name, - self.shuffle_right_on, - self.npartitions, - self._partitions, ) _barrier_key_left = barrier_key(ShuffleId(token_left)) _barrier_key_right = barrier_key(ShuffleId(token_right)) diff --git a/dask_expr/tests/test_distributed.py b/dask_expr/tests/test_distributed.py index 9fa02114d..e83a9ec44 100644 --- a/dask_expr/tests/test_distributed.py +++ b/dask_expr/tests/test_distributed.py @@ -136,7 +136,9 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s ddf2, left_on="b", right_on="x", shuffle_backend="p2p" ) ) - # Generate unique shuffle IDs if the input frame is the same but parameters differ + # Generate unique shuffle IDs if the input frame is the same but + # parameters differ. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. assert sum(id_from_key(k) is not None for k in out.dask) == 4 x = await c.compute(out) expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge( @@ -169,8 +171,10 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, right_on="b", shuffle_backend="p2p", ) - # Generate the same shuffle IDs if the input frame is the same and all its parameters match - assert sum(id_from_key(k) is not None for k in out.dask) == 3 + # Generate unique shuffle IDs if the input frame is the same and all its + # parameters match. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. + assert sum(id_from_key(k) is not None for k in out.dask) == 4 x = await c.compute(out) expected = pdf2.merge( pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b" From 0f89c165a3cb693ad00d8957e66787a29293b454 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 12:44:02 +0100 Subject: [PATCH 2/4] fix --- dask_expr/_merge.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index fa1aa29a8..daad62290 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -537,10 +537,12 @@ def _layer(self) -> dict: token_left = _tokenize_deterministic( self._name, self.left._name, + self.shuffle_left_on, ) token_right = _tokenize_deterministic( self._name, self.right._name, + self.shuffle_right_on, ) _barrier_key_left = barrier_key(ShuffleId(token_left)) _barrier_key_right = barrier_key(ShuffleId(token_right)) From 865990c96f4d0ab311a094ad71713754b4a848e5 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 12:46:46 +0100 Subject: [PATCH 3/4] Minor --- dask_expr/_merge.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index daad62290..1c4c605e2 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -538,11 +538,13 @@ def _layer(self) -> dict: self._name, self.left._name, self.shuffle_left_on, + self.left_index, ) token_right = _tokenize_deterministic( self._name, self.right._name, self.shuffle_right_on, + self.right_index, ) _barrier_key_left = barrier_key(ShuffleId(token_left)) _barrier_key_right = barrier_key(ShuffleId(token_right)) From 69751cde65cada83194b7be06ed51f0b6808696a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 17:52:39 +0100 Subject: [PATCH 4/4] docs --- dask_expr/_merge.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py index 1c4c605e2..87b799da0 100644 --- a/dask_expr/_merge.py +++ b/dask_expr/_merge.py @@ -535,12 +535,18 @@ def _layer(self) -> dict: dsk = {} token_left = _tokenize_deterministic( + # Include self._name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. self._name, self.left._name, self.shuffle_left_on, self.left_index, ) token_right = _tokenize_deterministic( + # Include self._name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. self._name, self.right._name, self.shuffle_right_on,