From 2524ec823d3597b3e2e30600a912ba0ff3b2eb46 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Mar 2026 10:58:33 +0800 Subject: [PATCH 01/10] feat: add index segment commit API --- rust/lance-index/Cargo.toml | 1 + rust/lance-index/src/traits.rs | 16 +- rust/lance-table/src/format.rs | 2 +- rust/lance-table/src/format/index.rs | 21 +++ rust/lance/src/dataset.rs | 10 ++ rust/lance/src/dataset/scanner.rs | 5 +- rust/lance/src/index.rs | 232 +++++++++++++++++++++++++- rust/lance/src/index/vector/ivf/v2.rs | 20 ++- 8 files changed, 293 insertions(+), 14 deletions(-) diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index 515c58cd53a..7a6190688f7 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -68,6 +68,7 @@ tracing.workspace = true tempfile.workspace = true crossbeam-queue.workspace = true bytes.workspace = true +chrono.workspace = true uuid.workspace = true twox-hash = "2.0" async-channel = "2.3.1" diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index 305ec6892c5..03f14457835 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -8,7 +8,7 @@ use datafusion::execution::SendableRecordBatchStream; use lance_core::{Error, Result}; use crate::{IndexParams, IndexType, optimize::OptimizeOptions}; -use lance_table::format::IndexMetadata; +use lance_table::format::{IndexMetadata, IndexSegment}; use uuid::Uuid; /// A set of criteria used to filter potential indices to use for a query @@ -269,6 +269,20 @@ pub trait DatasetIndexExt { /// If the index does not exist, return Error. async fn index_statistics(&self, index_name: &str) -> Result; + /// Commit one or more existing physical index segments as a logical index. + async fn commit_existing_index_segments( + &mut self, + _index_name: &str, + _column: &str, + _segments: Vec, + ) -> Result<()> { + Err(Error::not_supported(format!( + "{} does not support commit_existing_index_segments", + std::any::type_name::() + ))) + } + + #[deprecated(since = "4.0.0", note = "Use commit_existing_index_segments instead")] async fn commit_existing_index( &mut self, index_name: &str, diff --git a/rust/lance-table/src/format.rs b/rust/lance-table/src/format.rs index db065199532..66fa0ef026b 100644 --- a/rust/lance-table/src/format.rs +++ b/rust/lance-table/src/format.rs @@ -13,7 +13,7 @@ pub use crate::rowids::version::{ RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, }; pub use fragment::*; -pub use index::IndexMetadata; +pub use index::{IndexMetadata, IndexSegment}; pub use manifest::{ BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader, diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 8db98bcdeba..7dd671ef76c 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -141,3 +141,24 @@ impl From<&IndexMetadata> for pb::IndexMetadata { } } } + +/// A single physical segment of a logical index. +/// +/// Each segment is stored independently and will become one manifest entry when committed. +/// The logical index identity (name / target column / dataset version) is provided separately +/// by the commit API. +#[derive(Debug, Clone, PartialEq)] +pub struct IndexSegment { + /// Unique ID of the physical segment. + pub uuid: Uuid, + /// The fragments covered by this segment. + pub fragment_bitmap: Option, + /// Metadata specific to the index type. + pub index_details: Option>, + /// The on-disk index version for this segment. + pub index_version: i32, + /// Timestamp when the segment was created. + pub created_at: Option>, + /// Base path identifier when the segment is imported from another dataset. + pub base_id: Option, +} diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6adefc7bf3d..47970ed765f 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1817,6 +1817,16 @@ impl Dataset { .collect() } + /// Iterate over manifest fragments without allocating [`FileFragment`] wrappers. + pub fn iter_fragments(&self) -> impl Iterator { + self.manifest.fragments.iter() + } + + /// Iterate over fragment ids in manifest order. + pub fn iter_fragment_ids(&self) -> impl Iterator + '_ { + self.iter_fragments().map(|f| f.id as u32) + } + pub fn get_fragment(&self, fragment_id: usize) -> Option { let dataset = Arc::new(self.clone()); let fragment = self diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index cdaa9408281..2c4007dc3b6 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2698,8 +2698,7 @@ impl Scanner { let row_addrs = RowAddrTreeMap::from_iter(u64s); let row_addr_mask = RowAddrMask::from_allowed(row_addrs); let index_result = IndexExprResult::Exact(row_addr_mask); - let fragments_covered = - RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32)); + let fragments_covered = RoaringBitmap::from_iter(self.dataset.iter_fragment_ids()); let batch = index_result.serialize_to_arrow(&fragments_covered)?; let stream = futures::stream::once(async move { Ok(batch) }); let stream = Box::pin(RecordBatchStreamAdapter::new( @@ -4166,7 +4165,7 @@ impl Scanner { if let Some(fragments) = &self.fragments { RoaringBitmap::from_iter(fragments.iter().map(|f| f.id as u32)) } else { - RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32)) + RoaringBitmap::from_iter(self.dataset.iter_fragment_ids()) } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index ab3876b7977..bd193591f5f 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -55,7 +55,7 @@ use lance_io::utils::{ read_version, }; use lance_table::format::IndexMetadata; -use lance_table::format::{Fragment, SelfDescribingFileReader}; +use lance_table::format::{Fragment, IndexSegment, SelfDescribingFileReader}; use lance_table::io::manifest::read_manifest_indexes; use roaring::RoaringBitmap; use scalar::index_matches_criteria; @@ -759,7 +759,7 @@ impl DatasetIndexExt for Dataset { column: &str, index_id: Uuid, ) -> Result<()> { - let Some(field) = self.schema().field(column) else { + let Some(_field) = self.schema().field(column) else { return Err(Error::index(format!( "CreateIndex: column '{column}' does not exist" ))); @@ -768,22 +768,66 @@ impl DatasetIndexExt for Dataset { // TODO: We will need some way to determine the index details here. Perhaps // we can load the index itself and get the details that way. - let new_idx = IndexMetadata { + let segment = IndexSegment { uuid: index_id, - name: index_name.to_string(), - fields: vec![field.id], - dataset_version: self.manifest.version, - fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()), + fragment_bitmap: Some(self.iter_fragment_ids().collect()), index_details: None, index_version: 0, created_at: Some(chrono::Utc::now()), base_id: None, // New indices don't have base_id (they're not from shallow clone) }; + self.commit_existing_index_segments(index_name, column, vec![segment]) + .await + } + + async fn commit_existing_index_segments( + &mut self, + index_name: &str, + column: &str, + segments: Vec, + ) -> Result<()> { + if segments.is_empty() { + return Err(Error::invalid_input( + "CreateIndex: at least one index segment is required".to_string(), + )); + } + + let Some(field) = self.schema().field(column) else { + return Err(Error::index(format!( + "CreateIndex: column '{column}' does not exist" + ))); + }; + + let mut seen_segment_ids = HashSet::with_capacity(segments.len()); + for segment in &segments { + if !seen_segment_ids.insert(segment.uuid) { + return Err(Error::invalid_input(format!( + "CreateIndex: duplicate segment uuid {} for index '{}'", + segment.uuid, index_name + ))); + } + } + + let new_indices = segments + .into_iter() + .map(|segment| IndexMetadata { + uuid: segment.uuid, + name: index_name.to_string(), + fields: vec![field.id], + dataset_version: self.manifest.version, + fragment_bitmap: segment.fragment_bitmap, + index_details: segment.index_details, + index_version: segment.index_version, + created_at: segment.created_at, + base_id: segment.base_id, + }) + .collect(); + let transaction = Transaction::new( self.manifest.version, Operation::CreateIndex { - new_indices: vec![new_idx], + new_indices, removed_indices: vec![], }, None, @@ -5115,6 +5159,178 @@ mod tests { ); } + #[tokio::test] + async fn test_commit_existing_index_segments_commits_multiple_segments() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(20), BatchCount::from(2)); + + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 10, + max_rows_per_group: 10, + ..Default::default() + }), + ) + .await + .unwrap(); + + let seg0 = IndexSegment { + uuid: Uuid::new_v4(), + fragment_bitmap: Some(std::iter::once(0_u32).collect()), + index_details: None, + index_version: 0, + created_at: Some(chrono::Utc::now()), + base_id: None, + }; + let seg1 = IndexSegment { + uuid: Uuid::new_v4(), + fragment_bitmap: Some(std::iter::once(1_u32).collect()), + index_details: None, + index_version: 0, + created_at: Some(chrono::Utc::now()), + base_id: None, + }; + + dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![seg0.clone(), seg1.clone()], + ) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(committed.len(), 2); + let committed_uuids = committed.iter().map(|idx| idx.uuid).collect::>(); + assert_eq!( + committed_uuids, + HashSet::from([seg0.uuid, seg1.uuid]), + "all committed segment uuids should be preserved" + ); + assert_eq!( + committed + .iter() + .map(|idx| idx + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>()) + .collect::>(), + HashSet::from([vec![0], vec![1]]), + "each committed segment should preserve its fragment coverage" + ); + } + + #[tokio::test] + async fn test_commit_existing_index_segments_rejects_duplicate_segment_ids() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let base = IndexSegment { + uuid: Uuid::new_v4(), + fragment_bitmap: Some(std::iter::once(0_u32).collect()), + index_details: None, + index_version: 0, + created_at: Some(chrono::Utc::now()), + base_id: None, + }; + + let err = dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![ + base.clone(), + IndexSegment { + fragment_bitmap: Some(std::iter::once(1_u32).collect()), + ..base + }, + ], + ) + .await + .unwrap_err(); + assert!(err.to_string().contains("duplicate segment uuid")); + } + + #[tokio::test] + async fn test_commit_existing_index_segments_rejects_empty_segments() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let err = dataset + .commit_existing_index_segments("vector_idx", "vector", vec![]) + .await + .unwrap_err(); + assert!(err.to_string().contains("at least one index segment")); + } + + #[tokio::test] + #[allow(deprecated)] + async fn test_commit_existing_index_wraps_single_segment_commit() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(10), BatchCount::from(1)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let segment_id = Uuid::new_v4(); + + dataset + .commit_existing_index("vector_idx", "vector", segment_id) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(committed.len(), 1); + assert_eq!(committed[0].uuid, segment_id); + } + #[tokio::test] async fn test_resolve_index_column_error_cases() { use lance_datagen::{BatchCount, RowCount, array}; diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 34e33e42e52..c08f8e567cd 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -657,6 +657,7 @@ mod tests { }; use lance_linalg::distance::{DistanceType, multivec_distance}; use lance_linalg::kernels::normalize_fsl; + use lance_table::format::IndexSegment; use lance_testing::datagen::{generate_random_array, generate_random_array_with_range}; use object_store::path::Path; use rand::distr::uniform::SampleUniform; @@ -1460,7 +1461,24 @@ mod tests { .unwrap(); dataset - .commit_existing_index(index_name, "vector", shared_uuid) + .commit_existing_index_segments( + index_name, + "vector", + vec![IndexSegment { + uuid: shared_uuid, + fragment_bitmap: Some( + dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(), + ), + index_details: None, + index_version: 0, + created_at: Some(chrono::Utc::now()), + base_id: None, + }], + ) .await .unwrap(); } From 260ab08192b14fb99a976f725a7ecc1c02823697 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Mar 2026 11:14:59 +0800 Subject: [PATCH 02/10] refactor: reuse dataset fragment bitmap --- rust/lance/src/dataset.rs | 5 ----- rust/lance/src/dataset/mem_wal/memtable/flush.rs | 13 +++---------- rust/lance/src/dataset/scanner.rs | 4 ++-- rust/lance/src/index.rs | 2 +- rust/lance/src/index/create.rs | 8 +------- rust/lance/src/index/vector.rs | 12 +----------- rust/lance/src/index/vector/ivf.rs | 16 ++-------------- rust/lance/src/index/vector/ivf/v2.rs | 8 +------- 8 files changed, 11 insertions(+), 57 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 47970ed765f..3c9653f97fb 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1822,11 +1822,6 @@ impl Dataset { self.manifest.fragments.iter() } - /// Iterate over fragment ids in manifest order. - pub fn iter_fragment_ids(&self) -> impl Iterator + '_ { - self.iter_fragments().map(|f| f.id as u32) - } - pub fn get_fragment(&self, fragment_id: usize) -> Option { let dataset = Arc::new(self.clone()); let fragment = self diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index eb91e856673..93b6c451fc7 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -247,11 +247,8 @@ impl MemTableFlusher { index_meta.fields = vec![field_idx]; index_meta.dataset_version = dataset.version().version; // Calculate fragment_bitmap from dataset fragments - let fragment_ids: roaring::RoaringBitmap = dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(); + let fragment_ids: roaring::RoaringBitmap = + dataset.fragment_bitmap.as_ref().clone(); index_meta.fragment_bitmap = Some(fragment_ids); // Commit the index to the dataset @@ -467,11 +464,7 @@ impl MemTableFlusher { let schema = dataset.schema(); let field_idx = schema.field(&fts_cfg.column).map(|f| f.id).unwrap_or(0); - let fragment_ids: roaring::RoaringBitmap = dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(); + let fragment_ids: roaring::RoaringBitmap = dataset.fragment_bitmap.as_ref().clone(); let index_meta = IndexMetadata { uuid: index_uuid, diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 2c4007dc3b6..8002dd69382 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2698,7 +2698,7 @@ impl Scanner { let row_addrs = RowAddrTreeMap::from_iter(u64s); let row_addr_mask = RowAddrMask::from_allowed(row_addrs); let index_result = IndexExprResult::Exact(row_addr_mask); - let fragments_covered = RoaringBitmap::from_iter(self.dataset.iter_fragment_ids()); + let fragments_covered = self.dataset.fragment_bitmap.as_ref().clone(); let batch = index_result.serialize_to_arrow(&fragments_covered)?; let stream = futures::stream::once(async move { Ok(batch) }); let stream = Box::pin(RecordBatchStreamAdapter::new( @@ -4165,7 +4165,7 @@ impl Scanner { if let Some(fragments) = &self.fragments { RoaringBitmap::from_iter(fragments.iter().map(|f| f.id as u32)) } else { - RoaringBitmap::from_iter(self.dataset.iter_fragment_ids()) + self.dataset.fragment_bitmap.as_ref().clone() } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index bd193591f5f..246137b57e5 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -770,7 +770,7 @@ impl DatasetIndexExt for Dataset { let segment = IndexSegment { uuid: index_id, - fragment_bitmap: Some(self.iter_fragment_ids().collect()), + fragment_bitmap: Some(self.fragment_bitmap.as_ref().clone()), index_details: None, index_version: 0, created_at: Some(chrono::Utc::now()), diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index d5c11c84694..e23cdf2feb2 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -417,13 +417,7 @@ impl<'a> CreateIndexBuilder<'a> { fragment_bitmap: if train { match &self.fragments { Some(fragment_ids) => Some(fragment_ids.iter().collect()), - None => Some( - self.dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + None => Some(self.dataset.fragment_bitmap.as_ref().clone()), } } else { // Empty bitmap for untrained indices diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 3d4eb895f1e..8782d2a8898 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -1581,17 +1581,7 @@ pub async fn initialize_vector_index( )) })?; - let fragment_bitmap = if target_dataset.get_fragments().is_empty() { - Some(roaring::RoaringBitmap::new()) - } else { - Some( - target_dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ) - }; + let fragment_bitmap = Some(target_dataset.fragment_bitmap.as_ref().clone()); let new_idx = IndexMetadata { uuid: new_uuid, diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 908564db4aa..f9f36882b17 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2655,13 +2655,7 @@ mod tests { dataset_version: dataset.version().version, fields: vec![field.id], name: INDEX_NAME.to_string(), - fragment_bitmap: Some( - dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + fragment_bitmap: Some(dataset.fragment_bitmap.as_ref().clone()), index_details: Some(Arc::new(vector_index_details())), index_version: VECTOR_INDEX_VERSION as i32, created_at: Some(chrono::Utc::now()), @@ -2758,13 +2752,7 @@ mod tests { dataset_version: dataset_mut.version().version, fields: vec![field.id], name: format!("{}_remapped", INDEX_NAME), - fragment_bitmap: Some( - dataset_mut - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + fragment_bitmap: Some(dataset_mut.fragment_bitmap.as_ref().clone()), index_details: Some(Arc::new(vector_index_details())), index_version: VECTOR_INDEX_VERSION as i32, created_at: Some(chrono::Utc::now()), diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index c08f8e567cd..107e6615082 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1466,13 +1466,7 @@ mod tests { "vector", vec![IndexSegment { uuid: shared_uuid, - fragment_bitmap: Some( - dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(), - ), + fragment_bitmap: Some(dataset.fragment_bitmap.as_ref().clone()), index_details: None, index_version: 0, created_at: Some(chrono::Utc::now()), From 1ee8aad837f8a9cd4a225fa85e743b04b6886363 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Mar 2026 11:36:04 +0800 Subject: [PATCH 03/10] refactor: narrow index segment API --- rust/lance-index/src/lib.rs | 2 ++ rust/lance-index/src/traits.rs | 4 ++-- rust/lance-index/src/types.rs | 24 +++++++++++++++++++++++ rust/lance-table/src/format.rs | 2 +- rust/lance-table/src/format/index.rs | 21 -------------------- rust/lance/src/index.rs | 28 ++++++++++----------------- rust/lance/src/index/vector/ivf/v2.rs | 7 ++----- 7 files changed, 41 insertions(+), 47 deletions(-) create mode 100644 rust/lance-index/src/types.rs diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 62ae68414a6..5f32a73675c 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -29,9 +29,11 @@ pub mod progress; pub mod registry; pub mod scalar; pub mod traits; +pub mod types; pub mod vector; pub use crate::traits::*; +pub use crate::types::IndexSegment; pub const INDEX_FILE_NAME: &str = "index.idx"; /// The name of the auxiliary index file. diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index 03f14457835..bff859c9b76 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -7,8 +7,8 @@ use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use lance_core::{Error, Result}; -use crate::{IndexParams, IndexType, optimize::OptimizeOptions}; -use lance_table::format::{IndexMetadata, IndexSegment}; +use crate::{IndexParams, IndexType, optimize::OptimizeOptions, types::IndexSegment}; +use lance_table::format::IndexMetadata; use uuid::Uuid; /// A set of criteria used to filter potential indices to use for a query diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs new file mode 100644 index 00000000000..2839bdd624b --- /dev/null +++ b/rust/lance-index/src/types.rs @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use roaring::RoaringBitmap; +use uuid::Uuid; + +/// A single physical segment of a logical index. +/// +/// Each segment is stored independently and will become one manifest entry when committed. +/// The logical index identity (name / target column / dataset version) is provided separately +/// by the commit API. +#[derive(Debug, Clone, PartialEq)] +pub struct IndexSegment { + /// Unique ID of the physical segment. + pub uuid: Uuid, + /// The fragments covered by this segment. + pub fragment_bitmap: RoaringBitmap, + /// Metadata specific to the index type. + pub index_details: Option>, + /// The on-disk index version for this segment. + pub index_version: i32, +} diff --git a/rust/lance-table/src/format.rs b/rust/lance-table/src/format.rs index 66fa0ef026b..db065199532 100644 --- a/rust/lance-table/src/format.rs +++ b/rust/lance-table/src/format.rs @@ -13,7 +13,7 @@ pub use crate::rowids::version::{ RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, }; pub use fragment::*; -pub use index::{IndexMetadata, IndexSegment}; +pub use index::IndexMetadata; pub use manifest::{ BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader, diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 7dd671ef76c..8db98bcdeba 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -141,24 +141,3 @@ impl From<&IndexMetadata> for pb::IndexMetadata { } } } - -/// A single physical segment of a logical index. -/// -/// Each segment is stored independently and will become one manifest entry when committed. -/// The logical index identity (name / target column / dataset version) is provided separately -/// by the commit API. -#[derive(Debug, Clone, PartialEq)] -pub struct IndexSegment { - /// Unique ID of the physical segment. - pub uuid: Uuid, - /// The fragments covered by this segment. - pub fragment_bitmap: Option, - /// Metadata specific to the index type. - pub index_details: Option>, - /// The on-disk index version for this segment. - pub index_version: i32, - /// Timestamp when the segment was created. - pub created_at: Option>, - /// Base path identifier when the segment is imported from another dataset. - pub base_id: Option, -} diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 246137b57e5..3ae731d09fa 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -42,7 +42,7 @@ use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantize use lance_index::vector::hnsw::HNSW; use lance_index::vector::pq::ProductQuantizer; use lance_index::vector::sq::ScalarQuantizer; -use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, IndexDescription}; +use lance_index::{DatasetIndexExt, INDEX_METADATA_SCHEMA_KEY, IndexDescription, IndexSegment}; use lance_index::{INDEX_FILE_NAME, Index, IndexType, pb, vector::VectorIndex}; use lance_index::{ IndexCriteria, is_system_index, @@ -55,7 +55,7 @@ use lance_io::utils::{ read_version, }; use lance_table::format::IndexMetadata; -use lance_table::format::{Fragment, IndexSegment, SelfDescribingFileReader}; +use lance_table::format::{Fragment, SelfDescribingFileReader}; use lance_table::io::manifest::read_manifest_indexes; use roaring::RoaringBitmap; use scalar::index_matches_criteria; @@ -770,11 +770,9 @@ impl DatasetIndexExt for Dataset { let segment = IndexSegment { uuid: index_id, - fragment_bitmap: Some(self.fragment_bitmap.as_ref().clone()), + fragment_bitmap: self.fragment_bitmap.as_ref().clone(), index_details: None, index_version: 0, - created_at: Some(chrono::Utc::now()), - base_id: None, // New indices don't have base_id (they're not from shallow clone) }; self.commit_existing_index_segments(index_name, column, vec![segment]) @@ -816,11 +814,11 @@ impl DatasetIndexExt for Dataset { name: index_name.to_string(), fields: vec![field.id], dataset_version: self.manifest.version, - fragment_bitmap: segment.fragment_bitmap, + fragment_bitmap: Some(segment.fragment_bitmap), index_details: segment.index_details, index_version: segment.index_version, - created_at: segment.created_at, - base_id: segment.base_id, + created_at: Some(chrono::Utc::now()), + base_id: None, }) .collect(); @@ -5188,19 +5186,15 @@ mod tests { let seg0 = IndexSegment { uuid: Uuid::new_v4(), - fragment_bitmap: Some(std::iter::once(0_u32).collect()), + fragment_bitmap: std::iter::once(0_u32).collect(), index_details: None, index_version: 0, - created_at: Some(chrono::Utc::now()), - base_id: None, }; let seg1 = IndexSegment { uuid: Uuid::new_v4(), - fragment_bitmap: Some(std::iter::once(1_u32).collect()), + fragment_bitmap: std::iter::once(1_u32).collect(), index_details: None, index_version: 0, - created_at: Some(chrono::Utc::now()), - base_id: None, }; dataset @@ -5254,11 +5248,9 @@ mod tests { let base = IndexSegment { uuid: Uuid::new_v4(), - fragment_bitmap: Some(std::iter::once(0_u32).collect()), + fragment_bitmap: std::iter::once(0_u32).collect(), index_details: None, index_version: 0, - created_at: Some(chrono::Utc::now()), - base_id: None, }; let err = dataset @@ -5268,7 +5260,7 @@ mod tests { vec![ base.clone(), IndexSegment { - fragment_bitmap: Some(std::iter::once(1_u32).collect()), + fragment_bitmap: std::iter::once(1_u32).collect(), ..base }, ], diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 107e6615082..8c257353894 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -646,7 +646,7 @@ mod tests { use lance_index::vector::{ pq::storage::ProductQuantizationMetadata, storage::STORAGE_METADATA_KEY, }; - use lance_index::{DatasetIndexExt, IndexType}; + use lance_index::{DatasetIndexExt, IndexSegment, IndexType}; use lance_index::{INDEX_AUXILIARY_FILE_NAME, metrics::NoOpMetricsCollector}; use lance_index::{optimize::OptimizeOptions, scalar::IndexReader}; use lance_index::{scalar::IndexWriter, vector::hnsw::builder::HnswBuildParams}; @@ -657,7 +657,6 @@ mod tests { }; use lance_linalg::distance::{DistanceType, multivec_distance}; use lance_linalg::kernels::normalize_fsl; - use lance_table::format::IndexSegment; use lance_testing::datagen::{generate_random_array, generate_random_array_with_range}; use object_store::path::Path; use rand::distr::uniform::SampleUniform; @@ -1466,11 +1465,9 @@ mod tests { "vector", vec![IndexSegment { uuid: shared_uuid, - fragment_bitmap: Some(dataset.fragment_bitmap.as_ref().clone()), + fragment_bitmap: dataset.fragment_bitmap.as_ref().clone(), index_details: None, index_version: 0, - created_at: Some(chrono::Utc::now()), - base_id: None, }], ) .await From a66d616119a766bacbd78267b820cc9bb1d6b7a3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Mar 2026 11:56:23 +0800 Subject: [PATCH 04/10] build: update binding lockfiles --- java/lance-jni/Cargo.lock | 1 + python/Cargo.lock | 1 + 2 files changed, 2 insertions(+) diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 9beaccc254a..e9e3c42fead 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3633,6 +3633,7 @@ dependencies = [ "bitpacking", "bitvec", "bytes", + "chrono", "crossbeam-queue", "datafusion", "datafusion-common", diff --git a/python/Cargo.lock b/python/Cargo.lock index 38579810684..9e584cc79d4 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4192,6 +4192,7 @@ dependencies = [ "bitpacking", "bitvec", "bytes", + "chrono", "crossbeam-queue", "datafusion", "datafusion-common", From 4b330dee916753df4fc0b4d409a9b163a4712393 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Mar 2026 12:46:25 +0800 Subject: [PATCH 05/10] refactor: encapsulate index segment construction --- python/src/indices.rs | 11 ++++- rust/lance-index/src/types.rs | 68 +++++++++++++++++++++++++-- rust/lance/src/index.rs | 63 +++++++++---------------- rust/lance/src/index/vector/ivf/v2.rs | 10 ++-- 4 files changed, 99 insertions(+), 53 deletions(-) diff --git a/python/src/indices.rs b/python/src/indices.rs index 6b8d9e6c001..d53dd3d953b 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -36,7 +36,7 @@ use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index; -use lance_index::{DatasetIndexExt, IndexDescription}; +use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment}; use uuid::Uuid; #[pyclass(name = "IndexConfig", module = "lance.indices", get_all)] @@ -415,7 +415,14 @@ async fn do_load_shuffled_vectors( .infer_error()?; let mut ds = dataset.ds.as_ref().clone(); - ds.commit_existing_index(index_name, column, index_id) + ds.commit_existing_index_segments( + index_name, + column, + vec![IndexSegment::new( + index_id, + ds.iter_fragments().map(|f| f.id as u32), + )], + ) .await .infer_error()?; diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs index 2839bdd624b..29a2cc464ff 100644 --- a/rust/lance-index/src/types.rs +++ b/rust/lance-index/src/types.rs @@ -14,11 +14,71 @@ use uuid::Uuid; #[derive(Debug, Clone, PartialEq)] pub struct IndexSegment { /// Unique ID of the physical segment. - pub uuid: Uuid, + uuid: Uuid, /// The fragments covered by this segment. - pub fragment_bitmap: RoaringBitmap, + fragment_bitmap: RoaringBitmap, /// Metadata specific to the index type. - pub index_details: Option>, + index_details: Option>, /// The on-disk index version for this segment. - pub index_version: i32, + index_version: i32, +} + +impl IndexSegment { + /// Create a segment with the given UUID and fragment coverage. + /// + /// The segment starts without index details and uses index version `0` + /// until additional metadata is attached with the builder-style methods. + pub fn new(uuid: Uuid, fragment_bitmap: I) -> Self + where + I: IntoIterator, + { + Self { + uuid, + fragment_bitmap: fragment_bitmap.into_iter().collect(), + index_details: None, + index_version: 0, + } + } + + /// Attach the serialized index details for this segment. + pub fn with_index_details(mut self, index_details: Arc) -> Self { + self.index_details = Some(index_details); + self + } + + /// Override the on-disk index version for this segment. + pub fn with_index_version(mut self, index_version: i32) -> Self { + self.index_version = index_version; + self + } + + /// Return the UUID of this segment. + pub fn uuid(&self) -> Uuid { + self.uuid + } + + /// Return the fragment coverage of this segment. + pub fn fragment_bitmap(&self) -> &RoaringBitmap { + &self.fragment_bitmap + } + + /// Return the optional serialized index details for this segment. + pub fn index_details(&self) -> Option<&Arc> { + self.index_details.as_ref() + } + + /// Return the on-disk index version for this segment. + pub fn index_version(&self) -> i32 { + self.index_version + } + + /// Consume the segment and return its component parts. + pub fn into_parts(self) -> (Uuid, RoaringBitmap, Option>, i32) { + ( + self.uuid, + self.fragment_bitmap, + self.index_details, + self.index_version, + ) + } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 3ae731d09fa..823a9f4dc1f 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -768,12 +768,7 @@ impl DatasetIndexExt for Dataset { // TODO: We will need some way to determine the index details here. Perhaps // we can load the index itself and get the details that way. - let segment = IndexSegment { - uuid: index_id, - fragment_bitmap: self.fragment_bitmap.as_ref().clone(), - index_details: None, - index_version: 0, - }; + let segment = IndexSegment::new(index_id, self.fragment_bitmap.as_ref().clone()); self.commit_existing_index_segments(index_name, column, vec![segment]) .await @@ -799,26 +794,30 @@ impl DatasetIndexExt for Dataset { let mut seen_segment_ids = HashSet::with_capacity(segments.len()); for segment in &segments { - if !seen_segment_ids.insert(segment.uuid) { + if !seen_segment_ids.insert(segment.uuid()) { return Err(Error::invalid_input(format!( "CreateIndex: duplicate segment uuid {} for index '{}'", - segment.uuid, index_name + segment.uuid(), + index_name ))); } } let new_indices = segments .into_iter() - .map(|segment| IndexMetadata { - uuid: segment.uuid, - name: index_name.to_string(), - fields: vec![field.id], - dataset_version: self.manifest.version, - fragment_bitmap: Some(segment.fragment_bitmap), - index_details: segment.index_details, - index_version: segment.index_version, - created_at: Some(chrono::Utc::now()), - base_id: None, + .map(|segment| { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + IndexMetadata { + uuid, + name: index_name.to_string(), + fields: vec![field.id], + dataset_version: self.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details, + index_version, + created_at: Some(chrono::Utc::now()), + base_id: None, + } }) .collect(); @@ -5184,18 +5183,8 @@ mod tests { .await .unwrap(); - let seg0 = IndexSegment { - uuid: Uuid::new_v4(), - fragment_bitmap: std::iter::once(0_u32).collect(), - index_details: None, - index_version: 0, - }; - let seg1 = IndexSegment { - uuid: Uuid::new_v4(), - fragment_bitmap: std::iter::once(1_u32).collect(), - index_details: None, - index_version: 0, - }; + let seg0 = IndexSegment::new(Uuid::new_v4(), std::iter::once(0_u32)); + let seg1 = IndexSegment::new(Uuid::new_v4(), std::iter::once(1_u32)); dataset .commit_existing_index_segments( @@ -5211,7 +5200,7 @@ mod tests { let committed_uuids = committed.iter().map(|idx| idx.uuid).collect::>(); assert_eq!( committed_uuids, - HashSet::from([seg0.uuid, seg1.uuid]), + HashSet::from([seg0.uuid(), seg1.uuid()]), "all committed segment uuids should be preserved" ); assert_eq!( @@ -5246,12 +5235,7 @@ mod tests { let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - let base = IndexSegment { - uuid: Uuid::new_v4(), - fragment_bitmap: std::iter::once(0_u32).collect(), - index_details: None, - index_version: 0, - }; + let base = IndexSegment::new(Uuid::new_v4(), std::iter::once(0_u32)); let err = dataset .commit_existing_index_segments( @@ -5259,10 +5243,7 @@ mod tests { "vector", vec![ base.clone(), - IndexSegment { - fragment_bitmap: std::iter::once(1_u32).collect(), - ..base - }, + IndexSegment::new(base.uuid(), std::iter::once(1_u32)), ], ) .await diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 8c257353894..70255434126 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1463,12 +1463,10 @@ mod tests { .commit_existing_index_segments( index_name, "vector", - vec![IndexSegment { - uuid: shared_uuid, - fragment_bitmap: dataset.fragment_bitmap.as_ref().clone(), - index_details: None, - index_version: 0, - }], + vec![IndexSegment::new( + shared_uuid, + dataset.fragment_bitmap.as_ref().clone(), + )], ) .await .unwrap(); From 575670b9d5fc81a70e51a5adf86954c562660a8f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Mar 2026 12:57:31 +0800 Subject: [PATCH 06/10] style: fix python indices formatting --- python/src/indices.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/src/indices.rs b/python/src/indices.rs index d53dd3d953b..ff748380951 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -423,8 +423,8 @@ async fn do_load_shuffled_vectors( ds.iter_fragments().map(|f| f.id as u32), )], ) - .await - .infer_error()?; + .await + .infer_error()?; Ok(()) } From ecd23f056be2442290470aba4c58061675e7aafe Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 18 Mar 2026 23:53:54 +0800 Subject: [PATCH 07/10] refactor: tighten index segment follow-ups --- python/src/indices.rs | 2 +- rust/lance/src/dataset.rs | 5 ----- rust/lance/src/dataset/mem_wal/memtable/flush.rs | 13 ++++++++++--- rust/lance/src/index.rs | 6 ------ 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/python/src/indices.rs b/python/src/indices.rs index ff748380951..8b6cd540940 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -420,7 +420,7 @@ async fn do_load_shuffled_vectors( column, vec![IndexSegment::new( index_id, - ds.iter_fragments().map(|f| f.id as u32), + ds.fragments().iter().map(|f| f.id as u32), )], ) .await diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3c9653f97fb..6adefc7bf3d 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1817,11 +1817,6 @@ impl Dataset { .collect() } - /// Iterate over manifest fragments without allocating [`FileFragment`] wrappers. - pub fn iter_fragments(&self) -> impl Iterator { - self.manifest.fragments.iter() - } - pub fn get_fragment(&self, fragment_id: usize) -> Option { let dataset = Arc::new(self.clone()); let fragment = self diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index 93b6c451fc7..eb91e856673 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -247,8 +247,11 @@ impl MemTableFlusher { index_meta.fields = vec![field_idx]; index_meta.dataset_version = dataset.version().version; // Calculate fragment_bitmap from dataset fragments - let fragment_ids: roaring::RoaringBitmap = - dataset.fragment_bitmap.as_ref().clone(); + let fragment_ids: roaring::RoaringBitmap = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); index_meta.fragment_bitmap = Some(fragment_ids); // Commit the index to the dataset @@ -464,7 +467,11 @@ impl MemTableFlusher { let schema = dataset.schema(); let field_idx = schema.field(&fts_cfg.column).map(|f| f.id).unwrap_or(0); - let fragment_ids: roaring::RoaringBitmap = dataset.fragment_bitmap.as_ref().clone(); + let fragment_ids: roaring::RoaringBitmap = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); let index_meta = IndexMetadata { uuid: index_uuid, diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 50aff085755..280f43d6703 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -761,12 +761,6 @@ impl DatasetIndexExt for Dataset { column: &str, index_id: Uuid, ) -> Result<()> { - let Some(_field) = self.schema().field(column) else { - return Err(Error::index(format!( - "CreateIndex: column '{column}' does not exist" - ))); - }; - // TODO: We will need some way to determine the index details here. Perhaps // we can load the index itself and get the details that way. From a1a805fb47af0f5e4827c2fc6a6f95575a6634ca Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 19 Mar 2026 00:36:26 +0800 Subject: [PATCH 08/10] refactor: require complete index segment metadata --- python/src/indices.rs | 7 ++- rust/lance-index/src/traits.rs | 9 ---- rust/lance-index/src/types.rs | 39 ++++++-------- rust/lance/src/index.rs | 74 +++++++++------------------ rust/lance/src/index/vector/ivf/v2.rs | 2 + 5 files changed, 48 insertions(+), 83 deletions(-) diff --git a/python/src/indices.rs b/python/src/indices.rs index 8b6cd540940..e7a49c598aa 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -36,7 +36,7 @@ use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index; -use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment}; +use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment, IndexType}; use uuid::Uuid; #[pyclass(name = "IndexConfig", module = "lance.indices", get_all)] @@ -421,6 +421,11 @@ async fn do_load_shuffled_vectors( vec![IndexSegment::new( index_id, ds.fragments().iter().map(|f| f.id as u32), + Arc::new( + prost_types::Any::from_msg(&lance_table::format::pb::VectorIndexDetails::default()) + .unwrap(), + ), + IndexType::IvfPq.version() as i32, )], ) .await diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index bff859c9b76..b24955e9f14 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -9,7 +9,6 @@ use lance_core::{Error, Result}; use crate::{IndexParams, IndexType, optimize::OptimizeOptions, types::IndexSegment}; use lance_table::format::IndexMetadata; -use uuid::Uuid; /// A set of criteria used to filter potential indices to use for a query #[derive(Debug, Default)] @@ -282,14 +281,6 @@ pub trait DatasetIndexExt { ))) } - #[deprecated(since = "4.0.0", note = "Use commit_existing_index_segments instead")] - async fn commit_existing_index( - &mut self, - index_name: &str, - column: &str, - index_id: Uuid, - ) -> Result<()>; - async fn read_index_partition( &self, index_name: &str, diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs index 29a2cc464ff..6a991be9932 100644 --- a/rust/lance-index/src/types.rs +++ b/rust/lance-index/src/types.rs @@ -18,40 +18,31 @@ pub struct IndexSegment { /// The fragments covered by this segment. fragment_bitmap: RoaringBitmap, /// Metadata specific to the index type. - index_details: Option>, + index_details: Arc, /// The on-disk index version for this segment. index_version: i32, } impl IndexSegment { - /// Create a segment with the given UUID and fragment coverage. - /// - /// The segment starts without index details and uses index version `0` - /// until additional metadata is attached with the builder-style methods. - pub fn new(uuid: Uuid, fragment_bitmap: I) -> Self + /// Create a fully described segment with the given UUID, fragment coverage, and index + /// metadata. + pub fn new( + uuid: Uuid, + fragment_bitmap: I, + index_details: Arc, + index_version: i32, + ) -> Self where I: IntoIterator, { Self { uuid, fragment_bitmap: fragment_bitmap.into_iter().collect(), - index_details: None, - index_version: 0, + index_details, + index_version, } } - /// Attach the serialized index details for this segment. - pub fn with_index_details(mut self, index_details: Arc) -> Self { - self.index_details = Some(index_details); - self - } - - /// Override the on-disk index version for this segment. - pub fn with_index_version(mut self, index_version: i32) -> Self { - self.index_version = index_version; - self - } - /// Return the UUID of this segment. pub fn uuid(&self) -> Uuid { self.uuid @@ -62,9 +53,9 @@ impl IndexSegment { &self.fragment_bitmap } - /// Return the optional serialized index details for this segment. - pub fn index_details(&self) -> Option<&Arc> { - self.index_details.as_ref() + /// Return the serialized index details for this segment. + pub fn index_details(&self) -> &Arc { + &self.index_details } /// Return the on-disk index version for this segment. @@ -73,7 +64,7 @@ impl IndexSegment { } /// Consume the segment and return its component parts. - pub fn into_parts(self) -> (Uuid, RoaringBitmap, Option>, i32) { + pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc, i32) { ( self.uuid, self.fragment_bitmap, diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 280f43d6703..75d6cbbf7c8 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -755,21 +755,6 @@ impl DatasetIndexExt for Dataset { } } - async fn commit_existing_index( - &mut self, - index_name: &str, - column: &str, - index_id: Uuid, - ) -> Result<()> { - // TODO: We will need some way to determine the index details here. Perhaps - // we can load the index itself and get the details that way. - - let segment = IndexSegment::new(index_id, self.fragment_bitmap.as_ref().clone()); - - self.commit_existing_index_segments(index_name, column, vec![segment]) - .await - } - async fn commit_existing_index_segments( &mut self, index_name: &str, @@ -809,7 +794,7 @@ impl DatasetIndexExt for Dataset { fields: vec![field.id], dataset_version: self.manifest.version, fragment_bitmap: Some(fragment_bitmap), - index_details, + index_details: Some(index_details), index_version, created_at: Some(chrono::Utc::now()), base_id: None, @@ -5179,8 +5164,18 @@ mod tests { .await .unwrap(); - let seg0 = IndexSegment::new(Uuid::new_v4(), std::iter::once(0_u32)); - let seg1 = IndexSegment::new(Uuid::new_v4(), std::iter::once(1_u32)); + let seg0 = IndexSegment::new( + Uuid::new_v4(), + std::iter::once(0_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version() as i32, + ); + let seg1 = IndexSegment::new( + Uuid::new_v4(), + std::iter::once(1_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version() as i32, + ); dataset .commit_existing_index_segments( @@ -5231,7 +5226,12 @@ mod tests { let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - let base = IndexSegment::new(Uuid::new_v4(), std::iter::once(0_u32)); + let base = IndexSegment::new( + Uuid::new_v4(), + std::iter::once(0_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version() as i32, + ); let err = dataset .commit_existing_index_segments( @@ -5239,7 +5239,12 @@ mod tests { "vector", vec![ base.clone(), - IndexSegment::new(base.uuid(), std::iter::once(1_u32)), + IndexSegment::new( + base.uuid(), + std::iter::once(1_u32), + Arc::new(vector_index_details()), + IndexType::Vector.version() as i32, + ), ], ) .await @@ -5271,35 +5276,6 @@ mod tests { assert!(err.to_string().contains("at least one index segment")); } - #[tokio::test] - #[allow(deprecated)] - async fn test_commit_existing_index_wraps_single_segment_commit() { - use lance_datagen::{BatchCount, RowCount, array}; - - let test_dir = tempfile::tempdir().unwrap(); - let test_uri = test_dir.path().to_str().unwrap(); - - let reader = lance_datagen::gen_batch() - .col("id", array::step::()) - .col( - "vector", - array::rand_vec::(8.into()), - ) - .into_reader_rows(RowCount::from(10), BatchCount::from(1)); - - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - let segment_id = Uuid::new_v4(); - - dataset - .commit_existing_index("vector_idx", "vector", segment_id) - .await - .unwrap(); - - let committed = dataset.load_indices_by_name("vector_idx").await.unwrap(); - assert_eq!(committed.len(), 1); - assert_eq!(committed[0].uuid, segment_id); - } - #[tokio::test] async fn test_resolve_index_column_error_cases() { use lance_datagen::{BatchCount, RowCount, array}; diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 213a39a2e7a..1a8355f70d6 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1466,6 +1466,8 @@ mod tests { vec![IndexSegment::new( shared_uuid, dataset.fragment_bitmap.as_ref().clone(), + Arc::new(crate::index::vector_index_details()), + IndexType::IvfPq.version() as i32, )], ) .await From 4851fb63b27de5e56ab4dee93532decc2b9199ca Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 19 Mar 2026 00:44:21 +0800 Subject: [PATCH 09/10] refactor: require index segment commit support --- rust/lance-index/src/traits.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index 197f2c95153..5ad13861ddc 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -277,15 +277,10 @@ pub trait DatasetIndexExt { /// Commit one or more existing physical index segments as a logical index. async fn commit_existing_index_segments( &mut self, - _index_name: &str, - _column: &str, - _segments: Vec, - ) -> Result<()> { - Err(Error::not_supported(format!( - "{} does not support commit_existing_index_segments", - std::any::type_name::() - ))) - } + index_name: &str, + column: &str, + segments: Vec, + ) -> Result<()>; async fn read_index_partition( &self, From 975c1f5a582d3d14944f4097c4886723eea61ffb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 19 Mar 2026 00:51:11 +0800 Subject: [PATCH 10/10] fix: remove unnecessary index version casts --- python/src/indices.rs | 2 +- rust/lance/src/index.rs | 8 ++++---- rust/lance/src/index/vector/ivf/v2.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/src/indices.rs b/python/src/indices.rs index 2364eb2b01f..805c84ec6b9 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -426,7 +426,7 @@ async fn do_load_shuffled_vectors( prost_types::Any::from_msg(&lance_table::format::pb::VectorIndexDetails::default()) .unwrap(), ), - IndexType::IvfPq.version() as i32, + IndexType::IvfPq.version(), )], ) .await diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index f51abc75ba9..fdc91760a2e 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -5208,13 +5208,13 @@ mod tests { Uuid::new_v4(), std::iter::once(0_u32), Arc::new(vector_index_details()), - IndexType::Vector.version() as i32, + IndexType::Vector.version(), ); let seg1 = IndexSegment::new( Uuid::new_v4(), std::iter::once(1_u32), Arc::new(vector_index_details()), - IndexType::Vector.version() as i32, + IndexType::Vector.version(), ); dataset @@ -5270,7 +5270,7 @@ mod tests { Uuid::new_v4(), std::iter::once(0_u32), Arc::new(vector_index_details()), - IndexType::Vector.version() as i32, + IndexType::Vector.version(), ); let err = dataset @@ -5283,7 +5283,7 @@ mod tests { base.uuid(), std::iter::once(1_u32), Arc::new(vector_index_details()), - IndexType::Vector.version() as i32, + IndexType::Vector.version(), ), ], ) diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 58a278b0e4f..0bdc0389648 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1474,7 +1474,7 @@ mod tests { shared_uuid, dataset.fragment_bitmap.as_ref().clone(), Arc::new(crate::index::vector_index_details()), - IndexType::IvfPq.version() as i32, + IndexType::IvfPq.version(), )], ) .await