diff --git a/native/core/src/execution/columnar_to_row.rs b/native/core/src/execution/columnar_to_row.rs index 66b53af2bd..495253c832 100644 --- a/native/core/src/execution/columnar_to_row.rs +++ b/native/core/src/execution/columnar_to_row.rs @@ -1068,7 +1068,19 @@ impl ColumnarToRowContext { })?; Ok(Arc::new(decimal_array)) } - _ => Ok(Arc::clone(array)), + _ => { + // For any other type mismatch, attempt an Arrow cast. + // This handles cases like Int32 → Date32 (which can happen when Spark + // generates default column values using the physical storage type rather + // than the logical type). + let options = CastOptions::default(); + cast_with_options(array, schema_type, &options).map_err(|e| { + CometError::Internal(format!( + "Failed to cast array from {:?} to {:?}: {}", + actual_type, schema_type, e + )) + }) + } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala index a520098ed1..2345386143 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala @@ -232,11 +232,6 @@ case class CometNativeColumnarToRowExec(child: SparkPlan) object CometNativeColumnarToRowExec { - /** - * Checks if native columnar to row conversion is enabled. - */ - def isEnabled: Boolean = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get() - /** * Checks if the given schema is supported by native columnar to row conversion. * diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala index 0e179b40fc..6b6f02f9fd 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala @@ -111,14 +111,16 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { - Seq("native", "jvm").foreach { shuffleMode => - super.test(testName + s" ($shuffleMode shuffle)", testTags: _*) { - withSQLConf( - CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false", - CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) { - testFun + Seq(("native", "false"), ("jvm", "true"), ("jvm", "false")).foreach { + case (shuffleMode, nativeC2R) => + super.test(testName + s" ($shuffleMode shuffle, nativeC2R=$nativeC2R)", testTags: _*) { + withSQLConf( + CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key -> nativeC2R, + CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false", + CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) { + testFun + } } - } } }