Collaborator

@phofl phofl commented Aug 21, 2023

The memory footprint of the blockwise version was terrible.

@phofl phofl requested a review from rjzamora August 21, 2023 12:47
@rjzamora rjzamora changed the title Implement HashJoinLayer for merge Implement HashJoinP2P for merge Aug 21, 2023
Member

@rjzamora rjzamora left a comment

Thanks @phofl !

- from dask_expr._shuffle import Shuffle, _contains_index_name
+ from dask_expr._shuffle import AssignPartitioningIndex, Shuffle, _contains_index_name

_HASH_COLUMN_NAME = "__hash_partition"
Member

I take it this is a "magic" string used by distributed's p2p merge?

Collaborator Author

Yep

self.how,
self.left_on,
self.right_on,
self.left._meta.drop(columns=_HASH_COLUMN_NAME),
Member

I don't love how the p2p merge depends on _HASH_COLUMN_NAME and drops that column without being responsible for generating it. Unfortunately, the main issue is that merge_unpack drops the shuffle column, so there is not much we can do here to make the algorithm more intuitive.

To clarify, it would be much clearer if we could just lower the merge to "add partitioning columns" + "p2p merge" + "drop partitioning column".

Collaborator Author

Yeah, I mostly agree, but I fixed something else there today. This is on my medium-term todo list.

Collaborator Author

phofl commented Aug 23, 2023

Merging this

@phofl phofl merged commit 6238a7e into dask:main Aug 23, 2023
@phofl phofl deleted the hash_merge branch August 23, 2023 10:46
Development

Successfully merging this pull request may close these issues.

DataFrame merge should use HashJoinLayer instead of two separate P2Ps

2 participants