From 392fa7f7200eccaeb4849a4140755be1071e1152 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Apr 2026 12:38:35 -0600 Subject: [PATCH 1/3] debug: assert columnar-to-row transitions have columnar children Diagnostic-only assertion in EliminateRedundantTransitions to surface an intermittent bad-plan shape reported after #3879 (DPP shuffle fallback). Every ColumnarToRowExec / CometColumnarToRowExec / CometNativeColumnarToRowExec in the post-rule plan must have child.supportsColumnar == true. Not intended for merge. Remove once the root cause is identified. --- .../rules/EliminateRedundantTransitions.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index 7402a83248..201ca0f038 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -57,6 +57,7 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa override def apply(plan: SparkPlan): SparkPlan = { val newPlan = _apply(plan) + checkTransitionInvariant(newPlan) if (showTransformations && !newPlan.fastEquals(plan)) { logInfo(s""" |=== Applying Rule $ruleName === @@ -66,6 +67,29 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa newPlan } + // Diagnostic only: every columnar-to-row transition must have a columnar child. + // Intended to surface the bad-plan shape reported after #3879 as a loud + // driver-side failure instead of a runtime symptom. Remove once root cause + // is identified. + private def checkTransitionInvariant(plan: SparkPlan): Unit = { + plan.foreach { + case c: ColumnarToRowExec if !c.child.supportsColumnar => + val cls = c.child.getClass.getName + throw new IllegalStateException( + s"ColumnarToRowExec wraps non-columnar child ($cls):\n" + plan.treeString) + case c: CometColumnarToRowExec if !c.child.supportsColumnar => + val cls = c.child.getClass.getName + throw new IllegalStateException( + s"CometColumnarToRowExec wraps non-columnar child ($cls):\n" + plan.treeString) + case c: CometNativeColumnarToRowExec if !c.child.supportsColumnar => + val cls = c.child.getClass.getName + throw new IllegalStateException( + s"CometNativeColumnarToRowExec wraps non-columnar child ($cls):\n" + + plan.treeString) + case _ => + } + } + private def _apply(plan: SparkPlan): SparkPlan = { val eliminatedPlan = plan transformUp { case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec) From 16fe78da614c47eb131130f8d971fb73e376d7db Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Apr 2026 12:43:11 -0600 Subject: [PATCH 2/3] debug: gate assertion behind spark.comet.assertValidPlanTransitions.enabled Converts the columnar-to-row invariant check in EliminateRedundantTransitions from an always-on assertion into an off-by-default diagnostic config, so the check stays available for future transition regressions without runtime cost in production. --- .../src/main/scala/org/apache/comet/CometConf.scala | 10 ++++++++++ .../comet/rules/EliminateRedundantTransitions.scala | 13 ++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..b6ea86d4c7 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -618,6 +618,16 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_ASSERT_VALID_PLAN_TRANSITIONS: ConfigEntry[Boolean] = + conf("spark.comet.assertValidPlanTransitions.enabled") + .category(CATEGORY_EXEC_EXPLAIN) + .doc( + "When enabled, Comet asserts that every columnar-to-row transition in the " + + "post-rule plan has a columnar child. Intended for debugging intermittent " + + "bad-plan shapes; off by default.") + .booleanConf + .createWithDefault(false) + val COMET_LOG_FALLBACK_REASONS: ConfigEntry[Boolean] = conf("spark.comet.logFallbackReasons.enabled") .category(CATEGORY_EXEC_EXPLAIN) diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index 201ca0f038..8ebe72785a 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -54,10 +54,14 @@ import org.apache.comet.CometConf case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] { private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get() + private lazy val assertValidPlanTransitions = + CometConf.COMET_ASSERT_VALID_PLAN_TRANSITIONS.get() override def apply(plan: SparkPlan): SparkPlan = { val newPlan = _apply(plan) - checkTransitionInvariant(newPlan) + if (assertValidPlanTransitions) { + checkTransitionInvariant(newPlan) + } if (showTransformations && !newPlan.fastEquals(plan)) { logInfo(s""" |=== Applying Rule $ruleName === @@ -67,10 +71,9 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa newPlan } - // Diagnostic only: every columnar-to-row transition must have a columnar child. - // Intended to surface the bad-plan shape reported after #3879 as a loud - // driver-side failure instead of a runtime symptom. Remove once root cause - // is identified. + // Gated by spark.comet.assertValidPlanTransitions.enabled. + // Every columnar-to-row transition must have a columnar child; violations + // indicate a bad plan produced by an earlier rule. private def checkTransitionInvariant(plan: SparkPlan): Unit = { plan.foreach { case c: ColumnarToRowExec if !c.child.supportsColumnar => From a8a7dc71c026b9416a9a89cb3c887885cf3a789f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Apr 2026 19:37:46 -0600 Subject: [PATCH 3/3] enable for testing --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b6ea86d4c7..d58dfa62da 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -626,7 +626,7 @@ object CometConf extends ShimCometConf { "post-rule plan has a columnar child. Intended for debugging intermittent " + "bad-plan shapes; off by default.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_LOG_FALLBACK_REASONS: ConfigEntry[Boolean] = conf("spark.comet.logFallbackReasons.enabled")