From 92dc72704020285861408dc70515c7de18d0be18 Mon Sep 17 00:00:00 2001 From: yanghua Date: Thu, 27 Nov 2025 17:54:24 +0800 Subject: [PATCH 1/8] feat(java): support row lineage and cdf apis --- java/lance-jni/src/delta.rs | 211 ++++++++++++++++++ java/lance-jni/src/lib.rs | 1 + java/lance-jni/src/transaction.rs | 6 +- java/src/main/java/org/lance/Dataset.java | 37 +++ .../java/org/lance/delta/DatasetDelta.java | 104 +++++++++ .../org/lance/delta/DatasetDeltaBuilder.java | 67 ++++++ java/src/test/java/org/lance/DeltaTest.java | 168 ++++++++++++++ 7 files changed, 591 insertions(+), 3 deletions(-) create mode 100755 java/lance-jni/src/delta.rs create mode 100755 java/src/main/java/org/lance/delta/DatasetDelta.java create mode 100755 java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java create mode 100755 java/src/test/java/org/lance/DeltaTest.java diff --git a/java/lance-jni/src/delta.rs b/java/lance-jni/src/delta.rs new file mode 100755 index 00000000000..f0b28c3c919 --- /dev/null +++ b/java/lance-jni/src/delta.rs @@ -0,0 +1,211 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use crate::blocking_dataset::{BlockingDataset, NATIVE_DATASET}; +use crate::error::Result; +use crate::ffi::JNIEnvExt; +use crate::transaction::convert_to_java_transaction; +use crate::RT; +use arrow::ffi_stream::FFI_ArrowArrayStream; +use jni::objects::{JObject, JValue}; +use jni::sys::jlong; +use jni::JNIEnv; +use lance::dataset::delta::DatasetDelta as RustDatasetDelta; +use lance::dataset::scanner::DatasetRecordBatchStream; +use lance::dataset::transaction::Transaction; +use lance_io::ffi::to_ffi_arrow_array_stream; + +pub const NATIVE_DELTA: &str = "nativeDeltaHandle"; + +// Removed Clone derive because RustDatasetDelta does not implement Clone +pub struct BlockingDatasetDelta { + pub(crate) inner: RustDatasetDelta, +} + +fn attach_native_delta<'local>( + env: &mut JNIEnv<'local>, + delta: BlockingDatasetDelta, + java_dataset: &JObject<'local>, +) -> Result> { + let j_delta = env.new_object("org/lance/delta/DatasetDelta", "()V", &[])?; + + // Set native Rust field + unsafe { env.set_rust_field(&j_delta, NATIVE_DELTA, delta) }?; + + // Attach base Dataset Java object for Transaction conversion + env.set_field( + &j_delta, + "dataset", + "Lorg/lance/Dataset;", + JValue::Object(java_dataset), + )?; + Ok(j_delta) +} + +#[no_mangle] +pub extern "system" fn Java_org_lance_delta_DatasetDeltaBuilder_nativeBuild<'local>( + mut env: JNIEnv<'local>, + _obj: JObject<'local>, + java_dataset: JObject<'local>, + compared_against_obj: JObject<'local>, // Optional + begin_version_obj: JObject<'local>, // Optional + end_version_obj: JObject<'local>, // Optional +) -> JObject<'local> { + ok_or_throw!( + env, + inner_native_build( + &mut env, + java_dataset, + compared_against_obj, + begin_version_obj, + end_version_obj + ) + ) +} + +#[no_mangle] +pub extern "system" fn Java_org_lance_Dataset_nativeBuildDelta<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject<'local>, + compared_against_obj: JObject<'local>, // Optional + begin_version_obj: JObject<'local>, // Optional + end_version_obj: JObject<'local>, // Optional +) -> JObject<'local> { + ok_or_throw!( + env, + inner_native_build( + &mut env, + java_dataset, + compared_against_obj, + begin_version_obj, + end_version_obj + ) + ) +} + +fn inner_native_build<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject<'local>, + compared_against_obj: JObject<'local>, + begin_version_obj: JObject<'local>, + end_version_obj: JObject<'local>, +) -> Result> { + // Extract Option from Optional first to avoid overlapping borrows of env + let compared_against = env.get_u64_opt(&compared_against_obj)?; + let begin_version = env.get_u64_opt(&begin_version_obj)?; + let end_version = env.get_u64_opt(&end_version_obj)?; + + // Get the Rust Dataset guard and build the delta inside a scope to drop the guard early + let delta = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET)? }; + + // Build Rust DatasetDelta using the builder semantics + let mut builder = dataset_guard.inner.delta(); + if let Some(compared) = compared_against { + builder = builder.compared_against_version(compared); + } else { + if let Some(begin) = begin_version { + builder = builder.with_begin_version(begin); + } + if let Some(end) = end_version { + builder = builder.with_end_version(end); + } + } + builder.build()? + }; + + let blocking_delta = BlockingDatasetDelta { inner: delta }; + attach_native_delta(env, blocking_delta, &java_dataset) +} + +#[no_mangle] +pub extern "system" fn Java_org_lance_delta_DatasetDelta_listTransactions<'local>( + mut env: JNIEnv<'local>, + j_delta: JObject<'local>, +) -> JObject<'local> { + ok_or_throw!(env, inner_list_transactions(&mut env, j_delta)) +} + +fn inner_list_transactions<'local>( + env: &mut JNIEnv<'local>, + j_delta: JObject<'local>, +) -> Result> { + // Borrow env to get the guard and fetch transactions inside a small scope so the guard is dropped + let txs: Vec = { + let delta_guard = + unsafe { env.get_rust_field::<_, _, BlockingDatasetDelta>(&j_delta, NATIVE_DELTA) }?; + RT.block_on(delta_guard.inner.list_transactions())? + }; + + // Retrieve the Java Dataset attached to the DatasetDelta for conversion context + let java_dataset = env + .get_field(&j_delta, "dataset", "Lorg/lance/Dataset;")? + .l()?; + + // Convert to Java List + let array_list = env.new_object("java/util/ArrayList", "()V", &[])?; + for tx in txs.into_iter() { + let jtx = convert_to_java_transaction(env, tx, &java_dataset)?; + env.call_method( + &array_list, + "add", + "(Ljava/lang/Object;)Z", + &[JValue::Object(&jtx)], + )?; + } + Ok(array_list) +} + +#[no_mangle] +pub extern "system" fn Java_org_lance_delta_DatasetDelta_getInsertedRows<'local>( + mut env: JNIEnv<'local>, + j_delta: JObject, + stream_addr: jlong, +) { + ok_or_throw_without_return!(env, inner_get_inserted_rows(&mut env, j_delta, stream_addr)) +} + +fn inner_get_inserted_rows(env: &mut JNIEnv, j_delta: JObject, stream_addr: jlong) -> Result<()> { + let delta_guard = + unsafe { env.get_rust_field::<_, _, BlockingDatasetDelta>(&j_delta, NATIVE_DELTA) }?; + + let stream: DatasetRecordBatchStream = RT.block_on(delta_guard.inner.get_inserted_rows())?; + let ffi_stream = to_ffi_arrow_array_stream(stream, RT.handle().clone())?; + + unsafe { std::ptr::write_unaligned(stream_addr as *mut FFI_ArrowArrayStream, ffi_stream) } + Ok(()) +} + +#[no_mangle] +pub extern "system" fn Java_org_lance_delta_DatasetDelta_getUpdatedRows<'local>( + mut env: JNIEnv<'local>, + j_delta: JObject, + stream_addr: jlong, +) { + ok_or_throw_without_return!(env, inner_get_updated_rows(&mut env, j_delta, stream_addr)) +} + +fn inner_get_updated_rows(env: &mut JNIEnv, j_delta: JObject, stream_addr: jlong) -> Result<()> { + let delta_guard = + unsafe { env.get_rust_field::<_, _, BlockingDatasetDelta>(&j_delta, NATIVE_DELTA) }?; + + let stream: DatasetRecordBatchStream = RT.block_on(delta_guard.inner.get_updated_rows())?; + let ffi_stream = to_ffi_arrow_array_stream(stream, RT.handle().clone())?; + + unsafe { std::ptr::write_unaligned(stream_addr as *mut FFI_ArrowArrayStream, ffi_stream) } + Ok(()) +} + +#[no_mangle] +pub extern "system" fn Java_org_lance_delta_DatasetDelta_releaseNativeDelta( + mut env: JNIEnv, + j_delta: JObject, +) { + ok_or_throw_without_return!(env, inner_release_native_delta(&mut env, j_delta)); +} + +fn inner_release_native_delta(env: &mut JNIEnv, j_delta: JObject) -> Result<()> { + let _: BlockingDatasetDelta = unsafe { env.take_rust_field(j_delta, NATIVE_DELTA) }?; + Ok(()) +} diff --git a/java/lance-jni/src/lib.rs b/java/lance-jni/src/lib.rs index 850b70350d4..566f77dd110 100644 --- a/java/lance-jni/src/lib.rs +++ b/java/lance-jni/src/lib.rs @@ -42,6 +42,7 @@ macro_rules! ok_or_throw_with_return { mod blocking_blob; mod blocking_dataset; mod blocking_scanner; +mod delta; pub mod error; pub mod ffi; mod file_reader; diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 9b80a741d6f..9b077e7498c 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -452,7 +452,7 @@ fn inner_read_transaction<'local>( Ok(transaction) } -fn convert_to_java_transaction<'local>( +pub(crate) fn convert_to_java_transaction<'local>( env: &mut JNIEnv<'local>, transaction: Transaction, java_dataset: &JObject, @@ -479,7 +479,7 @@ fn convert_to_java_transaction<'local>( Ok(java_transaction) } -fn convert_to_java_operation<'local>( +pub(crate) fn convert_to_java_operation<'local>( env: &mut JNIEnv<'local>, operation: Option, ) -> Result> { @@ -707,7 +707,7 @@ fn convert_to_java_operation_inner<'local>( } } -fn convert_to_java_schema<'local>( +pub(crate) fn convert_to_java_schema<'local>( env: &mut JNIEnv<'local>, schema: LanceSchema, ) -> Result> { diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index b4d8fe00173..e05da021bc2 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1371,6 +1371,43 @@ public SqlQuery sql(String sql) { return new SqlQuery(this, sql); } + /** + * Compute the delta between versions. + ** + *
    + *
  • Either {@code comparedAgainst} is non-null: compare current version against this version. + *
  • Or both {@code beginVersion} (exclusive) and {@code endVersion} (inclusive) are non-null + * for an explicit range. + *
  • Mutually exclusive: do not specify both modes; do not provide incomplete range. + *
