Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 222 additions & 40 deletions rust/lance/src/index/vector/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashSet;
use std::future;
use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};

Expand All @@ -24,7 +25,7 @@ use itertools::Itertools;
use lance_arrow::{FixedSizeListArrayExt, RecordBatchExt};
use lance_core::datatypes::Schema;
use lance_core::utils::tempfile::TempStdDir;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu};
use lance_core::ROW_ID;
use lance_core::{Error, Result, ROW_ID_FIELD};
use lance_file::writer::FileWriter;
Expand Down Expand Up @@ -1308,35 +1309,62 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
&mut assign_ops,
)?;
// assign the vectors in the reassigned partitions
for (i, idx) in reassign_part_ids.values().iter().enumerate() {
let part_idx = *idx as usize;
let Some((row_ids, vectors)) = self.load_partition_raw_vectors(part_idx).await? else {
// all vectors in this partition have been deleted
continue;
};

let d0 =
self.distance_type.arrow_batch_func()(&reassign_part_centroids.value(i), &vectors)?;
let d1 = self.distance_type.arrow_batch_func()(&c1, &vectors)?;
let d2 = self.distance_type.arrow_batch_func()(&c2, &vectors)?;
let d0 = d0.values();
let d1 = d1.values();
let d2 = d2.values();

self.assign_vectors::<T>(
part_idx,
centroid1_part_idx,
centroid2_part_idx,
&row_ids,
&vectors,
d0,
d1,
d2,
&reassign_part_ids,
&reassign_part_centroids,
false,
&mut assign_ops,
)?;
let reassign_targets = reassign_part_ids
.values()
.iter()
.copied()
.enumerate()
.collect::<Vec<_>>();
if !reassign_targets.is_empty() {
let builder = self;
let distance_type = self.distance_type;
let reassign_part_ids_clone = reassign_part_ids.clone();
let reassign_part_centroids_clone = reassign_part_centroids.clone();
stream::iter(
reassign_targets
.into_iter()
.map(move |(candidate_idx, part_id)| {
let builder = builder;
let reassign_part_ids = reassign_part_ids_clone.clone();
let reassign_part_centroids = reassign_part_centroids_clone.clone();
let centroid1 = c1.clone();
let centroid2 = c2.clone();
async move {
let part_idx = part_id as usize;
let Some((row_ids, vectors)) =
builder.load_partition_raw_vectors(part_idx).await?
else {
// all vectors in this partition have been deleted
return Ok::<Vec<(usize, AssignOp)>, Error>(Vec::new());
};
let ops = spawn_cpu(move || {
Self::compute_reassign_assign_ops::<T>(
distance_type,
part_idx,
candidate_idx,
centroid1_part_idx,
centroid2_part_idx,
&row_ids,
&vectors,
centroid1,
centroid2,
&reassign_part_ids,
&reassign_part_centroids,
)
})
.await?;
Ok(ops)
}
}),
)
.buffered(get_num_compute_intensive_cpus())
.try_for_each(|ops| {
for (target_idx, op) in ops {
assign_ops[target_idx].push(op);
}
future::ready(Ok(()))
})
.await?;
}

let new_centroids =
Expand Down Expand Up @@ -1713,51 +1741,142 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
// the length must be `old_num_partitions + 1`
deleted_original_partition: bool,
assign_ops: &mut [Vec<AssignOp>],
) -> Result<()> {
Self::assign_vectors_impl::<T, _>(
self.distance_type,
part_idx,
centroid1_part_idx,
centroid2_part_idx,
row_ids,
vectors,
d0,
d1,
d2,
reassign_part_ids,
reassign_part_centroids,
deleted_original_partition,
|idx, op| assign_ops[idx].push(op),
)
}

