19 changes: 15 additions & 4 deletions dask_expr/_merge.py
@@ -20,7 +20,7 @@
     _contains_index_name,
     _select_columns_or_index,
 )
-from dask_expr._util import DASK_GT_20231000, _convert_to_list
+from dask_expr._util import DASK_GT_20231000, _convert_to_list, _tokenize_deterministic
 
 _HASH_COLUMN_NAME = "__hash_partition"
 _PARTITION_COLUMN = "_partitions"
@@ -428,9 +428,20 @@ def _layer(self) -> dict:
         from distributed.shuffle._shuffle import shuffle_barrier
 
         dsk = {}
-        token = self._name.split("-")[-1]
-        token_left = token + "-left"
-        token_right = token + "-right"
+        token_left = _tokenize_deterministic(
+            "hash-join",
+            self.left._name,
+            self.shuffle_left_on,
+            self.npartitions,
+            self._partitions,
+        )
+        token_right = _tokenize_deterministic(
+            "hash-join",
+            self.right._name,
+            self.shuffle_right_on,
+            self.npartitions,
+            self._partitions,
+        )
         _barrier_key_left = barrier_key(ShuffleId(token_left))
         _barrier_key_right = barrier_key(ShuffleId(token_right))

Review thread on the new token_left/token_right tokenization:

hendrikmakait (Member, Author) commented on Oct 26, 2023:
Added this to ensure that we avoid any unwanted sharing between merges and shuffles. I don't have a good-enough overview right now to make sharing between them work.

(Member):
If we can rely on name_input_{left/right} to truly be unique and identify distinct input dataframes, I believe we can even share between merges and ordinary shuffles. I wonder where this use case would come up, though.

hendrikmakait (Member, Author):
My main concern here is that shuffle_transfer and merge_transfer do different things. We'd have to refactor this so that they are in fact interchangeable and shareable. I guess there are only a few cases where this would come in handy, though. Maybe something like joining a dataframe on x and aggregating that same dataframe on x within the same graph?

(Collaborator):
Yes, they do slightly different things at the moment, because the merge layer caused all kinds of trouble before that. This is on my to-do list to align them more closely.
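
For context on how the deterministic tokens enable reuse: two P2P shuffles now receive the same shuffle ID exactly when the input frame name, the shuffle columns, the output partition count, and the partition selection all match, and distinct IDs otherwise. Below is a minimal sketch of that behaviour, using dask.base.tokenize as a stand-in for dask_expr's _tokenize_deterministic, with a hypothetical helper and purely illustrative frame names:

from dask.base import tokenize

def shuffle_token(frame_name, shuffle_on, npartitions, partitions):
    # Hypothetical helper mirroring the arguments fed to _tokenize_deterministic
    # for token_left/token_right above.
    return tokenize("hash-join", frame_name, shuffle_on, npartitions, partitions)

# Same frame and parameters -> identical token, so the shuffle can be shared.
assert shuffle_token("df2-abc", ["x"], 10, None) == shuffle_token("df2-abc", ["x"], 10, None)

# Any differing parameter (here: npartitions) -> distinct token, separate shuffle.
assert shuffle_token("df2-abc", ["x"], 10, None) != shuffle_token("df2-abc", ["x"], 20, None)

This matches the assertions in the tests below, which count the distinct shuffle IDs present in the resulting graph.
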
39 changes: 36 additions & 3 deletions dask_expr/tests/test_distributed.py
@@ -73,16 +73,16 @@ async def test_self_merge_p2p_shuffle(c, s, a, b):
 
 
 @gen_cluster(client=True)
-async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b):
+async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
     pdf1 = lib.DataFrame({"a": range(100), "b": range(0, 200, 2)})
     pdf2 = lib.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
     ddf1 = from_pandas(pdf1, npartitions=5)
     ddf2 = from_pandas(pdf2, npartitions=10)
 
     out = (
         ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p")
-        .repartition(20)
-        .merge(ddf2, left_on="b", right_on="x", shuffle_backend="p2p")
+        # Vary the number of output partitions for the shuffles of ddf2
+        .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle_backend="p2p")
     )
     # Generate unique shuffle IDs if the input frame is the same but parameters differ
     assert sum(id_from_key(k) is not None for k in out.dask) == 4
@@ -96,6 +96,39 @@ async def test_merge_p2p_shuffle_reused_dataframe(c, s, a, b):
     )
 
 
+@gen_cluster(client=True)
+async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
+    pdf1 = lib.DataFrame({"a": range(100), "b": range(0, 200, 2)})
+    pdf2 = lib.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
+    ddf1 = from_pandas(pdf1, npartitions=5)
+    ddf2 = from_pandas(pdf2, npartitions=10)
+
+    # This performs two shuffles:
+    # * ddf1 is shuffled on `a`
+    # * ddf2 is shuffled on `x`
+    ddf3 = ddf1.merge(ddf2, left_on="a", right_on="x", shuffle_backend="p2p")
+
+    # This performs one shuffle:
+    # * ddf3 is shuffled on `b`
+    # We can reuse the shuffle of ddf2 on `x` from the previous merge.
+    out = ddf2.merge(
+        ddf3,
+        left_on="x",
+        right_on="b",
+        shuffle_backend="p2p",
+    )
+    # Generate the same shuffle IDs if the input frame is the same and all its parameters match
+    assert sum(id_from_key(k) is not None for k in out.dask) == 3
+    x = await c.compute(out)
+    expected = pdf2.merge(
+        pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"
+    )
+    lib.testing.assert_frame_equal(
+        x.sort_values("a", ignore_index=True),
+        expected.sort_values("a", ignore_index=True),
+    )
+
+
 @pytest.mark.parametrize("npartitions_left", [5, 6])
 @gen_cluster(client=True)
 async def test_index_merge_p2p_shuffle(c, s, a, b, npartitions_left):