Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 33 additions & 14 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
storage_options_obj: JObject, // Map<String, String>
initial_bases: JObject,
target_bases: JObject,
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -430,6 +432,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
storage_options_obj,
initial_bases,
target_bases,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
)
)
}
Expand All @@ -449,6 +453,8 @@ fn inner_create_with_ffi_schema<'local>(
storage_options_obj: JObject, // Map<String, String>
initial_bases: JObject,
target_bases: JObject,
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
) -> Result<JObject<'local>> {
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
Expand All @@ -468,6 +474,8 @@ fn inner_create_with_ffi_schema<'local>(
storage_options_obj,
initial_bases,
target_bases,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
reader,
None, // No namespace for schema-only creation
false, // No managed versioning for schema-only creation
Expand Down Expand Up @@ -523,6 +531,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
storage_options_obj: JObject, // Map<String, String>
initial_bases: JObject, // Optional<List<BasePath>>
target_bases: JObject, // Optional<List<String>>
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
namespace_client_managed_versioning: jboolean, // Whether namespace manages versioning
Expand All @@ -543,6 +553,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
storage_options_obj,
initial_bases,
target_bases,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
namespace_obj,
table_id_obj,
namespace_client_managed_versioning != 0,
Expand All @@ -555,19 +567,21 @@ fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
arrow_array_stream_addr: jlong,
path: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
initial_bases: JObject, // Optional<List<BasePath>>
target_bases: JObject, // Optional<List<String>>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
namespace_client_managed_versioning: bool, // Whether namespace manages versioning
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
initial_bases: JObject, // Optional<List<BasePath>>
target_bases: JObject, // Optional<List<String>>
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
namespace_client_managed_versioning: bool, // Whether namespace manages versioning
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -588,6 +602,8 @@ fn inner_create_with_ffi_stream<'local>(
storage_options_obj,
initial_bases,
target_bases,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
reader,
namespace_info,
namespace_client_managed_versioning,
Expand All @@ -613,6 +629,8 @@ fn create_dataset<'local>(
storage_options_obj: JObject,
initial_bases: JObject,
target_bases: JObject,
allow_external_blob_outside_bases: JObject,
blob_pack_file_size_threshold: JObject,
reader: impl RecordBatchReader + Send + 'static,
namespace_info: Option<(Arc<dyn LanceNamespace>, Vec<String>)>,
namespace_client_managed_versioning: bool,
Expand All @@ -631,7 +649,8 @@ fn create_dataset<'local>(
&storage_options_obj,
&initial_bases,
&target_bases,
&JObject::null(), // allow_external_blob_outside_bases not used for Dataset.write()
&allow_external_blob_outside_bases,
&blob_pack_file_size_threshold,
)?;

// Set up namespace commit handler and storage options provider if namespace is provided
Expand Down
10 changes: 10 additions & 0 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
) -> JObject<'local> {
ok_or_throw_with_return!(
env,
Expand All @@ -117,6 +118,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
),
JObject::default()
)
Expand All @@ -138,6 +140,7 @@ fn inner_create_with_ffi_array<'local>(
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
) -> Result<JObject<'local>> {
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
Expand Down Expand Up @@ -165,6 +168,7 @@ fn inner_create_with_ffi_array<'local>(
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
reader,
)
}
Expand All @@ -185,6 +189,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
) -> JObject<'a> {
ok_or_throw_with_return!(
env,
Expand All @@ -202,6 +207,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
),
JObject::null()
)
Expand All @@ -222,6 +228,7 @@ fn inner_create_with_ffi_stream<'local>(
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -239,6 +246,7 @@ fn inner_create_with_ffi_stream<'local>(
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
blob_pack_file_size_threshold,
reader,
)
}
Expand All @@ -257,6 +265,7 @@ fn create_fragment<'a>(
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
blob_pack_file_size_threshold: JObject, // Optional<Long>
source: impl StreamingWriteSource,
) -> Result<JObject<'a>> {
let path_str = dataset_uri.extract(env)?;
Expand All @@ -274,6 +283,7 @@ fn create_fragment<'a>(
&JObject::null(), // not used when creating fragments
&JObject::null(), // not used when creating fragments
&allow_external_blob_outside_bases,
&blob_pack_file_size_threshold,
)?;

// Set up storage options provider if namespace is provided
Expand Down
4 changes: 4 additions & 0 deletions java/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub fn extract_write_params(
initial_bases: &JObject, // Optional<BasePath>
target_bases: &JObject, // Optional<String>
allow_external_blob_outside_bases: &JObject, // Optional<Boolean>
blob_pack_file_size_threshold: &JObject, // Optional<Long>
) -> Result<WriteParams> {
let mut write_params = WriteParams::default();

Expand Down Expand Up @@ -101,6 +102,9 @@ pub fn extract_write_params(
if let Some(allow) = env.get_boolean_opt(allow_external_blob_outside_bases)? {
write_params.allow_external_blob_outside_bases = allow;
}
// Copy the optional blob pack-file size threshold into the write params, but only when the
// Java caller supplied a value; otherwise the `WriteParams::default()` setting is kept.
// NOTE(review): `max_bytes as usize` performs no range check. Assuming `get_long_opt` yields
// an i64 (Java `Long`), a negative value would wrap to a very large usize, and large values
// would truncate on 32-bit targets. Consider `usize::try_from` plus an input error — TODO confirm.
if let Some(max_bytes) = env.get_long_opt(blob_pack_file_size_threshold)? {
write_params.blob_pack_file_size_threshold = Some(max_bytes as usize);
}

// Create storage options accessor from static storage_options
let accessor = if storage_options.is_empty() {
Expand Down
12 changes: 10 additions & 2 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ public static Dataset create(
params.getEnableV2ManifestPaths(),
params.getStorageOptions(),
params.getInitialBases(),
params.getTargetBases());
params.getTargetBases(),
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold());
dataset.allocator = allocator;
return dataset;
}
Expand Down Expand Up @@ -196,7 +198,9 @@ private static native Dataset createWithFfiSchema(
Optional<Boolean> enableV2ManifestPaths,
Map<String, String> storageOptions,
Optional<List<BasePath>> initialBases,
Optional<List<String>> targetBases);
Optional<List<String>> targetBases,
Optional<Boolean> allowExternalBlobOutsideBases,
Optional<Long> blobPackFileSizeThreshold);

