diff --git a/rust/lance-index/src/vector.rs b/rust/lance-index/src/vector.rs index b986b1d6c20..f694810aec2 100644 --- a/rust/lance-index/src/vector.rs +++ b/rust/lance-index/src/vector.rs @@ -257,6 +257,7 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index { fn ivf_model(&self) -> &IvfModel; fn quantizer(&self) -> Quantizer; + fn partition_size(&self, part_id: usize) -> usize; /// the index type of this vector index. fn sub_index_type(&self) -> (SubIndexType, QuantizationType); diff --git a/rust/lance-index/src/vector/hnsw/index.rs b/rust/lance-index/src/vector/hnsw/index.rs index 755bb25a7ce..0155dbbba0d 100644 --- a/rust/lance-index/src/vector/hnsw/index.rs +++ b/rust/lance-index/src/vector/hnsw/index.rs @@ -331,6 +331,10 @@ impl VectorIndex for HNSWIndex { self.partition_storage.quantizer().clone() } + fn partition_size(&self, _: usize) -> usize { + unimplemented!("only for IVF") + } + fn sub_index_type(&self) -> (SubIndexType, QuantizationType) { ( SubIndexType::Hnsw, diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index a7f0e0a1b0a..018254e12f1 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -27,8 +27,6 @@ use lance_io::{ utils::CachedFileSize, }; use object_store::path::Path; -use snafu::location; -use tokio::sync::Mutex; use crate::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN}; @@ -106,10 +104,6 @@ impl Shuffler for IvfShuffler { &self, data: Box, ) -> Result> { - if self.num_partitions == 1 { - return Ok(Box::new(SinglePartitionReader::new(data))); - } - let num_partitions = self.num_partitions; let mut partition_sizes = vec![0; num_partitions]; let schema = data.schema().without_column(PART_ID_COLUMN); @@ -287,46 +281,6 @@ impl ShuffleReader for IvfShufflerReader { } } -pub struct SinglePartitionReader { - data: Mutex>>, -} - -impl SinglePartitionReader { - pub fn new(data: Box) -> Self { - Self { - data: Mutex::new(Some(data)), - } - } -} - -#[async_trait::async_trait] -impl ShuffleReader for SinglePartitionReader { - async fn read_partition( - &self, - _partition_id: usize, - ) -> Result>> { - let mut data = self.data.lock().await; - match data.as_mut() { - Some(_) => Ok(data.take()), - None => Err(Error::Internal { - message: "the partition has been read and consumed".to_string(), - location: location!(), - }), - } - } - - fn partition_size(&self, _partition_id: usize) -> Result { - // we don't really care about the partition size - // it's used for determining the order of building the index and skipping empty partitions - // so we just return 1 here - Ok(1) - } - - fn total_loss(&self) -> Option { - None - } -} - pub struct EmptyReader; #[async_trait::async_trait] diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 9dc1159e35c..01bb481cb0c 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -711,7 +711,7 @@ impl IvfIndexBuilder // if no partitions to split, we just create a new delta index, // otherwise, we need to merge all existing indices and split large partitions. let reader = reader.clone(); - let (assign_batches, merge_indices) = + let (assign_batches, merge_indices, replaced_partition) = match Self::should_split(ivf, reader.as_ref(), &self.existing_indices)? { Some(partition) => { // Perform split and record the fact for downstream build/merge @@ -731,6 +731,7 @@ impl IvfIndexBuilder ( split_results.assign_batches, Arc::new(self.existing_indices.clone()), + Some(partition), ) } None => { @@ -752,7 +753,11 @@ impl IvfIndexBuilder [self.existing_indices.len().saturating_sub(num_to_merge)..] .to_vec(); - (vec![None; ivf.num_partitions()], Arc::new(indices_to_merge)) + ( + vec![None; ivf.num_partitions()], + Arc::new(indices_to_merge), + None, + ) } }; self.merged_num = merge_indices.len(); @@ -777,13 +782,18 @@ impl IvfIndexBuilder let sub_index_params = sub_index_params.clone(); let column = column.clone(); let frag_reuse_index = frag_reuse_index.clone(); + let skip_existing_batches = replaced_partition == Some(partition); async move { - let (mut batches, loss) = Self::take_partition_batches( - partition, - indices.as_ref(), - Some(reader.as_ref()), - ) - .await?; + let (mut batches, loss) = if skip_existing_batches { + (Vec::new(), 0.0) + } else { + Self::take_partition_batches( + partition, + indices.as_ref(), + Some(reader.as_ref()), + ) + .await? + }; if let Some((assign_batch, deleted_row_ids)) = assign_batch { if !deleted_row_ids.is_empty() { @@ -1163,7 +1173,7 @@ impl IvfIndexBuilder for partition in 0..ivf.num_partitions() { let mut num_rows = reader.partition_size(partition)?; for index in existing_indices.iter() { - num_rows += index.ivf_model().partition_size(partition); + num_rows += index.partition_size(partition); } if num_rows > max_partition_size && num_rows > MAX_PARTITION_SIZE_FACTOR * index_type.target_partition_size() @@ -1760,17 +1770,15 @@ impl IvfIndexBuilder reassign_candidate_centroids: &FixedSizeListArray, ) -> Result { let dists = self.distance_type.arrow_batch_func()(vector, reassign_candidate_centroids)?; - let min_dist_idx = dists - .values() - .iter() - .position_min_by(|a, b| a.total_cmp(b)) - .unwrap(); - let min_dist = dists.value(min_dist_idx); + let min_dist_idx = dists.values().iter().position_min_by(|a, b| a.total_cmp(b)); + let min_dist = min_dist_idx + .map(|idx| dists.value(idx)) + .unwrap_or(f32::INFINITY); match split_centroids_dists { Some((d1, d2)) => { if min_dist <= d1 && min_dist <= d2 { Ok(ReassignPartition::ReassignCandidate( - reassign_candidate_ids.value(min_dist_idx), + reassign_candidate_ids.value(min_dist_idx.unwrap()), )) } else if d1 <= d2 { Ok(ReassignPartition::NewCentroid1) @@ -1779,7 +1787,7 @@ impl IvfIndexBuilder } } None => Ok(ReassignPartition::ReassignCandidate( - reassign_candidate_ids.value(min_dist_idx), + reassign_candidate_ids.value(min_dist_idx.unwrap()), )), } } diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index 0ec68319121..6316e88d898 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -163,10 +163,15 @@ mod test { fn ivf_model(&self) -> &IvfModel { unimplemented!("only for IVF") } + fn quantizer(&self) -> Quantizer { unimplemented!("only for IVF") } + fn partition_size(&self, _: usize) -> usize { + unimplemented!("only for IVF") + } + /// the index type of this vector index. fn sub_index_type(&self) -> (SubIndexType, QuantizationType) { unimplemented!("only for IVF") diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index af7742cfef2..8a590ea8513 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -1042,6 +1042,10 @@ impl VectorIndex for IVFIndex { unimplemented!("only for v2 IVFIndex") } + fn partition_size(&self, part_id: usize) -> usize { + self.ivf.partition_size(part_id) + } + /// the index type of this vector index. fn sub_index_type(&self) -> (SubIndexType, QuantizationType) { unimplemented!("only for v2 IVFIndex") diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 072935a30a1..667fedb5dac 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -590,6 +590,10 @@ impl VectorIndex for IVFInd self.storage.quantizer().unwrap() } + fn partition_size(&self, part_id: usize) -> usize { + self.storage.partition_size(part_id) + } + /// the index type of this vector index. fn sub_index_type(&self) -> (SubIndexType, QuantizationType) { (S::name().try_into().unwrap(), Q::quantization_type()) @@ -668,7 +672,6 @@ mod tests { const NUM_ROWS: usize = 512; const DIM: usize = 32; - const PARTITION_SPLIT_APPEND_ROWS: usize = 50_000; async fn generate_test_dataset( test_uri: &str, @@ -728,32 +731,6 @@ mod tests { vectors } - async fn append_identical_vectors(dataset: &mut Dataset, num_rows: usize, vector: &[f32]) { - assert_eq!( - vector.len(), - DIM, - "vector length ({}) must match DIM ({})", - vector.len(), - DIM - ); - let start_id = dataset.count_all_rows().await.unwrap() as u64; - let ids: ArrayRef = Arc::new(UInt64Array::from_iter_values( - start_id..start_id + num_rows as u64, - )); - let mut values = Vec::with_capacity(num_rows * DIM); - for _ in 0..num_rows { - values.extend_from_slice(vector); - } - let vectors: ArrayRef = Arc::new( - FixedSizeListArray::try_new_from_values(Float32Array::from(values), DIM as i32) - .unwrap(), - ); - let schema = Arc::new(Schema::from(dataset.schema())); - let batch = RecordBatch::try_new(schema.clone(), vec![ids, vectors]).unwrap(); - let batches = RecordBatchIterator::new(vec![Ok(batch)], schema); - dataset.append(batches, None).await.unwrap(); - } - fn generate_batch( num_rows: usize, start_id: Option, @@ -868,9 +845,9 @@ mod tests { test_uri: &str, params: VectorIndexParams, description: &str, - append_override: Option>, ) { const INDEX_NAME: &str = "vector_idx"; + const APPEND_ROWS: usize = 50_000; dataset .create_index( @@ -892,13 +869,8 @@ mod tests { initial_ctx.stats_json() ); - // Append additional data to trigger a split. - if let Some(vector) = append_override { - append_identical_vectors(&mut dataset, PARTITION_SPLIT_APPEND_ROWS, &vector).await; - } else { - append_dataset::(&mut dataset, PARTITION_SPLIT_APPEND_ROWS, 0.0..0.05) - .await; - } + // Append tightly clustered vectors so data flows into the same partition. + append_dataset::(&mut dataset, APPEND_ROWS, 0.0..0.05).await; dataset .optimize_indices(&OptimizeOptions::new()) @@ -916,6 +888,174 @@ mod tests { ); } + async fn shrink_smallest_partition( + dataset: &mut Dataset, + index_name: &str, + expected_after_join: usize, + ) -> usize { + let index_ctx = load_vector_index_context(dataset, "vector", index_name).await; + let partitions = index_ctx.stats()["indices"][0]["partitions"] + .as_array() + .expect("partitions should be present"); + let (partition_idx, size) = partitions + .iter() + .enumerate() + .filter_map(|(idx, part)| part["size"].as_u64().map(|size| (idx, size))) + .min_by_key(|(_, size)| *size) + .expect("should have at least one partition"); + assert!( + size > 1, + "Partition {} must contain at least two rows to trigger join", + partition_idx + ); + + let row_ids = load_partition_row_ids(index_ctx.ivf(), partition_idx).await; + assert!( + row_ids.len() > 1, + "Partition {} should have removable rows", + partition_idx + ); + + let rows = dataset + .take_rows(&row_ids, dataset.schema().clone()) + .await + .unwrap(); + let ids = rows["id"].as_primitive::().values(); + + delete_ids(dataset, &ids[1..]).await; + compact_after_deletions(dataset).await; + + let post_ctx = load_vector_index_context(dataset, "vector", index_name).await; + assert_eq!( + post_ctx.num_partitions(), + expected_after_join, + "Expected partitions to decrease to {} after join, got stats: {}", + expected_after_join, + post_ctx.stats_json() + ); + + row_ids.len() - 1 + } + + async fn append_constant_vector(dataset: &mut Dataset, rows: usize, template: &[f32]) { + assert_eq!( + template.len(), + DIM, + "Template vector should have {} dimensions", + DIM + ); + + let start_id = dataset.count_all_rows().await.unwrap() as u64; + let ids = Arc::new(UInt64Array::from_iter_values( + start_id..start_id + rows as u64, + )); + let mut appended_values = Vec::with_capacity(rows * DIM); + for _ in 0..rows { + appended_values.extend_from_slice(template); + } + let vectors = Arc::new( + FixedSizeListArray::try_new_from_values( + Float32Array::from(appended_values), + DIM as i32, + ) + .unwrap(), + ); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt64, false), + Field::new("vector", vectors.data_type().clone(), false), + ])); + let batch = RecordBatch::try_new(schema.clone(), vec![ids, vectors]).unwrap(); + let batches = RecordBatchIterator::new(vec![Ok(batch)], schema); + dataset.append(batches, None).await.unwrap(); + } + + #[allow(clippy::too_many_arguments)] + async fn append_and_verify_append_phase( + dataset: &mut Dataset, + index_name: &str, + template: &[f32], + rows_to_append: usize, + expected_partitions: usize, + expected_total_rows: usize, + expected_index_count: usize, + expect_split: bool, + ) { + append_constant_vector(dataset, rows_to_append, template).await; + dataset + .optimize_indices(&OptimizeOptions::new()) + .await + .unwrap(); + + let stats_json = dataset.index_statistics(index_name).await.unwrap(); + let stats: serde_json::Value = serde_json::from_str(&stats_json).unwrap(); + + let indices = stats["indices"] + .as_array() + .expect("indices array should exist"); + assert_eq!( + indices.len(), + expected_index_count, + "Expected {} index entries, got {}, stats: {}", + expected_index_count, + indices.len(), + stats + ); + assert_eq!( + stats["num_indices"].as_u64().unwrap() as usize, + expected_index_count, + "num_indices mismatch in stats" + ); + assert_eq!( + stats["num_indexed_rows"].as_u64().unwrap() as usize, + expected_total_rows, + "Total indexed rows mismatch after append" + ); + + let base_index = indices + .iter() + .max_by_key(|entry| entry["num_partitions"].as_u64().unwrap_or(0)) + .expect("at least one index entry should exist"); + assert_eq!( + base_index["num_partitions"].as_u64().unwrap() as usize, + expected_partitions, + "Partition count mismatch after append" + ); + + if expected_index_count == 1 { + let partitions = base_index["partitions"] + .as_array() + .expect("partitions should exist"); + assert_eq!( + partitions.len(), + expected_partitions, + "Expected {} partitions, found {}", + expected_partitions, + partitions.len() + ); + let partition_sizes: Vec = partitions + .iter() + .map(|part| part["size"].as_u64().unwrap() as usize) + .collect(); + let total_partition_rows: usize = partition_sizes.iter().sum(); + assert_eq!( + total_partition_rows, expected_total_rows, + "Partition sizes should sum to total rows: {:?}", + partition_sizes + ); + } else { + assert!( + !expect_split, + "Split should result in a single merged index" + ); + } + + assert_eq!( + dataset.count_all_rows().await.unwrap(), + expected_total_rows, + "Dataset row count mismatch after append" + ); + } + async fn load_partition_row_ids(index: &IvfPq, partition_idx: usize) -> Vec { index .storage @@ -2220,117 +2360,29 @@ mod tests { } #[tokio::test] - async fn test_partition_split_on_append() { - // This test verifies that when we append enough data to a partition - // such that it exceeds MAX_PARTITION_SIZE_FACTOR * target_partition_size, - // the partition will be split into 2 partitions. - - let test_dir = TempStrDir::default(); - let test_uri = test_dir.as_str(); - - // Build deterministic two-cluster data so centroids are predictable. - const ROWS_PER_CLUSTER: usize = 2_048; - let cluster_vectors = vec![ - { - let mut v = vec![0.0; DIM]; - v[0] = 1.0; - v - }, - { - let mut v = vec![0.0; DIM]; - v[1] = 1.0; - v - }, - ]; - let total_rows = ROWS_PER_CLUSTER * cluster_vectors.len(); - let mut ids = Vec::with_capacity(total_rows); - let mut vector_values = Vec::with_capacity(total_rows * DIM); - let mut current_id = 0u64; - for cluster_vector in &cluster_vectors { - for _ in 0..ROWS_PER_CLUSTER { - ids.push(current_id); - current_id += 1; - vector_values.extend_from_slice(cluster_vector); - } - } - - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt64, false), - Field::new( - "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - DIM as i32, - ), - false, - ), - ])); - let ids_array: ArrayRef = Arc::new(UInt64Array::from(ids)); - let vectors_array: ArrayRef = Arc::new( - FixedSizeListArray::try_new_from_values(Float32Array::from(vector_values), DIM as i32) - .unwrap(), - ); - let batch = RecordBatch::try_new(schema.clone(), vec![ids_array, vectors_array]).unwrap(); - let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let dataset = Dataset::write( - batches, - test_uri, - Some(WriteParams { - mode: crate::dataset::WriteMode::Overwrite, - ..Default::default() - }), - ) - .await - .unwrap(); + async fn test_spfresh_join_split() { + // Two join cycles followed by three append cycles: + // 1. Each deletion shrinks the smallest partition and verifies the partition count. + // 2. Append #1 (10k rows) creates a delta index without splitting. + // 3. Append #2 and #3 (40k rows each) trigger splits, forcing merges and validating partition sizes. - // Create an IVF-PQ index with 2 partitions - // For IvfPq, target_partition_size = 8192 - // Split triggers when partition_size > 4 * 8192 = 32,768 - let params = VectorIndexParams::ivf_pq(2, 8, DIM / 8, DistanceType::L2, 50); - verify_partition_split_after_append( - dataset, - test_uri, - params, - "scalar vector data", - Some(cluster_vectors[0].clone()), - ) - .await; - } - - #[tokio::test] - async fn test_join_partition_on_delete() { - // This test verifies that partition join works correctly when partitions become - // too small after deletions. - // - // The test uses deterministic data with predefined centroids to ensure: - // 1. Predictable partition sizes - // 2. Reliable triggering of partition join when size < threshold - // 3. Correct handling of small partitions - // - // Join threshold: MIN_PARTITION_SIZE_PERCENT * target_partition_size / 100 - // = 25 * 8192 / 100 = 2048 rows + const INDEX_NAME: &str = "vector_idx"; + const NLIST: usize = 3; + const FIRST_APPEND_ROWS: usize = 10_000; + const SECOND_APPEND_ROWS: usize = 30_000; + const THIRD_APPEND_ROWS: usize = 35_000; let test_dir = TempStrDir::default(); let test_uri = test_dir.as_str(); - // Create deterministic test data with 3 clusters - // Cluster 0: 100 rows (will be deleted to trigger join) - // Cluster 1: 3000 rows - // Cluster 2: 3000 rows - let nlist = 3; - let cluster_sizes = [100, 3000, 3000]; + // Two small clusters (for joins) and two large clusters (for splits). + let cluster_sizes = [100, 4_000, 4_000]; let total_rows: usize = cluster_sizes.iter().sum(); - // Generate 3 well-separated centroids in DIM-dimensional space let mut centroid_values = Vec::new(); - for i in 0..nlist { + for i in 0..NLIST { for j in 0..DIM { - // Place centroids far apart to ensure clear cluster separation - centroid_values.push(if j == 0 { - (i as f32) * 10.0 // Separate along first dimension - } else { - 0.0 - }); + centroid_values.push(if j == 0 { (i as f32) * 10.0 } else { 0.0 }); } } let centroids = Arc::new( @@ -2341,21 +2393,17 @@ mod tests { .unwrap(), ); - // Generate vectors clustered around each centroid let mut ids = Vec::new(); let mut vector_values = Vec::new(); let mut current_id = 0u64; - for (cluster_idx, &size) in cluster_sizes.iter().enumerate() { let centroid_base = (cluster_idx as f32) * 10.0; for _ in 0..size { ids.push(current_id); current_id += 1; - - // Generate vector close to centroid (within 0.5 radius) for j in 0..DIM { vector_values.push(if j == 0 { - centroid_base + (current_id % 100) as f32 * 0.005 // Small variation + centroid_base + (current_id % 100) as f32 * 0.005 } else { (current_id % 50) as f32 * 0.01 }); @@ -2368,12 +2416,10 @@ mod tests { FixedSizeListArray::try_new_from_values(Float32Array::from(vector_values), DIM as i32) .unwrap(), ); - let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::UInt64, false), Field::new("vector", vectors.data_type().clone(), false), ])); - let batch = RecordBatch::try_new(schema.clone(), vec![ids_array, vectors]).unwrap(); let batches = RecordBatchIterator::new(vec![Ok(batch)], schema); @@ -2388,142 +2434,102 @@ mod tests { .await .unwrap(); - // Create IVF-PQ index with predefined centroids - let ivf_params = IvfBuildParams::try_with_centroids(nlist, centroids).unwrap(); + let ivf_params = IvfBuildParams::try_with_centroids(NLIST, centroids).unwrap(); let params = VectorIndexParams::with_ivf_pq_params( DistanceType::L2, ivf_params, PQBuildParams::default(), ); - dataset .create_index( &["vector"], IndexType::Vector, - Some("vector_idx".to_string()), + Some(INDEX_NAME.to_string()), ¶ms, true, ) .await .unwrap(); - // Verify initial partition count - let index_ctx = load_vector_index_context(&dataset, "vector", "vector_idx").await; - assert_eq!(index_ctx.num_partitions(), nlist); - - // Verify partition sizes match expected (approximately, due to potential edge cases) - let partitions = index_ctx.stats()["indices"][0]["partitions"] - .as_array() - .unwrap(); - let partition_0_size = partitions[0]["size"].as_u64().unwrap(); - assert!( - (50..=150).contains(&partition_0_size), - "Partition 0 should have ~100 rows, got {}", - partition_0_size - ); - - // Delete most rows from partition 0, keeping only 1 row - // This should bring it well below the 2048 threshold - let row_ids = load_partition_row_ids(index_ctx.ivf(), 0).await; - - assert!(!row_ids.is_empty(), "Partition 0 should not be empty"); - - let res = dataset - .take_rows(&row_ids, dataset.schema().clone()) + // Template vector from the first large cluster for deterministic appends. + let template_id = (cluster_sizes[0] + cluster_sizes[1]) as u64; + let template_batch = dataset + .take_rows(&[template_id], dataset.schema().clone()) .await .unwrap(); - let ids = res["id"].as_primitive::().values(); - let retained_id = ids[0]; // Save the ID of the row we're keeping - let first_vector = res["vector"].as_fixed_size_list().value(0); - - // Delete all but the first row in partition 0 - delete_ids(&mut dataset, &ids[1..]).await; - - // Compact to trigger partition join - compact_after_deletions(&mut dataset).await; - - // Verify partition was joined (should have nlist-1 partitions now) - let final_ctx = load_vector_index_context(&dataset, "vector", "vector_idx").await; - let final_num_partitions = final_ctx.num_partitions(); - assert_eq!( - final_num_partitions, - nlist - 1, - "Expected partition join to decrease partitions from {} to {}, got stats: {}", - nlist, - nlist - 1, - final_ctx.stats_json() - ); - - // Verify that vector search still works after partition join - let result = dataset - .scan() - .nearest("vector", &first_vector, 10) - .unwrap() - .try_into_batch() - .await - .unwrap(); - assert!( - result.num_rows() > 0, - "Search should return results after partition join" - ); - - // Verify the retained row still exists in the dataset by filtering by ID - let retained_row_result = dataset - .scan() - .filter(&format!("id = {}", retained_id)) - .unwrap() - .try_into_batch() - .await - .unwrap(); - assert_eq!( - retained_row_result.num_rows(), - 1, - "The retained row (id={}) should still exist in the dataset", - retained_id - ); - assert_eq!( - retained_row_result["id"] - .as_primitive::() - .value(0), - retained_id, - "The filtered result should match the retained ID" - ); - - // Verify total row count - let remaining_rows = dataset.count_all_rows().await.unwrap(); - let expected_rows = total_rows - cluster_sizes[0] + 1; // Deleted all but 1 from cluster 0 + let template_values = template_batch["vector"] + .as_fixed_size_list() + .value(0) + .as_primitive::() + .values() + .to_vec(); assert_eq!( - remaining_rows, expected_rows, - "Should have {} rows remaining", - expected_rows + template_values.len(), + DIM, + "Template vector should match DIM" ); - // Verify the index is still functional by searching for different vectors - for cluster_idx in 1..nlist { - let test_vector_values: Vec = (0..DIM) - .map(|j| { - if j == 0 { - (cluster_idx as f32) * 10.0 - } else { - 0.0 - } - }) - .collect(); - let test_vector = Float32Array::from(test_vector_values); + let mut expected_partitions = NLIST; + let mut expected_rows = total_rows; - let result = dataset - .scan() - .nearest("vector", &test_vector, 5) - .unwrap() - .try_into_batch() - .await - .unwrap(); - assert!( - result.num_rows() > 0, - "Search for cluster {} should return results", - cluster_idx + // Two join cycles. + for expected_after in [NLIST - 1, NLIST - 2] { + let deleted_rows = + shrink_smallest_partition(&mut dataset, INDEX_NAME, expected_after).await; + expected_rows -= deleted_rows; + assert_eq!( + dataset.count_all_rows().await.unwrap(), + expected_rows, + "Row count mismatch after join" ); + expected_partitions = expected_after; } + + // Append #1: no split, expect a delta index. + let rows = FIRST_APPEND_ROWS; + append_and_verify_append_phase( + &mut dataset, + INDEX_NAME, + &template_values, + rows, + expected_partitions, + expected_rows + rows, + 2, + false, + ) + .await; + expected_rows += rows; + + // Append #2: triggers split and merge. + expected_partitions += 1; + let rows = SECOND_APPEND_ROWS; + append_and_verify_append_phase( + &mut dataset, + INDEX_NAME, + &template_values, + rows, + expected_partitions, + expected_rows + rows, + 1, + true, + ) + .await; + expected_rows += rows; + + // Append #3: triggers another split, remains a single merged index. + expected_partitions += 1; + let rows = THIRD_APPEND_ROWS; + append_and_verify_append_phase( + &mut dataset, + INDEX_NAME, + &template_values, + rows, + expected_partitions, + expected_rows + rows, + 1, + true, + ) + .await; } #[tokio::test] @@ -2542,8 +2548,7 @@ mod tests { // For IvfPq, target_partition_size = 8192 // Split triggers when partition_size > 4 * 8192 = 32,768 let params = VectorIndexParams::ivf_pq(2, 8, DIM / 8, DistanceType::Cosine, 50); - verify_partition_split_after_append(dataset, test_uri, params, "multivector data", None) - .await; + verify_partition_split_after_append(dataset, test_uri, params, "multivector data").await; } #[tokio::test] diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index 3b8de14fbbd..c6167876455 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -472,10 +472,15 @@ impl VectorIndex for PQIndex { fn ivf_model(&self) -> &IvfModel { unimplemented!("only for IVF") } + fn quantizer(&self) -> Quantizer { unimplemented!("only for IVF") } + fn partition_size(&self, _: usize) -> usize { + unimplemented!("only for IVF") + } + /// the index type of this vector index. fn sub_index_type(&self) -> (SubIndexType, QuantizationType) { (SubIndexType::Flat, QuantizationType::Product) diff --git a/rust/lance/src/session/index_extension.rs b/rust/lance/src/session/index_extension.rs index 96fe8cbf8a6..bc3d3e6858d 100644 --- a/rust/lance/src/session/index_extension.rs +++ b/rust/lance/src/session/index_extension.rs @@ -196,10 +196,15 @@ mod test { fn ivf_model(&self) -> &IvfModel { unimplemented!() } + fn quantizer(&self) -> Quantizer { unimplemented!() } + fn partition_size(&self, _: usize) -> usize { + unimplemented!() + } + /// the index type of this vector index. fn sub_index_type(&self) -> (SubIndexType, QuantizationType) { unimplemented!()