Spark 3.3: Adding Rebalance operator solving for small files problem #8042
Conversation
@huaxingao is probably the right person to review this

RussellSpitzer left a comment:
Probably need some tests to prove the property is taking effect and that we get different plans.
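For instance, such a test might assert which exchange operator shows up in the plan (a rough sketch, not the PR's test code; `db.tbl`, the SQL, and the eager execution of the DELETE are placeholder assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{RebalancePartitions, RepartitionByExpression}

val spark = SparkSession.builder().getOrCreate()

// With strict distribution disabled, the write should plan a rebalance;
// with it enabled (the default), a fixed-count RepartitionByExpression.
val plan = spark.sql("DELETE FROM db.tbl WHERE id = 1").queryExecution.optimizedPlan
assert(plan.collectFirst { case r: RebalancePartitions => r }.isDefined)
```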
Thanks for extending my PR #7932 by introducing strict table distribution as a table property.
Commits:
- …l files problem
- Revert "Spark 3.3: Adding Rebalance operator for handling skew - solving small files problem" (reverts commit 9c82c35)
- Revert "Spark 3.3: Adding Rebalance operator for handling skew - solving small files problem" (reverts commit 5f0094d)
- Revert "Spark 3.3: Adding Rebalance operator for handling skew - solving small files problem" (reverts commit 0612543)
```java
// Controls whether the set distribution mode has to be followed or not.
public static final String STRICT_TABLE_DISTRIBUTION_AND_ORDERING =
    "strict-table-distribution-and-ordering";
public static final String STRICT_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT = "true";
```
A new table-level property is introduced to decide whether the distribution mode specified in the table properties must be followed strictly, since Rebalance supports only hash partitioning in Spark 3.3.
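For illustration, this is how a user might toggle the proposed property (the key comes from the diff above; `db.tbl` is a placeholder and an active `spark` session is assumed):

```scala
// Disable strict distribution so the write can fall back to a rebalance + AQE.
spark.sql(
  "ALTER TABLE db.tbl SET TBLPROPERTIES " +
    "('strict-table-distribution-and-ordering' = 'false')")
```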
Minor suggestion here: since this is Spark specific, maybe call this "write.spark.strict-table-distribution-ordering". The comment should also reflect that this is a Spark property.
I believe this is only going to be for Spark 3.3 as well, right? We should probably document that, since we won't be doing a patch in 3.4.
The doc which is currently in the planning section should probably be here instead:

```scala
// if strict distribution mode is not enabled, then we fall back to Spark AQE
// to determine the number of partitions by coalescing and un-skewing partitions
// Also to note, Rebalance is only supported for hash distribution mode till spark 3.3
// By default the strictDistributionMode is set to true, to not disrupt regular
// plan of RepartitionByExpression
```
I don't think it is a good idea to add a new public-facing table property that is already applicable only to older versions of Spark. Can we add a SQL property instead in our 3.3 module? Also, I would be okay skipping it and just assuming the distribution has to be strict. Then Spark will coalesce small files but won't split large ones. If we want feature parity with 3.4, let's do a SQL property.
Also, the ordering is always required. It is just the distribution that can be strict or not. It should be reflected in the name of the property.
```scala
RepartitionByExpression(ArraySeq.unsafeWrapArray(distribution), query, finalNumPartitions)
```

```scala
val tableProperties = if (table.isInstanceOf[RowLevelOperationTable]) {
  table.asInstanceOf[RowLevelOperationTable].table.properties()
```
In the case of RowLevelOperationTable, we had to get the nested table to read its properties.
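A pattern match expresses the same unwrapping a bit more idiomatically (a sketch, assuming the types from the snippet above):

```scala
// Unwrap RowLevelOperationTable to reach the underlying table's properties.
val tableProperties = table match {
  case rowLevelOp: RowLevelOperationTable => rowLevelOp.table.properties()
  case other => other.properties()
}
```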
```scala
// Also to note, Rebalance is only supported for hash distribution mode till spark 3.3
// By default the strictDistributionMode is set to true, to not disrupt regular
// plan of RepartitionByExpression
RebalancePartitions(ArraySeq.unsafeWrapArray(distribution), query)
```
New operator which helps in reducing the number of files via AQE.
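For context, these are the standard Spark AQE settings that govern how the output of a `RebalancePartitions` node gets coalesced or split (stock Spark configuration, not part of this PR):

```scala
// AQE must be on for RebalancePartitions to have any effect.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Merge small shuffle partitions after the rebalance...
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// ...targeting roughly this many bytes per output partition.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
```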
```java
}

@Test
public void testCoalesceDelete() throws Exception {
```
Added tests from #7532
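Roughly, such a test checks that far fewer files are produced than shuffle partitions. A sketch under assumptions (an active `spark` session with an Iceberg catalog, placeholder table `db.tbl`; not the PR's actual test):

```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "200")

// A row-level operation that would otherwise fan out into many small files.
spark.sql("DELETE FROM db.tbl WHERE id % 2 = 0")

// Iceberg's `files` metadata table lists the table's current data files.
val fileCount = spark.table("db.tbl.files").count()
assert(fileCount < 200, "expected AQE to coalesce write tasks into fewer files")
```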
@RussellSpitzer Added tests from Spark 3.4.
LGTM. cc @RussellSpitzer @aokolnychyi

@RussellSpitzer Addressed all review comments! Please take another look.

I will have some time today/tomorrow to take a look as well.
```scala
if (strictDistributionMode.equals("false") && isHashDistributionMode) {
  // if strict distribution mode is not enabled, then we fall back to Spark AQE
  // to determine the number of partitions by coalescing and un-skewing partitions
  // Also to note, Rebalance is only supported for hash distribution mode till spark 3.3
```
Till Spark 3.3? Or 3.4? I am confused cause this change is for 3.3.
@aokolnychyi In Spark 3.4, both hash and range are supported for the rebalance operator, but in Spark 3.3 only hash is supported. I will change the statement to "in spark 3.3". Do we want to fall back to rebalance even for the none distribution mode (aka round-robin partitioning)?
That's OK. Let's just fix the comment then, because it states that Spark 3.3 is supposed to work.
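Putting the quoted pieces together, the branch under discussion looks roughly like this (reconstructed from the snippets in this review, not the exact PR code):

```scala
import scala.collection.immutable.ArraySeq
import org.apache.spark.sql.catalyst.plans.logical.{RebalancePartitions, RepartitionByExpression}

val distributedQuery =
  if (strictDistributionMode.equals("false") && isHashDistributionMode) {
    // Non-strict + hash: let AQE pick partition counts, coalescing small
    // partitions and splitting skewed ones.
    RebalancePartitions(ArraySeq.unsafeWrapArray(distribution), query)
  } else {
    // Strict (the default): keep the regular fixed-count repartition.
    RepartitionByExpression(
      ArraySeq.unsafeWrapArray(distribution), query, finalNumPartitions)
  }
```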
The change seems correct to me but I would not add a table property given that it is needed only for older versions of a particular engine. I'd add a SQL property in 3.3 instead. Thanks, @namrathamyske!
@aokolnychyi Thanks for reviewing! Addressed your comments.

@RussellSpitzer @aokolnychyi Please give this PR another look.
```java
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";

// This property doesn't need to be transferred to Spark 3.4 because we have already set
```
This description seems to just describe the implementation; what we would need here is just what an end user would need to know: what does this property do for the user, and why should they change it?
I would probably change the whole name to something like "ENABLE_MERGE_AQE", i.e. "spark.merge-aqe.enabled" defaulting to false.
@aokolnychyi what do you think?
I believe this would apply to all writes, not only row-level operations, if extensions are enabled?
What about spark.sql.iceberg.write-aqe.enabled with false by default? I am not sure about the write-aqe part of it, but I think it has to be generic and start with the spark.sql.iceberg prefix.
Also, the comment is probably too specific; I agree with @RussellSpitzer that it should be for the user.
I agree that this applies to all writes, not just row-level operations. This property changes the final distribution and is used in ExtendedDistributionAndOrderingUtils, so I would prefer spark.sql.iceberg.write-distribution.aqe.enabled. Otherwise, spark.sql.iceberg.write-aqe.enabled could be confused with enabling or disabling AQE for the whole Spark job.
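A minimal sketch of reading such a session conf (the key is the one proposed above; nothing here is a committed API, and an active `spark` session is assumed):

```scala
// Default to false so existing write plans are unchanged unless opted in.
val writeAqeEnabled = spark.conf
  .getOption("spark.sql.iceberg.write-distribution.aqe.enabled")
  .getOrElse("false")
  .toBoolean
```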
Force-pushed 443256d to 7d82055, then 7d82055 to bf7e866.
@RussellSpitzer @aokolnychyi Addressed the comments. Please give this another look.

Hi all, any progress on this?

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
For Spark 3.4, #7637 and #7520 have been added, enabling AQE to solve the small files problem and take care of skews.
For Spark 3.4, `RequiresDistributionAndOrdering` has `distributionStrictlyRequired()`, which Iceberg has set to false: https://github.com/apache/iceberg/blame/37f53518a09803e4ef6b4669f58fbcc960ea5994/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L139. To solve the same problem in Spark 3.3, without `distributionStrictlyRequired()` being present in `RequiresDistributionAndOrdering`, we can add a table-level property which can be checked when determining the Repartition vs Rebalance operator. Idea taken from https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala#L66. Extending the PR #7932. @bowenliang123
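For reference, the Spark 3.4 hook this PR emulates for 3.3 looks roughly like this (a Scala sketch of a DSv2 write, not Iceberg's actual code; `SketchWrite` and the unspecified distribution are illustrative):

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}

class SketchWrite extends Write with RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution = Distributions.unspecified()
  override def requiredOrdering(): Array[SortOrder] = Array.empty
  // Returning false (available since Spark 3.4, default true) tells Spark it
  // may rebalance instead of strictly repartitioning to the required distribution.
  override def distributionStrictlyRequired(): Boolean = false
}
```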
@RussellSpitzer @rdblue @aokolnychyi @amogh-jahagirdar @jackye1995 @singhpk234 could you take a look at this PR?
cc: @SreeramGarlapati