Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2388,8 +2388,6 @@ class CometExecSuite extends CometTestBase {
}

test("SparkToColumnar eliminate redundant in AQE") {
// TODO fix for Spark 4.0.0
assume(!isSpark40Plus)
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Expand All @@ -2404,7 +2402,10 @@ class CometExecSuite extends CometTestBase {
val planAfter = df.queryExecution.executedPlan
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val numOperators = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
// Use AdaptiveSparkPlanHelper.collect so traversal descends through QueryStageExec.plan;
// Spark 4 wraps the final plan in ResultQueryStageExec (a LeafExecNode) that
// SparkPlan.collect would otherwise stop at.
val numOperators = collect(adaptivePlan) { case c: CometSparkToColumnarExec =>
c
}
assert(numOperators.length == 1)
Expand Down Expand Up @@ -2478,8 +2479,6 @@ class CometExecSuite extends CometTestBase {
}

test("SparkToColumnar override node name for row input") {
// TODO fix for Spark 4.0.0
assume(!isSpark40Plus)
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Expand All @@ -2494,7 +2493,8 @@ class CometExecSuite extends CometTestBase {
val planAfter = df.queryExecution.executedPlan
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
// See comment in the "eliminate redundant in AQE" test about AdaptiveSparkPlanHelper.collect.
val nodeNames = collect(adaptivePlan) { case c: CometSparkToColumnarExec =>
c.nodeName
}
assert(nodeNames.length == 1)
Expand Down
Loading