Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatchIterator;
use arrow_schema::DataType;
use arrow_schema::Schema as ArrowSchema;
use chrono::{DateTime, Utc};
use jni::objects::{JMap, JString, JValue};
use jni::sys::{jboolean, jint};
use jni::sys::{jbyteArray, jlong};
use jni::{objects::JObject, JNIEnv};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{compact_files, CompactionOptions as RustCompactionOptions};
use lance::dataset::refs::{Ref, TagContents};
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
Expand All @@ -44,6 +46,7 @@ use std::collections::HashMap;
use std::iter::empty;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};

pub const NATIVE_DATASET: &str = "nativeDatasetHandle";

Expand Down Expand Up @@ -320,6 +323,10 @@ impl BlockingDataset {
Ok(())
}

pub fn cleanup_with_policy(&mut self, policy: CleanupPolicy) -> Result<RemovalStats> {
Ok(RT.block_on(self.inner.cleanup_with_policy(policy))?)
}

pub fn close(&self) {}
}

Expand Down Expand Up @@ -2223,3 +2230,63 @@ fn convert_java_compaction_options_to_rust(
&defer_index_remap,
)
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeCleanupWithPolicy<'local>(
mut env: JNIEnv<'local>,
jdataset: JObject,
jpolicy: JObject,
) -> JObject<'local> {
ok_or_throw!(env, inner_cleanup_with_policy(&mut env, jdataset, jpolicy))
}

fn inner_cleanup_with_policy<'local>(
env: &mut JNIEnv<'local>,
jdataset: JObject,
jpolicy: JObject,
) -> Result<JObject<'local>> {
let before_ts_millis =
env.get_optional_u64_from_method(&jpolicy, "getBeforeTimestampMillis")?;
let before_timestamp = before_ts_millis.map(|millis| {
let st = UNIX_EPOCH + Duration::from_millis(millis);
DateTime::<Utc>::from(st)
});

let before_version = env.get_optional_u64_from_method(&jpolicy, "getBeforeVersion")?;

let delete_unverified = env
.get_optional_from_method(&jpolicy, "getDeleteUnverified", |env, obj| {
Ok(env.call_method(obj, "booleanValue", "()Z", &[])?.z()?)
})?
.unwrap_or(false);

let error_if_tagged_old_versions = env
.get_optional_from_method(&jpolicy, "getErrorIfTaggedOldVersions", |env, obj| {
Ok(env.call_method(obj, "booleanValue", "()Z", &[])?.z()?)
})?
.unwrap_or(true);

let policy = CleanupPolicy {
before_timestamp,
before_version,
delete_unverified,
error_if_tagged_old_versions,
};

let stats = {
let mut dataset =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }?;
dataset.cleanup_with_policy(policy)
}?;

let jstats = env.new_object(
"com/lancedb/lance/cleanup/RemovalStats",
"(JJ)V",
&[
JValue::Long(stats.bytes_removed as i64),
JValue::Long(stats.old_versions as i64),
],
)?;

Ok(jstats)
}
17 changes: 17 additions & 0 deletions java/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.lancedb.lance;

import com.lancedb.lance.cleanup.CleanupPolicy;
import com.lancedb.lance.cleanup.RemovalStats;
import com.lancedb.lance.compaction.CompactionOptions;
import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.IndexType;
Expand Down Expand Up @@ -1290,4 +1292,19 @@ public Dataset shallowClone(String targetPath, Ref ref, Map<String, String> stor

private native Dataset nativeShallowClone(
String targetPath, Ref ref, Optional<Map<String, String>> storageOptions);

/**
* Cleanup dataset based on a specified policy.
*
* @param policy cleanup policy
* @return removal stats
*/
public RemovalStats cleanupWithPolicy(CleanupPolicy policy) {
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
return nativeCleanupWithPolicy(policy);
}
}

private native RemovalStats nativeCleanupWithPolicy(CleanupPolicy policy);
}
99 changes: 99 additions & 0 deletions java/src/main/java/com/lancedb/lance/cleanup/CleanupPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lance.cleanup;

import java.util.Optional;

