From 451486b19e861c5074ff0b32c8e68cb1d795b0a2 Mon Sep 17 00:00:00 2001 From: "fangbo.0511" Date: Tue, 4 Nov 2025 20:28:36 +0800 Subject: [PATCH] feat(java): expose cleanup_with_policy api --- java/lance-jni/src/blocking_dataset.rs | 67 +++++++++++ .../main/java/com/lancedb/lance/Dataset.java | 17 +++ .../lancedb/lance/cleanup/CleanupPolicy.java | 99 +++++++++++++++ .../lancedb/lance/cleanup/RemovalStats.java | 33 +++++ .../java/com/lancedb/lance/CleanupTest.java | 113 ++++++++++++++++++ 5 files changed, 329 insertions(+) create mode 100644 java/src/main/java/com/lancedb/lance/cleanup/CleanupPolicy.java create mode 100644 java/src/main/java/com/lancedb/lance/cleanup/RemovalStats.java create mode 100644 java/src/test/java/com/lancedb/lance/CleanupTest.java diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index bc6b65b6b14..57e397ccc8b 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -19,11 +19,13 @@ use arrow::ipc::writer::StreamWriter; use arrow::record_batch::RecordBatchIterator; use arrow_schema::DataType; use arrow_schema::Schema as ArrowSchema; +use chrono::{DateTime, Utc}; use jni::objects::{JMap, JString, JValue}; use jni::sys::{jboolean, jint}; use jni::sys::{jbyteArray, jlong}; use jni::{objects::JObject, JNIEnv}; use lance::dataset::builder::DatasetBuilder; +use lance::dataset::cleanup::{CleanupPolicy, RemovalStats}; use lance::dataset::optimize::{compact_files, CompactionOptions as RustCompactionOptions}; use lance::dataset::refs::{Ref, TagContents}; use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt}; @@ -44,6 +46,7 @@ use std::collections::HashMap; use std::iter::empty; use std::str::FromStr; use std::sync::Arc; +use std::time::{Duration, UNIX_EPOCH}; pub const NATIVE_DATASET: &str = "nativeDatasetHandle"; @@ -320,6 +323,10 @@ impl BlockingDataset { Ok(()) } + pub fn cleanup_with_policy(&mut self, policy: CleanupPolicy) -> Result { + Ok(RT.block_on(self.inner.cleanup_with_policy(policy))?) + } + pub fn close(&self) {} } @@ -2223,3 +2230,63 @@ fn convert_java_compaction_options_to_rust( &defer_index_remap, ) } + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeCleanupWithPolicy<'local>( + mut env: JNIEnv<'local>, + jdataset: JObject, + jpolicy: JObject, +) -> JObject<'local> { + ok_or_throw!(env, inner_cleanup_with_policy(&mut env, jdataset, jpolicy)) +} + +fn inner_cleanup_with_policy<'local>( + env: &mut JNIEnv<'local>, + jdataset: JObject, + jpolicy: JObject, +) -> Result> { + let before_ts_millis = + env.get_optional_u64_from_method(&jpolicy, "getBeforeTimestampMillis")?; + let before_timestamp = before_ts_millis.map(|millis| { + let st = UNIX_EPOCH + Duration::from_millis(millis); + DateTime::::from(st) + }); + + let before_version = env.get_optional_u64_from_method(&jpolicy, "getBeforeVersion")?; + + let delete_unverified = env + .get_optional_from_method(&jpolicy, "getDeleteUnverified", |env, obj| { + Ok(env.call_method(obj, "booleanValue", "()Z", &[])?.z()?) + })? + .unwrap_or(false); + + let error_if_tagged_old_versions = env + .get_optional_from_method(&jpolicy, "getErrorIfTaggedOldVersions", |env, obj| { + Ok(env.call_method(obj, "booleanValue", "()Z", &[])?.z()?) + })? + .unwrap_or(true); + + let policy = CleanupPolicy { + before_timestamp, + before_version, + delete_unverified, + error_if_tagged_old_versions, + }; + + let stats = { + let mut dataset = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }?; + dataset.cleanup_with_policy(policy) + }?; + + let jstats = env.new_object( + "com/lancedb/lance/cleanup/RemovalStats", + "(JJ)V", + &[ + JValue::Long(stats.bytes_removed as i64), + JValue::Long(stats.old_versions as i64), + ], + )?; + + Ok(jstats) +} diff --git a/java/src/main/java/com/lancedb/lance/Dataset.java b/java/src/main/java/com/lancedb/lance/Dataset.java index d99cf91dbe4..7ca9562b6e1 100644 --- a/java/src/main/java/com/lancedb/lance/Dataset.java +++ b/java/src/main/java/com/lancedb/lance/Dataset.java @@ -13,6 +13,8 @@ */ package com.lancedb.lance; +import com.lancedb.lance.cleanup.CleanupPolicy; +import com.lancedb.lance.cleanup.RemovalStats; import com.lancedb.lance.compaction.CompactionOptions; import com.lancedb.lance.index.IndexParams; import com.lancedb.lance.index.IndexType; @@ -1290,4 +1292,19 @@ public Dataset shallowClone(String targetPath, Ref ref, Map stor private native Dataset nativeShallowClone( String targetPath, Ref ref, Optional> storageOptions); + + /** + * Cleanup dataset based on a specified policy. + * + * @param policy cleanup policy + * @return removal stats + */ + public RemovalStats cleanupWithPolicy(CleanupPolicy policy) { + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeCleanupWithPolicy(policy); + } + } + + private native RemovalStats nativeCleanupWithPolicy(CleanupPolicy policy); } diff --git a/java/src/main/java/com/lancedb/lance/cleanup/CleanupPolicy.java b/java/src/main/java/com/lancedb/lance/cleanup/CleanupPolicy.java new file mode 100644 index 00000000000..acdf12d465f --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/cleanup/CleanupPolicy.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance.cleanup; + +import java.util.Optional; + +/** + * Cleanup policy for dataset cleanup. + * + *

All fields are optional. We intentionally do not set default values here to avoid conflicting + * with Rust-side defaults. Refer to Rust CleanupPolicy for defaults. + */ +public class CleanupPolicy { + private final Optional beforeTimestampMillis; + private final Optional beforeVersion; + private final Optional deleteUnverified; + private final Optional errorIfTaggedOldVersions; + + private CleanupPolicy( + Optional beforeTimestampMillis, + Optional beforeVersion, + Optional deleteUnverified, + Optional errorIfTaggedOldVersions) { + this.beforeTimestampMillis = beforeTimestampMillis; + this.beforeVersion = beforeVersion; + this.deleteUnverified = deleteUnverified; + this.errorIfTaggedOldVersions = errorIfTaggedOldVersions; + } + + public static Builder builder() { + return new Builder(); + } + + public Optional getBeforeTimestampMillis() { + return beforeTimestampMillis; + } + + public Optional getBeforeVersion() { + return beforeVersion; + } + + public Optional getDeleteUnverified() { + return deleteUnverified; + } + + public Optional getErrorIfTaggedOldVersions() { + return errorIfTaggedOldVersions; + } + + /** Builder for CleanupPolicy. */ + public static class Builder { + private Optional beforeTimestampMillis = Optional.empty(); + private Optional beforeVersion = Optional.empty(); + private Optional deleteUnverified = Optional.empty(); + private Optional errorIfTaggedOldVersions = Optional.empty(); + + private Builder() {} + + /** Set a timestamp threshold in milliseconds since UNIX epoch (UTC). */ + public Builder withBeforeTimestampMillis(long beforeTimestampMillis) { + this.beforeTimestampMillis = Optional.of(beforeTimestampMillis); + return this; + } + + /** Set a version threshold; versions older than this will be cleaned. */ + public Builder withBeforeVersion(long beforeVersion) { + this.beforeVersion = Optional.of(beforeVersion); + return this; + } + + /** If true, delete unverified data files even if they are recent. */ + public Builder withDeleteUnverified(boolean deleteUnverified) { + this.deleteUnverified = Optional.of(deleteUnverified); + return this; + } + + /** If true, raise error when tagged versions are old and matched by policy. */ + public Builder withErrorIfTaggedOldVersions(boolean errorIfTaggedOldVersions) { + this.errorIfTaggedOldVersions = Optional.of(errorIfTaggedOldVersions); + return this; + } + + public CleanupPolicy build() { + return new CleanupPolicy( + beforeTimestampMillis, beforeVersion, deleteUnverified, errorIfTaggedOldVersions); + } + } +} diff --git a/java/src/main/java/com/lancedb/lance/cleanup/RemovalStats.java b/java/src/main/java/com/lancedb/lance/cleanup/RemovalStats.java new file mode 100644 index 00000000000..a48bd28d631 --- /dev/null +++ b/java/src/main/java/com/lancedb/lance/cleanup/RemovalStats.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance.cleanup; + +/** Statistics returned by dataset cleanup. */ +public class RemovalStats { + private final long bytesRemoved; + private final long oldVersions; + + public RemovalStats(long bytesRemoved, long oldVersions) { + this.bytesRemoved = bytesRemoved; + this.oldVersions = oldVersions; + } + + public long getBytesRemoved() { + return bytesRemoved; + } + + public long getOldVersions() { + return oldVersions; + } +} diff --git a/java/src/test/java/com/lancedb/lance/CleanupTest.java b/java/src/test/java/com/lancedb/lance/CleanupTest.java new file mode 100644 index 00000000000..7d9d556c4dc --- /dev/null +++ b/java/src/test/java/com/lancedb/lance/CleanupTest.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance; + +import com.lancedb.lance.cleanup.CleanupPolicy; +import com.lancedb.lance.cleanup.RemovalStats; + +import org.apache.arrow.memory.RootAllocator; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CleanupTest { + @Test + public void testCleanupBeforeVersion(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + + testDataset.createEmptyDataset().close(); + + testDataset.write(1, 10).close(); + testDataset.write(2, 10).close(); + + try (Dataset dataset = testDataset.write(3, 10)) { + RemovalStats stats = + dataset.cleanupWithPolicy(CleanupPolicy.builder().withBeforeVersion(3L).build()); + assertEquals(2L, stats.getOldVersions()); + } + } + } + + @Test + public void testCleanupBeforeTimestamp(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + + testDataset.createEmptyDataset().close(); + + testDataset.write(1, 10).close(); + + Thread.sleep(100L); + long beforeTs = System.currentTimeMillis(); + + testDataset.write(2, 10).close(); + + try (Dataset dataset = testDataset.write(3, 10)) { + RemovalStats stats = + dataset.cleanupWithPolicy( + CleanupPolicy.builder().withBeforeTimestampMillis(beforeTs).build()); + assertEquals(2L, stats.getOldVersions()); + } + } + } + + @Test + public void testCleanupTaggedVersion(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + + testDataset.createEmptyDataset().close(); + + Dataset ds = testDataset.write(1, 10); + ds.tags().create("tag-2", 2L); + + testDataset.write(2, 10).close(); + + try (Dataset dataset = testDataset.write(3, 10)) { + // cleanup with tag-2 should throw exception + Assertions.assertThrows( + RuntimeException.class, + () -> + dataset.cleanupWithPolicy( + CleanupPolicy.builder() + .withErrorIfTaggedOldVersions(true) + .withBeforeVersion(3L) + .build())); + + // cleanup with tag-2 should not throw exception when set errorIfTaggedOldVersions to false + RemovalStats stats = + dataset.cleanupWithPolicy( + CleanupPolicy.builder() + .withErrorIfTaggedOldVersions(false) + .withBeforeVersion(3L) + .build()); + assertEquals(1L, stats.getOldVersions()); + + // The version with tag-2 should not be cleaned up + Assertions.assertEquals("tag-2", dataset.tags().list().get(0).getName()); + } + } + } +}