Skip to content

Commit 1e5f0e1

Browse files
committed
refactor: internalize distributed vector segment build
1 parent a86274d commit 1e5f0e1

6 files changed

Lines changed: 620 additions & 342 deletions

File tree

rust/lance-index/src/vector/distributed/index_merger.rs

Lines changed: 50 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -602,53 +602,26 @@ async fn read_shard_window_partitions(
602602
Ok(per_partition_batches)
603603
}
604604

605-
/// Merge all partial_* vector index auxiliary files under `index_dir/{uuid}/partial_*/auxiliary.idx`
606-
/// into `index_dir/{uuid}/auxiliary.idx`.
605+
/// Merge the selected partial-shard auxiliary files into `target_dir`.
607606
///
608-
/// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, IVF_HNSW_SQ storage types.
609-
/// For PQ and SQ, this assumes all partial indices share the same quantizer/codebook
610-
/// and distance type; it will reuse the first encountered metadata.
607+
/// This is the storage merge kernel for vector staged segment build. Callers
608+
/// choose which partial shards belong to one built segment and pass the corresponding
609+
/// auxiliary files here. The merge writes one unified `auxiliary.idx` into
610+
/// `target_dir`.
611+
///
612+
/// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, and
613+
/// IVF_HNSW_SQ storage types. For PQ and SQ, this assumes all selected partial
614+
/// shards share the same quantizer/codebook and distance type; it reuses the
615+
/// first encountered metadata.
611616
pub async fn merge_partial_vector_auxiliary_files(
612617
object_store: &lance_io::object_store::ObjectStore,
613-
index_dir: &object_store::path::Path,
618+
aux_paths: &[object_store::path::Path],
619+
target_dir: &object_store::path::Path,
614620
) -> Result<()> {
615-
let mut aux_paths: Vec<object_store::path::Path> = Vec::new();
616-
let mut stream = object_store.list(Some(index_dir.clone()));
617-
while let Some(item) = stream.next().await {
618-
if let Ok(meta) = item
619-
&& let Some(fname) = meta.location.filename()
620-
&& fname == INDEX_AUXILIARY_FILE_NAME
621-
{
622-
// Check parent dir name starts with partial_
623-
let parts: Vec<_> = meta.location.parts().collect();
624-
if parts.len() >= 2 {
625-
let pname = parts[parts.len() - 2].as_ref();
626-
if pname.starts_with("partial_") {
627-
aux_paths.push(meta.location.clone());
628-
}
629-
}
630-
}
631-
}
632-
633621
if aux_paths.is_empty() {
634-
// If a unified auxiliary file already exists at the root, no merge is required.
635-
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
636-
if object_store.exists(&aux_out).await.unwrap_or(false) {
637-
log::warn!(
638-
"No partial_* auxiliary files found under index dir: {}, but unified auxiliary file already exists; skipping merge",
639-
index_dir
640-
);
641-
return Ok(());
642-
}
643-
// For certain index types (e.g., FLAT/HNSW-only) the merge may be a no-op in distributed setups
644-
// where shards were committed directly. In such cases, proceed without error to avoid blocking
645-
// index manifest merge. PQ/SQ variants still require merging artifacts and will be handled by
646-
// downstream open logic if missing.
647-
log::warn!(
648-
"No partial_* auxiliary files found under index dir: {}; proceeding without merge for index types that do not require auxiliary shards",
649-
index_dir
650-
);
651-
return Ok(());
622+
return Err(Error::index(
623+
"No partial auxiliary files were selected for merge".to_string(),
624+
));
652625
}
653626

654627
// Prepare IVF model and storage metadata aggregation
@@ -661,7 +634,7 @@ pub async fn merge_partial_vector_auxiliary_files(
661634
let mut format_version: Option<LanceFileVersion> = None;
662635

663636
// Prepare output path; we'll create writer once when we know schema
664-
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
637+
let aux_out = target_dir.child(INDEX_AUXILIARY_FILE_NAME);
665638

666639
// We'll delay creating the V2 writer until we know the vector schema (dim and quantizer type)
667640
let mut v2w_opt: Option<V2Writer> = None;
@@ -682,7 +655,7 @@ pub async fn merge_partial_vector_auxiliary_files(
682655
let mut shard_infos: Vec<ShardInfo> = Vec::new();
683656

684657
// Iterate over each shard auxiliary file and merge its metadata and collect lengths
685-
for aux in &aux_paths {
658+
for aux in aux_paths {
686659
let fh = sched.open_file(aux, &CachedFileSize::unknown()).await?;
687660
let reader = V2Reader::try_open(
688661
fh,
@@ -1417,9 +1390,13 @@ mod tests {
14171390
.await
14181391
.unwrap();
14191392

1420-
merge_partial_vector_auxiliary_files(&object_store, &index_dir)
1421-
.await
1422-
.unwrap();
1393+
merge_partial_vector_auxiliary_files(
1394+
&object_store,
1395+
&[aux0.clone(), aux1.clone()],
1396+
&index_dir,
1397+
)
1398+
.await
1399+
.unwrap();
14231400

14241401
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
14251402
assert!(object_store.exists(&aux_out).await.unwrap());
@@ -1515,7 +1492,12 @@ mod tests {
15151492
.await
15161493
.unwrap();
15171494

1518-
let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await;
1495+
let res = merge_partial_vector_auxiliary_files(
1496+
&object_store,
1497+
&[aux0.clone(), aux1.clone()],
1498+
&index_dir,
1499+
)
1500+
.await;
15191501
match res {
15201502
Err(Error::Index { message, .. }) => {
15211503
assert!(
@@ -1690,9 +1672,13 @@ mod tests {
16901672
.unwrap();
16911673

16921674
// Merge PQ auxiliary files.
1693-
merge_partial_vector_auxiliary_files(&object_store, &index_dir)
1694-
.await
1695-
.unwrap();
1675+
merge_partial_vector_auxiliary_files(
1676+
&object_store,
1677+
&[aux0.clone(), aux1.clone()],
1678+
&index_dir,
1679+
)
1680+
.await
1681+
.unwrap();
16961682

16971683
// 3) Unified auxiliary file exists.
16981684
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
@@ -1818,7 +1804,12 @@ mod tests {
18181804
.await
18191805
.unwrap();
18201806

1821-
let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await;
1807+
let res = merge_partial_vector_auxiliary_files(
1808+
&object_store,
1809+
&[aux0.clone(), aux1.clone()],
1810+
&index_dir,
1811+
)
1812+
.await;
18221813
match res {
18231814
Err(Error::Index { message, .. }) => {
18241815
assert!(
@@ -1893,9 +1884,13 @@ mod tests {
18931884
.unwrap();
18941885

18951886
// Merge must succeed and produce a unified auxiliary file.
1896-
merge_partial_vector_auxiliary_files(&object_store, &index_dir)
1897-
.await
1898-
.unwrap();
1887+
merge_partial_vector_auxiliary_files(
1888+
&object_store,
1889+
&[aux_a.clone(), aux_b.clone()],
1890+
&index_dir,
1891+
)
1892+
.await
1893+
.unwrap();
18991894

19001895
let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME);
19011896
assert!(object_store.exists(&aux_out).await.unwrap());

rust/lance-index/src/vector/sq/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636

3737
pub const SQ_METADATA_KEY: &str = "lance:sq";
3838

39-
#[derive(Debug, Clone, Serialize, Deserialize)]
39+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
4040
pub struct ScalarQuantizationMetadata {
4141
pub dim: usize,
4242
pub num_bits: u16,

rust/lance/src/dataset.rs

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,6 +1817,11 @@ impl Dataset {
18171817
.collect()
18181818
}
18191819

1820+
/// Iterate over manifest fragments without allocating [`FileFragment`] wrappers.
1821+
pub fn iter_fragments(&self) -> impl Iterator<Item = &Fragment> {
1822+
self.manifest.fragments.iter()
1823+
}
1824+
18201825
pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
18211826
let dataset = Arc::new(self.clone());
18221827
let fragment = self
@@ -2658,6 +2663,7 @@ impl Dataset {
26582663
self.merge_impl(stream, left_on, right_on).await
26592664
}
26602665

2666+
/// Merge a staged distributed index into a single root artifact.
26612667
pub async fn merge_index_metadata(
26622668
&self,
26632669
index_uuid: &str,
@@ -2688,14 +2694,59 @@ impl Dataset {
26882694
}
26892695
// Precise vector index types: IVF_FLAT, IVF_PQ, IVF_SQ
26902696
IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => {
2691-
// Merge distributed vector index partials and finalize root index via Lance IVF helper
2692-
crate::index::vector::ivf::finalize_distributed_merge(
2693-
self.object_store(),
2697+
let mut partial_indices = self
2698+
.object_store()
2699+
.read_dir(index_dir.clone())
2700+
.await?
2701+
.into_iter()
2702+
.filter(|name| name.starts_with("partial_"))
2703+
.map(|name| {
2704+
name.strip_prefix("partial_")
2705+
.ok_or_else(|| {
2706+
Error::index(format!(
2707+
"Distributed vector shard '{}' does not start with 'partial_'",
2708+
name
2709+
))
2710+
})
2711+
.and_then(|shard_uuid| {
2712+
uuid::Uuid::parse_str(shard_uuid).map_err(|err| {
2713+
Error::index(format!(
2714+
"Distributed vector shard '{}' does not end with a valid UUID: {}",
2715+
name, err
2716+
))
2717+
})
2718+
})
2719+
.map(|shard_uuid| IndexMetadata {
2720+
uuid: shard_uuid,
2721+
name: String::new(),
2722+
fields: Vec::new(),
2723+
dataset_version: self.manifest.version,
2724+
fragment_bitmap: Some(RoaringBitmap::new()),
2725+
index_details: None,
2726+
index_version: index_type.version(),
2727+
created_at: None,
2728+
base_id: None,
2729+
files: Some(Vec::new()),
2730+
})
2731+
})
2732+
.collect::<Result<Vec<_>>>()?;
2733+
partial_indices.sort_by_key(|index| index.uuid);
2734+
let segment_plans = crate::index::vector::ivf::plan_staging_segments(
26942735
&index_dir,
2736+
&partial_indices,
26952737
Some(index_type),
2738+
None,
26962739
)
26972740
.await?;
2698-
Ok(())
2741+
let merged_plan =
2742+
crate::index::vector::ivf::collapse_segment_plans(&segment_plans)?;
2743+
crate::index::vector::ivf::build_staging_segment(
2744+
self.object_store(),
2745+
&self.indices_dir(),
2746+
&merged_plan,
2747+
)
2748+
.await
2749+
.map(|_| ())
26992750
}
27002751
_ => Err(Error::invalid_input_source(Box::new(std::io::Error::new(
27012752
std::io::ErrorKind::InvalidInput,

rust/lance/src/dataset/mem_wal/memtable/flush.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,8 @@ impl MemTableFlusher {
247247
index_meta.fields = vec![field_idx];
248248
index_meta.dataset_version = dataset.version().version;
249249
// Calculate fragment_bitmap from dataset fragments
250-
let fragment_ids: roaring::RoaringBitmap = dataset
251-
.get_fragments()
252-
.iter()
253-
.map(|f| f.id() as u32)
254-
.collect();
250+
let fragment_ids: roaring::RoaringBitmap =
251+
dataset.fragment_bitmap.as_ref().clone();
255252
index_meta.fragment_bitmap = Some(fragment_ids);
256253

257254
// Commit the index to the dataset
@@ -467,11 +464,7 @@ impl MemTableFlusher {
467464
let schema = dataset.schema();
468465
let field_idx = schema.field(&fts_cfg.column).map(|f| f.id).unwrap_or(0);
469466

470-
let fragment_ids: roaring::RoaringBitmap = dataset
471-
.get_fragments()
472-
.iter()
473-
.map(|f| f.id() as u32)
474-
.collect();
467+
let fragment_ids: roaring::RoaringBitmap = dataset.fragment_bitmap.as_ref().clone();
475468

476469
let index_meta = IndexMetadata {
477470
uuid: index_uuid,

0 commit comments

Comments (0)