Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2684,12 +2684,15 @@ fn inner_cleanup_with_policy<'local>(
})?
.unwrap_or(false);

let delete_rate_limit = env.get_optional_u64_from_method(&jpolicy, "getDeleteRateLimit")?;

let policy = CleanupPolicy {
before_timestamp,
before_version,
delete_unverified,
error_if_tagged_old_versions,
clean_referenced_branches,
delete_rate_limit,
};

let stats = {
Expand Down
19 changes: 17 additions & 2 deletions java/src/main/java/org/lance/cleanup/CleanupPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ public class CleanupPolicy {
private final Optional<Boolean> deleteUnverified;
private final Optional<Boolean> errorIfTaggedOldVersions;
private final Optional<Boolean> cleanReferencedBranches;
private final Optional<Long> deleteRateLimit;

private CleanupPolicy(
Optional<Long> beforeTimestampMillis,
Optional<Long> beforeVersion,
Optional<Boolean> deleteUnverified,
Optional<Boolean> errorIfTaggedOldVersions,
Optional<Boolean> cleanReferencedBranches) {
Optional<Boolean> cleanReferencedBranches,
Optional<Long> deleteRateLimit) {
this.beforeTimestampMillis = beforeTimestampMillis;
this.beforeVersion = beforeVersion;
this.deleteUnverified = deleteUnverified;
this.errorIfTaggedOldVersions = errorIfTaggedOldVersions;
this.cleanReferencedBranches = cleanReferencedBranches;
this.deleteRateLimit = deleteRateLimit;
}

public static Builder builder() {
Expand All @@ -65,13 +68,18 @@ public Optional<Boolean> getCleanReferencedBranches() {
return cleanReferencedBranches;
}

public Optional<Long> getDeleteRateLimit() {
return deleteRateLimit;
}

/** Builder for CleanupPolicy. */
public static class Builder {
private Optional<Long> beforeTimestampMillis = Optional.empty();
private Optional<Long> beforeVersion = Optional.empty();
private Optional<Boolean> deleteUnverified = Optional.empty();
private Optional<Boolean> errorIfTaggedOldVersions = Optional.empty();
private Optional<Boolean> cleanReferencedBranches = Optional.empty();
private Optional<Long> deleteRateLimit = Optional.empty();

private Builder() {}

Expand Down Expand Up @@ -105,13 +113,20 @@ public Builder withCleanReferencedBranches(boolean cleanReferencedBranches) {
return this;
}

/** Set the maximum number of delete operations per second. */
public Builder withDeleteRateLimit(long deleteRateLimit) {
this.deleteRateLimit = Optional.of(deleteRateLimit);
return this;
}

public CleanupPolicy build() {
return new CleanupPolicy(
beforeTimestampMillis,
beforeVersion,
deleteUnverified,
errorIfTaggedOldVersions,
cleanReferencedBranches);
cleanReferencedBranches,
deleteRateLimit);
}
}
}
34 changes: 34 additions & 0 deletions java/src/test/java/org/lance/CleanupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.time.Duration;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CleanupTest {
@Test
Expand Down Expand Up @@ -110,4 +113,35 @@ public void testCleanupTaggedVersion(@TempDir Path tempDir) throws Exception {
}
}
}

@Test
public void testCleanupWithRateLimit(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);

