From 31029ce66d6634f209ca13f19b7a4bb39a16a5bd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 13:43:50 -0600
Subject: [PATCH 1/2] fix: reject string/binary read as numeric in native_datafusion scan

For Utf8/Binary -> numeric type changes, the native_datafusion Spark
physical expression adapter fell through to DataFusion's cast (because
SparkCastOptions.is_adapting_schema delegates to it). DataFusion's cast
silently parses the bytes, returning nulls or, on some paths,
reinterpreting raw bytes, whereas Spark's vectorized reader and the
native_iceberg_compat scan throw SchemaColumnConvertNotSupportedException.

Add a guard in replace_with_spark_cast that rejects the cast when the
source type is Utf8/LargeUtf8/Binary/LargeBinary and the target type is
any integer or floating-point type, mirroring TypeUtil.checkParquetType
on the JVM side.

Closes #4088.
---
 native/core/src/parquet/schema_adapter.rs              | 33 +++++++++++++++++++
 .../comet/parquet/ParquetReadSuite.scala                | 19 +++++++++++
 2 files changed, 52 insertions(+)

diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index af79d9082d..ce12e971e3 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -385,6 +385,39 @@ impl SparkPhysicalExprAdapter {
         let physical_type = cast.input_field().data_type();
         let target_type = cast.target_field().data_type();
 
+        // Reject reading a string/binary Parquet column as a numeric type.
+        // Mirrors Spark's TypeUtil.checkParquetType for the BINARY case: a
+        // BINARY (or UTF8-annotated BINARY) physical column is only readable
+        // as StringType, BinaryType, or a binary-encoded decimal. Without
+        // this guard, Spark's Cast below (in is_adapting_schema mode) would
+        // delegate to DataFusion's cast, which silently parses the bytes
+        // (returning nulls for non-numeric strings or, depending on the
+        // path, raw byte reinterpretation). See issue #4088.
+        if matches!(
+            physical_type,
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary
+        ) && matches!(
+            target_type,
+            DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+                | DataType::Float32
+                | DataType::Float64
+        ) {
+            return Err(DataFusionError::Plan(format!(
+                "Parquet column cannot be converted. Column: [{}], \
+                 Expected: {}, Found: {}",
+                cast.input_field().name(),
+                target_type,
+                physical_type,
+            )));
+        }
+
         // For complex nested types (Struct, List, Map), Timestamp timezone
         // mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
         // with spark_parquet_convert which handles field-name-based selection,
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 75ac889228..ac8b5b4c0f 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -998,6 +998,25 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
+  test("native_datafusion rejects string read as numeric") {
+    // Regression guard for https://github.com/apache/datafusion-comet/issues/4088.
+    // Spark's vectorized reader rejects reading a Parquet BINARY column as any
+    // numeric type on all versions (see TypeUtil.checkParquetType, BINARY case).
+    // The native_datafusion scan must do the same in its schema adapter rather
+    // than letting DataFusion's cast silently parse the bytes (returning nulls
+    // for non-numeric strings, or raw byte reinterpretation in some paths).
+    withSQLConf(
+      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
+      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        Seq("a", "b", "c").toDF("c").write.parquet(path)
+        val df = spark.read.schema("c int").parquet(path)
+        assertThrows[SparkException](df.collect())
+      }
+    }
+  }
+
   test("type widening: byte → short/int/long, short → int/long, int → long") {
     withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
       withTempPath { dir =>

From dfcf960cfc0ab87ab9acccdc77f45c8f1f708f17 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 27 Apr 2026 12:58:38 -0600
Subject: [PATCH 2/2] fix: tighten guard to allow-list of valid string/binary targets

Per review feedback, switch from a deny-list of numeric target types to
an allow-list of the only targets Spark's vectorized reader permits for
a BINARY column: StringType, BinaryType, and a binary-encoded decimal.
This also rejects mismatches like BINARY -> Boolean, BINARY -> Date, and
BINARY -> Timestamp, which the previous guard silently handed to
DataFusion's cast.

Test now covers numeric, boolean, date, and timestamp targets.
---
 native/core/src/parquet/schema_adapter.rs              | 39 ++++++++++---------
 .../comet/parquet/ParquetReadSuite.scala                | 21 ++++++----
 2 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index ce12e971e3..f2ff5ddce1 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -385,29 +385,30 @@ impl SparkPhysicalExprAdapter {
         let physical_type = cast.input_field().data_type();
         let target_type = cast.target_field().data_type();
 
-        // Reject reading a string/binary Parquet column as a numeric type.
-        // Mirrors Spark's TypeUtil.checkParquetType for the BINARY case: a
-        // BINARY (or UTF8-annotated BINARY) physical column is only readable
-        // as StringType, BinaryType, or a binary-encoded decimal. Without
-        // this guard, Spark's Cast below (in is_adapting_schema mode) would
-        // delegate to DataFusion's cast, which silently parses the bytes
-        // (returning nulls for non-numeric strings or, depending on the
-        // path, raw byte reinterpretation). See issue #4088.
+        // Reject reading a string/binary Parquet column as anything other
+        // than string, binary, or a binary-encoded decimal. This mirrors
+        // Spark's TypeUtil.checkParquetType for the BINARY case (lines
+        // 208-221): a BINARY (or UTF8-annotated BINARY) physical column is
+        // only readable as StringType, BinaryType, or a binary-encoded
+        // decimal; every other target type (numeric, boolean, date,
+        // timestamp, ...) raises SchemaColumnConvertNotSupportedException.
+        //
+        // Without this guard, Spark's Cast below (in is_adapting_schema
+        // mode) falls through to DataFusion's cast, which silently parses
+        // the bytes (returning nulls for non-numeric strings, parsing
+        // date/timestamp/boolean strings, or in some paths reinterpreting
+        // raw bytes). See issue #4088.
         if matches!(
             physical_type,
             DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary
-        ) && matches!(
+        ) && !matches!(
             target_type,
-            DataType::Int8
-                | DataType::Int16
-                | DataType::Int32
-                | DataType::Int64
-                | DataType::UInt8
-                | DataType::UInt16
-                | DataType::UInt32
-                | DataType::UInt64
-                | DataType::Float32
-                | DataType::Float64
+            DataType::Utf8
+                | DataType::LargeUtf8
+                | DataType::Binary
+                | DataType::LargeBinary
+                | DataType::Decimal128(_, _)
+                | DataType::Decimal256(_, _)
         ) {
             return Err(DataFusionError::Plan(format!(
                 "Parquet column cannot be converted. Column: [{}], \
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index ac8b5b4c0f..c19a285ca0 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -998,21 +998,26 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
-  test("native_datafusion rejects string read as numeric") {
+  test("native_datafusion rejects string read as non-string/binary type") {
     // Regression guard for https://github.com/apache/datafusion-comet/issues/4088.
-    // Spark's vectorized reader rejects reading a Parquet BINARY column as any
-    // numeric type on all versions (see TypeUtil.checkParquetType, BINARY case).
-    // The native_datafusion scan must do the same in its schema adapter rather
-    // than letting DataFusion's cast silently parse the bytes (returning nulls
-    // for non-numeric strings, or raw byte reinterpretation in some paths).
+    // Spark's vectorized reader rejects reading a Parquet BINARY column as
+    // anything except StringType, BinaryType, or a binary-encoded decimal (see
+    // TypeUtil.checkParquetType, BINARY case). The native_datafusion scan
+    // must do the same in its schema adapter rather than letting DataFusion's
+    // cast silently parse the bytes or reinterpret them.
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
       SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         Seq("a", "b", "c").toDF("c").write.parquet(path)
-        val df = spark.read.schema("c int").parquet(path)
-        assertThrows[SparkException](df.collect())
+        // Cover representative non-string/binary target types: numeric,
+        // boolean, date, and timestamp. Each would silently produce wrong
+        // results without the schema-adapter guard.
+        Seq("int", "bigint", "double", "boolean", "date", "timestamp").foreach { sqlType =>
+          val df = spark.read.schema(s"c $sqlType").parquet(path)
+          assertThrows[SparkException](df.collect())
+        }
       }
     }
   }
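
Note for reviewers: the patch-2 guard is only a pattern match over Arrow data
types, so its allow-list can be checked in isolation. Below is a minimal
standalone sketch, assuming only the arrow crate; the function name
is_valid_binary_target is illustrative and does not exist in the Comet
codebase.

// Standalone sketch of the allow-list from patch 2. Names are illustrative,
// not taken from the Comet codebase.
use arrow::datatypes::DataType;

/// Returns true if a Parquet BINARY / UTF8 column may be read as `target`,
/// i.e. the target is string, binary, or a binary-encoded decimal.
fn is_valid_binary_target(target: &DataType) -> bool {
    matches!(
        target,
        DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Binary
            | DataType::LargeBinary
            | DataType::Decimal128(_, _)
            | DataType::Decimal256(_, _)
    )
}

fn main() {
    // String, binary, and decimal targets pass; everything else is rejected,
    // matching TypeUtil.checkParquetType's BINARY case.
    assert!(is_valid_binary_target(&DataType::Utf8));
    assert!(is_valid_binary_target(&DataType::Decimal128(10, 2)));
    assert!(!is_valid_binary_target(&DataType::Int32));
    assert!(!is_valid_binary_target(&DataType::Boolean));
    assert!(!is_valid_binary_target(&DataType::Date32));
    println!("allow-list behaves as expected");
}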