diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index b05d0e023653..7f723d0d49bb 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -56,7 +56,9 @@ pub use variant_array_builder::{VariantArrayBuilder, VariantValueArrayBuilder}; pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options}; pub use from_json::json_to_variant; -pub use shred_variant::{IntoShreddingField, ShreddedSchemaBuilder, shred_variant}; +pub use shred_variant::{ + IntoShreddingField, ShreddedSchemaBuilder, shred_variant, shred_variant_with_options, +}; pub use to_json::variant_to_json; pub use unshred_variant::unshred_variant; pub use variant_get::{GetOptions, variant_get}; diff --git a/parquet-variant-compute/src/shred_variant.rs b/parquet-variant-compute/src/shred_variant.rs index 7b181179d3d6..67123b0cb23c 100644 --- a/parquet-variant-compute/src/shred_variant.rs +++ b/parquet-variant-compute/src/shred_variant.rs @@ -68,6 +68,14 @@ use std::sync::Arc; /// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type` /// value passed to this function. pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result { + shred_variant_with_options(array, as_type, &CastOptions::default()) +} + +pub fn shred_variant_with_options( + array: &VariantArray, + as_type: &DataType, + cast_options: &CastOptions, +) -> Result { if array.typed_value_field().is_some() { return Err(ArrowError::InvalidArgumentError( "Input is already shredded".to_string(), @@ -79,10 +87,9 @@ pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> Result VariantToShreddedArrayVariantRowBuilder<'a> { // If the variant is not an array, typed_value must be null. // If the variant is an array, value must be null. match variant { - Variant::List(list) => { + Variant::List(ref list) => { self.nulls.append_non_null(); - self.value_builder.append_null(); - self.typed_value_builder - .append_value(&Variant::List(list))?; - Ok(true) + + // With `safe` cast option set to false, appending list of wrong size to + // `typed_value_builder` of type `FixedSizeList` will result in an error. In such a + // case, the provided list should be appended to the `value_builder. + let shredded = self.typed_value_builder.append_value(&variant)?; + if shredded { + self.value_builder.append_null(); + } else { + self.value_builder.append_value(Variant::List(list.clone())); + } + Ok(shredded) } other => { self.nulls.append_non_null(); @@ -690,9 +704,9 @@ mod tests { use super::*; use crate::VariantArrayBuilder; use arrow::array::{ - Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray, - GenericListViewArray, Int64Array, LargeBinaryArray, LargeStringArray, ListArray, - ListLikeArray, OffsetSizeTrait, PrimitiveArray, StringArray, + Array, BinaryViewArray, FixedSizeBinaryArray, FixedSizeListArray, Float64Array, + GenericListArray, GenericListViewArray, Int64Array, LargeBinaryArray, LargeStringArray, + ListArray, ListLikeArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray, }; use arrow::datatypes::{ ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode, @@ -1608,6 +1622,67 @@ mod tests { #[test] fn test_array_shredding_as_fixed_size_list() { + let input = build_variant_array(vec![ + VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]), + VariantRow::Value(VariantValue::from("This should not be shredded")), + VariantRow::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]), + ]); + + let list_schema = + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 3); + + // The first row should be shredded, so the `value` field should be null and the + // `typed_value` field should contain the list + assert!(result.is_valid(0)); + assert!(result.value_field().unwrap().is_null(0)); + assert!(result.typed_value_field().unwrap().is_valid(0)); + + // The second row should not be shredded because the provided schema for shredding did not + // match. Hence, the `value` field should contain the raw value and the `typed_value` field + // should be null. + assert!(result.is_valid(1)); + assert!(result.value_field().unwrap().is_valid(1)); + assert!(result.typed_value_field().unwrap().is_null(1)); + + // The third row should be shredded, so the `value` field should be null and the + // `typed_value` field should contain the list + assert!(result.is_valid(2)); + assert!(result.value_field().unwrap().is_null(2)); + assert!(result.typed_value_field().unwrap().is_valid(2)); + + let typed_value = result.typed_value_field().unwrap(); + let fixed_size_list = typed_value + .as_any() + .downcast_ref::() + .expect("Expected FixedSizeListArray"); + + // Verify that typed value is `FixedSizeList`. + assert_eq!(fixed_size_list.len(), 3); + assert_eq!(fixed_size_list.value_length(), 2); + + // Verify that the first entry in the `FixedSizeList` contains the expected value. + let val0 = fixed_size_list.value(0); + let val0_struct = val0.as_any().downcast_ref::().unwrap(); + let val0_typed = val0_struct.column_by_name("typed_value").unwrap(); + let val0_ints = val0_typed.as_any().downcast_ref::().unwrap(); + assert_eq!(val0_ints.values(), &[1i64, 2i64]); + + // Verify that second entry in the `FixedSizeList` cannot be shredded hence the value is + // invalid. + assert!(fixed_size_list.is_null(1)); + + // Verify that the third entry in the `FixedSizeList` contains the expected value. + let val2 = fixed_size_list.value(2); + let val2_struct = val2.as_any().downcast_ref::().unwrap(); + let val2_typed = val2_struct.column_by_name("typed_value").unwrap(); + let val2_ints = val2_typed.as_any().downcast_ref::().unwrap(); + assert_eq!(val2_ints.values(), &[3i64, 4i64]); + } + + #[test] + fn test_array_shredding_as_fixed_size_list_wrong_size() { let input = build_variant_array(vec![VariantRow::List(vec![ VariantValue::from(1i64), VariantValue::from(2i64), @@ -1615,10 +1690,37 @@ mod tests { ])]); let list_schema = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2); - let err = shred_variant(&input, &list_schema).unwrap_err(); - assert_eq!( - err.to_string(), - "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists" + + let result = shred_variant_with_options( + &input, + &list_schema, + &CastOptions { + safe: true, + ..Default::default() + }, + ) + .unwrap(); + assert_eq!(result.len(), 1); + + // With `safe` set to to true, the incorrect size should not raise error. + assert!(result.is_valid(0)); + assert!(result.value_field().unwrap().is_valid(0)); + assert!(result.typed_value_field().unwrap().is_null(0)); + + // With `safe` set to false, the incorrect size should raise error. + let err = shred_variant_with_options( + &input, + &list_schema, + &CastOptions { + safe: false, + ..Default::default() + }, + ) + .unwrap_err(); + assert!( + err.to_string() + .contains("Expected fixed size list of size 2, got size 3"), + "got: {err}", ); } diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index 83df6879aa54..3e5ea7876d38 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -344,9 +344,9 @@ mod test { use arrow::array::{ Array, ArrayRef, AsArray, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, - Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, - LargeBinaryArray, LargeListArray, LargeListViewArray, LargeStringArray, ListArray, - ListViewArray, NullBuilder, StringArray, StringViewArray, StructArray, + FixedSizeListArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, + Int64Array, LargeBinaryArray, LargeListArray, LargeListViewArray, LargeStringArray, + ListArray, ListViewArray, NullBuilder, StringArray, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, }; use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; @@ -4112,13 +4112,29 @@ mod test { ( DataType::LargeListView(field.clone()), Arc::new(LargeListViewArray::new( - field, + field.clone(), ScalarBuffer::from(vec![0, 3]), ScalarBuffer::from(vec![3, 0]), element_array, Some(NullBuffer::from(vec![true, false])), )) as ArrayRef, ), + ( + DataType::FixedSizeList(field.clone(), 3), + Arc::new(FixedSizeListArray::new( + field, + 3, + Arc::new(Int64Array::from(vec![ + Some(1), + None, + Some(3), + None, + None, + None, + ])), + Some(NullBuffer::from(vec![true, false])), + )) as ArrayRef, + ), ]; for (request_type, expected) in expectations { @@ -4281,7 +4297,8 @@ mod test { DataType::List(item_field.clone()), DataType::LargeList(item_field.clone()), DataType::ListView(item_field.clone()), - DataType::LargeListView(item_field), + DataType::LargeListView(item_field.clone()), + DataType::FixedSizeList(item_field, 2), ]; for data_type in data_types { @@ -4298,27 +4315,46 @@ mod test { } #[test] - fn test_variant_get_fixed_size_list_not_implemented() { - let string_array: ArrayRef = Arc::new(StringArray::from(vec!["[1, 2]", "\"not a list\""])); + fn test_variant_get_fixed_size_list_wrong_size() { + let string_array: ArrayRef = Arc::new(StringArray::from(vec!["[1, 2, 3]"])); let variant_array = ArrayRef::from(json_to_variant(&string_array).unwrap()); let item_field = Arc::new(Field::new("item", Int64, true)); - for safe in [true, false] { - let options = GetOptions::new() - .with_as_type(Some(FieldRef::from(Field::new( - "result", - DataType::FixedSizeList(item_field.clone(), 2), - true, - )))) - .with_cast_options(CastOptions { - safe, - ..Default::default() - }); - let err = variant_get(&variant_array, options).unwrap_err(); - assert!( - err.to_string() - .contains("Converting unshredded variant arrays to arrow fixed-size lists") - ); - } + // With `safe` set to true, size mismatch should return Null. + let options = GetOptions::new() + .with_as_type(Some(FieldRef::from(Field::new( + "result", + DataType::FixedSizeList(item_field.clone(), 2), + true, + )))) + .with_cast_options(CastOptions { + safe: true, + ..Default::default() + }); + let result = variant_get(&variant_array, options).unwrap(); + let fixed_size_list = result + .as_any() + .downcast_ref::() + .expect("Expected FixedSizeListArray"); + assert_eq!(fixed_size_list.len(), 1); + assert!(fixed_size_list.is_null(0)); + + // With `safe` set to false, error should be raised on wrong sized fixed list. + let options = GetOptions::new() + .with_as_type(Some(FieldRef::from(Field::new( + "result", + DataType::FixedSizeList(item_field.clone(), 2), + true, + )))) + .with_cast_options(CastOptions { + safe: false, + ..Default::default() + }); + let err = variant_get(&variant_array, options).unwrap_err(); + assert!( + err.to_string() + .contains("Expected fixed size list of size 2, got size 3"), + "got: {err}", + ); } } diff --git a/parquet-variant-compute/src/variant_to_arrow.rs b/parquet-variant-compute/src/variant_to_arrow.rs index dd054a5f7d82..9d7c3a96fbd3 100644 --- a/parquet-variant-compute/src/variant_to_arrow.rs +++ b/parquet-variant-compute/src/variant_to_arrow.rs @@ -27,10 +27,10 @@ use crate::variant_array::ShreddedVariantFieldArray; use crate::{VariantArray, VariantValueArrayBuilder}; use arrow::array::{ ArrayRef, ArrowNativeTypeOp, BinaryBuilder, BinaryLikeArrayBuilder, BinaryViewArray, - BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, GenericListArray, - GenericListViewArray, LargeBinaryBuilder, LargeStringBuilder, NullArray, NullBufferBuilder, - OffsetSizeTrait, PrimitiveBuilder, StringBuilder, StringLikeArrayBuilder, StringViewBuilder, - StructArray, + BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, FixedSizeListArray, + GenericListArray, GenericListViewArray, LargeBinaryBuilder, LargeStringBuilder, NullArray, + NullBufferBuilder, OffsetSizeTrait, PrimitiveBuilder, StringBuilder, StringLikeArrayBuilder, + StringViewBuilder, StructArray, }; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::compute::{CastOptions, DecimalCast}; @@ -507,6 +507,7 @@ pub(crate) enum ArrayVariantToArrowRowBuilder<'a> { LargeList(VariantToListArrowRowBuilder<'a, i64, false>), ListView(VariantToListArrowRowBuilder<'a, i32, true>), LargeListView(VariantToListArrowRowBuilder<'a, i64, true>), + FixedSizeList(VariantToFixedSizeListArrowRowBuilder<'a>), } pub(crate) struct StructVariantToArrowRowBuilder<'a> { @@ -619,10 +620,15 @@ impl<'a> ArrayVariantToArrowRowBuilder<'a> { DataType::LargeList(field) => make_list_builder!(LargeList, i64, false, field), DataType::ListView(field) => make_list_builder!(ListView, i32, true, field), DataType::LargeListView(field) => make_list_builder!(LargeListView, i64, true, field), - DataType::FixedSizeList(..) => { - return Err(ArrowError::NotYetImplemented( - "Converting unshredded variant arrays to arrow fixed-size lists".to_string(), - )); + DataType::FixedSizeList(field, size) => { + FixedSizeList(VariantToFixedSizeListArrowRowBuilder::try_new( + field.clone(), + field.data_type(), + *size, + cast_options, + capacity, + shredded, + )?) } other => { return Err(ArrowError::InvalidArgumentError(format!( @@ -639,6 +645,7 @@ impl<'a> ArrayVariantToArrowRowBuilder<'a> { Self::LargeList(builder) => builder.append_null(), Self::ListView(builder) => builder.append_null(), Self::LargeListView(builder) => builder.append_null(), + Self::FixedSizeList(builder) => builder.append_null(), } } @@ -648,6 +655,7 @@ impl<'a> ArrayVariantToArrowRowBuilder<'a> { Self::LargeList(builder) => builder.append_value(value), Self::ListView(builder) => builder.append_value(value), Self::LargeListView(builder) => builder.append_value(value), + Self::FixedSizeList(builder) => builder.append_value(value), } } @@ -657,6 +665,7 @@ impl<'a> ArrayVariantToArrowRowBuilder<'a> { Self::LargeList(builder) => builder.finish(), Self::ListView(builder) => builder.finish(), Self::LargeListView(builder) => builder.finish(), + Self::FixedSizeList(builder) => builder.finish(), } } } @@ -911,6 +920,13 @@ enum ListElementBuilder<'a> { } impl<'a> ListElementBuilder<'a> { + fn append_null(&mut self) -> Result<()> { + match self { + Self::Typed(b) => b.append_null(), + Self::Shredded(b) => b.append_null(), + } + } + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { match self { Self::Typed(b) => b.append_value(value), @@ -1050,6 +1066,97 @@ where } } +pub(crate) struct VariantToFixedSizeListArrowRowBuilder<'a> { + field: FieldRef, + list_size: i32, + element_builder: ListElementBuilder<'a>, + nulls: NullBufferBuilder, + cast_options: &'a CastOptions<'a>, +} + +impl<'a> VariantToFixedSizeListArrowRowBuilder<'a> { + fn try_new( + field: FieldRef, + element_data_type: &'a DataType, + list_size: i32, + cast_options: &'a CastOptions, + capacity: usize, + shredded: bool, + ) -> Result { + let element_builder = if shredded { + let builder = make_variant_to_shredded_variant_arrow_row_builder( + element_data_type, + cast_options, + capacity, + NullValue::ArrayElement, + )?; + ListElementBuilder::Shredded(Box::new(builder)) + } else { + let builder = + make_typed_variant_to_arrow_row_builder(element_data_type, cast_options, capacity)?; + ListElementBuilder::Typed(Box::new(builder)) + }; + Ok(Self { + field, + list_size, + element_builder, + nulls: NullBufferBuilder::new(capacity), + cast_options, + }) + } + + fn append_null(&mut self) -> Result<()> { + for _ in 0..self.list_size { + self.element_builder.append_null()?; + } + self.nulls.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result { + match variant_cast_with_options(value, self.cast_options, Variant::as_list) { + Ok(Some(list)) => { + let len = list.len(); + if len != self.list_size as usize { + if self.cast_options.safe { + self.append_null()?; + return Ok(false); + } + return Err(ArrowError::CastError(format!( + "Expected fixed size list of size {}, got size {}", + self.list_size, len + ))); + } + for element in list.iter() { + self.element_builder.append_value(element)?; + } + self.nulls.append_non_null(); + Ok(true) + } + Ok(None) => { + self.append_null()?; + Ok(false) + } + Err(_) => Err(ArrowError::CastError(format!( + "Failed to extract list from variant {value:?}" + ))), + } + } + + fn finish(mut self) -> Result { + let element_array: ArrayRef = self.element_builder.finish()?; + let field = Arc::new( + self.field + .as_ref() + .clone() + .with_data_type(element_array.data_type().clone()), + ); + let fixed_size_list_array = + FixedSizeListArray::try_new(field, self.list_size, element_array, self.nulls.finish())?; + Ok(Arc::new(fixed_size_list_array)) + } +} + /// Builder for creating VariantArray output (for path extraction without type conversion) pub(crate) struct VariantToBinaryVariantArrowRowBuilder { metadata: BinaryViewArray,