-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-53738][SQL] Fix planned write when query output contains foldable orderings #52584
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…n query output contains literal
| WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet()) | ||
| val newId = codegenStageCounter.incrementAndGet() | ||
| val newPlan = WholeStageCodegenExec(insertInputAdapter(plan))(newId) | ||
| plan.logicalLink.foreach(newPlan.setLogicalLink) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears that WholeStageCodegenExec misses setting logicalLink, is it by design?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting, and it never caused issue with AQE before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't seen the real issues in both production and existing UT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan if I revert changes in FileFormatWriter.scala, this is not required.
Do you want me to keep it or revert it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's revert it to be safe, the logical plan is quite sensitive to AQE. And technically, the CollapseCodegenStages is newly generated at planning phase, it does have have a corresponding logical plan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan reverted, and thanks for the explanation.
| plan.logicalLink match { | ||
| case Some(WriteFiles(query, _, _, _, _, _)) => | ||
| V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering | ||
| case Some(query) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the query can be WholeStageCodegenExec, that's why I set logicalLink on WholeStageCodegenExec
|
|
||
| val listener = new QueryExecutionListener { | ||
| override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { | ||
| val conf = qe.sparkSession.sessionState.conf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bugfix, the listener runs in another thread, without this change, conf.getConf actually gets conf from the thread local, thus may cause issues on concurrency running tests
| def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { | ||
| val map = AttributeMap(output.zip(newOutput)) | ||
| val newOutputOrdering = outputOrdering | ||
| .map(_.transform { case a: Attribute => map(a) }) | ||
| .asInstanceOf[Seq[SortOrder]] | ||
| InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue was identified in previous try, see #52474 (comment)
|
|
||
| override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = { | ||
| val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation] | ||
| copied.statsOfPlanToCache = this.statsOfPlanToCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, issue was identified in previous try, see #52474 (comment)
|
@cloud-fan BTW, the "planned write" switch (an internal config) was added since 3.4, do we have a plan to remove it to simplify code, or tend to preserve it forever? |
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
Outdated
Show resolved
Hide resolved
| expressions.exists(_.exists(_.isInstanceOf[Empty2Null])) | ||
| } | ||
|
|
||
| def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add comments to explain the reason behind it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
| .getOrElse(materializeAdaptiveSparkPlan(plan)) | ||
| .outputOrdering | ||
|
|
||
| val requiredOrdering = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the code path when planned write is disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can leave it unfixed, as this code path is rarely reached and this fix is kind of an optimization: it's only about perf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a necessary change for the "planned write" to make UT happy
if (Utils.isTesting) outputOrderingMatched = orderingMatched
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK this is a necessary for the current codebase, but do we really need to do it in theory? The planned write should have added the sort already, ideally we don't need to try to add sort again here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The planned write should have added the sort already, ideally we don't need to try to add sort again here.
yes, exactly
peter-toth
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, pending CI.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
| // columns. | ||
| val ordering = partitionColumns.drop(numStaticPartitionCols) ++ | ||
| writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns | ||
| plan.logicalLink match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit worried about this. In AQE we have a fallback to find logical link in the children, so that it's more reliable. Now we have the risk of perf regression if the logical link is not present and we add an extra sort.
Shall we remove the adding sort here completly if planned write is enabled (WriteFiles is present)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit worried about this. In AQE we have a fallback to find logical link in the children, so that it's more reliable.
@cloud-fan do you suggest
- plan.logicalLink match {
+ plan.logicalLink.orElse {
+ plan.collectFirst { case p if p.logicalLink.isDefined => p.logicalLink.get }
+ } match {Shall we remove the adding sort here completly if planned write is enabled (
WriteFilesis present)?
I think the current code has already satisfied your expectation, when planned write is enabled:
- if concurrent writer is disabled, the calculated required ordering won't be used.
- if concurrent writer is enabled, the calculated required ordering is only used in the concurrent writer step 2.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
Lines 393 to 406 in 29434ea
| /** | |
| * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened | |
| * for writing. | |
| * | |
| * The process has the following steps: | |
| * - Step 1: Maintain a map of output writers per each partition and/or bucket columns. Keep all | |
| * writers opened and write rows one by one. | |
| * - Step 2: If number of concurrent writers exceeds limit, sort rest of rows on partition and/or | |
| * bucket column(s). Write rows one by one, and eagerly close the writer when finishing | |
| * each partition and/or bucket. | |
| * | |
| * Caller is expected to call `writeWithIterator()` instead of `write()` to write records. | |
| */ | |
| class DynamicPartitionDataConcurrentWriter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I have updated the code to fallback to find logical link in the children, then setLogicalLink for WholeStageCodegenExec is unnecessary for this PR, please let me know if you want me to keep it or restore it.
|
what happens if we don't modify I think the only issue is for test: |
|
@cloud-fan I agree with your summary. Only the newly added tests are affected if I don't touch Have updated the code, please take another look. |
|
Kindly ping @cloud-fan, do you have further concerns with this PR? |
cloud-fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, only one comment: https://github.com/apache/spark/pull/52584/files#r2441550259
|
@cloud-fan I have addressed your last comment. @peter-toth @dongjoon-hyun Can anyone help merge this PR? |
…ble orderings ### What changes were proposed in this pull request? This is the second try of #52474, following [the suggestion from cloud-fan](#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Peter Toth <peter.toth@gmail.com> (cherry picked from commit f33d8aa) Signed-off-by: Peter Toth <peter.toth@gmail.com>
|
Thanks @pan3793 for the fix and @cloud-fan and @dongjoon-hyun for the review! Merged to @pan3793 , there were conflicts with |
|
@peter-toth seems SPARK-46485 didn't land on branch-3.5, I think we should backport that first then this one, or neither. |
|
Seem like SPARK-46485 was not needed for 3.5.x because SPARK-46378 had never landed in it, so probably we don't need this PR either. But let's recover |
|
I take a closer look at branch-3.5 - I confirm this issue also affects branch-3.5 by only porting the UT (it fails). SPARK-46485 actually fixes a hidden bug (until exposed by SPARK-46378) that has existed since 3.4. Therefore, I think we should backport SPARK-46485 and this one. @peter-toth WDYT? |
|
For SPARK-46485, please ping there (in the following PR) once more again, @pan3793 , because there are more audience there. |
|
@dongjoon-hyun thanks for the reminder. |
Yeah, IMO in that case it makes sense to backport. Especially that this PR fixes 2 other, so far hidden issues (#52584 (comment), #52584 (comment)). |
…ble orderings ### What changes were proposed in this pull request? This is the second try of apache#52474, following [the suggestion from cloud-fan](apache#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Peter Toth <peter.toth@gmail.com> (cherry picked from commit f33d8aa) Signed-off-by: Peter Toth <peter.toth@gmail.com>
…foldable orderings Backport #52584 to branch-3.5 ### What changes were proposed in this pull request? This is the second try of #52474, following [the suggestion from cloud-fan](#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52697 from pan3793/SPARK-53694-3.5. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Peter Toth <peter.toth@gmail.com>
…ble orderings ### What changes were proposed in this pull request? This is the second try of apache#52474, following [the suggestion from cloud-fan](apache#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Peter Toth <peter.toth@gmail.com> (cherry picked from commit 6289672) Signed-off-by: Peter Toth <peter.toth@gmail.com>
|
|
||
| test("v1 write to hive table with sort by literal column preserve custom order") { | ||
| withCovnertMetastore { _ => | ||
| withPlannedWrite { enabled => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pan3793 This enabled is not actually being utilized in the test, so does this test case really need to be wrapped with withPlannedWrite? This will result in the test content being executed twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's correct that the variable enabled is not referenced, but the wrapper withPlannedWrite is indeed required, we should ensure preserving the user-provided sort w/ and w/o enabling planned write.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got ~
…ble orderings ### What changes were proposed in this pull request? This is the second try of apache#52474, following [the suggestion from cloud-fan](apache#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Peter Toth <peter.toth@gmail.com>
What changes were proposed in this pull request?
This is the second try of #52474, following the suggestion from cloud-fan
This PR fixes a bug in
plannedWrite, where thequeryhas foldable orderings in the partition columns.The evaluation of
FileFormatWriter.orderingMatchedfails becauseSortOrder(Literal)is eliminated byEliminateSorts.Why are the changes needed?
V1Writeswill override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect.SPARK-53707 partially fixes the issue on the logical plan by adding a
Projectof query inV1Writes.Before SPARK-53707
After SPARK-53707
Note, note the issue still exists because there is another place to check the ordering match again in
FileFormatWriter.This PR fixes the issue thoroughly, with new UTs added.
Does this PR introduce any user-facing change?
Yes, it's a bug fix.
How was this patch tested?
New UTs are added.
Was this patch authored or co-authored using generative AI tooling?
No.