Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,47 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

/**
 * Master switch for the fuzz-fallback diagnostic implemented in [[FuzzFallback]].
 * Testing-only; off by default. When on, conversion decisions are randomly (but
 * deterministically, given the seed config) vetoed to produce irregular
 * Spark/Comet plan boundaries for fuzz tests.
 */
val COMET_FUZZ_FALLBACK_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.fuzz.fallback.enabled")
    .category(CATEGORY_TESTING)
    .doc(
      "Diagnostic: when enabled, Comet randomly vetoes converting shuffles/operators to " +
        "Comet equivalents so the rule pipeline produces irregular Spark/Comet boundaries. " +
        "Used to surface plan-shape bugs that are hard to trigger via normal queries. " +
        "Decisions are deterministic given `spark.comet.fuzz.fallback.seed`.")
    .booleanConf
    .createWithDefault(false)

/**
 * Seed mixed into every fuzz-fallback decision hash (see [[FuzzFallback]]). Rerunning the
 * same query with the same seed reproduces the same set of forced fallbacks, which is how a
 * failing fuzz run is reproduced. Ignored unless the fuzz fallback is enabled.
 */
val COMET_FUZZ_FALLBACK_SEED: ConfigEntry[Long] =
  conf("spark.comet.fuzz.fallback.seed")
    .category(CATEGORY_TESTING)
    .doc("Seed for the fuzz fallback RNG. Same seed + same query reproduces the same pattern " +
      "of forced fallbacks. Only used when `spark.comet.fuzz.fallback.enabled=true`.")
    .longConf
    .createWithDefault(0L)

/**
 * Per-shuffle veto probability for the fuzz fallback, validated to [0.0, 1.0].
 * Defaults to 0.5 so that, when the diagnostic is enabled, roughly half of the
 * shuffles stay on Spark — maximizing Spark/Comet boundary irregularity.
 */
val COMET_FUZZ_FALLBACK_SHUFFLE_VETO_PROBABILITY: ConfigEntry[Double] =
  conf("spark.comet.fuzz.fallback.shuffleVetoProbability")
    .category(CATEGORY_TESTING)
    .doc(
      "Probability in [0.0, 1.0] that the fuzz fallback vetoes converting a given " +
        "ShuffleExchangeExec to a CometShuffleExchangeExec. Only used when " +
        "`spark.comet.fuzz.fallback.enabled=true`.")
    .doubleConf
    .checkValue(v => v >= 0.0 && v <= 1.0, "Probability must be in [0.0, 1.0]")
    .createWithDefault(0.5)

/**
 * Per-operator veto probability for the fuzz fallback, validated to [0.0, 1.0].
 * Defaults to 0.0, i.e. operator vetoes are opt-in even when the diagnostic is
 * enabled — presumably so shuffle-boundary fuzzing can be run in isolation
 * (TODO confirm intent with the feature author).
 */
val COMET_FUZZ_FALLBACK_EXEC_VETO_PROBABILITY: ConfigEntry[Double] =
  conf("spark.comet.fuzz.fallback.execVetoProbability")
    .category(CATEGORY_TESTING)
    .doc(
      "Probability in [0.0, 1.0] that the fuzz fallback vetoes converting a given " +
        "Spark operator (aggregate, join, project, etc.) to its Comet equivalent. " +
        "Only used when `spark.comet.fuzz.fallback.enabled=true`.")
    .doubleConf
    .checkValue(v => v >= 0.0 && v <= 1.0, "Probability must be in [0.0, 1.0]")
    .createWithDefault(0.0)

val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
.category(CATEGORY_EXEC)
Expand Down
89 changes: 89 additions & 0 deletions spark/src/main/scala/org/apache/comet/FuzzFallback.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.execution.SparkPlan

/**
 * Diagnostic utility that randomly vetoes Comet conversions so the rule pipeline produces
 * irregular Spark/Comet boundaries. Used by fuzz tests to surface plan-shape bugs that arise when
 * adjacent operators belong to different execution modes (e.g. the assertion failure described in
 * issue #3949).
 *
 * Determinism: each decision is a pure function of the seed, the node identity hash, and the
 * decision kind. This means repeated calls for the same node return the same answer (important
 * because `getSupportLevel` and `createExec` are called at different times during rule
 * application), and a failing seed can be reproduced by rerunning the test with the same
 * configuration.
 */
object FuzzFallback {

  // Decision kinds mixed into the hash so a shuffle veto and an exec veto on the same node
  // are independent decisions.
  private val KindShuffle = 1
  private val KindExec = 2