#[allow(clippy::too_many_arguments)]
fn assign_vectors_impl<T: ArrowPrimitiveType, F: FnMut(usize, AssignOp)>(
distance_type: DistanceType,
part_idx: usize,
centroid1_part_idx: usize,
centroid2_part_idx: usize,
row_ids: &UInt64Array,
vectors: &FixedSizeListArray,
d0: &[f32],
d1: &[f32],
d2: &[f32],
reassign_part_ids: &UInt32Array,
reassign_part_centroids: &FixedSizeListArray,
deleted_original_partition: bool,
mut sink: F,
) -> Result<()> {
for (i, &row_id) in row_ids.values().iter().enumerate() {
if d0[i] <= d1[i] && d0[i] <= d2[i] {
if !deleted_original_partition {
// the original partition is not deleted, we just keep the vector in the original partition
continue;
}
match self.reassign_vectors(
match Self::reassign_vectors_impl(
distance_type,
vectors.value(i).as_primitive::<T>(),
Some((d1[i], d2[i])),
reassign_part_ids,
reassign_part_centroids,
)? {
ReassignPartition::NewCentroid1 => {
// replace the original partition with the first new one
assign_ops[centroid1_part_idx]
.push(AssignOp::Add((row_id, vectors.value(i))));
sink(
centroid1_part_idx,
AssignOp::Add((row_id, vectors.value(i))),
);
}
ReassignPartition::NewCentroid2 => {
// append the new second one
assign_ops[centroid2_part_idx]
.push(AssignOp::Add((row_id, vectors.value(i))));
sink(
centroid2_part_idx,
AssignOp::Add((row_id, vectors.value(i))),
);
}
ReassignPartition::ReassignCandidate(idx) => {
// replace the original partition with the reassigned one
assign_ops[idx as usize].push(AssignOp::Add((row_id, vectors.value(i))));
sink(idx as usize, AssignOp::Add((row_id, vectors.value(i))));
}
}
} else {
if !deleted_original_partition {
// the original partition is not deleted, we need to remove the vector from the original partition
assign_ops[part_idx].push(AssignOp::Remove(row_id));
sink(part_idx, AssignOp::Remove(row_id));
}
if d1[i] <= d2[i] {
// centroid 1 is the closest one
assign_ops[centroid1_part_idx].push(AssignOp::Add((row_id, vectors.value(i))));
sink(
centroid1_part_idx,
AssignOp::Add((row_id, vectors.value(i))),
);
} else {
// centroid 2 is the closest one
assign_ops[centroid2_part_idx].push(AssignOp::Add((row_id, vectors.value(i))));
sink(
centroid2_part_idx,
AssignOp::Add((row_id, vectors.value(i))),
);
}
}
}
Ok(())
}

/// Computes the assign operations for one reassign-candidate partition.
///
/// Distances from every row in `vectors` are measured against three centroids: the
/// candidate's own centroid (`reassign_part_centroids.value(candidate_idx)`),
/// `centroid1` and `centroid2`. Those distances are then handed to
/// `assign_vectors_impl` with `deleted_original_partition = false`, and every
/// operation it emits is collected as a `(target partition index, op)` pair.
///
/// Nothing is mutated here; the caller is expected to apply the returned pairs.
// The argument list mirrors `assign_vectors_impl`, hence the lint opt-out.
#[allow(clippy::too_many_arguments)]
fn compute_reassign_assign_ops<T: ArrowPrimitiveType>(
    distance_type: DistanceType,
    part_idx: usize,
    candidate_idx: usize,
    centroid1_part_idx: usize,
    centroid2_part_idx: usize,
    row_ids: &UInt64Array,
    vectors: &FixedSizeListArray,
    centroid1: ArrayRef,
    centroid2: ArrayRef,
    reassign_part_ids: &UInt32Array,
    reassign_part_centroids: &FixedSizeListArray,
) -> Result<Vec<(usize, AssignOp)>>
where
    T::Native: Dot + L2 + Normalize,
    PrimitiveArray<T>: From<Vec<T::Native>>,
{
    let batch_distance = distance_type.arrow_batch_func();

    // Distances to the candidate partition's own centroid and to both new centroids.
    let candidate_centroid = reassign_part_centroids.value(candidate_idx);
    let to_candidate = batch_distance(candidate_centroid.as_ref(), vectors)?;
    let to_centroid1 = batch_distance(centroid1.as_ref(), vectors)?;
    let to_centroid2 = batch_distance(centroid2.as_ref(), vectors)?;

    let mut collected = Vec::new();
    Self::assign_vectors_impl::<T, _>(
        distance_type,
        part_idx,
        centroid1_part_idx,
        centroid2_part_idx,
        row_ids,
        vectors,
        to_candidate.values(),
        to_centroid1.values(),
        to_centroid2.values(),
        reassign_part_ids,
        reassign_part_centroids,
        // the candidate partition itself still exists
        false,
        |target_idx, op| collected.push((target_idx, op)),
    )?;
    Ok(collected)
}

// assign a vector to the closest partition among:
// 1. the 2 new centroids
// 2. the closest REASSIGN_RANGE partitions from the original centroid
Expand All @@ -1769,7 +1888,23 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
reassign_candidate_ids: &UInt32Array,
reassign_candidate_centroids: &FixedSizeListArray,
) -> Result<ReassignPartition> {
let dists = self.distance_type.arrow_batch_func()(vector, reassign_candidate_centroids)?;
Self::reassign_vectors_impl(
self.distance_type,
vector,
split_centroids_dists,
reassign_candidate_ids,
reassign_candidate_centroids,
)
}

