From 3b411e332bded365c0e41d1e6143256a0af6cc9d Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 3 Mar 2026 16:47:08 +0800 Subject: [PATCH 01/10] feat(cleanup): support rate limiter for cleanp operation --- java/lance-jni/src/blocking_dataset.rs | 6 + .../java/org/lance/cleanup/CleanupPolicy.java | 19 +++- java/src/test/java/org/lance/CleanupTest.java | 33 ++++++ python/python/lance/dataset.py | 8 ++ python/python/lance/lance/__init__.pyi | 1 + python/python/tests/test_dataset.py | 29 +++++ python/src/dataset.rs | 6 +- rust/lance/src/dataset/cleanup.rs | 105 +++++++++++++++++- 8 files changed, 203 insertions(+), 4 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 7159e946b3c..75dc05b1011 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -2645,12 +2645,18 @@ fn inner_cleanup_with_policy<'local>( })? .unwrap_or(false); + let delete_rate_limit = env + .get_optional_from_method(&jpolicy, "getDeleteRateLimit", |env, obj| { + Ok(env.call_method(obj, "doubleValue", "()D", &[])?.d()?) + })?; + let policy = CleanupPolicy { before_timestamp, before_version, delete_unverified, error_if_tagged_old_versions, clean_referenced_branches, + delete_rate_limit, }; let stats = { diff --git a/java/src/main/java/org/lance/cleanup/CleanupPolicy.java b/java/src/main/java/org/lance/cleanup/CleanupPolicy.java index 3b437f0307b..6575b551c74 100644 --- a/java/src/main/java/org/lance/cleanup/CleanupPolicy.java +++ b/java/src/main/java/org/lance/cleanup/CleanupPolicy.java @@ -27,18 +27,21 @@ public class CleanupPolicy { private final Optional deleteUnverified; private final Optional errorIfTaggedOldVersions; private final Optional cleanReferencedBranches; + private final Optional deleteRateLimit; private CleanupPolicy( Optional beforeTimestampMillis, Optional beforeVersion, Optional deleteUnverified, Optional errorIfTaggedOldVersions, - Optional cleanReferencedBranches) { + Optional cleanReferencedBranches, + Optional deleteRateLimit) { this.beforeTimestampMillis = beforeTimestampMillis; this.beforeVersion = beforeVersion; this.deleteUnverified = deleteUnverified; this.errorIfTaggedOldVersions = errorIfTaggedOldVersions; this.cleanReferencedBranches = cleanReferencedBranches; + this.deleteRateLimit = deleteRateLimit; } public static Builder builder() { @@ -65,6 +68,10 @@ public Optional getCleanReferencedBranches() { return cleanReferencedBranches; } + public Optional getDeleteRateLimit() { + return deleteRateLimit; + } + /** Builder for CleanupPolicy. */ public static class Builder { private Optional beforeTimestampMillis = Optional.empty(); @@ -72,6 +79,7 @@ public static class Builder { private Optional deleteUnverified = Optional.empty(); private Optional errorIfTaggedOldVersions = Optional.empty(); private Optional cleanReferencedBranches = Optional.empty(); + private Optional deleteRateLimit = Optional.empty(); private Builder() {} @@ -105,13 +113,20 @@ public Builder withCleanReferencedBranches(boolean cleanReferencedBranches) { return this; } + /** Set the maximum number of delete operations per second. */ + public Builder withDeleteRateLimit(double deleteRateLimit) { + this.deleteRateLimit = Optional.of(deleteRateLimit); + return this; + } + public CleanupPolicy build() { return new CleanupPolicy( beforeTimestampMillis, beforeVersion, deleteUnverified, errorIfTaggedOldVersions, - cleanReferencedBranches); + cleanReferencedBranches, + deleteRateLimit); } } } diff --git a/java/src/test/java/org/lance/CleanupTest.java b/java/src/test/java/org/lance/CleanupTest.java index 56955122ca0..588036d1f4a 100644 --- a/java/src/test/java/org/lance/CleanupTest.java +++ b/java/src/test/java/org/lance/CleanupTest.java @@ -22,8 +22,10 @@ import org.junit.jupiter.api.io.TempDir; import java.nio.file.Path; +import java.time.Duration; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class CleanupTest { @Test @@ -110,4 +112,35 @@ public void testCleanupTaggedVersion(@TempDir Path tempDir) throws Exception { } } } + + @Test + public void testCleanupWithRateLimit(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + + testDataset.createEmptyDataset().close(); + testDataset.write(1, 100).close(); + Thread.sleep(1000L); + testDataset.write(2, 100).close(); + Thread.sleep(100L); + + long beforeTimestampMillis = System.currentTimeMillis(); + try (Dataset dataset = testDataset.write(3, 100)) { + long start = System.nanoTime(); + RemovalStats stats = + dataset.cleanupWithPolicy( + CleanupPolicy.builder() + .withBeforeTimestampMillis(beforeTimestampMillis) + .withDeleteRateLimit(1.0) + .build()); + long elapsed = System.nanoTime() - start; + + assertEquals(3L, stats.getOldVersions()); + assertTrue(stats.getBytesRemoved() > 0); + assertTrue(elapsed >= Duration.ofSeconds(2).toNanos()); + } + } + } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 2f4d78fb1bc..8794b60ae36 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2419,6 +2419,7 @@ def cleanup_old_versions( *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True, + delete_rate_limit: Optional[float] = None, ) -> CleanupStats: """ Cleans up old versions of the dataset. @@ -2458,6 +2459,12 @@ def cleanup_old_versions( tagged versions match the parameters. Otherwise, tagged versions will be ignored without any error and only untagged versions will be cleaned up. + + delete_rate_limit: float, optional + Maximum number of delete operations per second. When not set (default), + deletions run at full speed. Set this to a positive value to avoid + hitting object store request rate limits (e.g. S3 HTTP 503 SlowDown). + For example, ``delete_rate_limit=100.0`` limits to 100 deletes/second. """ if older_than is None and retain_versions is None: older_than = timedelta(days=14) @@ -2467,6 +2474,7 @@ def cleanup_old_versions( retain_versions, delete_unverified, error_if_tagged_old_versions, + delete_rate_limit, ) def create_scalar_index( diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 56fb86d6644..b19791a8dd6 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -313,6 +313,7 @@ class _Dataset: older_than_micros: int, delete_unverified: Optional[bool] = None, error_if_tagged_old_versions: Optional[bool] = None, + delete_rate_limit: Optional[float] = None, ) -> CleanupStats: ... def get_version(self, tag: str) -> int: ... # Tag operations diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 4e0ef9f92c0..b048adf0ef8 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1525,6 +1525,35 @@ def test_enable_disable_auto_cleanup(tmp_path): assert len(ds.versions()) == 7 +def test_cleanup_with_rate_limit(tmp_path): + """Test that cleanup_old_versions works with delete_rate_limit parameter.""" + table = pa.Table.from_pydict({"a": range(100), "b": range(100)}) + base_dir = tmp_path / "test" + + lance.write_dataset(table, base_dir, mode="create") + time.sleep(1) + lance.write_dataset(table, base_dir, mode="overwrite") + time.sleep(1) + lance.write_dataset(table, base_dir, mode="overwrite") + time.sleep(1) + + moment = datetime.now() + lance.write_dataset(table, base_dir, mode="overwrite") + + dataset = lance.dataset(base_dir) + + start = time.time_ns() + # Cleanup with a rate limit should still remove old versions correctly + stats = dataset.cleanup_old_versions( + older_than=(datetime.now() - moment), delete_rate_limit=1.0 + ) + finished = time.time_ns() + + assert stats.old_versions == 3 + assert stats.bytes_removed > 0 + assert (finished - start) >= 2_000_000_000 # 2s + + def test_create_from_commit(tmp_path: Path): table = pa.Table.from_pydict({"a": range(100), "b": range(100)}) base_dir = tmp_path / "test" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 01362fd03b6..ba1cc520012 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1563,13 +1563,14 @@ impl Dataset { } /// Cleanup old versions from the dataset - #[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None))] + #[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None, delete_rate_limit = None))] fn cleanup_old_versions( &self, older_than_micros: Option, retain_versions: Option, delete_unverified: Option, error_if_tagged_old_versions: Option, + delete_rate_limit: Option, ) -> PyResult { let cleanup_stats = rt() .block_on(None, async { @@ -1587,6 +1588,9 @@ impl Dataset { if let Some(v) = error_if_tagged_old_versions { builder = builder.error_if_tagged_old_versions(v); } + if let Some(v) = delete_rate_limit { + builder = builder.delete_rate_limit(v)?; + } self.ds.cleanup_with_policy(builder.build()).await })? diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 802658c4567..1f756b6835d 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -39,6 +39,7 @@ use crate::{utils::temporal::utc_now, Dataset}; use chrono::{DateTime, TimeDelta, Utc}; use dashmap::DashSet; use futures::future::try_join_all; +use futures::stream::BoxStream; use futures::{stream, StreamExt, TryStreamExt}; use humantime::parse_duration; use lance_core::{ @@ -63,7 +64,10 @@ use std::{ collections::{HashMap, HashSet}, future, sync::{Mutex, MutexGuard}, + time::Duration, }; +use tokio::time::{interval, MissedTickBehavior}; +use tokio_stream::wrappers::IntervalStream; use tracing::{debug, info, instrument, Span}; #[derive(Clone, Debug, Default)] @@ -367,10 +371,27 @@ impl<'a> CleanupTask<'a> { let all_paths_to_remove = stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten(); + let paths_to_delete: BoxStream> = + if let Some(rate) = self.policy.delete_rate_limit { + info!( + "delete_rate_limit enable, limit {} ops/sec during cleanup", + rate + ); + let duration = Duration::from_secs_f64(1.0 / rate); + let mut ticker = interval(duration); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + IntervalStream::new(ticker) + .zip(all_paths_to_remove) + .map(|(_, path)| path) + .boxed() + } else { + all_paths_to_remove.boxed() + }; + let delete_fut = self .dataset .object_store - .remove_stream(all_paths_to_remove.boxed()) + .remove_stream(paths_to_delete) .try_for_each(|_| future::ready(Ok(()))); delete_fut.await?; @@ -796,6 +817,11 @@ pub struct CleanupPolicy { pub error_if_tagged_old_versions: bool, /// If clean the referenced branches pub clean_referenced_branches: bool, + /// Maximum number of delete operations per second. If None, no rate limiting is applied. + /// + /// Use this to avoid hitting S3 (or other object store) request rate limits during cleanup. + /// For example, `Some(100.0)` limits deletions to 100 files per second. + pub delete_rate_limit: Option, } impl CleanupPolicy { @@ -819,6 +845,7 @@ impl Default for CleanupPolicy { delete_unverified: false, error_if_tagged_old_versions: true, clean_referenced_branches: false, + delete_rate_limit: None, } } } @@ -872,6 +899,27 @@ impl CleanupPolicyBuilder { self } + /// Limit the number of delete operations per second during cleanup. + /// + /// By default (None), deletions run at full speed. Set this to a positive value to + /// throttle deletions and avoid hitting object store request rate limits (e.g. S3 HTTP 503). + /// + /// # Errors + /// + /// Returns an error if `rate` is not a positive finite number. + pub fn delete_rate_limit(mut self, rate: f64) -> Result { + if !rate.is_finite() || rate <= 0.0 { + return Err(Error::Cleanup { + message: format!( + "delete_rate_limit must be a positive finite number, got {}", + rate + ), + }); + } + self.policy.delete_rate_limit = Some(rate); + Ok(self) + } + pub fn build(self) -> CleanupPolicy { self.policy } @@ -998,6 +1046,23 @@ pub async fn build_cleanup_policy( // Map config to policy flag controlling whether referenced branches are cleaned builder = builder.clean_referenced_branches(clean_referenced); } + if let Some(delete_rate_limit) = manifest.config.get("lance.auto_cleanup.delete_rate_limit") { + let rate: f64 = match delete_rate_limit.parse() { + Ok(r) => r, + Err(e) => { + return Err(Error::Cleanup { + message: format!( + "Error encountered while parsing lance.auto_cleanup.delete_rate_limit as f64: {}", + e + ), + }); + } + }; + builder = match builder.delete_rate_limit(rate) { + Ok(b) => b, + Err(e) => return Err(e), + }; + } Ok(Some(builder.build())) } @@ -3357,4 +3422,42 @@ mod tests { assert_eq!(setup.branch4.counts.num_delete_files, 0); assert_eq!(setup.branch4.counts.num_index_files, 4); } + + #[tokio::test] + async fn test_cleanup_with_rate_limit() { + // Create multiple versions with data files that will be deleted. + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + // Create several old versions + for _ in 0..4 { + fixture.overwrite_some_data().await.unwrap(); + } + + MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap()); + + // Set rate limit to 1 ops/second so cleanup of several files must take at least ~1s + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now() - TimeDelta::try_days(8).unwrap()) + .delete_rate_limit(1.0) + .unwrap() + .build(); + + let start = std::time::Instant::now(); + let db = fixture.open().await.unwrap(); + let stats = cleanup_old_versions(&db, policy).await.unwrap(); + let elapsed = start.elapsed(); + + // We deleted old versions, so there should be removed files + assert!( + stats.old_versions > 0, + "expected some old versions to be removed" + ); + // With rate=1 and multiple files, it must take at least 2s + // (even just 2 deletions at 1/s means ≥2s) + assert!( + elapsed.as_millis() >= 2000, + "expected cleanup to be rate-limited (elapsed: {:?})", + elapsed + ); + } } From f8c8b495e5e11ddbb8f048ca756290230f6133ee Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 3 Mar 2026 16:57:19 +0800 Subject: [PATCH 02/10] feat(cleanup): support rate limiter for cleanp operation --- java/lance-jni/src/blocking_dataset.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index c746a97427f..77925c93525 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -2666,8 +2666,8 @@ fn inner_cleanup_with_policy<'local>( })? .unwrap_or(false); - let delete_rate_limit = env - .get_optional_from_method(&jpolicy, "getDeleteRateLimit", |env, obj| { + let delete_rate_limit = + env.get_optional_from_method(&jpolicy, "getDeleteRateLimit", |env, obj| { Ok(env.call_method(obj, "doubleValue", "()D", &[])?.d()?) })?; From 3826434df797e03a5b53cdf61abea56eb6701a21 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 3 Mar 2026 17:30:26 +0800 Subject: [PATCH 03/10] fmt --- rust/lance/src/dataset/cleanup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 7774ba1f1a6..d143441f692 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -66,7 +66,7 @@ use std::{ sync::{Mutex, MutexGuard}, time::Duration, }; -use tokio::time::{interval, MissedTickBehavior}; +use tokio::time::{MissedTickBehavior, interval}; use tokio_stream::wrappers::IntervalStream; use tracing::{Span, debug, info, instrument}; From 9b1e63708778a6e9ba32206302ff3b81387789df Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 3 Mar 2026 17:33:01 +0800 Subject: [PATCH 04/10] fmt --- rust/lance/src/dataset/cleanup.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index d143441f692..fac7f0235f8 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -377,7 +377,10 @@ impl<'a> CleanupTask<'a> { "delete_rate_limit enable, limit {} ops/sec during cleanup", rate ); - let duration = Duration::from_secs_f64(1.0 / rate); + let duration = Duration::try_from_secs_f64(1.0 / rate) + .map_err(|e| Error::Cleanup { + message: format!("delete_rate_limit {} is too small: {}", rate, e), + })?; let mut ticker = interval(duration); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); IntervalStream::new(ticker) From d4c06afb905a392f6a51a59e0d91d9a57e1e0574 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Tue, 3 Mar 2026 17:33:29 +0800 Subject: [PATCH 05/10] fmt --- rust/lance/src/dataset/cleanup.rs | 38 +++++++++++++++---------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index fac7f0235f8..3798b2a17c0 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -371,25 +371,25 @@ impl<'a> CleanupTask<'a> { let all_paths_to_remove = stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten(); - let paths_to_delete: BoxStream> = - if let Some(rate) = self.policy.delete_rate_limit { - info!( - "delete_rate_limit enable, limit {} ops/sec during cleanup", - rate - ); - let duration = Duration::try_from_secs_f64(1.0 / rate) - .map_err(|e| Error::Cleanup { - message: format!("delete_rate_limit {} is too small: {}", rate, e), - })?; - let mut ticker = interval(duration); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); - IntervalStream::new(ticker) - .zip(all_paths_to_remove) - .map(|(_, path)| path) - .boxed() - } else { - all_paths_to_remove.boxed() - }; + let paths_to_delete: BoxStream> = if let Some(rate) = + self.policy.delete_rate_limit + { + info!( + "delete_rate_limit enable, limit {} ops/sec during cleanup", + rate + ); + let duration = Duration::try_from_secs_f64(1.0 / rate).map_err(|e| Error::Cleanup { + message: format!("delete_rate_limit {} is too small: {}", rate, e), + })?; + let mut ticker = interval(duration); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + IntervalStream::new(ticker) + .zip(all_paths_to_remove) + .map(|(_, path)| path) + .boxed() + } else { + all_paths_to_remove.boxed() + }; let delete_fut = self .dataset From 82df0a73e3511abe04ec3f54ef6dbeeb83c3ca4e Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Mar 2026 15:56:39 +0800 Subject: [PATCH 06/10] change to u64 --- java/lance-jni/src/blocking_dataset.rs | 5 +-- .../java/org/lance/cleanup/CleanupPolicy.java | 10 ++--- java/src/test/java/org/lance/CleanupTest.java | 2 +- python/python/lance/dataset.py | 8 ++-- python/python/lance/lance/__init__.pyi | 2 +- python/python/tests/test_dataset.py | 2 +- python/src/dataset.rs | 2 +- rust/lance/src/dataset/cleanup.rs | 38 +++++++++++-------- 8 files changed, 36 insertions(+), 33 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index e7daa760526..157b1ee160c 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -2684,10 +2684,7 @@ fn inner_cleanup_with_policy<'local>( })? .unwrap_or(false); - let delete_rate_limit = - env.get_optional_from_method(&jpolicy, "getDeleteRateLimit", |env, obj| { - Ok(env.call_method(obj, "doubleValue", "()D", &[])?.d()?) - })?; + let delete_rate_limit = env.get_optional_u64_from_method(&jpolicy, "getDeleteRateLimit")?; let policy = CleanupPolicy { before_timestamp, diff --git a/java/src/main/java/org/lance/cleanup/CleanupPolicy.java b/java/src/main/java/org/lance/cleanup/CleanupPolicy.java index 6575b551c74..6316f70f1a6 100644 --- a/java/src/main/java/org/lance/cleanup/CleanupPolicy.java +++ b/java/src/main/java/org/lance/cleanup/CleanupPolicy.java @@ -27,7 +27,7 @@ public class CleanupPolicy { private final Optional deleteUnverified; private final Optional errorIfTaggedOldVersions; private final Optional cleanReferencedBranches; - private final Optional deleteRateLimit; + private final Optional deleteRateLimit; private CleanupPolicy( Optional beforeTimestampMillis, @@ -35,7 +35,7 @@ private CleanupPolicy( Optional deleteUnverified, Optional errorIfTaggedOldVersions, Optional cleanReferencedBranches, - Optional deleteRateLimit) { + Optional deleteRateLimit) { this.beforeTimestampMillis = beforeTimestampMillis; this.beforeVersion = beforeVersion; this.deleteUnverified = deleteUnverified; @@ -68,7 +68,7 @@ public Optional getCleanReferencedBranches() { return cleanReferencedBranches; } - public Optional getDeleteRateLimit() { + public Optional getDeleteRateLimit() { return deleteRateLimit; } @@ -79,7 +79,7 @@ public static class Builder { private Optional deleteUnverified = Optional.empty(); private Optional errorIfTaggedOldVersions = Optional.empty(); private Optional cleanReferencedBranches = Optional.empty(); - private Optional deleteRateLimit = Optional.empty(); + private Optional deleteRateLimit = Optional.empty(); private Builder() {} @@ -114,7 +114,7 @@ public Builder withCleanReferencedBranches(boolean cleanReferencedBranches) { } /** Set the maximum number of delete operations per second. */ - public Builder withDeleteRateLimit(double deleteRateLimit) { + public Builder withDeleteRateLimit(long deleteRateLimit) { this.deleteRateLimit = Optional.of(deleteRateLimit); return this; } diff --git a/java/src/test/java/org/lance/CleanupTest.java b/java/src/test/java/org/lance/CleanupTest.java index 588036d1f4a..5df49d4fff4 100644 --- a/java/src/test/java/org/lance/CleanupTest.java +++ b/java/src/test/java/org/lance/CleanupTest.java @@ -133,7 +133,7 @@ public void testCleanupWithRateLimit(@TempDir Path tempDir) throws Exception { dataset.cleanupWithPolicy( CleanupPolicy.builder() .withBeforeTimestampMillis(beforeTimestampMillis) - .withDeleteRateLimit(1.0) + .withDeleteRateLimit(1L) .build()); long elapsed = System.nanoTime() - start; diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index b9e41bfe297..0a5602dcf00 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2419,7 +2419,7 @@ def cleanup_old_versions( *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True, - delete_rate_limit: Optional[float] = None, + delete_rate_limit: Optional[int] = None, ) -> CleanupStats: """ Cleans up old versions of the dataset. @@ -2460,11 +2460,11 @@ def cleanup_old_versions( be ignored without any error and only untagged versions will be cleaned up. - delete_rate_limit: float, optional + delete_rate_limit: int, optional Maximum number of delete operations per second. When not set (default), - deletions run at full speed. Set this to a positive value to avoid + deletions run at full speed. Set this to a positive integer to avoid hitting object store request rate limits (e.g. S3 HTTP 503 SlowDown). - For example, ``delete_rate_limit=100.0`` limits to 100 deletes/second. + For example, ``delete_rate_limit=100`` limits to 100 operations/second. """ if older_than is None and retain_versions is None: older_than = timedelta(days=14) diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index b19791a8dd6..c99f5a8f450 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -313,7 +313,7 @@ class _Dataset: older_than_micros: int, delete_unverified: Optional[bool] = None, error_if_tagged_old_versions: Optional[bool] = None, - delete_rate_limit: Optional[float] = None, + delete_rate_limit: Optional[int] = None, ) -> CleanupStats: ... def get_version(self, tag: str) -> int: ... # Tag operations diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index b048adf0ef8..84113a41f45 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1545,7 +1545,7 @@ def test_cleanup_with_rate_limit(tmp_path): start = time.time_ns() # Cleanup with a rate limit should still remove old versions correctly stats = dataset.cleanup_old_versions( - older_than=(datetime.now() - moment), delete_rate_limit=1.0 + older_than=(datetime.now() - moment), delete_rate_limit=1 ) finished = time.time_ns() diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 75f632b131b..0be2d9ee6af 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1565,7 +1565,7 @@ impl Dataset { retain_versions: Option, delete_unverified: Option, error_if_tagged_old_versions: Option, - delete_rate_limit: Option, + delete_rate_limit: Option, ) -> PyResult { let cleanup_stats = rt() .block_on(None, async { diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index bc6bd63c363..751dea3a63e 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -375,12 +375,21 @@ impl<'a> CleanupTask<'a> { self.policy.delete_rate_limit { info!( - "delete_rate_limit enable, limit {} ops/sec during cleanup", + "delete_rate_limit enable, limit {} delete operation per sec during cleanup", rate ); - let duration = Duration::try_from_secs_f64(1.0 / rate).map_err(|e| Error::Cleanup { - message: format!("delete_rate_limit {} is too small: {}", rate, e), - })?; + let duration = + Duration::try_from_secs_f64(1.0 / (rate as f64)).map_err(|e| Error::Cleanup { + message: format!("delete_rate_limit {} is invalid: {}", rate, e), + })?; + if duration.is_zero() { + return Err(Error::Cleanup { + message: format!( + "delete_rate_limit {} is too large; minimum supported interval is 1ns", + rate + ), + }); + } let mut ticker = interval(duration); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); IntervalStream::new(ticker) @@ -825,8 +834,8 @@ pub struct CleanupPolicy { /// Maximum number of delete operations per second. If None, no rate limiting is applied. /// /// Use this to avoid hitting S3 (or other object store) request rate limits during cleanup. - /// For example, `Some(100.0)` limits deletions to 100 files per second. - pub delete_rate_limit: Option, + /// For example, `Some(100)` limits deletions to 100 operations per second. + pub delete_rate_limit: Option, } impl CleanupPolicy { @@ -911,14 +920,11 @@ impl CleanupPolicyBuilder { /// /// # Errors /// - /// Returns an error if `rate` is not a positive finite number. - pub fn delete_rate_limit(mut self, rate: f64) -> Result { - if !rate.is_finite() || rate <= 0.0 { + /// Returns an error if `rate` is zero. + pub fn delete_rate_limit(mut self, rate: u64) -> Result { + if rate == 0 { return Err(Error::Cleanup { - message: format!( - "delete_rate_limit must be a positive finite number, got {}", - rate - ), + message: format!("delete_rate_limit must be greater than 0, got {}", rate), }); } self.policy.delete_rate_limit = Some(rate); @@ -1052,12 +1058,12 @@ pub async fn build_cleanup_policy( builder = builder.clean_referenced_branches(clean_referenced); } if let Some(delete_rate_limit) = manifest.config.get("lance.auto_cleanup.delete_rate_limit") { - let rate: f64 = match delete_rate_limit.parse() { + let rate: u64 = match delete_rate_limit.parse() { Ok(r) => r, Err(e) => { return Err(Error::Cleanup { message: format!( - "Error encountered while parsing lance.auto_cleanup.delete_rate_limit as f64: {}", + "Error encountered while parsing lance.auto_cleanup.delete_rate_limit as u64: {}", e ), }); @@ -3438,7 +3444,7 @@ mod tests { // Set rate limit to 1 ops/second so cleanup of several files must take at least ~1s let policy = CleanupPolicyBuilder::default() .before_timestamp(utc_now() - TimeDelta::try_days(8).unwrap()) - .delete_rate_limit(1.0) + .delete_rate_limit(1) .unwrap() .build(); From 88ce336ddb70db1b19d355527387cdb6e0de2291 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Mar 2026 17:47:49 +0800 Subject: [PATCH 07/10] code review --- java/src/test/java/org/lance/CleanupTest.java | 9 ++-- python/python/tests/test_dataset.py | 13 +++--- rust/lance/src/dataset/cleanup.rs | 44 ++++++++++++------- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/java/src/test/java/org/lance/CleanupTest.java b/java/src/test/java/org/lance/CleanupTest.java index 5df49d4fff4..79231f32bb9 100644 --- a/java/src/test/java/org/lance/CleanupTest.java +++ b/java/src/test/java/org/lance/CleanupTest.java @@ -23,6 +23,7 @@ import java.nio.file.Path; import java.time.Duration; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -122,12 +123,12 @@ public void testCleanupWithRateLimit(@TempDir Path tempDir) throws Exception { testDataset.createEmptyDataset().close(); testDataset.write(1, 100).close(); - Thread.sleep(1000L); testDataset.write(2, 100).close(); - Thread.sleep(100L); - - long beforeTimestampMillis = System.currentTimeMillis(); try (Dataset dataset = testDataset.write(3, 100)) { + List versions = dataset.listVersions(); + assertEquals(4, versions.size()); + long beforeTimestampMillis = + versions.get(versions.size() - 1).getDataTime().toInstant().toEpochMilli() + 1; long start = System.nanoTime(); RemovalStats stats = dataset.cleanupWithPolicy( diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 84113a41f45..98cbeff1111 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1531,21 +1531,22 @@ def test_cleanup_with_rate_limit(tmp_path): base_dir = tmp_path / "test" lance.write_dataset(table, base_dir, mode="create") - time.sleep(1) lance.write_dataset(table, base_dir, mode="overwrite") - time.sleep(1) lance.write_dataset(table, base_dir, mode="overwrite") - time.sleep(1) - - moment = datetime.now() lance.write_dataset(table, base_dir, mode="overwrite") dataset = lance.dataset(base_dir) + latest_version_timestamp = dataset.versions()[-1]["timestamp"] + now = ( + datetime.now(latest_version_timestamp.tzinfo) + if latest_version_timestamp.tzinfo is not None + else datetime.now() + ) start = time.time_ns() # Cleanup with a rate limit should still remove old versions correctly stats = dataset.cleanup_old_versions( - older_than=(datetime.now() - moment), delete_rate_limit=1 + older_than=(now - latest_version_timestamp), delete_rate_limit=1 ) finished = time.time_ns() diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 751dea3a63e..4f6dee83435 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -117,6 +117,8 @@ struct CleanupInspection { /// If a file cannot be verified then it will only be deleted if it is at least /// this many days old. const UNVERIFIED_THRESHOLD_DAYS: i64 = 7; +const S3_DELETE_STREAM_BATCH_SIZE: u64 = 1_000; +const AZURE_DELETE_STREAM_BATCH_SIZE: u64 = 256; impl<'a> CleanupTask<'a> { fn new(dataset: &'a Dataset, policy: CleanupPolicy) -> Self { @@ -374,22 +376,16 @@ impl<'a> CleanupTask<'a> { let paths_to_delete: BoxStream> = if let Some(rate) = self.policy.delete_rate_limit { + let batch_size = delete_stream_batch_size(self.dataset.object_store.as_ref()); + let effective_rate = rate.max(1); + let path_rate = effective_rate * batch_size; info!( - "delete_rate_limit enable, limit {} delete operation per sec during cleanup", - rate + "delete_rate_limit enabled: limit {} delete requests/sec", + effective_rate ); - let duration = - Duration::try_from_secs_f64(1.0 / (rate as f64)).map_err(|e| Error::Cleanup { - message: format!("delete_rate_limit {} is invalid: {}", rate, e), - })?; - if duration.is_zero() { - return Err(Error::Cleanup { - message: format!( - "delete_rate_limit {} is too large; minimum supported interval is 1ns", - rate - ), - }); - } + // convert user given op/s to the rate of issuing paths + let duration_ns = 1_000_000_000u64.div_ceil(path_rate).max(1); + let duration = Duration::from_nanos(duration_ns); let mut ticker = interval(duration); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); IntervalStream::new(ticker) @@ -819,6 +815,18 @@ impl<'a> CleanupTask<'a> { } } +/// quick get the object_store delete batch size based on different storage type. +fn delete_stream_batch_size(object_store: &lance_io::object_store::ObjectStore) -> u64 { + let scheme = object_store.scheme().to_lowercase(); + if scheme.contains("s3") { + S3_DELETE_STREAM_BATCH_SIZE + } else if scheme.contains("az") { + AZURE_DELETE_STREAM_BATCH_SIZE + } else { + 1 + } +} + #[derive(Clone, Debug)] pub struct CleanupPolicy { /// If not none, cleanup all versions before the specified timestamp. @@ -831,10 +839,11 @@ pub struct CleanupPolicy { pub error_if_tagged_old_versions: bool, /// If clean the referenced branches pub clean_referenced_branches: bool, - /// Maximum number of delete operations per second. If None, no rate limiting is applied. + /// Maximum number of delete requests per second. If None, no rate limiting is applied. /// /// Use this to avoid hitting S3 (or other object store) request rate limits during cleanup. - /// For example, `Some(100)` limits deletions to 100 operations per second. + /// On stores with bulk delete, each request can include multiple paths. + /// For example, `Some(100)` limits deletions to 100 delete requests per second. pub delete_rate_limit: Option, } @@ -913,10 +922,11 @@ impl CleanupPolicyBuilder { self } - /// Limit the number of delete operations per second during cleanup. + /// Limit the number of delete requests per second during cleanup. /// /// By default (None), deletions run at full speed. Set this to a positive value to /// throttle deletions and avoid hitting object store request rate limits (e.g. S3 HTTP 503). + /// On backends with bulk delete APIs, effective path throughput scales with batch size. /// /// # Errors /// From 48b52aaf8c956cb890a7655dfdf887406c4e50f9 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Mar 2026 20:39:37 +0800 Subject: [PATCH 08/10] code review --- rust/lance/src/dataset/cleanup.rs | 139 +++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 10 deletions(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 4f6dee83435..bb255ff8ec3 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -376,16 +376,7 @@ impl<'a> CleanupTask<'a> { let paths_to_delete: BoxStream> = if let Some(rate) = self.policy.delete_rate_limit { - let batch_size = delete_stream_batch_size(self.dataset.object_store.as_ref()); - let effective_rate = rate.max(1); - let path_rate = effective_rate * batch_size; - info!( - "delete_rate_limit enabled: limit {} delete requests/sec", - effective_rate - ); - // convert user given op/s to the rate of issuing paths - let duration_ns = 1_000_000_000u64.div_ceil(path_rate).max(1); - let duration = Duration::from_nanos(duration_ns); + let duration = calculate_duration(self.dataset.object_store.scheme().to_string(), rate); let mut ticker = interval(duration); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); IntervalStream::new(ticker) @@ -815,6 +806,25 @@ impl<'a> CleanupTask<'a> { } } +fn calculate_duration(scheme: String, rate: u64) -> Duration { + let batch_size = if scheme.to_lowercase().contains("s3") { + S3_DELETE_STREAM_BATCH_SIZE + } else if scheme.to_lowercase().contains("az") { + AZURE_DELETE_STREAM_BATCH_SIZE + } else { + 1 + }; + let effective_rate = rate.max(1); + let path_rate = effective_rate * batch_size; + info!( + "delete_rate_limit enabled: limit {} delete requests/sec", + effective_rate + ); + // convert user given op/s to the rate of issuing paths + let duration_ns = 1_000_000_000u64.div_ceil(path_rate).max(1); + Duration::from_nanos(duration_ns) +} + /// quick get the object_store delete batch size based on different storage type. fn delete_stream_batch_size(object_store: &lance_io::object_store::ObjectStore) -> u64 { let scheme = object_store.scheme().to_lowercase(); @@ -1121,6 +1131,8 @@ mod tests { use super::*; use crate::blob::{BlobArrayBuilder, blob_field}; + #[cfg(feature = "dynamodb_tests")] + use crate::io::StorageOptionsAccessor; use crate::{ dataset::{ReadParams, WriteMode, WriteParams, builder::DatasetBuilder}, index::vector::VectorIndexParams, @@ -1131,6 +1143,10 @@ mod tests { Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt64Array, }; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + #[cfg(feature = "dynamodb_tests")] + use aws_config::{BehaviorVersion, ConfigLoader, Region, SdkConfig}; + #[cfg(feature = "dynamodb_tests")] + use aws_sdk_s3::{Client as S3Client, config::Credentials}; use datafusion::common::assert_contains; use lance_core::utils::tempfile::TempStrDir; use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy}; @@ -1142,6 +1158,86 @@ mod tests { use lance_table::io::commit::RenameCommitHandler; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, some_batch}; use mock_instant::thread_local::MockClock; + #[cfg(feature = "dynamodb_tests")] + use uuid::Uuid; + + #[cfg(feature = "dynamodb_tests")] + const S3_TEST_CONFIG: &[(&str, &str)] = &[ + ("access_key_id", "ACCESS_KEY"), + ("secret_access_key", "SECRET_KEY"), + ("endpoint", "http://127.0.0.1:4566"), + ("allow_http", "true"), + ("region", "us-east-1"), + ]; + + #[cfg(feature = "dynamodb_tests")] + async fn s3_test_aws_config() -> SdkConfig { + let credentials = Credentials::new( + S3_TEST_CONFIG[0].1, + S3_TEST_CONFIG[1].1, + None, + None, + "static", + ); + ConfigLoader::default() + .credentials_provider(credentials) + .endpoint_url(S3_TEST_CONFIG[2].1) + .behavior_version(BehaviorVersion::latest()) + .region(Region::new(S3_TEST_CONFIG[4].1)) + .load() + .await + } + + #[cfg(feature = "dynamodb_tests")] + struct S3Bucket(String); + + #[cfg(feature = "dynamodb_tests")] + impl S3Bucket { + async fn new(bucket: &str) -> Self { + let config = s3_test_aws_config().await; + let client = S3Client::new(&config); + Self::delete_bucket(client.clone(), bucket).await; + client.create_bucket().bucket(bucket).send().await.unwrap(); + Self(bucket.to_string()) + } + + async fn delete_bucket(client: S3Client, bucket: &str) { + let res = client + .list_objects_v2() + .bucket(bucket) + .send() + .await + .map_err(|err| err.into_service_error()); + match res { + Err(e) if e.is_no_such_bucket() => return, + Err(e) => panic!("Failed to list objects in bucket: {}", e), + _ => {} + } + let objects = res.unwrap().contents.unwrap_or_default(); + for object in objects { + client + .delete_object() + .bucket(bucket) + .key(object.key.unwrap()) + .send() + .await + .unwrap(); + } + client.delete_bucket().bucket(bucket).send().await.unwrap(); + } + } + + #[cfg(feature = "dynamodb_tests")] + impl Drop for S3Bucket { + fn drop(&mut self) { + let bucket_name = self.0.clone(); + tokio::task::spawn(async move { + let config = s3_test_aws_config().await; + let client = S3Client::new(&config); + S3Bucket::delete_bucket(client, &bucket_name).await; + }); + } + } #[derive(Debug)] struct MockObjectStore { @@ -3439,6 +3535,29 @@ mod tests { assert_eq!(setup.branch4.counts.num_index_files, 4); } + #[test] + fn test_calculate_duration_s3() { + // Normal case: duration is computed from S3 batch size and configured rate. + let normal_rate = 100; + let expected_duration_ns = + 1_000_000_000u64.div_ceil(normal_rate * S3_DELETE_STREAM_BATCH_SIZE); + assert_eq!( + calculate_duration("s3".to_string(), normal_rate), + Duration::from_nanos(expected_duration_ns) + ); + + // Edge case: rate too small should be clamped to 1. + let min_rate_duration = calculate_duration("s3".to_string(), 1); + assert_eq!(calculate_duration("s3".to_string(), 0), min_rate_duration); + + // Edge case: computed duration_ns too small should be clamped to at least 1ns. + let very_large_rate = 2_000_000; + assert_eq!( + calculate_duration("s3".to_string(), very_large_rate), + Duration::from_nanos(1) + ); + } + #[tokio::test] async fn test_cleanup_with_rate_limit() { // Create multiple versions with data files that will be deleted. From bef85a446e892dd5b2c052527ea66374c98fe368 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Mar 2026 20:43:56 +0800 Subject: [PATCH 09/10] code review --- rust/lance/src/dataset/cleanup.rs | 86 ------------------------------- 1 file changed, 86 deletions(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index bb255ff8ec3..d377b234633 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1131,8 +1131,6 @@ mod tests { use super::*; use crate::blob::{BlobArrayBuilder, blob_field}; - #[cfg(feature = "dynamodb_tests")] - use crate::io::StorageOptionsAccessor; use crate::{ dataset::{ReadParams, WriteMode, WriteParams, builder::DatasetBuilder}, index::vector::VectorIndexParams, @@ -1143,10 +1141,6 @@ mod tests { Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt64Array, }; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; - #[cfg(feature = "dynamodb_tests")] - use aws_config::{BehaviorVersion, ConfigLoader, Region, SdkConfig}; - #[cfg(feature = "dynamodb_tests")] - use aws_sdk_s3::{Client as S3Client, config::Credentials}; use datafusion::common::assert_contains; use lance_core::utils::tempfile::TempStrDir; use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy}; @@ -1158,86 +1152,6 @@ mod tests { use lance_table::io::commit::RenameCommitHandler; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, some_batch}; use mock_instant::thread_local::MockClock; - #[cfg(feature = "dynamodb_tests")] - use uuid::Uuid; - - #[cfg(feature = "dynamodb_tests")] - const S3_TEST_CONFIG: &[(&str, &str)] = &[ - ("access_key_id", "ACCESS_KEY"), - ("secret_access_key", "SECRET_KEY"), - ("endpoint", "http://127.0.0.1:4566"), - ("allow_http", "true"), - ("region", "us-east-1"), - ]; - - #[cfg(feature = "dynamodb_tests")] - async fn s3_test_aws_config() -> SdkConfig { - let credentials = Credentials::new( - S3_TEST_CONFIG[0].1, - S3_TEST_CONFIG[1].1, - None, - None, - "static", - ); - ConfigLoader::default() - .credentials_provider(credentials) - .endpoint_url(S3_TEST_CONFIG[2].1) - .behavior_version(BehaviorVersion::latest()) - .region(Region::new(S3_TEST_CONFIG[4].1)) - .load() - .await - } - - #[cfg(feature = "dynamodb_tests")] - struct S3Bucket(String); - - #[cfg(feature = "dynamodb_tests")] - impl S3Bucket { - async fn new(bucket: &str) -> Self { - let config = s3_test_aws_config().await; - let client = S3Client::new(&config); - Self::delete_bucket(client.clone(), bucket).await; - client.create_bucket().bucket(bucket).send().await.unwrap(); - Self(bucket.to_string()) - } - - async fn delete_bucket(client: S3Client, bucket: &str) { - let res = client - .list_objects_v2() - .bucket(bucket) - .send() - .await - .map_err(|err| err.into_service_error()); - match res { - Err(e) if e.is_no_such_bucket() => return, - Err(e) => panic!("Failed to list objects in bucket: {}", e), - _ => {} - } - let objects = res.unwrap().contents.unwrap_or_default(); - for object in objects { - client - .delete_object() - .bucket(bucket) - .key(object.key.unwrap()) - .send() - .await - .unwrap(); - } - client.delete_bucket().bucket(bucket).send().await.unwrap(); - } - } - - #[cfg(feature = "dynamodb_tests")] - impl Drop for S3Bucket { - fn drop(&mut self) { - let bucket_name = self.0.clone(); - tokio::task::spawn(async move { - let config = s3_test_aws_config().await; - let client = S3Client::new(&config); - S3Bucket::delete_bucket(client, &bucket_name).await; - }); - } - } #[derive(Debug)] struct MockObjectStore { From 5a6bbbb4ce5842e783ff70bd7cbf785b4e1ee81c Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Mar 2026 20:52:18 +0800 Subject: [PATCH 10/10] code review --- rust/lance/src/dataset/cleanup.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index d377b234633..2c322d89844 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -825,18 +825,6 @@ fn calculate_duration(scheme: String, rate: u64) -> Duration { Duration::from_nanos(duration_ns) } -/// quick get the object_store delete batch size based on different storage type. -fn delete_stream_batch_size(object_store: &lance_io::object_store::ObjectStore) -> u64 { - let scheme = object_store.scheme().to_lowercase(); - if scheme.contains("s3") { - S3_DELETE_STREAM_BATCH_SIZE - } else if scheme.contains("az") { - AZURE_DELETE_STREAM_BATCH_SIZE - } else { - 1 - } -} - #[derive(Clone, Debug)] pub struct CleanupPolicy { /// If not none, cleanup all versions before the specified timestamp.