diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 0aa926ee7fa4..65ee0b33fc71 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -36,6 +36,7 @@ arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } parquet-variant = { workspace = true } parquet-variant-json = { workspace = true } +chrono = {workspace = true} [lib] name = "parquet_variant_compute" diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs index ed40538cabe1..6c212e390211 100644 --- a/parquet-variant-compute/src/cast_to_variant.rs +++ b/parquet-variant-compute/src/cast_to_variant.rs @@ -15,14 +15,24 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::{VariantArray, VariantArrayBuilder}; -use arrow::array::{Array, AsArray}; +use arrow::array::{ + Array, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; use arrow::datatypes::{ i256, BinaryType, BinaryViewType, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; -use arrow_schema::{ArrowError, DataType}; +use arrow::temporal_conversions::{ + timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime, + timestamp_us_to_datetime, +}; +use arrow_schema::{ArrowError, DataType, TimeUnit}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use half::f16; use parquet_variant::{Variant, VariantDecimal16, VariantDecimal4, VariantDecimal8}; @@ -75,6 +85,74 @@ macro_rules! 
non_generic_conversion { }}; } +fn convert_timestamp( + time_unit: &TimeUnit, + time_zone: &Option>, + input: &dyn Array, + builder: &mut VariantArrayBuilder, +) { + let native_datetimes: Vec> = match time_unit { + arrow_schema::TimeUnit::Second => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampSecondArray"); + + ts_array + .iter() + .map(|x| x.map(|y| timestamp_s_to_datetime(y).unwrap())) + .collect() + } + arrow_schema::TimeUnit::Millisecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampMillisecondArray"); + + ts_array + .iter() + .map(|x| x.map(|y| timestamp_ms_to_datetime(y).unwrap())) + .collect() + } + arrow_schema::TimeUnit::Microsecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampMicrosecondArray"); + ts_array + .iter() + .map(|x| x.map(|y| timestamp_us_to_datetime(y).unwrap())) + .collect() + } + arrow_schema::TimeUnit::Nanosecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampNanosecondArray"); + ts_array + .iter() + .map(|x| x.map(|y| timestamp_ns_to_datetime(y).unwrap())) + .collect() + } + }; + + for x in native_datetimes { + match x { + Some(ndt) => { + if time_zone.is_none() { + builder.append_variant(ndt.into()); + } else { + let utc_dt: DateTime = Utc.from_utc_datetime(&ndt); + builder.append_variant(utc_dt.into()); + } + } + None => { + builder.append_null(); + } + } + } +} + /// Convert a decimal value to a `VariantDecimal` macro_rules! decimal_to_variant_decimal { ($v:ident, $scale:expr, $value_type:ty, $variant_type:ty) => { @@ -123,6 +201,12 @@ macro_rules! decimal_to_variant_decimal { /// assert!(result.is_null(1)); // note null, not Variant::Null /// assert_eq!(result.value(2), Variant::Int64(3)); /// ``` +/// +/// For `DataType::Timestamp`s: if the timestamp has any level of precision +/// greater than a microsecond, it will be truncated. 
/// Casting timestamp arrays of every `TimeUnit` yields microsecond Variant
/// timestamps, both with and without a time zone, and keeps nulls as nulls.
#[test]
fn test_cast_to_variant_timestamp() {
    // Runs the same expectation against a timezone-less array and a
    // timezone-annotated array holding the same raw values. `microseconds`
    // is the expected instant, expressed in microseconds.
    let run_array_tests =
        |microseconds: i64, array_ntz: Arc<dyn Array>, array_tz: Arc<dyn Array>| {
            let timestamp = DateTime::from_timestamp_nanos(microseconds * 1000);
            run_test(
                array_tz,
                vec![Some(Variant::TimestampMicros(timestamp)), None],
            );
            run_test(
                array_ntz,
                vec![
                    Some(Variant::TimestampNtzMicros(timestamp.naive_utc())),
                    None,
                ],
            );
        };

    let nanosecond = 1234567890;
    let microsecond = 1234567;
    let millisecond = 1234;
    let second = 1;

    let second_array = TimestampSecondArray::from(vec![Some(second), None]);
    run_array_tests(
        second * 1000 * 1000,
        Arc::new(second_array.clone()),
        Arc::new(second_array.with_timezone("+01:00".to_string())),
    );

    let millisecond_array = TimestampMillisecondArray::from(vec![Some(millisecond), None]);
    run_array_tests(
        millisecond * 1000,
        Arc::new(millisecond_array.clone()),
        Arc::new(millisecond_array.with_timezone("+01:00".to_string())),
    );

    let microsecond_array = TimestampMicrosecondArray::from(vec![Some(microsecond), None]);
    run_array_tests(
        microsecond,
        Arc::new(microsecond_array.clone()),
        Arc::new(microsecond_array.with_timezone("+01:00".to_string())),
    );

    // nanoseconds should get truncated to microseconds
    let nanosecond_array = TimestampNanosecondArray::from(vec![Some(nanosecond), None]);
    run_array_tests(
        microsecond,
        Arc::new(nanosecond_array.clone()),
        Arc::new(nanosecond_array.with_timezone("+01:00".to_string())),
    );
}