Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::{future::BoxFuture, FutureExt, Stream};
use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery};
use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap};
use lance_core::{Error, Result};
use roaring::RoaringBitmap;
use serde::Serialize;
use snafu::location;

Expand Down Expand Up @@ -888,10 +889,14 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
) -> Result<CreatedIndex>;

/// Add the new data into the index, creating an updated version of the index in `dest_store`
///
/// If `valid_old_fragments` is provided, old index data for fragments not in the bitmap
/// will be filtered out during the merge.
Comment on lines +892 to +894
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this only matters for btree because it is the only index that scans the old portion of the index instead of rescanning the whole column?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, bitmap does this too. I need to fix that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed bitmap. Other indices should be done as a follow up, but I'm not aware of a code path where they cause a bug yet. Many of them, like ZoneMap and BloomFilter, give inexact results.

async fn update(
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex>;

/// Returns the criteria that will be used to update the index
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ impl ScalarIndex for BitmapIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
let mut state = HashMap::new();

Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl ScalarIndex for BloomFilterIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
// Re-train bloom filters for the appended data using the shared trainer
let params = BloomFilterIndexBuilderParams {
Expand Down
41 changes: 33 additions & 8 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,20 +1302,21 @@ impl BTreeIndex {
)))
}

async fn into_old_data(self) -> Result<Arc<dyn ExecutionPlan>> {
let stream = self.into_data_stream().await?;
Ok(Arc::new(OneShotExec::new(stream)))
}

async fn combine_old_new(
self,
new_data: SendableRecordBatchStream,
chunk_size: u64,
valid_old_fragments: Option<RoaringBitmap>,
) -> Result<SendableRecordBatchStream> {
let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?;

let new_input = Arc::new(OneShotExec::new(new_data));
let old_input = self.into_old_data().await?;
let old_stream = self.into_data_stream().await?;
let old_stream = match valid_old_fragments {
Some(valid_frags) => filter_row_ids_by_fragments(old_stream, valid_frags),
None => old_stream,
};
let old_input = Arc::new(OneShotExec::new(old_stream));
debug_assert_eq!(
old_input.schema().flattened_fields().len(),
new_input.schema().flattened_fields().len()
Expand Down Expand Up @@ -1344,6 +1345,29 @@ impl BTreeIndex {
}
}

/// Filter a stream of record batches down to rows whose row address maps to a
/// fragment present in `valid_fragments`. The fragment ID occupies the upper
/// 32 bits of each 64-bit row address.
fn filter_row_ids_by_fragments(
    stream: SendableRecordBatchStream,
    valid_fragments: RoaringBitmap,
) -> SendableRecordBatchStream {
    let schema = stream.schema();
    let filtered = stream.map(move |next| {
        let batch = next?;
        let addrs = batch[ROW_ID]
            .as_any()
            .downcast_ref::<arrow_array::UInt64Array>()
            .expect("expected UInt64Array for row_id column");
        // Keep a row when its fragment id (high 32 bits of the address) is in
        // the valid set; null addresses yield a null mask entry and are dropped.
        let keep: arrow_array::BooleanArray = addrs
            .iter()
            .map(|addr| addr.map(|a| valid_fragments.contains((a >> 32) as u32)))
            .collect();
        Ok(arrow_select::filter::filter_record_batch(&batch, &keep)?)
    });
    Box::pin(RecordBatchStreamAdapter::new(schema, filtered))
}

fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> {
match bound {
Bound::Unbounded => Bound::Unbounded,
Expand Down Expand Up @@ -1570,11 +1594,12 @@ impl ScalarIndex for BTreeIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
// Merge the existing index data with the new data and then retrain the index on the merged stream
let merged_data_source = self
.clone()
.combine_old_new(new_data, self.batch_size)
.combine_old_new(new_data, self.batch_size, valid_old_fragments.cloned())
.await?;
train_btree_index(merged_data_source, dest_store, self.batch_size, None, None).await?;

Expand Down Expand Up @@ -3984,7 +4009,7 @@ mod tests {

// update the ranged index
ranged_index
.update(update_data_source, new_store.as_ref())
.update(update_data_source, new_store.as_ref(), None)
.await
.expect("Error in updating ranged index");

Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ impl ScalarIndex for InvertedIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
self.to_builder().update(new_data, dest_store).await?;

Expand Down
6 changes: 5 additions & 1 deletion rust/lance-index/src/scalar/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ impl ScalarIndex for JsonIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
let target_created = self.target_index.update(new_data, dest_store).await?;
let target_created = self
.target_index
.update(new_data, dest_store, valid_old_fragments)
.await?;
let json_details = crate::pb::JsonIndexDetails {
path: self.path.clone(),
target_details: Some(target_created.index_details),
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-index/src/scalar/label_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ impl ScalarIndex for LabelListIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
self.values_index
.update(unnest_chunks(new_data)?, dest_store)
.update(unnest_chunks(new_data)?, dest_store, valid_old_fragments)
.await?;

Ok(CreatedIndex {
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ pub mod tests {
.update(
lance_datafusion::utils::reader_to_stream(Box::new(data)),
updated_index_store.as_ref(),
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1290,6 +1291,7 @@ pub mod tests {
.update(
lance_datafusion::utils::reader_to_stream(Box::new(data)),
updated_index_store.as_ref(),
None,
)
.await
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions rust/lance-index/src/scalar/ngram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ impl ScalarIndex for NGramIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
let mut builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions::default())?;
let spill_files = builder.train(new_data).await?;
Expand Down Expand Up @@ -1620,7 +1621,7 @@ mod tests {
Arc::new(LanceCache::no_cache()),
));

index.update(data, test_store.as_ref()).await.unwrap();
index.update(data, test_store.as_ref(), None).await.unwrap();

let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache())
.await
Expand Down Expand Up @@ -1699,7 +1700,7 @@ mod tests {
Arc::new(LanceCache::no_cache()),
));

index.update(data, test_store.as_ref()).await.unwrap();
index.update(data, test_store.as_ref(), None).await.unwrap();

let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache())
.await
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-index/src/scalar/rtree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ impl ScalarIndex for RTreeIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
let bbox_data = RTreeIndexPlugin::convert_bbox_stream(new_data)?;
let tmpdir = Arc::new(TempDir::default());
Expand Down Expand Up @@ -1172,7 +1173,7 @@ mod tests {
Arc::new(UInt64Array::from(new_rowaddr_arr.clone())),
);
rtree_index
.update(stream, new_store.as_ref())
.update(stream, new_store.as_ref(), None)
.await
.unwrap();

Expand Down
3 changes: 2 additions & 1 deletion rust/lance-index/src/scalar/zonemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ impl ScalarIndex for ZoneMapIndex {
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
_valid_old_fragments: Option<&RoaringBitmap>,
) -> Result<CreatedIndex> {
// Train new zones for the incoming data stream
let schema = new_data.schema();
Expand Down Expand Up @@ -1111,7 +1112,7 @@ mod tests {
// Directly pass the stream with proper row addresses instead of using MockTrainingSource
// which would regenerate row addresses starting from 0
index
.update(new_data_stream, test_store.as_ref())
.update(new_data_stream, test_store.as_ref(), None)
.await
.unwrap();

Expand Down
Loading