diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index df7804e7b36c..f6555a9a0559 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -97,6 +97,39 @@ impl ValueBuilder { } } +/// Macro to generate the match statement for each append_variant, try_append_variant, and +/// append_variant_bytes -- they each have slightly different handling for object and list handling. +macro_rules! variant_append_value { + ($builder:expr, $value:expr, $object_pat:pat => $object_arm:expr, $list_pat:pat => $list_arm:expr) => { + match $value { + Variant::Null => $builder.append_null(), + Variant::BooleanTrue => $builder.append_bool(true), + Variant::BooleanFalse => $builder.append_bool(false), + Variant::Int8(v) => $builder.append_int8(v), + Variant::Int16(v) => $builder.append_int16(v), + Variant::Int32(v) => $builder.append_int32(v), + Variant::Int64(v) => $builder.append_int64(v), + Variant::Date(v) => $builder.append_date(v), + Variant::Time(v) => $builder.append_time_micros(v), + Variant::TimestampMicros(v) => $builder.append_timestamp_micros(v), + Variant::TimestampNtzMicros(v) => $builder.append_timestamp_ntz_micros(v), + Variant::TimestampNanos(v) => $builder.append_timestamp_nanos(v), + Variant::TimestampNtzNanos(v) => $builder.append_timestamp_ntz_nanos(v), + Variant::Decimal4(decimal4) => $builder.append_decimal4(decimal4), + Variant::Decimal8(decimal8) => $builder.append_decimal8(decimal8), + Variant::Decimal16(decimal16) => $builder.append_decimal16(decimal16), + Variant::Float(v) => $builder.append_float(v), + Variant::Double(v) => $builder.append_double(v), + Variant::Binary(v) => $builder.append_binary(v), + Variant::String(s) => $builder.append_string(s), + Variant::ShortString(s) => $builder.append_short_string(s), + Variant::Uuid(v) => $builder.append_uuid(v), + $object_pat => $object_arm, + $list_pat => $list_arm, + } + }; +} + impl ValueBuilder { fn append_u8(&mut self, term: u8) { self.0.push(term); @@ -296,32 +329,12 @@ impl ValueBuilder { /// when validation is enabled. For a fallible version, use [`ValueBuilder::try_append_variant`] pub fn append_variant(mut state: ParentState<'_>, variant: Variant<'_, '_>) { let builder = state.value_builder(); - match variant { - Variant::Null => builder.append_null(), - Variant::BooleanTrue => builder.append_bool(true), - Variant::BooleanFalse => builder.append_bool(false), - Variant::Int8(v) => builder.append_int8(v), - Variant::Int16(v) => builder.append_int16(v), - Variant::Int32(v) => builder.append_int32(v), - Variant::Int64(v) => builder.append_int64(v), - Variant::Date(v) => builder.append_date(v), - Variant::Time(v) => builder.append_time_micros(v), - Variant::TimestampMicros(v) => builder.append_timestamp_micros(v), - Variant::TimestampNtzMicros(v) => builder.append_timestamp_ntz_micros(v), - Variant::TimestampNanos(v) => builder.append_timestamp_nanos(v), - Variant::TimestampNtzNanos(v) => builder.append_timestamp_ntz_nanos(v), - Variant::Decimal4(decimal4) => builder.append_decimal4(decimal4), - Variant::Decimal8(decimal8) => builder.append_decimal8(decimal8), - Variant::Decimal16(decimal16) => builder.append_decimal16(decimal16), - Variant::Float(v) => builder.append_float(v), - Variant::Double(v) => builder.append_double(v), - Variant::Binary(v) => builder.append_binary(v), - Variant::String(s) => builder.append_string(s), - Variant::ShortString(s) => builder.append_short_string(s), - Variant::Uuid(v) => builder.append_uuid(v), + variant_append_value!( + builder, + variant, Variant::Object(obj) => return Self::append_object(state, obj), - Variant::List(list) => return Self::append_list(state, list), - } + Variant::List(list) => return Self::append_list(state, list) + ); state.finish(); } @@ -334,37 +347,35 @@ impl ValueBuilder { variant: Variant<'_, '_>, ) -> Result<(), ArrowError> { let builder = state.value_builder(); - match variant { - Variant::Null => builder.append_null(), - Variant::BooleanTrue => builder.append_bool(true), - Variant::BooleanFalse => builder.append_bool(false), - Variant::Int8(v) => builder.append_int8(v), - Variant::Int16(v) => builder.append_int16(v), - Variant::Int32(v) => builder.append_int32(v), - Variant::Int64(v) => builder.append_int64(v), - Variant::Date(v) => builder.append_date(v), - Variant::Time(v) => builder.append_time_micros(v), - Variant::TimestampMicros(v) => builder.append_timestamp_micros(v), - Variant::TimestampNtzMicros(v) => builder.append_timestamp_ntz_micros(v), - Variant::TimestampNanos(v) => builder.append_timestamp_nanos(v), - Variant::TimestampNtzNanos(v) => builder.append_timestamp_ntz_nanos(v), - Variant::Decimal4(decimal4) => builder.append_decimal4(decimal4), - Variant::Decimal8(decimal8) => builder.append_decimal8(decimal8), - Variant::Decimal16(decimal16) => builder.append_decimal16(decimal16), - Variant::Float(v) => builder.append_float(v), - Variant::Double(v) => builder.append_double(v), - Variant::Binary(v) => builder.append_binary(v), - Variant::String(s) => builder.append_string(s), - Variant::ShortString(s) => builder.append_short_string(s), - Variant::Uuid(v) => builder.append_uuid(v), + variant_append_value!( + builder, + variant, Variant::Object(obj) => return Self::try_append_object(state, obj), - Variant::List(list) => return Self::try_append_list(state, list), - } - + Variant::List(list) => return Self::try_append_list(state, list) + ); state.finish(); Ok(()) } + /// Appends a variant to the buffer by copying raw bytes when possible. + /// + /// For objects and lists, this directly copies their underlying byte representation instead of + /// performing a logical copy and without touching the metadata builder. For other variant + /// types, this falls back to the standard append behavior. + /// + /// The caller must ensure that the metadata dictionary is already built and correct for + /// any objects or lists being appended. + pub fn append_variant_bytes(mut state: ParentState<'_>, variant: Variant<'_, '_>) { + let builder = state.value_builder(); + variant_append_value!( + builder, + variant, + Variant::Object(obj) => builder.append_slice(obj.value), + Variant::List(list) => builder.append_slice(list.value) + ); + state.finish(); + } + /// Writes out the header byte for a variant object or list, from the starting position /// of the builder, will return the position after this write fn append_header_start_from_buf_pos( @@ -1176,7 +1187,7 @@ impl VariantBuilder { /// You can use this to pre-populate a [`VariantBuilder`] with a sorted dictionary if you /// know the field names beforehand. Sorted dictionaries can accelerate field access when /// reading [`Variant`]s. - pub fn with_field_names<'a>(mut self, field_names: impl Iterator) -> Self { + pub fn with_field_names<'a>(mut self, field_names: impl IntoIterator) -> Self { self.metadata_builder.extend(field_names); self @@ -1264,6 +1275,19 @@ impl VariantBuilder { ValueBuilder::try_append_variant(state, value.into()) } + /// Appends a variant value to the builder by copying raw bytes when possible. + /// + /// For objects and lists, this directly copies their underlying byte representation instead of + /// performing a logical copy and without touching the metadata builder. For other variant + /// types, this falls back to the standard append behavior. + /// + /// The caller must ensure that the metadata dictionary entries are already built and correct for + /// any objects or lists being appended. + pub fn append_value_bytes<'m, 'd>(&mut self, value: impl Into>) { + let state = ParentState::variant(&mut self.value_builder, &mut self.metadata_builder); + ValueBuilder::append_variant_bytes(state, value.into()); + } + /// Finish the builder and return the metadata and value buffers. pub fn finish(mut self) -> (Vec, Vec) { self.metadata_builder.finish(); @@ -1352,6 +1376,19 @@ impl<'a> ListBuilder<'a> { ValueBuilder::try_append_variant(state, value.into()) } + /// Appends a variant value to this list by copying raw bytes when possible. + /// + /// For objects and lists, this directly copies their underlying byte representation instead of + /// performing a logical copy. For other variant types, this falls back to the standard append + /// behavior. + /// + /// The caller must ensure that the metadata dictionary is already built and correct for + /// any objects or lists being appended. + pub fn append_value_bytes<'m, 'd>(&mut self, value: impl Into>) { + let (state, _) = self.parent_state(); + ValueBuilder::append_variant_bytes(state, value.into()) + } + /// Builder-style API for appending a value to the list and returning self to enable method chaining. /// /// # Panics @@ -1458,7 +1495,8 @@ impl<'a> ObjectBuilder<'a> { /// - [`ObjectBuilder::insert`] for an infallible version that panics /// - [`ObjectBuilder::try_with_field`] for a builder-style API. /// - /// # Note Attempting to insert a duplicate field name produces an error if unique field + /// # Note + /// Attempting to insert a duplicate field name produces an error if unique field /// validation is enabled. Otherwise, the new value overwrites the previous field mapping /// without erasing the old value, resulting in a larger variant pub fn try_insert<'m, 'd, T: Into>>( @@ -1470,6 +1508,45 @@ impl<'a> ObjectBuilder<'a> { ValueBuilder::try_append_variant(state, value.into()) } + /// Add a field with key and value to the object by copying raw bytes when possible. + /// + /// For objects and lists, this directly copies their underlying byte representation instead of + /// performing a logical copy, and without touching the metadata builder. For other variant + /// types, this falls back to the standard append behavior. + /// + /// The caller must ensure that the metadata dictionary is already built and correct for + /// any objects or lists being appended, but the value's new field name is handled normally. + /// + /// # Panics + /// + /// This method will panic if the variant contains duplicate field names in objects + /// when validation is enabled. For a fallible version, use [`ObjectBuilder::try_insert_bytes`] + pub fn insert_bytes<'m, 'd>(&mut self, key: &str, value: impl Into>) { + self.try_insert_bytes(key, value).unwrap() + } + + /// Add a field with key and value to the object by copying raw bytes when possible. + /// + /// For objects and lists, this directly copies their underlying byte representation instead of + /// performing a logical copy, and without touching the metadata builder. For other variant + /// types, this falls back to the standard append behavior. + /// + /// The caller must ensure that the metadata dictionary is already built and correct for + /// any objects or lists being appended, but the value's new field name is handled normally. + /// + /// # Note + /// When inserting duplicate keys, the new value overwrites the previous mapping, + /// but the old value remains in the buffer, resulting in a larger variant + pub fn try_insert_bytes<'m, 'd>( + &mut self, + key: &str, + value: impl Into>, + ) -> Result<(), ArrowError> { + let (state, _) = self.parent_state(key)?; + ValueBuilder::append_variant_bytes(state, value.into()); + Ok(()) + } + /// Builder style API for adding a field with key and value to the object /// /// Same as [`ObjectBuilder::insert`], but returns `self` for chaining. @@ -2615,7 +2692,7 @@ mod tests { #[test] fn test_sorted_dictionary() { // check if variant metadatabuilders are equivalent from different ways of constructing them - let mut variant1 = VariantBuilder::new().with_field_names(["b", "c", "d"].into_iter()); + let mut variant1 = VariantBuilder::new().with_field_names(["b", "c", "d"]); let mut variant2 = { let mut builder = VariantBuilder::new(); @@ -2665,7 +2742,7 @@ mod tests { #[test] fn test_object_sorted_dictionary() { // predefine the list of field names - let mut variant1 = VariantBuilder::new().with_field_names(["a", "b", "c"].into_iter()); + let mut variant1 = VariantBuilder::new().with_field_names(["a", "b", "c"]); let mut obj = variant1.new_object(); obj.insert("c", true); @@ -2699,7 +2776,7 @@ mod tests { #[test] fn test_object_not_sorted_dictionary() { // predefine the list of field names - let mut variant1 = VariantBuilder::new().with_field_names(["b", "c", "d"].into_iter()); + let mut variant1 = VariantBuilder::new().with_field_names(["b", "c", "d"]); let mut obj = variant1.new_object(); obj.insert("c", true); @@ -2741,12 +2818,12 @@ mod tests { assert!(builder.metadata_builder.is_sorted); assert_eq!(builder.metadata_builder.num_field_names(), 1); - let builder = builder.with_field_names(["b", "c", "d"].into_iter()); + let builder = builder.with_field_names(["b", "c", "d"]); assert!(builder.metadata_builder.is_sorted); assert_eq!(builder.metadata_builder.num_field_names(), 4); - let builder = builder.with_field_names(["z", "y"].into_iter()); + let builder = builder.with_field_names(["z", "y"]); assert!(!builder.metadata_builder.is_sorted); assert_eq!(builder.metadata_builder.num_field_names(), 6); } @@ -3297,4 +3374,347 @@ mod tests { .contains("Field name 'unknown_field' not found")); } } + + #[test] + fn test_append_variant_bytes_round_trip() { + // Create a complex variant with the normal builder + let mut builder = VariantBuilder::new(); + { + let mut obj = builder.new_object(); + obj.insert("name", "Alice"); + obj.insert("age", 30i32); + { + let mut scores_list = obj.new_list("scores"); + scores_list.append_value(95i32); + scores_list.append_value(87i32); + scores_list.append_value(92i32); + scores_list.finish(); + } + { + let mut address = obj.new_object("address"); + address.insert("street", "123 Main St"); + address.insert("city", "Anytown"); + address.finish().unwrap(); + } + obj.finish().unwrap(); + } + let (metadata, value1) = builder.finish(); + let variant1 = Variant::try_new(&metadata, &value1).unwrap(); + + // Copy using the new bytes API + let metadata = VariantMetadata::new(&metadata); + let mut metadata = ReadOnlyMetadataBuilder::new(metadata); + let mut builder2 = ValueBuilder::new(); + let state = ParentState::variant(&mut builder2, &mut metadata); + ValueBuilder::append_variant_bytes(state, variant1.clone()); + let value2 = builder2.into_inner(); + + // The bytes should be identical, we merely copied them across. + assert_eq!(value1, value2); + } + + #[test] + fn test_object_insert_bytes_subset() { + // Create an original object, making sure to inject the field names we'll add later. + let mut builder = VariantBuilder::new().with_field_names(["new_field", "another_field"]); + { + let mut obj = builder.new_object(); + obj.insert("field1", "value1"); + obj.insert("field2", 42i32); + obj.insert("field3", true); + obj.insert("field4", "value4"); + obj.finish().unwrap(); + } + let (metadata1, value1) = builder.finish(); + let original_variant = Variant::try_new(&metadata1, &value1).unwrap(); + let original_obj = original_variant.as_object().unwrap(); + + // Create a new object copying subset of fields interleaved with new ones + let metadata2 = VariantMetadata::new(&metadata1); + let mut metadata2 = ReadOnlyMetadataBuilder::new(metadata2); + let mut builder2 = ValueBuilder::new(); + let state = ParentState::variant(&mut builder2, &mut metadata2); + { + let mut obj = ObjectBuilder::new(state, true); + + // Copy field1 using bytes API + obj.insert_bytes("field1", original_obj.get("field1").unwrap()); + + // Add new field + obj.insert("new_field", "new_value"); + + // Copy field3 using bytes API + obj.insert_bytes("field3", original_obj.get("field3").unwrap()); + + // Add another new field + obj.insert("another_field", 99i32); + + // Copy field2 using bytes API + obj.insert_bytes("field2", original_obj.get("field2").unwrap()); + + obj.finish().unwrap(); + } + let value2 = builder2.into_inner(); + let result_variant = Variant::try_new(&metadata1, &value2).unwrap(); + let result_obj = result_variant.as_object().unwrap(); + + // Verify the object contains expected fields + assert_eq!(result_obj.len(), 5); + assert_eq!( + result_obj.get("field1").unwrap().as_string().unwrap(), + "value1" + ); + assert_eq!(result_obj.get("field2").unwrap().as_int32().unwrap(), 42); + assert!(result_obj.get("field3").unwrap().as_boolean().unwrap()); + assert_eq!( + result_obj.get("new_field").unwrap().as_string().unwrap(), + "new_value" + ); + assert_eq!( + result_obj.get("another_field").unwrap().as_int32().unwrap(), + 99 + ); + } + + #[test] + fn test_list_append_bytes_subset() { + // Create an original list + let mut builder = VariantBuilder::new(); + { + let mut list = builder.new_list(); + list.append_value("item1"); + list.append_value(42i32); + list.append_value(true); + list.append_value("item4"); + list.append_value(1.234f64); + list.finish(); + } + let (metadata1, value1) = builder.finish(); + let original_variant = Variant::try_new(&metadata1, &value1).unwrap(); + let original_list = original_variant.as_list().unwrap(); + + // Create a new list copying subset of elements interleaved with new ones + let metadata2 = VariantMetadata::new(&metadata1); + let mut metadata2 = ReadOnlyMetadataBuilder::new(metadata2); + let mut builder2 = ValueBuilder::new(); + let state = ParentState::variant(&mut builder2, &mut metadata2); + { + let mut list = ListBuilder::new(state, true); + + // Copy first element using bytes API + list.append_value_bytes(original_list.get(0).unwrap()); + + // Add new element + list.append_value("new_item"); + + // Copy third element using bytes API + list.append_value_bytes(original_list.get(2).unwrap()); + + // Add another new element + list.append_value(99i32); + + // Copy last element using bytes API + list.append_value_bytes(original_list.get(4).unwrap()); + + list.finish(); + } + let value2 = builder2.into_inner(); + let result_variant = Variant::try_new(&metadata1, &value2).unwrap(); + let result_list = result_variant.as_list().unwrap(); + + // Verify the list contains expected elements + assert_eq!(result_list.len(), 5); + assert_eq!(result_list.get(0).unwrap().as_string().unwrap(), "item1"); + assert_eq!(result_list.get(1).unwrap().as_string().unwrap(), "new_item"); + assert!(result_list.get(2).unwrap().as_boolean().unwrap()); + assert_eq!(result_list.get(3).unwrap().as_int32().unwrap(), 99); + assert_eq!(result_list.get(4).unwrap().as_f64().unwrap(), 1.234); + } + + #[test] + fn test_complex_nested_filtering_injection() { + // Create a complex nested structure: object -> list -> objects. Make sure to pre-register + // the extra field names we'll need later while manipulating variant bytes. + let mut builder = VariantBuilder::new().with_field_names([ + "active_count", + "active_users", + "computed_score", + "processed_at", + "status", + ]); + + { + let mut root_obj = builder.new_object(); + root_obj.insert("metadata", "original"); + + { + let mut users_list = root_obj.new_list("users"); + + // User 1 + { + let mut user1 = users_list.new_object(); + user1.insert("id", 1i32); + user1.insert("name", "Alice"); + user1.insert("active", true); + user1.finish().unwrap(); + } + + // User 2 + { + let mut user2 = users_list.new_object(); + user2.insert("id", 2i32); + user2.insert("name", "Bob"); + user2.insert("active", false); + user2.finish().unwrap(); + } + + // User 3 + { + let mut user3 = users_list.new_object(); + user3.insert("id", 3i32); + user3.insert("name", "Charlie"); + user3.insert("active", true); + user3.finish().unwrap(); + } + + users_list.finish(); + } + + root_obj.insert("total_count", 3i32); + root_obj.finish().unwrap(); + } + let (metadata1, value1) = builder.finish(); + let original_variant = Variant::try_new(&metadata1, &value1).unwrap(); + let original_obj = original_variant.as_object().unwrap(); + let original_users = original_obj.get("users").unwrap(); + let original_users = original_users.as_list().unwrap(); + + // Create filtered/modified version: only copy active users and inject new data + let metadata2 = VariantMetadata::new(&metadata1); + let mut metadata2 = ReadOnlyMetadataBuilder::new(metadata2); + let mut builder2 = ValueBuilder::new(); + let state = ParentState::variant(&mut builder2, &mut metadata2); + { + let mut root_obj = ObjectBuilder::new(state, true); + + // Copy metadata using bytes API + root_obj.insert_bytes("metadata", original_obj.get("metadata").unwrap()); + + // Add processing timestamp + root_obj.insert("processed_at", "2024-01-01T00:00:00Z"); + + { + let mut filtered_users = root_obj.new_list("active_users"); + + // Copy only active users and inject additional data + for i in 0..original_users.len() { + let user = original_users.get(i).unwrap(); + let user = user.as_object().unwrap(); + if user.get("active").unwrap().as_boolean().unwrap() { + { + let mut new_user = filtered_users.new_object(); + + // Copy existing fields using bytes API + new_user.insert_bytes("id", user.get("id").unwrap()); + new_user.insert_bytes("name", user.get("name").unwrap()); + + // Inject new computed field + let user_id = user.get("id").unwrap().as_int32().unwrap(); + new_user.insert("computed_score", user_id * 10); + + // Add status transformation (don't copy the 'active' field) + new_user.insert("status", "verified"); + + new_user.finish().unwrap(); + } + } + } + + // Inject a completely new user + { + let mut new_user = filtered_users.new_object(); + new_user.insert("id", 999i32); + new_user.insert("name", "System User"); + new_user.insert("computed_score", 0i32); + new_user.insert("status", "system"); + new_user.finish().unwrap(); + } + + filtered_users.finish(); + } + + // Update count + root_obj.insert("active_count", 3i32); // 2 active + 1 new + + root_obj.finish().unwrap(); + } + let value2 = builder2.into_inner(); + let result_variant = Variant::try_new(&metadata1, &value2).unwrap(); + let result_obj = result_variant.as_object().unwrap(); + + // Verify the filtered/modified structure + assert_eq!( + result_obj.get("metadata").unwrap().as_string().unwrap(), + "original" + ); + assert_eq!( + result_obj.get("processed_at").unwrap().as_string().unwrap(), + "2024-01-01T00:00:00Z" + ); + assert_eq!( + result_obj.get("active_count").unwrap().as_int32().unwrap(), + 3 + ); + + let active_users = result_obj.get("active_users").unwrap(); + let active_users = active_users.as_list().unwrap(); + assert_eq!(active_users.len(), 3); + + // Verify Alice (id=1, was active) + let alice = active_users.get(0).unwrap(); + let alice = alice.as_object().unwrap(); + assert_eq!(alice.get("id").unwrap().as_int32().unwrap(), 1); + assert_eq!(alice.get("name").unwrap().as_string().unwrap(), "Alice"); + assert_eq!(alice.get("computed_score").unwrap().as_int32().unwrap(), 10); + assert_eq!( + alice.get("status").unwrap().as_string().unwrap(), + "verified" + ); + assert!(alice.get("active").is_none()); // This field was not copied + + // Verify Charlie (id=3, was active) - Bob (id=2) was not active so not included + let charlie = active_users.get(1).unwrap(); + let charlie = charlie.as_object().unwrap(); + assert_eq!(charlie.get("id").unwrap().as_int32().unwrap(), 3); + assert_eq!(charlie.get("name").unwrap().as_string().unwrap(), "Charlie"); + assert_eq!( + charlie.get("computed_score").unwrap().as_int32().unwrap(), + 30 + ); + assert_eq!( + charlie.get("status").unwrap().as_string().unwrap(), + "verified" + ); + + // Verify injected system user + let system_user = active_users.get(2).unwrap(); + let system_user = system_user.as_object().unwrap(); + assert_eq!(system_user.get("id").unwrap().as_int32().unwrap(), 999); + assert_eq!( + system_user.get("name").unwrap().as_string().unwrap(), + "System User" + ); + assert_eq!( + system_user + .get("computed_score") + .unwrap() + .as_int32() + .unwrap(), + 0 + ); + assert_eq!( + system_user.get("status").unwrap().as_string().unwrap(), + "system" + ); + } } diff --git a/parquet-variant/src/variant/object.rs b/parquet-variant/src/variant/object.rs index 9542f31e6073..2d58c897c118 100644 --- a/parquet-variant/src/variant/object.rs +++ b/parquet-variant/src/variant/object.rs @@ -904,7 +904,7 @@ mod tests { // create another object pre-filled with field names, b and a // but insert the fields in the order of a, b - let mut b = VariantBuilder::new().with_field_names(["b", "a"].into_iter()); + let mut b = VariantBuilder::new().with_field_names(["b", "a"]); let mut o = b.new_object(); o.insert("a", ()); @@ -939,7 +939,7 @@ mod tests { assert!(v1.metadata().unwrap().is_sorted()); // create a second object with different insertion order - let mut b = VariantBuilder::new().with_field_names(["d", "c", "b", "a"].into_iter()); + let mut b = VariantBuilder::new().with_field_names(["d", "c", "b", "a"]); let mut o = b.new_object(); o.insert("b", 4.3);