fix: CometShuffleExchangeExec logical link should be correct#324
fix: CometShuffleExchangeExec logical link should be correct#324viirya merged 8 commits intoapache:mainfrom
Conversation
| "spark.sql.adaptive.autoBroadcastJoinThreshold" -> "-1", | ||
| "spark.sql.autoBroadcastJoinThreshold" -> "-1", |
There was a problem hiding this comment.
Query plan is changed due to the logical link fix. In order to have CometSortMergeJoin, we need to disable broadcast join and AQE broadcast join.
3a9fdb9 to
aa100c6
Compare
| withSQLConf( | ||
| SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true", | ||
| SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", | ||
| SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", |
There was a problem hiding this comment.
Similar reason here. The query plan is changed and AQE interferes with a broadcast join.
aa100c6 to
96b9826
Compare
| override def equals(obj: Any): Boolean = { | ||
| obj match { | ||
| case other: CometShuffleExchangeExec => | ||
| this.outputPartitioning == other.outputPartitioning && | ||
| this.shuffleOrigin == other.shuffleOrigin && this.child == other.child && | ||
| this.shuffleType == other.shuffleType && | ||
| this.advisoryPartitionSize == other.advisoryPartitionSize | ||
| case _ => | ||
| false | ||
| } | ||
| } |
There was a problem hiding this comment.
Because CometShuffleExchangeExec adds originalPlan parameter which is not covered by canonicalization in Spark, we need to exclude it when compare two CometShuffleExchangeExec to make sure Spark reuse shuffle rule work.
| override def stringArgs: Iterator[Any] = | ||
| Iterator(outputPartitioning, shuffleOrigin, shuffleType, child) ++ Iterator(s"[plan_id=$id]") |
There was a problem hiding this comment.
Follow Spark Exchange.stringArgs.
| conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") | ||
| conf.set(CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key, "true") | ||
| conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") | ||
| conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true") |
1 similar comment
|
cc @andygrove Can you take a look if you have time? Thanks. |
| case ReusedExchangeExec(_, c: CometBroadcastExchangeExec) => | ||
| inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar() | ||
| case BroadcastQueryStageExec( | ||
| _, | ||
| ReusedExchangeExec(_, c: CometBroadcastExchangeExec), | ||
| _) => | ||
| inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar() |
There was a problem hiding this comment.
These are necessary but missed to add previously. This fix exposes that.
|
Merged. Thanks @sunchao |
Which issue does this PR close?
Closes #323.
Rationale for this change
What changes are included in this PR?
How are these changes tested?