From bc777d3c67d6c41a8cf07b42782317ac6079747a Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 25 Nov 2025 19:35:43 +0800 Subject: [PATCH 1/2] squash --- rust/lance-table/src/io/commit.rs | 2 +- rust/lance-table/src/io/deletion.rs | 4 +- rust/lance/src/dataset.rs | 22 +++- rust/lance/src/dataset/cleanup.rs | 119 ++++++++++++++---- .../src/dataset/tests/dataset_transactions.rs | 4 +- rust/lance/src/io/commit.rs | 5 +- 6 files changed, 124 insertions(+), 32 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 96d7267e1bf..22ffb67fcee 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -67,7 +67,7 @@ use { std::time::{Duration, SystemTime}, }; -const VERSIONS_DIR: &str = "_versions"; +pub const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; const DETACHED_VERSION_PREFIX: &str = "d"; diff --git a/rust/lance-table/src/io/deletion.rs b/rust/lance-table/src/io/deletion.rs index ca714da4acd..c7c58d50af0 100644 --- a/rust/lance-table/src/io/deletion.rs +++ b/rust/lance-table/src/io/deletion.rs @@ -22,7 +22,7 @@ use tracing::{info, instrument}; use crate::format::{DeletionFile, DeletionFileType}; -pub(crate) const DELETION_DIRS: &str = "_deletions"; +pub const DELETIONS_DIR: &str = "_deletions"; /// Get the Arrow schema for an Arrow deletion file. fn deletion_arrow_schema() -> Arc { @@ -42,7 +42,7 @@ pub fn deletion_file_path(base: &Path, fragment_id: u64, deletion_file: &Deletio .. } = deletion_file; let suffix = file_type.suffix(); - base.child(DELETION_DIRS) + base.child(DELETIONS_DIR) .child(format!("{fragment_id}-{read_version}-{id}.{suffix}")) } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 9de986ce022..a6a02055f0c 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -44,7 +44,7 @@ use lance_table::format::{ }; use lance_table::io::commit::{ migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler, - CommitLock, ManifestLocation, ManifestNamingScheme, + CommitLock, ManifestLocation, ManifestNamingScheme, VERSIONS_DIR, }; use lance_table::io::manifest::read_manifest; use object_store::path::Path; @@ -112,6 +112,7 @@ use lance_core::box_error; pub use lance_core::ROW_ID; use lance_namespace::models::{CreateEmptyTableRequest, DescribeTableRequest}; use lance_table::feature_flags::{apply_feature_flags, can_read_dataset}; +use lance_table::io::deletion::DELETIONS_DIR; pub use schema_evolution::{ BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore, }; @@ -128,9 +129,10 @@ pub use write::{ WriteDestination, WriteMode, WriteParams, }; -const INDICES_DIR: &str = "_indices"; - +pub const INDICES_DIR: &str = "_indices"; pub const DATA_DIR: &str = "data"; +pub const TRANSACTIONS_DIR: &str = "_transactions"; + // We default to 6GB for the index cache, since indices are often large but // worth caching. pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024; @@ -1081,7 +1083,7 @@ impl Dataset { Transaction::try_from(tx).map(Some)? } else if let Some(path) = &self.manifest.transaction_file { // Fallback: read external transaction file if present - let path = self.base.child("_transactions").child(path.as_str()); + let path = self.transactions_dir().child(path.as_str()); let data = self.object_store.inner.get(&path).await?.bytes().await?; let transaction = lance_table::format::pb::Transaction::decode(data)?; Transaction::try_from(transaction).map(Some)? @@ -1598,6 +1600,18 @@ impl Dataset { self.base.child(INDICES_DIR) } + pub fn transactions_dir(&self) -> Path { + self.base.child(TRANSACTIONS_DIR) + } + + pub fn deletions_dir(&self) -> Path { + self.base.child(DELETIONS_DIR) + } + + pub fn versions_dir(&self) -> Path { + self.base.child(VERSIONS_DIR) + } + pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result { match data_file.base_id.as_ref() { Some(base_id) => { diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 75cf4a60996..b935ff7b3d4 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -52,6 +52,7 @@ use lance_table::{ }, }; use object_store::path::Path; +use object_store::{Error as ObjectStoreError, ObjectMeta}; use std::fmt::Debug; use std::{ collections::{HashMap, HashSet}, @@ -61,6 +62,7 @@ use std::{ use tracing::{info, instrument, Span}; use super::refs::TagContents; +use crate::dataset::TRANSACTIONS_DIR; use crate::{utils::temporal::utc_now, Dataset}; #[derive(Clone, Debug, Default)] @@ -240,7 +242,7 @@ impl<'a> CleanupTask<'a> { if let Some(relative_tx_path) = &manifest.transaction_file { referenced_files .tx_paths - .insert(Path::parse("_transactions")?.child(relative_tx_path.as_str())); + .insert(Path::parse(TRANSACTIONS_DIR)?.child(relative_tx_path.as_str())); } for index in indexes { @@ -258,26 +260,59 @@ impl<'a> CleanupTask<'a> { let removal_stats = Mutex::new(RemovalStats::default()); let verification_threshold = utc_now() - TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS).expect("TimeDelta::try_days"); - let unreferenced_paths = self - .dataset - .object_store - .read_dir_all( - &self.dataset.base, - inspection.earliest_retained_manifest_time, + + let is_not_found_err = |e: &Error| { + matches!( + e, + Error::IO { source,.. } + if source + .downcast_ref::() + .map(|os_err| matches!(os_err, ObjectStoreError::NotFound {.. })) + .unwrap_or(false) ) - .try_filter_map(|obj_meta| { - // If a file is new-ish then it might be part of an ongoing operation and so we only - // delete it if we can verify it is part of an old version. - let maybe_in_progress = !self.policy.delete_unverified - && obj_meta.last_modified >= verification_threshold; - let path_to_remove = - self.path_if_not_referenced(obj_meta.location, maybe_in_progress, &inspection); - if matches!(path_to_remove, Ok(Some(..))) { - removal_stats.lock().unwrap().bytes_removed += obj_meta.size; - } - future::ready(path_to_remove) - }) - .boxed(); + }; + // Build stream for a managed subtree + let build_listing_stream = |dir: Path| { + self.dataset + .object_store + .read_dir_all(&dir, inspection.earliest_retained_manifest_time) + .map_ok(|obj| stream::once(future::ready(Ok(obj))).boxed()) + .or_else(|e| { + // If the directory doesn't exist then we can just return an empty stream. + if is_not_found_err(&e) { + future::ready(Ok(stream::empty::>().boxed())) + } else { + future::ready(Err(e)) + } + }) + .try_flatten() + .try_filter_map(|obj_meta| { + // If a file is new-ish then it might be part of an ongoing operation and so we only + // delete it if we can verify it is part of an old version. + let maybe_in_progress = !self.policy.delete_unverified + && obj_meta.last_modified >= verification_threshold; + let path_to_remove = self.path_if_not_referenced( + obj_meta.location, + maybe_in_progress, + &inspection, + ); + if matches!(path_to_remove, Ok(Some(..))) { + removal_stats.lock().unwrap().bytes_removed += obj_meta.size; + } + future::ready(path_to_remove) + }) + .boxed() + }; + + // Restrict scanning to Lance-managed subtrees for safety and performance. + let streams = vec![ + build_listing_stream(self.dataset.versions_dir()), + build_listing_stream(self.dataset.transactions_dir()), + build_listing_stream(self.dataset.data_dir()), + build_listing_stream(self.dataset.indices_dir()), + build_listing_stream(self.dataset.deletions_dir()), + ]; + let unreferenced_paths = stream::iter(streams).flatten().boxed(); let old_manifests = inspection.old_manifests.clone(); let num_old_manifests = old_manifests.len(); @@ -420,7 +455,7 @@ impl<'a> CleanupTask<'a> { } } Some("txn") => { - if relative_path.as_ref().starts_with("_transactions") { + if relative_path.as_ref().starts_with(TRANSACTIONS_DIR) { if inspection .referenced_files .tx_paths @@ -1590,4 +1625,46 @@ mod tests { assert_eq!(after_count.num_data_files, 3); assert_eq!(after_count.num_manifest_files, 3); } + + #[tokio::test] + async fn cleanup_preserves_unmanaged_dirs_and_files() { + // Ensure cleanup does not delete unmanaged directories/files under the dataset root + // Uses MockDatasetFixture and run_cleanup_with_override to match other tests' style + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + + let registry = Arc::new(ObjectStoreRegistry::default()); + let (os, base) = + ObjectStore::from_uri_and_params(registry, &fixture.dataset_path, &fixture.os_params()) + .await + .unwrap(); + + // Create unmanaged directories/files under dataset root + let img = base.child("images").child("clip.mp4"); + let misc = base.child("misc").child("notes.txt"); + let branch_file = base.child("tree").child("branchA").child("data.bin"); + os.put(&img, b"video").await.unwrap(); + os.put(&misc, b"notes").await.unwrap(); + os.put(&branch_file, b"branch").await.unwrap(); + + // Create a temporary manifest file that should be cleaned + let tmp_manifest = base.child("_versions").child(".tmp").child("orphan"); + os.put(&tmp_manifest, b"tmp").await.unwrap(); + // Delete the _transactions directory so that we can test that if not_found err will be swallowed + os.remove_dir_all(base.child(TRANSACTIONS_DIR)) + .await + .unwrap(); + + fixture + .run_cleanup_with_override(utc_now(), Some(true), Some(false)) + .await + .unwrap(); + + // Temp manifest file is managed by Lance and should be removed + assert!(!os.exists(&tmp_manifest).await.unwrap()); + // Unrelated files must remain + assert!(os.exists(&img).await.unwrap()); + assert!(os.exists(&misc).await.unwrap()); + assert!(os.exists(&branch_file).await.unwrap()); + } } diff --git a/rust/lance/src/dataset/tests/dataset_transactions.rs b/rust/lance/src/dataset/tests/dataset_transactions.rs index 387512497bc..5e651a3df8c 100644 --- a/rust/lance/src/dataset/tests/dataset_transactions.rs +++ b/rust/lance/src/dataset/tests/dataset_transactions.rs @@ -7,7 +7,7 @@ use std::vec; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction}; -use crate::dataset::{write_manifest_file, ManifestWriteConfig}; +use crate::dataset::{write_manifest_file, ManifestWriteConfig, TRANSACTIONS_DIR}; use crate::io::ObjectStoreParams; use crate::session::Session; use crate::{Dataset, Result}; @@ -297,7 +297,7 @@ async fn test_inline_transaction() { async fn delete_external_tx_file(ds: &Dataset) { if let Some(tx_file) = ds.manifest.transaction_file.as_ref() { - let tx_path = ds.base.child("_transactions").child(tx_file.as_str()); + let tx_path = ds.base.child(TRANSACTIONS_DIR).child(tx_file.as_str()); let _ = ds.object_store.inner.delete(&tx_path).await; // ignore errors } } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index f0064ad60cb..bb1270316b8 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -46,6 +46,7 @@ use crate::dataset::fragment::FileFragment; use crate::dataset::transaction::{Operation, Transaction}; use crate::dataset::{ load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult, + TRANSACTIONS_DIR, }; use crate::index::DatasetIndexInternalExt; use crate::io::deletion::read_dataset_deletion_file; @@ -77,7 +78,7 @@ pub(crate) async fn read_transaction_file( base_path: &Path, transaction_file: &str, ) -> Result { - let path = base_path.child("_transactions").child(transaction_file); + let path = base_path.child(TRANSACTIONS_DIR).child(transaction_file); let result = object_store.inner.get(&path).await?; let data = result.bytes().await?; let transaction = pb::Transaction::decode(data)?; @@ -91,7 +92,7 @@ pub(crate) async fn write_transaction_file( transaction: &Transaction, ) -> Result { let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid); - let path = base_path.child("_transactions").child(file_name.as_str()); + let path = base_path.child(TRANSACTIONS_DIR).child(file_name.as_str()); let message = pb::Transaction::from(transaction); let buf = message.encode_to_vec(); From 272debdae79af7aef25d3dca7c0610b5343226a6 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 16 Dec 2025 10:54:24 +0800 Subject: [PATCH 2/2] use pub(crate) --- rust/lance/src/dataset.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index a6a02055f0c..553cbcfae86 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -129,9 +129,9 @@ pub use write::{ WriteDestination, WriteMode, WriteParams, }; -pub const INDICES_DIR: &str = "_indices"; -pub const DATA_DIR: &str = "data"; -pub const TRANSACTIONS_DIR: &str = "_transactions"; +pub(crate) const INDICES_DIR: &str = "_indices"; +pub(crate) const DATA_DIR: &str = "data"; +pub(crate) const TRANSACTIONS_DIR: &str = "_transactions"; // We default to 6GB for the index cache, since indices are often large but // worth caching.