From f6044ada8a8835762afe005d545a95a9490e4641 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 29 Dec 2025 11:21:40 -0800 Subject: [PATCH 1/3] fix: filter garbage entries from null maps during encoding Null map entries can have non-zero length with garbage values that should be ignored. MapStructuralEncoder was passing all entries to the child encoder, but repdef only counted valid entries, which caused errors to occur when encoding Structs with Map values. Add MapArrayExt trait (mirroring ListArrayExt) with filter_garbage_nulls() and trimmed_entries() methods, and use them in MapStructuralEncoder. --- rust/lance-arrow/src/lib.rs | 1 + rust/lance-arrow/src/map.rs | 140 ++++++++++++++++++ .../src/encodings/logical/map.rs | 79 ++++++++-- 3 files changed, 208 insertions(+), 12 deletions(-) create mode 100644 rust/lance-arrow/src/map.rs diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 97738938ef2..85a53ed70b3 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -33,6 +33,7 @@ pub use floats::*; pub mod cast; pub mod json; pub mod list; +pub mod map; pub mod memory; pub mod r#struct; diff --git a/rust/lance-arrow/src/map.rs b/rust/lance-arrow/src/map.rs new file mode 100644 index 00000000000..15be85a2946 --- /dev/null +++ b/rust/lance-arrow/src/map.rs @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use arrow_array::{Array, BooleanArray, MapArray}; +use arrow_buffer::{BooleanBufferBuilder, OffsetBuffer, ScalarBuffer}; + +pub trait MapArrayExt { + /// Filters out masked null items from the map array + /// + /// Similar to ListArrayExt::filter_garbage_nulls, but for Map arrays. + /// Null map entries may have non-zero length with garbage values that should be ignored. + /// This function filters the entries array to remove the garbage values. + /// + /// The output map will always have zero-length nulls. + fn filter_garbage_nulls(&self) -> Self; + + /// Returns a copy of the map's entries array that has been sliced to size + /// + /// Similar to ListArrayExt::trimmed_values, but for Map arrays. + fn trimmed_entries(&self) -> Arc; +} + +impl MapArrayExt for MapArray { + fn filter_garbage_nulls(&self) -> Self { + if self.is_empty() { + return self.clone(); + } + let Some(validity) = self.nulls().cloned() else { + return self.clone(); + }; + + let entries = self.entries(); + let mut should_keep = BooleanBufferBuilder::new(entries.len()); + + // Handle preamble (entries before first offset) + should_keep.append_n(self.offsets().first().copied().unwrap_or(0) as usize, false); + + let mut new_offsets: Vec = Vec::with_capacity(self.len() + 1); + new_offsets.push(0); + let mut cur_len: i32 = 0; + for (offset, is_valid) in self.offsets().windows(2).zip(validity.iter()) { + let len = offset[1] - offset[0]; + should_keep.append_n(len as usize, is_valid); + if is_valid { + cur_len += len; + } + new_offsets.push(cur_len); + } + + // Handle trailer (entries after last offset) + should_keep.append_n(entries.len() - should_keep.len(), false); + + let should_keep = BooleanArray::new(should_keep.finish(), None); + let new_entries = arrow_select::filter::filter(entries, &should_keep) + .expect("filter should succeed") + .as_any() + .downcast_ref::() + .expect("map entries should be struct") + .clone(); + + let (entries_field, keys_sorted) = match self.data_type() { + arrow_schema::DataType::Map(field, sorted) => (field.clone(), *sorted), + _ => unreachable!(), + }; + + Self::new( + entries_field, + OffsetBuffer::new(ScalarBuffer::from(new_offsets)), + new_entries, + Some(validity), + keys_sorted, + ) + } + + fn trimmed_entries(&self) -> Arc { + let first_value = self.offsets().first().copied().unwrap_or(0) as usize; + let last_value = self.offsets().last().copied().unwrap_or(0) as usize; + Arc::new(self.entries().slice(first_value, last_value - first_value)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Int32Array, MapArray, StringArray, StructArray}; + use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; + use arrow_schema::{DataType, Field, Fields}; + + use super::*; + + fn make_test_map() -> MapArray { + // Create a map array with 3 maps: + // - map 0: {"a": 1, "b": 2} (valid, 2 entries) + // - map 1: {"c": 3} (null but has 1 garbage entry) + // - map 2: {"d": 4, "e": 5} (valid, 2 entries) + let keys = StringArray::from(vec!["a", "b", "c", "d", "e"]); + let values = Int32Array::from(vec![1, 2, 3, 4, 5]); + + let entries_fields = Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + ]); + let entries = StructArray::new( + entries_fields.clone(), + vec![Arc::new(keys), Arc::new(values)], + None, + ); + + let entries_field = Arc::new(Field::new("entries", DataType::Struct(entries_fields), false)); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 2, 3, 5])); + let validity = NullBuffer::from(vec![true, false, true]); + + MapArray::new(entries_field, offsets, entries, Some(validity), false) + } + + #[test] + fn test_filter_garbage_nulls() { + let map_arr = make_test_map(); + assert_eq!(map_arr.len(), 3); + assert_eq!(map_arr.entries().len(), 5); + + let filtered = map_arr.filter_garbage_nulls(); + assert_eq!(filtered.len(), 3); + // Garbage entry from null map should be removed + assert_eq!(filtered.entries().len(), 4); + + // Check offsets: [0, 2, 2, 4] (null map now has 0 length) + assert_eq!(filtered.value_offsets(), &[0, 2, 2, 4]); + } + + #[test] + fn test_trimmed_entries() { + let map_arr = make_test_map(); + let trimmed = map_arr.trimmed_entries(); + assert_eq!(trimmed.len(), 5); + } +} diff --git a/rust/lance-encoding/src/encodings/logical/map.rs b/rust/lance-encoding/src/encodings/logical/map.rs index 4205a01a892..154ee642560 100644 --- a/rust/lance-encoding/src/encodings/logical/map.rs +++ b/rust/lance-encoding/src/encodings/logical/map.rs @@ -7,6 +7,7 @@ use arrow_array::{Array, ArrayRef, MapArray}; use arrow_schema::DataType; use futures::future::BoxFuture; use lance_arrow::deepcopy::deep_copy_nulls; +use lance_arrow::map::MapArrayExt; use lance_core::{Error, Result}; use snafu::location; @@ -53,22 +54,22 @@ impl FieldEncoder for MapStructuralEncoder { .downcast_ref::() .expect("MapEncoder used for non-map data"); - // Map internally has offsets and entries (struct array) - let entries = map_array.entries(); - let offsets = map_array.offsets(); - // Add offsets to RepDefBuilder to handle nullability and list structure - if self.keep_original_array { - repdef.add_offsets(offsets.clone(), array.nulls().cloned()) + let has_garbage_values = if self.keep_original_array { + repdef.add_offsets(map_array.offsets().clone(), array.nulls().cloned()) + } else { + repdef.add_offsets(map_array.offsets().clone(), deep_copy_nulls(array.nulls())) + }; + + // Get entries, filtering garbage if needed (similar to ListStructuralEncoder) + let entries = if has_garbage_values { + map_array.filter_garbage_nulls().trimmed_entries() } else { - repdef.add_offsets(offsets.clone(), deep_copy_nulls(array.nulls())) + map_array.trimmed_entries() }; - // Pass the entries (struct array) to the child encoder - // Convert to Arc - let entries_arc: ArrayRef = Arc::new(entries.clone()); self.child - .maybe_encode(entries_arc, external_buffers, repdef, row_number, num_rows) + .maybe_encode(entries, external_buffers, repdef, row_number, num_rows) } fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result> { @@ -240,7 +241,7 @@ mod tests { builder::{Int32Builder, MapBuilder, StringBuilder}, Array, Int32Array, MapArray, StringArray, StructArray, }; - use arrow_buffer::{OffsetBuffer, ScalarBuffer}; + use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow_schema::{DataType, Field, Fields}; use crate::encoder::{default_encoding_strategy, ColumnIndexSequence, EncodingOptions}; @@ -410,6 +411,60 @@ mod tests { .await; } + #[test_log::test(tokio::test)] + async fn test_map_in_nullable_struct() { + // Test Struct where null struct rows have garbage map entries. + // The encoder must filter these garbage entries before encoding. + let entries_fields = Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + ]); + let entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields.clone()), + false, + )); + let map_entries = StructArray::new( + entries_fields, + vec![ + Arc::new(StringArray::from(vec!["a", "garbage", "b"])), + Arc::new(Int32Array::from(vec![1, 999, 2])), + ], + None, + ); + // map0: {"a": 1}, map1 (garbage): {"garbage": 999}, map2: {"b": 2} + let map_array: Arc = Arc::new(MapArray::new( + entries_field, + OffsetBuffer::new(ScalarBuffer::from(vec![0, 1, 2, 3])), + map_entries, + None, // No nulls at map level - nulls come from struct + false, + )); + + let struct_array = StructArray::new( + Fields::from(vec![ + Field::new("id", DataType::Int32, true), + Field::new("props", map_array.data_type().clone(), true), + ]), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), + map_array, + ], + Some(NullBuffer::from(vec![true, false, true])), // Middle row is null + ); + + let test_cases = TestCases::default() + .with_range(0..3) + .with_min_file_version(LanceFileVersion::V2_2); + + check_round_trip_encoding_of_data( + vec![Arc::new(struct_array)], + &test_cases, + HashMap::new(), + ) + .await; + } + #[test_log::test(tokio::test)] async fn test_list_of_maps() { // Test List> From 6342a156b031b216f38e1bc8481389a68702994d Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 29 Dec 2025 12:38:32 -0800 Subject: [PATCH 2/3] lint --- rust/lance-arrow/src/map.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/lance-arrow/src/map.rs b/rust/lance-arrow/src/map.rs index 15be85a2946..07b5706c47a 100644 --- a/rust/lance-arrow/src/map.rs +++ b/rust/lance-arrow/src/map.rs @@ -109,7 +109,11 @@ mod tests { None, ); - let entries_field = Arc::new(Field::new("entries", DataType::Struct(entries_fields), false)); + let entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields), + false, + )); let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 2, 3, 5])); let validity = NullBuffer::from(vec![true, false, true]); From 2ab3e7ce569e085708146c4e4e9f6e530c4f1317 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 31 Dec 2025 01:03:11 +0000 Subject: [PATCH 3/3] update per feedback --- rust/lance-arrow/src/lib.rs | 1 - rust/lance-arrow/src/map.rs | 144 ------------------ .../src/encodings/logical/map.rs | 11 +- 3 files changed, 6 insertions(+), 150 deletions(-) delete mode 100644 rust/lance-arrow/src/map.rs diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 85a53ed70b3..97738938ef2 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -33,7 +33,6 @@ pub use floats::*; pub mod cast; pub mod json; pub mod list; -pub mod map; pub mod memory; pub mod r#struct; diff --git a/rust/lance-arrow/src/map.rs b/rust/lance-arrow/src/map.rs deleted file mode 100644 index 07b5706c47a..00000000000 --- a/rust/lance-arrow/src/map.rs +++ /dev/null @@ -1,144 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -use std::sync::Arc; - -use arrow_array::{Array, BooleanArray, MapArray}; -use arrow_buffer::{BooleanBufferBuilder, OffsetBuffer, ScalarBuffer}; - -pub trait MapArrayExt { - /// Filters out masked null items from the map array - /// - /// Similar to ListArrayExt::filter_garbage_nulls, but for Map arrays. - /// Null map entries may have non-zero length with garbage values that should be ignored. - /// This function filters the entries array to remove the garbage values. - /// - /// The output map will always have zero-length nulls. - fn filter_garbage_nulls(&self) -> Self; - - /// Returns a copy of the map's entries array that has been sliced to size - /// - /// Similar to ListArrayExt::trimmed_values, but for Map arrays. - fn trimmed_entries(&self) -> Arc; -} - -impl MapArrayExt for MapArray { - fn filter_garbage_nulls(&self) -> Self { - if self.is_empty() { - return self.clone(); - } - let Some(validity) = self.nulls().cloned() else { - return self.clone(); - }; - - let entries = self.entries(); - let mut should_keep = BooleanBufferBuilder::new(entries.len()); - - // Handle preamble (entries before first offset) - should_keep.append_n(self.offsets().first().copied().unwrap_or(0) as usize, false); - - let mut new_offsets: Vec = Vec::with_capacity(self.len() + 1); - new_offsets.push(0); - let mut cur_len: i32 = 0; - for (offset, is_valid) in self.offsets().windows(2).zip(validity.iter()) { - let len = offset[1] - offset[0]; - should_keep.append_n(len as usize, is_valid); - if is_valid { - cur_len += len; - } - new_offsets.push(cur_len); - } - - // Handle trailer (entries after last offset) - should_keep.append_n(entries.len() - should_keep.len(), false); - - let should_keep = BooleanArray::new(should_keep.finish(), None); - let new_entries = arrow_select::filter::filter(entries, &should_keep) - .expect("filter should succeed") - .as_any() - .downcast_ref::() - .expect("map entries should be struct") - .clone(); - - let (entries_field, keys_sorted) = match self.data_type() { - arrow_schema::DataType::Map(field, sorted) => (field.clone(), *sorted), - _ => unreachable!(), - }; - - Self::new( - entries_field, - OffsetBuffer::new(ScalarBuffer::from(new_offsets)), - new_entries, - Some(validity), - keys_sorted, - ) - } - - fn trimmed_entries(&self) -> Arc { - let first_value = self.offsets().first().copied().unwrap_or(0) as usize; - let last_value = self.offsets().last().copied().unwrap_or(0) as usize; - Arc::new(self.entries().slice(first_value, last_value - first_value)) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow_array::{Int32Array, MapArray, StringArray, StructArray}; - use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; - use arrow_schema::{DataType, Field, Fields}; - - use super::*; - - fn make_test_map() -> MapArray { - // Create a map array with 3 maps: - // - map 0: {"a": 1, "b": 2} (valid, 2 entries) - // - map 1: {"c": 3} (null but has 1 garbage entry) - // - map 2: {"d": 4, "e": 5} (valid, 2 entries) - let keys = StringArray::from(vec!["a", "b", "c", "d", "e"]); - let values = Int32Array::from(vec![1, 2, 3, 4, 5]); - - let entries_fields = Fields::from(vec![ - Field::new("keys", DataType::Utf8, false), - Field::new("values", DataType::Int32, true), - ]); - let entries = StructArray::new( - entries_fields.clone(), - vec![Arc::new(keys), Arc::new(values)], - None, - ); - - let entries_field = Arc::new(Field::new( - "entries", - DataType::Struct(entries_fields), - false, - )); - let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 2, 3, 5])); - let validity = NullBuffer::from(vec![true, false, true]); - - MapArray::new(entries_field, offsets, entries, Some(validity), false) - } - - #[test] - fn test_filter_garbage_nulls() { - let map_arr = make_test_map(); - assert_eq!(map_arr.len(), 3); - assert_eq!(map_arr.entries().len(), 5); - - let filtered = map_arr.filter_garbage_nulls(); - assert_eq!(filtered.len(), 3); - // Garbage entry from null map should be removed - assert_eq!(filtered.entries().len(), 4); - - // Check offsets: [0, 2, 2, 4] (null map now has 0 length) - assert_eq!(filtered.value_offsets(), &[0, 2, 2, 4]); - } - - #[test] - fn test_trimmed_entries() { - let map_arr = make_test_map(); - let trimmed = map_arr.trimmed_entries(); - assert_eq!(trimmed.len(), 5); - } -} diff --git a/rust/lance-encoding/src/encodings/logical/map.rs b/rust/lance-encoding/src/encodings/logical/map.rs index 154ee642560..8d70b0fa532 100644 --- a/rust/lance-encoding/src/encodings/logical/map.rs +++ b/rust/lance-encoding/src/encodings/logical/map.rs @@ -3,11 +3,11 @@ use std::{ops::Range, sync::Arc}; -use arrow_array::{Array, ArrayRef, MapArray}; +use arrow_array::{Array, ArrayRef, ListArray, MapArray}; use arrow_schema::DataType; use futures::future::BoxFuture; use lance_arrow::deepcopy::deep_copy_nulls; -use lance_arrow::map::MapArrayExt; +use lance_arrow::list::ListArrayExt; use lance_core::{Error, Result}; use snafu::location; @@ -61,11 +61,12 @@ impl FieldEncoder for MapStructuralEncoder { repdef.add_offsets(map_array.offsets().clone(), deep_copy_nulls(array.nulls())) }; - // Get entries, filtering garbage if needed (similar to ListStructuralEncoder) + // MapArray is physically a ListArray, so convert and use ListArrayExt + let list_array: ListArray = map_array.clone().into(); let entries = if has_garbage_values { - map_array.filter_garbage_nulls().trimmed_entries() + list_array.filter_garbage_nulls().trimmed_values() } else { - map_array.trimmed_entries() + list_array.trimmed_values() }; self.child