/**
* Creates a dataset from an FFI arrow stream.
Expand Down Expand Up @@ -231,6 +235,8 @@ private static native Dataset createWithFfiStream(
Map<String, String> storageOptions,
Optional<List<BasePath>> initialBases,
Optional<List<String>> targetBases,
Optional<Boolean> allowExternalBlobOutsideBases,
Optional<Long> blobPackFileSizeThreshold,
LanceNamespace namespaceClient,
List<String> tableId,
boolean namespaceClientManagedVersioning);
Expand Down Expand Up @@ -280,6 +286,8 @@ static Dataset create(
params.getStorageOptions(),
params.getInitialBases(),
params.getTargetBases(),
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold(),
namespaceClient,
tableId,
namespaceClientManagedVersioning);
Expand Down
12 changes: 8 additions & 4 deletions java/src/main/java/org/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ static List<FragmentMetadata> create(
params.getStorageOptions(),
namespaceClient,
tableId,
params.getAllowExternalBlobOutsideBases());
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold());
}
}

Expand All @@ -306,7 +307,8 @@ static List<FragmentMetadata> create(
params.getStorageOptions(),
namespaceClient,
tableId,
params.getAllowExternalBlobOutsideBases());
params.getAllowExternalBlobOutsideBases(),
params.getBlobPackFileSizeThreshold());
}

/** Create a fragment from the given arrow array and schema. */
Expand All @@ -323,7 +325,8 @@ private static native List<FragmentMetadata> createWithFfiArray(
Map<String, String> storageOptions,
LanceNamespace namespaceClient,
List<String> tableId,
Optional<Boolean> allowExternalBlobOutsideBases);
Optional<Boolean> allowExternalBlobOutsideBases,
Optional<Long> blobPackFileSizeThreshold);

/** Create a fragment from the given arrow stream. */
private static native List<FragmentMetadata> createWithFfiStream(
Expand All @@ -338,5 +341,6 @@ private static native List<FragmentMetadata> createWithFfiStream(
Map<String, String> storageOptions,
LanceNamespace namespaceClient,
List<String> tableId,
Optional<Boolean> allowExternalBlobOutsideBases);
Optional<Boolean> allowExternalBlobOutsideBases,
Optional<Long> blobPackFileSizeThreshold);
}
30 changes: 30 additions & 0 deletions java/src/main/java/org/lance/WriteDatasetBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class WriteDatasetBuilder {
private Optional<String> dataStorageVersion = Optional.empty();
private Optional<List<BasePath>> initialBases = Optional.empty();
private Optional<List<String>> targetBases = Optional.empty();
private Optional<Boolean> allowExternalBlobOutsideBases = Optional.empty();
private Optional<Long> blobPackFileSizeThreshold = Optional.empty();
private Session session;

/** Creates a new builder instance. Package-private, use Dataset.write() instead. */
Expand Down Expand Up @@ -282,6 +284,30 @@ public WriteDatasetBuilder targetBases(List<String> targetBases) {
return this;
}

/**
 * Records whether external blob URIs located outside the registered base paths are allowed.
 *
 * <p>The flag is stored as a present {@link Optional}; it stays empty until this method is called.
 *
 * @param allowExternalBlobOutsideBases whether external blob URIs outside the bases are allowed
 * @return this builder instance, for call chaining
 */
public WriteDatasetBuilder allowExternalBlobOutsideBases(boolean allowExternalBlobOutsideBases) {
  final Boolean requestedFlag = Boolean.valueOf(allowExternalBlobOutsideBases);
  this.allowExternalBlobOutsideBases = Optional.of(requestedFlag);
  return this;
}

/**
 * Configures the size limit, in bytes, for blob v2 pack (.blob) sidecar files.
 *
 * <p>Once a pack file reaches this size, a new one is started. When this option is left unset,
 * the default of 1 GiB applies.
 *
 * @param blobPackFileSizeThreshold maximum pack file size in bytes
 * @return this builder instance, for call chaining
 */
public WriteDatasetBuilder blobPackFileSizeThreshold(long blobPackFileSizeThreshold) {
  final Long thresholdBytes = Long.valueOf(blobPackFileSizeThreshold);
  this.blobPackFileSizeThreshold = Optional.of(thresholdBytes);
  return this;
}

/**
* Sets the session to share caches with other datasets.
*
Expand Down Expand Up @@ -414,6 +440,8 @@ private Dataset executeWithNamespaceClient() {

initialBases.ifPresent(paramsBuilder::withInitialBases);
targetBases.ifPresent(paramsBuilder::withTargetBases);
allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases);
blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold);

WriteParams params = paramsBuilder.build();

Expand Down Expand Up @@ -446,6 +474,8 @@ private Dataset executeWithUri() {
dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion);
initialBases.ifPresent(paramsBuilder::withInitialBases);
targetBases.ifPresent(paramsBuilder::withTargetBases);
allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases);
blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold);

WriteParams params = paramsBuilder.build();

Expand Down
Loading
Loading