diff --git a/rust/lance-core/src/utils/mask.rs b/rust/lance-core/src/utils/mask.rs index 7258a9b3bcd..a1f56d48a84 100644 --- a/rust/lance-core/src/utils/mask.rs +++ b/rust/lance-core/src/utils/mask.rs @@ -243,6 +243,39 @@ impl std::ops::BitOr for RowAddrMask { } } +/// Common operations over a set of rows (either row ids or row addresses). +/// +/// The concrete representation can be address-based (`RowAddrTreeMap`) or +/// id-based (for example a future `RowIdSet`), but the semantics are the same: +/// a set of unique rows. +pub trait RowSetOps: Clone + Sized { + /// Logical row handle (`u64` for both row ids and row addresses). + type Row; + + /// Returns true if the set is empty. + fn is_empty(&self) -> bool; + + /// Returns the number of rows in the set, if it is known. + /// + /// Implementations that cannot always compute an exact size (for example + /// because of "full fragment" markers) should return `None`. + fn len(&self) -> Option; + + /// Remove a value from the row set. + fn remove(&mut self, row: Self::Row) -> bool; + + /// Returns whether this set contains the given row. + fn contains(&self, row: Self::Row) -> bool; + + /// Returns the union of `other` and init self. + fn union_all(other: &[&Self]) -> Self; + + /// Builds a row set from an iterator of rows. + fn from_sorted_iter(iter: I) -> Result + where + I: IntoIterator; +} + /// A collection of row addresses. /// /// Note: For stable row id mode, this may be split into a separate structure in the future. @@ -302,20 +335,14 @@ impl RowAddrSelection { } } -impl RowAddrTreeMap { - /// Create an empty set - pub fn new() -> Self { - Self::default() - } +impl RowSetOps for RowAddrTreeMap { + type Row = u64; - pub fn is_empty(&self) -> bool { + fn is_empty(&self) -> bool { self.inner.is_empty() } - /// The number of rows in the map - /// - /// If there are any "full fragment" items then this is unknown and None is returned - pub fn len(&self) -> Option { + fn len(&self) -> Option { self.inner .values() .map(|row_addr_selection| match row_addr_selection { @@ -325,6 +352,92 @@ impl RowAddrTreeMap { .try_fold(0_u64, |acc, next| next.map(|next| next + acc)) } + fn remove(&mut self, row: Self::Row) -> bool { + let upper = (row >> 32) as u32; + let lower = row as u32; + match self.inner.get_mut(&upper) { + None => false, + Some(RowAddrSelection::Full) => { + let mut set = RoaringBitmap::full(); + set.remove(lower); + self.inner.insert(upper, RowAddrSelection::Partial(set)); + true + } + Some(RowAddrSelection::Partial(lower_set)) => { + let removed = lower_set.remove(lower); + if lower_set.is_empty() { + self.inner.remove(&upper); + } + removed + } + } + } + + fn contains(&self, row: Self::Row) -> bool { + let upper = (row >> 32) as u32; + let lower = row as u32; + match self.inner.get(&upper) { + None => false, + Some(RowAddrSelection::Full) => true, + Some(RowAddrSelection::Partial(fragment_set)) => fragment_set.contains(lower), + } + } + + fn union_all(other: &[&Self]) -> Self { + let mut new_map = BTreeMap::new(); + + for map in other { + for (fragment, selection) in &map.inner { + new_map + .entry(fragment) + // I hate this allocation, but I can't think of a better way + .or_insert_with(|| Vec::with_capacity(other.len())) + .push(selection); + } + } + + let new_map = new_map + .into_iter() + .map(|(&fragment, selections)| (fragment, RowAddrSelection::union_all(&selections))) + .collect(); + + Self { inner: new_map } + } + + #[track_caller] + fn from_sorted_iter(iter: I) -> Result + where + I: IntoIterator, + { + let mut iter = iter.into_iter().peekable(); + let mut inner = BTreeMap::new(); + + while let Some(row_id) = iter.peek() { + let fragment_id = (row_id >> 32) as u32; + let next_bitmap_iter = iter + .peeking_take_while(|row_id| (row_id >> 32) as u32 == fragment_id) + .map(|row_id| row_id as u32); + let Ok(bitmap) = RoaringBitmap::from_sorted_iter(next_bitmap_iter) else { + return Err(Error::Internal { + message: "RowAddrTreeMap::from_sorted_iter called with non-sorted input" + .to_string(), + // Use the caller location since we aren't the one that got it out of order + location: std::panic::Location::caller().to_snafu_location(), + }); + }; + inner.insert(fragment_id, RowAddrSelection::Partial(bitmap)); + } + + Ok(Self { inner }) + } +} + +impl RowAddrTreeMap { + /// Create an empty set + pub fn new() -> Self { + Self::default() + } + /// An iterator of row addrs /// /// If there are any "full fragment" items then this can't be calculated and None @@ -354,7 +467,7 @@ impl RowAddrTreeMap { /// Returns true if the value was not already in the set. /// /// ```rust - /// use lance_core::utils::mask::RowAddrTreeMap; + /// use lance_core::utils::mask::{RowAddrTreeMap, RowSetOps}; /// /// let mut set = RowAddrTreeMap::new(); /// assert_eq!(set.insert(10), true); @@ -444,38 +557,6 @@ impl RowAddrTreeMap { } } - /// Returns whether the set contains the given value - pub fn contains(&self, value: u64) -> bool { - let upper = (value >> 32) as u32; - let lower = value as u32; - match self.inner.get(&upper) { - None => false, - Some(RowAddrSelection::Full) => true, - Some(RowAddrSelection::Partial(fragment_set)) => fragment_set.contains(lower), - } - } - - pub fn remove(&mut self, value: u64) -> bool { - let upper = (value >> 32) as u32; - let lower = value as u32; - match self.inner.get_mut(&upper) { - None => false, - Some(RowAddrSelection::Full) => { - let mut set = RoaringBitmap::full(); - set.remove(lower); - self.inner.insert(upper, RowAddrSelection::Partial(set)); - true - } - Some(RowAddrSelection::Partial(lower_set)) => { - let removed = lower_set.remove(lower); - if lower_set.is_empty() { - self.inner.remove(&upper); - } - removed - } - } - } - pub fn retain_fragments(&mut self, frag_ids: impl IntoIterator) { let frag_id_set = frag_ids.into_iter().collect::>(); self.inner @@ -542,27 +623,6 @@ impl RowAddrTreeMap { Ok(Self { inner }) } - pub fn union_all(maps: &[&Self]) -> Self { - let mut new_map = BTreeMap::new(); - - for map in maps { - for (fragment, selection) in &map.inner { - new_map - .entry(fragment) - // I hate this allocation, but I can't think of a better way - .or_insert_with(|| Vec::with_capacity(maps.len())) - .push(selection); - } - } - - let new_map = new_map - .into_iter() - .map(|(&fragment, selections)| (fragment, RowAddrSelection::union_all(&selections))) - .collect(); - - Self { inner: new_map } - } - /// Apply a mask to the row addrs /// /// For AllowList: only keep rows that are in the selection and not null @@ -597,30 +657,6 @@ impl RowAddrTreeMap { }), }) } - - #[track_caller] - pub fn from_sorted_iter(iter: impl IntoIterator) -> Result { - let mut iter = iter.into_iter().peekable(); - let mut inner = BTreeMap::new(); - - while let Some(row_id) = iter.peek() { - let fragment_id = (row_id >> 32) as u32; - let next_bitmap_iter = iter - .peeking_take_while(|row_id| (row_id >> 32) as u32 == fragment_id) - .map(|row_id| row_id as u32); - let Ok(bitmap) = RoaringBitmap::from_sorted_iter(next_bitmap_iter) else { - return Err(Error::Internal { - message: "RowAddrTreeMap::from_sorted_iter called with non-sorted input" - .to_string(), - // Use the caller location since we aren't the one that got it out of order - location: std::panic::Location::caller().to_snafu_location(), - }); - }; - inner.insert(fragment_id, RowAddrSelection::Partial(bitmap)); - } - - Ok(Self { inner }) - } } impl std::ops::BitOr for RowAddrTreeMap { diff --git a/rust/lance-core/src/utils/mask/nullable.rs b/rust/lance-core/src/utils/mask/nullable.rs index 24f54443474..1f4b5b7dad3 100644 --- a/rust/lance-core/src/utils/mask/nullable.rs +++ b/rust/lance-core/src/utils/mask/nullable.rs @@ -3,7 +3,7 @@ use deepsize::DeepSizeOf; -use super::{RowAddrMask, RowAddrTreeMap}; +use super::{RowAddrMask, RowAddrTreeMap, RowSetOps}; /// A set of row ids, with optional set of nulls. /// diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 3766f3381d5..12d0b6232a1 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -9,7 +9,6 @@ use std::{ sync::Arc, }; -use crate::pbold; use arrow::array::BinaryBuilder; use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; @@ -18,6 +17,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; use deepsize::DeepSizeOf; use futures::{stream, StreamExt, TryStreamExt}; +use lance_core::utils::mask::RowSetOps; use lance_core::{ cache::{CacheKey, LanceCache, WeakLanceCache}, error::LanceOptionExt, @@ -36,6 +36,7 @@ use super::{ btree::OrderableScalarValue, BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, }; use super::{AnyQuery, IndexStore, ScalarIndex}; +use crate::pbold; use crate::{ frag_reuse::FragReuseIndex, scalar::{ @@ -844,6 +845,7 @@ pub mod tests { use arrow_schema::{DataType, Field, Schema}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::stream; + use lance_core::utils::mask::RowSetOps; use lance_core::utils::{address::RowAddress, tempfile::TempObjDir}; use lance_io::object_store::ObjectStore; use std::collections::HashMap; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 5fe7cb0d14b..dbf3b99c088 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -2567,6 +2567,7 @@ mod tests { use deepsize::DeepSizeOf; use futures::stream; use futures::TryStreamExt; + use lance_core::utils::mask::RowSetOps; use lance_core::utils::tempfile::TempObjDir; use lance_core::{cache::LanceCache, utils::mask::RowAddrTreeMap}; use lance_datafusion::{chunker::break_stream, datagen::DatafusionDatagenExt}; diff --git a/rust/lance-index/src/scalar/btree/flat.rs b/rust/lance-index/src/scalar/btree/flat.rs index 4cd029ea717..37eb84c8216 100644 --- a/rust/lance-index/src/scalar/btree/flat.rs +++ b/rust/lance-index/src/scalar/btree/flat.rs @@ -15,7 +15,7 @@ use datafusion_physical_expr::create_physical_expr; use deepsize::DeepSizeOf; use lance_arrow::RecordBatchExt; use lance_core::utils::address::RowAddress; -use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap}; +use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps}; use lance_core::Result; use roaring::RoaringBitmap; use tracing::instrument; diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 36530182f88..817fb803c64 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -327,7 +327,7 @@ pub mod tests { use arrow_select::take::TakeOptions; use datafusion_common::ScalarValue; use futures::FutureExt; - use lance_core::utils::mask::RowAddrTreeMap; + use lance_core::utils::mask::{RowAddrTreeMap, RowSetOps}; use lance_core::utils::tempfile::TempDir; use lance_core::ROW_ID; use lance_datagen::{array, gen_batch, ArrayGeneratorExt, BatchCount, ByteCount, RowCount}; diff --git a/rust/lance-index/src/scalar/rtree.rs b/rust/lance-index/src/scalar/rtree.rs index e531030a122..3f36ee399ab 100644 --- a/rust/lance-index/src/scalar/rtree.rs +++ b/rust/lance-index/src/scalar/rtree.rs @@ -33,7 +33,7 @@ use geoarrow_schema::{Dimension, RectType}; use lance_arrow::RecordBatchExt; use lance_core::cache::{CacheKey, LanceCache, WeakLanceCache}; use lance_core::utils::address::RowAddress; -use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap}; +use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps}; use lance_core::utils::tempfile::TempDir; use lance_core::{Error, Result, ROW_ID}; use lance_datafusion::chunker::chunk_concat_stream; diff --git a/rust/lance-table/src/rowids.rs b/rust/lance-table/src/rowids.rs index ad7959bb924..74b0e224bb9 100644 --- a/rust/lance-table/src/rowids.rs +++ b/rust/lance-table/src/rowids.rs @@ -36,6 +36,7 @@ pub use serde::{read_row_ids, write_row_ids}; use snafu::location; use crate::utils::LanceIteratorExtension; +use lance_core::utils::mask::RowSetOps; use segment::U64Segment; use tracing::instrument; diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index 9220071d488..e0b4fdb5198 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -12,6 +12,7 @@ use datafusion::{physical_plan::SendableRecordBatchStream, scalar::ScalarValue}; use futures::{FutureExt, TryStreamExt}; use lance::{io::ObjectStore, Dataset}; use lance_core::cache::LanceCache; +use lance_core::utils::mask::RowSetOps; use lance_core::utils::tempfile::TempStrDir; use lance_datafusion::utils::reader_to_stream; use lance_datagen::{array, gen_batch, BatchCount, RowCount}; diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index 00c8e352f4b..917cfe12b45 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -278,6 +278,7 @@ impl PreFilter for DatasetPreFilter { #[cfg(test)] mod test { + use lance_core::utils::mask::RowSetOps; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use crate::dataset::WriteParams; diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index f65d7047923..bb6f9aae866 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -9,6 +9,7 @@ use crate::{ Dataset, }; use futures::{StreamExt, TryStreamExt}; +use lance_core::utils::mask::RowSetOps; use lance_core::{ utils::{deletion::DeletionVector, mask::RowAddrTreeMap}, Error, Result, diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index 567c23dcafb..92c050144cc 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -25,6 +25,7 @@ use datafusion::{ }; use datafusion_physical_expr::EquivalenceProperties; use futures::{stream::BoxStream, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use lance_core::utils::mask::RowSetOps; use lance_core::{ utils::{ address::RowAddress,