Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 130 additions & 2 deletions rust/lance/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

//! IVF - Inverted File index.

use std::{any::Any, collections::HashMap, sync::Arc};

use super::{builder::IvfIndexBuilder, utils::PartitionLoadLock};
use super::{
pq::{build_pq_model, PQIndex},
Expand Down Expand Up @@ -97,6 +95,8 @@ use roaring::RoaringBitmap;
use serde::Serialize;
use serde_json::json;
use snafu::location;
use std::collections::HashSet;
use std::{any::Any, collections::HashMap, sync::Arc};
use tracing::instrument;
use uuid::Uuid;

Expand Down Expand Up @@ -2027,6 +2027,69 @@ pub async fn finalize_distributed_merge(
let empty_batch = RecordBatch::new_empty(arrow_schema);
v2_writer.write_batch(&empty_batch).await?;
v2_writer.finish().await?;

if let Err(err) = cleanup_partial_vector_dirs(object_store, index_dir).await {
warn!(
"Failed to cleanup partial_* vector index directories under '{}': {}",
index_dir.as_ref(),
err
);
}

Ok(())
}

/// Cleanup for distributed partial vector index directories after
/// a distributed merge.
///
/// This helper scans `index_dir` for direct child directories whose names
/// start with `partial_` (e.g. `<index_dir>/partial_0`, `<index_dir>/partial_1`)
/// and attempts to recursively delete them via [`ObjectStore::remove_dir_all`].
///
/// Listing and deletion failures are logged with [`warn!`] and ignored so that
/// index finalization is never blocked by cleanup. The function always returns
/// `Ok(())`.
async fn cleanup_partial_vector_dirs(
object_store: &ObjectStore,
index_dir: &object_store::path::Path,
) -> Result<()> {
let mut partial_dirs: HashSet<Path> = HashSet::new();
let mut list_stream = object_store.list(Some(index_dir.clone()));

while let Some(item) = list_stream.next().await {
match item {
Ok(meta) => {
if let Some(relative_parts) = meta.location.prefix_match(index_dir) {
let rel_parts: Vec<_> = relative_parts.collect();
// Expect paths like: <index_dir>/partial_*/<file>
if rel_parts.len() >= 2 {
let parent_name = rel_parts[0].as_ref();
if parent_name.starts_with("partial_") {
partial_dirs.insert(index_dir.child(parent_name));
}
}
}
}
Err(e) => {
warn!(
"Failed to list index directory '{}' while collecting partial_* dirs: {}",
index_dir.as_ref(),
e
);
}
}
}

for dir in partial_dirs {
if let Err(e) = object_store.remove_dir_all(dir.clone()).await {
warn!(
"Failed to remove partial_* directory '{}' after distributed merge: {}",
dir.as_ref(),
e
);
}
}

Ok(())
}

Expand Down Expand Up @@ -3538,4 +3601,69 @@ mod tests {

assert!(correct_times >= 9, "correct: {}", correct_times);
}

#[tokio::test]
async fn test_cleanup_removes_only_partial_dirs() {
let object_store = ObjectStore::memory();
let index_dir = Path::from("index/uuid_test_cleanup");

// partial_* directories that should be removed
let partial0_file = index_dir.child("partial_0").child("file.bin");
let partial_abc_file = index_dir.child("partial_abc").child("file.bin");

// Non-partial paths that must be preserved
let partialx_file = index_dir.child("partialX").child("file.bin");
let shard_file = index_dir.child("shard_0").child("file.bin");
let keep_root_file = index_dir.child("keep_root.txt");

object_store.put(&partial0_file, b"partial0").await.unwrap();
object_store
.put(&partial_abc_file, b"partial_abc")
.await
.unwrap();
object_store.put(&partialx_file, b"partialx").await.unwrap();
object_store.put(&shard_file, b"shard").await.unwrap();
object_store.put(&keep_root_file, b"root").await.unwrap();

// Sanity: all files exist before cleanup
assert!(object_store.exists(&partial0_file).await.unwrap());
assert!(object_store.exists(&partial_abc_file).await.unwrap());
assert!(object_store.exists(&partialx_file).await.unwrap());
assert!(object_store.exists(&shard_file).await.unwrap());
assert!(object_store.exists(&keep_root_file).await.unwrap());

cleanup_partial_vector_dirs(&object_store, &index_dir)
.await
.unwrap();

// partial_* directories should be removed
assert!(!object_store.exists(&partial0_file).await.unwrap());
assert!(!object_store.exists(&partial_abc_file).await.unwrap());

// Non-partial directories and root files must be preserved
assert!(object_store.exists(&partialx_file).await.unwrap());
assert!(object_store.exists(&shard_file).await.unwrap());
assert!(object_store.exists(&keep_root_file).await.unwrap());
}

#[tokio::test]
async fn test_cleanup_idempotent() {
let object_store = ObjectStore::memory();
let index_dir = Path::from("index/uuid_test_cleanup_idempotent");

let partial_file = index_dir.child("partial_0").child("file.bin");
object_store.put(&partial_file, b"partial").await.unwrap();

assert!(object_store.exists(&partial_file).await.unwrap());

cleanup_partial_vector_dirs(&object_store, &index_dir)
.await
.unwrap();
assert!(!object_store.exists(&partial_file).await.unwrap());

// Second call should succeed even when there are no partial_* directories left.
cleanup_partial_vector_dirs(&object_store, &index_dir)
.await
.unwrap();
}
}
Loading