diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index d1e9a0d2e8f..157b1ee160c 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -2684,12 +2684,15 @@ fn inner_cleanup_with_policy<'local>( })? .unwrap_or(false); + let delete_rate_limit = env.get_optional_u64_from_method(&jpolicy, "getDeleteRateLimit")?; + let policy = CleanupPolicy { before_timestamp, before_version, delete_unverified, error_if_tagged_old_versions, clean_referenced_branches, + delete_rate_limit, }; let stats = { diff --git a/java/src/main/java/org/lance/cleanup/CleanupPolicy.java b/java/src/main/java/org/lance/cleanup/CleanupPolicy.java index 3b437f0307b..6316f70f1a6 100644 --- a/java/src/main/java/org/lance/cleanup/CleanupPolicy.java +++ b/java/src/main/java/org/lance/cleanup/CleanupPolicy.java @@ -27,18 +27,21 @@ public class CleanupPolicy { private final Optional deleteUnverified; private final Optional errorIfTaggedOldVersions; private final Optional cleanReferencedBranches; + private final Optional deleteRateLimit; private CleanupPolicy( Optional beforeTimestampMillis, Optional beforeVersion, Optional deleteUnverified, Optional errorIfTaggedOldVersions, - Optional cleanReferencedBranches) { + Optional cleanReferencedBranches, + Optional deleteRateLimit) { this.beforeTimestampMillis = beforeTimestampMillis; this.beforeVersion = beforeVersion; this.deleteUnverified = deleteUnverified; this.errorIfTaggedOldVersions = errorIfTaggedOldVersions; this.cleanReferencedBranches = cleanReferencedBranches; + this.deleteRateLimit = deleteRateLimit; } public static Builder builder() { @@ -65,6 +68,10 @@ public Optional getCleanReferencedBranches() { return cleanReferencedBranches; } + public Optional getDeleteRateLimit() { + return deleteRateLimit; + } + /** Builder for CleanupPolicy. 
*/ public static class Builder { private Optional beforeTimestampMillis = Optional.empty(); @@ -72,6 +79,7 @@ public static class Builder { private Optional deleteUnverified = Optional.empty(); private Optional errorIfTaggedOldVersions = Optional.empty(); private Optional cleanReferencedBranches = Optional.empty(); + private Optional deleteRateLimit = Optional.empty(); private Builder() {} @@ -105,13 +113,20 @@ public Builder withCleanReferencedBranches(boolean cleanReferencedBranches) { return this; } + /** Set the maximum number of delete operations per second. */ + public Builder withDeleteRateLimit(long deleteRateLimit) { + this.deleteRateLimit = Optional.of(deleteRateLimit); + return this; + } + public CleanupPolicy build() { return new CleanupPolicy( beforeTimestampMillis, beforeVersion, deleteUnverified, errorIfTaggedOldVersions, - cleanReferencedBranches); + cleanReferencedBranches, + deleteRateLimit); } } } diff --git a/java/src/test/java/org/lance/CleanupTest.java b/java/src/test/java/org/lance/CleanupTest.java index 56955122ca0..79231f32bb9 100644 --- a/java/src/test/java/org/lance/CleanupTest.java +++ b/java/src/test/java/org/lance/CleanupTest.java @@ -22,8 +22,11 @@ import org.junit.jupiter.api.io.TempDir; import java.nio.file.Path; +import java.time.Duration; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class CleanupTest { @Test @@ -110,4 +113,35 @@ public void testCleanupTaggedVersion(@TempDir Path tempDir) throws Exception { } } } + + @Test + public void testCleanupWithRateLimit(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + + testDataset.createEmptyDataset().close(); + testDataset.write(1, 
100).close(); + testDataset.write(2, 100).close(); + try (Dataset dataset = testDataset.write(3, 100)) { + List versions = dataset.listVersions(); + assertEquals(4, versions.size()); + long beforeTimestampMillis = + versions.get(versions.size() - 1).getDataTime().toInstant().toEpochMilli() + 1; + long start = System.nanoTime(); + RemovalStats stats = + dataset.cleanupWithPolicy( + CleanupPolicy.builder() + .withBeforeTimestampMillis(beforeTimestampMillis) + .withDeleteRateLimit(1L) + .build()); + long elapsed = System.nanoTime() - start; + + assertEquals(3L, stats.getOldVersions()); + assertTrue(stats.getBytesRemoved() > 0); + assertTrue(elapsed >= Duration.ofSeconds(2).toNanos()); + } + } + } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 6d5cfee6453..0a5602dcf00 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2419,6 +2419,7 @@ def cleanup_old_versions( *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True, + delete_rate_limit: Optional[int] = None, ) -> CleanupStats: """ Cleans up old versions of the dataset. @@ -2458,6 +2459,12 @@ def cleanup_old_versions( tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up. + + delete_rate_limit: int, optional + Maximum number of delete operations per second. When not set (default), + deletions run at full speed. Set this to a positive integer to avoid + hitting object store request rate limits (e.g. S3 HTTP 503 SlowDown). + For example, ``delete_rate_limit=100`` limits to 100 operations/second. 
def test_cleanup_with_rate_limit(tmp_path):
    """Test that cleanup_old_versions works with delete_rate_limit parameter.

    Writes four versions of a small dataset, removes everything older than the
    latest version with ``delete_rate_limit=1``, and checks both the cleanup
    statistics and that the call was actually throttled.
    """
    table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
    base_dir = tmp_path / "test"

    # One create plus three overwrites -> four versions, three of them old.
    lance.write_dataset(table, base_dir, mode="create")
    for _ in range(3):
        lance.write_dataset(table, base_dir, mode="overwrite")

    dataset = lance.dataset(base_dir)
    latest_version_timestamp = dataset.versions()[-1]["timestamp"]
    # Build "now" with the same awareness (tz-aware vs. naive) as the version
    # timestamp so that the subtraction below is valid.
    tzinfo = latest_version_timestamp.tzinfo
    now = datetime.now(tzinfo) if tzinfo is not None else datetime.now()

    # Use a monotonic clock for the elapsed-time measurement: the wall clock
    # can be adjusted while the test runs, which would make the lower-bound
    # assertion below unreliable.
    start = time.monotonic_ns()
    # Cleanup with a rate limit should still remove old versions correctly
    stats = dataset.cleanup_old_versions(
        older_than=(now - latest_version_timestamp), delete_rate_limit=1
    )
    elapsed_ns = time.monotonic_ns() - start

    assert stats.old_versions == 3
    assert stats.bytes_removed > 0
    assert elapsed_ns >= 2_000_000_000  # 2s
diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index f3483bfe345..2c322d89844 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -39,6 +39,7 @@ use crate::{Dataset, utils::temporal::utc_now}; use chrono::{DateTime, TimeDelta, Utc}; use dashmap::DashSet; use futures::future::try_join_all; +use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt, stream}; use humantime::parse_duration; use lance_core::{ @@ -63,7 +64,10 @@ use std::{ collections::{HashMap, HashSet}, future, sync::{Mutex, MutexGuard}, + time::Duration, }; +use tokio::time::{MissedTickBehavior, interval}; +use tokio_stream::wrappers::IntervalStream; use tracing::{Span, debug, info, instrument}; #[derive(Clone, Debug, Default)] @@ -113,6 +117,8 @@ struct CleanupInspection { /// If a file cannot be verified then it will only be deleted if it is at least /// this many days old. const UNVERIFIED_THRESHOLD_DAYS: i64 = 7; +const S3_DELETE_STREAM_BATCH_SIZE: u64 = 1_000; +const AZURE_DELETE_STREAM_BATCH_SIZE: u64 = 256; impl<'a> CleanupTask<'a> { fn new(dataset: &'a Dataset, policy: CleanupPolicy) -> Self { @@ -367,10 +373,24 @@ impl<'a> CleanupTask<'a> { let all_paths_to_remove = stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten(); + let paths_to_delete: BoxStream> = if let Some(rate) = + self.policy.delete_rate_limit + { + let duration = calculate_duration(self.dataset.object_store.scheme().to_string(), rate); + let mut ticker = interval(duration); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + IntervalStream::new(ticker) + .zip(all_paths_to_remove) + .map(|(_, path)| path) + .boxed() + } else { + all_paths_to_remove.boxed() + }; + let delete_fut = self .dataset .object_store - .remove_stream(all_paths_to_remove.boxed()) + .remove_stream(paths_to_delete) .try_for_each(|_| future::ready(Ok(()))); delete_fut.await?; @@ -786,6 +806,25 @@ impl<'a> CleanupTask<'a> { } } +fn 
calculate_duration(scheme: String, rate: u64) -> Duration { + let batch_size = if scheme.to_lowercase().contains("s3") { + S3_DELETE_STREAM_BATCH_SIZE + } else if scheme.to_lowercase().contains("az") { + AZURE_DELETE_STREAM_BATCH_SIZE + } else { + 1 + }; + let effective_rate = rate.max(1); + let path_rate = effective_rate * batch_size; + info!( + "delete_rate_limit enabled: limit {} delete requests/sec", + effective_rate + ); + // convert user given op/s to the rate of issuing paths + let duration_ns = 1_000_000_000u64.div_ceil(path_rate).max(1); + Duration::from_nanos(duration_ns) +} + #[derive(Clone, Debug)] pub struct CleanupPolicy { /// If not none, cleanup all versions before the specified timestamp. @@ -798,6 +837,12 @@ pub struct CleanupPolicy { pub error_if_tagged_old_versions: bool, /// If clean the referenced branches pub clean_referenced_branches: bool, + /// Maximum number of delete requests per second. If None, no rate limiting is applied. + /// + /// Use this to avoid hitting S3 (or other object store) request rate limits during cleanup. + /// On stores with bulk delete, each request can include multiple paths. + /// For example, `Some(100)` limits deletions to 100 delete requests per second. + pub delete_rate_limit: Option, } impl CleanupPolicy { @@ -821,6 +866,7 @@ impl Default for CleanupPolicy { delete_unverified: false, error_if_tagged_old_versions: true, clean_referenced_branches: false, + delete_rate_limit: None, } } } @@ -874,6 +920,25 @@ impl CleanupPolicyBuilder { self } + /// Limit the number of delete requests per second during cleanup. + /// + /// By default (None), deletions run at full speed. Set this to a positive value to + /// throttle deletions and avoid hitting object store request rate limits (e.g. S3 HTTP 503). + /// On backends with bulk delete APIs, effective path throughput scales with batch size. + /// + /// # Errors + /// + /// Returns an error if `rate` is zero. 
+ pub fn delete_rate_limit(mut self, rate: u64) -> Result { + if rate == 0 { + return Err(Error::Cleanup { + message: format!("delete_rate_limit must be greater than 0, got {}", rate), + }); + } + self.policy.delete_rate_limit = Some(rate); + Ok(self) + } + pub fn build(self) -> CleanupPolicy { self.policy } @@ -1000,6 +1065,23 @@ pub async fn build_cleanup_policy( // Map config to policy flag controlling whether referenced branches are cleaned builder = builder.clean_referenced_branches(clean_referenced); } + if let Some(delete_rate_limit) = manifest.config.get("lance.auto_cleanup.delete_rate_limit") { + let rate: u64 = match delete_rate_limit.parse() { + Ok(r) => r, + Err(e) => { + return Err(Error::Cleanup { + message: format!( + "Error encountered while parsing lance.auto_cleanup.delete_rate_limit as u64: {}", + e + ), + }); + } + }; + builder = match builder.delete_rate_limit(rate) { + Ok(b) => b, + Err(e) => return Err(e), + }; + } Ok(Some(builder.build())) } @@ -3354,4 +3436,65 @@ mod tests { assert_eq!(setup.branch4.counts.num_delete_files, 0); assert_eq!(setup.branch4.counts.num_index_files, 4); } + + #[test] + fn test_calculate_duration_s3() { + // Normal case: duration is computed from S3 batch size and configured rate. + let normal_rate = 100; + let expected_duration_ns = + 1_000_000_000u64.div_ceil(normal_rate * S3_DELETE_STREAM_BATCH_SIZE); + assert_eq!( + calculate_duration("s3".to_string(), normal_rate), + Duration::from_nanos(expected_duration_ns) + ); + + // Edge case: rate too small should be clamped to 1. + let min_rate_duration = calculate_duration("s3".to_string(), 1); + assert_eq!(calculate_duration("s3".to_string(), 0), min_rate_duration); + + // Edge case: computed duration_ns too small should be clamped to at least 1ns. 
+ let very_large_rate = 2_000_000; + assert_eq!( + calculate_duration("s3".to_string(), very_large_rate), + Duration::from_nanos(1) + ); + } + + #[tokio::test] + async fn test_cleanup_with_rate_limit() { + // Create multiple versions with data files that will be deleted. + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + // Create several old versions + for _ in 0..4 { + fixture.overwrite_some_data().await.unwrap(); + } + + MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap()); + + // Set rate limit to 1 ops/second so cleanup of several files must take at least ~1s + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now() - TimeDelta::try_days(8).unwrap()) + .delete_rate_limit(1) + .unwrap() + .build(); + + let start = std::time::Instant::now(); + let db = fixture.open().await.unwrap(); + let stats = cleanup_old_versions(&db, policy).await.unwrap(); + let elapsed = start.elapsed(); + + // We deleted old versions, so there should be removed files + assert!( + stats.old_versions > 0, + "expected some old versions to be removed" + ); + // With rate=1 and multiple files, it must take at least 2s + // (even just 2 deletions at 1/s means ≥2s) + assert!( + elapsed.as_millis() >= 2000, + "expected cleanup to be rate-limited (elapsed: {:?})", + elapsed + ); + } }