From a86274da2b1ea2bb94ea41185792f420faec1899 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 20 Mar 2026 23:34:59 +0800 Subject: [PATCH 1/8] test: cover distributed vector segment build --- python/python/tests/test_vector_index.py | 52 ++ rust/lance/src/index/create.rs | 315 +++++++++++- rust/lance/src/index/vector/ivf/v2.rs | 581 ++++++++++++++++++++++- 3 files changed, 931 insertions(+), 17 deletions(-) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index cf3f38158b8..b38a5764fdc 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -2750,6 +2750,58 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): assert 0 < len(results) <= 5 +def test_index_segment_builder_builds_vector_segments(tmp_path): + ds = _make_sample_dataset_base(tmp_path, "segment_builder_ds", 2000, 128) + frags = ds.get_fragments() + assert len(frags) >= 2 + shared_uuid = str(uuid.uuid4()) + + builder = IndicesBuilder(ds, "vector") + preprocessed = builder.prepare_global_ivf_pq( + num_partitions=4, + num_subvectors=4, + distance_type="l2", + sample_rate=7, + max_iters=20, + ) + + partial_indices = [] + for fragment in frags[:2]: + partial_indices.append( + ds._ds.create_index( + ["vector"], + "IVF_FLAT", + "vector_idx", + False, + True, + None, + { + "fragment_ids": [fragment.fragment_id], + "index_uuid": shared_uuid, + "num_partitions": 4, + "num_sub_vectors": 128, + "ivf_centroids": preprocessed["ivf_centroids"], + "pq_codebook": preprocessed["pq_codebook"], + }, + ) + ) + + segment_builder = ds.create_index_segment_builder(shared_uuid).with_partial_indices( + partial_indices + ) + plans = segment_builder.plan() + assert len(plans) == 2 + assert all(len(plan.partial_indices) == 1 for plan in plans) + + segments = segment_builder.build_all() + assert len(segments) == 2 + ds = ds.commit_existing_index_segments("vector_idx", "vector", segments) + + q = 
np.random.rand(128).astype(np.float32) + results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) + assert 0 < len(results) <= 5 + + def test_distributed_ivf_pq_order_invariance(tmp_path: Path): """Ensure distributed IVF_PQ build is invariant to shard build order.""" ds = _make_sample_dataset_base(tmp_path, "dist_ds", 2000, 128) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 2678cb22471..7edfbf4c6b8 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -498,17 +498,24 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> { mod tests { use super::*; use crate::dataset::{WriteMode, WriteParams}; + use crate::index::DatasetIndexExt; use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use arrow::datatypes::{Float32Type, Int32Type}; - use arrow_array::RecordBatchIterator; + use arrow_array::cast::AsArray; + use arrow_array::{FixedSizeListArray, RecordBatchIterator}; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_arrow::FixedSizeListArrayExt; use lance_core::utils::tempfile::TempStrDir; use lance_datagen::{self, gen_batch}; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::inverted::tokenizer::InvertedIndexParams; - use lance_linalg::distance::MetricType; + use lance_index::vector::hnsw::builder::HnswBuildParams; + use lance_index::vector::ivf::IvfBuildParams; + use lance_index::vector::kmeans::{KMeansParams, train_kmeans}; + use lance_linalg::distance::{DistanceType, MetricType}; use std::sync::Arc; + use uuid::Uuid; #[test] fn test_inverted_training_params_include_build_only_fields() { @@ -759,6 +766,39 @@ mod tests { .unwrap() } + async fn prepare_vector_ivf(dataset: &Dataset, vector_column: &str) -> IvfBuildParams { + let batch = dataset + .scan() + .project(&[vector_column.to_string()]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = batch + 
.column_by_name(vector_column) + .expect("vector column should exist") + .as_fixed_size_list(); + let dim = vectors.value_length() as usize; + let values = vectors.values().as_primitive::(); + + let kmeans = train_kmeans::( + values, + KMeansParams::new(None, 10, 1, DistanceType::L2), + dim, + 4, + 3, + ) + .unwrap(); + let centroids = Arc::new( + FixedSizeListArray::try_new_from_values( + kmeans.centroids.as_primitive::().clone(), + dim as i32, + ) + .unwrap(), + ); + IvfBuildParams::try_with_centroids(4, centroids).unwrap() + } + #[tokio::test] async fn test_execute_uncommitted() { // Test the complete workflow that covers the user's specified code pattern: @@ -922,6 +962,277 @@ mod tests { assert_eq!(all_covered_fragments, expected_fragments); } + #[tokio::test] + async fn test_merge_index_metadata_vector_preserves_shared_uuid_commit_workflow() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(256), + lance_datagen::BatchCount::from(4), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert!(fragments.len() >= 2); + let shared_uuid = Uuid::new_v4(); + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + ); + + for fragment in &fragments { + let mut builder = + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .index_uuid(shared_uuid.to_string()); + builder.execute_uncommitted().await.unwrap(); + } + + dataset + 
.merge_index_metadata(&shared_uuid.to_string(), IndexType::IvfFlat, None) + .await + .unwrap(); + + let merged_root = dataset + .indices_dir() + .child(shared_uuid.to_string()) + .child(crate::index::INDEX_FILE_NAME); + assert!(dataset.object_store().exists(&merged_root).await.unwrap()); + + dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![IndexSegment::new( + shared_uuid, + fragments.iter().map(|fragment| fragment.id() as u32), + Arc::new(vector_index_details()), + IndexType::IvfFlat.version(), + )], + ) + .await + .unwrap(); + + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(indices.len(), 1); + assert_eq!(indices[0].uuid, shared_uuid); + let committed_fragments: Vec = indices[0] + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect(); + assert_eq!( + committed_fragments, + fragments + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>() + ); + + let query_batch = dataset + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(4), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let q = query_batch["vector"].as_fixed_size_list().value(0); + let result = dataset + .scan() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), 5) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert!(result.num_rows() > 0); + } + + #[tokio::test] + async fn test_index_segment_builder_vector_commits_multi_segment_logical_index() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(256), + lance_datagen::BatchCount::from(4), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + 
..Default::default() + }), + ) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert!(fragments.len() >= 2); + let shared_uuid = Uuid::new_v4(); + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + ); + let mut partial_indices = Vec::new(); + + for fragment in fragments.iter().take(2) { + let index_metadata = + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .index_uuid(shared_uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + partial_indices.push(index_metadata); + } + + let segments = dataset + .create_index_segment_builder(shared_uuid.to_string()) + .with_partial_indices(partial_indices) + .build_all() + .await + .unwrap(); + assert_eq!(segments.len(), 2); + + dataset + .commit_existing_index_segments("vector_idx", "vector", segments) + .await + .unwrap(); + + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(indices.len(), 2); + let mut committed_fragment_sets = indices + .iter() + .map(|metadata| { + metadata + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>() + }) + .collect::>(); + committed_fragment_sets.sort(); + assert_eq!(committed_fragment_sets, vec![vec![0], vec![1]]); + + let query_batch = dataset + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(4), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let q = query_batch["vector"].as_fixed_size_list().value(0); + let result = dataset + .scan() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), 5) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert!(result.num_rows() > 0); + } + + #[tokio::test] + async fn test_commit_existing_index_supports_local_hnsw_segments() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let 
reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(128), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let uuid = Uuid::new_v4(); + let params = VectorIndexParams::ivf_hnsw( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + HnswBuildParams::default(), + ); + + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .index_uuid(uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + + dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![IndexSegment::new( + uuid, + dataset.fragment_bitmap.as_ref().clone(), + Arc::new(vector_index_details()), + IndexType::IvfHnswFlat.version(), + )], + ) + .await + .unwrap(); + + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(indices.len(), 1); + assert_eq!(indices[0].uuid, uuid); + assert_eq!( + indices[0].fragment_bitmap.as_ref().unwrap(), + dataset.fragment_bitmap.as_ref() + ); + } + #[tokio::test] async fn test_optimize_should_not_removes_delta_indices() { let tmpdir = TempStrDir::default(); diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 0bdc0389648..6cab3f591bd 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -618,6 +618,7 @@ mod tests { }; use arrow_buffer::OffsetBuffer; use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use futures::StreamExt; use itertools::Itertools; use lance_arrow::FixedSizeListArrayExt; use lance_index::vector::bq::{ @@ -627,8 +628,8 @@ mod tests { use crate::dataset::{InsertBuilder, UpdateBuilder, 
WriteMode, WriteParams}; use crate::index::DatasetIndexInternalExt; - use crate::index::vector::ivf::finalize_distributed_merge; use crate::index::vector::ivf::v2::IvfPq; + use crate::index::vector::ivf::{build_staging_segment, plan_staging_segments}; use crate::utils::test::copy_test_data_to_tmp; use crate::{ Dataset, @@ -664,6 +665,7 @@ mod tests { }; use lance_linalg::distance::{DistanceType, multivec_distance}; use lance_linalg::kernels::normalize_fsl; + use lance_table::format::IndexMetadata; use lance_testing::datagen::{generate_random_array, generate_random_array_with_range}; use object_store::path::Path; use rand::distr::uniform::SampleUniform; @@ -1437,6 +1439,64 @@ mod tests { (ivf_params, pq_params) } + async fn prepare_global_ivf(dataset: &Dataset, vector_column: &str) -> IvfBuildParams { + let batch = dataset + .scan() + .project(&[vector_column.to_string()]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = batch + .column_by_name(vector_column) + .expect("vector column should exist") + .as_fixed_size_list(); + + let dim = vectors.value_length() as usize; + assert_eq!(dim, TWO_FRAG_DIM, "unexpected vector dimension"); + + let values = vectors.values().as_primitive::(); + let kmeans_params = KMeansParams::new(None, TWO_FRAG_MAX_ITERS, 1, DistanceType::L2); + let kmeans = train_kmeans::( + values, + kmeans_params, + dim, + TWO_FRAG_NUM_PARTITIONS, + TWO_FRAG_SAMPLE_RATE, + ) + .unwrap(); + + let centroids_flat = kmeans.centroids.as_primitive::().clone(); + let centroids_fsl = + Arc::new(FixedSizeListArray::try_new_from_values(centroids_flat, dim as i32).unwrap()); + let mut ivf_params = + IvfBuildParams::try_with_centroids(TWO_FRAG_NUM_PARTITIONS, centroids_fsl).unwrap(); + ivf_params.max_iters = TWO_FRAG_MAX_ITERS as usize; + ivf_params.sample_rate = TWO_FRAG_SAMPLE_RATE; + ivf_params + } + + async fn build_distributed_partial_index_for_fragment_groups( + dataset: &mut Dataset, + fragment_groups: Vec>, // each group is a set of 
fragment ids + params: &VectorIndexParams, + index_name: &str, + ) -> (Uuid, Vec) { + let shared_uuid = Uuid::new_v4(); + let mut partial_indices = Vec::new(); + + for fragments in fragment_groups { + let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, params); + builder = builder + .name(index_name.to_string()) + .fragments(fragments) + .index_uuid(shared_uuid.to_string()); + partial_indices.push(builder.execute_uncommitted().await.unwrap()); + } + + (shared_uuid, partial_indices) + } + async fn build_ivfpq_for_fragment_groups( dataset: &mut Dataset, fragment_groups: Vec>, // each group is a set of fragment ids @@ -1457,12 +1517,11 @@ mod tests { .name(index_name.to_string()) .fragments(fragments) .index_uuid(shared_uuid.to_string()); - // Build partial index shards without committing to manifest. builder.execute_uncommitted().await.unwrap(); } - let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); - finalize_distributed_merge(dataset.object_store(), &index_dir, Some(IndexType::IvfPq)) + dataset + .merge_index_metadata(&shared_uuid.to_string(), IndexType::IvfPq, None) .await .unwrap(); @@ -1485,7 +1544,6 @@ mod tests { let idx_a = &stats_a["indices"][0]; let idx_b = &stats_b["indices"][0]; - // Centroids: same shape and values (within tolerance). let centroids_a = idx_a["centroids"] .as_array() .expect("centroids should be an array"); @@ -1517,7 +1575,6 @@ mod tests { } } - // Partitions sizes. 
let parts_a = idx_a["partitions"] .as_array() .expect("partitions should be an array"); @@ -1536,6 +1593,94 @@ mod tests { assert_eq!(sizes_a, sizes_b, "partition sizes mismatch"); } + async fn load_staging_shard_uuids(dataset: &Dataset, shared_uuid: Uuid) -> Vec { + let mut shard_uuids = dataset + .object_store() + .read_dir(dataset.indices_dir().child(shared_uuid.to_string())) + .await + .unwrap() + .into_iter() + .filter(|name| name.starts_with("partial_")) + .map(|name| Uuid::parse_str(name.trim_start_matches("partial_")).unwrap()) + .collect::>(); + shard_uuids.sort(); + shard_uuids + } + + /// Reconstruct the caller-side shard contract used by the tests. + /// + /// Production callers are expected to already know this mapping from + /// distributed scheduling state. The test helper rebuilds it by inspecting + /// which `partial_*` directories were created and pairing them with the + /// fragment groups we originally assigned. + async fn build_partial_indices( + dataset: &Dataset, + shared_uuid: Uuid, + fragment_groups: &[Vec], + ) -> Vec { + let shard_uuids = load_staging_shard_uuids(dataset, shared_uuid).await; + assert_eq!(shard_uuids.len(), fragment_groups.len()); + let mut partial_indices = Vec::with_capacity(shard_uuids.len()); + for (shard_uuid, fragment_group) in shard_uuids.into_iter().zip(fragment_groups.iter()) { + let partial_dir = dataset + .indices_dir() + .child(shared_uuid.to_string()) + .child(format!("partial_{}", shard_uuid)); + let mut estimated_bytes = 0_u64; + let mut files = dataset.object_store().list(Some(partial_dir)); + while let Some(item) = files.next().await { + estimated_bytes += item.unwrap().size; + } + partial_indices.push(IndexMetadata { + uuid: shard_uuid, + name: String::new(), + fields: Vec::new(), + dataset_version: dataset.version().version, + fragment_bitmap: Some(fragment_group.iter().copied().collect()), + index_details: None, + index_version: IndexType::Vector.version(), + created_at: None, + base_id: None, + files: 
Some(vec![lance_table::format::IndexFile { + path: String::new(), + size_bytes: estimated_bytes, + }]), + }); + } + partial_indices + } + + /// Execute the internal staged segment-build workflow used by the + /// regression tests: plan segment groups from caller-provided shard + /// metadata, build each segment, and publish them as one logical index. + async fn build_distributed_segments( + dataset: &mut Dataset, + shared_uuid: Uuid, + partial_indices: &[IndexMetadata], + target_segment_bytes: Option, + index_name: &str, + ) -> Vec { + let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); + let segment_plans = + plan_staging_segments(&index_dir, partial_indices, None, target_segment_bytes) + .await + .unwrap(); + let mut segments = Vec::with_capacity(segment_plans.len()); + for plan in &segment_plans { + segments.push( + build_staging_segment(dataset.object_store(), &dataset.indices_dir(), plan) + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments(index_name, "vector", segments.clone()) + .await + .unwrap(); + + segments + } + #[tokio::test] async fn test_ivfpq_recall_performance_on_two_frags_single_vs_split() { const INDEX_NAME: &str = "vector_idx"; @@ -1543,7 +1688,6 @@ mod tests { let test_dir = TempStrDir::default(); let base_uri = test_dir.as_str(); - // Generate the data once, then write it twice to two independent dataset URIs. let (schema, batches) = make_two_fragment_batches(); let ds_single_uri = format!("{}/single", base_uri); @@ -1553,7 +1697,6 @@ mod tests { write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await; let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await; - // Ensure we have at least 2 fragments. let fragments_single = ds_single.get_fragments(); assert!( fragments_single.len() >= 2, @@ -1567,10 +1710,8 @@ mod tests { fragments_split.len() ); - // Pretrain global IVF centroids and PQ codebook. 
let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await; - // Build single index using two fragments in one distributed build. let group_single = vec![ fragments_single[0].id() as u32, fragments_single[1].id() as u32, @@ -1584,7 +1725,6 @@ mod tests { ) .await; - // Build split index: one fragment per distributed build, then merge. let group0 = vec![fragments_split[0].id() as u32]; let group1 = vec![fragments_split[1].id() as u32]; build_ivfpq_for_fragment_groups( @@ -1596,14 +1736,12 @@ mod tests { ) .await; - // Compare IVF layout via index statistics. let stats_single_json = ds_single.index_statistics(INDEX_NAME).await.unwrap(); let stats_split_json = ds_split.index_statistics(INDEX_NAME).await.unwrap(); let stats_single: serde_json::Value = serde_json::from_str(&stats_single_json).unwrap(); let stats_split: serde_json::Value = serde_json::from_str(&stats_split_json).unwrap(); assert_ivf_layout_equal(&stats_single, &stats_split); - // Compare row id sets per partition. let ctx_single = load_vector_index_context(&ds_single, "vector", INDEX_NAME).await; let ctx_split = load_vector_index_context(&ds_split, "vector", INDEX_NAME).await; @@ -1624,7 +1762,6 @@ mod tests { ); } - // Compare Top-K row ids on a deterministic set of queries. const K: usize = 10; const NUM_QUERIES: usize = 10; @@ -1653,7 +1790,6 @@ mod tests { ids_per_query } - // Collect a deterministic query set from ds_single. 
let query_batch = ds_single .scan() .project(&["vector"] as &[&str]) @@ -1677,6 +1813,421 @@ mod tests { ); } + #[rstest] + #[case::ivf_flat(IndexType::IvfFlat)] + #[case::ivf_pq(IndexType::IvfPq)] + #[case::ivf_sq(IndexType::IvfSq)] + #[tokio::test] + async fn test_distributed_vector_build_commits_multiple_segments_and_preserves_query_results( + #[case] index_type: IndexType, + ) { + const INDEX_NAME: &str = "vector_idx"; + const K: usize = 10; + const NUM_QUERIES: usize = 10; + + let test_dir = TempStrDir::default(); + let base_uri = test_dir.as_str(); + + // Generate the data once, then write it twice to two independent dataset URIs. + let (schema, batches) = make_two_fragment_batches(); + + let ds_single_uri = format!("{}/single", base_uri); + let ds_split_uri = format!("{}/split", base_uri); + + let mut ds_single = + write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await; + let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await; + + // Ensure we have at least 2 fragments. 
+ let fragments_single = ds_single.get_fragments(); + assert!( + fragments_single.len() >= 2, + "expected at least 2 fragments in ds_single, got {}", + fragments_single.len() + ); + let fragments_split = ds_split.get_fragments(); + assert!( + fragments_split.len() >= 2, + "expected at least 2 fragments in ds_split, got {}", + fragments_split.len() + ); + + let distributed_params = match index_type { + IndexType::IvfFlat => { + let ivf_params = prepare_global_ivf(&ds_single, "vector").await; + VectorIndexParams::with_ivf_flat_params(DistanceType::L2, ivf_params) + } + IndexType::IvfPq => { + let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await; + VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params) + } + IndexType::IvfSq => { + let ivf_params = prepare_global_ivf(&ds_single, "vector").await; + VectorIndexParams::with_ivf_sq_params( + DistanceType::L2, + ivf_params, + SQBuildParams::default(), + ) + } + other => panic!("unsupported test index type: {}", other), + }; + + ds_single + .create_index( + &["vector"], + IndexType::Vector, + Some(INDEX_NAME.to_string()), + &distributed_params, + true, + ) + .await + .unwrap(); + + let fragment_groups = fragments_split + .iter() + .map(|fragment| vec![fragment.id() as u32]) + .collect::>(); + let expected_segment_count = fragment_groups.len(); + let (shared_uuid, partial_indices) = build_distributed_partial_index_for_fragment_groups( + &mut ds_split, + fragment_groups, + &distributed_params, + INDEX_NAME, + ) + .await; + let segments = build_distributed_segments( + &mut ds_split, + shared_uuid, + &partial_indices, + None, + INDEX_NAME, + ) + .await; + assert_eq!(segments.len(), expected_segment_count); + let staging_dir = ds_split.indices_dir().child(shared_uuid.to_string()); + let staging_entries = ds_split.object_store().read_dir(staging_dir).await.unwrap(); + assert!( + staging_entries + .iter() + .all(|entry| !entry.starts_with("partial_")), + "built segments should 
clean up consumed partial shards", + ); + + let committed_segments = ds_split.load_indices_by_name(INDEX_NAME).await.unwrap(); + assert_eq!(committed_segments.len(), expected_segment_count); + for committed in committed_segments { + let covered_fragments = committed + .fragment_bitmap + .as_ref() + .expect("distributed segment should have fragment coverage"); + assert_eq!(covered_fragments.len(), 1); + } + + async fn collect_row_ids(ds: &Dataset, queries: &[Arc]) -> Vec> { + let mut ids_per_query = Vec::with_capacity(queries.len()); + for q in queries { + let result = ds + .scan() + .with_row_id() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), K) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let row_ids = result[ROW_ID] + .as_primitive::() + .values() + .iter() + .copied() + .collect::>(); + ids_per_query.push(row_ids); + } + ids_per_query + } + + // Collect a deterministic query set from ds_single. + let query_batch = ds_single + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(NUM_QUERIES as i64), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = query_batch["vector"].as_fixed_size_list(); + let queries: Vec> = (0..vectors.len()) + .map(|i| vectors.value(i) as Arc) + .collect(); + + let ids_single = collect_row_ids(&ds_single, &queries).await; + let ids_split = collect_row_ids(&ds_split, &queries).await; + + assert_eq!( + ids_single, ids_split, + "single vs segmented distributed index returned different Top-K row ids", + ); + } + + #[rstest] + #[case::ivf_flat(IndexType::IvfFlat)] + #[case::ivf_pq(IndexType::IvfPq)] + #[case::ivf_sq(IndexType::IvfSq)] + #[tokio::test] + async fn test_distributed_vector_grouped_build_allows_concurrent_group_execution( + #[case] index_type: IndexType, + ) { + const INDEX_NAME: &str = "grouped_idx"; + const K: usize = 10; + const NUM_QUERIES: usize = 10; + + let test_dir = TempStrDir::default(); + let base_uri = test_dir.as_str(); + + let 
(schema, batches) = make_two_fragment_batches(); + let ds_single_uri = format!("{}/grouped_single", base_uri); + let ds_split_uri = format!("{}/grouped_split", base_uri); + + let mut ds_single = + write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await; + let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await; + + let distributed_params = match index_type { + IndexType::IvfFlat => { + let ivf_params = prepare_global_ivf(&ds_single, "vector").await; + VectorIndexParams::with_ivf_flat_params(DistanceType::L2, ivf_params) + } + IndexType::IvfPq => { + let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await; + VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params) + } + IndexType::IvfSq => { + let ivf_params = prepare_global_ivf(&ds_single, "vector").await; + VectorIndexParams::with_ivf_sq_params( + DistanceType::L2, + ivf_params, + SQBuildParams::default(), + ) + } + other => panic!("unsupported test index type: {}", other), + }; + + ds_single + .create_index( + &["vector"], + IndexType::Vector, + Some(INDEX_NAME.to_string()), + &distributed_params, + true, + ) + .await + .unwrap(); + + let fragment_groups = ds_split + .get_fragments() + .into_iter() + .map(|fragment| vec![fragment.id() as u32]) + .collect::>(); + let (shared_uuid, partial_indices) = build_distributed_partial_index_for_fragment_groups( + &mut ds_split, + fragment_groups, + &distributed_params, + INDEX_NAME, + ) + .await; + + let index_dir = ds_split.indices_dir().child(shared_uuid.to_string()); + let shard_plan = plan_staging_segments(&index_dir, &partial_indices, None, None) + .await + .unwrap(); + let shard_count = shard_plan.len(); + assert!(shard_count >= 4); + let target_segment_bytes = shard_plan[0].estimated_bytes().saturating_mul(2); + + let grouped_plan = plan_staging_segments( + &index_dir, + &partial_indices, + None, + Some(target_segment_bytes), + ) + .await + .unwrap(); + 
assert!(grouped_plan.len() < shard_count); + assert!( + grouped_plan + .iter() + .any(|plan| plan.partial_indices().len() > 1) + ); + + let grouped_segments = build_distributed_segments( + &mut ds_split, + shared_uuid, + &partial_indices, + Some(target_segment_bytes), + INDEX_NAME, + ) + .await; + assert_eq!(grouped_segments.len(), grouped_plan.len()); + + async fn collect_row_ids(ds: &Dataset, queries: &[Arc]) -> Vec> { + let mut ids_per_query = Vec::with_capacity(queries.len()); + for q in queries { + let result = ds + .scan() + .with_row_id() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), K) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + ids_per_query.push( + result[ROW_ID] + .as_primitive::() + .values() + .iter() + .copied() + .collect(), + ); + } + ids_per_query + } + + let query_batch = ds_single + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(NUM_QUERIES as i64), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = query_batch["vector"].as_fixed_size_list(); + let queries: Vec> = (0..vectors.len()) + .map(|i| vectors.value(i) as Arc) + .collect(); + + let ids_single = collect_row_ids(&ds_single, &queries).await; + let ids_split = collect_row_ids(&ds_split, &queries).await; + assert_eq!(ids_single, ids_split); + } + + #[tokio::test] + async fn test_distributed_vector_plan_rejects_overlapping_fragment_coverage() { + let test_dir = TempStrDir::default(); + let base_uri = test_dir.as_str(); + let (schema, batches) = make_two_fragment_batches(); + let dataset_uri = format!("{}/overlap_fragments", base_uri); + let mut dataset = write_dataset_from_batches(&dataset_uri, schema, batches).await; + + let fragment = dataset.get_fragments()[0].id() as u32; + let shared_uuid = Uuid::new_v4(); + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_global_ivf(&dataset, "vector").await, + ); + + for _ in 0..2 { + dataset + 
.create_index_builder(&["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment]) + .index_uuid(shared_uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + } + + let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); + let partial_indices = + build_partial_indices(&dataset, shared_uuid, &[vec![fragment], vec![fragment]]).await; + let err = plan_staging_segments(&index_dir, &partial_indices, None, None) + .await + .unwrap_err(); + assert!(err.to_string().contains("overlapping fragment coverage")); + } + + #[tokio::test] + async fn test_distributed_vector_build_supports_hnsw_variants() { + let test_dir = TempStrDir::default(); + let base_uri = test_dir.as_str(); + let (schema, batches) = make_two_fragment_batches(); + let dataset_uri = format!("{}/distributed_hnsw_supported", base_uri); + let mut dataset = write_dataset_from_batches(&dataset_uri, schema, batches).await; + + let fragments = dataset.get_fragments(); + assert!(fragments.len() >= 2); + let shared_uuid = Uuid::new_v4(); + let params = VectorIndexParams::ivf_hnsw( + DistanceType::L2, + prepare_global_ivf(&dataset, "vector").await, + HnswBuildParams::default(), + ); + + for fragment in fragments.iter().take(2) { + dataset + .create_index_builder(&["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .index_uuid(shared_uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + } + + let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); + let fragment_groups = fragments + .iter() + .take(2) + .map(|fragment| vec![fragment.id() as u32]) + .collect::>(); + let partial_indices = build_partial_indices(&dataset, shared_uuid, &fragment_groups).await; + let plans = plan_staging_segments(&index_dir, &partial_indices, None, Some(1)) + .await + .unwrap(); + assert_eq!(plans.len(), fragments.iter().take(2).count()); + + let mut segments = Vec::with_capacity(plans.len()); 
+ for plan in &plans { + segments.push( + build_staging_segment(dataset.object_store(), &dataset.indices_dir(), plan) + .await + .unwrap(), + ); + } + assert_eq!(segments.len(), plans.len()); + + dataset + .commit_existing_index_segments("vector_idx", "vector", segments) + .await + .unwrap(); + + let query_batch = dataset + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(4), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let q = query_batch["vector"].as_fixed_size_list().value(0); + let result = dataset + .scan() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), 5) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert!(result.num_rows() > 0); + } async fn test_index( params: VectorIndexParams, nlist: usize, From 1e5f0e15b1f3349805f3e3fa58466d7ce0576edd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 20 Mar 2026 23:35:25 +0800 Subject: [PATCH 2/8] refactor: internalize distributed vector segment build --- .../src/vector/distributed/index_merger.rs | 105 ++- rust/lance-index/src/vector/sq/storage.rs | 2 +- rust/lance/src/dataset.rs | 59 +- .../src/dataset/mem_wal/memtable/flush.rs | 13 +- rust/lance/src/index/vector.rs | 182 +++--- rust/lance/src/index/vector/ivf.rs | 601 +++++++++++++----- 6 files changed, 620 insertions(+), 342 deletions(-) diff --git a/rust/lance-index/src/vector/distributed/index_merger.rs b/rust/lance-index/src/vector/distributed/index_merger.rs index 0632a693342..2edcf3e0707 100755 --- a/rust/lance-index/src/vector/distributed/index_merger.rs +++ b/rust/lance-index/src/vector/distributed/index_merger.rs @@ -602,53 +602,26 @@ async fn read_shard_window_partitions( Ok(per_partition_batches) } -/// Merge all partial_* vector index auxiliary files under `index_dir/{uuid}/partial_*/auxiliary.idx` -/// into `index_dir/{uuid}/auxiliary.idx`. +/// Merge the selected partial-shard auxiliary files into `target_dir`. 
/// -/// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, IVF_HNSW_SQ storage types. -/// For PQ and SQ, this assumes all partial indices share the same quantizer/codebook -/// and distance type; it will reuse the first encountered metadata. +/// This is the storage merge kernel for vector staged segment build. Callers +/// choose which partial shards belong to one built segment and pass the corresponding +/// auxiliary files here. The merge writes one unified `auxiliary.idx` into +/// `target_dir`. +/// +/// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, and +/// IVF_HNSW_SQ storage types. For PQ and SQ, this assumes all selected partial +/// shards share the same quantizer/codebook and distance type; it reuses the +/// first encountered metadata. pub async fn merge_partial_vector_auxiliary_files( object_store: &lance_io::object_store::ObjectStore, - index_dir: &object_store::path::Path, + aux_paths: &[object_store::path::Path], + target_dir: &object_store::path::Path, ) -> Result<()> { - let mut aux_paths: Vec = Vec::new(); - let mut stream = object_store.list(Some(index_dir.clone())); - while let Some(item) = stream.next().await { - if let Ok(meta) = item - && let Some(fname) = meta.location.filename() - && fname == INDEX_AUXILIARY_FILE_NAME - { - // Check parent dir name starts with partial_ - let parts: Vec<_> = meta.location.parts().collect(); - if parts.len() >= 2 { - let pname = parts[parts.len() - 2].as_ref(); - if pname.starts_with("partial_") { - aux_paths.push(meta.location.clone()); - } - } - } - } - if aux_paths.is_empty() { - // If a unified auxiliary file already exists at the root, no merge is required. 
- let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); - if object_store.exists(&aux_out).await.unwrap_or(false) { - log::warn!( - "No partial_* auxiliary files found under index dir: {}, but unified auxiliary file already exists; skipping merge", - index_dir - ); - return Ok(()); - } - // For certain index types (e.g., FLAT/HNSW-only) the merge may be a no-op in distributed setups - // where shards were committed directly. In such cases, proceed without error to avoid blocking - // index manifest merge. PQ/SQ variants still require merging artifacts and will be handled by - // downstream open logic if missing. - log::warn!( - "No partial_* auxiliary files found under index dir: {}; proceeding without merge for index types that do not require auxiliary shards", - index_dir - ); - return Ok(()); + return Err(Error::index( + "No partial auxiliary files were selected for merge".to_string(), + )); } // Prepare IVF model and storage metadata aggregation @@ -661,7 +634,7 @@ pub async fn merge_partial_vector_auxiliary_files( let mut format_version: Option = None; // Prepare output path; we'll create writer once when we know schema - let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); + let aux_out = target_dir.child(INDEX_AUXILIARY_FILE_NAME); // We'll delay creating the V2 writer until we know the vector schema (dim and quantizer type) let mut v2w_opt: Option = None; @@ -682,7 +655,7 @@ pub async fn merge_partial_vector_auxiliary_files( let mut shard_infos: Vec = Vec::new(); // Iterate over each shard auxiliary file and merge its metadata and collect lengths - for aux in &aux_paths { + for aux in aux_paths { let fh = sched.open_file(aux, &CachedFileSize::unknown()).await?; let reader = V2Reader::try_open( fh, @@ -1417,9 +1390,13 @@ mod tests { .await .unwrap(); - merge_partial_vector_auxiliary_files(&object_store, &index_dir) - .await - .unwrap(); + merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + 
.await + .unwrap(); let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); assert!(object_store.exists(&aux_out).await.unwrap()); @@ -1515,7 +1492,12 @@ mod tests { .await .unwrap(); - let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await; + let res = merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await; match res { Err(Error::Index { message, .. }) => { assert!( @@ -1690,9 +1672,13 @@ mod tests { .unwrap(); // Merge PQ auxiliary files. - merge_partial_vector_auxiliary_files(&object_store, &index_dir) - .await - .unwrap(); + merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await + .unwrap(); // 3) Unified auxiliary file exists. let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); @@ -1818,7 +1804,12 @@ mod tests { .await .unwrap(); - let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await; + let res = merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await; match res { Err(Error::Index { message, .. }) => { assert!( @@ -1893,9 +1884,13 @@ mod tests { .unwrap(); // Merge must succeed and produce a unified auxiliary file. 
- merge_partial_vector_auxiliary_files(&object_store, &index_dir) - .await - .unwrap(); + merge_partial_vector_auxiliary_files( + &object_store, + &[aux_a.clone(), aux_b.clone()], + &index_dir, + ) + .await + .unwrap(); let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); assert!(object_store.exists(&aux_out).await.unwrap()); diff --git a/rust/lance-index/src/vector/sq/storage.rs b/rust/lance-index/src/vector/sq/storage.rs index 8f6bcea6c1a..8311c20acaa 100644 --- a/rust/lance-index/src/vector/sq/storage.rs +++ b/rust/lance-index/src/vector/sq/storage.rs @@ -36,7 +36,7 @@ use crate::{ pub const SQ_METADATA_KEY: &str = "lance:sq"; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ScalarQuantizationMetadata { pub dim: usize, pub num_bits: u16, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6adefc7bf3d..201d5b1f738 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1817,6 +1817,11 @@ impl Dataset { .collect() } + /// Iterate over manifest fragments without allocating [`FileFragment`] wrappers. + pub fn iter_fragments(&self) -> impl Iterator { + self.manifest.fragments.iter() + } + pub fn get_fragment(&self, fragment_id: usize) -> Option { let dataset = Arc::new(self.clone()); let fragment = self @@ -2658,6 +2663,7 @@ impl Dataset { self.merge_impl(stream, left_on, right_on).await } + /// Merge a staged distributed index into a single root artifact. pub async fn merge_index_metadata( &self, index_uuid: &str, @@ -2688,14 +2694,59 @@ impl Dataset { } // Precise vector index types: IVF_FLAT, IVF_PQ, IVF_SQ IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => { - // Merge distributed vector index partials and finalize root index via Lance IVF helper - crate::index::vector::ivf::finalize_distributed_merge( - self.object_store(), + let mut partial_indices = self + .object_store() + .read_dir(index_dir.clone()) + .await? 
+ .into_iter() + .filter(|name| name.starts_with("partial_")) + .map(|name| { + name.strip_prefix("partial_") + .ok_or_else(|| { + Error::index(format!( + "Distributed vector shard '{}' does not start with 'partial_'", + name + )) + }) + .and_then(|shard_uuid| { + uuid::Uuid::parse_str(shard_uuid).map_err(|err| { + Error::index(format!( + "Distributed vector shard '{}' does not end with a valid UUID: {}", + name, err + )) + }) + }) + .map(|shard_uuid| IndexMetadata { + uuid: shard_uuid, + name: String::new(), + fields: Vec::new(), + dataset_version: self.manifest.version, + fragment_bitmap: Some(RoaringBitmap::new()), + index_details: None, + index_version: index_type.version(), + created_at: None, + base_id: None, + files: Some(Vec::new()), + }) + }) + .collect::>>()?; + partial_indices.sort_by_key(|index| index.uuid); + let segment_plans = crate::index::vector::ivf::plan_staging_segments( &index_dir, + &partial_indices, Some(index_type), + None, ) .await?; - Ok(()) + let merged_plan = + crate::index::vector::ivf::collapse_segment_plans(&segment_plans)?; + crate::index::vector::ivf::build_staging_segment( + self.object_store(), + &self.indices_dir(), + &merged_plan, + ) + .await + .map(|_| ()) } _ => Err(Error::invalid_input_source(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidInput, diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index ba2344fafaf..c3b955fc265 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -247,11 +247,8 @@ impl MemTableFlusher { index_meta.fields = vec![field_idx]; index_meta.dataset_version = dataset.version().version; // Calculate fragment_bitmap from dataset fragments - let fragment_ids: roaring::RoaringBitmap = dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(); + let fragment_ids: roaring::RoaringBitmap = + dataset.fragment_bitmap.as_ref().clone(); 
index_meta.fragment_bitmap = Some(fragment_ids); // Commit the index to the dataset @@ -467,11 +464,7 @@ impl MemTableFlusher { let schema = dataset.schema(); let field_idx = schema.field(&fts_cfg.column).map(|f| f.id).unwrap_or(0); - let fragment_ids: roaring::RoaringBitmap = dataset - .get_fragments() - .iter() - .map(|f| f.id() as u32) - .collect(); + let fragment_ids: roaring::RoaringBitmap = dataset.fragment_bitmap.as_ref().clone(); let index_meta = IndexMetadata { uuid: index_uuid, diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 0d0ad64993a..c0c957a7fcf 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -35,7 +35,7 @@ use object_store::path::Path; use lance_arrow::FixedSizeListArrayExt; use lance_index::vector::pq::ProductQuantizer; use lance_index::vector::quantizer::QuantizationType; -use lance_index::vector::v3::shuffler::create_ivf_shuffler; +use lance_index::vector::v3::shuffler::{Shuffler, create_ivf_shuffler}; use lance_index::vector::v3::subindex::SubIndexType; use lance_index::vector::{ VectorIndex, @@ -329,56 +329,51 @@ impl IndexParams for VectorIndexParams { } } -/// Build a Distributed Vector Index for specific fragments -#[allow(clippy::too_many_arguments)] -#[instrument(level = "debug", skip(dataset))] -pub(crate) async fn build_distributed_vector_index( +/// Prepare the shared build inputs used by both direct local builds and +/// staged shard builds. +/// +/// These paths emit different file layouts, but they follow the same rules for +/// validating the vector column, deriving the effective index type, sizing IVF +/// partitions, and constructing the shuffler. 
+async fn prepare_vector_segment_build( dataset: &Dataset, column: &str, - _name: &str, - uuid: &str, params: &VectorIndexParams, - frag_reuse_index: Option>, - fragment_ids: &[u32], progress: Arc, -) -> Result<()> { + mode: &str, + require_precomputed_ivf: bool, +) -> Result<(DataType, IndexType, IvfBuildParams, Box)> { let stages = ¶ms.stages; if stages.is_empty() { - return Err(Error::index( - "Build Distributed Vector Index: must have at least 1 stage".to_string(), - )); - }; + return Err(Error::index(format!("{mode}: must have at least 1 stage"))); + } let StageParams::Ivf(ivf_params0) = &stages[0] else { return Err(Error::index(format!( - "Build Distributed Vector Index: invalid stages: {:?}", + "{mode}: invalid stages: {:?}", stages ))); }; - if ivf_params0.centroids.is_none() { - return Err(Error::index( - "Build Distributed Vector Index: missing precomputed IVF centroids; \ - please provide IvfBuildParams.centroids \ - for concurrent distributed create_index" - .to_string(), - )); + if require_precomputed_ivf && ivf_params0.centroids.is_none() { + return Err(Error::index(format!( + "{mode}: missing precomputed IVF centroids; please provide \ + IvfBuildParams.centroids for distributed segment build" + ))); } let (vector_type, element_type) = get_vector_type(dataset.schema(), column)?; if let DataType::List(_) = vector_type && params.metric_type != DistanceType::Cosine { - return Err(Error::index( - "Build Distributed Vector Index: multivector type supports only cosine distance" - .to_string(), - )); + return Err(Error::index(format!( + "{mode}: multivector type supports only cosine distance" + ))); } let num_rows = dataset.count_rows(None).await?; let index_type = params.index_type(); - let num_partitions = ivf_params0.num_partitions.unwrap_or_else(|| { recommended_num_partitions( num_rows, @@ -387,37 +382,58 @@ pub(crate) async fn build_distributed_vector_index( .unwrap_or(index_type.target_partition_size()), ) }); - let mut ivf_params = 
ivf_params0.clone(); ivf_params.num_partitions = Some(num_partitions); - let ivf_centroids = ivf_params - .centroids - .as_ref() - .expect("precomputed IVF centroids required for distributed indexing; checked above") - .as_ref() - .clone(); - let format_version = dataset_format_version(dataset); - let temp_dir = TempStdDir::default(); let temp_dir_path = Path::from_filesystem_path(&temp_dir)?; let shuffler = create_ivf_shuffler( temp_dir_path, num_partitions, format_version, - Some(progress.clone()), + Some(progress), ); + Ok((element_type, index_type, ivf_params, shuffler)) +} + +/// Build a Distributed Vector Index for specific fragments +#[allow(clippy::too_many_arguments)] +#[instrument(level = "debug", skip(dataset))] +pub(crate) async fn build_distributed_vector_index( + dataset: &Dataset, + column: &str, + _name: &str, + uuid: &str, + params: &VectorIndexParams, + frag_reuse_index: Option>, + fragment_ids: &[u32], + progress: Arc, +) -> Result { + let (element_type, index_type, ivf_params, shuffler) = prepare_vector_segment_build( + dataset, + column, + params, + progress.clone(), + "Build Distributed Vector Index", + true, + ) + .await?; + let stages = ¶ms.stages; + + let ivf_centroids = ivf_params + .centroids + .as_ref() + .expect("precomputed IVF centroids required for distributed indexing; checked above") + .as_ref() + .clone(); + let filtered_dataset = dataset.clone(); let out_base = dataset.indices_dir().child(uuid); - - let make_partial_index_dir = |out_base: &Path| -> Path { - let shard_uuid = Uuid::new_v4(); - out_base.child(format!("partial_{}", shard_uuid)) - }; - let new_index_dir = || make_partial_index_dir(&out_base); + let shard_uuid = Uuid::new_v4(); + let index_dir = out_base.child(format!("partial_{}", shard_uuid)); let fragment_filter = fragment_ids.to_vec(); @@ -458,13 +474,12 @@ pub(crate) async fn build_distributed_vector_index( match index_type { IndexType::IvfFlat => match element_type { DataType::Float16 | DataType::Float32 | 
DataType::Float64 => { - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -479,13 +494,12 @@ pub(crate) async fn build_distributed_vector_index( .await?; } DataType::UInt8 => { - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -523,14 +537,13 @@ pub(crate) async fn build_distributed_vector_index( )); } IndexFileVersion::V3 => { - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); let global_pq = make_global_pq(pq_params)?; IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -559,12 +572,10 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); - IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -586,12 +597,10 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); - IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -619,14 +628,13 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); let global_pq = make_global_pq(pq_params)?; IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -659,12 +667,10 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); - IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), 
params.metric_type, shuffler, Some(ivf_params), @@ -694,7 +700,7 @@ pub(crate) async fn build_distributed_vector_index( } }; - Ok(()) + Ok(shard_uuid) } /// Build a Vector Index @@ -708,53 +714,17 @@ pub(crate) async fn build_vector_index( frag_reuse_index: Option>, progress: Arc, ) -> Result<()> { + let (element_type, index_type, ivf_params, shuffler) = prepare_vector_segment_build( + dataset, + column, + params, + progress.clone(), + "Build Vector Index", + false, + ) + .await?; let stages = ¶ms.stages; - if stages.is_empty() { - return Err(Error::index( - "Build Vector Index: must have at least 1 stage".to_string(), - )); - }; - - let StageParams::Ivf(ivf_params) = &stages[0] else { - return Err(Error::index(format!( - "Build Vector Index: invalid stages: {:?}", - stages - ))); - }; - - let (vector_type, element_type) = get_vector_type(dataset.schema(), column)?; - if let DataType::List(_) = vector_type - && params.metric_type != DistanceType::Cosine - { - return Err(Error::index( - "Build Vector Index: multivector type supports only cosine distance".to_string(), - )); - } - - let num_rows = dataset.count_rows(None).await?; - let index_type = params.index_type(); - let num_partitions = ivf_params.num_partitions.unwrap_or_else(|| { - recommended_num_partitions( - num_rows, - ivf_params - .target_partition_size - .unwrap_or(index_type.target_partition_size()), - ) - }); - let mut ivf_params = ivf_params.clone(); - ivf_params.num_partitions = Some(num_partitions); - - let format_version = dataset_format_version(dataset); - - let temp_dir = TempStdDir::default(); - let temp_dir_path = Path::from_filesystem_path(&temp_dir)?; - let shuffler = create_ivf_shuffler( - temp_dir_path, - num_partitions, - format_version, - Some(progress.clone()), - ); match index_type { IndexType::IvfFlat => match element_type { DataType::Float16 | DataType::Float32 | DataType::Float64 => { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 
621773c60c0..d780bed3f61 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -56,14 +56,16 @@ use lance_index::vector::bq::builder::RabitQuantizer; use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer}; use lance_index::vector::hnsw::HnswMetadata; use lance_index::vector::hnsw::builder::HNSW_METADATA_KEY; -use lance_index::vector::ivf::storage::{IVF_METADATA_KEY, IvfModel}; +use lance_index::vector::ivf::storage::IVF_METADATA_KEY; +use lance_index::vector::ivf::storage::IvfModel; use lance_index::vector::kmeans::KMeansParams; use lance_index::vector::pq::storage::transpose; use lance_index::vector::quantizer::QuantizationType; use lance_index::vector::v3::shuffler::create_ivf_shuffler; use lance_index::vector::v3::subindex::{IvfSubIndex, SubIndexType}; use lance_index::{ - INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, Index, IndexMetadata, IndexType, + INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, Index, IndexMetadata, IndexSegment, + IndexSegmentPlan, IndexType, optimize::OptimizeOptions, vector::{ Query, VectorIndex, @@ -88,6 +90,7 @@ use lance_io::{ }; use lance_linalg::distance::{DistanceType, Dot, L2, MetricType}; use lance_linalg::{distance::Normalize, kernels::normalize_fsl_owned}; +use lance_table::format::IndexMetadata as TableIndexMetadata; use log::{info, warn}; use object_store::path::Path; use prost::Message; @@ -1857,29 +1860,438 @@ async fn write_ivf_hnsw_file( Ok(()) } -/// Finalize distributed merge for IVF-based vector indices. +/// Distributed vector segment build uses three storage-level concepts: /// -/// This helper merges partial auxiliary index files produced by distributed -/// jobs into a unified `auxiliary.idx` and then creates a root `index.idx` -/// using the v2 index format so that `open_vector_index_v2` can load it. +/// - A **staging root** is the shared UUID directory used during distributed +/// shard build. 
Each worker writes one `partial_/` directory under this +/// root by calling `execute_uncommitted()` with the same `index_uuid`. +/// - A **partial shard** is one such worker output. The caller provides the +/// `IndexMetadata` returned by `execute_uncommitted()` so the planner knows +/// shard UUIDs, fragment coverage, and approximate shard sizes. +/// - A **built segment** is a physical index segment that can be committed into +/// the manifest with `commit_existing_index_segments(...)`. /// -/// The caller must pass `index_dir` pointing at the index UUID directory -/// (e.g. `/indices/`). `requested_index_type` is only used as -/// a fallback when the unified auxiliary file does not contain index -/// metadata. -pub async fn finalize_distributed_merge( - object_store: &ObjectStore, - index_dir: &object_store::path::Path, +/// The staged segment-build path is therefore: +/// +/// 1. workers build `partial_*` shards under one staging root +/// 2. the caller groups those shards into one or more built segments +/// 3. each segment is built from its selected shards +/// 4. the resulting physical segments are committed as one logical index +/// +/// A single merge work item produced from one staging root. +/// +/// Each plan says: +/// - which staging root it belongs to +/// - which partial shards should be consumed together +/// - what the built segment metadata should look like +/// +/// The planner returns a `Vec` so callers can decide +/// whether to execute the work serially or fan it out externally. + +/// Plan how one staging root should be turned into built physical segments. +/// +/// This function does not touch storage. 
It only: +/// - validates that the caller-supplied shard contract is self-consistent +/// - enforces that shard fragment coverage is disjoint +/// - groups shards into built segments according to `target_segment_bytes` +/// +/// The grouping rule is intentionally simple: +/// - `target_segment_bytes = None`: keep the shard boundary, so each shard becomes one segment +/// - `target_segment_bytes = Some(limit)`: greedily pack consecutive shards until the next shard +/// would exceed `limit` +/// +/// Callers that want one built segment for the entire staging root should pass a +/// sufficiently large `target_segment_bytes`. +pub(crate) async fn plan_staging_segments( + index_dir: &Path, + partial_indices: &[TableIndexMetadata], requested_index_type: Option, + target_segment_bytes: Option, +) -> Result> { + if let Some(index_type) = requested_index_type + && !matches!( + index_type, + IndexType::IvfFlat + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq + | IndexType::Vector + ) + { + return Err(Error::invalid_input(format!( + "Unsupported distributed vector segment build type: {}", + index_type + ))); + } + + if let Some(0) = target_segment_bytes { + return Err(Error::invalid_input( + "target_segment_bytes must be greater than zero".to_string(), + )); + } + + if partial_indices.is_empty() { + return Err(Error::index(format!( + "No partial index metadata was provided for '{}'", + index_dir + ))); + } + + let mut sorted_partial_indices = partial_indices.to_vec(); + sorted_partial_indices.sort_by_key(|index| index.uuid); + let mut expected_shard_ids = HashSet::with_capacity(sorted_partial_indices.len()); + for partial_index in &sorted_partial_indices { + if !expected_shard_ids.insert(partial_index.uuid) { + return Err(Error::index(format!( + "Distributed vector partial shard '{}' was provided more than once", + partial_index.uuid + ))); + } + } + + let staging_index_uuid = index_dir + .filename() + 
.ok_or_else(|| Error::index(format!("Index directory '{}' has no filename", index_dir))) + .and_then(|name| { + Uuid::parse_str(name).map_err(|err| { + Error::index(format!( + "Index directory '{}' does not end with a valid UUID: {}", + index_dir, err + )) + }) + })?; + + let mut covered_fragments = RoaringBitmap::new(); + for partial_index in &sorted_partial_indices { + let fragment_bitmap = partial_index.fragment_bitmap.as_ref().ok_or_else(|| { + Error::index(format!( + "Partial index '{}' is missing fragment coverage", + partial_index.uuid + )) + })?; + if covered_fragments.intersection_len(fragment_bitmap) > 0 { + return Err(Error::index( + "Distributed vector shards have overlapping fragment coverage".to_string(), + )); + } + covered_fragments |= fragment_bitmap.clone(); + } + + if target_segment_bytes.is_none() { + return sorted_partial_indices + .into_iter() + .map(|partial_index| { + build_segment_plan( + staging_index_uuid, + vec![partial_index], + requested_index_type, + ) + }) + .collect(); + } + + let target_segment_bytes = target_segment_bytes.unwrap(); + let mut plans = Vec::new(); + let mut current_group = Vec::new(); + let mut current_bytes = 0_u64; + + for partial_index in sorted_partial_indices { + let partial_bytes = estimate_partial_index_bytes(&partial_index); + if !current_group.is_empty() + && current_bytes.saturating_add(partial_bytes) > target_segment_bytes + { + plans.push(build_segment_plan( + staging_index_uuid, + std::mem::take(&mut current_group), + requested_index_type, + )?); + current_bytes = 0; + } + current_bytes = current_bytes.saturating_add(partial_bytes); + current_group.push(partial_index); + } + + if !current_group.is_empty() { + plans.push(build_segment_plan( + staging_index_uuid, + current_group, + requested_index_type, + )?); + } + + Ok(plans) +} + +/// Build one planned segment into its output directory. +/// +/// Most plans write directly to `indices//`. 
If the target +/// directory is also the staging root, we first write into a temporary +/// directory and then swap the final files back into place. +/// +/// This is similar in shape to a compaction step: several temporary shard +/// outputs are consumed and replaced by a new built physical segment. The +/// difference is that this operates on index shard outputs instead of data +/// fragments. +pub(crate) async fn build_staging_segment( + object_store: &ObjectStore, + indices_dir: &Path, + segment_plan: &IndexSegmentPlan, +) -> Result { + let built_segment = segment_plan.segment().clone(); + let final_dir = indices_dir.child(built_segment.uuid().to_string()); + let staging_dir = indices_dir.child(segment_plan.staging_index_uuid().to_string()); + if final_dir == staging_dir { + let temp_dir = indices_dir.child(Uuid::new_v4().to_string()); + build_staging_segment_to_dir(object_store, indices_dir, &temp_dir, segment_plan, false) + .await?; + + // Re-materializing back into the staging root is not atomic: we delete + // and rewrite the root files one by one because object stores do not + // offer a directory rename primitive. We only clean up the source + // `partial_*` shards after these copies succeed, so a crash here can + // leave a partial root artifact but should not destroy the source data. + for file_name in [INDEX_FILE_NAME, INDEX_AUXILIARY_FILE_NAME] { + let target_file = final_dir.child(file_name); + // ObjectStore::copy is additive. Remove any previous root artifact first so + // re-materialization back into the staging root does not leave stale files + // behind. + if object_store.exists(&target_file).await? { + object_store.delete(&target_file).await?; + } + let source_file = temp_dir.child(file_name); + if object_store.exists(&source_file).await? 
{ + object_store.copy(&source_file, &target_file).await?; + } + } + + cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; + reset_final_segment_dir(object_store, &temp_dir).await?; + } else { + build_staging_segment_to_dir(object_store, indices_dir, &final_dir, segment_plan, true) + .await?; + } + + Ok(built_segment) +} + +/// Write one built segment into `final_dir`. +/// +/// For a single-shard plan this is just a file copy. For a multi-shard plan we +/// read the selected `partial_*` shards directly from the staging root and +/// write the merged auxiliary/index files into `final_dir`. +async fn build_staging_segment_to_dir( + object_store: &ObjectStore, + indices_dir: &Path, + final_dir: &Path, + segment_plan: &IndexSegmentPlan, + cleanup_source_shards: bool, ) -> Result<()> { - // Merge per-shard auxiliary files into a unified auxiliary.idx. + reset_final_segment_dir(object_store, final_dir).await?; + + let partial_indices = segment_plan.partial_indices(); + if partial_indices.len() == 1 { + let source_dir = indices_dir + .child(segment_plan.staging_index_uuid().to_string()) + .child(format!("partial_{}", partial_indices[0].uuid)); + copy_partial_segment_contents(object_store, &source_dir, final_dir).await?; + if cleanup_source_shards { + cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; + } + return Ok(()); + } + + let staging_root = indices_dir.child(segment_plan.staging_index_uuid().to_string()); + let aux_paths = partial_indices + .iter() + .map(|partial_index| { + staging_root + .child(format!("partial_{}", partial_index.uuid)) + .child(INDEX_AUXILIARY_FILE_NAME) + }) + .collect::>(); + let partial_index_paths = partial_indices + .iter() + .map(|partial_index| { + staging_root + .child(format!("partial_{}", partial_index.uuid)) + .child(INDEX_FILE_NAME) + }) + .collect::>(); + lance_index::vector::distributed::index_merger::merge_partial_vector_auxiliary_files( object_store, - index_dir, + 
&aux_paths, + final_dir, + ) + .await?; + write_root_vector_index_from_auxiliary( + object_store, + final_dir, + segment_plan.requested_index_type(), + &partial_index_paths, ) .await?; + if cleanup_source_shards { + cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; + } + + Ok(()) +} + +/// Collapse one group of staging shards into a single built-segment plan. +fn build_segment_plan( + staging_index_uuid: Uuid, + group: Vec, + requested_index_type: Option, +) -> Result { + debug_assert!(!group.is_empty()); + let first = &group[0]; + let mut fragment_bitmap = RoaringBitmap::new(); + let mut estimated_bytes = 0_u64; + let mut partial_indices = Vec::with_capacity(group.len()); + + for partial_index in &group { + let partial_fragment_bitmap = partial_index.fragment_bitmap.as_ref().ok_or_else(|| { + Error::index(format!( + "Partial index '{}' is missing fragment coverage", + partial_index.uuid + )) + })?; + fragment_bitmap |= partial_fragment_bitmap.clone(); + estimated_bytes = + estimated_bytes.saturating_add(estimate_partial_index_bytes(partial_index)); + partial_indices.push(partial_index.clone()); + } - // Open the unified auxiliary file. + let final_uuid = if group.len() == 1 { + first.uuid + } else { + Uuid::new_v4() + }; + let index_type = requested_index_type.unwrap_or(IndexType::Vector); + let segment = IndexSegment::new( + final_uuid, + fragment_bitmap, + Arc::new(crate::index::vector_index_details()), + index_type.version(), + ); + + Ok(IndexSegmentPlan::new( + staging_index_uuid, + segment, + partial_indices, + estimated_bytes, + requested_index_type, + )) +} + +/// Collapse an entire staging root into one built-segment plan. +/// +/// Some callers want one final output for the entire staging root instead of +/// one output per planned group. This helper reduces an existing set of plans +/// into a single plan covering the same shard set. 
+pub(crate) fn collapse_segment_plans( + segment_plans: &[IndexSegmentPlan], +) -> Result { + let Some(first_plan) = segment_plans.first() else { + return Err(Error::index( + "Distributed vector segment build plan contains no segment plans".to_string(), + )); + }; + + let mut fragment_bitmap = RoaringBitmap::new(); + let mut partial_indices = Vec::new(); + let mut estimated_bytes = 0_u64; + + for plan in segment_plans { + fragment_bitmap |= plan.segment().fragment_bitmap().clone(); + partial_indices.extend_from_slice(plan.partial_indices()); + estimated_bytes = estimated_bytes.saturating_add(plan.estimated_bytes()); + } + + let staging_index_uuid = first_plan.staging_index_uuid(); + let segment = IndexSegment::new( + staging_index_uuid, + fragment_bitmap, + first_plan.segment().index_details().clone(), + first_plan.segment().index_version(), + ); + + Ok(IndexSegmentPlan::new( + staging_index_uuid, + segment, + partial_indices, + estimated_bytes, + first_plan.requested_index_type(), + )) +} + +/// Remove the source `partial_*` directories consumed by one segment plan. +async fn cleanup_consumed_partial_shards( + object_store: &ObjectStore, + indices_dir: &Path, + segment_plan: &IndexSegmentPlan, +) -> Result<()> { + for partial_index in segment_plan.partial_indices() { + let source_dir = indices_dir + .child(segment_plan.staging_index_uuid().to_string()) + .child(format!("partial_{}", partial_index.uuid)); + reset_final_segment_dir(object_store, &source_dir).await?; + } + Ok(()) +} + +fn estimate_partial_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { + index_metadata + .files + .as_ref() + .map(|files| files.iter().map(|file| file.size_bytes).sum()) + .unwrap_or(0) +} + +/// Copy all files that belong to one partial shard into a new directory. 
+async fn copy_partial_segment_contents( + object_store: &ObjectStore, + source_dir: &Path, + target_dir: &Path, +) -> Result<()> { + let mut files = object_store.list(Some(source_dir.clone())); + while let Some(item) = files.next().await { + let meta = item?; + let Some(relative_parts) = meta.location.prefix_match(source_dir) else { + continue; + }; + let relative_parts = relative_parts.collect::<Vec<_>>(); + if relative_parts.is_empty() { + continue; + } + let mut final_path = target_dir.clone(); + for part in relative_parts { + final_path = final_path.child(part.as_ref()); + } + object_store.copy(&meta.location, &final_path).await?; + } + Ok(()) +} + +/// Best-effort reset of one target directory before rewriting it. +async fn reset_final_segment_dir(object_store: &ObjectStore, final_dir: &Path) -> Result<()> { + match object_store.remove_dir_all(final_dir.clone()).await { + Ok(()) => {} + Err(Error::NotFound { .. }) => {} + Err(err) => return Err(err), + } + Ok(()) +} + +async fn write_root_vector_index_from_auxiliary( + object_store: &ObjectStore, + index_dir: &Path, + requested_index_type: Option<IndexType>, + centroid_source_index_paths: &[Path], +) -> Result<()> { let aux_path = index_dir.child(INDEX_AUXILIARY_FILE_NAME); let scheduler = ScanScheduler::new( Arc::new(object_store.clone()), @@ -1912,36 +2324,20 @@ pub async fn finalize_distributed_merge( let mut pb_ivf: lance_index::pb::Ivf = Message::decode(raw_ivf_bytes.clone())?; // If the unified IVF metadata does not contain centroids, try to source them - // from any partial_* index.idx under this index directory. + // from one of the shard index files that fed this merge. 
if pb_ivf.centroids_tensor.is_none() { - let mut stream = object_store.list(Some(index_dir.clone())); - let mut partial_index_path = None; - - while let Some(item) = stream.next().await { - let meta = item?; - if let Some(fname) = meta.location.filename() - && fname == INDEX_FILE_NAME - { - let parts: Vec<_> = meta.location.parts().collect(); - if parts.len() >= 2 { - let parent = parts[parts.len() - 2].as_ref(); - if parent.starts_with("partial_") { - partial_index_path = Some(meta.location.clone()); - break; - } - } + for partial_index_path in centroid_source_index_paths { + if !object_store.exists(partial_index_path).await? { + continue; } - } - - if let Some(partial_index_path) = partial_index_path { let fh = scheduler - .open_file(&partial_index_path, &CachedFileSize::unknown()) + .open_file(partial_index_path, &CachedFileSize::unknown()) .await?; let partial_reader = V2Reader::try_open( fh, None, Arc::default(), - &lance_core::cache::LanceCache::no_cache(), + &LanceCache::no_cache(), V2ReaderOptions::default(), ) .await?; @@ -1953,6 +2349,7 @@ pub async fn finalize_distributed_merge( let partial_pb_ivf: lance_index::pb::Ivf = Message::decode(partial_ivf_bytes)?; if partial_pb_ivf.centroids_tensor.is_some() { pb_ivf.centroids_tensor = partial_pb_ivf.centroids_tensor; + break; } } } @@ -2033,69 +2430,6 @@ pub async fn finalize_distributed_merge( let empty_batch = RecordBatch::new_empty(arrow_schema); v2_writer.write_batch(&empty_batch).await?; v2_writer.finish().await?; - - if let Err(err) = cleanup_partial_vector_dirs(object_store, index_dir).await { - warn!( - "Failed to cleanup partial_* vector index directories under '{}': {}", - index_dir.as_ref(), - err - ); - } - - Ok(()) -} - -/// Cleanup for distributed partial vector index directories after -/// a distributed merge. -/// -/// This helper scans `index_dir` for direct child directories whose names -/// start with `partial_` (e.g. 
`/partial_0`, `/partial_1`) -/// and attempts to recursively delete them via [`ObjectStore::remove_dir_all`]. -/// -/// Listing and deletion failures are logged with [`warn!`] and ignored so that -/// index finalization is never blocked by cleanup. The function always returns -/// `Ok(())`. -async fn cleanup_partial_vector_dirs( - object_store: &ObjectStore, - index_dir: &object_store::path::Path, -) -> Result<()> { - let mut partial_dirs: HashSet = HashSet::new(); - let mut list_stream = object_store.list(Some(index_dir.clone())); - - while let Some(item) = list_stream.next().await { - match item { - Ok(meta) => { - if let Some(relative_parts) = meta.location.prefix_match(index_dir) { - let rel_parts: Vec<_> = relative_parts.collect(); - // Expect paths like: /partial_*/ - if rel_parts.len() >= 2 { - let parent_name = rel_parts[0].as_ref(); - if parent_name.starts_with("partial_") { - partial_dirs.insert(index_dir.child(parent_name)); - } - } - } - } - Err(e) => { - warn!( - "Failed to list index directory '{}' while collecting partial_* dirs: {}", - index_dir.as_ref(), - e - ); - } - } - } - - for dir in partial_dirs { - if let Err(e) = object_store.remove_dir_all(dir.clone()).await { - warn!( - "Failed to remove partial_* directory '{}' after distributed merge: {}", - dir.as_ref(), - e - ); - } - } - Ok(()) } @@ -3644,50 +3978,6 @@ mod tests { assert!(correct_times >= 9, "correct: {}", correct_times); } - #[tokio::test] - async fn test_cleanup_removes_only_partial_dirs() { - let object_store = ObjectStore::memory(); - let index_dir = Path::from("index/uuid_test_cleanup"); - - // partial_* directories that should be removed - let partial0_file = index_dir.child("partial_0").child("file.bin"); - let partial_abc_file = index_dir.child("partial_abc").child("file.bin"); - - // Non-partial paths that must be preserved - let partialx_file = index_dir.child("partialX").child("file.bin"); - let shard_file = index_dir.child("shard_0").child("file.bin"); - let 
keep_root_file = index_dir.child("keep_root.txt"); - - object_store.put(&partial0_file, b"partial0").await.unwrap(); - object_store - .put(&partial_abc_file, b"partial_abc") - .await - .unwrap(); - object_store.put(&partialx_file, b"partialx").await.unwrap(); - object_store.put(&shard_file, b"shard").await.unwrap(); - object_store.put(&keep_root_file, b"root").await.unwrap(); - - // Sanity: all files exist before cleanup - assert!(object_store.exists(&partial0_file).await.unwrap()); - assert!(object_store.exists(&partial_abc_file).await.unwrap()); - assert!(object_store.exists(&partialx_file).await.unwrap()); - assert!(object_store.exists(&shard_file).await.unwrap()); - assert!(object_store.exists(&keep_root_file).await.unwrap()); - - cleanup_partial_vector_dirs(&object_store, &index_dir) - .await - .unwrap(); - - // partial_* directories should be removed - assert!(!object_store.exists(&partial0_file).await.unwrap()); - assert!(!object_store.exists(&partial_abc_file).await.unwrap()); - - // Non-partial directories and root files must be preserved - assert!(object_store.exists(&partialx_file).await.unwrap()); - assert!(object_store.exists(&shard_file).await.unwrap()); - assert!(object_store.exists(&keep_root_file).await.unwrap()); - } - #[tokio::test(flavor = "multi_thread")] async fn test_build_ivf_model_progress_callback() { use lance_index::progress::IndexBuildProgress; @@ -3758,27 +4048,6 @@ mod tests { } } - #[tokio::test] - async fn test_cleanup_idempotent() { - let object_store = ObjectStore::memory(); - let index_dir = Path::from("index/uuid_test_cleanup_idempotent"); - - let partial_file = index_dir.child("partial_0").child("file.bin"); - object_store.put(&partial_file, b"partial").await.unwrap(); - - assert!(object_store.exists(&partial_file).await.unwrap()); - - cleanup_partial_vector_dirs(&object_store, &index_dir) - .await - .unwrap(); - assert!(!object_store.exists(&partial_file).await.unwrap()); - - // Second call should succeed even when there are 
no partial_* directories left. - cleanup_partial_vector_dirs(&object_store, &index_dir) - .await - .unwrap(); - } - #[tokio::test] async fn test_prewarm_ivf_legacy() { use lance_io::assert_io_eq; From 691cecb9af7c2e712d8a7028b0b092c35f3f999c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 20 Mar 2026 23:35:44 +0800 Subject: [PATCH 3/8] feat: add public vector segment builder API --- rust/lance-index/src/lib.rs | 2 +- rust/lance-index/src/traits.rs | 23 ++++++++ rust/lance-index/src/types.rs | 57 ++++++++++++++++++ rust/lance/src/index.rs | 8 +++ rust/lance/src/index/create.rs | 104 +++++++++++++++++++++++++++++++-- 5 files changed, 188 insertions(+), 6 deletions(-) diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 5f32a73675c..bb00a716173 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -33,7 +33,7 @@ pub mod types; pub mod vector; pub use crate::traits::*; -pub use crate::types::IndexSegment; +pub use crate::types::{IndexSegment, IndexSegmentPlan}; pub const INDEX_FILE_NAME: &str = "index.idx"; /// The name of the auxiliary index file. diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index 5ad13861ddc..1c5923e4050 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -126,6 +126,9 @@ pub trait IndexDescription: Send + Sync { #[async_trait] pub trait DatasetIndexExt { type IndexBuilder<'a> + where + Self: 'a; + type IndexSegmentBuilder<'a> where Self: 'a; @@ -145,6 +148,21 @@ pub trait DatasetIndexExt { params: &'a dyn IndexParams, ) -> Self::IndexBuilder<'a>; + /// Create a builder for building index segments from partial index outputs. + /// + /// The staging UUID identifies a directory containing previously-built shard + /// outputs. The caller supplies the partial index metadata returned by + /// `execute_uncommitted()` so the builder can plan segment grouping without + /// rediscovering shard coverage. 
+ /// + /// This is the canonical entry point for distributed vector segment build. + /// After building the physical segments, publish them as a + /// logical index with [`Self::commit_existing_index_segments`]. + fn create_index_segment_builder<'a>( + &'a self, + staging_index_uuid: String, + ) -> Self::IndexSegmentBuilder<'a>; + /// Create indices on columns. /// /// Upon finish, a new dataset version is generated. @@ -275,6 +293,11 @@ pub trait DatasetIndexExt { async fn index_statistics(&self, index_name: &str) -> Result<String>; /// Commit one or more existing physical index segments as a logical index. + /// + /// This publishes already-built physical segments. It does not build + /// or merge index data; callers should first build segments with + /// [`Self::create_index_segment_builder`] or another index-specific build + /// path and then pass the resulting segments here. async fn commit_existing_index_segments( &mut self, index_name: &str, diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs index 6a991be9932..6520878991b 100644 --- a/rust/lance-index/src/types.rs +++ b/rust/lance-index/src/types.rs @@ -3,6 +3,8 @@ use std::sync::Arc; +use crate::IndexType; +use lance_table::format::IndexMetadata; use roaring::RoaringBitmap; use uuid::Uuid; @@ -73,3 +75,58 @@ impl IndexSegment { ) } } + +/// A plan for building one physical segment from one or more vector index +/// partial indices. +#[derive(Debug, Clone, PartialEq)] +pub struct IndexSegmentPlan { + staging_index_uuid: Uuid, + segment: IndexSegment, + partial_indices: Vec<IndexMetadata>, + estimated_bytes: u64, + requested_index_type: Option<IndexType>, +} + +impl IndexSegmentPlan { + /// Create a plan for one built segment. 
+ pub fn new( + staging_index_uuid: Uuid, + segment: IndexSegment, + partial_indices: Vec<IndexMetadata>, + estimated_bytes: u64, + requested_index_type: Option<IndexType>, + ) -> Self { + Self { + staging_index_uuid, + segment, + partial_indices, + estimated_bytes, + requested_index_type, + } + } + + /// Return the staging index UUID that owns the partial shard outputs. + pub fn staging_index_uuid(&self) -> Uuid { + self.staging_index_uuid + } + + /// Return the segment metadata that should be committed after this plan is built. + pub fn segment(&self) -> &IndexSegment { + &self.segment + } + + /// Return the uncommitted partial index metadata that should be combined into the segment. + pub fn partial_indices(&self) -> &[IndexMetadata] { + &self.partial_indices + } + + /// Return the estimated number of bytes covered by this plan. + pub fn estimated_bytes(&self) -> u64 { + self.estimated_bytes + } + + /// Return the requested logical index type, if one was supplied to the planner. + pub fn requested_index_type(&self) -> Option<IndexType> { + self.requested_index_type + } +} diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index fdc91760a2e..1063fdd05af 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -582,6 +582,7 @@ impl IndexDescription for IndexDescriptionImpl { #[async_trait] impl DatasetIndexExt for Dataset { type IndexBuilder<'a> = CreateIndexBuilder<'a>; + type IndexSegmentBuilder<'a> = create::IndexSegmentBuilder<'a>; /// Create a builder for creating an index on columns. 
/// @@ -627,6 +628,13 @@ impl DatasetIndexExt for Dataset { CreateIndexBuilder::new(self, columns, index_type, params) } + fn create_index_segment_builder<'a>( + &'a self, + staging_index_uuid: String, + ) -> create::IndexSegmentBuilder<'a> { + create::IndexSegmentBuilder::new(self, staging_index_uuid) + } + #[instrument(skip_all)] async fn create_index( &mut self, diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 7edfbf4c6b8..93e6a65c903 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -17,10 +17,10 @@ use crate::{ vector_index_details, }, }; -use futures::future::BoxFuture; +use futures::future::{BoxFuture, try_join_all}; use lance_core::datatypes::format_field_path; use lance_index::progress::{IndexBuildProgress, NoopIndexBuildProgress}; -use lance_index::{IndexParams, IndexType, scalar::CreatedIndex}; +use lance_index::{IndexParams, IndexSegment, IndexSegmentPlan, IndexType, scalar::CreatedIndex}; use lance_index::{ metrics::NoOpMetricsCollector, scalar::{LANCE_SCALAR_INDEX, ScalarIndexParams, inverted::tokenizer::InvertedIndexParams}, @@ -196,6 +196,7 @@ impl<'a> CreateIndexBuilder<'a> { .map_err(|e| Error::index(format!("Invalid UUID string provided: {}", e)))?, None => Uuid::new_v4(), }; + let mut output_index_uuid = index_id; let created_index = match (self.index_type, self.params.index_name()) { ( IndexType::Bitmap @@ -323,7 +324,7 @@ impl<'a> CreateIndexBuilder<'a> { if let Some(fragments) = &self.fragments { // For distributed indexing, build only on specified fragments // This creates temporary index metadata without committing - Box::pin(build_distributed_vector_index( + let shard_uuid = Box::pin(build_distributed_vector_index( self.dataset, column, &index_name, @@ -334,6 +335,7 @@ impl<'a> CreateIndexBuilder<'a> { self.progress.clone(), )) .await?; + output_index_uuid = shard_uuid; } else { // Standard full dataset indexing Box::pin(build_vector_index( @@ -359,7 +361,14 @@ impl<'a> 
CreateIndexBuilder<'a> { .await?; } // Capture file sizes after vector index creation - let index_dir = self.dataset.indices_dir().child(index_id.to_string()); + let index_dir = if self.fragments.is_some() { + self.dataset + .indices_dir() + .child(index_id.to_string()) + .child(format!("partial_{}", output_index_uuid)) + } else { + self.dataset.indices_dir().child(index_id.to_string()) + }; let files = list_index_files_with_sizes(&self.dataset.object_store, &index_dir).await?; CreatedIndex { @@ -420,7 +429,7 @@ impl<'a> CreateIndexBuilder<'a> { }; Ok(IndexMetadata { - uuid: index_id, + uuid: output_index_uuid, name: index_name, fields: vec![field.id], dataset_version: self.dataset.manifest.version, @@ -494,6 +503,91 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> { } } +/// Build physical index segments from previously-written partial index outputs. +/// +/// Use [`DatasetIndexExt::create_index_segment_builder`] to open a staging root +/// and then either: +/// +/// - call [`Self::plan`] and orchestrate individual segment builds externally, or +/// - call [`Self::build_all`] to build all segments on the current node. +/// +/// This builder only builds physical segments. Publishing those segments as +/// a logical index still requires [`DatasetIndexExt::commit_existing_index_segments`]. +/// Together these two APIs form the canonical distributed vector segment build workflow. +#[derive(Clone)] +pub struct IndexSegmentBuilder<'a> { + dataset: &'a Dataset, + staging_index_uuid: String, + partial_indices: Vec<IndexMetadata>, + target_segment_bytes: Option<u64>, +} + +impl<'a> IndexSegmentBuilder<'a> { + pub(crate) fn new(dataset: &'a Dataset, staging_index_uuid: String) -> Self { + Self { + dataset, + staging_index_uuid, + partial_indices: Vec::new(), + target_segment_bytes: None, + } + } + + /// Provide the partial index metadata returned by `execute_uncommitted()` + /// for this staging root. 
+ pub fn with_partial_indices(mut self, partial_indices: Vec<IndexMetadata>) -> Self { + self.partial_indices = partial_indices; + self + } + + /// Set the target size, in bytes, for merged built segments. + /// + /// When set, shard outputs will be grouped into larger built segments up to + /// approximately this size. When unset, each shard output becomes one built + /// segment. + pub fn with_target_segment_bytes(mut self, bytes: u64) -> Self { + self.target_segment_bytes = Some(bytes); + self + } + + /// Plan how partial indices should be grouped into built segments. + pub async fn plan(&self) -> Result<Vec<IndexSegmentPlan>> { + if self.partial_indices.is_empty() { + return Err(Error::invalid_input( + "IndexSegmentBuilder requires at least one partial index; \ + call with_partial_indices(...) with execute_uncommitted() outputs" + .to_string(), + )); + } + + crate::index::vector::ivf::plan_staging_segments( + &self + .dataset + .indices_dir() + .child(self.staging_index_uuid.as_str()), + &self.partial_indices, + None, + self.target_segment_bytes, + ) + .await + } + + /// Build one segment from a previously-generated plan. + pub async fn build(&self, plan: &IndexSegmentPlan) -> Result<IndexSegment> { + crate::index::vector::ivf::build_staging_segment( + self.dataset.object_store(), + &self.dataset.indices_dir(), + plan, + ) + .await + } + + /// Plan and build all segments from this staging root. 
+ pub async fn build_all(&self) -> Result<Vec<IndexSegment>> { + let plans = self.plan().await?; + try_join_all(plans.iter().map(|plan| self.build(plan))).await + } +} + #[cfg(test)] mod tests { use super::*; From a07ef614415e967aad5bfb990d3bfa235d1ad113 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 20 Mar 2026 23:35:57 +0800 Subject: [PATCH 4/8] feat: add Python vector segment builder API --- python/python/lance/__init__.py | 2 + python/python/lance/dataset.py | 22 +++- python/python/lance/indices/__init__.py | 11 +- python/python/lance/lance/__init__.pyi | 17 +++ .../python/lance/lance/indices/__init__.pyi | 16 +++ python/src/dataset.rs | 113 +++++++++++++++++- python/src/indices.rs | 95 ++++++++++++++- python/src/lib.rs | 5 +- 8 files changed, 275 insertions(+), 6 deletions(-) diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index 0b905457a50..95bacfc3091 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -29,6 +29,7 @@ from .lance import ( DatasetBasePath, FFILanceTableProvider, + IndexSegmentBuilder, ScanStatistics, bytes_read_counter, iops_counter, @@ -64,6 +65,7 @@ "FragmentMetadata", "Index", "IndexFile", + "IndexSegmentBuilder", "LanceDataset", "LanceFragment", "LanceOperation", diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index c73a7b5a724..b4c4be4bb03 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -47,7 +47,7 @@ from .dependencies import numpy as np from .dependencies import pandas as pd from .fragment import DataFile, FragmentMetadata, LanceFragment -from .indices import IndexConfig, SupportedDistributedIndices +from .indices import IndexConfig, IndexSegment, SupportedDistributedIndices from .lance import ( CleanupStats, Compaction, @@ -3391,6 +3391,26 @@ def merge_index_metadata( self._ds.merge_index_metadata(index_uuid, t, batch_readhead) return None + def create_index_segment_builder(self, staging_index_uuid: str): + """ + Create a 
builder for turning partial index outputs into committed segments. + + The caller should pass the shared index UUID used during + :meth:`create_index` with ``fragment_ids=...`` and ``index_uuid=...``. + Then provide the returned partial index metadata through + :meth:`IndexSegmentBuilder.with_partial_indices`. + """ + return self._ds.create_index_segment_builder(staging_index_uuid) + + def commit_existing_index_segments( + self, index_name: str, column: str, segments: List[IndexSegment] + ) -> LanceDataset: + """ + Commit built index segments as one logical index. + """ + self._ds.commit_existing_index_segments(index_name, column, segments) + return self + def session(self) -> Session: """ Return the dataset session, which holds the dataset's state. diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index ac586876da0..1ed73c29375 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -5,9 +5,18 @@ from lance.indices.builder import IndexConfig, IndicesBuilder from lance.indices.ivf import IvfModel +from lance.lance.indices import IndexSegment, IndexSegmentPlan from lance.indices.pq import PqModel -__all__ = ["IndicesBuilder", "IndexConfig", "PqModel", "IvfModel", "IndexFileVersion"] +__all__ = [ + "IndicesBuilder", + "IndexConfig", + "PqModel", + "IvfModel", + "IndexFileVersion", + "IndexSegment", + "IndexSegmentPlan", +] class IndexFileVersion(str, Enum): diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 41db5678b0b..e2f70a853a1 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -61,6 +61,8 @@ from .fragment import ( RowIdMeta as RowIdMeta, ) from .indices import IndexDescription as IndexDescription +from .indices import IndexSegment as IndexSegment +from .indices import IndexSegmentPlan as IndexSegmentPlan from .lance import PySearchFilter from .optimize import ( Compaction as 
Compaction, @@ -185,6 +187,15 @@ class LanceColumnStatistics: class _Session: def size_bytes(self) -> int: ... +class IndexSegmentBuilder: + @property + def staging_index_uuid(self) -> str: ... + def with_partial_indices(self, partial_indices: List[Index]) -> Self: ... + def with_target_segment_bytes(self, bytes: int) -> Self: ... + def plan(self) -> List[IndexSegmentPlan]: ... + def build(self, plan: IndexSegmentPlan) -> IndexSegment: ... + def build_all(self) -> List[IndexSegment]: ... + class LanceBlobFile: def close(self): ... def is_closed(self) -> bool: ... @@ -360,6 +371,12 @@ class _Dataset: def merge_index_metadata( self, index_uuid: str, index_type: str, batch_readhead: Optional[int] = None ): ... + def create_index_segment_builder( + self, staging_index_uuid: str + ) -> IndexSegmentBuilder: ... + def commit_existing_index_segments( + self, index_name: str, column: str, segments: List[IndexSegment] + ) -> None: ... def count_fragments(self) -> int: ... def num_small_files(self, max_rows_per_group: int) -> int: ... def get_fragments(self) -> List[_Fragment]: ... diff --git a/python/python/lance/lance/indices/__init__.pyi b/python/python/lance/lance/indices/__init__.pyi index 38e34218399..9ca2c2de0bb 100644 --- a/python/python/lance/lance/indices/__init__.pyi +++ b/python/python/lance/lance/indices/__init__.pyi @@ -16,11 +16,27 @@ from datetime import datetime from typing import Optional import pyarrow as pa +from ...dataset import Index class IndexConfig: index_type: str config: str +class IndexSegment: + uuid: str + fragment_ids: set[int] + index_version: int + + def __repr__(self) -> str: ... + +class IndexSegmentPlan: + staging_index_uuid: str + segment: IndexSegment + partial_indices: list[Index] + estimated_bytes: int + + def __repr__(self) -> str: ... 
+ def train_ivf_model( dataset, column: str, diff --git a/python/src/dataset.rs b/python/src/dataset.rs index d1dcfe5c515..3475ee90af5 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -87,7 +87,7 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::error::PythonErrorExt; use crate::file::object_store_from_uri_or_path; use crate::fragment::FileFragment; -use crate::indices::{PyIndexConfig, PyIndexDescription}; +use crate::indices::{PyIndexConfig, PyIndexDescription, PyIndexSegment, PyIndexSegmentPlan}; use crate::namespace::extract_namespace_arc; use crate::rt; use crate::scanner::ScanStatistics; @@ -323,6 +323,88 @@ impl MergeInsertBuilder { } } +#[pyclass(name = "IndexSegmentBuilder", module = "lance", subclass)] +#[derive(Clone)] +pub struct PyIndexSegmentBuilder { + dataset: Arc, + staging_index_uuid: String, + partial_indices: Vec, + target_segment_bytes: Option, +} + +#[pymethods] +impl PyIndexSegmentBuilder { + #[getter] + fn staging_index_uuid(&self) -> String { + self.staging_index_uuid.clone() + } + + fn with_partial_indices<'a>( + mut slf: PyRefMut<'a, Self>, + partial_indices: &Bound<'_, PyAny>, + ) -> PyResult> { + let mut indices = Vec::new(); + for item in partial_indices.try_iter()? 
{ + indices.push(item?.extract::>()?.0); + } + slf.partial_indices = indices; + Ok(slf) + } + + fn with_target_segment_bytes<'a>( + mut slf: PyRefMut<'a, Self>, + bytes: u64, + ) -> PyResult> { + slf.target_segment_bytes = Some(bytes); + Ok(slf) + } + + fn plan(&self, py: Python<'_>) -> PyResult>> { + let mut builder = self + .dataset + .create_index_segment_builder(self.staging_index_uuid.clone()) + .with_partial_indices(self.partial_indices.clone()); + if let Some(target_segment_bytes) = self.target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + let plans = rt().block_on(Some(py), builder.plan())?.infer_error()?; + plans.into_iter() + .map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan))) + .collect() + } + + fn build( + &self, + py: Python<'_>, + plan: &Bound<'_, PyAny>, + ) -> PyResult> { + let plan = plan.extract::>()?; + let builder = self + .dataset + .create_index_segment_builder(self.staging_index_uuid.clone()) + .with_partial_indices(self.partial_indices.clone()); + let segment = rt() + .block_on(Some(py), builder.build(&plan.inner))? 
+ .infer_error()?; + Py::new(py, PyIndexSegment::from_inner(segment)) + } + + fn build_all(&self, py: Python<'_>) -> PyResult>> { + let mut builder = self + .dataset + .create_index_segment_builder(self.staging_index_uuid.clone()) + .with_partial_indices(self.partial_indices.clone()); + if let Some(target_segment_bytes) = self.target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + let segments = rt().block_on(Some(py), builder.build_all())?.infer_error()?; + segments + .into_iter() + .map(|segment| Py::new(py, PyIndexSegment::from_inner(segment))) + .collect() + } +} + impl MergeInsertBuilder { fn build_stats<'a>(stats: &MergeStats, py: Python<'a>) -> PyResult> { let dict = PyDict::new(py); @@ -2019,6 +2101,35 @@ impl Dataset { Ok(PyLance(index_metadata)) } + fn create_index_segment_builder(&self, staging_index_uuid: String) -> PyResult { + Ok(PyIndexSegmentBuilder { + dataset: self.ds.clone(), + staging_index_uuid, + partial_indices: Vec::new(), + target_segment_bytes: None, + }) + } + + fn commit_existing_index_segments( + &mut self, + index_name: &str, + column: &str, + segments: Vec>, + ) -> PyResult<()> { + let mut new_self = self.ds.as_ref().clone(); + let segments = segments + .into_iter() + .map(|segment| segment.inner.clone()) + .collect(); + rt().block_on( + None, + new_self.commit_existing_index_segments(index_name, column, segments), + )? + .infer_error()?; + self.ds = Arc::new(new_self); + Ok(()) + } + fn drop_index(&mut self, name: &str) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); rt().block_on(None, new_self.drop_index(name))? 
diff --git a/python/src/indices.rs b/python/src/indices.rs index 805c84ec6b9..8980871ee51 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -32,12 +32,14 @@ use pyo3::{ use lance::index::DatasetIndexInternalExt; use crate::fragment::FileFragment; -use crate::utils::PyJson; +use crate::utils::{PyJson, PyLance}; use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index; -use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment, IndexType}; +use lance_index::{ + DatasetIndexExt, IndexDescription, IndexSegment, IndexSegmentPlan, IndexType, +}; use uuid::Uuid; #[pyclass(name = "IndexConfig", module = "lance.indices", get_all)] @@ -58,6 +60,93 @@ impl PyIndexConfig { } } +#[pyclass(name = "IndexSegment", module = "lance.indices")] +#[derive(Debug, Clone)] +pub struct PyIndexSegment { + pub(crate) inner: IndexSegment, +} + +impl PyIndexSegment { + pub(crate) fn from_inner(inner: IndexSegment) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyIndexSegment { + #[getter] + fn uuid(&self) -> String { + self.inner.uuid().to_string() + } + + #[getter] + fn fragment_ids(&self) -> HashSet { + self.inner.fragment_bitmap().iter().collect() + } + + #[getter] + fn index_version(&self) -> i32 { + self.inner.index_version() + } + + fn __repr__(&self) -> String { + format!( + "IndexSegment(uuid={}, fragment_ids={:?}, index_version={})", + self.uuid(), + self.fragment_ids(), + self.index_version() + ) + } +} + +#[pyclass(name = "IndexSegmentPlan", module = "lance.indices")] +#[derive(Debug, Clone)] +pub struct PyIndexSegmentPlan { + pub(crate) inner: IndexSegmentPlan, +} + +impl PyIndexSegmentPlan { + pub(crate) fn from_inner(inner: IndexSegmentPlan) -> Self { + Self { inner } + } +} + +#[pymethods] +impl PyIndexSegmentPlan { + #[getter] + fn staging_index_uuid(&self) -> String { + self.inner.staging_index_uuid().to_string() + } + + 
#[getter] + fn segment(&self) -> PyIndexSegment { + PyIndexSegment::from_inner(self.inner.segment().clone()) + } + + #[getter] + fn partial_indices(&self) -> Vec> { + self.inner + .partial_indices() + .iter() + .cloned() + .map(PyLance) + .collect() + } + + #[getter] + fn estimated_bytes(&self) -> u64 { + self.inner.estimated_bytes() + } + fn __repr__(&self) -> String { + format!( + "IndexSegmentPlan(staging_index_uuid={}, partial_indices={}, estimated_bytes={})", + self.staging_index_uuid(), + self.inner.partial_indices().len(), + self.estimated_bytes() + ) + } +} + #[pyclass(name = "IvfModel", module = "lance.indices")] #[derive(Debug, Clone)] pub struct PyIvfModel { @@ -619,6 +708,8 @@ pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { indices.add_wrapped(wrap_pyfunction!(load_shuffled_vectors))?; indices.add_class::()?; indices.add_class::()?; + indices.add_class::()?; + indices.add_class::()?; indices.add_class::()?; indices.add_class::()?; indices.add_wrapped(wrap_pyfunction!(get_ivf_model))?; diff --git a/python/src/lib.rs b/python/src/lib.rs index 8c6f7c186ed..f7d233cf117 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -44,7 +44,9 @@ use dataset::io_stats::IoStats; use dataset::optimize::{ PyCompaction, PyCompactionMetrics, PyCompactionPlan, PyCompactionTask, PyRewriteResult, }; -use dataset::{DatasetBasePath, MergeInsertBuilder, PyFullTextQuery, PySearchFilter}; +use dataset::{ + DatasetBasePath, MergeInsertBuilder, PyFullTextQuery, PyIndexSegmentBuilder, PySearchFilter, +}; use env_logger::{Builder, Env}; use file::{ LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader, @@ -252,6 +254,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From 0a3f230d7920d4e35103db9c7e0c28e5a8c4dfde Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 20 Mar 2026 23:36:09 
+0800 Subject: [PATCH 5/8] docs: document distributed vector segment build --- docs/src/guide/.pages | 3 +- docs/src/guide/distributed_indexing.md | 154 ++++++++++++++++++ .../distributed_vector_segment_build.svg | 121 ++++++++++++++ 3 files changed, 277 insertions(+), 1 deletion(-) create mode 100644 docs/src/guide/distributed_indexing.md create mode 100644 docs/src/images/distributed_vector_segment_build.svg diff --git a/docs/src/guide/.pages b/docs/src/guide/.pages index 0c9a93c6920..46ddd475799 100644 --- a/docs/src/guide/.pages +++ b/docs/src/guide/.pages @@ -7,7 +7,8 @@ nav: - Tags and Branches: tags_and_branches.md - Object Store Configuration: object_store.md - Distributed Write: distributed_write.md + - Distributed Indexing: distributed_indexing.md - Migration Guide: migration.md - Performance Guide: performance.md - Tokenizer: tokenizer.md - - Extension Arrays: arrays.md \ No newline at end of file + - Extension Arrays: arrays.md diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md new file mode 100644 index 00000000000..4ec54c2c3ab --- /dev/null +++ b/docs/src/guide/distributed_indexing.md @@ -0,0 +1,154 @@ +# Distributed Indexing + +!!! warning + Lance exposes public APIs that can be integrated into an external + distributed index build workflow, but Lance itself does not provide a full + distributed scheduler or end-to-end orchestration layer. + + This page describes the current model, terminology, and execution flow so + that callers can integrate these APIs correctly. + +## Overview + +Distributed index build in Lance follows the same high-level pattern as distributed +write: + +1. multiple workers build index data in parallel +2. the caller invokes Lance segment build APIs for one distributed build +3. Lance discovers the relevant worker outputs, then plans and builds index artifacts +4. 
the built artifacts are committed into the dataset manifest + +For vector indices, the worker outputs are temporary shard directories under a +shared UUID. Internally, Lance can turn these shard outputs into one or more +built physical segments. + +![Distributed Vector Segment Build](../images/distributed_vector_segment_build.svg) + +## Terminology + +This guide uses the following terms consistently: + +- **Staging root**: the shared UUID directory used during distributed index build +- **Partial shard**: one worker output written under the staging root as + `partial_/` +- **Built segment**: one physical index segment produced during segment build and + ready to be committed into the manifest +- **Logical index**: the user-visible index identified by name; a logical index + may contain one or more built segments + +For example, a distributed vector build may create a layout like: + +```text +indices// +├── partial_/ +│ ├── index.idx +│ └── auxiliary.idx +├── partial_/ +│ ├── index.idx +│ └── auxiliary.idx +└── partial_/ + ├── index.idx + └── auxiliary.idx +``` + +After segment build, Lance produces one or more segment directories: + +```text +indices// +├── index.idx +└── auxiliary.idx + +indices// +├── index.idx +└── auxiliary.idx +``` + +These physical segments are then committed together as one logical index. + +## Roles + +There are two parties involved in distributed indexing: + +- **Workers** build partial shards +- **The caller** launches workers, chooses when a distributed build should be + turned into built segments, provides any additional inputs requested by the + segment build APIs, and + commits the final result + +Lance does not provide a distributed scheduler. The caller is responsible for +launching workers and driving the overall workflow. + +## Current Model + +The current model for distributed vector indexing has two layers of parallelism. + +### Shard Build + +First, multiple workers build partial shards in parallel: + +1. 
on each worker, call + `create_index_builder(...).fragments(...).index_uuid(staging_index_uuid).execute_uncommitted()` +2. each worker writes one `partial_/` under the shared staging root + +### Segment Build + +Then the caller turns that staging root into one or more built segments: + +1. open the staging root with `create_index_segment_builder(staging_index_uuid)` +2. provide partial index metadata with `with_partial_indices(...)` +3. optionally choose a grouping policy with `with_target_segment_bytes(...)` +4. call `plan()` to get `Vec` + +At that point the caller has two execution choices: + +- call `build(plan)` for each plan and run those builds in parallel +- call `build_all()` to let Lance build every planned segment on the current node + +After the segments are built, publish them with +`commit_existing_index_segments(...)`. + +## Internal Segmented Finalize Model + +Internally, Lance models distributed vector segment build as: + +1. **plan** which partial shards should become each built segment +2. **build** each segment from its selected partial shards +3. **commit** the resulting physical segments as one logical index + +The plan step is driven by the staging root and any additional shard metadata +required by the segment build APIs. + +This is intentionally a storage-level model: + +- partial shards are temporary worker outputs +- built segments are durable physical artifacts +- the logical index identity is attached only at commit time + +## Segment Grouping + +When Lance builds segments from a staging root, it may either: + +- keep shard boundaries, so each partial shard becomes one built segment +- group multiple partial shards into a larger built segment + +The grouping decision is separate from shard build. Workers only build partial +shards; Lance applies the segment build policy when it plans built segments. 
+ +## Responsibility Boundaries + +The caller is expected to know: + +- which distributed build is ready for segment build +- any additional shard metadata requested by the segment build APIs +- how the resulting built segments should be published + +Lance is responsible for: + +- writing partial shard artifacts +- discovering partial shards under the staging root +- planning built segments from the discovered shard set +- merging shard storage into built segment artifacts +- committing built segments into the manifest + +This split keeps distributed scheduling outside the storage engine while still +letting Lance own the on-disk index format. diff --git a/docs/src/images/distributed_vector_segment_build.svg b/docs/src/images/distributed_vector_segment_build.svg new file mode 100644 index 00000000000..d36f2726010 --- /dev/null +++ b/docs/src/images/distributed_vector_segment_build.svg @@ -0,0 +1,121 @@ + + + + + + + + + + + Caller + + + + +launch workers + + + + + Parallel index build (per worker) + + + + + + Worker 1 + execute_uncommitted() + + + + + Worker 2 + execute_uncommitted() + + + + + Worker N + execute_uncommitted() + + + + + + partial_<u1>/ + + + + + + partial_<u2>/ + + + + + + partial_<uN>/ + + + + +indices/<staging_uuid>/partial_<u*>/ + + + + + + + + Segment planner + create_index_segment_builder → plan() + + + + +Vec<IndexSegmentPlan> + + + + + Parallel segment build + + + + + + build(plan[0]) + → IndexSegment 0 + + + + + build(plan[1]) + → IndexSegment 1 + + + + + build(plan[N]) + → IndexSegment N + + + + + + + + + commit_existing_index_segments(...) 
+ + + + + + + + + Logical index + + From c1d3b1666df0f266335ccde9dcd72c4ef47a6b4c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 21 Mar 2026 00:22:25 +0800 Subject: [PATCH 6/8] fix: expose python create_index_uncommitted --- docs/src/guide/distributed_indexing.md | 3 +- python/python/lance/dataset.py | 537 ++++++++++++------ python/python/lance/indices/__init__.py | 11 +- .../python/lance/lance/indices/__init__.pyi | 1 + python/python/tests/test_vector_index.py | 33 +- rust/lance/src/index/vector/ivf.rs | 1 - 6 files changed, 373 insertions(+), 213 deletions(-) diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md index 4ec54c2c3ab..e6aa241feee 100644 --- a/docs/src/guide/distributed_indexing.md +++ b/docs/src/guide/distributed_indexing.md @@ -86,8 +86,9 @@ The current model for distributed vector indexing has two layers of parallelism. First, multiple workers build partial shards in parallel: -1. on each worker, call +1. on each worker, call an uncommitted shard-build API such as `create_index_builder(...).fragments(...).index_uuid(staging_index_uuid).execute_uncommitted()` + or Python `create_index_uncommitted(..., fragment_ids=..., index_uuid=...)` 2. 
each worker writes one `partial_/` under the shared staging root ### Segment Build diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index b4c4be4bb03..e8d632b0ecb 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2785,7 +2785,7 @@ def create_scalar_index( self._ds.create_index([column], index_type, name, replace, train, None, kwargs) - def create_index( + def _create_index_impl( self, column: Union[str, List[str]], index_type: str, @@ -2804,198 +2804,25 @@ def create_index( index_cache_size: Optional[int] = None, shuffle_partition_batches: Optional[int] = None, shuffle_partition_concurrency: Optional[int] = None, - # experimental parameters ivf_centroids_file: Optional[str] = None, precomputed_partition_dataset: Optional[str] = None, storage_options: Optional[Dict[str, str]] = None, filter_nan: bool = True, train: bool = True, - # distributed indexing parameters fragment_ids: Optional[List[int]] = None, index_uuid: Optional[str] = None, *, target_partition_size: Optional[int] = None, skip_transpose: bool = False, + require_commit: bool = True, **kwargs, - ) -> LanceDataset: - """Create index on column. - - **Experimental API** - - Parameters - ---------- - column : str - The column to be indexed. - index_type : str - The type of the index. - ``"IVF_PQ, IVF_HNSW_PQ and IVF_HNSW_SQ"`` are supported now. - name : str, optional - The index name. If not provided, it will be generated from the - column name. - metric : str - The distance metric type, i.e., "L2" (alias to "euclidean"), "cosine" - or "dot" (dot product). Default is "L2". - replace : bool - Replace the existing index if it exists. - num_partitions : int, optional - The number of partitions of IVF (Inverted File Index). - Deprecated. Use target_partition_size instead. - ivf_centroids : optional - It can be either :py:class:`np.ndarray`, - :py:class:`pyarrow.FixedSizeListArray` or - :py:class:`pyarrow.FixedShapeTensorArray`. 
- A ``num_partitions x dimension`` array of existing K-mean centroids - for IVF clustering. If not provided, a new KMeans model will be trained. - pq_codebook : optional, - It can be :py:class:`np.ndarray`, :py:class:`pyarrow.FixedSizeListArray`, - or :py:class:`pyarrow.FixedShapeTensorArray`. - A ``num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)`` - array of K-mean centroids for PQ codebook. - - Note: ``nbits`` is always 8 for now. - If not provided, a new PQ model will be trained. - num_sub_vectors : int, optional - The number of sub-vectors for PQ (Product Quantization). - accelerator : str or ``torch.Device``, optional - If set, use an accelerator to speed up the training process. - Accepted accelerator: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU). - If not set, use the CPU. - index_cache_size : int, optional - The size of the index cache in number of entries. Default value is 256. - shuffle_partition_batches : int, optional - The number of batches, using the row group size of the dataset, to include - in each shuffle partition. Default value is 10240. - - Assuming the row group size is 1024, each shuffle partition will hold - 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle - will consume less memory but will take longer to complete, and vice versa. - shuffle_partition_concurrency : int, optional - The number of shuffle partitions to process concurrently. Default value is 2 - - By making this value smaller, this shuffle will consume less memory but will - take longer to complete, and vice versa. - storage_options : optional, dict - Extra options that make sense for a particular storage connection. This is - used to store connection parameters like credentials, endpoint, etc. - filter_nan: bool - Defaults to True. False is UNSAFE, and will cause a crash if any null/nan - values are present (and otherwise will not). Disables the null filter used - for nullable columns. Obtains a small speed boost. 
- train : bool, default True - If True, the index will be trained on the data (e.g., compute IVF - centroids, PQ codebooks). If False, an empty index structure will be - created without training, which can be populated later. - fragment_ids : List[int], optional - If provided, the index will be created only on the specified fragments. - This enables distributed/fragment-level indexing. When provided, the - method creates temporary index metadata but does not commit the index - to the dataset. The index can be committed later using - merge_index_metadata(index_uuid, "VECTOR", column=..., index_name=...). - index_uuid : str, optional - A UUID to use for fragment-level distributed indexing. Multiple - fragment-level indices need to share UUID for later merging. - If not provided, a new UUID will be generated. - target_partition_size: int, optional - The target partition size. If set, the number of partitions will be computed - based on the target partition size. - Otherwise, the target partition size will be set by index type. - kwargs : - Parameters passed to the index building process. - - - - The SQ (Scalar Quantization) is available for only ``IVF_HNSW_SQ`` index type, - this quantization method is used to reduce the memory usage of the index, - it maps the float vectors to integer vectors, each integer is of ``num_bits``, - now only 8 bits are supported. - - If ``index_type`` is "IVF_*", then the following parameters are required: - num_partitions - - If ``index_type`` is with "PQ", then the following parameters are required: - num_sub_vectors - - Optional parameters for `IVF_PQ`: - - - ivf_centroids - Existing K-mean centroids for IVF clustering. - - num_bits - The number of bits for PQ (Product Quantization). Default is 8. - Only 4, 8 are supported. - - index_file_version - The version of the index file. Default is "V3". - - Optional parameters for `IVF_RQ`: - - - num_bits - The number of bits for RQ (Rabit Quantization). Default is 1. 
- - Optional parameters for `IVF_HNSW_*`: - max_level - Int, the maximum number of levels in the graph. - m - Int, the number of edges per node in the graph. - ef_construction - Int, the number of nodes to examine during the construction. - - Examples - -------- - - .. code-block:: python - - import lance - - dataset = lance.dataset("/tmp/sift.lance") - dataset.create_index( - "vector", - "IVF_PQ", - num_partitions=256, - num_sub_vectors=16 - ) - - .. code-block:: python - - import lance - - dataset = lance.dataset("/tmp/sift.lance") - dataset.create_index( - "vector", - "IVF_HNSW_SQ", - num_partitions=256, - ) - - Experimental Accelerator (GPU) support: - - - *accelerate*: use GPU to train IVF partitions. - Only supports CUDA (Nvidia) or MPS (Apple) currently. - Requires PyTorch being installed. - - .. code-block:: python - - import lance - - dataset = lance.dataset("/tmp/sift.lance") - dataset.create_index( - "vector", - "IVF_PQ", - num_partitions=256, - num_sub_vectors=16, - accelerator="cuda" + ) -> Index: + if not require_commit and fragment_ids is None: + raise ValueError( + "create_index_uncommitted requires fragment_ids " + "for distributed index build" ) - Note: GPU acceleration is currently supported only for the ``IVF_PQ`` index - type. Providing an accelerator for other index types will fall back to CPU - index building. - - References - ---------- - * `Faiss Index `_ - * IVF introduced in `Video Google: a text retrieval approach to object matching - in videos `_ - * `Product quantization for nearest neighbor search - `_ - - """ # Only support building index for 1 column from the API aspect, however # the internal implementation might support building multi-column index later. if isinstance(column, str): @@ -3103,13 +2930,21 @@ def create_index( pass if torch_detected: - if fragment_ids is not None or index_uuid is not None: - LOGGER.info( - "Torch detected; " - "enforce single-node indexing (distributed is CPU-only)." 
- ) - fragment_ids = None - index_uuid = None + if require_commit: + if fragment_ids is not None or index_uuid is not None: + LOGGER.info( + "Torch detected; " + "enforce single-node indexing (distributed is CPU-only)." + ) + fragment_ids = None + index_uuid = None + else: + if index_uuid is not None: + LOGGER.info( + "Torch detected; " + "enforce single-node indexing (distributed is CPU-only)." + ) + index_uuid = None if accelerator is not None: from .vector import ( @@ -3302,7 +3137,7 @@ def create_index( kwargs["index_uuid"] = index_uuid timers["final_create_index:start"] = time.time() - self._ds.create_index( + index = self._ds.create_index( column, index_type, name, replace, train, storage_options, kwargs ) timers["final_create_index:end"] = time.time() @@ -3318,8 +3153,334 @@ def create_index( "Temporary shuffle buffers stored at %s, you may want to delete it.", kwargs["precomputed_shuffle_buffers_path"], ) + return index + + def create_index( + self, + column: Union[str, List[str]], + index_type: str, + name: Optional[str] = None, + metric: str = "L2", + replace: bool = False, + num_partitions: Optional[int] = None, + ivf_centroids: Optional[ + Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray] + ] = None, + pq_codebook: Optional[ + Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray] + ] = None, + num_sub_vectors: Optional[int] = None, + accelerator: Optional[Union[str, "torch.Device"]] = None, + index_cache_size: Optional[int] = None, + shuffle_partition_batches: Optional[int] = None, + shuffle_partition_concurrency: Optional[int] = None, + # experimental parameters + ivf_centroids_file: Optional[str] = None, + precomputed_partition_dataset: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, + filter_nan: bool = True, + train: bool = True, + # distributed indexing parameters + fragment_ids: Optional[List[int]] = None, + index_uuid: Optional[str] = None, + *, + target_partition_size: Optional[int] = None, 
+ skip_transpose: bool = False, + **kwargs, + ) -> LanceDataset: + """Create index on column. + + **Experimental API** + + Parameters + ---------- + column : str + The column to be indexed. + index_type : str + The type of the index. + ``"IVF_PQ, IVF_HNSW_PQ and IVF_HNSW_SQ"`` are supported now. + name : str, optional + The index name. If not provided, it will be generated from the + column name. + metric : str + The distance metric type, i.e., "L2" (alias to "euclidean"), "cosine" + or "dot" (dot product). Default is "L2". + replace : bool + Replace the existing index if it exists. + num_partitions : int, optional + The number of partitions of IVF (Inverted File Index). + Deprecated. Use target_partition_size instead. + ivf_centroids : optional + It can be either :py:class:`np.ndarray`, + :py:class:`pyarrow.FixedSizeListArray` or + :py:class:`pyarrow.FixedShapeTensorArray`. + A ``num_partitions x dimension`` array of existing K-mean centroids + for IVF clustering. If not provided, a new KMeans model will be trained. + pq_codebook : optional, + It can be :py:class:`np.ndarray`, :py:class:`pyarrow.FixedSizeListArray`, + or :py:class:`pyarrow.FixedShapeTensorArray`. + A ``num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)`` + array of K-mean centroids for PQ codebook. + + Note: ``nbits`` is always 8 for now. + If not provided, a new PQ model will be trained. + num_sub_vectors : int, optional + The number of sub-vectors for PQ (Product Quantization). + accelerator : str or ``torch.Device``, optional + If set, use an accelerator to speed up the training process. + Accepted accelerator: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU). + If not set, use the CPU. + index_cache_size : int, optional + The size of the index cache in number of entries. Default value is 256. + shuffle_partition_batches : int, optional + The number of batches, using the row group size of the dataset, to include + in each shuffle partition. Default value is 10240. 
+ + Assuming the row group size is 1024, each shuffle partition will hold + 10240 * 1024 = 10,485,760 rows. By making this value smaller, this shuffle + will consume less memory but will take longer to complete, and vice versa. + shuffle_partition_concurrency : int, optional + The number of shuffle partitions to process concurrently. Default value is 2 + + By making this value smaller, this shuffle will consume less memory but will + take longer to complete, and vice versa. + storage_options : optional, dict + Extra options that make sense for a particular storage connection. This is + used to store connection parameters like credentials, endpoint, etc. + filter_nan: bool + Defaults to True. False is UNSAFE, and will cause a crash if any null/nan + values are present (and otherwise will not). Disables the null filter used + for nullable columns. Obtains a small speed boost. + train : bool, default True + If True, the index will be trained on the data (e.g., compute IVF + centroids, PQ codebooks). If False, an empty index structure will be + created without training, which can be populated later. + fragment_ids : List[int], optional + If provided, the index will be created only on the specified fragments. + This enables distributed/fragment-level indexing. When provided, the + method creates temporary index metadata but does not commit the index + to the dataset. The index can be committed later using + merge_index_metadata(index_uuid, "VECTOR", column=..., index_name=...). + index_uuid : str, optional + A UUID to use for fragment-level distributed indexing. Multiple + fragment-level indices need to share UUID for later merging. + If not provided, a new UUID will be generated. + target_partition_size: int, optional + The target partition size. If set, the number of partitions will be computed + based on the target partition size. + Otherwise, the target partition size will be set by index type. + kwargs : + Parameters passed to the index building process. 
+ + + + The SQ (Scalar Quantization) is available for only ``IVF_HNSW_SQ`` index type, + this quantization method is used to reduce the memory usage of the index, + it maps the float vectors to integer vectors, each integer is of ``num_bits``, + now only 8 bits are supported. + + If ``index_type`` is "IVF_*", then the following parameters are required: + num_partitions + + If ``index_type`` is with "PQ", then the following parameters are required: + num_sub_vectors + + Optional parameters for `IVF_PQ`: + + - ivf_centroids + Existing K-mean centroids for IVF clustering. + - num_bits + The number of bits for PQ (Product Quantization). Default is 8. + Only 4, 8 are supported. + - index_file_version + The version of the index file. Default is "V3". + + Optional parameters for `IVF_RQ`: + + - num_bits + The number of bits for RQ (Rabit Quantization). Default is 1. + + Optional parameters for `IVF_HNSW_*`: + max_level + Int, the maximum number of levels in the graph. + m + Int, the number of edges per node in the graph. + ef_construction + Int, the number of nodes to examine during the construction. + + Examples + -------- + + .. code-block:: python + + import lance + + dataset = lance.dataset("/tmp/sift.lance") + dataset.create_index( + "vector", + "IVF_PQ", + num_partitions=256, + num_sub_vectors=16 + ) + + .. code-block:: python + + import lance + + dataset = lance.dataset("/tmp/sift.lance") + dataset.create_index( + "vector", + "IVF_HNSW_SQ", + num_partitions=256, + ) + + Experimental Accelerator (GPU) support: + + - *accelerate*: use GPU to train IVF partitions. + Only supports CUDA (Nvidia) or MPS (Apple) currently. + Requires PyTorch being installed. + + .. code-block:: python + + import lance + + dataset = lance.dataset("/tmp/sift.lance") + dataset.create_index( + "vector", + "IVF_PQ", + num_partitions=256, + num_sub_vectors=16, + accelerator="cuda" + ) + + Note: GPU acceleration is currently supported only for the ``IVF_PQ`` index + type. 
Providing an accelerator for other index types will fall back to CPU + index building. + + References + ---------- + * `Faiss Index `_ + * IVF introduced in `Video Google: a text retrieval approach to object matching + in videos `_ + * `Product quantization for nearest neighbor search + `_ + + """ + self._create_index_impl( + column, + index_type, + name=name, + metric=metric, + replace=replace, + num_partitions=num_partitions, + ivf_centroids=ivf_centroids, + pq_codebook=pq_codebook, + num_sub_vectors=num_sub_vectors, + accelerator=accelerator, + index_cache_size=index_cache_size, + shuffle_partition_batches=shuffle_partition_batches, + shuffle_partition_concurrency=shuffle_partition_concurrency, + ivf_centroids_file=ivf_centroids_file, + precomputed_partition_dataset=precomputed_partition_dataset, + storage_options=storage_options, + filter_nan=filter_nan, + train=train, + fragment_ids=fragment_ids, + index_uuid=index_uuid, + target_partition_size=target_partition_size, + skip_transpose=skip_transpose, + require_commit=True, + **kwargs, + ) return self + def create_index_uncommitted( + self, + column: Union[str, List[str]], + index_type: str, + name: Optional[str] = None, + metric: str = "L2", + replace: bool = False, + num_partitions: Optional[int] = None, + ivf_centroids: Optional[ + Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray] + ] = None, + pq_codebook: Optional[ + Union[np.ndarray, pa.FixedSizeListArray, pa.FixedShapeTensorArray] + ] = None, + num_sub_vectors: Optional[int] = None, + accelerator: Optional[Union[str, "torch.Device"]] = None, + index_cache_size: Optional[int] = None, + shuffle_partition_batches: Optional[int] = None, + shuffle_partition_concurrency: Optional[int] = None, + ivf_centroids_file: Optional[str] = None, + precomputed_partition_dataset: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, + filter_nan: bool = True, + train: bool = True, + fragment_ids: Optional[List[int]] = None, + 
index_uuid: Optional[str] = None, + *, + target_partition_size: Optional[int] = None, + skip_transpose: bool = False, + **kwargs, + ) -> Index: + """ + Create one uncommitted partial index and return its metadata. + + This is the public shard-build API for distributed index construction. + Unlike :meth:`create_index`, this method does not publish the index into + the dataset manifest. Instead, it writes one partial index under the + staging UUID and returns the resulting :class:`Index` metadata. + + Callers should: + + 1. run :meth:`create_index_uncommitted` on each worker with the worker's + assigned ``fragment_ids`` and a shared ``index_uuid`` + 2. collect the returned :class:`Index` objects + 3. pass them to :meth:`IndexSegmentBuilder.with_partial_indices` + 4. build one or more segments and commit them with + :meth:`commit_existing_index_segments` + + Parameters are the same as :meth:`create_index`, with two additional + requirements for distributed shard build: + + - ``fragment_ids`` must be provided + - workers that belong to the same distributed build must share the same + ``index_uuid`` + + Returns + ------- + Index + Metadata for the partial index that was written by this call. 
+ """ + return self._create_index_impl( + column, + index_type, + name=name, + metric=metric, + replace=replace, + num_partitions=num_partitions, + ivf_centroids=ivf_centroids, + pq_codebook=pq_codebook, + num_sub_vectors=num_sub_vectors, + accelerator=accelerator, + index_cache_size=index_cache_size, + shuffle_partition_batches=shuffle_partition_batches, + shuffle_partition_concurrency=shuffle_partition_concurrency, + ivf_centroids_file=ivf_centroids_file, + precomputed_partition_dataset=precomputed_partition_dataset, + storage_options=storage_options, + filter_nan=filter_nan, + train=train, + fragment_ids=fragment_ids, + index_uuid=index_uuid, + target_partition_size=target_partition_size, + skip_transpose=skip_transpose, + require_commit=False, + **kwargs, + ) + def drop_index(self, name: str): """ Drops an index from the dataset diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index 1ed73c29375..085ff66e252 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -3,10 +3,13 @@ from enum import Enum -from lance.indices.builder import IndexConfig, IndicesBuilder -from lance.indices.ivf import IvfModel -from lance.lance.indices import IndexSegment, IndexSegmentPlan -from lance.indices.pq import PqModel +from .. 
import lance as _lance +from .builder import IndexConfig, IndicesBuilder +from .ivf import IvfModel +from .pq import PqModel + +IndexSegment = _lance.indices.IndexSegment +IndexSegmentPlan = _lance.indices.IndexSegmentPlan __all__ = [ "IndicesBuilder", diff --git a/python/python/lance/lance/indices/__init__.pyi b/python/python/lance/lance/indices/__init__.pyi index 9ca2c2de0bb..e1282b675a6 100644 --- a/python/python/lance/lance/indices/__init__.pyi +++ b/python/python/lance/lance/indices/__init__.pyi @@ -16,6 +16,7 @@ from datetime import datetime from typing import Optional import pyarrow as pa + from ...dataset import Index class IndexConfig: diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index b38a5764fdc..973264c475c 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -2765,26 +2765,21 @@ def test_index_segment_builder_builds_vector_segments(tmp_path): max_iters=20, ) - partial_indices = [] - for fragment in frags[:2]: - partial_indices.append( - ds._ds.create_index( - ["vector"], - "IVF_FLAT", - "vector_idx", - False, - True, - None, - { - "fragment_ids": [fragment.fragment_id], - "index_uuid": shared_uuid, - "num_partitions": 4, - "num_sub_vectors": 128, - "ivf_centroids": preprocessed["ivf_centroids"], - "pq_codebook": preprocessed["pq_codebook"], - }, - ) + partial_indices = [ + ds.create_index_uncommitted( + "vector", + "IVF_FLAT", + name="vector_idx", + train=True, + fragment_ids=[fragment.fragment_id], + index_uuid=shared_uuid, + num_partitions=4, + num_sub_vectors=128, + ivf_centroids=preprocessed["ivf_centroids"], + pq_codebook=preprocessed["pq_codebook"], ) + for fragment in frags[:2] + ] segment_builder = ds.create_index_segment_builder(shared_uuid).with_partial_indices( partial_indices diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index d780bed3f61..b693de2e0a0 100644 --- a/rust/lance/src/index/vector/ivf.rs 
+++ b/rust/lance/src/index/vector/ivf.rs @@ -1887,7 +1887,6 @@ async fn write_ivf_hnsw_file( /// /// The planner returns a `Vec` so callers can decide /// whether to execute the work serially or fan it out externally. - /// Plan how one staging root should be turned into built physical segments. /// /// This function does not touch storage. It only: From b86f91c4df074cee4507eb0fd0a172ef9d40f31a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 21 Mar 2026 00:34:47 +0800 Subject: [PATCH 7/8] fix: format python bindings --- python/src/dataset.rs | 5 ++++- python/src/indices.rs | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 3475ee90af5..1dad283309d 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2101,7 +2101,10 @@ impl Dataset { Ok(PyLance(index_metadata)) } - fn create_index_segment_builder(&self, staging_index_uuid: String) -> PyResult { + fn create_index_segment_builder( + &self, + staging_index_uuid: String, + ) -> PyResult { Ok(PyIndexSegmentBuilder { dataset: self.ds.clone(), staging_index_uuid, diff --git a/python/src/indices.rs b/python/src/indices.rs index 8980871ee51..9589b78ff36 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -37,9 +37,7 @@ use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; use lance::index::vector::ivf::write_ivf_pq_file_from_existing_index; -use lance_index::{ - DatasetIndexExt, IndexDescription, IndexSegment, IndexSegmentPlan, IndexType, -}; +use lance_index::{DatasetIndexExt, IndexDescription, IndexSegment, IndexSegmentPlan, IndexType}; use uuid::Uuid; #[pyclass(name = "IndexConfig", module = "lance.indices", get_all)] From bfc9e63a0f56511681eebf229b6ae930cf33cdf2 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 21 Mar 2026 01:05:06 +0800 Subject: [PATCH 8/8] fix: format python segment builder bindings --- python/src/dataset.rs | 13 ++++++------- 1 file changed, 6 
insertions(+), 7 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 1dad283309d..3b76f3ce043 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -368,16 +368,13 @@ impl PyIndexSegmentBuilder { builder = builder.with_target_segment_bytes(target_segment_bytes); } let plans = rt().block_on(Some(py), builder.plan())?.infer_error()?; - plans.into_iter() + plans + .into_iter() .map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan))) .collect() } - fn build( - &self, - py: Python<'_>, - plan: &Bound<'_, PyAny>, - ) -> PyResult> { + fn build(&self, py: Python<'_>, plan: &Bound<'_, PyAny>) -> PyResult> { let plan = plan.extract::>()?; let builder = self .dataset @@ -397,7 +394,9 @@ impl PyIndexSegmentBuilder { if let Some(target_segment_bytes) = self.target_segment_bytes { builder = builder.with_target_segment_bytes(target_segment_bytes); } - let segments = rt().block_on(Some(py), builder.build_all())?.infer_error()?; + let segments = rt() + .block_on(Some(py), builder.build_all())? + .infer_error()?; segments .into_iter() .map(|segment| Py::new(py, PyIndexSegment::from_inner(segment)))