diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 919b7d51a8..5c09fd77eb 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -1309,16 +1309,15 @@ index 0df7f806272..92390bd819f 100644 test("non-matching optional group") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -index 2e33f6505ab..949fdea0003 100644 +index 2e33f6505ab..54f5081e10a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -@@ -23,10 +23,12 @@ import org.apache.spark.SparkRuntimeException +@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union} +import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution} import org.apache.spark.sql.execution.datasources.FileScanRDD -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec @@ -1326,7 +1325,7 @@ index 2e33f6505ab..949fdea0003 100644 import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -@@ -1529,6 +1531,18 @@ class SubquerySuite extends QueryTest +@@ -1529,6 +1530,18 @@ class SubquerySuite extends QueryTest fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( _.files.forall(_.urlEncodedPath.contains("p=0")))) @@ -1345,7 +1344,7 @@ index 2e33f6505ab..949fdea0003 100644 case _ => false }) } -@@ -2094,7 +2108,7 @@ class SubquerySuite extends QueryTest +@@ -2094,7 +2107,7 @@ class SubquerySuite extends QueryTest df.collect() val exchanges = collect(df.queryExecution.executedPlan) { @@ -1354,13 +1353,7 @@ index 2e33f6505ab..949fdea0003 100644 } assert(exchanges.size === 1) } -@@ -2674,22 +2688,31 @@ class SubquerySuite extends QueryTest - } - } - -- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") { -+ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { +@@ -2678,18 +2691,25 @@ class SubquerySuite extends QueryTest def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = { val df = sql(query) checkAnswer(df, answer) @@ -1369,6 +1362,7 @@ index 2e33f6505ab..949fdea0003 100644 + val dataSourceScanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanLike => f + case c: CometScanExec => c ++ case n: CometNativeScanExec => n } sparkContext.listenerBus.waitUntilEmpty() - assert(fileSourceScanExec.size === 1) @@ -1378,13 +1372,11 @@ index 2e33f6505ab..949fdea0003 100644 + assert(dataSourceScanExec.size === 1) + val scalarSubquery = dataSourceScanExec.head match { + case f: FileSourceScanLike => -+ f.dataFilters.flatMap(_.collect { -+ case s: ScalarSubquery => s -+ }) ++ f.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s }) + case c: CometScanExec => -+ c.dataFilters.flatMap(_.collect { -+ case s: 
ScalarSubquery => s
-+ })
++ c.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
++ case n: CometNativeScanExec =>
++ n.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
+ }
  assert(scalarSubquery.length === 1)
  assert(scalarSubquery.head.plan.isInstanceOf[ReusedSubqueryExec])
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index b89e57422b..1ef901d065 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -32,13 +32,45 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf._
-import org.apache.comet.rules.{CometExecRule, CometScanRule, EliminateRedundantTransitions}
+import org.apache.comet.rules.{CometExecRule, CometReuseSubquery, CometScanRule, EliminateRedundantTransitions}
 import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
 /**
  * CometDriverPlugin will register an instance of this class with Spark.
  *
- * This class is responsible for injecting Comet rules and extensions into Spark.
+ * Comet rules are injected into Spark's rule pipeline at several extension points. The execution
+ * order differs between AQE and non-AQE paths:
+ *
+ * Non-AQE (QueryExecution.preparations):
+ * {{{
+ *   1. PlanDynamicPruningFilters  -- Spark creates DPP filters
+ *   2. PlanSubqueries             -- Spark creates SubqueryExec for scalar subqueries
+ *   3. EnsureRequirements         -- Spark inserts shuffles/sorts
+ *   4. ApplyColumnarRulesAndInsertTransitions:
+ *      a. preColumnarTransitions: CometScanRule, CometExecRule (replace Spark -> Comet nodes)
+ *      b. insertTransitions: ColumnarToRow/RowToColumnar added
+ *      c. postColumnarTransitions: EliminateRedundantTransitions
+ *   5. ReuseExchangeAndSubquery   -- Spark deduplicates subqueries (sees Comet nodes)
+ * }}}
+ *
+ * AQE (AdaptiveSparkPlanExec):
+ * {{{
+ *   Initial plan:
+ *     queryStagePreparationRules: CometScanRule, CometExecRule (replace Spark -> Comet nodes)
+ *
+ *   Per stage (optimizeQueryStage + postStageCreationRules):
+ *     1. queryStageOptimizerRules: ReuseAdaptiveSubquery, CometReuseSubquery
+ *     2. postStageCreationRules -> ApplyColumnarRulesAndInsertTransitions:
+ *        a. preColumnarTransitions: CometScanRule, CometExecRule (no-ops, already converted)
+ *        b. insertTransitions
+ *        c. postColumnarTransitions: EliminateRedundantTransitions
+ * }}}
+ *
+ * CometReuseSubquery is needed under AQE because Spark's ReuseAdaptiveSubquery may run before
+ * Comet's node replacements during initial plan construction, and those replacements can disrupt
+ * subquery reuse that was already applied. The shim-based registration
+ * (injectQueryStageOptimizerRuleShim) handles API availability: Spark 3.5+ provides
+ * injectQueryStageOptimizerRule; Spark 3.4 does not, so the shim is a no-op there.
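+ *
+ * For illustration only (Comet is normally activated through CometDriverPlugin, as noted above,
+ * rather than wired up by hand), applying this class to a session builder is what registers the
+ * extension points listed here:
+ * {{{
+ *   val spark = SparkSession.builder()
+ *     .withExtensions(new CometSparkSessionExtensions)
+ *     .getOrCreate()
+ * }}}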
 */
 class CometSparkSessionExtensions
     extends (SparkSessionExtensions => Unit)
@@ -49,6 +81,7 @@ class CometSparkSessionExtensions
     extensions.injectColumnar { session => CometExecColumnar(session) }
     extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
     extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
+    injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
   }
 
   case class CometScanColumnar(session: SparkSession) extends ColumnarRule {
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometReuseSubquery.scala b/spark/src/main/scala/org/apache/comet/rules/CometReuseSubquery.scala
new file mode 100644
index 0000000000..fb92b932e0
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/rules/CometReuseSubquery.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.rules
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
+import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+
+/**
+ * Re-applies subquery deduplication after Comet node conversions.
+ *
+ * Spark's ReuseAdaptiveSubquery runs as a queryStageOptimizerRule before postStageCreationRules,
+ * which is where CometScanRule/CometExecRule replace Spark operators with Comet equivalents. The
+ * Comet rules copy expressions from the original Spark nodes, which can disrupt subquery reuse
+ * that was already applied by Spark's rule. This rule runs after Comet conversions to restore
+ * proper deduplication.
+ *
+ * Uses the same algorithm as Spark's ReuseExchangeAndSubquery (subquery portion): top-down
+ * traversal via transformAllExpressionsWithPruning, caching by canonical form.
+ *
+ * For non-AQE, Spark's ReuseExchangeAndSubquery runs after ApplyColumnarRulesAndInsertTransitions
+ * in QueryExecution.preparations and handles reuse correctly without this rule.
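+ *
+ * Sketch of the intended effect (plan shapes are illustrative, not verbatim explain output):
+ * {{{
+ *   Before: Filter(c1 > scalar-subquery#1)   Filter(c2 < scalar-subquery#2)
+ *           where #1 and #2 canonicalize to the same plan
+ *   After:  subquery #1 keeps its plan; #2 is rewritten to ReusedSubqueryExec(#1's plan)
+ * }}}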
+ *
+ * @see
+ *   ReuseExchangeAndSubquery (Spark's non-AQE subquery reuse)
+ * @see
+ *   ReuseAdaptiveSubquery (Spark's AQE subquery reuse)
+ */
+case object CometReuseSubquery extends Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.subqueryReuseEnabled) {
+      return plan
+    }
+
+    val cache = mutable.Map.empty[SparkPlan, BaseSubqueryExec]
+
+    plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+      case sub: ExecSubqueryExpression if !sub.plan.isInstanceOf[ReusedSubqueryExec] =>
+        val cached = cache.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
+        if (cached.ne(sub.plan)) {
+          sub.withNewPlan(ReusedSubqueryExec(cached))
+        } else {
+          sub
+        }
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index cd09f07138..050c6d431f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.comet.shims.ShimStreamSourceAwareSparkPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.{ScalarSubquery => ExecScalarSubquery}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
@@ -41,6 +42,7 @@ import com.google.common.base.Objects
 
 import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
 import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.serde.QueryPlanSerde.exprToProto
 
 /**
  * Native scan operator for DataSource V1 Parquet files using DataFusion's ParquetExec.
@@ -77,16 +79,16 @@ case class CometNativeScanExec(
   override lazy val metadata: Map[String, String] = originalPlan.metadata
 
   /**
-   * Prepare DPP subquery plans before execution.
+   * Prepare subquery plans before execution.
    *
-   * For non-AQE DPP, partitionFilters contains DynamicPruningExpression(InSubqueryExec(...))
-   * inserted by PlanDynamicPruningFilters (which runs before Comet rules). We call
-   * e.plan.prepare() here so that the subquery plans are set up before execution begins.
+   * DPP: partitionFilters may contain DynamicPruningExpression(InSubqueryExec(...)) from
+   * PlanDynamicPruningFilters.
    *
-   * Note: doPrepare() alone is NOT sufficient for DPP resolution. serializedPartitionData can be
-   * triggered from findAllPlanData (via commonData) on a BroadcastExchangeExec thread, outside
-   * the normal prepare() -> executeSubqueries() flow. The actual DPP resolution (updateResult)
-   * happens in serializedPartitionData below.
+   * Scalar subquery pushdown (SPARK-43402, Spark 4.0+): dataFilters may contain ScalarSubquery.
+   *
+   * serializedPartitionData can be triggered outside the normal prepare() -> executeSubqueries()
+   * flow (e.g., from a BroadcastExchangeExec thread), so we prepare subquery plans here and
+   * resolve them explicitly in serializedPartitionData via updateResult().
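+   *
+   * Illustrative shapes of the filters handled here (column names invented for the example):
+   * {{{
+   *   partitionFilters: DynamicPruningExpression(InSubqueryExec(p, SubqueryBroadcastExec(...), ...))
+   *   dataFilters:      (c1 > ScalarSubquery(SubqueryExec(...)))   // SPARK-43402 pushdown
+   * }}}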
   */
  override protected def doPrepare(): Unit = {
    partitionFilters.foreach {
@@ -94,6 +96,13 @@ case class CometNativeScanExec(
         e.plan.prepare()
       case _ =>
     }
+    dataFilters.foreach { f =>
+      f.foreach {
+        case s: ExecScalarSubquery =>
+          s.plan.prepare()
+        case _ =>
+      }
+    }
     super.doPrepare()
   }
 
@@ -138,7 +147,7 @@ case class CometNativeScanExec(
     //
     // originalPlan.inputRDD triggers FileSourceScanExec's full scan pipeline including
     // codegen on partition filter expressions. With DPP, this calls
-    // InSubqueryExec.doGenCode which requires the subquery to have finished — but
+    // InSubqueryExec.doGenCode which requires the subquery to have finished - but
     // outputPartitioning can be accessed before prepare() runs (e.g., by
     // ValidateRequirements during plan validation).
     //
@@ -208,8 +217,40 @@ case class CometNativeScanExec(
         case _ =>
       }
     }
-    // Extract common data from nativeOp
-    val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
+    // Resolve scalar subqueries in dataFilters and push to the native Parquet reader.
+    // supportedDataFilters excludes PlanExpression at planning time (unresolved), so these
+    // aren't in the serialized native plan yet. We resolve them here and append to the
+    // NativeScanCommon protobuf. Same approach as FileSourceScanLike.pushedDownFilters
+    // (DataSourceScanExec.scala), which resolves ScalarSubquery -> Literal at execution time.
+    val commonBytes = {
+      val base = nativeOp.getNativeScan.getCommon
+      val scalarSubqueryFilters = dataFilters
+        .filter(_.exists(_.isInstanceOf[ExecScalarSubquery]))
+      scalarSubqueryFilters.foreach { f =>
+        f.foreach {
+          case s: ExecScalarSubquery =>
+            s.updateResult()
+          case _ =>
+        }
+      }
+      val resolvedFilters = scalarSubqueryFilters
+        .map(_.transform { case s: ExecScalarSubquery =>
+          Literal.create(s.eval(null), s.dataType)
+        })
+      if (resolvedFilters.nonEmpty) {
+        val commonBuilder = base.toBuilder
+        for (filter <- resolvedFilters) {
+          exprToProto(filter, output) match {
+            case Some(proto) => commonBuilder.addDataFilters(proto)
+            case _ =>
+              logWarning(s"Could not serialize resolved scalar subquery filter: $filter")
+          }
+        }
+        commonBuilder.build().toByteArray
+      } else {
+        base.toByteArray
+      }
+    }
 
     // Get file partitions from CometScanExec (handles bucketing, etc.)
     val filePartitions = scan.getFilePartitions()
@@ -299,13 +340,15 @@ case class CometNativeScanExec(
       case other: CometNativeScanExec =>
         this.originalPlan == other.originalPlan &&
         this.serializedPlanOpt == other.serializedPlanOpt &&
-        this.partitionFilters == other.partitionFilters
+        this.partitionFilters == other.partitionFilters &&
+        this.dataFilters == other.dataFilters
       case _ =>
         false
     }
   }
 
-  override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
+  override def hashCode(): Int =
+    Objects.hashCode(originalPlan, serializedPlanOpt, partitionFilters, dataFilters)
 
   private val driverMetricKeys = Set(
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
similarity index 80%
rename from spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
rename to spark/src/main/spark-3.4/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 0dd783201a..9276fe8190 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -19,21 +19,16 @@
 package org.apache.comet.shims
 
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 
 trait ShimCometSparkSessionExtensions {
-  /**
-   * TODO: delete after dropping Spark 3.x support and directly call
-   * SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
-   */
   protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = "spark.sql.extendedExplainProviders"
 
-  // Extended info is available only since Spark 4.0.0
-  // (https://issues.apache.org/jira/browse/SPARK-47289)
   def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
     try {
-      // Look for QueryExecution.extendedExplainInfo(scala.Function1[String, Unit], SparkPlan)
       qe.getClass.getDeclaredMethod(
         "extendedExplainInfo",
         classOf[String => Unit],
@@ -43,4 +38,9 @@ trait ShimCometSparkSessionExtensions {
     }
     true
   }
+
+  // injectQueryStageOptimizerRule not available on Spark 3.4
+  def injectQueryStageOptimizerRuleShim(
+      extensions: SparkSessionExtensions,
+      rule: Rule[SparkPlan]): Unit = {}
 }
diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
new file mode 100644
index 0000000000..15bf156f89
--- /dev/null
+++ b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.shims
+
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
+
+trait ShimCometSparkSessionExtensions {
+
+  protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = "spark.sql.extendedExplainProviders"
+
+  def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
+    try {
+      qe.getClass.getDeclaredMethod(
+        "extendedExplainInfo",
+        classOf[String => Unit],
+        classOf[SparkPlan])
+    } catch {
+      case _: NoSuchMethodException | _: SecurityException => return false
+    }
+    true
+  }
+
+  // Available since Spark 3.5 (SPARK-45785)
+  def injectQueryStageOptimizerRuleShim(
+      extensions: SparkSessionExtensions,
+      rule: Rule[SparkPlan]): Unit = {
+    extensions.injectQueryStageOptimizerRule(_ => rule)
+  }
+}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index e68c0cb3ec..cac636c45c 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -19,8 +19,10 @@
 package org.apache.comet.shims
 
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 
@@ -30,4 +32,10 @@ trait ShimCometSparkSessionExtensions {
   protected def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = true
 
   protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
+
+  def injectQueryStageOptimizerRuleShim(
+      extensions: SparkSessionExtensions,
+      rule: Rule[SparkPlan]): Unit = {
+    extensions.injectQueryStageOptimizerRule(_ => rule)
+  }
 }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index a3f5df3e58..8ba59e50d1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -570,6 +570,50 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  // Reproduces a CI failure from DynamicPartitionPruningSuiteV1AEOn (SPARK-37995).
+  // DynamicPartitionPruningSuiteBase.checkPartitionPruningPredicate (lines 233-240) asserts
+  // that all non-broadcast subquery plans contain AdaptiveSparkPlanExec when AQE is on.
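+  // Roughly, the shape being asserted on is (illustrative, not the suite's verbatim code):
+  //   ScalarSubquery(AdaptiveSparkPlanExec(...))   -> accepted when AQE is on
+  //   ScalarSubquery(WholeStageCodegenExec(...))   -> rejected when AQE is on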
+ test("SPARK-37995: DPP with scalar subquery does not break subquery assertions") { + withDppTables { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet", + SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { + val df = sql("""SELECT f.date_id, f.store_id FROM fact_sk f + |JOIN dim_store s ON f.store_id = s.store_id AND s.country = 'NL' + |WHERE s.state_province != (SELECT max(state_province) FROM dim_stats) + |""".stripMargin) + val (_, plan) = checkSparkAnswer(df) + + val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec] + val dpExprs = flatMap(plan) { + case s: FileSourceScanExec => + s.partitionFilters.collect { case d: DynamicPruningExpression => d.child } + case s: CometScanExec => + s.partitionFilters.collect { case d: DynamicPruningExpression => d.child } + case s: CometNativeScanExec => + s.partitionFilters.collect { case d: DynamicPruningExpression => d.child } + case _ => Nil + } + val subqueryBroadcast = dpExprs.collect { + case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b + case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b + } + subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach { s => + val subquery = s match { + case r: ReusedSubqueryExec => r.child + case o => o + } + assert( + subquery.exists(_.isInstanceOf[AdaptiveSparkPlanExec]) == isMainQueryAdaptive, + s"Subquery ${subquery.getClass.getSimpleName} adaptive mismatch:\n" + + s"${subquery.treeString}") + } + } + } + } + test("non-AQE DPP with two separate broadcast joins") { withTempDir { dir => val path = s"${dir.getAbsolutePath}/data" @@ -1161,6 +1205,63 @@ class CometExecSuite extends CometTestBase { } } + // Regression test for https://github.com/apache/datafusion-comet/issues/4042 + // SPARK-43402 (Spark 4.0+) pushes scalar subqueries into FileSourceScanExec.dataFilters. + // CometReuseSubquery re-applies subquery deduplication after Comet node conversions, and + // the resolved literal is pushed to the native Parquet reader at execution time (same + // approach as FileSourceScanLike.pushedDownFilters in DataSourceScanExec.scala). + test("scalar subquery in data filters does not break subquery reuse") { + assume(isSpark40Plus, "SPARK-43402 scalar subquery pushdown is Spark 4.0+ only") + + Seq(true, false).foreach { aqeEnabled => + withTable("t1", "t2") { + withSQLConf( + SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) { + Seq(1, 2, 3).toDF("c1").write.format("parquet").saveAsTable("t1") + Seq(4, 5, 6).toDF("c2").write.format("parquet").saveAsTable("t2") + + checkSparkAnswer(sql("SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)")) + + val df = sql("SELECT * FROM t1 WHERE c1 < (SELECT min(c2) FROM t2)") + val (_, cometPlan) = checkSparkAnswer(df) + + val nativeScans = collectWithSubqueries(cometPlan) { case n: CometNativeScanExec => + n + } + val t1Scan = + nativeScans.find(_.dataFilters.exists(_.exists(_.isInstanceOf[ScalarSubquery]))) + assert(t1Scan.isDefined, "Expected CometNativeScanExec with ScalarSubquery") + val scalarSubqueries = t1Scan.get.dataFilters.flatMap(_.collect { + case s: ScalarSubquery => s + }) + assert(scalarSubqueries.length === 1) + assert(t1Scan.get.metrics("numFiles").value === 1) + + // Exactly one copy should be ReusedSubqueryExec. 
+          // CometReuseSubquery) puts it on the scan; non-AQE (bottom-up via Spark's
+          // ReuseExchangeAndSubquery) puts it on the filter. Either way the subquery
+          // executes once.
+          val allReused = collectWithSubqueries(cometPlan) { case p: SparkPlan =>
+            p.expressions.flatMap(_.collect {
+              case s: ScalarSubquery if s.plan.isInstanceOf[ReusedSubqueryExec] => s
+            })
+          }.flatten
+          assert(
+            allReused.nonEmpty,
+            s"Expected at least one ReusedSubqueryExec in plan (AQE=$aqeEnabled)")
+
+          if (aqeEnabled) {
+            assert(
+              scalarSubqueries.head.plan.isInstanceOf[ReusedSubqueryExec],
+              "Expected ReusedSubqueryExec on scan's ScalarSubquery (AQE=true) " +
+                s"but got ${scalarSubqueries.head.plan.getClass.getSimpleName}")
+          }
+        }
+      }
+    }
+  }
+
   test("Comet native metrics: scan") {
     Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach {
       scanMode =>