  // Cache decisions per (kind, identityHashCode(plan)). The cache is cleared between queries via
  // reset(); identity hash collisions within one query are astronomically unlikely.
  //
  // NOTE: the map value must be handled as a boxed java.lang.Boolean. A previous version read it
  // with `val cached = decisions.get(key)` typed as scala.Boolean and then compared `cached !=
  // null`: the absent-key `null` silently unboxes to `false`, and a primitive Boolean compared to
  // `null` boxes and is *always* non-null, so every decision short-circuited to `false` and no
  // veto ever fired. Using computeIfAbsent avoids the box/null confusion entirely.
  private val decisions = new ConcurrentHashMap[(Int, Int), Boolean]()

  /** Reset cached decisions. Call this between queries so every query starts clean. */
  def reset(): Unit = decisions.clear()

  /**
   * Deterministically decide (and cache) whether to veto, returning true with roughly the given
   * probability. `probability <= 0.0` is a fast path that never vetoes and never touches the
   * cache.
   */
  private def decide(kind: Int, plan: SparkPlan, probability: Double): Boolean = {
    if (probability <= 0.0) {
      false
    } else {
      val key = (kind, System.identityHashCode(plan))
      // computeIfAbsent both reads and populates atomically; the mapping function is pure, so a
      // racing duplicate computation would produce the same value anyway.
      decisions.computeIfAbsent(
        key,
        _ => {
          val seed = CometConf.COMET_FUZZ_FALLBACK_SEED.get()
          // Mix seed, kind, and node identity into a deterministic hash, then compare against
          // the probability. Using SplitMix64-style avalanche gives a reasonable uniform
          // distribution.
          var h: Long = seed
          h ^= kind.toLong * 0x9e3779b97f4a7c15L
          h ^= System.identityHashCode(plan).toLong * 0xbf58476d1ce4e5b9L
          h ^= h >>> 30
          h *= 0xbf58476d1ce4e5b9L
          h ^= h >>> 27
          h *= 0x94d049bb133111ebL
          h ^= h >>> 31
          // Map the top 53 bits to a uniform double in [0.0, 1.0).
          val u = (h >>> 11) * (1.0 / (1L << 53))
          u < probability
        })
    }
  }

  /**
   * Decide whether to veto converting this shuffle exchange to a Comet shuffle. Returns false
   * unless fuzz fallback is enabled. When enabled, returns true with probability
   * `spark.comet.fuzz.fallback.shuffleVetoProbability`.
   */
  def shouldVetoShuffle(plan: SparkPlan): Boolean = {
    if (!CometConf.COMET_FUZZ_FALLBACK_ENABLED.get()) false
    else decide(KindShuffle, plan, CometConf.COMET_FUZZ_FALLBACK_SHUFFLE_VETO_PROBABILITY.get())
  }

  /**
   * Decide whether to veto converting this operator to a Comet equivalent. Returns false unless
   * fuzz fallback is enabled. When enabled, returns true with probability
   * `spark.comet.fuzz.fallback.execVetoProbability`.
   */
  def shouldVetoExec(plan: SparkPlan): Boolean = {
    if (!CometConf.COMET_FUZZ_FALLBACK_ENABLED.get()) false
    else decide(KindExec, plan, CometConf.COMET_FUZZ_FALLBACK_EXEC_VETO_PROBABILITY.get())
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo, FuzzFallback}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
Expand Down Expand Up @@ -269,6 +269,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
.map(_.asInstanceOf[CometOperatorSerde[SparkPlan]])
handler match {
case Some(handler) =>
if (FuzzFallback.shouldVetoExec(op)) {
withInfo(op, "Fuzz fallback vetoed operator conversion")
return op
}
return convertToComet(op, handler).getOrElse(op)
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import org.apache.spark.util.random.XORShiftRandom

import com.google.common.base.Objects

import org.apache.comet.CometConf
import org.apache.comet.{CometConf, FuzzFallback}
import org.apache.comet.CometConf.{COMET_EXEC_SHUFFLE_ENABLED, COMET_SHUFFLE_MODE}
import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleManagerEnabled, withInfo}
import org.apache.comet.serde.{Compatible, OperatorOuterClass, QueryPlanSerde, SupportLevel, Unsupported}
Expand Down Expand Up @@ -342,6 +342,11 @@ object CometShuffleExchangeExec
return false
}

if (FuzzFallback.shouldVetoShuffle(s)) {
withInfo(s, "Fuzz fallback vetoed native shuffle")
return false
}

val inputs = s.child.output

for (input <- inputs) {
Expand Down Expand Up @@ -459,6 +464,11 @@ object CometShuffleExchangeExec
return false
}

if (FuzzFallback.shouldVetoShuffle(s)) {
withInfo(s, "Fuzz fallback vetoed columnar shuffle")
return false
}

if (!isCometJVMShuffleMode(s.conf)) {
withInfo(s, "Comet columnar shuffle not enabled")
return false
Expand Down
Loading
Loading