diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 18dbf790f73..354c0ecaf87 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -3,8 +3,6 @@ //! IVF - Inverted File index. -use std::{any::Any, collections::HashMap, sync::Arc}; - use super::{builder::IvfIndexBuilder, utils::PartitionLoadLock}; use super::{ pq::{build_pq_model, PQIndex}, @@ -97,6 +95,8 @@ use roaring::RoaringBitmap; use serde::Serialize; use serde_json::json; use snafu::location; +use std::collections::HashSet; +use std::{any::Any, collections::HashMap, sync::Arc}; use tracing::instrument; use uuid::Uuid; @@ -2027,6 +2027,69 @@ pub async fn finalize_distributed_merge( let empty_batch = RecordBatch::new_empty(arrow_schema); v2_writer.write_batch(&empty_batch).await?; v2_writer.finish().await?; + + if let Err(err) = cleanup_partial_vector_dirs(object_store, index_dir).await { + warn!( + "Failed to cleanup partial_* vector index directories under '{}': {}", + index_dir.as_ref(), + err + ); + } + + Ok(()) +} + +/// Cleanup for distributed partial vector index directories after +/// a distributed merge. +/// +/// This helper scans `index_dir` for direct child directories whose names +/// start with `partial_` (e.g. `/partial_0`, `/partial_1`) +/// and attempts to recursively delete them via [`ObjectStore::remove_dir_all`]. +/// +/// Listing and deletion failures are logged with [`warn!`] and ignored so that +/// index finalization is never blocked by cleanup. The function always returns +/// `Ok(())`. +async fn cleanup_partial_vector_dirs( + object_store: &ObjectStore, + index_dir: &object_store::path::Path, +) -> Result<()> { + let mut partial_dirs: HashSet = HashSet::new(); + let mut list_stream = object_store.list(Some(index_dir.clone())); + + while let Some(item) = list_stream.next().await { + match item { + Ok(meta) => { + if let Some(relative_parts) = meta.location.prefix_match(index_dir) { + let rel_parts: Vec<_> = relative_parts.collect(); + // Expect paths like: /partial_*/ + if rel_parts.len() >= 2 { + let parent_name = rel_parts[0].as_ref(); + if parent_name.starts_with("partial_") { + partial_dirs.insert(index_dir.child(parent_name)); + } + } + } + } + Err(e) => { + warn!( + "Failed to list index directory '{}' while collecting partial_* dirs: {}", + index_dir.as_ref(), + e + ); + } + } + } + + for dir in partial_dirs { + if let Err(e) = object_store.remove_dir_all(dir.clone()).await { + warn!( + "Failed to remove partial_* directory '{}' after distributed merge: {}", + dir.as_ref(), + e + ); + } + } + Ok(()) } @@ -3538,4 +3601,69 @@ mod tests { assert!(correct_times >= 9, "correct: {}", correct_times); } + + #[tokio::test] + async fn test_cleanup_removes_only_partial_dirs() { + let object_store = ObjectStore::memory(); + let index_dir = Path::from("index/uuid_test_cleanup"); + + // partial_* directories that should be removed + let partial0_file = index_dir.child("partial_0").child("file.bin"); + let partial_abc_file = index_dir.child("partial_abc").child("file.bin"); + + // Non-partial paths that must be preserved + let partialx_file = index_dir.child("partialX").child("file.bin"); + let shard_file = index_dir.child("shard_0").child("file.bin"); + let keep_root_file = index_dir.child("keep_root.txt"); + + object_store.put(&partial0_file, b"partial0").await.unwrap(); + object_store + .put(&partial_abc_file, b"partial_abc") + .await + .unwrap(); + object_store.put(&partialx_file, b"partialx").await.unwrap(); + object_store.put(&shard_file, b"shard").await.unwrap(); + object_store.put(&keep_root_file, b"root").await.unwrap(); + + // Sanity: all files exist before cleanup + assert!(object_store.exists(&partial0_file).await.unwrap()); + assert!(object_store.exists(&partial_abc_file).await.unwrap()); + assert!(object_store.exists(&partialx_file).await.unwrap()); + assert!(object_store.exists(&shard_file).await.unwrap()); + assert!(object_store.exists(&keep_root_file).await.unwrap()); + + cleanup_partial_vector_dirs(&object_store, &index_dir) + .await + .unwrap(); + + // partial_* directories should be removed + assert!(!object_store.exists(&partial0_file).await.unwrap()); + assert!(!object_store.exists(&partial_abc_file).await.unwrap()); + + // Non-partial directories and root files must be preserved + assert!(object_store.exists(&partialx_file).await.unwrap()); + assert!(object_store.exists(&shard_file).await.unwrap()); + assert!(object_store.exists(&keep_root_file).await.unwrap()); + } + + #[tokio::test] + async fn test_cleanup_idempotent() { + let object_store = ObjectStore::memory(); + let index_dir = Path::from("index/uuid_test_cleanup_idempotent"); + + let partial_file = index_dir.child("partial_0").child("file.bin"); + object_store.put(&partial_file, b"partial").await.unwrap(); + + assert!(object_store.exists(&partial_file).await.unwrap()); + + cleanup_partial_vector_dirs(&object_store, &index_dir) + .await + .unwrap(); + assert!(!object_store.exists(&partial_file).await.unwrap()); + + // Second call should succeed even when there are no partial_* directories left. + cleanup_partial_vector_dirs(&object_store, &index_dir) + .await + .unwrap(); + } }