Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions parquet-variant-compute/src/cast_to_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,13 +591,19 @@ mod tests {
Arc::new(microsecond_array.with_timezone("+01:00".to_string())),
);

// nanoseconds should get truncated to microseconds
let timestamp = DateTime::from_timestamp_nanos(nanosecond);
let nanosecond_array = TimestampNanosecondArray::from(vec![Some(nanosecond), None]);
run_array_tests(
microsecond,
run_test(
Arc::new(nanosecond_array.clone()),
vec![
Some(Variant::TimestampNtzNanos(timestamp.naive_utc())),
None,
],
);
run_test(
Arc::new(nanosecond_array.with_timezone("+01:00".to_string())),
)
vec![Some(Variant::TimestampNanos(timestamp)), None],
);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions parquet-variant-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ parquet-variant = { path = "../parquet-variant" }
chrono = { workspace = true }
serde_json = "1.0"
base64 = "0.22"
uuid = "1.18.0"


[lib]
Expand Down
74 changes: 67 additions & 7 deletions parquet-variant-json/src/to_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,14 @@ impl<'m, 'v> VariantToJson for Variant<'m, 'v> {
Variant::Decimal8(decimal) => write!(buffer, "{decimal}")?,
Variant::Decimal16(decimal) => write!(buffer, "{decimal}")?,
Variant::Date(date) => write!(buffer, "\"{}\"", format_date_string(date))?,
Variant::TimestampMicros(ts) => write!(buffer, "\"{}\"", ts.to_rfc3339())?,
Variant::TimestampMicros(ts) | Variant::TimestampNanos(ts) => {
write!(buffer, "\"{}\"", ts.to_rfc3339())?
}
Variant::TimestampNtzMicros(ts) => {
write!(buffer, "\"{}\"", format_timestamp_ntz_string(ts))?
write!(buffer, "\"{}\"", format_timestamp_ntz_string(ts, 6))?
}
Variant::TimestampNtzNanos(ts) => {
write!(buffer, "\"{}\"", format_timestamp_ntz_string(ts, 9))?
}
Variant::Time(time) => write!(buffer, "\"{}\"", format_time_ntz_str(time))?,
Variant::Binary(bytes) => {
Expand All @@ -208,6 +213,9 @@ impl<'m, 'v> VariantToJson for Variant<'m, 'v> {
})?;
write!(buffer, "{json_str}")?
}
Variant::Uuid(uuid) => {
write!(buffer, "\"{uuid}\"")?;
}
Variant::Object(obj) => {
convert_object_to_json(buffer, obj)?;
}
Expand Down Expand Up @@ -297,12 +305,18 @@ impl<'m, 'v> VariantToJson for Variant<'m, 'v> {
Ok(value)
}
Variant::Date(date) => Ok(Value::String(format_date_string(date))),
Variant::TimestampMicros(ts) => Ok(Value::String(ts.to_rfc3339())),
Variant::TimestampNtzMicros(ts) => Ok(Value::String(format_timestamp_ntz_string(ts))),
Variant::TimestampMicros(ts) | Variant::TimestampNanos(ts) => {
Ok(Value::String(ts.to_rfc3339()))
}
Variant::TimestampNtzMicros(ts) => {
Ok(Value::String(format_timestamp_ntz_string(ts, 6)))
}
Variant::TimestampNtzNanos(ts) => Ok(Value::String(format_timestamp_ntz_string(ts, 9))),
Variant::Time(time) => Ok(Value::String(format_time_ntz_str(time))),
Variant::Binary(bytes) => Ok(Value::String(format_binary_base64(bytes))),
Variant::String(s) => Ok(Value::String(s.to_string())),
Variant::ShortString(s) => Ok(Value::String(s.to_string())),
Variant::Uuid(uuid) => Ok(Value::String(uuid.to_string())),
Variant::Object(obj) => {
let map = obj
.iter()
Expand All @@ -323,15 +337,18 @@ impl<'m, 'v> VariantToJson for Variant<'m, 'v> {

// Format string constants to avoid duplication and reduce errors
const DATE_FORMAT: &str = "%Y-%m-%d";
const TIMESTAMP_NTZ_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.6f";

// Helper functions for consistent formatting
/// Renders a calendar date as text using the shared `DATE_FORMAT` pattern.
fn format_date_string(date: &chrono::NaiveDate) -> String {
    let rendered = date.format(DATE_FORMAT);
    rendered.to_string()
}

fn format_timestamp_ntz_string(ts: &chrono::NaiveDateTime) -> String {
ts.format(TIMESTAMP_NTZ_FORMAT).to_string()
/// Formats a timezone-less timestamp as `YYYY-MM-DDTHH:MM:SS.<fraction>`,
/// where the fraction has `precision` digits.
///
/// Callers in this file pass `6` (microseconds) and `9` (nanoseconds).
/// NOTE(review): chrono's fixed-width `%.Nf` specifier appears to accept only
/// 3, 6 and 9 digits — confirm before passing any other precision.
fn format_timestamp_ntz_string(ts: &chrono::NaiveDateTime, precision: usize) -> String {
    // The strftime pattern is built once and applied in a single formatting pass.
    let pattern = format!("%Y-%m-%dT%H:%M:%S%.{precision}f");
    ts.format(&pattern).to_string()
}

fn format_binary_base64(bytes: &[u8]) -> String {
Expand Down Expand Up @@ -497,6 +514,34 @@ mod tests {
Ok(())
}

/// Checks that a nanosecond-precision UTC timestamp is rendered as an
/// RFC 3339 string by both JSON conversion paths.
#[test]
fn test_timestamp_nanos_to_json() -> Result<(), ArrowError> {
    let timestamp = DateTime::parse_from_rfc3339("2023-12-25T10:30:45.123456789Z")
        .unwrap()
        .with_timezone(&Utc);
    let variant = Variant::TimestampNanos(timestamp);
    let json = variant.to_json_string()?;
    assert_eq!(json, "\"2023-12-25T10:30:45.123456789+00:00\"");

    // Pin the exact value rather than only the JSON type, so a loss of
    // sub-microsecond digits in `to_json_value` cannot go unnoticed.
    let json_value = variant.to_json_value()?;
    assert_eq!(
        json_value,
        Value::String("2023-12-25T10:30:45.123456789+00:00".to_string())
    );
    Ok(())
}

/// Checks that a nanosecond-precision timestamp without a timezone is
/// rendered with nine fractional digits by both JSON conversion paths.
#[test]
fn test_timestamp_ntz_nanos_to_json() -> Result<(), ArrowError> {
    let naive_timestamp = DateTime::from_timestamp(1703505045, 123456789)
        .unwrap()
        .naive_utc();
    let variant = Variant::TimestampNtzNanos(naive_timestamp);
    let json = variant.to_json_string()?;
    assert_eq!(json, "\"2023-12-25T11:50:45.123456789\"");

    // Pin the exact value rather than only the JSON type, so a loss of
    // sub-microsecond digits in `to_json_value` cannot go unnoticed.
    let json_value = variant.to_json_value()?;
    assert_eq!(
        json_value,
        Value::String("2023-12-25T11:50:45.123456789".to_string())
    );
    Ok(())
}

#[test]
fn test_binary_to_json() -> Result<(), ArrowError> {
let binary_data = b"Hello, World!";
Expand Down Expand Up @@ -546,6 +591,21 @@ mod tests {
Ok(())
}

/// A UUID variant serializes to its hyphenated text form as a JSON string,
/// through both the string writer and the `Value` conversion.
#[test]
fn test_uuid_to_json() -> Result<(), ArrowError> {
    let parsed = uuid::Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap();
    let uuid_variant = Variant::Uuid(parsed);

    // Text output: the UUID wrapped in JSON double quotes.
    let rendered = uuid_variant.to_json_string()?;
    assert_eq!(rendered, "\"123e4567-e89b-12d3-a456-426614174000\"");

    // Structured output: a plain JSON string holding the same text.
    let expected_value = Value::String("123e4567-e89b-12d3-a456-426614174000".to_string());
    let actual_value = uuid_variant.to_json_value()?;
    assert_eq!(actual_value, expected_value);
    Ok(())
}

#[test]
fn test_string_escaping() -> Result<(), ArrowError> {
let variant = Variant::from("hello\nworld\t\"quoted\"");
Expand Down
1 change: 1 addition & 0 deletions parquet-variant/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ rust-version = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
indexmap = "2.10.0"
uuid = { version = "1.18.0"}

simdutf8 = { workspace = true , optional = true }

Expand Down
24 changes: 24 additions & 0 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow_schema::ArrowError;
use chrono::Timelike;
use indexmap::{IndexMap, IndexSet};
use std::collections::HashSet;
use uuid::Uuid;

const BASIC_TYPE_BITS: u8 = 2;
const UNIX_EPOCH_DATE: chrono::NaiveDate = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
Expand Down Expand Up @@ -198,6 +199,23 @@ impl ValueBuffer {
self.append_slice(&micros_from_midnight.to_le_bytes());
}

/// Appends a `TimestampNanos` primitive: the type header followed by the
/// timestamp's nanosecond count in little-endian byte order.
///
/// # Panics
///
/// Panics if `timestamp_nanos_opt` returns `None` for `value`.
fn append_timestamp_nanos(&mut self, value: chrono::DateTime<chrono::Utc>) {
    self.append_primitive_header(VariantPrimitiveType::TimestampNanos);
    let encoded = value.timestamp_nanos_opt().unwrap().to_le_bytes();
    self.append_slice(&encoded);
}

/// Appends a `TimestampNtzNanos` primitive: the type header followed by the
/// nanosecond count of `value` (read as if it were UTC) in little-endian
/// byte order.
///
/// # Panics
///
/// Panics if `timestamp_nanos_opt` returns `None` for `value`.
fn append_timestamp_ntz_nanos(&mut self, value: chrono::NaiveDateTime) {
    self.append_primitive_header(VariantPrimitiveType::TimestampNtzNanos);
    let as_utc = value.and_utc();
    let encoded = as_utc.timestamp_nanos_opt().unwrap().to_le_bytes();
    self.append_slice(&encoded);
}

/// Appends a `Uuid` primitive: the type header followed by the UUID's bytes.
fn append_uuid(&mut self, value: Uuid) {
    self.append_primitive_header(VariantPrimitiveType::Uuid);
    // Review note: UUID is stored as big-endian
    self.append_slice(&value.into_bytes());
}

fn append_decimal4(&mut self, decimal4: VariantDecimal4) {
self.append_primitive_header(VariantPrimitiveType::Decimal4);
self.append_u8(decimal4.scale());
Expand Down Expand Up @@ -332,6 +350,8 @@ impl ValueBuffer {
Variant::Date(v) => self.append_date(v),
Variant::TimestampMicros(v) => self.append_timestamp_micros(v),
Variant::TimestampNtzMicros(v) => self.append_timestamp_ntz_micros(v),
Variant::TimestampNanos(v) => self.append_timestamp_nanos(v),
Variant::TimestampNtzNanos(v) => self.append_timestamp_ntz_nanos(v),
Variant::Decimal4(decimal4) => self.append_decimal4(decimal4),
Variant::Decimal8(decimal8) => self.append_decimal8(decimal8),
Variant::Decimal16(decimal16) => self.append_decimal16(decimal16),
Expand All @@ -340,6 +360,7 @@ impl ValueBuffer {
Variant::Binary(v) => self.append_binary(v),
Variant::String(s) => self.append_string(s),
Variant::ShortString(s) => self.append_short_string(s),
Variant::Uuid(v) => self.append_uuid(v),
Variant::Object(obj) => self.append_object(metadata_builder, obj),
Variant::List(list) => self.append_list(metadata_builder, list),
Variant::Time(v) => self.append_time_micros(v),
Expand All @@ -363,12 +384,15 @@ impl ValueBuffer {
Variant::Date(v) => self.append_date(v),
Variant::TimestampMicros(v) => self.append_timestamp_micros(v),
Variant::TimestampNtzMicros(v) => self.append_timestamp_ntz_micros(v),
Variant::TimestampNanos(v) => self.append_timestamp_nanos(v),
Variant::TimestampNtzNanos(v) => self.append_timestamp_ntz_nanos(v),
Variant::Decimal4(decimal4) => self.append_decimal4(decimal4),
Variant::Decimal8(decimal8) => self.append_decimal8(decimal8),
Variant::Decimal16(decimal16) => self.append_decimal16(decimal16),
Variant::Float(v) => self.append_float(v),
Variant::Double(v) => self.append_double(v),
Variant::Binary(v) => self.append_binary(v),
Variant::Uuid(v) => self.append_uuid(v),
Variant::String(s) => self.append_string(s),
Variant::ShortString(s) => self.append_short_string(s),
Variant::Object(obj) => self.try_append_object(metadata_builder, obj)?,
Expand Down
81 changes: 81 additions & 0 deletions parquet-variant/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::ShortString;

use arrow_schema::ArrowError;
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use uuid::Uuid;

/// The basic type of a [`Variant`] value, encoded in the first two bits of the
/// header byte.
Expand Down Expand Up @@ -64,6 +65,9 @@ pub enum VariantPrimitiveType {
Binary = 15,
String = 16,
Time = 17,
TimestampNanos = 18,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimestampNtzNanos = 19,
Uuid = 20,
}

/// Extracts the basic type from a header byte
Expand Down Expand Up @@ -106,6 +110,9 @@ impl TryFrom<u8> for VariantPrimitiveType {
15 => Ok(VariantPrimitiveType::Binary),
16 => Ok(VariantPrimitiveType::String),
17 => Ok(VariantPrimitiveType::Time),
18 => Ok(VariantPrimitiveType::TimestampNanos),
19 => Ok(VariantPrimitiveType::TimestampNtzNanos),
20 => Ok(VariantPrimitiveType::Uuid),
_ => Err(ArrowError::InvalidArgumentError(format!(
"unknown primitive type: {value}",
))),
Expand Down Expand Up @@ -316,6 +323,25 @@ pub(crate) fn decode_time_ntz(data: &[u8]) -> Result<NaiveTime, ArrowError> {
.ok_or(case_error)
}

/// Decodes a TimestampNanos from the value section of a variant.
///
/// Reads a little-endian `i64` from the start of `data` and interprets it as
/// a nanosecond timestamp.
///
/// # Errors
///
/// Propagates the error from `array_from_slice` when the bytes cannot be
/// extracted from `data`.
pub(crate) fn decode_timestamp_nanos(data: &[u8]) -> Result<DateTime<Utc>, ArrowError> {
    // Only extracting the bytes can fail; `DateTime::from_timestamp_nanos` would never fail
    let raw: [u8; 8] = array_from_slice(data, 0)?;
    let timestamp = DateTime::from_timestamp_nanos(i64::from_le_bytes(raw));
    Ok(timestamp)
}

/// Decodes a TimestampNtzNanos from the value section of a variant.
///
/// Decodes the bytes exactly like [`decode_timestamp_nanos`] and then drops
/// the UTC marker from the result.
pub(crate) fn decode_timestampntz_nanos(data: &[u8]) -> Result<NaiveDateTime, ArrowError> {
    let utc_timestamp = decode_timestamp_nanos(data)?;
    Ok(utc_timestamp.naive_utc())
}

/// Decodes a UUID from the value section of a variant.
///
/// Uses the first 16 bytes of `data`; any bytes after those are ignored.
///
/// # Errors
///
/// Returns [`ArrowError::InvalidArgumentError`] when `data` holds fewer than
/// 16 bytes, and [`ArrowError::CastError`] when `Uuid::from_slice` rejects
/// the bytes.
pub(crate) fn decode_uuid(data: &[u8]) -> Result<Uuid, ArrowError> {
    // Checked slicing: a truncated value must surface as an error, not a panic.
    let bytes = data.get(..16).ok_or_else(|| {
        ArrowError::InvalidArgumentError(format!(
            "Expected at least 16 bytes to decode a uuid, got {}",
            data.len()
        ))
    })?;
    Uuid::from_slice(bytes)
        .map_err(|_| ArrowError::CastError(format!("Cant decode uuid from {bytes:?}")))
}

/// Decodes a Binary from the value section of a variant.
pub(crate) fn decode_binary(data: &[u8]) -> Result<&[u8], ArrowError> {
let len = u32::from_le_bytes(array_from_slice(data, 0)?) as usize;
Expand Down Expand Up @@ -460,6 +486,61 @@ mod tests {
.and_hms_milli_opt(16, 34, 56, 780)
.unwrap()
);

test_decoder_bounds!(
test_timestamp_nanos,
[0x15, 0x41, 0xa2, 0x5a, 0x36, 0xa2, 0x5b, 0x18],
decode_timestamp_nanos,
NaiveDate::from_ymd_opt(2025, 8, 14)
.unwrap()
.and_hms_nano_opt(12, 33, 54, 123456789)
.unwrap()
.and_utc()
);

test_decoder_bounds!(
test_timestamp_nanos_before_epoch,
[0x15, 0x41, 0x52, 0xd4, 0x94, 0xe5, 0xad, 0xfa],
decode_timestamp_nanos,
NaiveDate::from_ymd_opt(1957, 11, 7)
.unwrap()
.and_hms_nano_opt(12, 33, 54, 123456789)
.unwrap()
.and_utc()
);

test_decoder_bounds!(
test_timestampntz_nanos,
[0x15, 0x41, 0xa2, 0x5a, 0x36, 0xa2, 0x5b, 0x18],
decode_timestampntz_nanos,
NaiveDate::from_ymd_opt(2025, 8, 14)
.unwrap()
.and_hms_nano_opt(12, 33, 54, 123456789)
.unwrap()
);

test_decoder_bounds!(
test_timestampntz_nanos_before_epoch,
[0x15, 0x41, 0x52, 0xd4, 0x94, 0xe5, 0xad, 0xfa],
decode_timestampntz_nanos,
NaiveDate::from_ymd_opt(1957, 11, 7)
.unwrap()
.and_hms_nano_opt(12, 33, 54, 123456789)
.unwrap()
);
}

/// Sixteen raw bytes decode to the UUID whose text form spells out the same
/// bytes in order.
#[test]
fn test_uuid() {
    let expected = Uuid::parse_str("f24f9b64-81fa-49d1-b74e-8c09a6e31c56").unwrap();
    let encoded: [u8; 16] = [
        0xf2, 0x4f, 0x9b, 0x64, 0x81, 0xfa, 0x49, 0xd1, 0xb7, 0x4e, 0x8c, 0x09, 0xa6, 0xe3,
        0x1c, 0x56,
    ];
    let decoded = decode_uuid(&encoded).unwrap();
    assert_eq!(expected, decoded);
}

mod time {
Expand Down
Loading
Loading