testDataset.createEmptyDataset().close();
testDataset.write(1, 100).close();
testDataset.write(2, 100).close();
try (Dataset dataset = testDataset.write(3, 100)) {
List<Version> versions = dataset.listVersions();
assertEquals(4, versions.size());
long beforeTimestampMillis =
versions.get(versions.size() - 1).getDataTime().toInstant().toEpochMilli() + 1;
long start = System.nanoTime();
RemovalStats stats =
dataset.cleanupWithPolicy(
CleanupPolicy.builder()
.withBeforeTimestampMillis(beforeTimestampMillis)
.withDeleteRateLimit(1L)
.build());
long elapsed = System.nanoTime() - start;

assertEquals(3L, stats.getOldVersions());
assertTrue(stats.getBytesRemoved() > 0);
assertTrue(elapsed >= Duration.ofSeconds(2).toNanos());
}
}
}
}
8 changes: 8 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,7 @@ def cleanup_old_versions(
*,
delete_unverified: bool = False,
error_if_tagged_old_versions: bool = True,
delete_rate_limit: Optional[int] = None,
) -> CleanupStats:
"""
Cleans up old versions of the dataset.
Expand Down Expand Up @@ -2458,6 +2459,12 @@ def cleanup_old_versions(
tagged versions match the parameters. Otherwise, tagged versions will
be ignored without any error and only untagged versions will be
cleaned up.

delete_rate_limit: int, optional
Maximum number of delete operations per second. When not set (default),
deletions run at full speed. Set this to a positive integer to avoid
hitting object store request rate limits (e.g. S3 HTTP 503 SlowDown).
For example, ``delete_rate_limit=100`` limits to 100 operations/second.
"""
if older_than is None and retain_versions is None:
older_than = timedelta(days=14)
Expand All @@ -2467,6 +2474,7 @@ def cleanup_old_versions(
retain_versions,
delete_unverified,
error_if_tagged_old_versions,
delete_rate_limit,
)

def create_scalar_index(
Expand Down
1 change: 1 addition & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ class _Dataset:
older_than_micros: int,
delete_unverified: Optional[bool] = None,
error_if_tagged_old_versions: Optional[bool] = None,
delete_rate_limit: Optional[int] = None,
) -> CleanupStats: ...
def get_version(self, tag: str) -> int: ...
# Tag operations
Expand Down
30 changes: 30 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,36 @@ def test_enable_disable_auto_cleanup(tmp_path):
assert len(ds.versions()) == 7


def test_cleanup_with_rate_limit(tmp_path):
"""Test that cleanup_old_versions works with delete_rate_limit parameter."""
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"

lance.write_dataset(table, base_dir, mode="create")
lance.write_dataset(table, base_dir, mode="overwrite")
lance.write_dataset(table, base_dir, mode="overwrite")
lance.write_dataset(table, base_dir, mode="overwrite")

dataset = lance.dataset(base_dir)
latest_version_timestamp = dataset.versions()[-1]["timestamp"]
now = (
datetime.now(latest_version_timestamp.tzinfo)
if latest_version_timestamp.tzinfo is not None
else datetime.now()
)

start = time.time_ns()
# Cleanup with a rate limit should still remove old versions correctly
stats = dataset.cleanup_old_versions(
older_than=(now - latest_version_timestamp), delete_rate_limit=1
)
finished = time.time_ns()

assert stats.old_versions == 3
assert stats.bytes_removed > 0
assert (finished - start) >= 2_000_000_000 # 2s


def test_create_from_commit(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"
Expand Down
6 changes: 5 additions & 1 deletion python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1558,13 +1558,14 @@ impl Dataset {
}

/// Cleanup old versions from the dataset
#[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None))]
#[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None, delete_rate_limit = None))]
fn cleanup_old_versions(
&self,
older_than_micros: Option<i64>,
retain_versions: Option<usize>,
delete_unverified: Option<bool>,
error_if_tagged_old_versions: Option<bool>,
delete_rate_limit: Option<u64>,
) -> PyResult<CleanupStats> {
let cleanup_stats = rt()
.block_on(None, async {
Expand All @@ -1582,6 +1583,9 @@ impl Dataset {
if let Some(v) = error_if_tagged_old_versions {
builder = builder.error_if_tagged_old_versions(v);
}
if let Some(v) = delete_rate_limit {
builder = builder.delete_rate_limit(v)?;
}

self.ds.cleanup_with_policy(builder.build()).await
})?
Expand Down
Loading