diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index e6d0fb16c51..f99ae0587cd 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use std::collections::HashSet; +use std::future; use std::sync::Arc; use std::{collections::HashMap, pin::Pin}; @@ -24,7 +25,7 @@ use itertools::Itertools; use lance_arrow::{FixedSizeListArrayExt, RecordBatchExt}; use lance_core::datatypes::Schema; use lance_core::utils::tempfile::TempStdDir; -use lance_core::utils::tokio::get_num_compute_intensive_cpus; +use lance_core::utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu}; use lance_core::ROW_ID; use lance_core::{Error, Result, ROW_ID_FIELD}; use lance_file::writer::FileWriter; @@ -1308,35 +1309,62 @@ impl IvfIndexBuilder &mut assign_ops, )?; // assign the vectors in the reassigned partitions - for (i, idx) in reassign_part_ids.values().iter().enumerate() { - let part_idx = *idx as usize; - let Some((row_ids, vectors)) = self.load_partition_raw_vectors(part_idx).await? else { - // all vectors in this partition have been deleted - continue; - }; - - let d0 = - self.distance_type.arrow_batch_func()(&reassign_part_centroids.value(i), &vectors)?; - let d1 = self.distance_type.arrow_batch_func()(&c1, &vectors)?; - let d2 = self.distance_type.arrow_batch_func()(&c2, &vectors)?; - let d0 = d0.values(); - let d1 = d1.values(); - let d2 = d2.values(); - - self.assign_vectors::( - part_idx, - centroid1_part_idx, - centroid2_part_idx, - &row_ids, - &vectors, - d0, - d1, - d2, - &reassign_part_ids, - &reassign_part_centroids, - false, - &mut assign_ops, - )?; + let reassign_targets = reassign_part_ids + .values() + .iter() + .copied() + .enumerate() + .collect::>(); + if !reassign_targets.is_empty() { + let builder = self; + let distance_type = self.distance_type; + let reassign_part_ids_clone = reassign_part_ids.clone(); + let reassign_part_centroids_clone = reassign_part_centroids.clone(); + stream::iter( + reassign_targets + .into_iter() + .map(move |(candidate_idx, part_id)| { + let builder = builder; + let reassign_part_ids = reassign_part_ids_clone.clone(); + let reassign_part_centroids = reassign_part_centroids_clone.clone(); + let centroid1 = c1.clone(); + let centroid2 = c2.clone(); + async move { + let part_idx = part_id as usize; + let Some((row_ids, vectors)) = + builder.load_partition_raw_vectors(part_idx).await? + else { + // all vectors in this partition have been deleted + return Ok::, Error>(Vec::new()); + }; + let ops = spawn_cpu(move || { + Self::compute_reassign_assign_ops::( + distance_type, + part_idx, + candidate_idx, + centroid1_part_idx, + centroid2_part_idx, + &row_ids, + &vectors, + centroid1, + centroid2, + &reassign_part_ids, + &reassign_part_centroids, + ) + }) + .await?; + Ok(ops) + } + }), + ) + .buffered(get_num_compute_intensive_cpus()) + .try_for_each(|ops| { + for (target_idx, op) in ops { + assign_ops[target_idx].push(op); + } + future::ready(Ok(())) + }) + .await?; } let new_centroids = @@ -1713,6 +1741,39 @@ impl IvfIndexBuilder // the length must be `old_num_partitions + 1` deleted_original_partition: bool, assign_ops: &mut [Vec], + ) -> Result<()> { + Self::assign_vectors_impl::( + self.distance_type, + part_idx, + centroid1_part_idx, + centroid2_part_idx, + row_ids, + vectors, + d0, + d1, + d2, + reassign_part_ids, + reassign_part_centroids, + deleted_original_partition, + |idx, op| assign_ops[idx].push(op), + ) + } + + #[allow(clippy::too_many_arguments)] + fn assign_vectors_impl( + distance_type: DistanceType, + part_idx: usize, + centroid1_part_idx: usize, + centroid2_part_idx: usize, + row_ids: &UInt64Array, + vectors: &FixedSizeListArray, + d0: &[f32], + d1: &[f32], + d2: &[f32], + reassign_part_ids: &UInt32Array, + reassign_part_centroids: &FixedSizeListArray, + deleted_original_partition: bool, + mut sink: F, ) -> Result<()> { for (i, &row_id) in row_ids.values().iter().enumerate() { if d0[i] <= d1[i] && d0[i] <= d2[i] { @@ -1720,7 +1781,8 @@ impl IvfIndexBuilder // the original partition is not deleted, we just keep the vector in the original partition continue; } - match self.reassign_vectors( + match Self::reassign_vectors_impl( + distance_type, vectors.value(i).as_primitive::(), Some((d1[i], d2[i])), reassign_part_ids, @@ -1728,36 +1790,93 @@ impl IvfIndexBuilder )? { ReassignPartition::NewCentroid1 => { // replace the original partition with the first new one - assign_ops[centroid1_part_idx] - .push(AssignOp::Add((row_id, vectors.value(i)))); + sink( + centroid1_part_idx, + AssignOp::Add((row_id, vectors.value(i))), + ); } ReassignPartition::NewCentroid2 => { // append the new second one - assign_ops[centroid2_part_idx] - .push(AssignOp::Add((row_id, vectors.value(i)))); + sink( + centroid2_part_idx, + AssignOp::Add((row_id, vectors.value(i))), + ); } ReassignPartition::ReassignCandidate(idx) => { // replace the original partition with the reassigned one - assign_ops[idx as usize].push(AssignOp::Add((row_id, vectors.value(i)))); + sink(idx as usize, AssignOp::Add((row_id, vectors.value(i)))); } } } else { if !deleted_original_partition { // the original partition is not deleted, we need to remove the vector from the original partition - assign_ops[part_idx].push(AssignOp::Remove(row_id)); + sink(part_idx, AssignOp::Remove(row_id)); } if d1[i] <= d2[i] { // centroid 1 is the closest one - assign_ops[centroid1_part_idx].push(AssignOp::Add((row_id, vectors.value(i)))); + sink( + centroid1_part_idx, + AssignOp::Add((row_id, vectors.value(i))), + ); } else { // centroid 2 is the closest one - assign_ops[centroid2_part_idx].push(AssignOp::Add((row_id, vectors.value(i)))); + sink( + centroid2_part_idx, + AssignOp::Add((row_id, vectors.value(i))), + ); } } } Ok(()) } + #[allow(clippy::too_many_arguments)] + fn compute_reassign_assign_ops( + distance_type: DistanceType, + part_idx: usize, + candidate_idx: usize, + centroid1_part_idx: usize, + centroid2_part_idx: usize, + row_ids: &UInt64Array, + vectors: &FixedSizeListArray, + centroid1: ArrayRef, + centroid2: ArrayRef, + reassign_part_ids: &UInt32Array, + reassign_part_centroids: &FixedSizeListArray, + ) -> Result> + where + T::Native: Dot + L2 + Normalize, + PrimitiveArray: From>, + { + let d0 = distance_type.arrow_batch_func()( + reassign_part_centroids.value(candidate_idx).as_ref(), + vectors, + )?; + let d1 = distance_type.arrow_batch_func()(centroid1.as_ref(), vectors)?; + let d2 = distance_type.arrow_batch_func()(centroid2.as_ref(), vectors)?; + let d0 = d0.values(); + let d1 = d1.values(); + let d2 = d2.values(); + + let mut ops = Vec::new(); + Self::assign_vectors_impl::( + distance_type, + part_idx, + centroid1_part_idx, + centroid2_part_idx, + row_ids, + vectors, + d0, + d1, + d2, + reassign_part_ids, + reassign_part_centroids, + false, + |idx, op| ops.push((idx, op)), + )?; + Ok(ops) + } + // assign a vector to the closest partition among: // 1. the 2 new centroids // 2. the closest REASSIGN_RANGE partitions from the original centroid @@ -1769,7 +1888,23 @@ impl IvfIndexBuilder reassign_candidate_ids: &UInt32Array, reassign_candidate_centroids: &FixedSizeListArray, ) -> Result { - let dists = self.distance_type.arrow_batch_func()(vector, reassign_candidate_centroids)?; + Self::reassign_vectors_impl( + self.distance_type, + vector, + split_centroids_dists, + reassign_candidate_ids, + reassign_candidate_centroids, + ) + } + + fn reassign_vectors_impl( + distance_type: DistanceType, + vector: &PrimitiveArray, + split_centroids_dists: Option<(f32, f32)>, + reassign_candidate_ids: &UInt32Array, + reassign_candidate_centroids: &FixedSizeListArray, + ) -> Result { + let dists = distance_type.arrow_batch_func()(vector, reassign_candidate_centroids)?; let min_dist_idx = dists.values().iter().position_min_by(|a, b| a.total_cmp(b)); let min_dist = min_dist_idx .map(|idx| dists.value(idx)) @@ -1866,6 +2001,7 @@ pub(crate) fn index_type_string(sub_index: SubIndexType, quantizer: Quantization mod tests { use super::*; use arrow_array::Float32Array; + use lance_index::vector::flat::index::{FlatIndex, FlatQuantizer}; #[test] fn select_reassign_candidates_skips_deleted_partition() { @@ -1894,4 +2030,50 @@ mod tests { expected_centroid.as_primitive::().values() ); } + + #[test] + fn compute_reassign_assign_ops_moves_vectors_to_new_centroids() { + let row_ids = UInt64Array::from(vec![1_u64, 2_u64]); + let vectors = FixedSizeListArray::try_new_from_values( + Float32Array::from(vec![0.0_f32, 0.0, 10.0, 10.0]), + 2, + ) + .unwrap(); + let reassign_part_ids = UInt32Array::from(vec![0_u32]); + let reassign_part_centroids = + FixedSizeListArray::try_new_from_values(Float32Array::from(vec![9.0_f32, 9.0]), 2) + .unwrap(); + let centroid1: ArrayRef = Arc::new(Float32Array::from(vec![0.0_f32, 0.0])); + let centroid2: ArrayRef = Arc::new(Float32Array::from(vec![20.0_f32, 20.0])); + + let ops = IvfIndexBuilder::::compute_reassign_assign_ops::< + Float32Type, + >( + DistanceType::L2, + 0, + 0, + 1, + 2, + &row_ids, + &vectors, + centroid1, + centroid2, + &reassign_part_ids, + &reassign_part_centroids, + ) + .unwrap(); + + assert_eq!(ops.len(), 2); + assert!(matches!(ops[0], (0, AssignOp::Remove(1)))); + match &ops[1] { + (1, AssignOp::Add((row_id, vector))) => { + assert_eq!(*row_id, 1); + assert_eq!( + vector.as_primitive::().values(), + &[0.0_f32, 0.0] + ); + } + other => panic!("unexpected op: {:?}", other), + } + } }