Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 133 additions & 26 deletions rust/lance-index/src/vector/distributed/index_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
use crate::vector::shared::partition_merger::{
write_unified_ivf_and_index_metadata, SupportedIvfIndexType,
};
use arrow::datatypes::Float32Type;
use arrow::{compute::concat_batches, datatypes::Float32Type};
use arrow_array::cast::AsArray;
use arrow_array::{Array, FixedSizeListArray, UInt64Array};
use arrow_array::types::UInt8Type;
use arrow_array::{Array, FixedSizeListArray, RecordBatch, UInt64Array};
use futures::StreamExt as _;
use lance_arrow::{FixedSizeListArrayExt, RecordBatchExt};
use lance_core::utils::address::RowAddress;
use lance_core::{Error, Result, ROW_ID_FIELD};
use snafu::location;
Expand All @@ -19,7 +21,7 @@ use std::sync::Arc;
use crate::pb;
use crate::vector::flat::index::FlatMetadata;
use crate::vector::ivf::storage::{IvfModel as IvfStorageModel, IVF_METADATA_KEY};
use crate::vector::pq::storage::{ProductQuantizationMetadata, PQ_METADATA_KEY};
use crate::vector::pq::storage::{transpose, ProductQuantizationMetadata, PQ_METADATA_KEY};
use crate::vector::quantizer::QuantizerMetadata;
use crate::vector::sq::storage::{ScalarQuantizationMetadata, SQ_METADATA_KEY};
use crate::vector::storage::STORAGE_METADATA_KEY;
Expand Down Expand Up @@ -272,6 +274,54 @@ pub async fn write_partition_rows(
Ok(())
}

/// Transpose the PQ code column for a batch and write it to the unified writer.
///
/// This helper assumes `batch` contains a contiguous range of rows for a single
/// IVF partition.
async fn write_partition_rows_pq_transposed(
w: &mut FileWriter,
mut batch: RecordBatch,
) -> Result<()> {
let num_rows = batch.num_rows();
if num_rows == 0 {
return Ok(());
}

let pq_col = batch
.column_by_name(PQ_CODE_COLUMN)
.ok_or_else(|| Error::Index {
message: format!("PQ column {} missing in auxiliary shard", PQ_CODE_COLUMN),
location: location!(),
})?;
let pq_fsl = pq_col
.as_fixed_size_list_opt()
.ok_or_else(|| Error::Index {
message: format!(
"PQ column {} is not a FixedSizeList in auxiliary shard, got {}",
PQ_CODE_COLUMN,
pq_col.data_type(),
),
location: location!(),
})?;
let num_bytes = pq_fsl.value_length() as usize;
let values = pq_fsl.values().as_primitive::<UInt8Type>();
let transposed_codes = transpose(values, num_rows, num_bytes);
let transposed_fsl = Arc::new(FixedSizeListArray::try_new_from_values(
transposed_codes,
num_bytes as i32,
)?);
batch = batch.replace_column_by_name(PQ_CODE_COLUMN, transposed_fsl)?;

// Write in reasonably sized chunks to avoid huge batches.
let batch_size: usize = 10_240;
for offset in (0..num_rows).step_by(batch_size) {
let len = std::cmp::min(batch_size, num_rows - offset);
let slice = batch.slice(offset, len);
w.write_batch(&slice).await?;
}
Ok(())
}

