Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use {
std::time::{Duration, SystemTime},
};

const VERSIONS_DIR: &str = "_versions";
pub const VERSIONS_DIR: &str = "_versions";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some inconsistency with all these dir constants. Some are here, others are in dataset.rs. I don't think it really matters, but it might be nice sometime to just put them all in dataset.rs and either make them all pub (which I think we've done here) or refer to them all from the Dataset accessor functions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion.

I was planning to go with the latter approach, but I didn’t want to be too aggressive. I will carry this forward in follow-up PRs.

const MANIFEST_EXTENSION: &str = "manifest";
const DETACHED_VERSION_PREFIX: &str = "d";

Expand Down
4 changes: 2 additions & 2 deletions rust/lance-table/src/io/deletion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::{info, instrument};

use crate::format::{DeletionFile, DeletionFileType};

pub(crate) const DELETION_DIRS: &str = "_deletions";
pub const DELETIONS_DIR: &str = "_deletions";

/// Get the Arrow schema for an Arrow deletion file.
fn deletion_arrow_schema() -> Arc<Schema> {
Expand All @@ -42,7 +42,7 @@ pub fn deletion_file_path(base: &Path, fragment_id: u64, deletion_file: &Deletio
..
} = deletion_file;
let suffix = file_type.suffix();
base.child(DELETION_DIRS)
base.child(DELETIONS_DIR)
.child(format!("{fragment_id}-{read_version}-{id}.{suffix}"))
}

Expand Down
22 changes: 18 additions & 4 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use lance_table::format::{
};
use lance_table::io::commit::{
migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler,
CommitLock, ManifestLocation, ManifestNamingScheme,
CommitLock, ManifestLocation, ManifestNamingScheme, VERSIONS_DIR,
};
use lance_table::io::manifest::read_manifest;
use object_store::path::Path;
Expand Down Expand Up @@ -112,6 +112,7 @@ use lance_core::box_error;
pub use lance_core::ROW_ID;
use lance_namespace::models::{CreateEmptyTableRequest, DescribeTableRequest};
use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
use lance_table::io::deletion::DELETIONS_DIR;
pub use schema_evolution::{
BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
};
Expand All @@ -128,9 +129,10 @@ pub use write::{
WriteDestination, WriteMode, WriteParams,
};

const INDICES_DIR: &str = "_indices";
pub(crate) const INDICES_DIR: &str = "_indices";
pub(crate) const DATA_DIR: &str = "data";
pub(crate) const TRANSACTIONS_DIR: &str = "_transactions";

