
Conversation

@bowenliang123
Contributor

bowenliang123 commented Jun 28, 2023

  • Use Rebalance instead of Repartition for distribution in Spark 3.2 and 3.3, to avoid small partitioned files, by letting AQE decide the partition numbers (a user-level sketch of the contrast is shown below).
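
For illustration, here is a minimal user-level sketch (not the PR's code) of the same contrast expressed through Spark's SQL partitioning hints; `db.target`, `db.source`, and `busi_date` are placeholder names:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rebalance-vs-repartition").getOrCreate()

// REPARTITION(cols) hash-partitions into a fixed number of shuffle partitions
// (spark.sql.shuffle.partitions, default 200), so each partition -- and thus
// each written file -- can end up tiny when the data volume is small.
spark.sql("INSERT INTO db.target SELECT /*+ REPARTITION(busi_date) */ * FROM db.source")

// REBALANCE(cols) also hash-partitions, but marks the exchange as
// REBALANCE_PARTITIONS_BY_COL so that AQE may coalesce small shuffle
// partitions (and split skewed ones) to a reasonable size before the write.
spark.sql("INSERT INTO db.target SELECT /*+ REBALANCE(busi_date) */ * FROM db.source")
```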

Example execution plans for an insertion into an Iceberg table, via explain EXTENDED insert into gfpersonas_platform.t_ptr_label_ice_bowen select * from gfpersonas_platform.t_ptr_label_ice;.

Before: (With 326 data files written, 3KB+ per file)

+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice_bowen], [], false, false, false
+- 'Project [*]
   +- 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice], [], false

== Analyzed Logical Plan ==
AppendData RelationV2[obj_id#485, lab_val#486, lab_numr#487, busi_date#488] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false
+- Project [obj_id#481, lab_val#482, lab_numr#483, busi_date#484]
   +- SubqueryAlias spark_catalog.gfpersonas_platform.t_ptr_label_ice
      +- RelationV2[obj_id#481, lab_val#482, lab_numr#483, busi_date#484] spark_catalog.gfpersonas_platform.t_ptr_label_ice

== Optimized Logical Plan ==
AppendData RelationV2[obj_id#485, lab_val#486, lab_numr#487, busi_date#488] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
+- Sort [busi_date#484 ASC NULLS FIRST], false
   +- RelationV2[obj_id#481, lab_val#482, lab_numr#483, busi_date#484] spark_catalog.gfpersonas_platform.t_ptr_label_ice

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3217/1372697900@67a7714c, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
+- *(1) Sort [busi_date#484 ASC NULLS FIRST], false, 0
   +- *(1) ColumnarToRow
      +- BatchScan[obj_id#481, lab_val#482, lab_numr#483, busi_date#484] spark_catalog.gfpersonas_platform.t_ptr_label_ice [filters=] RuntimeFilters: []
 |
+----------------------------------------------------+

After: (With 45 data files written, ~22MB per file)
Note the REBALANCE_PARTITIONS_BY_COL shuffle origin in Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL:

+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice_bowen], [], false, false, false
+- 'Project [*]
   +- 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice], [], false

== Analyzed Logical Plan ==
AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false
+- Project [obj_id#42, lab_val#43, lab_numr#44, busi_date#45]
   +- SubqueryAlias spark_catalog.gfpersonas_platform.t_ptr_label_ice
      +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice

== Optimized Logical Plan ==
AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
+- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false
   +- RebalancePartitions [lab_numr#44, busi_date#45]
      +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
         +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
 |
+----------------------------------------------------+
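
For context, the coalesced partition count (and therefore the number and size of the written files) is governed by standard AQE settings rather than by anything added in this PR; a minimal sketch of the relevant knobs, with their usual defaults:

```scala
// Standard Spark AQE configs (defaults shown; nothing here is added by this PR).
// They control how AQE sizes the coalesced shuffle partitions behind the
// REBALANCE_PARTITIONS_BY_COL exchange above.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Advisory size per post-shuffle partition; the written Parquet files usually
// come out smaller than this because of columnar encoding and compression.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```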

github-actions bot added the spark label Jun 28, 2023
bowenliang123 changed the title from "[WIP] Use Rebalance instead of Repartition for distribution" to "[WIP] Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution" on Jun 28, 2023
bowenliang123 changed the title from "[WIP] Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution" to "[WIP] Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite" on Jun 28, 2023
bowenliang123 changed the title from "[WIP] Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite" to "Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite" on Jun 29, 2023
@ConeyLiu
Contributor

Before:
Having REPARTITION_BY_NUM in +- Exchange hashpartitioning(lab_numr#270, busi_date#271, 200), REPARTITION_BY_NUM

It seems it is REBALANCE_PARTITIONS_BY_COL as well, judging from the plan.

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
         +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
 |
+----------------------------------------------------+

@namrathamyske
Contributor

@bowenliang123 Looks like REBALANCE_PARTITIONS_BY_COL does not have range partitioner support

@bowenliang123
Contributor Author

Before:
Having REPARTITION_BY_NUM in +- Exchange hashpartitioning(lab_numr#270, busi_date#271, 200), REPARTITION_BY_NUM

It seems it is REBALANCE_PARTITIONS_BY_COL as well, judging from the plan.

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
         +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
 |
+----------------------------------------------------+

Sorry, I pasted the wrong plan as the "before" example. I've updated it with the correct one.

@bowenliang123
Contributor Author

bowenliang123 commented Jul 3, 2023

@bowenliang123 Looks like REBALANCE_PARTITIONS_BY_COL does not have range partitioner support

Yes, you are right. RebalancePartitions only supports RoundRobinPartitioning and HashPartitioning.
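
For reference, a simplified paraphrase (not the verbatim source) of how Spark 3.3 derives the output partitioning of RebalancePartitions; there is no RangePartitioning branch, which is why an ordered (range) distribution cannot be expressed through a rebalance:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RoundRobinPartitioning}

// Paraphrased from Spark 3.3's RebalancePartitions: with no partition
// expressions it falls back to round-robin, otherwise it hash-partitions;
// a range-based partitioning simply cannot be produced by this node.
def rebalancePartitioning(exprs: Seq[Expression], numPartitions: Int): Partitioning =
  if (exprs.isEmpty) RoundRobinPartitioning(numPartitions)
  else HashPartitioning(exprs, numPartitions)
```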

I initiated this PR as a workaround in my case to dramatically reduce the number of written data files (366 files -> 45 files). It might not be perfect for satisfying range partitioner support and the semantics of Distribution.

@namrathamyske
Contributor

namrathamyske commented Jul 5, 2023

@bowenliang123 @ConeyLiu I understand that REBALANCE_PARTITIONS_BY_COL adds an adaptive coalesce (AQE) which just coalesces the partitions local to an executor (hence reducing the number of files written). Is this effective if the partitions are spread across different workers, since the partitions won't be local anymore (for coalesce to work)?

@bowenliang123
Contributor Author

bowenliang123 commented Jul 5, 2023

@bowenliang123 @ConeyLiu I understand that REBALANCE_PARTITIONS_BY_COL adds an adaptive coalesce (AQE) which just coalesces the partitions local to an executor (hence reducing the number of files written). Is this effective if the partitions are spread across different workers, since the partitions won't be local anymore (for coalesce to work)?


Since RebalancePartitions introduces a shuffle read stage, I think it works for partitions across worker nodes. @namrathamyske
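
To see this end to end, here is a tiny hypothetical demo (the view name and output path are made up): the rebalance forces a full shuffle, and with AQE enabled the post-shuffle partition count is derived from the actual map output sizes, regardless of which executors held the input partitions:

```scala
// Hypothetical demo: 400 input partitions spread across the cluster.
val df = spark.range(0L, 10000000L, 1L, 400)
df.createOrReplaceTempView("src")

// The REBALANCE hint inserts a full shuffle; AQE then coalesces the shuffle
// output partitions based on their measured sizes, not on executor locality.
spark.sql("SELECT /*+ REBALANCE(id) */ * FROM src")
  .write.mode("overwrite").parquet("/tmp/rebalance_demo")
```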

@namrathamyske
Contributor

namrathamyske commented Jul 10, 2023

@bowenliang123
Contributor Author

@bowenliang123 Can we merge this to master by having a flag called "strictDistributionRequired", similar to https://github.com/apache/spark/blob/453300b418bc03511ad9167bbaad49e0f1f1c090/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala#L63, for rebalance to be applied?

Yes, I have noticed these changes in Spark 3.4, and backporting them to 3.3 is an approach worth considering.
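
For reference, the Spark 3.4 branch linked above reads roughly like this (a paraphrased sketch, not the verbatim source; `distributionExprs`, `query`, and `optNumPartitions` stand in for the method's local values):

```scala
// Paraphrased from Spark 3.4's DistributionAndOrderingUtils: when the write
// does not strictly require its distribution, Spark applies a rebalance so
// AQE can right-size the shuffle partitions; otherwise it keeps the strict
// repartition with a fixed partition count.
val queryWithDistribution =
  if (write.distributionStrictlyRequired()) {
    RepartitionByExpression(distributionExprs, query, optNumPartitions)
  } else {
    RebalancePartitions(distributionExprs, query)
  }
```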

@bowenliang123
Contributor Author

I do not have a clue about how to fix the failures in the GitHub Actions tests, or where and why they fail. I may need some help with this.

@github-actions

github-actions bot commented Sep 4, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions bot added the stale label Sep 4, 2024
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this Sep 13, 2024