Spark 3.4: Handle skew in writes #7520
Conversation
```java
}

@Test
public void testSkewDelete() throws Exception {
```
Tests for CoW row-level operations already cover SparkWrite, which is used in normal writes. There is not much logic on the Iceberg side; the rest is covered by Spark tests.
This is just for 3.4 because of the new rebalance code for writes, right?

@RussellSpitzer, correct. This API does not exist in 3.3.
```java
@Override
public boolean distributionStrictlyRequired() {
  return false;
}
```
I may actually need to move it to SparkWriteBuilder, as SparkWrite is used for compaction. We explicitly disable table distribution/ordering and AQE in shuffling rewriters, but not in bin-pack when the output spec does not match.
Thoughts, @RussellSpitzer?
I was going to say I don't really mind our compaction solution at the moment. I think disabling AQE is our best bet there.
Let's stick to that then, I agree.
```java
// that means there are 4 shuffle blocks, all assigned to the same reducer
// AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
// otherwise, there would be 1 task processing 4 shuffle blocks
int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_DELETE_FILES_PROP));
```
[minor] can use PropertyUtil here
I did not use PropertyUtil as it requires a default value and won't fit on one line. I can switch, though.
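For reference, Iceberg's PropertyUtil.propertyAsInt takes the properties map, the key, and a mandatory default, which is what makes the call longer than a bare Integer.parseInt. A minimal stdlib sketch of the same shape (PropertyUtilSketch and the summary keys here are illustrative stand-ins, not the real Iceberg class):

```java
import java.util.Map;

public class PropertyUtilSketch {

  // Same shape as Iceberg's PropertyUtil.propertyAsInt(Map, String, int):
  // the default value is a required third argument.
  public static int propertyAsInt(Map<String, String> properties, String property, int defaultValue) {
    String value = properties.get(property);
    return value != null ? Integer.parseInt(value) : defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> summary = Map.of("added-data-files", "4");

    // Integer.parseInt(summary.get(key)) would throw a NumberFormatException
    // on a missing key; the helper falls back to the default instead.
    System.out.println(propertyAsInt(summary, "added-data-files", 0));   // prints 4
    System.out.println(propertyAsInt(summary, "added-delete-files", 0)); // prints 0
  }
}
```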
```java
int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
Assert.assertEquals("Must produce 4 files", 4, addedFiles);
```
[minor] Can this be moved to a private function, e.g. assertAddedFiles, to use in both MOR / COW?
Let me add something.
There was an existing validateProperty, which I forgot about. I switched to that.
```java
@Override
public boolean distributionStrictlyRequired() {
  return false;
}
```
Should we also check that ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED is true before disabling this requirement? Otherwise it will be a no-op for OptimizeSkewInRebalancePartitions.
Is there ever a good reason to return true from this method? We don't require distributions to be strict and it is up to Spark to either handle the skew or not.
Agree. I was mostly coming from the point that we are overriding this and setting it to false in the hope that Spark will optimize the skew, whereas if the above conf is disabled, Spark will never do so. I am fine with keeping it as it is.
I feel it is better to always return false and leave it up to Spark. It seems the safest way as Spark may add new configs or logic on when to do that in the future.
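For context, the discussion above hinges on Spark's AQE settings. A minimal sketch of the configs involved (the names are standard Spark SQL configs; the values shown match the defaults in recent Spark releases, so treat the exact defaults as an assumption):

```properties
# AQE must be enabled for any skew handling to apply
spark.sql.adaptive.enabled=true
# Allows AQE to split large shuffle partitions produced by a rebalance
# (what the OptimizeSkewInRebalancePartitions rule does)
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true
# Target partition size AQE aims for when splitting or coalescing
spark.sql.adaptive.advisoryPartitionSizeInBytes=64m
```

If either of the first two settings is off, a non-strict distribution is simply a best-effort request, which matches the "leave it up to Spark" conclusion above.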
```java
// AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
// otherwise, there would be 2 tasks processing 2 shuffle blocks each
```
[doubt] Should we also add a UT where coalescing is happening?
I was planning to do so in a separate PR. This change focuses on skew.
singhpk234 left a comment:
LGTM as well. Thanks, @aokolnychyi!
Thanks for reviewing, @singhpk234 @RussellSpitzer!
This PR enables AQE to handle skew in writes.
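As a hedged illustration of the end-to-end effect (the catalog, table, and column names here are hypothetical), a row-level operation against a skewed Iceberg table can now be rebalanced by AQE instead of being funneled through one reducer per output partition:

```sql
-- Hypothetical session: with the non-strict distribution requested by this
-- change, AQE may split a single hot shuffle partition into several write
-- tasks rather than processing it in one task.
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true;

DELETE FROM catalog.db.events WHERE event_type = 'click';
```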