Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 77 additions & 118 deletions java/lance-jni/Cargo.lock

Large diffs are not rendered by default.

121 changes: 61 additions & 60 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::namespace::{
create_java_lance_namespace, BlockingDirectoryNamespace, BlockingRestNamespace,
};
use crate::session::{handle_from_session, session_from_handle};
use crate::storage_options::JavaStorageOptionsProvider;
use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, FromJString};
Expand Down Expand Up @@ -41,6 +44,7 @@ use lance::session::Session as LanceSession;
use lance::table::format::IndexMetadata;
use lance::table::format::{BasePath, Fragment};
use lance_core::datatypes::Schema as LanceSchema;
use lance_file::version::LanceFileVersion;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::btree::BTreeParameters;
use lance_index::DatasetIndexExt;
Expand Down Expand Up @@ -322,20 +326,35 @@ impl BlockingDataset {
Ok(indexes)
}

#[allow(clippy::too_many_arguments)]
pub fn commit_transaction(
&mut self,
transaction: Transaction,
store_params: ObjectStoreParams,
detached: bool,
enable_v2_manifest_paths: bool,
use_stable_row_ids: Option<bool>,
storage_format: Option<LanceFileVersion>,
max_retries: u32,
skip_auto_cleanup: bool,
) -> Result<Self> {
let new_dataset = RT.block_on(
CommitBuilder::new(Arc::new(self.clone().inner))
.with_store_params(store_params)
.with_detached(detached)
.enable_v2_manifest_paths(enable_v2_manifest_paths)
.execute(transaction),
)?;
let mut builder = CommitBuilder::new(Arc::new(self.clone().inner))
.with_store_params(store_params)
.with_detached(detached)
.enable_v2_manifest_paths(enable_v2_manifest_paths);
if let Some(use_stable) = use_stable_row_ids {
builder = builder.use_stable_row_ids(use_stable);
}
if let Some(format) = storage_format {
builder = builder.with_storage_format(format);
}
if max_retries > 0 {
builder = builder.with_max_retries(max_retries);
}
if skip_auto_cleanup {
builder = builder.with_skip_auto_cleanup(true);
}
let new_dataset = RT.block_on(builder.execute(transaction))?;
Ok(BlockingDataset { inner: new_dataset })
}

Expand Down Expand Up @@ -575,34 +594,11 @@ fn inner_create_with_ffi_stream<'local>(
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
) -> Result<JObject<'local>> {
use crate::namespace::{
create_java_lance_namespace, BlockingDirectoryNamespace, BlockingRestNamespace,
};

let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;

// Create the namespace wrapper for commit handling (if provided)
let namespace_info = if namespace_obj.is_null() {
None
} else {
let namespace: Arc<dyn LanceNamespace> = if is_directory_namespace(env, &namespace_obj)? {
let native_handle = get_native_namespace_handle(env, &namespace_obj)?;
let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) };
ns.inner.clone()
} else if is_rest_namespace(env, &namespace_obj)? {
let native_handle = get_native_namespace_handle(env, &namespace_obj)?;
let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) };
ns.inner.clone()
} else {
// Custom Java implementation, create a Java bridge wrapper
create_java_lance_namespace(env, &namespace_obj)?
};

// Extract table_id from Java List<String>
let table_id = env.get_strings(&table_id_obj)?;
Some((namespace, table_id))
};
let namespace_info = extract_namespace_info(env, &namespace_obj, &table_id_obj)?;

create_dataset(
env,
Expand Down Expand Up @@ -1150,10 +1146,6 @@ fn inner_open_native<'local>(
namespace_obj: JObject, // LanceNamespace object, null if no namespace
table_id_obj: JObject, // List<String>, null if no namespace
) -> Result<JObject<'local>> {
use crate::namespace::{
create_java_lance_namespace, BlockingDirectoryNamespace, BlockingRestNamespace,
};

let path_str: String = path.extract(env)?;
let version = env.get_u64_opt(&version_obj)?;
let block_size = env.get_int_opt(&block_size_obj)?;
Expand All @@ -1170,31 +1162,10 @@ fn inner_open_native<'local>(
storage_options_provider.map(|v| Arc::new(v) as Arc<dyn StorageOptionsProvider>);

// Extract namespace and table_id if provided (before get_bytes_opt which holds borrow)
let (namespace, table_id) = if !namespace_obj.is_null() {
// Check if it's a native implementation using instanceof checks
let ns_arc: Arc<dyn LanceNamespace> = if is_directory_namespace(env, &namespace_obj)? {
let native_handle = get_native_namespace_handle(env, &namespace_obj)?;
let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) };
ns.inner.clone()
} else if is_rest_namespace(env, &namespace_obj)? {
let native_handle = get_native_namespace_handle(env, &namespace_obj)?;
let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) };
ns.inner.clone()
} else {
// Custom Java implementation, create a Java bridge wrapper
create_java_lance_namespace(env, &namespace_obj)?
};

// Extract table_id from List<String>
let table_id = if !table_id_obj.is_null() {
Some(env.get_strings(&table_id_obj)?)
} else {
None
};

(Some(ns_arc), table_id)
} else {
(None, None)
let namespace_info = extract_namespace_info(env, &namespace_obj, &table_id_obj)?;
let (namespace, table_id) = match namespace_info {
Some((ns, tid)) => (Some(ns), Some(tid)),
None => (None, None),
};

let serialized_manifest = env.get_bytes_opt(&serialized_manifest)?;
Expand Down Expand Up @@ -1246,6 +1217,36 @@ fn get_native_namespace_handle(env: &mut JNIEnv, namespace_obj: &JObject) -> Res
.map_err(|e| Error::runtime_error(format!("getNativeHandle did not return a long: {}", e)))
}

/// Extract namespace and table_id from Java objects into Rust types.
///
/// Returns `None` if `namespace_obj` is null, otherwise returns the namespace
/// and table_id pair.
#[allow(clippy::type_complexity)]
pub(crate) fn extract_namespace_info(
env: &mut JNIEnv,
namespace_obj: &JObject,
table_id_obj: &JObject,
) -> Result<Option<(Arc<dyn LanceNamespace>, Vec<String>)>> {
if namespace_obj.is_null() {
return Ok(None);
}

let namespace: Arc<dyn LanceNamespace> = if is_directory_namespace(env, namespace_obj)? {
let native_handle = get_native_namespace_handle(env, namespace_obj)?;
let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) };
ns.inner.clone()
} else if is_rest_namespace(env, namespace_obj)? {
let native_handle = get_native_namespace_handle(env, namespace_obj)?;
let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) };
ns.inner.clone()
} else {
create_java_lance_namespace(env, namespace_obj)?
};

let table_id = env.get_strings(table_id_obj)?;
Ok(Some((namespace, table_id)))
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_getFragmentsNative<'a>(
mut env: JNIEnv<'a>,
Expand Down
6 changes: 1 addition & 5 deletions java/lance-jni/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,9 @@ fn inner_list_transactions<'local>(
RT.block_on(delta_guard.inner.list_transactions())?
};

let java_dataset = env
.get_field(&j_delta, "dataset", "Lorg/lance/Dataset;")?
.l()?;

let array_list = env.new_object("java/util/ArrayList", "()V", &[])?;
for tx in txs.into_iter() {
let jtx = convert_to_java_transaction(env, tx, &java_dataset)?;
let jtx = convert_to_java_transaction(env, tx)?;
env.call_method(
&array_list,
"add",
Expand Down
Loading