From a5b00fbdda9266109f9e67e82fabd6c5ebd7f96f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Feb 2026 10:52:01 -0700 Subject: [PATCH 1/5] fix: handle type mismatches in native columnar-to-row conversion When Spark generates default column values, it can produce Arrow arrays with physical types (e.g. Int32) that differ from the logical schema type (e.g. Date32). The c2r converter's maybe_cast_to_schema_type previously passed these through silently, causing downcast failures. Now the fallback arm attempts an Arrow cast for any type mismatch, fixing the immediate Date32 bug and preventing similar issues for other data types. Closes #3482 --- native/core/src/execution/columnar_to_row.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/columnar_to_row.rs b/native/core/src/execution/columnar_to_row.rs index 66b53af2bd..495253c832 100644 --- a/native/core/src/execution/columnar_to_row.rs +++ b/native/core/src/execution/columnar_to_row.rs @@ -1068,7 +1068,19 @@ impl ColumnarToRowContext { })?; Ok(Arc::new(decimal_array)) } - _ => Ok(Arc::clone(array)), + _ => { + // For any other type mismatch, attempt an Arrow cast. + // This handles cases like Int32 → Date32 (which can happen when Spark + // generates default column values using the physical storage type rather + // than the logical type). + let options = CastOptions::default(); + cast_with_options(array, schema_type, &options).map_err(|e| { + CometError::Internal(format!( + "Failed to cast array from {:?} to {:?}: {}", + actual_type, schema_type, e + )) + }) + } } } From 7bb1d19f0a33183a76ff7025ca366edccfe809f5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Feb 2026 10:55:27 -0700 Subject: [PATCH 2/5] remove unused code --- .../spark/sql/comet/CometNativeColumnarToRowExec.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala index a520098ed1..2345386143 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala @@ -232,11 +232,6 @@ case class CometNativeColumnarToRowExec(child: SparkPlan) object CometNativeColumnarToRowExec { - /** - * Checks if native columnar to row conversion is enabled. - */ - def isEnabled: Boolean = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get() - /** * Checks if the given schema is supported by native columnar to row conversion. * From 258fa860cc7cee85e3abf146c5b01c1afb71d0f4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Feb 2026 10:55:53 -0700 Subject: [PATCH 3/5] enable COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED for testing in CI --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 480eafdcb7..a67346705e 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -303,7 +303,7 @@ object CometConf extends ShimCometConf { "of the JVM implementation. This can improve performance for queries that need to " + "convert between columnar and row formats. This is an experimental feature.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") From 75f133e7b3ee5ce137ccffe87d028c8cb7fb0ffc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Feb 2026 11:50:21 -0700 Subject: [PATCH 4/5] revert config change --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index a67346705e..480eafdcb7 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -303,7 +303,7 @@ object CometConf extends ShimCometConf { "of the JVM implementation. This can improve performance for queries that need to " + "convert between columnar and row formats. This is an experimental feature.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") From 6317aebfd97181b844e011091260d954854e6166 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Feb 2026 12:57:20 -0700 Subject: [PATCH 5/5] test --- .../org/apache/comet/CometFuzzTestBase.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala index 0e179b40fc..6b6f02f9fd 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala @@ -111,14 +111,16 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { - Seq("native", "jvm").foreach { shuffleMode => - super.test(testName + s" ($shuffleMode shuffle)", testTags: _*) { - withSQLConf( - CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false", - CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) { - testFun + Seq(("native", "false"), ("jvm", "true"), ("jvm", "false")).foreach { + case (shuffleMode, nativeC2R) => + super.test(testName + s" ($shuffleMode shuffle, nativeC2R=$nativeC2R)", testTags: _*) { + withSQLConf( + CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key -> nativeC2R, + CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false", + CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) { + testFun + } } - } } }