From 67833edcd75dd053a22e7003f6b8cb6ef563438c Mon Sep 17 00:00:00 2001 From: Mark Nash Date: Mon, 11 Aug 2025 22:41:02 -0700 Subject: [PATCH 1/4] wrote code and got one test working --- parquet-variant-compute/Cargo.toml | 1 + .../src/cast_to_variant.rs | 140 +++++++++++++++++- 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 0aa926ee7fa4..65ee0b33fc71 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -36,6 +36,7 @@ arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } parquet-variant = { workspace = true } parquet-variant-json = { workspace = true } +chrono = {workspace = true} [lib] name = "parquet_variant_compute" diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs index 617e5cfbe52e..95a772425a1d 100644 --- a/parquet-variant-compute/src/cast_to_variant.rs +++ b/parquet-variant-compute/src/cast_to_variant.rs @@ -16,12 +16,21 @@ // under the License. use crate::{VariantArray, VariantArrayBuilder}; -use arrow::array::{Array, AsArray}; +use arrow::array::{ + Array, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; use arrow::datatypes::{ BinaryType, BinaryViewType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, - Int64Type, Int8Type, LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + Int64Type, Int8Type, LargeBinaryType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, +}; +use arrow::temporal_conversions::{ + as_datetime, timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime, + timestamp_us_to_datetime, }; use arrow_schema::{ArrowError, DataType}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use half::f16; use parquet_variant::Variant; @@ -151,6 +160,80 @@ pub fn cast_to_variant(input: &dyn Array) -> Result { DataType::FixedSizeBinary(_) => { cast_conversion_nongeneric!(as_fixed_size_binary, |v| v, input, builder); } + DataType::Timestamp(time_unit, time_zone) => { + let native_datetimes: Vec> = match time_unit { + arrow_schema::TimeUnit::Second => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampSecondArray"); + + ts_array + .iter() + .map(|x| match x { + Some(y) => Some(timestamp_s_to_datetime(y).unwrap()), + None => None, + }) + .collect() + } + arrow_schema::TimeUnit::Millisecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampMillisecondArray"); + + ts_array + .iter() + .map(|x| match x { + Some(y) => Some(timestamp_ms_to_datetime(y).unwrap()), + None => None, + }) + .collect() + } + arrow_schema::TimeUnit::Microsecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampMicrosecondArray"); + ts_array + .iter() + .map(|x| match x { + Some(y) => Some(timestamp_us_to_datetime(y).unwrap()), + None => None, + }) + .collect() + } + arrow_schema::TimeUnit::Nanosecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampNanosecondArray"); + ts_array + .iter() + .map(|x| match x { + Some(y) => Some(timestamp_ns_to_datetime(y).unwrap()), + None => None, + }) + .collect() + } + }; + + for x in native_datetimes { + match x { + Some(ndt) => { + if time_zone.is_none() { + builder.append_variant(ndt.into()); + } else { + let utc_dt: DateTime = Utc.from_utc_datetime(&ndt); + builder.append_variant(utc_dt.into()); + } + } + None => { + builder.append_null(); + } + } + } + } dt => { return Err(ArrowError::CastError(format!( "Unsupported data type for casting to Variant: {dt:?}", @@ -175,6 +258,59 @@ mod tests { use parquet_variant::{Variant, VariantDecimal16}; use std::{sync::Arc, vec}; + #[test] + fn test_cast_to_variant_timestamp() { + let one_second = 1; + let two_second = 2; + let three_second = 3; + + let arr = TimestampSecondArray::from(vec![ + Some(one_second), + Some(two_second), + None, + Some(three_second), + ]); + + run_test( + Arc::new(arr.clone()), + vec![ + Some(Variant::TimestampNtzMicros( + DateTime::from_timestamp_millis(one_second * 1000) + .unwrap() + .naive_utc(), + )), + Some(Variant::TimestampNtzMicros( + DateTime::from_timestamp_millis(two_second * 1000) + .unwrap() + .naive_utc(), + )), + None, + Some(Variant::TimestampNtzMicros( + DateTime::from_timestamp_millis(three_second * 1000) + .unwrap() + .naive_utc(), + )), + ], + ); + + let arr = arr.with_timezone("+01:00".to_string()); + run_test( + Arc::new(arr), + vec![ + Some(Variant::TimestampMicros( + DateTime::from_timestamp_millis(one_second * 1000).unwrap(), + )), + Some(Variant::TimestampMicros( + DateTime::from_timestamp_millis(two_second * 1000).unwrap(), + )), + None, + Some(Variant::TimestampMicros( + DateTime::from_timestamp_millis(three_second * 1000).unwrap(), + )), + ], + ); + } + #[test] fn test_cast_to_variant_fixed_size_binary() { let v1 = vec![1, 2]; From da6f266706528ffcba2ca2c55674471e9c9ab8da Mon Sep 17 00:00:00 2001 From: Mark Nash Date: Mon, 11 Aug 2025 23:50:08 -0700 Subject: [PATCH 2/4] Added all tests and documentation --- .../src/cast_to_variant.rs | 94 ++++++++++--------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs index 95a772425a1d..4a6e453eff1b 100644 --- a/parquet-variant-compute/src/cast_to_variant.rs +++ b/parquet-variant-compute/src/cast_to_variant.rs @@ -103,6 +103,12 @@ macro_rules! cast_conversion_nongeneric { /// assert!(result.is_null(1)); // note null, not Variant::Null /// assert_eq!(result.value(2), Variant::Int64(3)); /// ``` +/// +/// For `DataType::Timestamp`s: if the timestamp has any level of precision +/// greater than a microsecond, it will be truncated. For example +/// `1970-01-01T00:00:01.234567890Z` +/// will be truncated to +/// `1970-01-01T00:00:01.234567Z` pub fn cast_to_variant(input: &dyn Array) -> Result { let mut builder = VariantArrayBuilder::new(input.len()); @@ -260,55 +266,55 @@ mod tests { #[test] fn test_cast_to_variant_timestamp() { - let one_second = 1; - let two_second = 2; - let three_second = 3; + let run_array_tests = + |microseconds: i64, array_ntz: Arc, array_tz: Arc| { + let timestamp = DateTime::from_timestamp_nanos(microseconds * 1000); + run_test( + array_tz, + vec![Some(Variant::TimestampMicros(timestamp)), None], + ); + run_test( + array_ntz, + vec![ + Some(Variant::TimestampNtzMicros(timestamp.naive_utc())), + None, + ], + ); + }; - let arr = TimestampSecondArray::from(vec![ - Some(one_second), - Some(two_second), - None, - Some(three_second), - ]); + let nanosecond = 1234567890; + let microsecond = 1234567; + let millisecond = 1234; + let second = 1; - run_test( - Arc::new(arr.clone()), - vec![ - Some(Variant::TimestampNtzMicros( - DateTime::from_timestamp_millis(one_second * 1000) - .unwrap() - .naive_utc(), - )), - Some(Variant::TimestampNtzMicros( - DateTime::from_timestamp_millis(two_second * 1000) - .unwrap() - .naive_utc(), - )), - None, - Some(Variant::TimestampNtzMicros( - DateTime::from_timestamp_millis(three_second * 1000) - .unwrap() - .naive_utc(), - )), - ], + let second_array = TimestampSecondArray::from(vec![Some(second), None]); + run_array_tests( + second * 1000 * 1000, + Arc::new(second_array.clone()), + Arc::new(second_array.with_timezone("+01:00".to_string())), ); - let arr = arr.with_timezone("+01:00".to_string()); - run_test( - Arc::new(arr), - vec![ - Some(Variant::TimestampMicros( - DateTime::from_timestamp_millis(one_second * 1000).unwrap(), - )), - Some(Variant::TimestampMicros( - DateTime::from_timestamp_millis(two_second * 1000).unwrap(), - )), - None, - Some(Variant::TimestampMicros( - DateTime::from_timestamp_millis(three_second * 1000).unwrap(), - )), - ], + let millisecond_array = TimestampMillisecondArray::from(vec![Some(millisecond), None]); + run_array_tests( + millisecond * 1000, + Arc::new(millisecond_array.clone()), + Arc::new(millisecond_array.with_timezone("+01:00".to_string())), ); + + let microsecond_array = TimestampMicrosecondArray::from(vec![Some(microsecond), None]); + run_array_tests( + microsecond, + Arc::new(microsecond_array.clone()), + Arc::new(microsecond_array.with_timezone("+01:00".to_string())), + ); + + // nanoseconds should get truncated to microseconds + let nanosecond_array = TimestampNanosecondArray::from(vec![Some(nanosecond), None]); + run_array_tests( + microsecond, + Arc::new(nanosecond_array.clone()), + Arc::new(nanosecond_array.with_timezone("+01:00".to_string())), + ) } #[test] From 45fc722835903b50f4686fcd0134e68331fc208d Mon Sep 17 00:00:00 2001 From: Mark Nash Date: Mon, 11 Aug 2025 23:55:00 -0700 Subject: [PATCH 3/4] cargo fmt && cargo clippy --- .../src/cast_to_variant.rs | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs index 4a6e453eff1b..a3488df13a42 100644 --- a/parquet-variant-compute/src/cast_to_variant.rs +++ b/parquet-variant-compute/src/cast_to_variant.rs @@ -22,11 +22,10 @@ use arrow::array::{ }; use arrow::datatypes::{ BinaryType, BinaryViewType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, - Int64Type, Int8Type, LargeBinaryType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, - UInt8Type, + Int64Type, Int8Type, LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow::temporal_conversions::{ - as_datetime, timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime, + timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime, timestamp_us_to_datetime, }; use arrow_schema::{ArrowError, DataType}; @@ -103,7 +102,7 @@ macro_rules! cast_conversion_nongeneric { /// assert!(result.is_null(1)); // note null, not Variant::Null /// assert_eq!(result.value(2), Variant::Int64(3)); /// ``` -/// +/// /// For `DataType::Timestamp`s: if the timestamp has any level of precision /// greater than a microsecond, it will be truncated. For example /// `1970-01-01T00:00:01.234567890Z` @@ -176,10 +175,7 @@ pub fn cast_to_variant(input: &dyn Array) -> Result { ts_array .iter() - .map(|x| match x { - Some(y) => Some(timestamp_s_to_datetime(y).unwrap()), - None => None, - }) + .map(|x| x.map(|y| timestamp_s_to_datetime(y).unwrap())) .collect() } arrow_schema::TimeUnit::Millisecond => { @@ -190,10 +186,7 @@ pub fn cast_to_variant(input: &dyn Array) -> Result { ts_array .iter() - .map(|x| match x { - Some(y) => Some(timestamp_ms_to_datetime(y).unwrap()), - None => None, - }) + .map(|x| x.map(|y| timestamp_ms_to_datetime(y).unwrap())) .collect() } arrow_schema::TimeUnit::Microsecond => { @@ -203,10 +196,7 @@ pub fn cast_to_variant(input: &dyn Array) -> Result { .expect("Array is not TimestampMicrosecondArray"); ts_array .iter() - .map(|x| match x { - Some(y) => Some(timestamp_us_to_datetime(y).unwrap()), - None => None, - }) + .map(|x| x.map(|y| timestamp_us_to_datetime(y).unwrap())) .collect() } arrow_schema::TimeUnit::Nanosecond => { @@ -216,10 +206,7 @@ pub fn cast_to_variant(input: &dyn Array) -> Result { .expect("Array is not TimestampNanosecondArray"); ts_array .iter() - .map(|x| match x { - Some(y) => Some(timestamp_ns_to_datetime(y).unwrap()), - None => None, - }) + .map(|x| x.map(|y| timestamp_ns_to_datetime(y).unwrap())) .collect() } }; From a87762d57e06f1406ba131294ffb3456c3694e09 Mon Sep 17 00:00:00 2001 From: Mark Nash Date: Tue, 12 Aug 2025 00:04:12 -0700 Subject: [PATCH 4/4] Moved conversion out to a function --- .../src/cast_to_variant.rs | 133 ++++++++++-------- 1 file changed, 72 insertions(+), 61 deletions(-) diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs index a3488df13a42..60143ca92447 100644 --- a/parquet-variant-compute/src/cast_to_variant.rs +++ b/parquet-variant-compute/src/cast_to_variant.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::{VariantArray, VariantArrayBuilder}; use arrow::array::{ Array, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, @@ -28,7 +30,7 @@ use arrow::temporal_conversions::{ timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime, timestamp_us_to_datetime, }; -use arrow_schema::{ArrowError, DataType}; +use arrow_schema::{ArrowError, DataType, TimeUnit}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use half::f16; use parquet_variant::Variant; @@ -79,6 +81,74 @@ macro_rules! cast_conversion_nongeneric { }}; } +fn convert_timestamp( + time_unit: &TimeUnit, + time_zone: &Option>, + input: &dyn Array, + builder: &mut VariantArrayBuilder, +) { + let native_datetimes: Vec> = match time_unit { + arrow_schema::TimeUnit::Second => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampSecondArray"); + + ts_array + .iter() + .map(|x| x.map(|y| timestamp_s_to_datetime(y).unwrap())) + .collect() + } + arrow_schema::TimeUnit::Millisecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampMillisecondArray"); + + ts_array + .iter() + .map(|x| x.map(|y| timestamp_ms_to_datetime(y).unwrap())) + .collect() + } + arrow_schema::TimeUnit::Microsecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampMicrosecondArray"); + ts_array + .iter() + .map(|x| x.map(|y| timestamp_us_to_datetime(y).unwrap())) + .collect() + } + arrow_schema::TimeUnit::Nanosecond => { + let ts_array = input + .as_any() + .downcast_ref::() + .expect("Array is not TimestampNanosecondArray"); + ts_array + .iter() + .map(|x| x.map(|y| timestamp_ns_to_datetime(y).unwrap())) + .collect() + } + }; + + for x in native_datetimes { + match x { + Some(ndt) => { + if time_zone.is_none() { + builder.append_variant(ndt.into()); + } else { + let utc_dt: DateTime = Utc.from_utc_datetime(&ndt); + builder.append_variant(utc_dt.into()); + } + } + None => { + builder.append_null(); + } + } + } +} + /// Casts a typed arrow [`Array`] to a [`VariantArray`]. This is useful when you /// need to convert a specific data type /// @@ -166,66 +236,7 @@ pub fn cast_to_variant(input: &dyn Array) -> Result { cast_conversion_nongeneric!(as_fixed_size_binary, |v| v, input, builder); } DataType::Timestamp(time_unit, time_zone) => { - let native_datetimes: Vec> = match time_unit { - arrow_schema::TimeUnit::Second => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampSecondArray"); - - ts_array - .iter() - .map(|x| x.map(|y| timestamp_s_to_datetime(y).unwrap())) - .collect() - } - arrow_schema::TimeUnit::Millisecond => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampMillisecondArray"); - - ts_array - .iter() - .map(|x| x.map(|y| timestamp_ms_to_datetime(y).unwrap())) - .collect() - } - arrow_schema::TimeUnit::Microsecond => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampMicrosecondArray"); - ts_array - .iter() - .map(|x| x.map(|y| timestamp_us_to_datetime(y).unwrap())) - .collect() - } - arrow_schema::TimeUnit::Nanosecond => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampNanosecondArray"); - ts_array - .iter() - .map(|x| x.map(|y| timestamp_ns_to_datetime(y).unwrap())) - .collect() - } - }; - - for x in native_datetimes { - match x { - Some(ndt) => { - if time_zone.is_none() { - builder.append_variant(ndt.into()); - } else { - let utc_dt: DateTime = Utc.from_utc_datetime(&ndt); - builder.append_variant(utc_dt.into()); - } - } - None => { - builder.append_null(); - } - } - } + convert_timestamp(time_unit, time_zone, input, &mut builder); } dt => { return Err(ArrowError::CastError(format!(