@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.comet.{CometMetricNode, CometNativeExec, CometPlan, CometSinkPlaceHolder}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
+import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
@@ -555,16 +555,27 @@ object CometShuffleExchangeExec
    * Returns true if the stage (the subtree rooted at this shuffle) contains a scan with Dynamic
    * Partition Pruning (DPP). When DPP is present, the scan falls back to Spark, and wrapping the
    * stage with Comet shuffle creates inefficient row-to-columnar transitions.
+   *
+   * The walk explicitly descends into `QueryStageExec.plan`. Under AQE, a child subtree that has
+   * already materialized is wrapped in a `ShuffleQueryStageExec` (a `LeafExecNode`), so a plain
+   * `.exists` stops at the wrapper. That would cause the same shuffle to return a different
+   * answer here during initial planning (DPP visible, fall back to Spark) vs. AQE stage prep (DPP
+   * hidden, convert to Comet) and produce plan-shape inconsistencies across the two passes.
    */
   private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = {
     def isDynamicPruningFilter(e: Expression): Boolean =
       e.exists(_.isInstanceOf[PlanExpression[_]])

-    s.child.exists {
+    def containsDpp(plan: SparkPlan): Boolean = plan match {
       case scan: FileSourceScanExec =>
         scan.partitionFilters.exists(isDynamicPruningFilter)
-      case _ => false
+      case stage: QueryStageExec =>
+        containsDpp(stage.plan)
+      case other =>
+        other.children.exists(containsDpp)
     }
+
+    containsDpp(s.child)
   }

   def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
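
Why a children-only walk goes blind here: Spark's `TreeNode.exists` recurses through `children`, and `ShuffleQueryStageExec` is a `LeafExecNode` whose materialized subtree is held in a separate `plan` field rather than in `children`. The sketch below is a self-contained stand-in model (the `Scan`, `Exchange`, and `Stage` classes are ours, not Spark's) that contrasts a children-only walk with one that also descends into a stage's `plan`, mirroring the `containsDpp` change above.

// Stand-in model only; mimics FileSourceScanExec, ShuffleExchangeExec and ShuffleQueryStageExec
// just closely enough to show the traversal difference.
object DppWalkSketch extends App {
  sealed trait Node { def children: Seq[Node] }
  case class Scan(hasDppFilter: Boolean) extends Node { def children: Seq[Node] = Nil }
  case class Exchange(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
  // Like ShuffleQueryStageExec: a leaf to the walker, but it carries a whole subtree in `plan`.
  case class Stage(plan: Node) extends Node { def children: Seq[Node] = Nil }

  // Pre-fix behaviour: recurse only through `children`, as a plain TreeNode.exists does.
  def existsViaChildren(n: Node)(p: Node => Boolean): Boolean =
    p(n) || n.children.exists(existsViaChildren(_)(p))

  // Fixed behaviour: additionally descend into a stage's `plan`.
  def existsThroughStages(n: Node)(p: Node => Boolean): Boolean =
    p(n) || (n match {
      case Stage(plan) => existsThroughStages(plan)(p)
      case other => other.children.exists(existsThroughStages(_)(p))
    })

  val isDppScan: Node => Boolean = { case Scan(true) => true; case _ => false }

  // Initial planning: no stages yet, both walks see the DPP scan under the shuffle.
  val initial = Exchange(Exchange(Scan(hasDppFilter = true)))
  assert(existsViaChildren(initial)(isDppScan))

  // AQE stage prep: the inner shuffle has materialized into a stage wrapper.
  val afterAqe = Exchange(Stage(Exchange(Scan(hasDppFilter = true))))
  assert(!existsViaChildren(afterAqe)(isDppScan)) // the scan is invisible behind the leaf wrapper
  assert(existsThroughStages(afterAqe)(isDppScan)) // descending into `plan` keeps the answer stable
}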
@@ -0,0 +1,138 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.sql.comet

import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf

/**
 * Pins the fix that keeps the DPP fallback decision stable across the initial-plan and
 * AQE-stage-prep passes.
 *
 * Background: Comet's `stageContainsDPPScan` (used by `columnarShuffleSupported`) walks the
 * shuffle's child tree looking for a `FileSourceScanExec` with a `PlanExpression` partition
 * filter. Under AQE, once a child stage materializes, the subtree is replaced by a
 * `ShuffleQueryStageExec` (a `LeafExecNode`, so `children == Seq.empty`). A plain `.exists` walk
 * cannot descend into it, so the DPP scan becomes invisible and the same shuffle flips from Spark
 * (initial plan) to Comet (stage prep), producing plan-shape inconsistencies across the two
 * passes.
 *
 * The fix descends into `QueryStageExec.plan` explicitly. This suite verifies the fix.
 */
class CometDppFallbackConsistencySuite extends CometTestBase {

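  // Builds a DPP-friendly layout: the fact table is partitioned by fact_date, and the test query
  // below joins it to a small dimension table on that partition column, with a selective filter
  // on the dimension side, which is the shape Spark's dynamic partition pruning targets. The
  // writes run with COMET_EXEC_ENABLED turned off.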
  private def buildDppTables(dir: java.io.File): Unit = {
    val factPath = s"${dir.getAbsolutePath}/fact.parquet"
    val dimPath = s"${dir.getAbsolutePath}/dim.parquet"
    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
      val sess = spark
      import sess.implicits._
      val oneDay = 24L * 60L * 60000L
      val now = System.currentTimeMillis()
      (0 until 400)
        .map(i => (i, new java.sql.Date(now + (i % 40) * oneDay), i.toString))
        .toDF("fact_id", "fact_date", "fact_str")
        .write
        .partitionBy("fact_date")
        .parquet(factPath)
      (0 until 40)
        .map(i => (i, new java.sql.Date(now + i * oneDay), i.toString))
        .toDF("dim_id", "dim_date", "dim_str")
        .write
        .parquet(dimPath)
    }
    spark.read.parquet(factPath).createOrReplaceTempView("dpp_consistency_fact")
    spark.read.parquet(dimPath).createOrReplaceTempView("dpp_consistency_dim")
  }

  private def unwrapAqe(plan: SparkPlan): SparkPlan = plan match {
    case a: AdaptiveSparkPlanExec => a.initialPlan
    case other => other
  }

  private def findFirstShuffle(plan: SparkPlan): Option[ShuffleExchangeExec] = {
    var found: Option[ShuffleExchangeExec] = None
    plan.foreach {
      case s: ShuffleExchangeExec if found.isEmpty => found = Some(s)
      case _ =>
    }
    found
  }

test("columnarShuffleSupported detects DPP through a materialized ShuffleQueryStageExec") {
withTempDir { dir =>
buildDppTables(dir)
withSQLConf(
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true",
// Force SMJ so we get a shuffle above the DPP scan.
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.PREFER_SORTMERGEJOIN.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {

val df = spark.sql(
"select f.fact_date, count(*) c " +
"from dpp_consistency_fact f " +
"join dpp_consistency_dim d on f.fact_date = d.dim_date " +
"where d.dim_id > 35 " +
"group by f.fact_date")
val initialPlan = unwrapAqe(df.queryExecution.executedPlan)

val dppShuffle = findFirstShuffle(initialPlan).getOrElse {
fail(s"No ShuffleExchangeExec found in initial plan:\n${initialPlan.treeString}")
}

// (1) Direct call — the child subtree is walkable, DPP is visible.
val initialDecision = CometShuffleExchangeExec.columnarShuffleSupported(dppShuffle)

// (2) Simulate AQE having materialized the DPP stage: wrap the real DPP shuffle in a
// ShuffleQueryStageExec (the exact wrapper AQE places around a completed shuffle), then
// parent it under a new outer shuffle. The outer shuffle's child is now an opaque stage
// from the perspective of a naive `.exists` walk, but the DPP scan still lives in
// `stage.plan`.
val materializedStage = ShuffleQueryStageExec(
id = 0,
plan = dppShuffle,
_canonicalized = dppShuffle.canonicalized)
val outerShuffle = ShuffleExchangeExec(SinglePartition, materializedStage)
val postAqeDecision = CometShuffleExchangeExec.columnarShuffleSupported(outerShuffle)

// The fix descends into `QueryStageExec.plan`, so the outer shuffle must also see the
// DPP scan through the materialized stage and fall back to Spark. Without the fix
// this would return true (Comet shuffle), producing plan-shape inconsistencies across
// the two planning passes.
assert(
!initialDecision,
s"expected Spark fallback for the DPP shuffle, got $initialDecision")
assert(
!postAqeDecision,
"expected Spark fallback for a shuffle whose child tree contains a materialized " +
"ShuffleQueryStageExec wrapping a DPP scan, but got Comet conversion")
}
}
}
}
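
A side note on what the `PlanExpression` check actually matches: after physical planning, a dynamically pruned partition filter on the fact-table scan contains an `InSubqueryExec` (a `PlanExpression`), typically wrapped in a `DynamicPruningExpression`, which is what `isDynamicPruningFilter` detects. The REPL-style snippet below is a hedged debugging aid, not part of the PR (the helper name `collectDppScans` is ours); it lists the scans reachable through `children` and whether each carries such a filter. Because `collect` also recurses only through `children`, it shares the same blind spot at materialized query stages that the fix above addresses.

// Hedged debugging aid (not part of the PR): list every FileSourceScanExec reachable through
// `children`, paired with whether any of its partition filters contains a PlanExpression.
// Note: like the pre-fix walk, this cannot see scans hidden inside a materialized
// ShuffleQueryStageExec.
import org.apache.spark.sql.catalyst.expressions.PlanExpression
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

def collectDppScans(plan: SparkPlan): Seq[(FileSourceScanExec, Boolean)] =
  plan.collect { case scan: FileSourceScanExec =>
    scan -> scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]]))
  }

// Example (inside the test above): collectDppScans(initialPlan) would be expected to report the
// fact-table scan with a true flag once the DPP filter has been planned.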