Reuse identical shuffles in P2P hash join #8306
Conversation
Unit Test Results. See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 27 files ±0, 27 suites ±0, 16h 41m 21s ⏱️ +1h 13m 39s. For more details on these failures, see this check. Results for commit 596adff. ± Comparison against base commit 7d0ba9c. ♻️ This comment has been updated with latest results.
fjetter
left a comment
Changes LGTM. I'm just having a little difficulty wrapping my head around what exactly we're now caching.
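To make the caching question concrete, here is a minimal, purely illustrative sketch of the idea: a shuffle gets a deterministic token derived from its input frame and all of its parameters, and a cache keyed by that token hands back the existing shuffle when an identical one is requested. The helper names (`shuffle_token`, `get_or_create_shuffle`) and the frame names are hypothetical, not the actual distributed/dask-expr API.

```python
import hashlib
import json

# Illustrative cache of already-planned shuffles, keyed by token.
_shuffle_cache: dict = {}

def shuffle_token(input_name, npartitions, on):
    # Identical input frame and identical parameters hash to the same token.
    payload = json.dumps([input_name, npartitions, sorted(on)])
    return hashlib.sha1(payload.encode()).hexdigest()[:16]

def get_or_create_shuffle(input_name, npartitions, on):
    # Reuse an existing shuffle when one with the same token was already set up.
    token = shuffle_token(input_name, npartitions, on)
    if token not in _shuffle_cache:
        _shuffle_cache[token] = object()  # stand-in for the real shuffle state
    return _shuffle_cache[token]

s1 = get_or_create_shuffle("ddf1", 4, ["x"])
s2 = get_or_create_shuffle("ddf1", 4, ["x"])  # same input and parameters: reused
s3 = get_or_create_shuffle("ddf1", 8, ["x"])  # different npartitions: a new shuffle
assert s1 is s2
assert s1 is not s3
```

Only the pairing of input plus parameters is cached; any deviation in either produces a fresh shuffle.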
```python
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
```
This test passes for me on main. From what I can tell, this is expected, isn't it?
Yes, there's no reuse on main. This test makes sure that we don't reuse too much (which is what originally happened in dask-expr).
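The "don't reuse too much" property can be sketched with a deterministic token: the ID must change whenever any parameter changes, so two logically different shuffles are never collapsed into one. This is an illustrative stdlib-only sketch; the helper and parameter names are hypothetical, not the real implementation.

```python
import hashlib
import json

def shuffle_token(input_name, **params):
    # Any difference in the input frame or in any parameter changes the token.
    payload = json.dumps([input_name, sorted(params.items())])
    return hashlib.sha1(payload.encode()).hexdigest()

base = shuffle_token("ddf1", on="x", npartitions=4)
assert shuffle_token("ddf1", on="x", npartitions=4) == base   # identical: may be reused
assert shuffle_token("ddf1", on="x", npartitions=8) != base   # parameter changed: no reuse
assert shuffle_token("ddf2", on="x", npartitions=4) != base   # different input frame: no reuse
```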
```python
# Generate the same shuffle IDs if the input frame is the same and all its parameters match
assert sum(id_from_key(k) is not None for k in out.dask) == 3
```
This took me a little while to wrap my head around. Maybe an additional comment is helpful.
What's happening here is
- Shuffle1: shuffle ddf1
- Shuffle2: shuffle ddf2
- Those two shuffles will perform the first merge
- Shuffle 3: ?? Which DF is now shuffled and what can be reused?
Maybe a comment in the test would be helpful.
Shuffle 3 is again ddf2? I'm currently wondering why there is a third shuffle. Shouldn't this all be doable with just two?
I've adjusted the test a bit; I mixed something up when moving it from dask-expr. The idea behind using three shuffles is to guarantee that sharing works between multiple merges irrespective of which side of the merge the dataframe is assigned to.
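A hypothetical sketch of that intent: two merges share one dataframe, once as the left input and once as the right, so its shuffle should be planned only once. The frame names and the token helper below are illustrative, not the real test or implementation.

```python
import hashlib
import json

def shuffle_token(frame_name, on):
    # Deterministic ID from the input frame's identity and the join column.
    return hashlib.sha1(json.dumps([frame_name, on]).encode()).hexdigest()[:12]

tokens = [
    shuffle_token("ddf1", "x"),  # left side of merge 1
    shuffle_token("ddf2", "x"),  # right side of merge 1
    shuffle_token("ddf2", "x"),  # left side of merge 2: identical, reused
    shuffle_token("ddf3", "x"),  # right side of merge 2
]
# Four shuffle inputs across both merges, but only three distinct shuffles.
assert len(set(tokens)) == 3
```

Reuse is independent of merge side because the token depends only on the input frame and its parameters, not on where the frame appears in the join.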
fjetter
left a comment
good find!
IFF a shuffle has the exact same input and parameters, P2P hash joins should reuse it.
Identical optimization as in dask/dask-expr#361
- Passes `pre-commit run --all-files`