Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions native/core/src/parquet/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,23 +302,30 @@ impl SparkPhysicalExprAdapter {
let physical_type = cast.input_field().data_type();
let target_type = cast.target_field().data_type();

// For complex nested types (Struct, List, Map) and Timestamp timezone
// mismatches, use CometCastColumnExpr with spark_parquet_convert which
// handles field-name-based selection, reordering, nested type casting,
// and metadata-only timestamp timezone relabeling correctly.
// For complex nested types (Struct, List, Map), Timestamp timezone
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
// with spark_parquet_convert which handles field-name-based selection,
// reordering, nested type casting, metadata-only timestamp timezone
// relabeling, and raw value reinterpretation correctly.
//
// Timestamp mismatches (e.g., Timestamp(us, None) -> Timestamp(us, Some("UTC")))
// occur when INT96 Parquet timestamps are coerced to Timestamp(us, None) by
// DataFusion but the logical schema expects Timestamp(us, Some("UTC")).
// Using Spark's Cast here would incorrectly treat the None-timezone values as
// local time (TimestampNTZ) and apply a timezone conversion, but the values are
// already in UTC. spark_parquet_convert handles this as a metadata-only change.
//
// Timestamp→Int64 occurs when Spark's `nanosAsLong` config converts
// TIMESTAMP(NANOS) to LongType. Spark's Cast would divide by MICROS_PER_SECOND
// (assuming microseconds), but the values are nanoseconds. Arrow cast correctly
// reinterprets the raw i64 value without conversion.
if matches!(
(physical_type, target_type),
(DataType::Struct(_), DataType::Struct(_))
| (DataType::List(_), DataType::List(_))
| (DataType::Map(_, _), DataType::Map(_, _))
| (DataType::Timestamp(_, _), DataType::Timestamp(_, _))
| (DataType::Timestamp(_, _), DataType::Int64)
) {
let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
CometCastColumnExpr::new(
Expand Down
Loading