pub const DATA_DIR: &str = "data";
// We default to 6GB for the index cache, since indices are often large but
// worth caching.
pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024;
Expand Down Expand Up @@ -1081,7 +1083,7 @@ impl Dataset {
Transaction::try_from(tx).map(Some)?
} else if let Some(path) = &self.manifest.transaction_file {
// Fallback: read external transaction file if present
let path = self.base.child("_transactions").child(path.as_str());
let path = self.transactions_dir().child(path.as_str());
let data = self.object_store.inner.get(&path).await?.bytes().await?;
let transaction = lance_table::format::pb::Transaction::decode(data)?;
Transaction::try_from(transaction).map(Some)?
Expand Down Expand Up @@ -1598,6 +1600,18 @@ impl Dataset {
self.base.child(INDICES_DIR)
}

/// Returns the path of the `_transactions` directory under the dataset root,
/// where external transaction files are stored.
pub fn transactions_dir(&self) -> Path {
self.base.child(TRANSACTIONS_DIR)
}

/// Returns the path of the `_deletions` directory under the dataset root,
/// where deletion files are stored.
pub fn deletions_dir(&self) -> Path {
self.base.child(DELETIONS_DIR)
}

/// Returns the path of the `_versions` directory under the dataset root,
/// where manifest (version) files are stored.
pub fn versions_dir(&self) -> Path {
self.base.child(VERSIONS_DIR)
}

pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result<Path> {
match data_file.base_id.as_ref() {
Some(base_id) => {
Expand Down
119 changes: 98 additions & 21 deletions rust/lance/src/dataset/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use lance_table::{
},
};
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectMeta};
use std::fmt::Debug;
use std::{
collections::{HashMap, HashSet},
Expand All @@ -61,6 +62,7 @@ use std::{
use tracing::{info, instrument, Span};

use super::refs::TagContents;
use crate::dataset::TRANSACTIONS_DIR;
use crate::{utils::temporal::utc_now, Dataset};

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -240,7 +242,7 @@ impl<'a> CleanupTask<'a> {
if let Some(relative_tx_path) = &manifest.transaction_file {
referenced_files
.tx_paths
.insert(Path::parse("_transactions")?.child(relative_tx_path.as_str()));
.insert(Path::parse(TRANSACTIONS_DIR)?.child(relative_tx_path.as_str()));
}

for index in indexes {
Expand All @@ -258,26 +260,59 @@ impl<'a> CleanupTask<'a> {
let removal_stats = Mutex::new(RemovalStats::default());
let verification_threshold = utc_now()
- TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS).expect("TimeDelta::try_days");
let unreferenced_paths = self
.dataset
.object_store
.read_dir_all(
&self.dataset.base,
inspection.earliest_retained_manifest_time,

let is_not_found_err = |e: &Error| {
matches!(
e,
Error::IO { source,.. }
if source
.downcast_ref::<ObjectStoreError>()
.map(|os_err| matches!(os_err, ObjectStoreError::NotFound {.. }))
.unwrap_or(false)
)
.try_filter_map(|obj_meta| {
// If a file is new-ish then it might be part of an ongoing operation and so we only
// delete it if we can verify it is part of an old version.
let maybe_in_progress = !self.policy.delete_unverified
&& obj_meta.last_modified >= verification_threshold;
let path_to_remove =
self.path_if_not_referenced(obj_meta.location, maybe_in_progress, &inspection);
if matches!(path_to_remove, Ok(Some(..))) {
removal_stats.lock().unwrap().bytes_removed += obj_meta.size;
}
future::ready(path_to_remove)
})
.boxed();
};
// Build stream for a managed subtree
let build_listing_stream = |dir: Path| {
self.dataset
.object_store
.read_dir_all(&dir, inspection.earliest_retained_manifest_time)
.map_ok(|obj| stream::once(future::ready(Ok(obj))).boxed())
.or_else(|e| {
// If the directory doesn't exist then we can just return an empty stream.
if is_not_found_err(&e) {
future::ready(Ok(stream::empty::<Result<ObjectMeta>>().boxed()))
} else {
future::ready(Err(e))
}
})
.try_flatten()
.try_filter_map(|obj_meta| {
// If a file is new-ish then it might be part of an ongoing operation and so we only
// delete it if we can verify it is part of an old version.
let maybe_in_progress = !self.policy.delete_unverified
&& obj_meta.last_modified >= verification_threshold;
let path_to_remove = self.path_if_not_referenced(
obj_meta.location,
maybe_in_progress,
&inspection,
);
if matches!(path_to_remove, Ok(Some(..))) {
removal_stats.lock().unwrap().bytes_removed += obj_meta.size;
}
future::ready(path_to_remove)
})
.boxed()
};

// Restrict scanning to Lance-managed subtrees for safety and performance.
let streams = vec![
build_listing_stream(self.dataset.versions_dir()),
build_listing_stream(self.dataset.transactions_dir()),
build_listing_stream(self.dataset.data_dir()),
build_listing_stream(self.dataset.indices_dir()),
build_listing_stream(self.dataset.deletions_dir()),
];
let unreferenced_paths = stream::iter(streams).flatten().boxed();

let old_manifests = inspection.old_manifests.clone();
let num_old_manifests = old_manifests.len();
Expand Down Expand Up @@ -420,7 +455,7 @@ impl<'a> CleanupTask<'a> {
}
}
Some("txn") => {
if relative_path.as_ref().starts_with("_transactions") {
if relative_path.as_ref().starts_with(TRANSACTIONS_DIR) {
if inspection
.referenced_files
.tx_paths
Expand Down Expand Up @@ -1590,4 +1625,46 @@ mod tests {
assert_eq!(after_count.num_data_files, 3);
assert_eq!(after_count.num_manifest_files, 3);
}

#[tokio::test]
async fn cleanup_preserves_unmanaged_dirs_and_files() {
// Ensure cleanup only scans the Lance-managed subtrees (_versions, _transactions,
// data, _indices, _deletions) and never deletes unmanaged directories or files
// that a user placed under the dataset root.
// Uses MockDatasetFixture and run_cleanup_with_override to match other tests' style.
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();

let registry = Arc::new(ObjectStoreRegistry::default());
let (os, base) =
ObjectStore::from_uri_and_params(registry, &fixture.dataset_path, &fixture.os_params())
.await
.unwrap();

// Create unmanaged directories/files under the dataset root; none of these
// live in a Lance-managed subtree, so cleanup must leave them untouched.
let img = base.child("images").child("clip.mp4");
let misc = base.child("misc").child("notes.txt");
let branch_file = base.child("tree").child("branchA").child("data.bin");
os.put(&img, b"video").await.unwrap();
os.put(&misc, b"notes").await.unwrap();
os.put(&branch_file, b"branch").await.unwrap();

// Create a temporary manifest file inside _versions; it IS managed, so
// cleanup should remove it.
let tmp_manifest = base.child("_versions").child(".tmp").child("orphan");
os.put(&tmp_manifest, b"tmp").await.unwrap();
// Delete the _transactions directory to verify that a NotFound error while
// listing a managed subtree is swallowed rather than failing cleanup.
os.remove_dir_all(base.child(TRANSACTIONS_DIR))
.await
.unwrap();

fixture
.run_cleanup_with_override(utc_now(), Some(true), Some(false))
.await
.unwrap();

// Temp manifest file is managed by Lance and should be removed
assert!(!os.exists(&tmp_manifest).await.unwrap());
// Unrelated files must remain
assert!(os.exists(&img).await.unwrap());
assert!(os.exists(&misc).await.unwrap());
assert!(os.exists(&branch_file).await.unwrap());
}
}
4 changes: 2 additions & 2 deletions rust/lance/src/dataset/tests/dataset_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::vec;

use crate::dataset::builder::DatasetBuilder;
use crate::dataset::transaction::{Operation, Transaction};
use crate::dataset::{write_manifest_file, ManifestWriteConfig};
use crate::dataset::{write_manifest_file, ManifestWriteConfig, TRANSACTIONS_DIR};
use crate::io::ObjectStoreParams;
use crate::session::Session;
use crate::{Dataset, Result};
Expand Down Expand Up @@ -297,7 +297,7 @@ async fn test_inline_transaction() {

/// Best-effort removal of the dataset's external transaction file, if the
/// manifest references one. Delete errors are deliberately ignored (e.g. the
/// file may already be gone), since tests only need the file to be absent.
async fn delete_external_tx_file(ds: &Dataset) {
if let Some(tx_file) = ds.manifest.transaction_file.as_ref() {
let tx_path = ds.base.child(TRANSACTIONS_DIR).child(tx_file.as_str());
let _ = ds.object_store.inner.delete(&tx_path).await; // ignore errors
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::dataset::fragment::FileFragment;
use crate::dataset::transaction::{Operation, Transaction};
use crate::dataset::{
load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult,
TRANSACTIONS_DIR,
};
use crate::index::DatasetIndexInternalExt;
use crate::io::deletion::read_dataset_deletion_file;
Expand Down Expand Up @@ -77,7 +78,7 @@ pub(crate) async fn read_transaction_file(
base_path: &Path,
transaction_file: &str,
) -> Result<Transaction> {
let path = base_path.child("_transactions").child(transaction_file);
let path = base_path.child(TRANSACTIONS_DIR).child(transaction_file);
let result = object_store.inner.get(&path).await?;
let data = result.bytes().await?;
let transaction = pb::Transaction::decode(data)?;
Expand All @@ -91,7 +92,7 @@ pub(crate) async fn write_transaction_file(
transaction: &Transaction,
) -> Result<String> {
let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
let path = base_path.child("_transactions").child(file_name.as_str());
let path = base_path.child(TRANSACTIONS_DIR).child(file_name.as_str());

let message = pb::Transaction::from(transaction);
let buf = message.encode_to_vec();
Expand Down
Loading