/// Detect and return supported index type from reader and schema.
///
/// This is a lightweight wrapper around SupportedIndexType::detect to keep
Expand Down Expand Up @@ -817,7 +867,9 @@ pub async fn merge_partial_vector_auxiliary_files(
pq_meta = Some(pm.clone());
}
if v2w_opt.is_none() {
let w = init_writer_for_pq(object_store, &aux_out, dt, &pm).await?;
let mut pm_for_unified = pm.clone();
pm_for_unified.transposed = true;
let w = init_writer_for_pq(object_store, &aux_out, dt, &pm_for_unified).await?;
v2w_opt = Some(w);
}
}
Expand Down Expand Up @@ -1023,7 +1075,9 @@ pub async fn merge_partial_vector_auxiliary_files(
pq_meta = Some(pm.clone());
}
if v2w_opt.is_none() {
let w = init_writer_for_pq(object_store, &aux_out, dt, &pm).await?;
let mut pm_for_unified = pm.clone();
pm_for_unified.transposed = true;
let w = init_writer_for_pq(object_store, &aux_out, dt, &pm_for_unified).await?;
v2w_opt = Some(w);
}
}
Expand Down Expand Up @@ -1117,24 +1171,81 @@ pub async fn merge_partial_vector_auxiliary_files(
message: "Missing IVF partition count".to_string(),
location: location!(),
})?;
for pid in 0..nlist {
for (path, lens, _) in shard_infos.iter() {
let part_len = lens[pid] as usize;
if part_len == 0 {
continue;
let idx_type_final = detected_index_type.ok_or_else(|| Error::Index {
message: "Unable to detect index type".to_string(),
location: location!(),
})?;

match idx_type_final {
SupportedIvfIndexType::IvfPq | SupportedIvfIndexType::IvfHnswPq => {
// For PQ-backed indices, transpose PQ codes while merging partitions
// so that the unified file stores column-major PQ codes.
for pid in 0..nlist {
let total_len = accumulated_lengths[pid] as usize;
if total_len == 0 {
continue;
}

let mut part_batches: Vec<RecordBatch> = Vec::new();
for (path, lens, _) in shard_infos.iter() {
let part_len = lens[pid] as usize;
if part_len == 0 {
continue;
}
let offset: usize = lens.iter().take(pid).map(|x| *x as usize).sum();
let fh = sched.open_file(path, &CachedFileSize::unknown()).await?;
let reader = V2Reader::try_open(
fh,
None,
Arc::default(),
&lance_core::cache::LanceCache::no_cache(),
V2ReaderOptions::default(),
)
.await?;
let mut stream = reader.read_stream(
lance_io::ReadBatchParams::Range(offset..offset + part_len),
u32::MAX,
4,
lance_encoding::decoder::FilterExpression::no_filter(),
)?;
while let Some(rb) = stream.next().await {
let rb = rb?;
part_batches.push(rb);
}
}

if part_batches.is_empty() {
continue;
}

let schema = part_batches[0].schema();
let partition_batch = concat_batches(&schema, part_batches.iter())?;
if let Some(w) = v2w_opt.as_mut() {
write_partition_rows_pq_transposed(w, partition_batch).await?;
}
}
let offset: usize = lens.iter().take(pid).map(|x| *x as usize).sum();
let fh = sched.open_file(path, &CachedFileSize::unknown()).await?;
let reader = V2Reader::try_open(
fh,
None,
Arc::default(),
&lance_core::cache::LanceCache::no_cache(),
V2ReaderOptions::default(),
)
.await?;
if let Some(w) = v2w_opt.as_mut() {
write_partition_rows(&reader, w, offset..offset + part_len).await?;
}
_ => {
for pid in 0..nlist {
for (path, lens, _) in shard_infos.iter() {
let part_len = lens[pid] as usize;
if part_len == 0 {
continue;
}
let offset: usize = lens.iter().take(pid).map(|x| *x as usize).sum();
let fh = sched.open_file(path, &CachedFileSize::unknown()).await?;
let reader = V2Reader::try_open(
fh,
None,
Arc::default(),
&lance_core::cache::LanceCache::no_cache(),
V2ReaderOptions::default(),
)
.await?;
if let Some(w) = v2w_opt.as_mut() {
write_partition_rows(&reader, w, offset..offset + part_len).await?;
}
}
}
}
}
Expand All @@ -1153,10 +1264,6 @@ pub async fn merge_partial_vector_auxiliary_files(
message: "Distance type missing".to_string(),
location: location!(),
})?;
let idx_type_final = detected_index_type.ok_or_else(|| Error::Index {
message: "Unable to detect index type".to_string(),
location: location!(),
})?;
write_unified_ivf_and_index_metadata(w, &ivf_model, dt2, idx_type_final).await?;
w.finish().await?;
} else {
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ please provide PQBuildParams.codebook for distributed indexing"
)?
.with_ivf(ivf_model)
.with_quantizer(global_pq)
// For distributed shards, keep PQ codes in their original layout
// and transpose only after all shards are merged.
.with_transpose(false)
.with_fragment_filter(fragment_filter)
.build()
.await?;
Expand Down Expand Up @@ -615,6 +618,9 @@ please provide PQBuildParams.codebook for distributed indexing"
)?
.with_ivf(ivf_model)
.with_quantizer(global_pq)
// For distributed shards, keep PQ codes in their original layout
// and transpose only after all shards are merged.
.with_transpose(false)
.with_fragment_filter(fragment_filter)
.build()
.await?;
Expand Down
Loading