+ * + *

Examples: + * + *

{@code
+   * // Shorthand: compare current version against v1
+   * DatasetDelta delta1 = dataset.delta(Optional.of(1L), Optional.empty(), Optional.empty());
+   *
+   * // Explicit range: (1, 2]
+   * DatasetDelta delta2 = dataset.delta(Optional.empty(), Optional.of(1L), Optional.of(2L));
+   * }
+ * + * @param comparedAgainst the version to compare the current dataset against (optional) + * @param beginVersion the beginning version (exclusive) for explicit range (optional) + * @param endVersion the ending version (inclusive) for explicit range (optional) + * @return a DatasetDelta view + * @throws IllegalArgumentException if mutual exclusivity or completeness rules are violated + */ + public org.lance.delta.DatasetDelta delta( + Optional comparedAgainst, Optional beginVersion, Optional endVersion) { + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeBuildDelta(comparedAgainst, beginVersion, endVersion); + } + } + + private native org.lance.delta.DatasetDelta nativeBuildDelta( + Optional comparedAgainst, Optional beginVersion, Optional endVersion); + /** * Merge source data with the existing target data. * diff --git a/java/src/main/java/org/lance/delta/DatasetDelta.java b/java/src/main/java/org/lance/delta/DatasetDelta.java new file mode 100755 index 00000000000..0a4d497478a --- /dev/null +++ b/java/src/main/java/org/lance/delta/DatasetDelta.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.delta; + +import org.lance.Dataset; +import org.lance.JniLoader; +import org.lance.LockManager; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.ipc.ArrowReader; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A view of differences between two versions of a dataset. + * + *

Created by {@link DatasetDeltaBuilder}. Provides methods to list transactions and stream + * inserted/updated rows between two versions. Mirrors Python semantics and Rust builder behavior. + */ +public class DatasetDelta implements Closeable { + static { + JniLoader.ensureLoaded(); + } + + /** Native handle to the Rust DatasetDelta. */ + private long nativeDeltaHandle; + + /** Base dataset used to compute the delta. Also used for Transaction conversion. */ + Dataset dataset; + + private final LockManager lockManager = new LockManager(); + + private DatasetDelta() {} + + /** + * List transactions between begin_version + 1 and end_version (inclusive). + * + * @return list of transactions + */ + public List listTransactions() { + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDeltaHandle != 0, "DatasetDelta is closed"); + return nativeListTransactions(); + } + } + + private native List nativeListTransactions(); + + /** Return a streaming ArrowReader for inserted rows. */ + public ArrowReader getInsertedRows() throws IOException { + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDeltaHandle != 0, "DatasetDelta is closed"); + BufferAllocator allocator = dataset.allocator(); + try (ArrowArrayStream s = ArrowArrayStream.allocateNew(allocator)) { + nativeGetInsertedRows(s.memoryAddress()); + return Data.importArrayStream(allocator, s); + } + } + } + + private native void nativeGetInsertedRows(long streamAddress) throws IOException; + + /** Return a streaming ArrowReader for updated rows. */ + public ArrowReader getUpdatedRows() throws IOException { + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDeltaHandle != 0, "DatasetDelta is closed"); + BufferAllocator allocator = dataset.allocator(); + try (ArrowArrayStream s = ArrowArrayStream.allocateNew(allocator)) { + nativeGetUpdatedRows(s.memoryAddress()); + return Data.importArrayStream(allocator, s); + } + } + } + + private native void nativeGetUpdatedRows(long streamAddress) throws IOException; + + @Override + public void close() { + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + if (nativeDeltaHandle != 0) { + releaseNativeDelta(nativeDeltaHandle); + nativeDeltaHandle = 0; + } + } + } + + private native void releaseNativeDelta(long handle); +} diff --git a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java new file mode 100755 index 00000000000..566002b12da --- /dev/null +++ b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.delta; + +import org.lance.Dataset; + +import java.util.Optional; + +/** + * Builder for creating a {@link DatasetDelta} to explore changes between versions. + * + *

    + *
  • Use comparedAgainstVersion to compare current dataset version. + *
  • Or specify an explicit range with beginVersion and endVersion. + *
  • These modes are mutually exclusive. + *
+ */ +public class DatasetDeltaBuilder { + private final Dataset dataset; + private Optional comparedAgainst = Optional.empty(); + private Optional beginVersion = Optional.empty(); + private Optional endVersion = Optional.empty(); + + public DatasetDeltaBuilder(Dataset dataset) { + this.dataset = dataset; + } + + /** Shorthand to compare current dataset version against the given version. */ + public DatasetDeltaBuilder comparedAgainstVersion(long version) { + this.comparedAgainst = Optional.of(version); + return this; + } + + /** Set begin version (exclusive) for explicit range. */ + public DatasetDeltaBuilder withBeginVersion(long version) { + this.beginVersion = Optional.of(version); + return this; + } + + /** Set end version (inclusive) for explicit range. */ + public DatasetDeltaBuilder withEndVersion(long version) { + this.endVersion = Optional.of(version); + return this; + } + + /** Build the DatasetDelta after validating builder state. */ + public DatasetDelta build() { + return nativeBuild(dataset, comparedAgainst, beginVersion, endVersion); + } + + private static native DatasetDelta nativeBuild( + Dataset dataset, + Optional comparedAgainst, + Optional beginVersion, + Optional endVersion); +} diff --git a/java/src/test/java/org/lance/DeltaTest.java b/java/src/test/java/org/lance/DeltaTest.java new file mode 100755 index 00000000000..8029b8f3963 --- /dev/null +++ b/java/src/test/java/org/lance/DeltaTest.java @@ -0,0 +1,168 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance; + +import org.lance.delta.DatasetDelta; + +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Tests for Dataset.delta() Java interface bridging Rust semantics. */ +public class DeltaTest { + + @Test + public void testInsertedRowsComparedAgainst() throws IOException { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + String uri = "memory://delta_demo"; + // Build initial batch (2 rows) + Schema schema = + new Schema( + Arrays.asList( + Field.notNullable( + "id", new org.apache.arrow.vector.types.pojo.ArrowType.Int(32, true)), + Field.nullable( + "val", org.apache.arrow.vector.types.pojo.ArrowType.Utf8.INSTANCE))); + + org.apache.arrow.vector.VectorSchemaRoot root = + org.apache.arrow.vector.VectorSchemaRoot.create(schema, allocator); + root.allocateNew(); + org.apache.arrow.vector.IntVector idVec = + (org.apache.arrow.vector.IntVector) root.getVector("id"); + org.apache.arrow.vector.VarCharVector valVec = + (org.apache.arrow.vector.VarCharVector) root.getVector("val"); + idVec.setSafe(0, 1); + idVec.setSafe(1, 2); + valVec.setSafe(0, "a".getBytes()); + valVec.setSafe(1, "b".getBytes()); + root.setRowCount(2); + byte[] batch1; + // Create an output stream explicitly and pass it to ArrowStreamWriter + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (org.apache.arrow.vector.ipc.ArrowStreamWriter writer = + new org.apache.arrow.vector.ipc.ArrowStreamWriter(root, null, out)) { + writer.start(); + writer.writeBatch(); + writer.end(); + } + batch1 = out.toByteArray(); + root.close(); + + try (org.apache.arrow.vector.ipc.ArrowStreamReader reader1 = + new org.apache.arrow.vector.ipc.ArrowStreamReader( + new org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel(batch1), + allocator); + org.apache.arrow.c.ArrowArrayStream stream1 = + org.apache.arrow.c.ArrowArrayStream.allocateNew(allocator)) { + Data.exportArrayStream(allocator, reader1, stream1); + Dataset ds = + Dataset.write().stream(stream1).uri(uri).mode(WriteParams.WriteMode.CREATE).execute(); + + // Append one row (v2) + org.apache.arrow.vector.VectorSchemaRoot root2 = + org.apache.arrow.vector.VectorSchemaRoot.create(schema, allocator); + root2.allocateNew(); + org.apache.arrow.vector.IntVector idVec2 = + (org.apache.arrow.vector.IntVector) root2.getVector("id"); + org.apache.arrow.vector.VarCharVector valVec2 = + (org.apache.arrow.vector.VarCharVector) root2.getVector("val"); + idVec2.setSafe(0, 3); + valVec2.setSafe(0, "c".getBytes()); + root2.setRowCount(1); + byte[] batch2; + ByteArrayOutputStream out2 = new ByteArrayOutputStream(); + try (org.apache.arrow.vector.ipc.ArrowStreamWriter writer2 = + new org.apache.arrow.vector.ipc.ArrowStreamWriter(root2, null, out2)) { + writer2.start(); + writer2.writeBatch(); + writer2.end(); + } + batch2 = out2.toByteArray(); + root2.close(); + + try (org.apache.arrow.vector.ipc.ArrowStreamReader reader2 = + new org.apache.arrow.vector.ipc.ArrowStreamReader( + new org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel(batch2), + allocator); + org.apache.arrow.c.ArrowArrayStream stream2 = + org.apache.arrow.c.ArrowArrayStream.allocateNew(allocator)) { + Data.exportArrayStream(allocator, reader2, stream2); + Dataset ds2 = + Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute(); + + DatasetDelta delta = ds2.delta(Optional.of(1L), Optional.empty(), Optional.empty()); + try { + try (ArrowReader inserted = delta.getInsertedRows()) { + int total = 0; + while (inserted.loadNextBatch()) { + Schema outSchema = inserted.getVectorSchemaRoot().getSchema(); + List names = + outSchema.getFields().stream().map(Field::getName).collect(Collectors.toList()); + Assertions.assertTrue(names.contains("_row_created_at_version")); + Assertions.assertTrue(names.contains("_row_last_updated_at_version")); + total += inserted.getVectorSchemaRoot().getRowCount(); + } + Assertions.assertEquals(1, total); + } + } catch (UnsatisfiedLinkError e) { + Assumptions.assumeTrue( + false, "JNI for DatasetDelta.getInsertedRows not available: " + e.getMessage()); + } + } + } + } + } + + @Test + public void testListTransactionsExplicitRange() { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + String uri = "memory://delta_demo_tx"; + // v1 + Schema schema = + new Schema( + Arrays.asList( + Field.notNullable( + "id", new org.apache.arrow.vector.types.pojo.ArrowType.Int(32, true)), + Field.nullable( + "val", org.apache.arrow.vector.types.pojo.ArrowType.Utf8.INSTANCE))); + try (Dataset ds = Dataset.create(allocator, uri, schema, new WriteParams.Builder().build())) { + // v2 + WriteParams params = + new WriteParams.Builder().withMode(WriteParams.WriteMode.APPEND).build(); + Dataset ds2 = Dataset.create(allocator, uri, schema, params); + + DatasetDelta delta = ds2.delta(Optional.empty(), Optional.of(1L), Optional.of(2L)); + try { + List txs = delta.listTransactions(); + Assertions.assertTrue(txs.size() >= 1); + } catch (UnsatisfiedLinkError e) { + Assumptions.assumeTrue( + false, "JNI for DatasetDelta.listTransactions not available: " + e.getMessage()); + } + } + } + } +} From 9ba5bbc967519c4791ae961ece68c61dfc2b3899 Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 28 Nov 2025 15:16:43 +0800 Subject: [PATCH 2/8] refactor --- java/src/main/java/org/lance/Dataset.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index e05da021bc2..370670311ac 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1372,8 +1372,8 @@ public SqlQuery sql(String sql) { } /** - * Compute the delta between versions. - ** + * Compute the delta between versions. * + * *
    *
  • Either {@code comparedAgainst} is non-null: compare current version against this version. *
  • Or both {@code beginVersion} (exclusive) and {@code endVersion} (inclusive) are non-null From 02dc19c65d9442bc36737c7665c7cea61a66f20f Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 28 Nov 2025 15:50:56 +0800 Subject: [PATCH 3/8] refactor --- java/src/test/java/org/lance/DeltaTest.java | 46 +++++++++------------ 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/java/src/test/java/org/lance/DeltaTest.java b/java/src/test/java/org/lance/DeltaTest.java index 8029b8f3963..a736cc748cb 100755 --- a/java/src/test/java/org/lance/DeltaTest.java +++ b/java/src/test/java/org/lance/DeltaTest.java @@ -15,11 +15,18 @@ import org.lance.delta.DatasetDelta; +import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.Data; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; @@ -47,13 +54,10 @@ public void testInsertedRowsComparedAgainst() throws IOException { Field.nullable( "val", org.apache.arrow.vector.types.pojo.ArrowType.Utf8.INSTANCE))); - org.apache.arrow.vector.VectorSchemaRoot root = - org.apache.arrow.vector.VectorSchemaRoot.create(schema, allocator); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); root.allocateNew(); - org.apache.arrow.vector.IntVector idVec = - (org.apache.arrow.vector.IntVector) root.getVector("id"); - org.apache.arrow.vector.VarCharVector valVec = - (org.apache.arrow.vector.VarCharVector) root.getVector("val"); + IntVector idVec = (IntVector) root.getVector("id"); + VarCharVector valVec = (VarCharVector) root.getVector("val"); idVec.setSafe(0, 1); idVec.setSafe(1, 2); valVec.setSafe(0, "a".getBytes()); @@ -62,8 +66,7 @@ public void testInsertedRowsComparedAgainst() throws IOException { byte[] batch1; // Create an output stream explicitly and pass it to ArrowStreamWriter ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (org.apache.arrow.vector.ipc.ArrowStreamWriter writer = - new org.apache.arrow.vector.ipc.ArrowStreamWriter(root, null, out)) { + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { writer.start(); writer.writeBatch(); writer.end(); @@ -71,10 +74,8 @@ public void testInsertedRowsComparedAgainst() throws IOException { batch1 = out.toByteArray(); root.close(); - try (org.apache.arrow.vector.ipc.ArrowStreamReader reader1 = - new org.apache.arrow.vector.ipc.ArrowStreamReader( - new org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel(batch1), - allocator); + try (ArrowStreamReader reader1 = + new ArrowStreamReader(new ByteArrayReadableSeekableByteChannel(batch1), allocator); org.apache.arrow.c.ArrowArrayStream stream1 = org.apache.arrow.c.ArrowArrayStream.allocateNew(allocator)) { Data.exportArrayStream(allocator, reader1, stream1); @@ -82,20 +83,16 @@ public void testInsertedRowsComparedAgainst() throws IOException { Dataset.write().stream(stream1).uri(uri).mode(WriteParams.WriteMode.CREATE).execute(); // Append one row (v2) - org.apache.arrow.vector.VectorSchemaRoot root2 = - org.apache.arrow.vector.VectorSchemaRoot.create(schema, allocator); + VectorSchemaRoot root2 = VectorSchemaRoot.create(schema, allocator); root2.allocateNew(); - org.apache.arrow.vector.IntVector idVec2 = - (org.apache.arrow.vector.IntVector) root2.getVector("id"); - org.apache.arrow.vector.VarCharVector valVec2 = - (org.apache.arrow.vector.VarCharVector) root2.getVector("val"); + IntVector idVec2 = (IntVector) root2.getVector("id"); + VarCharVector valVec2 = (VarCharVector) root2.getVector("val"); idVec2.setSafe(0, 3); valVec2.setSafe(0, "c".getBytes()); root2.setRowCount(1); byte[] batch2; ByteArrayOutputStream out2 = new ByteArrayOutputStream(); - try (org.apache.arrow.vector.ipc.ArrowStreamWriter writer2 = - new org.apache.arrow.vector.ipc.ArrowStreamWriter(root2, null, out2)) { + try (ArrowStreamWriter writer2 = new ArrowStreamWriter(root2, null, out2)) { writer2.start(); writer2.writeBatch(); writer2.end(); @@ -103,12 +100,9 @@ public void testInsertedRowsComparedAgainst() throws IOException { batch2 = out2.toByteArray(); root2.close(); - try (org.apache.arrow.vector.ipc.ArrowStreamReader reader2 = - new org.apache.arrow.vector.ipc.ArrowStreamReader( - new org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel(batch2), - allocator); - org.apache.arrow.c.ArrowArrayStream stream2 = - org.apache.arrow.c.ArrowArrayStream.allocateNew(allocator)) { + try (ArrowStreamReader reader2 = + new ArrowStreamReader(new ByteArrayReadableSeekableByteChannel(batch2), allocator); + ArrowArrayStream stream2 = ArrowArrayStream.allocateNew(allocator)) { Data.exportArrayStream(allocator, reader2, stream2); Dataset ds2 = Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute(); From be71dc9cc097e644a6bfc76cc807b8d55332ff63 Mon Sep 17 00:00:00 2001 From: yanghua Date: Tue, 2 Dec 2025 22:12:16 +0800 Subject: [PATCH 4/8] address review comments --- java/lance-jni/src/delta.rs | 56 +++++++++---------- java/src/main/java/org/lance/Dataset.java | 7 ++- .../java/org/lance/delta/DatasetDelta.java | 4 +- .../org/lance/delta/DatasetDeltaBuilder.java | 5 ++ java/src/test/java/org/lance/DeltaTest.java | 18 +++--- 5 files changed, 46 insertions(+), 44 deletions(-) diff --git a/java/lance-jni/src/delta.rs b/java/lance-jni/src/delta.rs index f0b28c3c919..5cfc5ab8c64 100755 --- a/java/lance-jni/src/delta.rs +++ b/java/lance-jni/src/delta.rs @@ -17,7 +17,6 @@ use lance_io::ffi::to_ffi_arrow_array_stream; pub const NATIVE_DELTA: &str = "nativeDeltaHandle"; -// Removed Clone derive because RustDatasetDelta does not implement Clone pub struct BlockingDatasetDelta { pub(crate) inner: RustDatasetDelta, } @@ -29,10 +28,8 @@ fn attach_native_delta<'local>( ) -> Result> { let j_delta = env.new_object("org/lance/delta/DatasetDelta", "()V", &[])?; - // Set native Rust field unsafe { env.set_rust_field(&j_delta, NATIVE_DELTA, delta) }?; - // Attach base Dataset Java object for Transaction conversion env.set_field( &j_delta, "dataset", @@ -47,9 +44,9 @@ pub extern "system" fn Java_org_lance_delta_DatasetDeltaBuilder_nativeBuild<'loc mut env: JNIEnv<'local>, _obj: JObject<'local>, java_dataset: JObject<'local>, - compared_against_obj: JObject<'local>, // Optional - begin_version_obj: JObject<'local>, // Optional - end_version_obj: JObject<'local>, // Optional + compared_against_obj: JObject<'local>, + begin_version_obj: JObject<'local>, + end_version_obj: JObject<'local>, ) -> JObject<'local> { ok_or_throw!( env, @@ -67,9 +64,9 @@ pub extern "system" fn Java_org_lance_delta_DatasetDeltaBuilder_nativeBuild<'loc pub extern "system" fn Java_org_lance_Dataset_nativeBuildDelta<'local>( mut env: JNIEnv<'local>, java_dataset: JObject<'local>, - compared_against_obj: JObject<'local>, // Optional - begin_version_obj: JObject<'local>, // Optional - end_version_obj: JObject<'local>, // Optional + compared_against_obj: JObject<'local>, + begin_version_obj: JObject<'local>, + end_version_obj: JObject<'local>, ) -> JObject<'local> { ok_or_throw!( env, @@ -90,27 +87,21 @@ fn inner_native_build<'local>( begin_version_obj: JObject<'local>, end_version_obj: JObject<'local>, ) -> Result> { - // Extract Option from Optional first to avoid overlapping borrows of env let compared_against = env.get_u64_opt(&compared_against_obj)?; let begin_version = env.get_u64_opt(&begin_version_obj)?; let end_version = env.get_u64_opt(&end_version_obj)?; - // Get the Rust Dataset guard and build the delta inside a scope to drop the guard early let delta = { let dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET)? }; - // Build Rust DatasetDelta using the builder semantics let mut builder = dataset_guard.inner.delta(); if let Some(compared) = compared_against { builder = builder.compared_against_version(compared); - } else { - if let Some(begin) = begin_version { - builder = builder.with_begin_version(begin); - } - if let Some(end) = end_version { - builder = builder.with_end_version(end); - } + } else if let Some(begin) = begin_version { + builder = builder.with_begin_version(begin); + } else if let Some(end) = end_version { + builder = builder.with_end_version(end); } builder.build()? }; @@ -131,19 +122,16 @@ fn inner_list_transactions<'local>( env: &mut JNIEnv<'local>, j_delta: JObject<'local>, ) -> Result> { - // Borrow env to get the guard and fetch transactions inside a small scope so the guard is dropped let txs: Vec = { let delta_guard = unsafe { env.get_rust_field::<_, _, BlockingDatasetDelta>(&j_delta, NATIVE_DELTA) }?; RT.block_on(delta_guard.inner.list_transactions())? }; - // Retrieve the Java Dataset attached to the DatasetDelta for conversion context let java_dataset = env .get_field(&j_delta, "dataset", "Lorg/lance/Dataset;")? .l()?; - // Convert to Java List let array_list = env.new_object("java/util/ArrayList", "()V", &[])?; for tx in txs.into_iter() { let jtx = convert_to_java_transaction(env, tx, &java_dataset)?; @@ -160,13 +148,17 @@ fn inner_list_transactions<'local>( #[no_mangle] pub extern "system" fn Java_org_lance_delta_DatasetDelta_getInsertedRows<'local>( mut env: JNIEnv<'local>, - j_delta: JObject, + j_delta: JObject<'local>, stream_addr: jlong, ) { ok_or_throw_without_return!(env, inner_get_inserted_rows(&mut env, j_delta, stream_addr)) } -fn inner_get_inserted_rows(env: &mut JNIEnv, j_delta: JObject, stream_addr: jlong) -> Result<()> { +fn inner_get_inserted_rows<'local>( + env: &mut JNIEnv, + j_delta: JObject<'local>, + stream_addr: jlong, +) -> Result<()> { let delta_guard = unsafe { env.get_rust_field::<_, _, BlockingDatasetDelta>(&j_delta, NATIVE_DELTA) }?; @@ -180,13 +172,17 @@ fn inner_get_inserted_rows(env: &mut JNIEnv, j_delta: JObject, stream_addr: jlon #[no_mangle] pub extern "system" fn Java_org_lance_delta_DatasetDelta_getUpdatedRows<'local>( mut env: JNIEnv<'local>, - j_delta: JObject, + j_delta: JObject<'local>, stream_addr: jlong, ) { ok_or_throw_without_return!(env, inner_get_updated_rows(&mut env, j_delta, stream_addr)) } -fn inner_get_updated_rows(env: &mut JNIEnv, j_delta: JObject, stream_addr: jlong) -> Result<()> { +fn inner_get_updated_rows<'local>( + env: &mut JNIEnv, + j_delta: JObject<'local>, + stream_addr: jlong, +) -> Result<()> { let delta_guard = unsafe { env.get_rust_field::<_, _, BlockingDatasetDelta>(&j_delta, NATIVE_DELTA) }?; @@ -200,12 +196,12 @@ fn inner_get_updated_rows(env: &mut JNIEnv, j_delta: JObject, stream_addr: jlong #[no_mangle] pub extern "system" fn Java_org_lance_delta_DatasetDelta_releaseNativeDelta( mut env: JNIEnv, - j_delta: JObject, + obj: JObject, ) { - ok_or_throw_without_return!(env, inner_release_native_delta(&mut env, j_delta)); + ok_or_throw_without_return!(env, inner_release_native_delta(&mut env, obj)); } -fn inner_release_native_delta(env: &mut JNIEnv, j_delta: JObject) -> Result<()> { - let _: BlockingDatasetDelta = unsafe { env.take_rust_field(j_delta, NATIVE_DELTA) }?; +fn inner_release_native_delta(env: &mut JNIEnv, obj: JObject) -> Result<()> { + let _: BlockingDatasetDelta = unsafe { env.take_rust_field(obj, NATIVE_DELTA) }?; Ok(()) } diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 370670311ac..a024622eb0b 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -16,6 +16,7 @@ import org.lance.cleanup.CleanupPolicy; import org.lance.cleanup.RemovalStats; import org.lance.compaction.CompactionOptions; +import org.lance.delta.DatasetDelta; import org.lance.index.Index; import org.lance.index.IndexOptions; import org.lance.index.IndexParams; @@ -1372,7 +1373,7 @@ public SqlQuery sql(String sql) { } /** - * Compute the delta between versions. * + * Compute the delta between versions. * *
      *
    • Either {@code comparedAgainst} is non-null: compare current version against this version. @@ -1397,7 +1398,7 @@ public SqlQuery sql(String sql) { * @return a DatasetDelta view * @throws IllegalArgumentException if mutual exclusivity or completeness rules are violated */ - public org.lance.delta.DatasetDelta delta( + public DatasetDelta delta( Optional comparedAgainst, Optional beginVersion, Optional endVersion) { try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); @@ -1405,7 +1406,7 @@ public org.lance.delta.DatasetDelta delta( } } - private native org.lance.delta.DatasetDelta nativeBuildDelta( + private native DatasetDelta nativeBuildDelta( Optional comparedAgainst, Optional beginVersion, Optional endVersion); /** diff --git a/java/src/main/java/org/lance/delta/DatasetDelta.java b/java/src/main/java/org/lance/delta/DatasetDelta.java index 0a4d497478a..75275b91320 100755 --- a/java/src/main/java/org/lance/delta/DatasetDelta.java +++ b/java/src/main/java/org/lance/delta/DatasetDelta.java @@ -31,7 +31,7 @@ * A view of differences between two versions of a dataset. * *

      Created by {@link DatasetDeltaBuilder}. Provides methods to list transactions and stream - * inserted/updated rows between two versions. Mirrors Python semantics and Rust builder behavior. + * inserted/updated rows between two versions. */ public class DatasetDelta implements Closeable { static { @@ -42,7 +42,7 @@ public class DatasetDelta implements Closeable { private long nativeDeltaHandle; /** Base dataset used to compute the delta. Also used for Transaction conversion. */ - Dataset dataset; + private Dataset dataset; private final LockManager lockManager = new LockManager(); diff --git a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java index 566002b12da..983cac1d53d 100755 --- a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java +++ b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java @@ -14,6 +14,7 @@ package org.lance.delta; import org.lance.Dataset; +import org.lance.JniLoader; import java.util.Optional; @@ -27,6 +28,10 @@ *

    */ public class DatasetDeltaBuilder { + static { + JniLoader.ensureLoaded(); + } + private final Dataset dataset; private Optional comparedAgainst = Optional.empty(); private Optional beginVersion = Optional.empty(); diff --git a/java/src/test/java/org/lance/DeltaTest.java b/java/src/test/java/org/lance/DeltaTest.java index a736cc748cb..70bf8e11e6f 100755 --- a/java/src/test/java/org/lance/DeltaTest.java +++ b/java/src/test/java/org/lance/DeltaTest.java @@ -146,15 +146,15 @@ public void testListTransactionsExplicitRange() { // v2 WriteParams params = new WriteParams.Builder().withMode(WriteParams.WriteMode.APPEND).build(); - Dataset ds2 = Dataset.create(allocator, uri, schema, params); - - DatasetDelta delta = ds2.delta(Optional.empty(), Optional.of(1L), Optional.of(2L)); - try { - List txs = delta.listTransactions(); - Assertions.assertTrue(txs.size() >= 1); - } catch (UnsatisfiedLinkError e) { - Assumptions.assumeTrue( - false, "JNI for DatasetDelta.listTransactions not available: " + e.getMessage()); + try (Dataset ds2 = Dataset.create(allocator, uri, schema, params); ) { + DatasetDelta delta = ds2.delta(Optional.empty(), Optional.of(1L), Optional.of(2L)); + try { + List txs = delta.listTransactions(); + Assertions.assertTrue(txs.size() == 1); + } catch (UnsatisfiedLinkError e) { + Assumptions.assumeTrue( + false, "JNI for DatasetDelta.listTransactions not available: " + e.getMessage()); + } } } } From 79e1d59aa5b67fd2aa59a4b06179a533889b7494 Mon Sep 17 00:00:00 2001 From: yanghua Date: Sat, 13 Dec 2025 16:41:22 +0800 Subject: [PATCH 5/8] address review comments --- java/lance-jni/src/delta.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/java/lance-jni/src/delta.rs b/java/lance-jni/src/delta.rs index 5cfc5ab8c64..a1f5ed5ea5b 100755 --- a/java/lance-jni/src/delta.rs +++ b/java/lance-jni/src/delta.rs @@ -98,10 +98,8 @@ fn inner_native_build<'local>( let mut builder = dataset_guard.inner.delta(); if let Some(compared) = compared_against { builder = builder.compared_against_version(compared); - } else if let Some(begin) = begin_version { - builder = builder.with_begin_version(begin); - } else if let Some(end) = end_version { - builder = builder.with_end_version(end); + } else if let (Some(begin), Some(end)) = (begin_version, end_version) { + builder = builder.with_begin_version(begin).with_end_version(end); } builder.build()? }; From ed743fea29fdd21c68fc5a7d1c53b7f3369cfd40 Mon Sep 17 00:00:00 2001 From: yanghua Date: Sat, 13 Dec 2025 17:01:43 +0800 Subject: [PATCH 6/8] address review comments --- java/lance-jni/src/delta.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/java/lance-jni/src/delta.rs b/java/lance-jni/src/delta.rs index a1f5ed5ea5b..8a407a19167 100755 --- a/java/lance-jni/src/delta.rs +++ b/java/lance-jni/src/delta.rs @@ -195,11 +195,12 @@ fn inner_get_updated_rows<'local>( pub extern "system" fn Java_org_lance_delta_DatasetDelta_releaseNativeDelta( mut env: JNIEnv, obj: JObject, + handle: jlong, ) { - ok_or_throw_without_return!(env, inner_release_native_delta(&mut env, obj)); + ok_or_throw_without_return!(env, inner_release_native_delta(&mut env, obj, handle)); } -fn inner_release_native_delta(env: &mut JNIEnv, obj: JObject) -> Result<()> { +fn inner_release_native_delta(env: &mut JNIEnv, obj: JObject, _handle: jlong) -> Result<()> { let _: BlockingDatasetDelta = unsafe { env.take_rust_field(obj, NATIVE_DELTA) }?; Ok(()) } From b7c377348ba4e6d2d050f8c67cae7baa214bc3ee Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 17 Dec 2025 16:28:40 +0800 Subject: [PATCH 7/8] address review comments --- java/src/main/java/org/lance/Dataset.java | 42 +++++++++---------- .../java/org/lance/delta/DatasetDelta.java | 5 ++- .../org/lance/delta/DatasetDeltaBuilder.java | 15 ++++++- java/src/test/java/org/lance/DeltaTest.java | 27 +++++++++--- 4 files changed, 57 insertions(+), 32 deletions(-) diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index a024622eb0b..56cf610eb15 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1373,36 +1373,32 @@ public SqlQuery sql(String sql) { } /** - * Compute the delta between versions. + * Compute the delta between current version and this version. * - *
      - *
    • Either {@code comparedAgainst} is non-null: compare current version against this version. - *
    • Or both {@code beginVersion} (exclusive) and {@code endVersion} (inclusive) are non-null - * for an explicit range. - *
    • Mutually exclusive: do not specify both modes; do not provide incomplete range. - *
    - * - *

    Examples: - * - *

    {@code
    -   * // Shorthand: compare current version against v1
    -   * DatasetDelta delta1 = dataset.delta(Optional.of(1L), Optional.empty(), Optional.empty());
    -   *
    -   * // Explicit range: (1, 2]
    -   * DatasetDelta delta2 = dataset.delta(Optional.empty(), Optional.of(1L), Optional.of(2L));
    -   * }
    + * @param comparedAgainst the version to compare the current dataset against + * @return a DatasetDelta view + * @throws IllegalArgumentException if mutual exclusivity or completeness rules are violated + */ + public DatasetDelta delta(long comparedAgainst) { + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeBuildDelta(Optional.of(comparedAgainst), Optional.empty(), Optional.empty()); + } + } + + /** + * Compute the delta between both {@code beginVersion} (exclusive) and {@code endVersion} + * (inclusive). * - * @param comparedAgainst the version to compare the current dataset against (optional) - * @param beginVersion the beginning version (exclusive) for explicit range (optional) - * @param endVersion the ending version (inclusive) for explicit range (optional) + * @param beginVersion the beginning version (exclusive) for explicit range + * @param endVersion the ending version (inclusive) for explicit range * @return a DatasetDelta view * @throws IllegalArgumentException if mutual exclusivity or completeness rules are violated */ - public DatasetDelta delta( - Optional comparedAgainst, Optional beginVersion, Optional endVersion) { + public DatasetDelta delta(long beginVersion, long endVersion) { try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); - return nativeBuildDelta(comparedAgainst, beginVersion, endVersion); + return nativeBuildDelta(Optional.empty(), Optional.of(beginVersion), Optional.of(endVersion)); } } diff --git a/java/src/main/java/org/lance/delta/DatasetDelta.java b/java/src/main/java/org/lance/delta/DatasetDelta.java index 75275b91320..1c0eb4e9a73 100755 --- a/java/src/main/java/org/lance/delta/DatasetDelta.java +++ b/java/src/main/java/org/lance/delta/DatasetDelta.java @@ -16,6 +16,7 @@ import org.lance.Dataset; import org.lance.JniLoader; import org.lance.LockManager; +import org.lance.Transaction; import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.Data; @@ -53,14 +54,14 @@ private DatasetDelta() {} * * @return list of transactions */ - public List listTransactions() { + public List listTransactions() { try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { Preconditions.checkArgument(nativeDeltaHandle != 0, "DatasetDelta is closed"); return nativeListTransactions(); } } - private native List nativeListTransactions(); + private native List nativeListTransactions(); /** Return a streaming ArrowReader for inserted rows. */ public ArrowReader getInsertedRows() throws IOException { diff --git a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java index 983cac1d53d..8ad186f930d 100755 --- a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java +++ b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java @@ -41,19 +41,30 @@ public DatasetDeltaBuilder(Dataset dataset) { this.dataset = dataset; } - /** Shorthand to compare current dataset version against the given version. */ + /** + * Compare the current dataset version against the specified version. The delta will automatically + * order the versions so that `begin_version` is less than `end_version`. Cannot be used together + * with explicit `with_begin_version` and `with_end_version`. + */ public DatasetDeltaBuilder comparedAgainstVersion(long version) { this.comparedAgainst = Optional.of(version); return this; } /** Set begin version (exclusive) for explicit range. */ + /** + * Set the beginning version for the delta (exclusive). Must be used together with + * `with_end_version`. + */ public DatasetDeltaBuilder withBeginVersion(long version) { this.beginVersion = Optional.of(version); return this; } - /** Set end version (inclusive) for explicit range. */ + /** + * Set the ending version for the delta (inclusive). Must be used together with + * `with_begin_version`. Cannot be used together with `compared_against_version`. + */ public DatasetDeltaBuilder withEndVersion(long version) { this.endVersion = Optional.of(version); return this; diff --git a/java/src/test/java/org/lance/DeltaTest.java b/java/src/test/java/org/lance/DeltaTest.java index 70bf8e11e6f..72537207524 100755 --- a/java/src/test/java/org/lance/DeltaTest.java +++ b/java/src/test/java/org/lance/DeltaTest.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; /** Tests for Dataset.delta() Java interface bridging Rust semantics. */ @@ -107,19 +106,37 @@ public void testInsertedRowsComparedAgainst() throws IOException { Dataset ds2 = Dataset.write().stream(stream2).uri(uri).mode(WriteParams.WriteMode.APPEND).execute(); - DatasetDelta delta = ds2.delta(Optional.of(1L), Optional.empty(), Optional.empty()); + DatasetDelta delta = ds2.delta(1L); try { try (ArrowReader inserted = delta.getInsertedRows()) { int total = 0; + boolean foundRow = false; + while (inserted.loadNextBatch()) { - Schema outSchema = inserted.getVectorSchemaRoot().getSchema(); + VectorSchemaRoot outRoot = inserted.getVectorSchemaRoot(); + Schema outSchema = outRoot.getSchema(); List names = outSchema.getFields().stream().map(Field::getName).collect(Collectors.toList()); Assertions.assertTrue(names.contains("_row_created_at_version")); Assertions.assertTrue(names.contains("_row_last_updated_at_version")); - total += inserted.getVectorSchemaRoot().getRowCount(); + + IntVector outId = (IntVector) outRoot.getVector("id"); + VarCharVector outVal = (VarCharVector) outRoot.getVector("val"); + + for (int i = 0; i < outRoot.getRowCount(); i++) { + int id = outId.get(i); + byte[] bytes = outVal.get(i); + String val = new String(bytes, java.nio.charset.StandardCharsets.UTF_8); + if (id == 3 && "c".equals(val)) { + foundRow = true; + } + } + + total += outRoot.getRowCount(); } + Assertions.assertEquals(1, total); + Assertions.assertTrue(foundRow, "Inserted row (id=3, val=c) not found in delta"); } } catch (UnsatisfiedLinkError e) { Assumptions.assumeTrue( @@ -147,7 +164,7 @@ public void testListTransactionsExplicitRange() { WriteParams params = new WriteParams.Builder().withMode(WriteParams.WriteMode.APPEND).build(); try (Dataset ds2 = Dataset.create(allocator, uri, schema, params); ) { - DatasetDelta delta = ds2.delta(Optional.empty(), Optional.of(1L), Optional.of(2L)); + DatasetDelta delta = ds2.delta(1L, 2L); try { List txs = delta.listTransactions(); Assertions.assertTrue(txs.size() == 1); From a2252b7046a097417068d77ba36ce5f788c35e64 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 17 Dec 2025 17:53:47 +0800 Subject: [PATCH 8/8] address review comments --- java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java index 8ad186f930d..9084da2ab9c 100755 --- a/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java +++ b/java/src/main/java/org/lance/delta/DatasetDeltaBuilder.java @@ -51,7 +51,6 @@ public DatasetDeltaBuilder comparedAgainstVersion(long version) { return this; } - /** Set begin version (exclusive) for explicit range. */ /** * Set the beginning version for the delta (exclusive). Must be used together with * `with_end_version`.