Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 124 additions & 88 deletions rust/lance-core/src/utils/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,39 @@ impl std::ops::BitOr for RowAddrMask {
}
}

/// Common operations over a set of rows (either row ids or row addresses).
///
/// The concrete representation can be address-based (`RowAddrTreeMap`) or
/// id-based (for example a future `RowIdSet`), but the semantics are the same:
/// a set of unique rows.
pub trait RowSetOps: Clone + Sized {
/// Logical row handle (`u64` for both row ids and row addresses).
type Row;

/// Returns true if the set is empty.
fn is_empty(&self) -> bool;

/// Returns the number of rows in the set, if it is known.
///
/// Implementations that cannot always compute an exact size (for example
/// because of "full fragment" markers) should return `None`.
fn len(&self) -> Option<u64>;

/// Remove a value from the row set.
fn remove(&mut self, row: Self::Row) -> bool;

/// Returns whether this set contains the given row.
fn contains(&self, row: Self::Row) -> bool;

/// Returns the union of `other` and init self.
fn union_all(other: &[&Self]) -> Self;

/// Builds a row set from an iterator of rows.
fn from_sorted_iter<I>(iter: I) -> Result<Self>
where
I: IntoIterator<Item = Self::Row>;
}

/// A collection of row addresses.
///
/// Note: For stable row id mode, this may be split into a separate structure in the future.
Expand Down Expand Up @@ -302,20 +335,14 @@ impl RowAddrSelection {
}
}

