From f0da035dbcbb6e9ea58df7129169a6be21142e01 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 22 Apr 2026 09:45:49 -0600 Subject: [PATCH] test: fix SparkToColumnar plan-shape assertions on Spark 4 Closes #4031. Spark 4 wraps the final AQE plan in `ResultQueryStageExec`, a `LeafExecNode` that `SparkPlan.collect` treats as opaque. The two affected tests used `adaptivePlan.collect { case c: CometSparkToColumnarExec => c }`, which therefore found zero matches on Spark 4 even though the node was present in the materialized plan. Switch to the `collect` method from `AdaptiveSparkPlanHelper` (already mixed into `CometTestBase`), which descends through query stages. Drop the `assume(!isSpark40Plus)` guards that disabled the tests on Spark 4. --- .../scala/org/apache/comet/exec/CometExecSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 9790c87147..a0617735be 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2388,8 +2388,6 @@ class CometExecSuite extends CometTestBase { } test("SparkToColumnar eliminate redundant in AQE") { - // TODO fix for Spark 4.0.0 - assume(!isSpark40Plus) withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { @@ -2404,7 +2402,10 @@ class CometExecSuite extends CometTestBase { val planAfter = df.queryExecution.executedPlan assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val numOperators = adaptivePlan.collect { case c: CometSparkToColumnarExec => + // Use AdaptiveSparkPlanHelper.collect so traversal descends through QueryStageExec.plan; + // Spark 4 wraps the final plan in ResultQueryStageExec (a LeafExecNode) that + // SparkPlan.collect would otherwise stop at. + val numOperators = collect(adaptivePlan) { case c: CometSparkToColumnarExec => c } assert(numOperators.length == 1) @@ -2478,8 +2479,6 @@ class CometExecSuite extends CometTestBase { } test("SparkToColumnar override node name for row input") { - // TODO fix for Spark 4.0.0 - assume(!isSpark40Plus) withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { @@ -2494,7 +2493,8 @@ class CometExecSuite extends CometTestBase { val planAfter = df.queryExecution.executedPlan assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan - val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec => + // See comment in the "eliminate redundant in AQE" test about AdaptiveSparkPlanHelper.collect. + val nodeNames = collect(adaptivePlan) { case c: CometSparkToColumnarExec => c.nodeName } assert(nodeNames.length == 1)