diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 9beaccc254a..e9e3c42fead 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3633,6 +3633,7 @@ dependencies = [ "bitpacking", "bitvec", "bytes", + "chrono", "crossbeam-queue", "datafusion", "datafusion-common", diff --git a/python/Cargo.lock b/python/Cargo.lock index 38579810684..9e584cc79d4 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4192,6 +4192,7 @@ dependencies = [ "bitpacking", "bitvec", "bytes", + "chrono", "crossbeam-queue", "datafusion", "datafusion-common", diff --git a/python/src/indices.rs b/python/src/indices.rs index 7428bf0680d..805c84ec6b9 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -37,7 +37,7 @@ use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index; -use lance_index::{DatasetIndexExt, IndexDescription}; +use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment, IndexType}; use uuid::Uuid; #[pyclass(name = "IndexConfig", module = "lance.indices", get_all)] @@ -416,9 +416,21 @@ async fn do_load_shuffled_vectors( .infer_error()?; let mut ds = dataset.ds.as_ref().clone(); - ds.commit_existing_index(index_name, column, index_id) - .await - .infer_error()?; + ds.commit_existing_index_segments( + index_name, + column, + vec![IndexSegment::new( + index_id, + ds.fragments().iter().map(|f| f.id as u32), + Arc::new( + prost_types::Any::from_msg(&lance_table::format::pb::VectorIndexDetails::default()) + .unwrap(), + ), + IndexType::IvfPq.version(), + )], + ) + .await + .infer_error()?; Ok(()) } diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index 515c58cd53a..7a6190688f7 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -68,6 +68,7 @@ tracing.workspace = true tempfile.workspace = true crossbeam-queue.workspace = true bytes.workspace = true +chrono.workspace = true uuid.workspace = true twox-hash = "2.0" async-channel = "2.3.1" diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 62ae68414a6..5f32a73675c 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -29,9 +29,11 @@ pub mod progress; pub mod registry; pub mod scalar; pub mod traits; +pub mod types; pub mod vector; pub use crate::traits::*; +pub use crate::types::IndexSegment; pub const INDEX_FILE_NAME: &str = "index.idx"; /// The name of the auxiliary index file. diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index cb211854df2..5ad13861ddc 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -7,9 +7,8 @@ use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use lance_core::{Error, Result}; -use crate::{IndexParams, IndexType, optimize::OptimizeOptions}; +use crate::{IndexParams, IndexType, optimize::OptimizeOptions, types::IndexSegment}; use lance_table::format::IndexMetadata; -use uuid::Uuid; /// A set of criteria used to filter potential indices to use for a query #[derive(Debug, Default)] @@ -275,11 +274,12 @@ pub trait DatasetIndexExt { /// If the index does not exist, return Error. async fn index_statistics(&self, index_name: &str) -> Result; - async fn commit_existing_index( + /// Commit one or more existing physical index segments as a logical index. + async fn commit_existing_index_segments( &mut self, index_name: &str, column: &str, - index_id: Uuid, + segments: Vec, ) -> Result<()>; async fn read_index_partition( diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs new file mode 100644 index 00000000000..6a991be9932 --- /dev/null +++ b/rust/lance-index/src/types.rs @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use roaring::RoaringBitmap; +use uuid::Uuid; + +/// A single physical segment of a logical index. +/// +/// Each segment is stored independently and will become one manifest entry when committed. +/// The logical index identity (name / target column / dataset version) is provided separately +/// by the commit API. +#[derive(Debug, Clone, PartialEq)] +pub struct IndexSegment { + /// Unique ID of the physical segment. + uuid: Uuid, + /// The fragments covered by this segment. + fragment_bitmap: RoaringBitmap, + /// Metadata specific to the index type. + index_details: Arc, + /// The on-disk index version for this segment. + index_version: i32, +} + +impl IndexSegment { + /// Create a fully described segment with the given UUID, fragment coverage, and index + /// metadata. + pub fn new( + uuid: Uuid, + fragment_bitmap: I, + index_details: Arc, + index_version: i32, + ) -> Self + where + I: IntoIterator, + { + Self { + uuid, + fragment_bitmap: fragment_bitmap.into_iter().collect(), + index_details, + index_version, + } + } + + /// Return the UUID of this segment. + pub fn uuid(&self) -> Uuid { + self.uuid + } + + /// Return the fragment coverage of this segment. + pub fn fragment_bitmap(&self) -> &RoaringBitmap { + &self.fragment_bitmap + } + + /// Return the serialized index details for this segment. + pub fn index_details(&self) -> &Arc { + &self.index_details + } + + /// Return the on-disk index version for this segment. + pub fn index_version(&self) -> i32 { + self.index_version + } + + /// Consume the segment and return its component parts. + pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc, i32) { + ( + self.uuid, + self.fragment_bitmap, + self.index_details, + self.index_version, + ) + } +} diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 6cadcb1f7a7..325bb87af66 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2698,8 +2698,7 @@ impl Scanner { let row_addrs = RowAddrTreeMap::from_iter(u64s); let row_addr_mask = RowAddrMask::from_allowed(row_addrs); let index_result = IndexExprResult::Exact(row_addr_mask); - let fragments_covered = - RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32)); + let fragments_covered = self.dataset.fragment_bitmap.as_ref().clone(); let batch = index_result.serialize_to_arrow(&fragments_covered)?; let stream = futures::stream::once(async move { Ok(batch) }); let stream = Box::pin(RecordBatchStreamAdapter::new( @@ -4172,7 +4171,7 @@ impl Scanner { if let Some(fragments) = &self.fragments { RoaringBitmap::from_iter(fragments.iter().map(|f| f.id as u32)) } else { - RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32)) + self.dataset.fragment_bitmap.as_ref().clone() } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 9ae326ad7e6..fdc91760a2e 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -42,7 +42,7 @@ use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantize use lance_index::vector::hnsw::HNSW; use lance_index::vector::pq::ProductQuantizer; use lance_index::vector::sq::ScalarQuantizer; -use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, IndexDescription}; +use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, IndexDescription, IndexSegment}; use lance_index::{INDEX_FILE_NAME, Index, IndexType, pb, vector::VectorIndex}; use lance_index::{ IndexCriteria, is_system_index, @@ -774,38 +774,58 @@ impl DatasetIndexExt for Dataset { } } - async fn commit_existing_index( + async fn commit_existing_index_segments( &mut self, index_name: &str, column: &str, - index_id: Uuid, + segments: Vec, ) -> Result<()> { + if segments.is_empty() { + return Err(Error::invalid_input( + "CreateIndex: at least one index segment is required".to_string(), + )); + } + let Some(field) = self.schema().field(column) else { return Err(Error::index(format!( "CreateIndex: column '{column}' does not exist" ))); }; - // TODO: We will need some way to determine the index details here. Perhaps - // we can load the index itself and get the details that way. - - let new_idx = IndexMetadata { - uuid: index_id, - name: index_name.to_string(), - fields: vec![field.id], - dataset_version: self.manifest.version, - fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()), - index_details: None, - index_version: 0, - created_at: Some(chrono::Utc::now()), - base_id: None, // New indices don't have base_id (they're not from shallow clone) - files: None, // File info will be populated when index is created - }; + let mut seen_segment_ids = HashSet::with_capacity(segments.len()); + for segment in &segments { + if !seen_segment_ids.insert(segment.uuid()) { + return Err(Error::invalid_input(format!( + "CreateIndex: duplicate segment uuid {} for index '{}'", + segment.uuid(), + index_name + ))); + } + } + + let new_indices = segments + .into_iter() + .map(|segment| { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + IndexMetadata { + uuid, + name: index_name.to_string(), + fields: vec![field.id], + dataset_version: self.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(index_details), + index_version, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: None, // File info will be populated when index is created + } + }) + .collect(); let transaction = Transaction::new( self.manifest.version, Operation::CreateIndex { - new_indices: vec![new_idx], + new_indices, removed_indices: vec![], }, None, @@ -5157,6 +5177,145 @@ mod tests { ); } + #[tokio::test] + async fn test_commit_existing_index_segments_commits_multiple_segments() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(20), BatchCount::from(2)); + + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 10, + max_rows_per_group: 10, + ..Default::default() + }), + ) + .await + .unwrap(); + + let seg0 = IndexSegment::new( + Uuid::new_v4(), + std::iter::once(0_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version(), + ); + let seg1 = IndexSegment::new( + Uuid::new_v4(), + std::iter::once(1_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version(), + ); + + dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![seg0.clone(), seg1.clone()], + ) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(committed.len(), 2); + let committed_uuids = committed.iter().map(|idx| idx.uuid).collect::>(); + assert_eq!( + committed_uuids, + HashSet::from([seg0.uuid(), seg1.uuid()]), + "all committed segment uuids should be preserved" + ); + assert_eq!( + committed + .iter() + .map(|idx| idx + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>()) + .collect::>(), + HashSet::from([vec![0], vec![1]]), + "each committed segment should preserve its fragment coverage" + ); + } + + #[tokio::test] + async fn test_commit_existing_index_segments_rejects_duplicate_segment_ids() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let base = IndexSegment::new( + Uuid::new_v4(), + std::iter::once(0_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version(), + ); + + let err = dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![ + base.clone(), + IndexSegment::new( + base.uuid(), + std::iter::once(1_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version(), + ), + ], + ) + .await + .unwrap_err(); + assert!(err.to_string().contains("duplicate segment uuid")); + } + + #[tokio::test] + async fn test_commit_existing_index_segments_rejects_empty_segments() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let err = dataset + .commit_existing_index_segments("vector_idx", "vector", vec![]) + .await + .unwrap_err(); + assert!(err.to_string().contains("at least one index segment")); + } + #[tokio::test] async fn test_resolve_index_column_error_cases() { use lance_datagen::{BatchCount, RowCount, array}; diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 562f2ed7df4..2678cb22471 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -427,13 +427,7 @@ impl<'a> CreateIndexBuilder<'a> { fragment_bitmap: if train { match &self.fragments { Some(fragment_ids) => Some(fragment_ids.iter().collect()), - None => Some( - self.dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + None => Some(self.dataset.fragment_bitmap.as_ref().clone()), } } else { // Empty bitmap for untrained indices diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index ba0611c462d..0d0ad64993a 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -1585,17 +1585,7 @@ pub async fn initialize_vector_index( )) })?; - let fragment_bitmap = if target_dataset.get_fragments().is_empty() { - Some(roaring::RoaringBitmap::new()) - } else { - Some( - target_dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ) - }; + let fragment_bitmap = Some(target_dataset.fragment_bitmap.as_ref().clone()); let new_idx = IndexMetadata { uuid: new_uuid, diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index ecbcde56b35..621773c60c0 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2656,13 +2656,7 @@ mod tests { dataset_version: dataset.version().version, fields: vec![field.id], name: INDEX_NAME.to_string(), - fragment_bitmap: Some( - dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + fragment_bitmap: Some(dataset.fragment_bitmap.as_ref().clone()), index_details: Some(Arc::new(vector_index_details())), index_version: VECTOR_INDEX_VERSION as i32, created_at: Some(chrono::Utc::now()), @@ -2761,13 +2755,7 @@ mod tests { dataset_version: dataset_mut.version().version, fields: vec![field.id], name: format!("{}_remapped", INDEX_NAME), - fragment_bitmap: Some( - dataset_mut - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + fragment_bitmap: Some(dataset_mut.fragment_bitmap.as_ref().clone()), index_details: Some(Arc::new(vector_index_details())), index_version: VECTOR_INDEX_VERSION as i32, created_at: Some(chrono::Utc::now()), diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index b214c4704e3..0bdc0389648 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -653,7 +653,7 @@ mod tests { use lance_index::vector::{ pq::storage::ProductQuantizationMetadata, storage::STORAGE_METADATA_KEY, }; - use lance_index::{DatasetIndexExt, IndexType}; + use lance_index::{DatasetIndexExt, IndexSegment, IndexType}; use lance_index::{INDEX_AUXILIARY_FILE_NAME, metrics::NoOpMetricsCollector}; use lance_index::{optimize::OptimizeOptions, scalar::IndexReader}; use lance_index::{scalar::IndexWriter, vector::hnsw::builder::HnswBuildParams}; @@ -1467,7 +1467,16 @@ mod tests { .unwrap(); dataset - .commit_existing_index(index_name, "vector", shared_uuid) + .commit_existing_index_segments( + index_name, + "vector", + vec![IndexSegment::new( + shared_uuid, + dataset.fragment_bitmap.as_ref().clone(), + Arc::new(crate::index::vector_index_details()), + IndexType::IvfPq.version(), + )], + ) .await .unwrap(); }