From 791aa769235ccf688d553d8038ffac964ad13231 Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 16 Jan 2026 20:47:17 +0800 Subject: [PATCH 01/10] fix: low recall performance for ivf_pq in distributed mode --- rust/lance/src/index/create.rs | 42 ++- rust/lance/src/index/vector.rs | 47 ++++ rust/lance/src/index/vector/builder.rs | 61 ++++- rust/lance/src/index/vector/ivf/io.rs | 20 +- rust/lance/src/index/vector/ivf/v2.rs | 340 ++++++++++++++++++++++++- 5 files changed, 492 insertions(+), 18 deletions(-) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index d929dbffffd..c854ef10c07 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -9,8 +9,8 @@ use crate::{ index::{ scalar::build_scalar_index, vector::{ - build_distributed_vector_index, build_empty_vector_index, build_vector_index, - VectorIndexParams, LANCE_VECTOR_INDEX, + build_distributed_vector_index, build_empty_vector_index, build_ivf_pq_distributed, + build_ivf_pq_single, build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX, }, vector_index_details, DatasetIndexExt, DatasetIndexInternalExt, }, @@ -312,10 +312,38 @@ impl<'a> CreateIndexBuilder<'a> { })?; if train { - // Check if this is distributed indexing (fragment-level) - if self.fragments.is_some() { - // For distributed indexing, build only on specified fragments - // This creates temporary index metadata without committing + let is_pq = matches!(vec_params.index_type(), IndexType::IvfPq); + let is_distributed = + self.fragments.as_ref().map_or(false, |ids| !ids.is_empty()); + + if is_pq && is_distributed { + // IVF_PQ distributed: keep logic isolated in builder layer and + // delegate to a dedicated helper. This avoids touching the + // generic IVF/RQ implementations in lance-index. 
+ Box::pin(build_ivf_pq_distributed( + self.dataset, + column, + &index_name, + &index_id.to_string(), + vec_params, + fri, + self.fragments.as_ref().unwrap(), + )) + .await?; + } else if is_pq { + // IVF_PQ single-machine build: keep behavior identical to upstream + // by delegating to the standard vector index builder. + Box::pin(build_ivf_pq_single( + self.dataset, + column, + &index_name, + &index_id.to_string(), + vec_params, + fri, + )) + .await?; + } else if self.fragments.is_some() { + // Non-PQ distributed indexing: use the generic distributed builder. Box::pin(build_distributed_vector_index( self.dataset, column, @@ -327,7 +355,7 @@ impl<'a> CreateIndexBuilder<'a> { )) .await?; } else { - // Standard full dataset indexing + // Standard full dataset indexing. Box::pin(build_vector_index( self.dataset, column, diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 4e7316722b7..09ce4f07b4f 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -683,6 +683,53 @@ is not supported in distributed mode; skipping this shard", Ok(()) } +/// Build a single-machine IVF_PQ index. +/// +/// This is a thin wrapper around `build_vector_index` that is only used +/// for clarity at the builder layer so that the single-machine PQ build +/// logic stays identical to the upstream path. +#[instrument(level = "debug", skip(dataset))] +pub(crate) async fn build_ivf_pq_single( + dataset: &Dataset, + column: &str, + name: &str, + uuid: &str, + params: &VectorIndexParams, + frag_reuse_index: Option>, +) -> Result<()> { + // Delegate to the generic vector index builder. This keeps the + // single-machine IVF_PQ path identical to the upstream implementation. + build_vector_index(dataset, column, name, uuid, params, frag_reuse_index).await +} + +/// Build a distributed IVF_PQ index for specific fragments. +/// +/// This is a thin wrapper around `build_distributed_vector_index`. 
It is +/// only used when `VectorIndexParams::index_type()` is `IndexType::IvfPq` +/// and a non-empty `fragment_ids` list is provided at the Dataset/index +/// builder layer. All core IVF/PQ logic remains inside lance-index. +#[instrument(level = "debug", skip(dataset))] +pub(crate) async fn build_ivf_pq_distributed( + dataset: &Dataset, + column: &str, + name: &str, + uuid: &str, + params: &VectorIndexParams, + frag_reuse_index: Option>, + fragment_ids: &[u32], +) -> Result<()> { + build_distributed_vector_index( + dataset, + column, + name, + uuid, + params, + frag_reuse_index, + fragment_ids, + ) + .await +} + /// Build a Vector Index #[instrument(level = "debug", skip(dataset))] pub(crate) async fn build_vector_index( diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 26fed852b86..8ea5bfc2502 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -88,6 +88,26 @@ use super::{ utils::{self, get_vector_type}, }; +/// Stably sort a RecordBatch by the ROW_ID column in ascending order. +/// +/// If the batch has no ROW_ID column or has fewer than 2 rows, it is +/// returned unchanged. When sorting, the relative order of rows with the +/// same ROW_ID is preserved. +fn stable_sort_batch_by_row_id(batch: &RecordBatch) -> Result { + if let Some(row_id_col) = batch.column_by_name(ROW_ID) { + let row_ids = row_id_col.as_primitive::(); + if row_ids.len() > 1 { + let mut order: Vec = (0..row_ids.len()).collect(); + // Vec::sort_by is stable, so equal ROW_IDs keep their + // original relative order. 
+ order.sort_by(|&i, &j| row_ids.value(i).cmp(&row_ids.value(j))); + let indices = UInt32Array::from_iter_values(order.into_iter().map(|i| i as u32)); + return Ok(batch.take(&indices)?); + } + } + Ok(batch.clone()) +} + // the number of partitions to evaluate for reassigning const REASSIGN_RANGE: usize = 64; @@ -935,6 +955,15 @@ impl IvfIndexBuilder } _ => {} } + + // Normalize each batch for this partition to be stably sorted by ROW_ID. + for batch in part_batches.iter_mut() { + if batch.num_rows() == 0 { + continue; + } + *batch = stable_sort_batch_by_row_id(batch)?; + } + batches.extend(part_batches); } @@ -958,6 +987,7 @@ impl IvfIndexBuilder .map(|s| s.parse::().unwrap_or(0.0)) .unwrap_or(0.0); let batch = batch.drop_column(PART_ID_COLUMN)?; + let batch = stable_sort_batch_by_row_id(&batch)?; batches.push(batch); } } @@ -981,6 +1011,8 @@ impl IvfIndexBuilder )); }; + let is_pq = Q::quantization_type() == QuantizationType::Product; + // prepare the final writers let storage_path = self.index_dir.child(INDEX_AUXILIARY_FILE_NAME); let index_path = self.index_dir.child(INDEX_FILE_NAME); @@ -1024,7 +1056,32 @@ impl IvfIndexBuilder storage_ivf.add_partition(0); } else { let batches = storage.to_batches()?.collect::>(); - let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?; + let mut batch = + arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?; + + if is_pq && batch.column_by_name(PQ_CODE_COLUMN).is_some() { + // The PQ storage keeps codes in a transposed layout (bytes grouped + // across all rows). Convert them back to per-row layout so that a + // stable ROW_ID sort moves PQ_CODE_COLUMN together with ROW_ID. 
+ let codes_fsl = batch + .column_by_name(PQ_CODE_COLUMN) + .unwrap() + .as_fixed_size_list(); + let num_rows = batch.num_rows(); + let bytes_per_code = codes_fsl.value_length() as usize; + let codes = codes_fsl.values().as_primitive::(); + let original_codes = transpose(codes, bytes_per_code, num_rows); + let original_fsl = Arc::new(FixedSizeListArray::try_new_from_values( + original_codes, + bytes_per_code as i32, + )?); + batch = batch.replace_column_by_name(PQ_CODE_COLUMN, original_fsl)?; + } + + // Enforce a stable ROW_ID ordering for all auxiliary batches so that the + // PQ code column moves together with ROW_ID. + batch = stable_sort_batch_by_row_id(&batch)?; + storage_writer.write_batch(&batch).await?; storage_ivf.add_partition(batch.num_rows() as u32); } @@ -1071,7 +1128,7 @@ impl IvfIndexBuilder let mut metadata = quantizer.metadata(Some(QuantizationMetadata { codebook_position: Some(0), codebook: None, - transposed: true, + transposed: !is_pq, })); if let Some(extra_metadata) = metadata.extra_metadata()? 
{ let idx = storage_writer.add_global_buffer(extra_metadata).await?; diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs index c79d568a6c3..de938e67bff 100644 --- a/rust/lance/src/index/vector/ivf/io.rs +++ b/rust/lance/src/index/vector/ivf/io.rs @@ -201,20 +201,26 @@ pub(super) async fn write_pq_partitions( location: location!(), })?; if let Some(pq_code) = pq_index.code.as_ref() { - let original_pq_codes = transpose( - pq_code, - pq_index.pq.num_sub_vectors, - pq_code.len() / pq_index.pq.code_dim(), - ); + let row_ids = pq_index.row_ids.as_ref().unwrap(); + let num_vectors = row_ids.len(); + if num_vectors == 0 || pq_code.len() == 0 { + continue; + } + if pq_code.len() % num_vectors != 0 { + continue; + } + let num_bytes_per_code = pq_code.len() / num_vectors; + let original_pq_codes = transpose(pq_code, num_bytes_per_code, num_vectors); let fsl = Arc::new( FixedSizeListArray::try_new_from_values( original_pq_codes, - pq_index.pq.code_dim() as i32, + num_bytes_per_code as i32, ) .unwrap(), ); + pq_array.push(fsl); - row_id_array.push(pq_index.row_ids.as_ref().unwrap().clone()); + row_id_array.push(row_ids.clone()); } } } diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 0e85378ab97..fe2ab314620 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -611,7 +611,7 @@ pub type IvfHnswPqIndex = IVFIndex; #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{BTreeMap, HashMap, HashSet}; use std::{ops::Range, sync::Arc}; use all_asserts::{assert_ge, assert_le, assert_lt}; @@ -629,6 +629,7 @@ mod tests { use lance_index::vector::storage::VectorStore; use crate::dataset::{InsertBuilder, UpdateBuilder, WriteMode, WriteParams}; + use crate::index::vector::ivf::finalize_distributed_merge; use crate::index::vector::ivf::v2::IvfPq; use crate::index::DatasetIndexInternalExt; use crate::utils::test::copy_test_data_to_tmp; 
@@ -641,19 +642,21 @@ mod tests { Dataset, }; use lance_core::cache::LanceCache; + use lance_core::utils::address::RowAddress; use lance_core::utils::tempfile::TempStrDir; use lance_core::{Result, ROW_ID}; use lance_encoding::decoder::DecoderPlugins; use lance_file::reader::{FileReader, FileReaderOptions}; use lance_file::writer::FileWriter; use lance_index::vector::ivf::IvfBuildParams; + use lance_index::vector::kmeans::{train_kmeans, KMeansParams}; use lance_index::vector::pq::PQBuildParams; use lance_index::vector::quantizer::QuantizerMetadata; use lance_index::vector::sq::builder::SQBuildParams; - use lance_index::vector::DIST_COL; use lance_index::vector::{ pq::storage::ProductQuantizationMetadata, storage::STORAGE_METADATA_KEY, }; + use lance_index::vector::{DIST_COL, PQ_CODE_COLUMN}; use lance_index::{metrics::NoOpMetricsCollector, INDEX_AUXILIARY_FILE_NAME}; use lance_index::{optimize::OptimizeOptions, scalar::IndexReader}; use lance_index::{scalar::IndexWriter, vector::hnsw::builder::HnswBuildParams}; @@ -670,6 +673,7 @@ mod tests { use rand::distr::uniform::SampleUniform; use rand::{rngs::StdRng, Rng, SeedableRng}; use rstest::rstest; + use uuid::Uuid; const NUM_ROWS: usize = 512; const DIM: usize = 32; @@ -1293,6 +1297,338 @@ mod tests { .collect() } + const TWO_FRAG_NUM_ROWS: usize = 2000; + const TWO_FRAG_DIM: usize = 128; + const TWO_FRAG_NUM_PARTITIONS: usize = 4; + const TWO_FRAG_NUM_SUBVECTORS: usize = 16; + const TWO_FRAG_NUM_BITS: usize = 8; + const TWO_FRAG_SAMPLE_RATE: usize = 7; + const TWO_FRAG_MAX_ITERS: u32 = 20; + + fn make_two_fragment_batches() -> (Arc, Vec) { + let ids = Arc::new(UInt64Array::from_iter_values(0..TWO_FRAG_NUM_ROWS as u64)); + + let values = generate_random_array_with_range(TWO_FRAG_NUM_ROWS * TWO_FRAG_DIM, 0.0..1.0); + let vectors = Arc::new( + FixedSizeListArray::try_new_from_values( + Float32Array::from(values), + TWO_FRAG_DIM as i32, + ) + .unwrap(), + ); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", 
DataType::UInt64, false), + Field::new("vector", vectors.data_type().clone(), false), + ])); + let batch = RecordBatch::try_new(schema.clone(), vec![ids, vectors]).unwrap(); + + (schema, vec![batch]) + } + + async fn write_dataset_from_batches( + test_uri: &str, + schema: Arc, + batches: Vec, + ) -> Dataset { + let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + + let write_params = WriteParams { + max_rows_per_file: 500, + mode: WriteMode::Overwrite, + ..Default::default() + }; + + Dataset::write(batches, test_uri, Some(write_params)) + .await + .unwrap() + } + + async fn prepare_global_ivf_pq( + dataset: &Dataset, + vector_column: &str, + ) -> (IvfBuildParams, PQBuildParams) { + let batch = dataset + .scan() + .project(&[vector_column.to_string()]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = batch + .column_by_name(vector_column) + .expect("vector column should exist") + .as_fixed_size_list(); + + let dim = vectors.value_length() as usize; + assert_eq!(dim, TWO_FRAG_DIM, "unexpected vector dimension"); + + let values = vectors.values().as_primitive::(); + + let kmeans_params = KMeansParams::new(None, TWO_FRAG_MAX_ITERS, 1, DistanceType::L2); + let kmeans = train_kmeans::( + values, + kmeans_params, + dim, + TWO_FRAG_NUM_PARTITIONS, + TWO_FRAG_SAMPLE_RATE, + ) + .unwrap(); + + let centroids_flat = kmeans.centroids.as_primitive::().clone(); + let centroids_fsl = + Arc::new(FixedSizeListArray::try_new_from_values(centroids_flat, dim as i32).unwrap()); + let mut ivf_params = + IvfBuildParams::try_with_centroids(TWO_FRAG_NUM_PARTITIONS, centroids_fsl).unwrap(); + ivf_params.max_iters = TWO_FRAG_MAX_ITERS as usize; + ivf_params.sample_rate = TWO_FRAG_SAMPLE_RATE; + + let mut pq_train_params = PQBuildParams::new(TWO_FRAG_NUM_SUBVECTORS, TWO_FRAG_NUM_BITS); + pq_train_params.max_iters = TWO_FRAG_MAX_ITERS as usize; + pq_train_params.sample_rate = TWO_FRAG_SAMPLE_RATE; + + let pq = pq_train_params.build(vectors, 
DistanceType::L2).unwrap(); + let codebook_flat = pq.codebook.values().as_primitive::().clone(); + let pq_codebook: ArrayRef = Arc::new(codebook_flat); + let mut pq_params = + PQBuildParams::with_codebook(TWO_FRAG_NUM_SUBVECTORS, TWO_FRAG_NUM_BITS, pq_codebook); + pq_params.max_iters = TWO_FRAG_MAX_ITERS as usize; + pq_params.sample_rate = TWO_FRAG_SAMPLE_RATE; + + (ivf_params, pq_params) + } + + async fn build_ivfpq_for_fragment_groups( + dataset: &mut Dataset, + fragment_groups: Vec>, // each group is a set of fragment ids + ivf_params: &IvfBuildParams, + pq_params: &PQBuildParams, + index_name: &str, + ) { + let shared_uuid = Uuid::new_v4(); + let params = VectorIndexParams::with_ivf_pq_params( + DistanceType::L2, + ivf_params.clone(), + pq_params.clone(), + ); + + for fragments in fragment_groups { + let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, &params); + builder = builder + .name(index_name.to_string()) + .fragments(fragments) + .index_uuid(shared_uuid.to_string()); + // Build partial index shards without committing to manifest. + builder.execute_uncommitted().await.unwrap(); + } + + let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); + finalize_distributed_merge(dataset.object_store(), &index_dir, Some(IndexType::IvfPq)) + .await + .unwrap(); + + dataset + .commit_existing_index(index_name, "vector", shared_uuid) + .await + .unwrap(); + } + + fn assert_ivf_layout_equal(stats_a: &serde_json::Value, stats_b: &serde_json::Value) { + let idx_a = &stats_a["indices"][0]; + let idx_b = &stats_b["indices"][0]; + + // Centroids: same shape and values (within tolerance).
+ let centroids_a = idx_a["centroids"] + .as_array() + .expect("centroids should be an array"); + let centroids_b = idx_b["centroids"] + .as_array() + .expect("centroids should be an array"); + assert_eq!( + centroids_a.len(), + centroids_b.len(), + "num centroids mismatch", + ); + for (row_a, row_b) in centroids_a.iter().zip(centroids_b.iter()) { + let row_a = row_a + .as_array() + .unwrap_or_else(|| panic!("invalid centroid row: {:?}", row_a)); + let row_b = row_b + .as_array() + .unwrap_or_else(|| panic!("invalid centroid row: {:?}", row_b)); + assert_eq!(row_a.len(), row_b.len(), "centroid dim mismatch"); + for (va, vb) in row_a.iter().zip(row_b.iter()) { + let fa = va.as_f64().expect("centroid must be numeric") as f32; + let fb = vb.as_f64().expect("centroid must be numeric") as f32; + assert!( + (fa - fb).abs() <= 1e-4, + "centroid mismatch: {} vs {}", + fa, + fb + ); + } + } + + // Partitions sizes. + let parts_a = idx_a["partitions"] + .as_array() + .expect("partitions should be an array"); + let parts_b = idx_b["partitions"] + .as_array() + .expect("partitions should be an array"); + assert_eq!(parts_a.len(), parts_b.len(), "num partitions mismatch"); + let sizes_a: Vec = parts_a + .iter() + .map(|p| p["size"].as_u64().expect("partition size")) + .collect(); + let sizes_b: Vec = parts_b + .iter() + .map(|p| p["size"].as_u64().expect("partition size")) + .collect(); + assert_eq!(sizes_a, sizes_b, "partition sizes mismatch"); + } + + #[tokio::test] + async fn test_ivfpq_two_fragments_single_vs_split_layout_and_rowids() { + const INDEX_NAME: &str = "vector_idx"; + + let test_dir = TempStrDir::default(); + let base_uri = test_dir.as_str(); + + // Generate the data once, then write it twice to two independent dataset URIs. 
+ let (schema, batches) = make_two_fragment_batches(); + + let ds_single_uri = format!("{}/single", base_uri); + let ds_split_uri = format!("{}/split", base_uri); + + let mut ds_single = + write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await; + let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await; + + // Ensure we have at least 2 fragments. + let fragments_single = ds_single.get_fragments(); + assert!( + fragments_single.len() >= 2, + "expected at least 2 fragments in ds_single, got {}", + fragments_single.len() + ); + let fragments_split = ds_split.get_fragments(); + assert!( + fragments_split.len() >= 2, + "expected at least 2 fragments in ds_split, got {}", + fragments_split.len() + ); + + // Pretrain global IVF centroids and PQ codebook. + let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await; + + // Build single index using two fragments in one distributed build. + let group_single = vec![ + fragments_single[0].id() as u32, + fragments_single[1].id() as u32, + ]; + build_ivfpq_for_fragment_groups( + &mut ds_single, + vec![group_single], + &ivf_params, + &pq_params, + INDEX_NAME, + ) + .await; + + // Build split index: one fragment per distributed build, then merge. + let group0 = vec![fragments_split[0].id() as u32]; + let group1 = vec![fragments_split[1].id() as u32]; + build_ivfpq_for_fragment_groups( + &mut ds_split, + vec![group0, group1], + &ivf_params, + &pq_params, + INDEX_NAME, + ) + .await; + + // Compare IVF layout via index statistics. + let stats_single_json = ds_single.index_statistics(INDEX_NAME).await.unwrap(); + let stats_split_json = ds_split.index_statistics(INDEX_NAME).await.unwrap(); + let stats_single: serde_json::Value = serde_json::from_str(&stats_single_json).unwrap(); + let stats_split: serde_json::Value = serde_json::from_str(&stats_split_json).unwrap(); + assert_ivf_layout_equal(&stats_single, &stats_split); + + // Compare row id sets per partition. 
+ let ctx_single = load_vector_index_context(&ds_single, "vector", INDEX_NAME).await; + let ctx_split = load_vector_index_context(&ds_split, "vector", INDEX_NAME).await; + + let ivf_single = ctx_single.ivf(); + let ivf_split = ctx_split.ivf(); + let total_partitions = ivf_single.total_partitions(); + assert_eq!(total_partitions, ivf_split.total_partitions()); + + for part_id in 0..total_partitions { + let row_ids_single = load_partition_row_ids(ivf_single, part_id).await; + let row_ids_split = load_partition_row_ids(ivf_split, part_id).await; + let set_single: HashSet = row_ids_single.into_iter().collect(); + let set_split: HashSet = row_ids_split.into_iter().collect(); + assert_eq!( + set_single, set_split, + "row id set mismatch for partition {}", + part_id + ); + } + + // Compare Top-K row ids on a deterministic set of queries. + const K: usize = 10; + const NUM_QUERIES: usize = 10; + + async fn collect_row_ids(ds: &Dataset, queries: &[Arc]) -> Vec> { + let mut ids_per_query = Vec::with_capacity(queries.len()); + for q in queries { + let result = ds + .scan() + .with_row_id() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), K) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let row_ids = result[ROW_ID] + .as_primitive::() + .values() + .iter() + .copied() + .collect::>(); + ids_per_query.push(row_ids); + } + ids_per_query + } + + // Collect a deterministic query set from ds_single. 
+ let query_batch = ds_single + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(NUM_QUERIES as i64), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let vectors = query_batch["vector"].as_fixed_size_list(); + let queries: Vec> = (0..vectors.len()) + .map(|i| vectors.value(i) as Arc) + .collect(); + + let ids_single = collect_row_ids(&ds_single, &queries).await; + let ids_split = collect_row_ids(&ds_split, &queries).await; + + assert_eq!( + ids_single, ids_split, + "single vs split index returned different Top-K row ids", + ); + } + async fn test_index( params: VectorIndexParams, nlist: usize, From bd24884269998580cb366432aba8c481258f80f9 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 11:33:22 +0800 Subject: [PATCH 02/10] fix code style issue --- rust/lance/src/index/create.rs | 3 +-- rust/lance/src/index/vector/ivf/io.rs | 2 +- rust/lance/src/index/vector/ivf/v2.rs | 5 ++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index c854ef10c07..4c6be8c563a 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -313,8 +313,7 @@ impl<'a> CreateIndexBuilder<'a> { if train { let is_pq = matches!(vec_params.index_type(), IndexType::IvfPq); - let is_distributed = - self.fragments.as_ref().map_or(false, |ids| !ids.is_empty()); + let is_distributed = self.fragments.as_ref().is_some_and(|ids| !ids.is_empty()); if is_pq && is_distributed { // IVF_PQ distributed: keep logic isolated in builder layer and diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs index de938e67bff..dc06c935521 100644 --- a/rust/lance/src/index/vector/ivf/io.rs +++ b/rust/lance/src/index/vector/ivf/io.rs @@ -203,7 +203,7 @@ pub(super) async fn write_pq_partitions( if let Some(pq_code) = pq_index.code.as_ref() { let row_ids = pq_index.row_ids.as_ref().unwrap(); let num_vectors = row_ids.len(); - if num_vectors == 0 
|| pq_code.len() == 0 { + if num_vectors == 0 || pq_code.is_empty() { continue; } if pq_code.len() % num_vectors != 0 { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index fe2ab314620..8faec1116b4 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -611,7 +611,7 @@ pub type IvfHnswPqIndex = IVFIndex; #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap, HashSet}; + use std::collections::HashSet; use std::{ops::Range, sync::Arc}; use all_asserts::{assert_ge, assert_le, assert_lt}; @@ -642,7 +642,6 @@ mod tests { Dataset, }; use lance_core::cache::LanceCache; - use lance_core::utils::address::RowAddress; use lance_core::utils::tempfile::TempStrDir; use lance_core::{Result, ROW_ID}; use lance_encoding::decoder::DecoderPlugins; @@ -653,10 +652,10 @@ mod tests { use lance_index::vector::pq::PQBuildParams; use lance_index::vector::quantizer::QuantizerMetadata; use lance_index::vector::sq::builder::SQBuildParams; + use lance_index::vector::DIST_COL; use lance_index::vector::{ pq::storage::ProductQuantizationMetadata, storage::STORAGE_METADATA_KEY, }; - use lance_index::vector::{DIST_COL, PQ_CODE_COLUMN}; use lance_index::{metrics::NoOpMetricsCollector, INDEX_AUXILIARY_FILE_NAME}; use lance_index::{optimize::OptimizeOptions, scalar::IndexReader}; use lance_index::{scalar::IndexWriter, vector::hnsw::builder::HnswBuildParams}; From 331916a387202e9d130b5095723f34f38a165336 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 12:22:48 +0800 Subject: [PATCH 03/10] fix test issue --- rust/lance/src/index/vector/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 8ea5bfc2502..444df10563a 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -1128,7 +1128,7 @@ impl IvfIndexBuilder let mut metadata = 
quantizer.metadata(Some(QuantizationMetadata { codebook_position: Some(0), codebook: None, - transposed: !is_pq, + transposed: true, })); if let Some(extra_metadata) = metadata.extra_metadata()? { let idx = storage_writer.add_global_buffer(extra_metadata).await?; From a51208666d474c092cafcafc67729359ff9a3dc5 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 14:02:39 +0800 Subject: [PATCH 04/10] fix test issue --- python/python/tests/test_vector_index.py | 2 +- rust/lance/src/index/vector/builder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 71850135ae3..c0f01e41dea 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -959,7 +959,7 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path): "metric_type": "l2", "nbits": 8, "num_sub_vectors": 8, - "transposed": True, + "transposed": False, }, "index_file_version": IndexFileVersion.V3, } diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 444df10563a..8ea5bfc2502 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -1128,7 +1128,7 @@ impl IvfIndexBuilder let mut metadata = quantizer.metadata(Some(QuantizationMetadata { codebook_position: Some(0), codebook: None, - transposed: true, + transposed: !is_pq, })); if let Some(extra_metadata) = metadata.extra_metadata()? 
{ let idx = storage_writer.add_global_buffer(extra_metadata).await?; From 5d1f765c17ea55d080f6f9b94234573fa4140789 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 16:53:57 +0800 Subject: [PATCH 05/10] fix test issue --- rust/lance/src/index/create.rs | 20 +++--------- rust/lance/src/index/vector.rs | 47 --------------------------- rust/lance/src/index/vector/ivf/v2.rs | 2 +- 3 files changed, 5 insertions(+), 64 deletions(-) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 4c6be8c563a..d6f0ecbd263 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -9,8 +9,8 @@ use crate::{ index::{ scalar::build_scalar_index, vector::{ - build_distributed_vector_index, build_empty_vector_index, build_ivf_pq_distributed, - build_ivf_pq_single, build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX, + build_distributed_vector_index, build_empty_vector_index, + build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX, }, vector_index_details, DatasetIndexExt, DatasetIndexInternalExt, }, @@ -319,7 +319,7 @@ impl<'a> CreateIndexBuilder<'a> { // IVF_PQ distributed: keep logic isolated in builder layer and // delegate to a dedicated helper. This avoids touching the // generic IVF/RQ implementations in lance-index. - Box::pin(build_ivf_pq_distributed( + Box::pin(build_distributed_vector_index( self.dataset, column, &index_name, @@ -329,18 +329,6 @@ impl<'a> CreateIndexBuilder<'a> { self.fragments.as_ref().unwrap(), )) .await?; - } else if is_pq { - // IVF_PQ single-machine build: keep behavior identical to upstream - // by delegating to the standard vector index builder. - Box::pin(build_ivf_pq_single( - self.dataset, - column, - &index_name, - &index_id.to_string(), - vec_params, - fri, - )) - .await?; } else if self.fragments.is_some() { // Non-PQ distributed indexing: use the generic distributed builder. 
Box::pin(build_distributed_vector_index( @@ -354,7 +342,7 @@ impl<'a> CreateIndexBuilder<'a> { )) .await?; } else { - // Standard full dataset indexing. + // Standard full dataset indexing Box::pin(build_vector_index( self.dataset, column, diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 09ce4f07b4f..4e7316722b7 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -683,53 +683,6 @@ is not supported in distributed mode; skipping this shard", Ok(()) } -/// Build a single-machine IVF_PQ index. -/// -/// This is a thin wrapper around `build_vector_index` that is only used -/// for clarity at the builder layer so that the single-machine PQ build -/// logic stays identical to the upstream path. -#[instrument(level = "debug", skip(dataset))] -pub(crate) async fn build_ivf_pq_single( - dataset: &Dataset, - column: &str, - name: &str, - uuid: &str, - params: &VectorIndexParams, - frag_reuse_index: Option>, -) -> Result<()> { - // Delegate to the generic vector index builder. This keeps the - // single-machine IVF_PQ path identical to the upstream implementation. - build_vector_index(dataset, column, name, uuid, params, frag_reuse_index).await -} - -/// Build a distributed IVF_PQ index for specific fragments. -/// -/// This is a thin wrapper around `build_distributed_vector_index`. It is -/// only used when `VectorIndexParams::index_type()` is `IndexType::IvfPq` -/// and a non-empty `fragment_ids` list is provided at the Dataset/index -/// builder layer. All core IVF/PQ logic remains inside lance-index. 
-#[instrument(level = "debug", skip(dataset))] -pub(crate) async fn build_ivf_pq_distributed( - dataset: &Dataset, - column: &str, - name: &str, - uuid: &str, - params: &VectorIndexParams, - frag_reuse_index: Option>, - fragment_ids: &[u32], -) -> Result<()> { - build_distributed_vector_index( - dataset, - column, - name, - uuid, - params, - frag_reuse_index, - fragment_ids, - ) - .await -} - /// Build a Vector Index #[instrument(level = "debug", skip(dataset))] pub(crate) async fn build_vector_index( diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 8faec1116b4..2cabe0b2fb4 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1488,7 +1488,7 @@ mod tests { } #[tokio::test] - async fn test_ivfpq_two_fragments_single_vs_split_layout_and_rowids() { + async fn test_ivfpq_recall_performance_on_two_frags_single_vs_split() { const INDEX_NAME: &str = "vector_idx"; let test_dir = TempStrDir::default(); From bbfb314180dfb4d7b5bacb9d25a07f19c9cf8503 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 17:26:14 +0800 Subject: [PATCH 06/10] fix test issue --- rust/lance/src/index/create.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index d6f0ecbd263..d9c81ced748 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -9,8 +9,8 @@ use crate::{ index::{ scalar::build_scalar_index, vector::{ - build_distributed_vector_index, build_empty_vector_index, - build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX, + build_distributed_vector_index, build_empty_vector_index, build_vector_index, + VectorIndexParams, LANCE_VECTOR_INDEX, }, vector_index_details, DatasetIndexExt, DatasetIndexInternalExt, }, From 75ad7c417b66714604c50881aea87fb788463043 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 19:27:19 +0800 Subject: [PATCH 07/10] fix test issue --- 
rust/lance/src/index/create.rs | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index d9c81ced748..4a9a03ea66c 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -312,24 +312,7 @@ impl<'a> CreateIndexBuilder<'a> { })?; if train { - let is_pq = matches!(vec_params.index_type(), IndexType::IvfPq); - let is_distributed = self.fragments.as_ref().is_some_and(|ids| !ids.is_empty()); - - if is_pq && is_distributed { - // IVF_PQ distributed: keep logic isolated in builder layer and - // delegate to a dedicated helper. This avoids touching the - // generic IVF/RQ implementations in lance-index. - Box::pin(build_distributed_vector_index( - self.dataset, - column, - &index_name, - &index_id.to_string(), - vec_params, - fri, - self.fragments.as_ref().unwrap(), - )) - .await?; - } else if self.fragments.is_some() { + if self.fragments.is_some() { // Non-PQ distributed indexing: use the generic distributed builder. Box::pin(build_distributed_vector_index( self.dataset, From ff77fd2789ec179863d1f02bc25fe59852a785f5 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 28 Jan 2026 21:12:41 +0800 Subject: [PATCH 08/10] fix test issue --- rust/lance/src/index/create.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 4a9a03ea66c..d929dbffffd 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -312,8 +312,10 @@ impl<'a> CreateIndexBuilder<'a> { })?; if train { + // Check if this is distributed indexing (fragment-level) if self.fragments.is_some() { - // Non-PQ distributed indexing: use the generic distributed builder. 
+ // For distributed indexing, build only on specified fragments + // This creates temporary index metadata without committing Box::pin(build_distributed_vector_index( self.dataset, column, From e3eb509f67da48ff8affdaf29159210a555a38bb Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 30 Jan 2026 12:13:09 +0800 Subject: [PATCH 09/10] address review comments --- python/python/tests/test_vector_index.py | 2 +- .../src/vector/distributed/index_merger.rs | 9 ++++- rust/lance/src/index/vector.rs | 6 +++ rust/lance/src/index/vector/builder.rs | 38 ++++++++++++++++++- 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index c0f01e41dea..71850135ae3 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -959,7 +959,7 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path): "metric_type": "l2", "nbits": 8, "num_sub_vectors": 8, - "transposed": False, + "transposed": True, }, "index_file_version": IndexFileVersion.V3, } diff --git a/rust/lance-index/src/vector/distributed/index_merger.rs b/rust/lance-index/src/vector/distributed/index_merger.rs index c5181b7f842..d91cdd79571 100755 --- a/rust/lance-index/src/vector/distributed/index_merger.rs +++ b/rust/lance-index/src/vector/distributed/index_merger.rs @@ -206,6 +206,7 @@ pub async fn init_writer_for_pq( FileWriterOptions::default(), )?; let mut pm_init = pm.clone(); + pm_init.transposed = true; let cb = pm_init.codebook.as_ref().ok_or_else(|| Error::Index { message: "PQ codebook missing".to_string(), location: snafu::location!(), @@ -777,16 +778,20 @@ pub async fn merge_partial_vector_auxiliary_files( if existing_pm.num_sub_vectors != pm.num_sub_vectors || existing_pm.nbits != pm.nbits || existing_pm.dimension != pm.dimension + || existing_pm.transposed != pm.transposed { return Err(Error::Index { message: format!( - "Distributed PQ merge: structural mismatch across 
shards; first(dim={}, m={}, nbits={}), current(dim={}, m={}, nbits={})", + "Distributed PQ merge: structural mismatch across shards; first(\ + dim={}, m={}, nbits={}, transposed={}), current(dim={}, m={}, nbits={}, transposed={})", existing_pm.dimension, existing_pm.num_sub_vectors, existing_pm.nbits, + existing_pm.transposed, pm.dimension, pm.num_sub_vectors, - pm.nbits + pm.nbits, + pm.transposed ), location: location!(), }); diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 4e7316722b7..1dd015f789b 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -513,6 +513,9 @@ please provide PQBuildParams.codebook for distributed indexing" )? .with_ivf(ivf_model) .with_quantizer(global_pq) + // For distributed shards, keep PQ codes in their original layout + // and transpose only after all shards are merged. + .with_transpose(false) .with_fragment_filter(fragment_filter) .build() .await?; @@ -615,6 +618,9 @@ please provide PQBuildParams.codebook for distributed indexing" )? .with_ivf(ivf_model) .with_quantizer(global_pq) + // For distributed shards, keep PQ codes in their original layout + // and transpose only after all shards are merged. 
+ .with_transpose(false) .with_fragment_filter(fragment_filter) .build() .await?; diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 8ea5bfc2502..59300550608 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -148,6 +148,8 @@ pub struct IvfIndexBuilder { optimize_options: Option, // number of indices merged merged_num: usize, + // whether to transpose codes when building storage + transpose_codes: bool, } type BuildStream = @@ -189,6 +191,7 @@ impl IvfIndexBuilder fragment_filter: None, optimize_options: None, merged_num: 0, + transpose_codes: true, }) } @@ -255,6 +258,7 @@ impl IvfIndexBuilder fragment_filter: None, optimize_options: None, merged_num: 0, + transpose_codes: true, }) } @@ -354,6 +358,13 @@ impl IvfIndexBuilder self } + /// Control whether codes are transposed when building storage. + /// This mainly affects intermediate PQ/RQ storage when building distributed indices. + pub fn with_transpose(&mut self, transpose: bool) -> &mut Self { + self.transpose_codes = transpose; + self + } + #[instrument(name = "load_or_build_ivf", level = "debug", skip_all)] async fn load_or_build_ivf(&self) -> Result { match &self.ivf { @@ -1082,6 +1093,25 @@ impl IvfIndexBuilder // PQ code column moves together with ROW_ID. batch = stable_sort_batch_by_row_id(&batch)?; + // For PQ storages, optionally convert codes back to transposed layout + // in the unified auxiliary file. This keeps final PQ storage column-major + // when `transpose_pq_codes` is enabled. 
+ if is_pq && self.transpose_codes && batch.column_by_name(PQ_CODE_COLUMN).is_some() { + let codes_fsl = batch + .column_by_name(PQ_CODE_COLUMN) + .unwrap() + .as_fixed_size_list(); + let num_rows = batch.num_rows(); + let bytes_per_code = codes_fsl.value_length() as usize; + let codes = codes_fsl.values().as_primitive::(); + let transposed_codes = transpose(codes, num_rows, bytes_per_code); + let transposed_fsl = Arc::new(FixedSizeListArray::try_new_from_values( + transposed_codes, + bytes_per_code as i32, + )?); + batch = batch.replace_column_by_name(PQ_CODE_COLUMN, transposed_fsl)?; + } + storage_writer.write_batch(&batch).await?; storage_ivf.add_partition(batch.num_rows() as u32); } @@ -1123,12 +1153,18 @@ impl IvfIndexBuilder .add_global_buffer(storage_ivf_pb.encode_to_vec().into()) .await?; storage_writer.add_schema_metadata(IVF_METADATA_KEY, ivf_buffer_pos.to_string()); + let quant_type = Q::quantization_type(); + let transposed = match quant_type { + QuantizationType::Product => self.transpose_codes, + QuantizationType::Rabit => true, + _ => false, + }; // For now, each partition's metadata is just the quantizer, // it's all the same for now, so we just take the first one let mut metadata = quantizer.metadata(Some(QuantizationMetadata { codebook_position: Some(0), codebook: None, - transposed: !is_pq, + transposed, })); if let Some(extra_metadata) = metadata.extra_metadata()? 
{ let idx = storage_writer.add_global_buffer(extra_metadata).await?; From c3ef29b2001a3af84274aee80febbe8e97651c83 Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 30 Jan 2026 21:19:55 +0800 Subject: [PATCH 10/10] address review comments --- .../src/vector/distributed/index_merger.rs | 168 ++++++++++++++---- 1 file changed, 135 insertions(+), 33 deletions(-) diff --git a/rust/lance-index/src/vector/distributed/index_merger.rs b/rust/lance-index/src/vector/distributed/index_merger.rs index d91cdd79571..dd604adb138 100755 --- a/rust/lance-index/src/vector/distributed/index_merger.rs +++ b/rust/lance-index/src/vector/distributed/index_merger.rs @@ -6,10 +6,12 @@ use crate::vector::shared::partition_merger::{ write_unified_ivf_and_index_metadata, SupportedIvfIndexType, }; -use arrow::datatypes::Float32Type; +use arrow::{compute::concat_batches, datatypes::Float32Type}; use arrow_array::cast::AsArray; -use arrow_array::{Array, FixedSizeListArray, UInt64Array}; +use arrow_array::types::UInt8Type; +use arrow_array::{Array, FixedSizeListArray, RecordBatch, UInt64Array}; use futures::StreamExt as _; +use lance_arrow::{FixedSizeListArrayExt, RecordBatchExt}; use lance_core::utils::address::RowAddress; use lance_core::{Error, Result, ROW_ID_FIELD}; use snafu::location; @@ -19,7 +21,7 @@ use std::sync::Arc; use crate::pb; use crate::vector::flat::index::FlatMetadata; use crate::vector::ivf::storage::{IvfModel as IvfStorageModel, IVF_METADATA_KEY}; -use crate::vector::pq::storage::{ProductQuantizationMetadata, PQ_METADATA_KEY}; +use crate::vector::pq::storage::{transpose, ProductQuantizationMetadata, PQ_METADATA_KEY}; use crate::vector::quantizer::QuantizerMetadata; use crate::vector::sq::storage::{ScalarQuantizationMetadata, SQ_METADATA_KEY}; use crate::vector::storage::STORAGE_METADATA_KEY; @@ -206,7 +208,6 @@ pub async fn init_writer_for_pq( FileWriterOptions::default(), )?; let mut pm_init = pm.clone(); - pm_init.transposed = true; let cb = 
pm_init.codebook.as_ref().ok_or_else(|| Error::Index { message: "PQ codebook missing".to_string(), location: snafu::location!(), @@ -273,6 +274,54 @@ pub async fn write_partition_rows( Ok(()) } +/// Transpose the PQ code column for a batch and write it to the unified writer. +/// +/// This helper assumes `batch` contains a contiguous range of rows for a single +/// IVF partition. +async fn write_partition_rows_pq_transposed( + w: &mut FileWriter, + mut batch: RecordBatch, +) -> Result<()> { + let num_rows = batch.num_rows(); + if num_rows == 0 { + return Ok(()); + } + + let pq_col = batch + .column_by_name(PQ_CODE_COLUMN) + .ok_or_else(|| Error::Index { + message: format!("PQ column {} missing in auxiliary shard", PQ_CODE_COLUMN), + location: location!(), + })?; + let pq_fsl = pq_col + .as_fixed_size_list_opt() + .ok_or_else(|| Error::Index { + message: format!( + "PQ column {} is not a FixedSizeList in auxiliary shard, got {}", + PQ_CODE_COLUMN, + pq_col.data_type(), + ), + location: location!(), + })?; + let num_bytes = pq_fsl.value_length() as usize; + let values = pq_fsl.values().as_primitive::(); + let transposed_codes = transpose(values, num_rows, num_bytes); + let transposed_fsl = Arc::new(FixedSizeListArray::try_new_from_values( + transposed_codes, + num_bytes as i32, + )?); + batch = batch.replace_column_by_name(PQ_CODE_COLUMN, transposed_fsl)?; + + // Write in reasonably sized chunks to avoid huge batches. + let batch_size: usize = 10_240; + for offset in (0..num_rows).step_by(batch_size) { + let len = std::cmp::min(batch_size, num_rows - offset); + let slice = batch.slice(offset, len); + w.write_batch(&slice).await?; + } + Ok(()) +} + /// Detect and return supported index type from reader and schema. 
/// /// This is a lightweight wrapper around SupportedIndexType::detect to keep @@ -778,20 +827,16 @@ pub async fn merge_partial_vector_auxiliary_files( if existing_pm.num_sub_vectors != pm.num_sub_vectors || existing_pm.nbits != pm.nbits || existing_pm.dimension != pm.dimension - || existing_pm.transposed != pm.transposed { return Err(Error::Index { message: format!( - "Distributed PQ merge: structural mismatch across shards; first(\ - dim={}, m={}, nbits={}, transposed={}), current(dim={}, m={}, nbits={}, transposed={})", + "Distributed PQ merge: structural mismatch across shards; first(dim={}, m={}, nbits={}), current(dim={}, m={}, nbits={})", existing_pm.dimension, existing_pm.num_sub_vectors, existing_pm.nbits, - existing_pm.transposed, pm.dimension, pm.num_sub_vectors, - pm.nbits, - pm.transposed + pm.nbits ), location: location!(), }); @@ -822,7 +867,9 @@ pub async fn merge_partial_vector_auxiliary_files( pq_meta = Some(pm.clone()); } if v2w_opt.is_none() { - let w = init_writer_for_pq(object_store, &aux_out, dt, &pm).await?; + let mut pm_for_unified = pm.clone(); + pm_for_unified.transposed = true; + let w = init_writer_for_pq(object_store, &aux_out, dt, &pm_for_unified).await?; v2w_opt = Some(w); } } @@ -1028,7 +1075,9 @@ pub async fn merge_partial_vector_auxiliary_files( pq_meta = Some(pm.clone()); } if v2w_opt.is_none() { - let w = init_writer_for_pq(object_store, &aux_out, dt, &pm).await?; + let mut pm_for_unified = pm.clone(); + pm_for_unified.transposed = true; + let w = init_writer_for_pq(object_store, &aux_out, dt, &pm_for_unified).await?; v2w_opt = Some(w); } } @@ -1122,24 +1171,81 @@ pub async fn merge_partial_vector_auxiliary_files( message: "Missing IVF partition count".to_string(), location: location!(), })?; - for pid in 0..nlist { - for (path, lens, _) in shard_infos.iter() { - let part_len = lens[pid] as usize; - if part_len == 0 { - continue; + let idx_type_final = detected_index_type.ok_or_else(|| Error::Index { + message: "Unable to 
detect index type".to_string(), + location: location!(), + })?; + + match idx_type_final { + SupportedIvfIndexType::IvfPq | SupportedIvfIndexType::IvfHnswPq => { + // For PQ-backed indices, transpose PQ codes while merging partitions + // so that the unified file stores column-major PQ codes. + for pid in 0..nlist { + let total_len = accumulated_lengths[pid] as usize; + if total_len == 0 { + continue; + } + + let mut part_batches: Vec = Vec::new(); + for (path, lens, _) in shard_infos.iter() { + let part_len = lens[pid] as usize; + if part_len == 0 { + continue; + } + let offset: usize = lens.iter().take(pid).map(|x| *x as usize).sum(); + let fh = sched.open_file(path, &CachedFileSize::unknown()).await?; + let reader = V2Reader::try_open( + fh, + None, + Arc::default(), + &lance_core::cache::LanceCache::no_cache(), + V2ReaderOptions::default(), + ) + .await?; + let mut stream = reader.read_stream( + lance_io::ReadBatchParams::Range(offset..offset + part_len), + u32::MAX, + 4, + lance_encoding::decoder::FilterExpression::no_filter(), + )?; + while let Some(rb) = stream.next().await { + let rb = rb?; + part_batches.push(rb); + } + } + + if part_batches.is_empty() { + continue; + } + + let schema = part_batches[0].schema(); + let partition_batch = concat_batches(&schema, part_batches.iter())?; + if let Some(w) = v2w_opt.as_mut() { + write_partition_rows_pq_transposed(w, partition_batch).await?; + } } - let offset: usize = lens.iter().take(pid).map(|x| *x as usize).sum(); - let fh = sched.open_file(path, &CachedFileSize::unknown()).await?; - let reader = V2Reader::try_open( - fh, - None, - Arc::default(), - &lance_core::cache::LanceCache::no_cache(), - V2ReaderOptions::default(), - ) - .await?; - if let Some(w) = v2w_opt.as_mut() { - write_partition_rows(&reader, w, offset..offset + part_len).await?; + } + _ => { + for pid in 0..nlist { + for (path, lens, _) in shard_infos.iter() { + let part_len = lens[pid] as usize; + if part_len == 0 { + continue; + } + let offset: 
usize = lens.iter().take(pid).map(|x| *x as usize).sum(); + let fh = sched.open_file(path, &CachedFileSize::unknown()).await?; + let reader = V2Reader::try_open( + fh, + None, + Arc::default(), + &lance_core::cache::LanceCache::no_cache(), + V2ReaderOptions::default(), + ) + .await?; + if let Some(w) = v2w_opt.as_mut() { + write_partition_rows(&reader, w, offset..offset + part_len).await?; + } + } } } } @@ -1158,10 +1264,6 @@ pub async fn merge_partial_vector_auxiliary_files( message: "Distance type missing".to_string(), location: location!(), })?; - let idx_type_final = detected_index_type.ok_or_else(|| Error::Index { - message: "Unable to detect index type".to_string(), - location: location!(), - })?; write_unified_ivf_and_index_metadata(w, &ivf_model, dt2, idx_type_final).await?; w.finish().await?; } else {