impl RowAddrTreeMap {
/// Create an empty set
pub fn new() -> Self {
Self::default()
}
impl RowSetOps for RowAddrTreeMap {
type Row = u64;

pub fn is_empty(&self) -> bool {
fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// The number of rows in the map
///
/// If there are any "full fragment" items then this is unknown and None is returned
pub fn len(&self) -> Option<u64> {
fn len(&self) -> Option<u64> {
self.inner
.values()
.map(|row_addr_selection| match row_addr_selection {
Expand All @@ -325,6 +352,92 @@ impl RowAddrTreeMap {
.try_fold(0_u64, |acc, next| next.map(|next| next + acc))
}

fn remove(&mut self, row: Self::Row) -> bool {
let upper = (row >> 32) as u32;
let lower = row as u32;
match self.inner.get_mut(&upper) {
None => false,
Some(RowAddrSelection::Full) => {
let mut set = RoaringBitmap::full();
set.remove(lower);
self.inner.insert(upper, RowAddrSelection::Partial(set));
true
}
Some(RowAddrSelection::Partial(lower_set)) => {
let removed = lower_set.remove(lower);
if lower_set.is_empty() {
self.inner.remove(&upper);
}
removed
}
}
}

fn contains(&self, row: Self::Row) -> bool {
let upper = (row >> 32) as u32;
let lower = row as u32;
match self.inner.get(&upper) {
None => false,
Some(RowAddrSelection::Full) => true,
Some(RowAddrSelection::Partial(fragment_set)) => fragment_set.contains(lower),
}
}

fn union_all(other: &[&Self]) -> Self {
let mut new_map = BTreeMap::new();

for map in other {
for (fragment, selection) in &map.inner {
new_map
.entry(fragment)
// I hate this allocation, but I can't think of a better way
.or_insert_with(|| Vec::with_capacity(other.len()))
.push(selection);
}
}

let new_map = new_map
.into_iter()
.map(|(&fragment, selections)| (fragment, RowAddrSelection::union_all(&selections)))
.collect();

Self { inner: new_map }
}

#[track_caller]
fn from_sorted_iter<I>(iter: I) -> Result<Self>
where
I: IntoIterator<Item = Self::Row>,
{
let mut iter = iter.into_iter().peekable();
let mut inner = BTreeMap::new();

while let Some(row_id) = iter.peek() {
let fragment_id = (row_id >> 32) as u32;
let next_bitmap_iter = iter
.peeking_take_while(|row_id| (row_id >> 32) as u32 == fragment_id)
.map(|row_id| row_id as u32);
let Ok(bitmap) = RoaringBitmap::from_sorted_iter(next_bitmap_iter) else {
return Err(Error::Internal {
message: "RowAddrTreeMap::from_sorted_iter called with non-sorted input"
.to_string(),
// Use the caller location since we aren't the one that got it out of order
location: std::panic::Location::caller().to_snafu_location(),
});
};
inner.insert(fragment_id, RowAddrSelection::Partial(bitmap));
}

Ok(Self { inner })
}
}

impl RowAddrTreeMap {
/// Create an empty set
pub fn new() -> Self {
Self::default()
}

/// An iterator of row addrs
///
/// If there are any "full fragment" items then this can't be calculated and None
Expand Down Expand Up @@ -354,7 +467,7 @@ impl RowAddrTreeMap {
/// Returns true if the value was not already in the set.
///
/// ```rust
/// use lance_core::utils::mask::RowAddrTreeMap;
/// use lance_core::utils::mask::{RowAddrTreeMap, RowSetOps};
///
/// let mut set = RowAddrTreeMap::new();
/// assert_eq!(set.insert(10), true);
Expand Down Expand Up @@ -444,38 +557,6 @@ impl RowAddrTreeMap {
}
}

/// Returns whether the set contains the given value
pub fn contains(&self, value: u64) -> bool {
let upper = (value >> 32) as u32;
let lower = value as u32;
match self.inner.get(&upper) {
None => false,
Some(RowAddrSelection::Full) => true,
Some(RowAddrSelection::Partial(fragment_set)) => fragment_set.contains(lower),
}
}

pub fn remove(&mut self, value: u64) -> bool {
let upper = (value >> 32) as u32;
let lower = value as u32;
match self.inner.get_mut(&upper) {
None => false,
Some(RowAddrSelection::Full) => {
let mut set = RoaringBitmap::full();
set.remove(lower);
self.inner.insert(upper, RowAddrSelection::Partial(set));
true
}
Some(RowAddrSelection::Partial(lower_set)) => {
let removed = lower_set.remove(lower);
if lower_set.is_empty() {
self.inner.remove(&upper);
}
removed
}
}
}

pub fn retain_fragments(&mut self, frag_ids: impl IntoIterator<Item = u32>) {
let frag_id_set = frag_ids.into_iter().collect::<HashSet<_>>();
self.inner
Expand Down Expand Up @@ -542,27 +623,6 @@ impl RowAddrTreeMap {
Ok(Self { inner })
}

pub fn union_all(maps: &[&Self]) -> Self {
let mut new_map = BTreeMap::new();

for map in maps {
for (fragment, selection) in &map.inner {
new_map
.entry(fragment)
// I hate this allocation, but I can't think of a better way
.or_insert_with(|| Vec::with_capacity(maps.len()))
.push(selection);
}
}

let new_map = new_map
.into_iter()
.map(|(&fragment, selections)| (fragment, RowAddrSelection::union_all(&selections)))
.collect();

Self { inner: new_map }
}

/// Apply a mask to the row addrs
///
/// For AllowList: only keep rows that are in the selection and not null
Expand Down Expand Up @@ -597,30 +657,6 @@ impl RowAddrTreeMap {
}),
})
}

#[track_caller]
pub fn from_sorted_iter(iter: impl IntoIterator<Item = u64>) -> Result<Self> {
let mut iter = iter.into_iter().peekable();
let mut inner = BTreeMap::new();

while let Some(row_id) = iter.peek() {
let fragment_id = (row_id >> 32) as u32;
let next_bitmap_iter = iter
.peeking_take_while(|row_id| (row_id >> 32) as u32 == fragment_id)
.map(|row_id| row_id as u32);
let Ok(bitmap) = RoaringBitmap::from_sorted_iter(next_bitmap_iter) else {
return Err(Error::Internal {
message: "RowAddrTreeMap::from_sorted_iter called with non-sorted input"
.to_string(),
// Use the caller location since we aren't the one that got it out of order
location: std::panic::Location::caller().to_snafu_location(),
});
};
inner.insert(fragment_id, RowAddrSelection::Partial(bitmap));
}

Ok(Self { inner })
}
}

impl std::ops::BitOr<Self> for RowAddrTreeMap {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-core/src/utils/mask/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use deepsize::DeepSizeOf;

use super::{RowAddrMask, RowAddrTreeMap};
use super::{RowAddrMask, RowAddrTreeMap, RowSetOps};

/// A set of row ids, with optional set of nulls.
///
Expand Down
4 changes: 3 additions & 1 deletion rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
sync::Arc,
};

use crate::pbold;
use arrow::array::BinaryBuilder;
use arrow_array::{new_null_array, Array, BinaryArray, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
Expand All @@ -18,6 +17,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::ScalarValue;
use deepsize::DeepSizeOf;
use futures::{stream, StreamExt, TryStreamExt};
use lance_core::utils::mask::RowSetOps;
use lance_core::{
cache::{CacheKey, LanceCache, WeakLanceCache},
error::LanceOptionExt,
Expand All @@ -36,6 +36,7 @@ use super::{
btree::OrderableScalarValue, BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult,
};
use super::{AnyQuery, IndexStore, ScalarIndex};
use crate::pbold;
use crate::{
frag_reuse::FragReuseIndex,
scalar::{
Expand Down Expand Up @@ -844,6 +845,7 @@ pub mod tests {
use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
use lance_core::utils::mask::RowSetOps;
use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
use lance_io::object_store::ObjectStore;
use std::collections::HashMap;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,7 @@ mod tests {
use deepsize::DeepSizeOf;
use futures::stream;
use futures::TryStreamExt;
use lance_core::utils::mask::RowSetOps;
use lance_core::utils::tempfile::TempObjDir;
use lance_core::{cache::LanceCache, utils::mask::RowAddrTreeMap};
use lance_datafusion::{chunker::break_stream, datagen::DatafusionDatagenExt};
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/btree/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use datafusion_physical_expr::create_physical_expr;
use deepsize::DeepSizeOf;
use lance_arrow::RecordBatchExt;
use lance_core::utils::address::RowAddress;
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
use lance_core::Result;
use roaring::RoaringBitmap;
use tracing::instrument;
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ pub mod tests {
use arrow_select::take::TakeOptions;
use datafusion_common::ScalarValue;
use futures::FutureExt;
use lance_core::utils::mask::RowAddrTreeMap;
use lance_core::utils::mask::{RowAddrTreeMap, RowSetOps};
use lance_core::utils::tempfile::TempDir;
use lance_core::ROW_ID;
use lance_datagen::{array, gen_batch, ArrayGeneratorExt, BatchCount, ByteCount, RowCount};
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/rtree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use geoarrow_schema::{Dimension, RectType};
use lance_arrow::RecordBatchExt;
use lance_core::cache::{CacheKey, LanceCache, WeakLanceCache};
use lance_core::utils::address::RowAddress;
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
use lance_core::utils::tempfile::TempDir;
use lance_core::{Error, Result, ROW_ID};
use lance_datafusion::chunker::chunk_concat_stream;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-table/src/rowids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use serde::{read_row_ids, write_row_ids};
use snafu::location;

use crate::utils::LanceIteratorExtension;
use lance_core::utils::mask::RowSetOps;
use segment::U64Segment;
use tracing::instrument;

Expand Down
1 change: 1 addition & 0 deletions rust/lance/benches/scalar_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use datafusion::{physical_plan::SendableRecordBatchStream, scalar::ScalarValue};
use futures::{FutureExt, TryStreamExt};
use lance::{io::ObjectStore, Dataset};
use lance_core::cache::LanceCache;
use lance_core::utils::mask::RowSetOps;
use lance_core::utils::tempfile::TempStrDir;
use lance_datafusion::utils::reader_to_stream;
use lance_datagen::{array, gen_batch, BatchCount, RowCount};
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/index/prefilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ impl PreFilter for DatasetPreFilter {

#[cfg(test)]
mod test {
use lance_core::utils::mask::RowSetOps;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};

use crate::dataset::WriteParams;
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/io/commit/conflict_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
Dataset,
};
use futures::{StreamExt, TryStreamExt};
use lance_core::utils::mask::RowSetOps;
use lance_core::{
utils::{deletion::DeletionVector, mask::RowAddrTreeMap},
Error, Result,
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/io/exec/scalar_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion::{
};
use datafusion_physical_expr::EquivalenceProperties;
use futures::{stream::BoxStream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use lance_core::utils::mask::RowSetOps;
use lance_core::{
utils::{
address::RowAddress,
Expand Down
Loading