/**
* Cleanup policy for dataset cleanup.
*
* <p>All fields are optional. We intentionally do not set default values here to avoid conflicting
* with Rust-side defaults. Refer to Rust CleanupPolicy for defaults.
*/
public class CleanupPolicy {
private final Optional<Long> beforeTimestampMillis;
private final Optional<Long> beforeVersion;
private final Optional<Boolean> deleteUnverified;
private final Optional<Boolean> errorIfTaggedOldVersions;

private CleanupPolicy(
Optional<Long> beforeTimestampMillis,
Optional<Long> beforeVersion,
Optional<Boolean> deleteUnverified,
Optional<Boolean> errorIfTaggedOldVersions) {
this.beforeTimestampMillis = beforeTimestampMillis;
this.beforeVersion = beforeVersion;
this.deleteUnverified = deleteUnverified;
this.errorIfTaggedOldVersions = errorIfTaggedOldVersions;
}

public static Builder builder() {
return new Builder();
}

public Optional<Long> getBeforeTimestampMillis() {
return beforeTimestampMillis;
}

public Optional<Long> getBeforeVersion() {
return beforeVersion;
}

public Optional<Boolean> getDeleteUnverified() {
return deleteUnverified;
}

public Optional<Boolean> getErrorIfTaggedOldVersions() {
return errorIfTaggedOldVersions;
}

/** Builder for CleanupPolicy. */
public static class Builder {
private Optional<Long> beforeTimestampMillis = Optional.empty();
private Optional<Long> beforeVersion = Optional.empty();
private Optional<Boolean> deleteUnverified = Optional.empty();
private Optional<Boolean> errorIfTaggedOldVersions = Optional.empty();

private Builder() {}

/** Set a timestamp threshold in milliseconds since UNIX epoch (UTC). */
public Builder withBeforeTimestampMillis(long beforeTimestampMillis) {
this.beforeTimestampMillis = Optional.of(beforeTimestampMillis);
return this;
}

/** Set a version threshold; versions older than this will be cleaned. */
public Builder withBeforeVersion(long beforeVersion) {
this.beforeVersion = Optional.of(beforeVersion);
return this;
}

/** If true, delete unverified data files even if they are recent. */
public Builder withDeleteUnverified(boolean deleteUnverified) {
this.deleteUnverified = Optional.of(deleteUnverified);
return this;
}

/** If true, raise error when tagged versions are old and matched by policy. */
public Builder withErrorIfTaggedOldVersions(boolean errorIfTaggedOldVersions) {
this.errorIfTaggedOldVersions = Optional.of(errorIfTaggedOldVersions);
return this;
}

public CleanupPolicy build() {
return new CleanupPolicy(
beforeTimestampMillis, beforeVersion, deleteUnverified, errorIfTaggedOldVersions);
}
}
}
33 changes: 33 additions & 0 deletions java/src/main/java/com/lancedb/lance/cleanup/RemovalStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lance.cleanup;

/** Statistics returned by dataset cleanup. */
public class RemovalStats {
private final long bytesRemoved;
private final long oldVersions;

public RemovalStats(long bytesRemoved, long oldVersions) {
this.bytesRemoved = bytesRemoved;
this.oldVersions = oldVersions;
}

public long getBytesRemoved() {
return bytesRemoved;
}

public long getOldVersions() {
return oldVersions;
}
}
113 changes: 113 additions & 0 deletions java/src/test/java/com/lancedb/lance/CleanupTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lance;

import com.lancedb.lance.cleanup.CleanupPolicy;
import com.lancedb.lance.cleanup.RemovalStats;

import org.apache.arrow.memory.RootAllocator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class CleanupTest {
@Test
public void testCleanupBeforeVersion(@TempDir Path tempDir) {
String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);

testDataset.createEmptyDataset().close();

testDataset.write(1, 10).close();
testDataset.write(2, 10).close();

try (Dataset dataset = testDataset.write(3, 10)) {
RemovalStats stats =
dataset.cleanupWithPolicy(CleanupPolicy.builder().withBeforeVersion(3L).build());
assertEquals(2L, stats.getOldVersions());
}
}
}

@Test
public void testCleanupBeforeTimestamp(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);

testDataset.createEmptyDataset().close();

testDataset.write(1, 10).close();

Thread.sleep(100L);
long beforeTs = System.currentTimeMillis();

testDataset.write(2, 10).close();

try (Dataset dataset = testDataset.write(3, 10)) {
RemovalStats stats =
dataset.cleanupWithPolicy(
CleanupPolicy.builder().withBeforeTimestampMillis(beforeTs).build());
assertEquals(2L, stats.getOldVersions());
}
}
}

@Test
public void testCleanupTaggedVersion(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("test_dataset_for_cleanup").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);

testDataset.createEmptyDataset().close();

Dataset ds = testDataset.write(1, 10);
ds.tags().create("tag-2", 2L);

testDataset.write(2, 10).close();

try (Dataset dataset = testDataset.write(3, 10)) {
// cleanup with tag-2 should throw exception
Assertions.assertThrows(
RuntimeException.class,
() ->
dataset.cleanupWithPolicy(
CleanupPolicy.builder()
.withErrorIfTaggedOldVersions(true)
.withBeforeVersion(3L)
.build()));

// cleanup with tag-2 should not throw exception when set errorIfTaggedOldVersions to false
RemovalStats stats =
dataset.cleanupWithPolicy(
CleanupPolicy.builder()
.withErrorIfTaggedOldVersions(false)
.withBeforeVersion(3L)
.build());
assertEquals(1L, stats.getOldVersions());

// The version with tag-2 should not be cleaned up
Assertions.assertEquals("tag-2", dataset.tags().list().get(0).getName());
}
}
}
}