Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {

fn ivf_model(&self) -> &IvfModel;
fn quantizer(&self) -> Quantizer;
fn partition_size(&self, part_id: usize) -> usize;

/// the index type of this vector index.
fn sub_index_type(&self) -> (SubIndexType, QuantizationType);
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/vector/hnsw/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ impl<Q: Quantization + Send + Sync + 'static> VectorIndex for HNSWIndex<Q> {
self.partition_storage.quantizer().clone()
}

fn partition_size(&self, _: usize) -> usize {
unimplemented!("only for IVF")
}

fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
(
SubIndexType::Hnsw,
Expand Down
46 changes: 0 additions & 46 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ use lance_io::{
utils::CachedFileSize,
};
use object_store::path::Path;
use snafu::location;
use tokio::sync::Mutex;

use crate::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN};

Expand Down Expand Up @@ -106,10 +104,6 @@ impl Shuffler for IvfShuffler {
&self,
data: Box<dyn RecordBatchStream + Unpin + 'static>,
) -> Result<Box<dyn ShuffleReader>> {
if self.num_partitions == 1 {
return Ok(Box::new(SinglePartitionReader::new(data)));
}

let num_partitions = self.num_partitions;
let mut partition_sizes = vec![0; num_partitions];
let schema = data.schema().without_column(PART_ID_COLUMN);
Expand Down Expand Up @@ -287,46 +281,6 @@ impl ShuffleReader for IvfShufflerReader {
}
}

pub struct SinglePartitionReader {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't know how many rows in the reader, but SPFresh needs num_rows to determine whether to trigger a split job, so remove this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would slow down indexing if num_partitions=1, but that means the dataset is small (otherwise users should indexing with more partitions), so it's fine

data: Mutex<Option<Box<dyn RecordBatchStream + Unpin + 'static>>>,
}

impl SinglePartitionReader {
pub fn new(data: Box<dyn RecordBatchStream + Unpin + 'static>) -> Self {
Self {
data: Mutex::new(Some(data)),
}
}
}

#[async_trait::async_trait]
impl ShuffleReader for SinglePartitionReader {
async fn read_partition(
&self,
_partition_id: usize,
) -> Result<Option<Box<dyn RecordBatchStream + Unpin + 'static>>> {
let mut data = self.data.lock().await;
match data.as_mut() {
Some(_) => Ok(data.take()),
None => Err(Error::Internal {
message: "the partition has been read and consumed".to_string(),
location: location!(),
}),
}
}

fn partition_size(&self, _partition_id: usize) -> Result<usize> {
// we don't really care about the partition size
// it's used for determining the order of building the index and skipping empty partitions
// so we just return 1 here
Ok(1)
}

fn total_loss(&self) -> Option<f64> {
None
}
}

pub struct EmptyReader;

#[async_trait::async_trait]
Expand Down
42 changes: 25 additions & 17 deletions rust/lance/src/index/vector/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
// if no partitions to split, we just create a new delta index,
// otherwise, we need to merge all existing indices and split large partitions.
let reader = reader.clone();
let (assign_batches, merge_indices) =
let (assign_batches, merge_indices, replaced_partition) =
match Self::should_split(ivf, reader.as_ref(), &self.existing_indices)? {
Some(partition) => {
// Perform split and record the fact for downstream build/merge
Expand All @@ -731,6 +731,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
(
split_results.assign_batches,
Arc::new(self.existing_indices.clone()),
Some(partition),
)
}
None => {
Expand All @@ -752,7 +753,11 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
[self.existing_indices.len().saturating_sub(num_to_merge)..]
.to_vec();

(vec![None; ivf.num_partitions()], Arc::new(indices_to_merge))
(
vec![None; ivf.num_partitions()],
Arc::new(indices_to_merge),
None,
)
}
};
self.merged_num = merge_indices.len();
Expand All @@ -777,13 +782,18 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
let sub_index_params = sub_index_params.clone();
let column = column.clone();
let frag_reuse_index = frag_reuse_index.clone();
let skip_existing_batches = replaced_partition == Some(partition);
async move {
let (mut batches, loss) = Self::take_partition_batches(
partition,
indices.as_ref(),
Some(reader.as_ref()),
)
.await?;
let (mut batches, loss) = if skip_existing_batches {
(Vec::new(), 0.0)
} else {
Self::take_partition_batches(
partition,
indices.as_ref(),
Some(reader.as_ref()),
)
.await?
};

if let Some((assign_batch, deleted_row_ids)) = assign_batch {
if !deleted_row_ids.is_empty() {
Expand Down Expand Up @@ -1163,7 +1173,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
for partition in 0..ivf.num_partitions() {
let mut num_rows = reader.partition_size(partition)?;
for index in existing_indices.iter() {
num_rows += index.ivf_model().partition_size(partition);
num_rows += index.partition_size(partition);
}
if num_rows > max_partition_size
&& num_rows > MAX_PARTITION_SIZE_FACTOR * index_type.target_partition_size()
Expand Down Expand Up @@ -1760,17 +1770,15 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
reassign_candidate_centroids: &FixedSizeListArray,
) -> Result<ReassignPartition> {
let dists = self.distance_type.arrow_batch_func()(vector, reassign_candidate_centroids)?;
let min_dist_idx = dists
.values()
.iter()
.position_min_by(|a, b| a.total_cmp(b))
.unwrap();
let min_dist = dists.value(min_dist_idx);
let min_dist_idx = dists.values().iter().position_min_by(|a, b| a.total_cmp(b));
let min_dist = min_dist_idx
.map(|idx| dists.value(idx))
.unwrap_or(f32::INFINITY);
match split_centroids_dists {
Some((d1, d2)) => {
if min_dist <= d1 && min_dist <= d2 {
Ok(ReassignPartition::ReassignCandidate(
reassign_candidate_ids.value(min_dist_idx),
reassign_candidate_ids.value(min_dist_idx.unwrap()),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here min_dist is not f32::INFINITY so min_dist_idx must be Some(...)

))
} else if d1 <= d2 {
Ok(ReassignPartition::NewCentroid1)
Expand All @@ -1779,7 +1787,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
}
}
None => Ok(ReassignPartition::ReassignCandidate(
reassign_candidate_ids.value(min_dist_idx),
reassign_candidate_ids.value(min_dist_idx.unwrap()),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a join job, which means there are at least 2 partitions, so min_dist_idx won't be None

)),
}
}
Expand Down
5 changes: 5 additions & 0 deletions rust/lance/src/index/vector/fixture_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,15 @@ mod test {
fn ivf_model(&self) -> &IvfModel {
unimplemented!("only for IVF")
}

fn quantizer(&self) -> Quantizer {
unimplemented!("only for IVF")
}

fn partition_size(&self, _: usize) -> usize {
unimplemented!("only for IVF")
}

/// the index type of this vector index.
fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
unimplemented!("only for IVF")
Expand Down
4 changes: 4 additions & 0 deletions rust/lance/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,10 @@ impl VectorIndex for IVFIndex {
unimplemented!("only for v2 IVFIndex")
}

fn partition_size(&self, part_id: usize) -> usize {
self.ivf.partition_size(part_id)
}

/// the index type of this vector index.
fn sub_index_type(&self) -> (SubIndexType, QuantizationType) {
unimplemented!("only for v2 IVFIndex")
Expand Down
Loading
Loading