From a55c543a5c7f1fda0ab0c95b013e0aba28c8e6b2 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 23 Sep 2025 09:28:03 -0700 Subject: [PATCH 1/4] [Variant] VariantArray::value can return owned bytes now --- parquet-variant-compute/src/lib.rs | 2 +- parquet-variant-compute/src/shred_variant.rs | 4 +- parquet-variant-compute/src/variant_array.rs | 150 +++++++++++++----- .../src/variant_array_builder.rs | 3 +- parquet-variant-compute/src/variant_get.rs | 20 +-- parquet-variant/src/variant.rs | 6 +- 6 files changed, 134 insertions(+), 51 deletions(-) diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 496d550d95b1..d52d921247e1 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -46,7 +46,7 @@ mod variant_array_builder; pub mod variant_get; mod variant_to_arrow; -pub use variant_array::{ShreddingState, VariantArray, VariantType}; +pub use variant_array::{ShreddingState, VariantArray, VariantArrayValue, VariantType}; pub use variant_array_builder::{VariantArrayBuilder, VariantValueArrayBuilder}; pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options}; diff --git a/parquet-variant-compute/src/shred_variant.rs b/parquet-variant-compute/src/shred_variant.rs index 138209802ab4..0408ad66183d 100644 --- a/parquet-variant-compute/src/shred_variant.rs +++ b/parquet-variant-compute/src/shred_variant.rs @@ -86,7 +86,9 @@ pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result { + Borrowed(Variant<'m, 'v>), + Owned { + metadata: VariantMetadata<'m>, + value_bytes: Vec, + }, +} + +impl<'m, 'v> VariantArrayValue<'m, 'v> { + pub fn borrowed(value: Variant<'m, 'v>) -> Self { + Self::Borrowed(value) + } + pub fn owned(metadata_bytes: &'m [u8], value_bytes: Vec) -> Self { + Self::Owned { + metadata: VariantMetadata::new(metadata_bytes), + value_bytes, + } + } + pub fn consume(self, visitor: impl FnOnce(Variant<'_, '_>) -> R) -> R { + match self { + VariantArrayValue::Borrowed(v) => visitor(v), + VariantArrayValue::Owned { + metadata, + value_bytes, + } => visitor(Variant::new_with_metadata(metadata, &value_bytes)), + } + } + // internal helper for when we don't want to pay the extra clone + fn as_variant_cow(&self) -> Cow<'_, Variant<'m, '_>> { + match self { + VariantArrayValue::Borrowed(v) => Cow::Borrowed(v), + VariantArrayValue::Owned { + metadata, + value_bytes, + } => Cow::Owned(Variant::new_with_metadata(metadata.clone(), value_bytes)), + } + } + pub fn as_variant(&self) -> Variant<'m, '_> { + self.as_variant_cow().into_owned() + } + pub fn metadata(&self) -> &VariantMetadata<'m> { + match self { + VariantArrayValue::Borrowed(v) => v.metadata(), + VariantArrayValue::Owned { metadata, .. } => metadata, + } + } + pub fn as_object(&self) -> Option> { + self.as_variant_cow().as_object().cloned() + } + pub fn as_list(&self) -> Option> { + self.as_variant_cow().as_list().cloned() + } + pub fn get_object_field<'s>(&'s self, field_name: &str) -> Option> { + self.as_variant_cow().get_object_field(field_name) + } + pub fn get_list_element(&self, index: usize) -> Option> { + self.as_variant_cow().get_list_element(index) + } +} + +impl<'m, 'v> From> for VariantArrayValue<'m, 'v> { + fn from(value: Variant<'m, 'v>) -> Self { + Self::borrowed(value) + } +} + +impl PartialEq for VariantArrayValue<'_, '_> { + fn eq(&self, other: &VariantArrayValue<'_, '_>) -> bool { + self.as_variant_cow().as_ref() == other.as_variant_cow().as_ref() + } +} + +impl PartialEq> for VariantArrayValue<'_, '_> { + fn eq(&self, other: &Variant<'_, '_>) -> bool { + self.as_variant_cow().as_ref() == other + } +} + +impl PartialEq> for Variant<'_, '_> { + fn eq(&self, other: &VariantArrayValue<'_, '_>) -> bool { + self == other.as_variant_cow().as_ref() + } +} + +impl std::fmt::Debug for VariantArrayValue<'_, '_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.as_variant_cow().fmt(f) + } +} + /// An array of Parquet [`Variant`] values /// /// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying @@ -352,39 +444,24 @@ impl VariantArray { /// /// Note: Does not do deep validation of the [`Variant`], so it is up to the /// caller to ensure that the metadata and value were constructed correctly. - pub fn value(&self, index: usize) -> Variant<'_, '_> { - match &self.shredding_state { - ShreddingState::Unshredded { value, .. } => { - // Unshredded case - Variant::new(self.metadata.value(index), value.value(index)) + pub fn value(&self, index: usize) -> VariantArrayValue<'_, '_> { + let value = match &self.shredding_state { + // Always prefer to use the typed_value, if present + ShreddingState::Typed { typed_value, .. } + | ShreddingState::PartiallyShredded { typed_value, .. } + if typed_value.is_valid(index) => + { + return typed_value_to_variant(typed_value, index); } - ShreddingState::Typed { typed_value, .. } => { - // Typed case (formerly PerfectlyShredded) - if typed_value.is_null(index) { - Variant::Null - } else { - typed_value_to_variant(typed_value, index) - } - } - ShreddingState::PartiallyShredded { - value, typed_value, .. - } => { - // PartiallyShredded case (formerly ImperfectlyShredded) - if typed_value.is_null(index) { - Variant::new(self.metadata.value(index), value.value(index)) - } else { - typed_value_to_variant(typed_value, index) - } - } - ShreddingState::AllNull => { - // AllNull case: neither value nor typed_value fields exist - // NOTE: This handles the case where neither value nor typed_value fields exist. - // For top-level variants, this returns Variant::Null (JSON null). - // For shredded object fields, this technically should indicate SQL NULL, - // but the current API cannot distinguish these contexts. - Variant::Null + // If no typed_value, fall back to value, if present + ShreddingState::Unshredded { value, .. } + | ShreddingState::PartiallyShredded { value, .. } => { + Variant::new(self.metadata.value(index), value.value(index)) } - } + // If neither value nor typed_value fields exist, return Variant::Null. + ShreddingState::Typed { .. } | ShreddingState::AllNull => Variant::Null, + }; + value.into() } /// Return a reference to the metadata field of the [`StructArray`] @@ -796,8 +873,8 @@ impl StructArrayBuilder { } /// returns the non-null element at index as a Variant -fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, '_> { - match typed_value.data_type() { +fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> VariantArrayValue<'_, '_> { + let value = match typed_value.data_type() { DataType::Boolean => { let boolean_array = typed_value.as_boolean(); let value = boolean_array.value(index); @@ -815,7 +892,7 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, ' let value = array.value(index); if *binary_len == 16 { if let Ok(uuid) = Uuid::from_slice(value) { - return Variant::from(uuid); + return Variant::from(uuid).into(); } } let value = array.value(index); @@ -877,7 +954,8 @@ fn typed_value_to_variant(typed_value: &ArrayRef, index: usize) -> Variant<'_, ' ); Variant::Null } - } + }; + value.into() } /// Workaround for lack of direct support for BinaryArray diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index 68c1fd6b5492..cbd264384778 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -485,7 +485,7 @@ mod test { let mut builder = VariantValueArrayBuilder::new(3); // straight copy - builder.append_value(array.value(0)); + array.value(0).consume(|value| builder.append_value(value)); // filtering fields takes more work because we need to manually create an object builder let value = array.value(1); @@ -498,6 +498,7 @@ mod test { // same bytes, but now nested and duplicated inside a list let value = array.value(2); + let value = value.as_variant(); let mut metadata_builder = ReadOnlyMetadataBuilder::new(value.metadata().clone()); let state = builder.parent_state(&mut metadata_builder); ListBuilder::new(state, false) diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index ef602e84f1bf..428e3f306a68 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -144,7 +144,9 @@ fn shredded_get_path( if target.is_null(i) { builder.append_null()?; } else { - builder.append_value(target.value(i))?; + target + .value(i) + .consume(|value| builder.append_value(value))?; } } builder.finish() @@ -1596,7 +1598,7 @@ mod test { let json_str = r#"{"x": 42}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1607,7 +1609,7 @@ mod test { let json_str = r#"{"x": "foo"}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1618,7 +1620,7 @@ mod test { let json_str = r#"{"y": 10}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1637,7 +1639,7 @@ mod test { let json_str = r#"{"a": {"x": 55}, "b": 42}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1648,7 +1650,7 @@ mod test { let json_str = r#"{"a": {"x": "foo"}, "b": 42}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1667,7 +1669,7 @@ mod test { let json_str = r#"{"a": {"b": {"x": 100}}}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1678,7 +1680,7 @@ mod test { let json_str = r#"{"a": {"b": {"x": "bar"}}}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1689,7 +1691,7 @@ mod test { let json_str = r#"{"a": {"b": {"y": 200}}}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - builder.append_variant(variant_array.value(0)); + variant_array.value(0).consume(|value| builder.append_variant(value)); } else { builder.append_null(); } diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs index 849947675b13..1db20fa776ef 100644 --- a/parquet-variant/src/variant.rs +++ b/parquet-variant/src/variant.rs @@ -1218,7 +1218,7 @@ impl<'m, 'v> Variant<'m, 'v> { /// let obj = variant.as_object().expect("variant should be an object"); /// assert_eq!(obj.get("name"), Some(Variant::from("John"))); /// ``` - pub fn as_object(&'m self) -> Option<&'m VariantObject<'m, 'v>> { + pub fn as_object(&self) -> Option<&VariantObject<'m, 'v>> { if let Variant::Object(obj) = self { Some(obj) } else { @@ -1280,7 +1280,7 @@ impl<'m, 'v> Variant<'m, 'v> { /// assert_eq!(list.get(0).unwrap(), Variant::from("John")); /// assert_eq!(list.get(1).unwrap(), Variant::from("Doe")); /// ``` - pub fn as_list(&'m self) -> Option<&'m VariantList<'m, 'v>> { + pub fn as_list(&self) -> Option<&VariantList<'m, 'v>> { if let Variant::List(list) = self { Some(list) } else { @@ -1308,7 +1308,7 @@ impl<'m, 'v> Variant<'m, 'v> { /// let v2 = Variant::from("Hello"); /// assert_eq!(None, v2.as_time_utc()); /// ``` - pub fn as_time_utc(&'m self) -> Option { + pub fn as_time_utc(&self) -> Option { if let Variant::Time(time) = self { Some(*time) } else { From 79b31811dc79ee003574a76d339d15b733c9af65 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 23 Sep 2025 12:40:46 -0700 Subject: [PATCH 2/4] self review + fmt --- parquet-variant-compute/src/variant_array.rs | 37 +++++++++++++++++++- parquet-variant-compute/src/variant_get.rs | 32 ++++++++++++----- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 48f66fca2ffe..c6c27322803a 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -74,6 +74,9 @@ impl ExtensionType for VariantType { } } +/// A [`Cow`]-like representation of a [`Variant`] value returned by [`VariantArray::value`], which +/// may use owned or borrowed value bytes depending on how the underlying variant was shredded. We +/// cannot "just" use [`Cow`] because of the special lifetime management that [`Variant`] requires. pub enum VariantArrayValue<'m, 'v> { Borrowed(Variant<'m, 'v>), Owned { @@ -83,15 +86,23 @@ pub enum VariantArrayValue<'m, 'v> { } impl<'m, 'v> VariantArrayValue<'m, 'v> { + /// Creates a new instance that borrows from a normal [`Variant`] value. pub fn borrowed(value: Variant<'m, 'v>) -> Self { Self::Borrowed(value) } + + /// Creates a new instance that wraps owned bytes that can be converted to a [`Variant`] value. pub fn owned(metadata_bytes: &'m [u8], value_bytes: Vec) -> Self { Self::Owned { metadata: VariantMetadata::new(metadata_bytes), value_bytes, } } + + /// Consumes this variant value, passing the result to a `visitor` function. + /// + /// The visitor idiom is helpful because a variant value based on owned bytes cannot outlive + /// self. pub fn consume(self, visitor: impl FnOnce(Variant<'_, '_>) -> R) -> R { match self { VariantArrayValue::Borrowed(v) => visitor(v), @@ -101,6 +112,7 @@ impl<'m, 'v> VariantArrayValue<'m, 'v> { } => visitor(Variant::new_with_metadata(metadata, &value_bytes)), } } + // internal helper for when we don't want to pay the extra clone fn as_variant_cow(&self) -> Cow<'_, Variant<'m, '_>> { match self { @@ -111,24 +123,44 @@ impl<'m, 'v> VariantArrayValue<'m, 'v> { } => Cow::Owned(Variant::new_with_metadata(metadata.clone(), value_bytes)), } } + + /// Returns a [`Variant`] instance for this value. pub fn as_variant(&self) -> Variant<'m, '_> { self.as_variant_cow().into_owned() } + + /// Returns the variant metadata that backs this value. pub fn metadata(&self) -> &VariantMetadata<'m> { match self { VariantArrayValue::Borrowed(v) => v.metadata(), VariantArrayValue::Owned { metadata, .. } => metadata, } } + + /// Extracts the underlying [`VariantObject`], if this is a variant object. + /// + /// See also [`Variant::as_object`]. pub fn as_object(&self) -> Option> { self.as_variant_cow().as_object().cloned() } + + /// Extracts the underlying [`VariantList`], if this is a variant array. + /// + /// See also [`Variant::as_list`]. pub fn as_list(&self) -> Option> { self.as_variant_cow().as_list().cloned() } + + /// Extracts the value of the named variant object field, if this is a variant object. + /// + /// See also [`Variant::get_object_field`]. pub fn get_object_field<'s>(&'s self, field_name: &str) -> Option> { self.as_variant_cow().get_object_field(field_name) } + + /// Extracts the value of the variant array element at `index`, if this is a variant object. + /// + /// See also [`Variant::get_list_element`]. pub fn get_list_element(&self, index: usize) -> Option> { self.as_variant_cow().get_list_element(index) } @@ -140,6 +172,8 @@ impl<'m, 'v> From> for VariantArrayValue<'m, 'v> { } } +// By providing PartialEq for all three combinations, we avoid changing a lot of unit test code that +// relies on comparisons. impl PartialEq for VariantArrayValue<'_, '_> { fn eq(&self, other: &VariantArrayValue<'_, '_>) -> bool { self.as_variant_cow().as_ref() == other.as_variant_cow().as_ref() @@ -158,9 +192,10 @@ impl PartialEq> for Variant<'_, '_> { } } +// Make it transparent -- looks just like the underlying value it proxies impl std::fmt::Debug for VariantArrayValue<'_, '_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.as_variant_cow().fmt(f) + self.as_variant_cow().as_ref().fmt(f) } } diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index 428e3f306a68..47029184ba01 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -1598,7 +1598,9 @@ mod test { let json_str = r#"{"x": 42}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1609,7 +1611,9 @@ mod test { let json_str = r#"{"x": "foo"}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1620,7 +1624,9 @@ mod test { let json_str = r#"{"y": 10}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1639,7 +1645,9 @@ mod test { let json_str = r#"{"a": {"x": 55}, "b": 42}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1650,7 +1658,9 @@ mod test { let json_str = r#"{"a": {"x": "foo"}, "b": 42}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1669,7 +1679,9 @@ mod test { let json_str = r#"{"a": {"b": {"x": 100}}}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1680,7 +1692,9 @@ mod test { let json_str = r#"{"a": {"b": {"x": "bar"}}}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } @@ -1691,7 +1705,9 @@ mod test { let json_str = r#"{"a": {"b": {"y": 200}}}"#; let string_array: ArrayRef = Arc::new(StringArray::from(vec![json_str])); if let Ok(variant_array) = json_to_variant(&string_array) { - variant_array.value(0).consume(|value| builder.append_variant(value)); + variant_array + .value(0) + .consume(|value| builder.append_variant(value)); } else { builder.append_null(); } From 229d2f155eef97e3a9a574fff471bdea2a6cc6e2 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 23 Sep 2025 18:47:29 -0700 Subject: [PATCH 3/4] [Variant] VariantArray::value supports shredded structs --- parquet-variant-compute/src/variant_array.rs | 200 ++++++++++++++++--- parquet-variant/src/builder.rs | 2 +- parquet/tests/variant_integration.rs | 38 ++-- 3 files changed, 188 insertions(+), 52 deletions(-) diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 76c41007e978..8105b91549e9 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -22,11 +22,14 @@ use arrow::array::{Array, ArrayRef, AsArray, BinaryViewArray, StructArray}; use arrow::buffer::NullBuffer; use arrow::compute::cast; use arrow::datatypes::{ - Date32Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, }; use arrow_schema::extension::ExtensionType; use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, TimeUnit}; -use parquet_variant::{Uuid, Variant, VariantList, VariantMetadata, VariantObject}; +use parquet_variant::{ + ListBuilder, ObjectBuilder, ObjectFieldBuilder, ParentState, ReadOnlyMetadataBuilder, Uuid, + ValueBuilder, Variant, VariantBuilderExt, VariantList, VariantMetadata, VariantObject, +}; use std::borrow::Cow; use std::sync::Arc; @@ -90,9 +93,9 @@ impl<'m, 'v> VariantArrayValue<'m, 'v> { } /// Creates a new instance that wraps owned bytes that can be converted to a [`Variant`] value. - pub fn owned(metadata_bytes: &'m [u8], value_bytes: Vec) -> Self { + pub fn owned(metadata: VariantMetadata<'m>, value_bytes: Vec) -> Self { Self::Owned { - metadata: VariantMetadata::new(metadata_bytes), + metadata, value_bytes, } } @@ -481,7 +484,16 @@ impl VariantArray { match (self.typed_value_field(), self.value_field()) { // Always prefer typed_value, if available (Some(typed_value), value) if typed_value.is_valid(index) => { - typed_value_to_variant(typed_value, value, index) + let metadata = VariantMetadata::new(self.metadata.value(index)); + let mut builder = SingleValueVariantBuilder::new(metadata.clone()); + let Err(err) = + typed_value_to_variant(typed_value, value, index, &metadata, &mut builder) + else { + let value_bytes = builder.into_bytes(); + return VariantArrayValue::owned(metadata.clone(), value_bytes); + }; + debug_assert!(false, "typed_value_to_variant failed: {err}"); + Variant::Null.into() } // Otherwise fall back to value, if available (_, Some(value)) if value.is_valid(index) => { @@ -901,65 +913,119 @@ impl StructArrayBuilder { } } +/// A simple wrapper that provides VariantBuilderExt for building a single variant value +/// +/// This is used specifically by VariantArray::value to build a single variant from shredded data +/// without the complexity of array-level state management. +struct SingleValueVariantBuilder<'m> { + value_builder: ValueBuilder, + metadata_builder: ReadOnlyMetadataBuilder<'m>, +} + +impl<'m> SingleValueVariantBuilder<'m> { + fn new(metadata: VariantMetadata<'m>) -> Self { + Self { + value_builder: ValueBuilder::new(), + metadata_builder: ReadOnlyMetadataBuilder::new(metadata), + } + } + + fn into_bytes(self) -> Vec { + self.value_builder.into_inner() + } + + fn parent_state(&mut self) -> ParentState<'_, ()> { + ParentState::variant(&mut self.value_builder, &mut self.metadata_builder) + } +} + +impl VariantBuilderExt for SingleValueVariantBuilder<'_> { + type State<'a> + = () + where + Self: 'a; + + fn append_null(&mut self) { + self.value_builder.append_null(); + } + + fn append_value<'m, 'v>(&mut self, value: impl Into>) { + ValueBuilder::append_variant_bytes(self.parent_state(), value.into()); + } + + fn try_new_list(&mut self) -> Result>, ArrowError> { + Ok(ListBuilder::new(self.parent_state(), false)) + } + + fn try_new_object(&mut self) -> Result>, ArrowError> { + Ok(ObjectBuilder::new(self.parent_state(), false)) + } +} + /// returns the non-null element at index as a Variant -fn typed_value_to_variant<'a>( - typed_value: &'a ArrayRef, +fn typed_value_to_variant( + typed_value: &ArrayRef, value: Option<&BinaryViewArray>, index: usize, -) -> VariantArrayValue<'a, 'a> { - let data_type = typed_value.data_type(); - if value.is_some_and(|v| !matches!(data_type, DataType::Struct(_)) && v.is_valid(index)) { - // Only a partially shredded struct is allowed to have values for both columns - panic!("Invalid variant, conflicting value and typed_value"); - } - let value = match data_type { + metadata: &VariantMetadata, + builder: &mut impl VariantBuilderExt, +) -> Result<(), ArrowError> { + match typed_value.data_type() { DataType::Boolean => { let boolean_array = typed_value.as_boolean(); let value = boolean_array.value(index); - Variant::from(value) + builder.append_value(value); } DataType::Date32 => { let array = typed_value.as_primitive::(); let value = array.value(index); let date = Date32Type::to_naive_date(value); - Variant::from(date) + builder.append_value(date); } // 16-byte FixedSizeBinary alway corresponds to a UUID; all other sizes are illegal. DataType::FixedSizeBinary(16) => { let array = typed_value.as_fixed_size_binary(); let value = array.value(index); - Uuid::from_slice(value).unwrap().into() // unwrap is safe: slice is always 16 bytes + let value = Uuid::from_slice(value).unwrap(); // unwrap safety: slice is always 16 bytes + builder.append_value(value); } DataType::BinaryView => { let array = typed_value.as_binary_view(); let value = array.value(index); - Variant::from(value) + builder.append_value(value); } DataType::Utf8 => { let array = typed_value.as_string::(); let value = array.value(index); - Variant::from(value) + builder.append_value(value); } DataType::Int8 => { - primitive_conversion_single_value!(Int8Type, typed_value, index) + let variant = primitive_conversion_single_value!(Int8Type, typed_value, index); + builder.append_value(variant); } DataType::Int16 => { - primitive_conversion_single_value!(Int16Type, typed_value, index) + let variant = primitive_conversion_single_value!(Int16Type, typed_value, index); + builder.append_value(variant); } DataType::Int32 => { - primitive_conversion_single_value!(Int32Type, typed_value, index) + let variant = primitive_conversion_single_value!(Int32Type, typed_value, index); + builder.append_value(variant); } DataType::Int64 => { - primitive_conversion_single_value!(Int64Type, typed_value, index) - } - DataType::Float16 => { - primitive_conversion_single_value!(Float16Type, typed_value, index) + let variant = primitive_conversion_single_value!(Int64Type, typed_value, index); + builder.append_value(variant); } DataType::Float32 => { - primitive_conversion_single_value!(Float32Type, typed_value, index) + let variant = primitive_conversion_single_value!(Float32Type, typed_value, index); + builder.append_value(variant); } DataType::Float64 => { - primitive_conversion_single_value!(Float64Type, typed_value, index) + let variant = primitive_conversion_single_value!(Float64Type, typed_value, index); + builder.append_value(variant); + } + DataType::Struct(_) => { + // Return directly in order to bypass the partial shredding check below + return struct_typed_value_to_variant(typed_value, value, index, metadata, builder); } // todo other types here (note this is very similar to cast_to_variant.rs) // so it would be great to figure out how to share this code @@ -972,10 +1038,82 @@ fn typed_value_to_variant<'a>( "Unsupported typed_value type: {}", typed_value.data_type() ); - Variant::Null + builder.append_value(Variant::Null); } - }; - value.into() + } + + // Only a partially shredded struct is allowed to have values for both columns + if value.is_some_and(|v| v.is_valid(index)) { + return Err(ArrowError::InvalidArgumentError( + "Invalid variant, conflicting value and typed_value".to_string(), + )); + } + + Ok(()) +} + +/// Handles reconstruction of variant objects from shredded struct data +fn struct_typed_value_to_variant( + typed_value: &ArrayRef, + value: Option<&BinaryViewArray>, + index: usize, + metadata: &VariantMetadata, + builder: &mut impl VariantBuilderExt, +) -> Result<(), ArrowError> { + let struct_array = typed_value.as_struct(); + let mut obj_builder = builder.try_new_object()?; + + // Track all shredded field names -- we must ignore them when processing value fields below. + let mut shredded_field_names = std::collections::HashSet::new(); + for (field_name, field_array) in struct_array + .column_names() + .iter() + .zip(struct_array.columns()) + { + shredded_field_names.insert(*field_name); + let field_shredded_array = ShreddedVariantFieldArray::try_new(field_array.as_ref())?; + let shredding_state = field_shredded_array.shredding_state(); + let value_field = shredding_state.value_field(); + let typed_value_field = shredding_state.typed_value_field(); + + match (typed_value_field, value_field) { + (Some(typed_value), value) if typed_value.is_valid(index) => { + // Handle typed value with optional value column + let mut field_builder = ObjectFieldBuilder::new(field_name, &mut obj_builder); + typed_value_to_variant(typed_value, value, index, metadata, &mut field_builder)?; + } + (_, Some(value)) if value.is_valid(index) => { + // Handle unshredded value only + // TODO: Add raw byte append capability to VariantBuilderExt to avoid bytes -> variant -> bytes conversion + let variant_bytes = value.value(index); + let field_variant = Variant::new_with_metadata(metadata.clone(), variant_bytes); + obj_builder.insert_bytes(field_name, field_variant); + } + // Skip missing or invalid fields + _ => {} + } + } + + // Add fields from the value column if present (with collision detection) + if let Some(value_array) = value { + if value_array.is_valid(index) { + let variant_bytes = value_array.value(index); + let field_variant = Variant::new_with_metadata(metadata.clone(), variant_bytes); + let Variant::Object(obj) = field_variant else { + return Err(ArrowError::InvalidArgumentError( + "Invalid variant, non-object value with shredded fields".to_string(), + )); + }; + for (obj_field_name, obj_field_value) in obj.iter() { + if !shredded_field_names.contains(obj_field_name) { + obj_builder.insert_bytes(obj_field_name, obj_field_value); + } + } + } + } + + obj_builder.finish(); + Ok(()) } /// Workaround for lack of direct support for BinaryArray diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 95a30c206d59..4b8fabc134bf 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -154,7 +154,7 @@ impl ValueBuilder { // Variant types below - fn append_null(&mut self) { + pub fn append_null(&mut self) { self.append_primitive_header(VariantPrimitiveType::Null); } diff --git a/parquet/tests/variant_integration.rs b/parquet/tests/variant_integration.rs index 9f202f4db803..1f467176f5b1 100644 --- a/parquet/tests/variant_integration.rs +++ b/parquet/tests/variant_integration.rs @@ -113,20 +113,18 @@ variant_test_case!(34, "Unsupported typed_value type: Timestamp(ns, \"UTC\")"); variant_test_case!(35, "Unsupported typed_value type: Timestamp(ns)"); variant_test_case!(36, "Unsupported typed_value type: Timestamp(ns)"); variant_test_case!(37); -// https://github.com/apache/arrow-rs/issues/8336 -variant_test_case!(38, "Unsupported typed_value type: Struct("); +variant_test_case!(38); variant_test_case!(39); // Is an error case (should be failing as the expected error message indicates) variant_test_case!(40, "Unsupported typed_value type: List("); variant_test_case!(41, "Unsupported typed_value type: List("); // Is an error case (should be failing as the expected error message indicates) variant_test_case!(42, "Invalid variant, conflicting value and typed_value"); -// https://github.com/apache/arrow-rs/issues/8336 -variant_test_case!(43, "Unsupported typed_value type: Struct("); -variant_test_case!(44, "Unsupported typed_value type: Struct("); +variant_test_case!(43); +variant_test_case!(44); // https://github.com/apache/arrow-rs/issues/8337 variant_test_case!(45, "Unsupported typed_value type: List("); -variant_test_case!(46, "Unsupported typed_value type: Struct("); +variant_test_case!(46); variant_test_case!(47); variant_test_case!(48); variant_test_case!(49); @@ -163,15 +161,13 @@ variant_test_case!(79); variant_test_case!(80); variant_test_case!(81); variant_test_case!(82); -// https://github.com/apache/arrow-rs/issues/8336 -variant_test_case!(83, "Unsupported typed_value type: Struct("); -variant_test_case!(84, "Unsupported typed_value type: Struct("); +variant_test_case!(83); +variant_test_case!(84); // https://github.com/apache/arrow-rs/issues/8337 variant_test_case!(85, "Unsupported typed_value type: List("); variant_test_case!(86, "Unsupported typed_value type: List("); // Is an error case (should be failing as the expected error message indicates) -// TODO: Once structs are supported, expect "Invalid variant, non-object value with shredded fields" -variant_test_case!(87, "Unsupported typed_value type: Struct("); +variant_test_case!(87, "Invalid variant, non-object value with shredded fields"); variant_test_case!(88, "Unsupported typed_value type: List("); variant_test_case!(89); variant_test_case!(90); @@ -209,23 +205,25 @@ variant_test_case!(121); variant_test_case!(122); variant_test_case!(123); variant_test_case!(124); -variant_test_case!(125, "Unsupported typed_value type: Struct"); +variant_test_case!(125); variant_test_case!(126, "Unsupported typed_value type: List("); -// Is an error case (should be failing as the expected error message indicates) +// Is an error case (error message mentions arrow data type instead of parquet logical type) variant_test_case!(127, "Illegal shredded value type: UInt32"); // Is an error case (should be failing as the expected error message indicates) -// TODO: Once structs are supported, expect "Invalid variant, non-object value with shredded fields" -variant_test_case!(128, "Unsupported typed_value type: Struct("); +variant_test_case!( + 128, + "Invalid variant, non-object value with shredded fields" +); variant_test_case!(129); -variant_test_case!(130, "Unsupported typed_value type: Struct("); +variant_test_case!(130); variant_test_case!(131); -variant_test_case!(132, "Unsupported typed_value type: Struct("); -variant_test_case!(133, "Unsupported typed_value type: Struct("); -variant_test_case!(134, "Unsupported typed_value type: Struct("); +variant_test_case!(132); +variant_test_case!(133); +variant_test_case!(134); variant_test_case!(135); variant_test_case!(136, "Unsupported typed_value type: List("); variant_test_case!(137, "Illegal shredded value type: FixedSizeBinary(4)"); -variant_test_case!(138, "Unsupported typed_value type: Struct("); +variant_test_case!(138); /// Test case definition structure matching the format from /// `parquet-testing/parquet_shredded/cases.json` From f356c855967c664c87ca670b1ce50b6ae5381bc9 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Thu, 25 Sep 2025 10:14:46 -0700 Subject: [PATCH 4/4] add fallible VariantArray::try_value --- parquet-variant-compute/src/variant_array.rs | 47 ++++++++++++++------ 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 8105b91549e9..3de5178e0e22 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -467,6 +467,29 @@ impl VariantArray { /// /// # Panics /// * if the index is out of bounds + /// * if variant construction failed + /// + /// If this is a shredded variant but has no value at the shredded location, it + /// will return [`Variant::Null`]. + /// + /// # Performance Note + /// + /// This is certainly not the most efficient way to access values in a + /// `VariantArray`, but it is useful for testing and debugging. + /// + /// Note: Does not do deep validation of the [`Variant`], so it is up to the + /// caller to ensure that the metadata and value were constructed correctly. + pub fn value(&self, index: usize) -> VariantArrayValue<'_, '_> { + self.try_value(index).unwrap() + } + + /// Try to return the [`Variant`] instance stored at the given row + /// + /// Note: This method does not check for nulls and the value is arbitrary + /// (but still well-defined) if [`is_null`](Self::is_null) returns true for the index. + /// + /// # Panics + /// * if the index is out of bounds /// * if the array value is null /// /// If this is a shredded variant but has no value at the shredded location, it @@ -480,29 +503,27 @@ impl VariantArray { /// /// Note: Does not do deep validation of the [`Variant`], so it is up to the /// caller to ensure that the metadata and value were constructed correctly. - pub fn value(&self, index: usize) -> VariantArrayValue<'_, '_> { - match (self.typed_value_field(), self.value_field()) { + pub fn try_value(&self, index: usize) -> Result, ArrowError> { + let value = match (self.typed_value_field(), self.value_field()) { // Always prefer typed_value, if available (Some(typed_value), value) if typed_value.is_valid(index) => { let metadata = VariantMetadata::new(self.metadata.value(index)); let mut builder = SingleValueVariantBuilder::new(metadata.clone()); - let Err(err) = - typed_value_to_variant(typed_value, value, index, &metadata, &mut builder) - else { - let value_bytes = builder.into_bytes(); - return VariantArrayValue::owned(metadata.clone(), value_bytes); - }; - debug_assert!(false, "typed_value_to_variant failed: {err}"); - Variant::Null.into() + typed_value_to_variant(typed_value, value, index, &metadata, &mut builder)?; + return Ok(VariantArrayValue::owned( + metadata.clone(), + builder.into_bytes(), + )); } // Otherwise fall back to value, if available (_, Some(value)) if value.is_valid(index) => { - Variant::new(self.metadata.value(index), value.value(index)).into() + Variant::new(self.metadata.value(index), value.value(index)) } // It is technically invalid for neither value nor typed_value fields to be available, // but the spec specifically requires readers to return Variant::Null in this case. - _ => Variant::Null.into(), - } + _ => Variant::Null, + }; + Ok(value.into()) } /// Return a reference to the metadata field of the [`StructArray`]