fn reassign_vectors_impl<T: ArrowPrimitiveType>(
distance_type: DistanceType,
vector: &PrimitiveArray<T>,
split_centroids_dists: Option<(f32, f32)>,
reassign_candidate_ids: &UInt32Array,
reassign_candidate_centroids: &FixedSizeListArray,
) -> Result<ReassignPartition> {
let dists = distance_type.arrow_batch_func()(vector, reassign_candidate_centroids)?;
let min_dist_idx = dists.values().iter().position_min_by(|a, b| a.total_cmp(b));
let min_dist = min_dist_idx
.map(|idx| dists.value(idx))
Expand Down Expand Up @@ -1866,6 +2001,7 @@ pub(crate) fn index_type_string(sub_index: SubIndexType, quantizer: Quantization
mod tests {
use super::*;
use arrow_array::Float32Array;
use lance_index::vector::flat::index::{FlatIndex, FlatQuantizer};

#[test]
fn select_reassign_candidates_skips_deleted_partition() {
Expand Down Expand Up @@ -1894,4 +2030,50 @@ mod tests {
expected_centroid.as_primitive::<Float32Type>().values()
);
}

#[test]
fn compute_reassign_assign_ops_moves_vectors_to_new_centroids() {
    type Builder = IvfIndexBuilder<FlatIndex, FlatQuantizer>;

    // Two 2-d vectors: row 1 sits at the origin, row 2 at (10, 10).
    let ids = UInt64Array::from(vec![1_u64, 2_u64]);
    let points = FixedSizeListArray::try_new_from_values(
        Float32Array::from(vec![0.0_f32, 0.0, 10.0, 10.0]),
        2,
    )
    .unwrap();

    // A single reassign candidate (partition 0) centred at (9, 9).
    let candidate_ids = UInt32Array::from(vec![0_u32]);
    let candidate_centroids =
        FixedSizeListArray::try_new_from_values(Float32Array::from(vec![9.0_f32, 9.0]), 2)
            .unwrap();

    // The two new centroids, stored at partition indices 1 and 2 respectively.
    let new_centroid_a: ArrayRef = Arc::new(Float32Array::from(vec![0.0_f32, 0.0]));
    let new_centroid_b: ArrayRef = Arc::new(Float32Array::from(vec![20.0_f32, 20.0]));

    let ops = Builder::compute_reassign_assign_ops::<Float32Type>(
        DistanceType::L2,
        0,
        0,
        1,
        2,
        &ids,
        &points,
        new_centroid_a,
        new_centroid_b,
        &candidate_ids,
        &candidate_centroids,
    )
    .unwrap();

    // Row 2 is closer to (9, 9) than to either new centroid, so it stays put.
    // Row 1 coincides with the first new centroid, so it is removed from
    // partition 0 and added to partition 1.
    assert_eq!(ops.len(), 2);
    assert!(matches!(ops[0], (0, AssignOp::Remove(1))));

    let (1, AssignOp::Add((moved_row_id, moved_vector))) = &ops[1] else {
        panic!("unexpected op: {:?}", &ops[1]);
    };
    assert_eq!(*moved_row_id, 1);
    assert_eq!(
        moved_vector.as_primitive::<Float32Type>().values(),
        &[0.0_f32, 0.0]
    );
}
}
Loading