From f3198f09d408fcfa4a5412f3b0fe38dbc4fe4897 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 14 Apr 2026 04:23:55 -0500 Subject: [PATCH 1/2] feat: add configurable blob v2 pack file size Add blob_max_pack_file_bytes to WriteParams, allowing users to override the default 1 GiB maximum pack (.blob) sidecar file size. Exposed in Rust, Python, and Java APIs. Co-Authored-By: Claude Opus 4.6 (1M context) --- java/lance-jni/Cargo.lock | 1 + java/lance-jni/src/blocking_dataset.rs | 1 + java/lance-jni/src/fragment.rs | 10 ++++++ java/lance-jni/src/utils.rs | 4 +++ java/src/main/java/org/lance/Fragment.java | 12 ++++--- java/src/main/java/org/lance/WriteParams.java | 33 +++++++++++++++++-- python/python/lance/dataset.py | 5 +++ python/src/dataset.rs | 3 ++ rust/lance/src/dataset/blob.rs | 7 +++- rust/lance/src/dataset/write.rs | 25 ++++++++++++++ 10 files changed, 94 insertions(+), 7 deletions(-) diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 09143a6adf0..5bb30513354 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3469,6 +3469,7 @@ dependencies = [ "arrow-buffer", "arrow-cast", "arrow-data", + "arrow-ipc", "arrow-ord", "arrow-schema", "arrow-select", diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index aa56d54de95..5e7d464a36f 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -632,6 +632,7 @@ fn create_dataset<'local>( &initial_bases, &target_bases, &JObject::null(), // allow_external_blob_outside_bases not used for Dataset.write() + &JObject::null(), // blob_max_pack_file_bytes not used for Dataset.write() )?; // Set up namespace commit handler and storage options provider if namespace is provided diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 3b2a7ba6b22..c17ab86c8fd 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -99,6 +99,7 @@ pub extern 
"system" fn Java_org_lance_Fragment_createWithFfiArray<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_max_pack_file_bytes: JObject, // Optional ) -> JObject<'local> { ok_or_throw_with_return!( env, @@ -117,6 +118,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_max_pack_file_bytes, ), JObject::default() ) @@ -138,6 +140,7 @@ fn inner_create_with_ffi_array<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_max_pack_file_bytes: JObject, // Optional ) -> Result> { let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray; let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; @@ -165,6 +168,7 @@ fn inner_create_with_ffi_array<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_max_pack_file_bytes, reader, ) } @@ -185,6 +189,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_max_pack_file_bytes: JObject, // Optional ) -> JObject<'a> { ok_or_throw_with_return!( env, @@ -202,6 +207,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_max_pack_file_bytes, ), JObject::null() ) @@ -222,6 +228,7 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_max_pack_file_bytes: JObject, // Optional ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut 
FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -239,6 +246,7 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_max_pack_file_bytes, reader, ) } @@ -257,6 +265,7 @@ fn create_fragment<'a>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_max_pack_file_bytes: JObject, // Optional source: impl StreamingWriteSource, ) -> Result> { let path_str = dataset_uri.extract(env)?; @@ -274,6 +283,7 @@ fn create_fragment<'a>( &JObject::null(), // not used when creating fragments &JObject::null(), // not used when creating fragments &allow_external_blob_outside_bases, + &blob_max_pack_file_bytes, )?; // Set up storage options provider if namespace is provided diff --git a/java/lance-jni/src/utils.rs b/java/lance-jni/src/utils.rs index 3d7ef2ebdaf..d90664dbf2b 100644 --- a/java/lance-jni/src/utils.rs +++ b/java/lance-jni/src/utils.rs @@ -52,6 +52,7 @@ pub fn extract_write_params( initial_bases: &JObject, // Optional target_bases: &JObject, // Optional allow_external_blob_outside_bases: &JObject, // Optional + blob_max_pack_file_bytes: &JObject, // Optional ) -> Result { let mut write_params = WriteParams::default(); @@ -101,6 +102,9 @@ pub fn extract_write_params( if let Some(allow) = env.get_boolean_opt(allow_external_blob_outside_bases)? { write_params.allow_external_blob_outside_bases = allow; } + if let Some(max_bytes) = env.get_long_opt(blob_max_pack_file_bytes)? 
{ + write_params.blob_max_pack_file_bytes = Some(max_bytes as usize); + } // Create storage options accessor from static storage_options let accessor = if storage_options.is_empty() { diff --git a/java/src/main/java/org/lance/Fragment.java b/java/src/main/java/org/lance/Fragment.java index 89a61560561..1d35332c91a 100644 --- a/java/src/main/java/org/lance/Fragment.java +++ b/java/src/main/java/org/lance/Fragment.java @@ -280,7 +280,8 @@ static List create( params.getStorageOptions(), namespaceClient, tableId, - params.getAllowExternalBlobOutsideBases()); + params.getAllowExternalBlobOutsideBases(), + params.getBlobMaxPackFileBytes()); } } @@ -306,7 +307,8 @@ static List create( params.getStorageOptions(), namespaceClient, tableId, - params.getAllowExternalBlobOutsideBases()); + params.getAllowExternalBlobOutsideBases(), + params.getBlobMaxPackFileBytes()); } /** Create a fragment from the given arrow array and schema. */ @@ -323,7 +325,8 @@ private static native List createWithFfiArray( Map storageOptions, LanceNamespace namespaceClient, List tableId, - Optional allowExternalBlobOutsideBases); + Optional allowExternalBlobOutsideBases, + Optional blobMaxPackFileBytes); /** Create a fragment from the given arrow stream. 
*/ private static native List createWithFfiStream( @@ -338,5 +341,6 @@ private static native List createWithFfiStream( Map storageOptions, LanceNamespace namespaceClient, List tableId, - Optional allowExternalBlobOutsideBases); + Optional allowExternalBlobOutsideBases, + Optional blobMaxPackFileBytes); } diff --git a/java/src/main/java/org/lance/WriteParams.java b/java/src/main/java/org/lance/WriteParams.java index 6f5d4545ca9..a98f5940713 100644 --- a/java/src/main/java/org/lance/WriteParams.java +++ b/java/src/main/java/org/lance/WriteParams.java @@ -41,6 +41,7 @@ public enum WriteMode { private final Optional> initialBases; private final Optional> targetBases; private final Optional allowExternalBlobOutsideBases; + private final Optional blobMaxPackFileBytes; private WriteParams( Optional maxRowsPerFile, @@ -53,7 +54,8 @@ private WriteParams( Map storageOptions, Optional> initialBases, Optional> targetBases, - Optional allowExternalBlobOutsideBases) { + Optional allowExternalBlobOutsideBases, + Optional blobMaxPackFileBytes) { this.maxRowsPerFile = maxRowsPerFile; this.maxRowsPerGroup = maxRowsPerGroup; this.maxBytesPerFile = maxBytesPerFile; @@ -65,6 +67,7 @@ private WriteParams( this.initialBases = initialBases; this.targetBases = targetBases; this.allowExternalBlobOutsideBases = allowExternalBlobOutsideBases; + this.blobMaxPackFileBytes = blobMaxPackFileBytes; } public Optional getMaxRowsPerFile() { @@ -124,6 +127,17 @@ public Optional getAllowExternalBlobOutsideBases() { return allowExternalBlobOutsideBases; } + /** + * Get the maximum size in bytes for blob v2 pack (.blob) sidecar files. + * + *
<p>
When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. + * + * @return Optional containing the max pack file size in bytes, or empty if not set + */ + public Optional getBlobMaxPackFileBytes() { + return blobMaxPackFileBytes; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -148,6 +162,7 @@ public static class Builder { private Optional> initialBases = Optional.empty(); private Optional> targetBases = Optional.empty(); private Optional allowExternalBlobOutsideBases = Optional.empty(); + private Optional blobMaxPackFileBytes = Optional.empty(); public Builder withMaxRowsPerFile(int maxRowsPerFile) { this.maxRowsPerFile = Optional.of(maxRowsPerFile); @@ -214,6 +229,19 @@ public Builder withAllowExternalBlobOutsideBases(boolean allow) { return this; } + /** + * Set the maximum size in bytes for blob v2 pack (.blob) sidecar files. + * + *
<p>
When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. + * + * @param maxBytes maximum pack file size in bytes + * @return this builder + */ + public Builder withBlobMaxPackFileBytes(long maxBytes) { + this.blobMaxPackFileBytes = Optional.of(maxBytes); + return this; + } + public WriteParams build() { return new WriteParams( maxRowsPerFile, @@ -226,7 +254,8 @@ public WriteParams build() { storageOptions, initialBases, targetBases, - allowExternalBlobOutsideBases); + allowExternalBlobOutsideBases, + blobMaxPackFileBytes); } } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 377ac546c3a..a2a7667a69d 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -5962,6 +5962,7 @@ def write_dataset( target_bases: Optional[List[str]] = None, external_blob_mode: Literal["reference", "ingest"] = "reference", allow_external_blob_outside_bases: bool = False, + blob_max_pack_file_bytes: Optional[int] = None, namespace_client: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, ) -> LanceDataset: @@ -6067,6 +6068,9 @@ def write_dataset( If False, external blob URIs must map to the dataset root or a registered base path. If True, external blob URIs outside registered bases are allowed. This option only applies when ``external_blob_mode="reference"``. + blob_max_pack_file_bytes: optional, int, default None + Maximum size in bytes for blob v2 pack (.blob) sidecar files. When a pack + file reaches this size, a new one is started. If not set, defaults to 1 GiB. namespace_client : optional, LanceNamespace A namespace client from which to fetch table location and storage options. Must be provided together with `table_id`. Cannot be used with `uri`. 
@@ -6195,6 +6199,7 @@ def write_dataset( "target_bases": target_bases, "external_blob_mode": external_blob_mode, "allow_external_blob_outside_bases": allow_external_blob_outside_bases, + "blob_max_pack_file_bytes": blob_max_pack_file_bytes, } # Add namespace_client and table_id for storage options provider and managed diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 8838b89bc0a..d70dff1e6fd 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -3500,6 +3500,9 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult(options, "blob_max_pack_file_bytes")? { + p = p.with_blob_max_pack_file_bytes(max_bytes); + } // Handle properties if let Some(props) = diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 33c365ee534..4311af529c7 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -308,12 +308,16 @@ impl BlobPreprocessor { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + max_pack_file_bytes: Option, ) -> Self { - let pack_writer = PackWriter::new( + let mut pack_writer = PackWriter::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), ); + if let Some(max_bytes) = max_pack_file_bytes { + pack_writer.max_pack_size = max_bytes; + } let arrow_schema = arrow_schema::Schema::from(schema); let fields = arrow_schema.fields(); let blob_v2_cols = fields.iter().map(|field| field.is_blob_v2()).collect(); @@ -2399,6 +2403,7 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), + None, ); let mut blob_builder = BlobArrayBuilder::new(1); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 46c4e47c81d..b6e732c757e 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -298,6 +298,11 @@ pub struct WriteParams { /// The strategy used when writing external blob URIs. 
pub external_blob_mode: ExternalBlobMode, + + /// Maximum size in bytes for blob v2 pack (.blob) sidecar files. + /// When a pack file reaches this size, a new one is started. + /// If not set, defaults to 1 GiB. + pub blob_max_pack_file_bytes: Option, } impl Default for WriteParams { @@ -325,6 +330,7 @@ impl Default for WriteParams { target_base_names_or_paths: None, allow_external_blob_outside_bases: false, external_blob_mode: ExternalBlobMode::Reference, + blob_max_pack_file_bytes: None, } } } @@ -419,6 +425,14 @@ impl WriteParams { ..self } } + + /// Set the maximum size in bytes for blob v2 pack (.blob) sidecar files. + pub fn with_blob_max_pack_file_bytes(self, max_bytes: usize) -> Self { + Self { + blob_max_pack_file_bytes: Some(max_bytes), + ..self + } + } } /// Writes the given data to the dataset and returns fragments. @@ -491,6 +505,7 @@ pub async fn do_write_fragments( params.external_blob_mode, source_store_registry, source_store_params, + params.blob_max_pack_file_bytes, ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; @@ -999,6 +1014,7 @@ struct WriterOptions { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + blob_max_pack_file_bytes: Option, } async fn open_writer_with_options( @@ -1016,6 +1032,7 @@ async fn open_writer_with_options( external_blob_mode, source_store_registry, source_store_params, + blob_max_pack_file_bytes, } = options; let data_file_key = generate_random_filename(); @@ -1063,6 +1080,7 @@ async fn open_writer_with_options( external_blob_mode, source_store_registry, source_store_params, + blob_max_pack_file_bytes, )) } else { None @@ -1105,6 +1123,7 @@ struct WriterGenerator { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + blob_max_pack_file_bytes: Option, /// Counter for round-robin selection next_base_index: AtomicUsize, } @@ -1122,6 +1141,7 @@ impl WriterGenerator { external_blob_mode: 
ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + blob_max_pack_file_bytes: Option, ) -> Self { Self { object_store, @@ -1134,6 +1154,7 @@ impl WriterGenerator { external_blob_mode, source_store_registry, source_store_params, + blob_max_pack_file_bytes, next_base_index: AtomicUsize::new(0), } } @@ -1167,6 +1188,7 @@ impl WriterGenerator { external_blob_mode: self.external_blob_mode, source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), + blob_max_pack_file_bytes: self.blob_max_pack_file_bytes, }, ) .await? @@ -1184,6 +1206,7 @@ impl WriterGenerator { external_blob_mode: self.external_blob_mode, source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), + blob_max_pack_file_bytes: self.blob_max_pack_file_bytes, }, ) .await? @@ -1820,6 +1843,7 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), + None, ); // Create a writer @@ -1937,6 +1961,7 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), + None, ); // Create test batch From 1b0fa16642352de8e1d2f8a55a9b34489d3170df Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 14 Apr 2026 10:36:57 -0500 Subject: [PATCH 2/2] feat: rename to blob_pack_file_size_threshold and plumb blob params through Dataset.write() Rename blob_max_pack_file_bytes to blob_pack_file_size_threshold across Rust, Python, and Java. Wire both allow_external_blob_outside_bases and blob_pack_file_size_threshold through the Java Dataset.write() JNI path, which previously hardcoded them to null/defaults. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- java/lance-jni/src/blocking_dataset.rs | 48 +++++++++++++------ java/lance-jni/src/fragment.rs | 20 ++++---- java/lance-jni/src/utils.rs | 6 +-- java/src/main/java/org/lance/Dataset.java | 12 ++++- java/src/main/java/org/lance/Fragment.java | 8 ++-- .../java/org/lance/WriteDatasetBuilder.java | 30 ++++++++++++ java/src/main/java/org/lance/WriteParams.java | 18 +++---- python/python/lance/dataset.py | 6 +-- python/src/dataset.rs | 4 +- rust/lance/src/dataset/blob.rs | 4 +- rust/lance/src/dataset/write.rs | 26 +++++----- 11 files changed, 119 insertions(+), 63 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 5e7d464a36f..cc9e53fe813 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -413,6 +413,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( storage_options_obj: JObject, // Map initial_bases: JObject, target_bases: JObject, + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -430,6 +432,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, ) ) } @@ -449,6 +453,8 @@ fn inner_create_with_ffi_schema<'local>( storage_options_obj: JObject, // Map initial_bases: JObject, target_bases: JObject, + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> Result> { let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; @@ -468,6 +474,8 @@ fn inner_create_with_ffi_schema<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, reader, None, 
// No namespace for schema-only creation false, // No managed versioning for schema-only creation @@ -523,6 +531,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( storage_options_obj: JObject, // Map initial_bases: JObject, // Optional> target_bases: JObject, // Optional> + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) namespace_client_managed_versioning: jboolean, // Whether namespace manages versioning @@ -543,6 +553,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, namespace_obj, table_id_obj, namespace_client_managed_versioning != 0, @@ -555,19 +567,21 @@ fn inner_create_with_ffi_stream<'local>( env: &mut JNIEnv<'local>, arrow_array_stream_addr: jlong, path: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - enable_v2_manifest_paths: JObject, // Optional - storage_options_obj: JObject, // Map - initial_bases: JObject, // Optional> - target_bases: JObject, // Optional> - namespace_obj: JObject, // LanceNamespace (can be null) - table_id_obj: JObject, // List (can be null) - namespace_client_managed_versioning: bool, // Whether namespace manages versioning + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional + storage_options_obj: JObject, // Map + initial_bases: JObject, // 
Optional> + target_bases: JObject, // Optional> + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional + namespace_obj: JObject, // LanceNamespace (can be null) + table_id_obj: JObject, // List (can be null) + namespace_client_managed_versioning: bool, // Whether namespace manages versioning ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -588,6 +602,8 @@ fn inner_create_with_ffi_stream<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, reader, namespace_info, namespace_client_managed_versioning, @@ -613,6 +629,8 @@ fn create_dataset<'local>( storage_options_obj: JObject, initial_bases: JObject, target_bases: JObject, + allow_external_blob_outside_bases: JObject, + blob_pack_file_size_threshold: JObject, reader: impl RecordBatchReader + Send + 'static, namespace_info: Option<(Arc, Vec)>, namespace_client_managed_versioning: bool, @@ -631,8 +649,8 @@ fn create_dataset<'local>( &storage_options_obj, &initial_bases, &target_bases, - &JObject::null(), // allow_external_blob_outside_bases not used for Dataset.write() - &JObject::null(), // blob_max_pack_file_bytes not used for Dataset.write() + &allow_external_blob_outside_bases, + &blob_pack_file_size_threshold, )?; // Set up namespace commit handler and storage options provider if namespace is provided diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index c17ab86c8fd..ed3ab0f1ec3 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -99,7 +99,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional - blob_max_pack_file_bytes: JObject, // 
Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> JObject<'local> { ok_or_throw_with_return!( env, @@ -118,7 +118,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, ), JObject::default() ) @@ -140,7 +140,7 @@ fn inner_create_with_ffi_array<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional - blob_max_pack_file_bytes: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> Result> { let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray; let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; @@ -168,7 +168,7 @@ fn inner_create_with_ffi_array<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, reader, ) } @@ -189,7 +189,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional - blob_max_pack_file_bytes: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> JObject<'a> { ok_or_throw_with_return!( env, @@ -207,7 +207,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, ), JObject::null() ) @@ -228,7 +228,7 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional - blob_max_pack_file_bytes: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> Result> { let 
stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -246,7 +246,7 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, reader, ) } @@ -265,7 +265,7 @@ fn create_fragment<'a>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional - blob_max_pack_file_bytes: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional source: impl StreamingWriteSource, ) -> Result> { let path_str = dataset_uri.extract(env)?; @@ -283,7 +283,7 @@ fn create_fragment<'a>( &JObject::null(), // not used when creating fragments &JObject::null(), // not used when creating fragments &allow_external_blob_outside_bases, - &blob_max_pack_file_bytes, + &blob_pack_file_size_threshold, )?; // Set up storage options provider if namespace is provided diff --git a/java/lance-jni/src/utils.rs b/java/lance-jni/src/utils.rs index d90664dbf2b..17a4bc9702f 100644 --- a/java/lance-jni/src/utils.rs +++ b/java/lance-jni/src/utils.rs @@ -52,7 +52,7 @@ pub fn extract_write_params( initial_bases: &JObject, // Optional target_bases: &JObject, // Optional allow_external_blob_outside_bases: &JObject, // Optional - blob_max_pack_file_bytes: &JObject, // Optional + blob_pack_file_size_threshold: &JObject, // Optional ) -> Result { let mut write_params = WriteParams::default(); @@ -102,8 +102,8 @@ pub fn extract_write_params( if let Some(allow) = env.get_boolean_opt(allow_external_blob_outside_bases)? { write_params.allow_external_blob_outside_bases = allow; } - if let Some(max_bytes) = env.get_long_opt(blob_max_pack_file_bytes)? { - write_params.blob_max_pack_file_bytes = Some(max_bytes as usize); + if let Some(max_bytes) = env.get_long_opt(blob_pack_file_size_threshold)? 
{ + write_params.blob_pack_file_size_threshold = Some(max_bytes as usize); } // Create storage options accessor from static storage_options diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index bc7500270d7..533b6d6771c 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -150,7 +150,9 @@ public static Dataset create( params.getEnableV2ManifestPaths(), params.getStorageOptions(), params.getInitialBases(), - params.getTargetBases()); + params.getTargetBases(), + params.getAllowExternalBlobOutsideBases(), + params.getBlobPackFileSizeThreshold()); dataset.allocator = allocator; return dataset; } @@ -196,7 +198,9 @@ private static native Dataset createWithFfiSchema( Optional enableV2ManifestPaths, Map storageOptions, Optional> initialBases, - Optional> targetBases); + Optional> targetBases, + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold); /** * Creates a dataset from an FFI arrow stream. 
@@ -231,6 +235,8 @@ private static native Dataset createWithFfiStream( Map storageOptions, Optional> initialBases, Optional> targetBases, + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold, LanceNamespace namespaceClient, List tableId, boolean namespaceClientManagedVersioning); @@ -280,6 +286,8 @@ static Dataset create( params.getStorageOptions(), params.getInitialBases(), params.getTargetBases(), + params.getAllowExternalBlobOutsideBases(), + params.getBlobPackFileSizeThreshold(), namespaceClient, tableId, namespaceClientManagedVersioning); diff --git a/java/src/main/java/org/lance/Fragment.java b/java/src/main/java/org/lance/Fragment.java index 1d35332c91a..43091269382 100644 --- a/java/src/main/java/org/lance/Fragment.java +++ b/java/src/main/java/org/lance/Fragment.java @@ -281,7 +281,7 @@ static List create( namespaceClient, tableId, params.getAllowExternalBlobOutsideBases(), - params.getBlobMaxPackFileBytes()); + params.getBlobPackFileSizeThreshold()); } } @@ -308,7 +308,7 @@ static List create( namespaceClient, tableId, params.getAllowExternalBlobOutsideBases(), - params.getBlobMaxPackFileBytes()); + params.getBlobPackFileSizeThreshold()); } /** Create a fragment from the given arrow array and schema. */ @@ -326,7 +326,7 @@ private static native List createWithFfiArray( LanceNamespace namespaceClient, List tableId, Optional allowExternalBlobOutsideBases, - Optional blobMaxPackFileBytes); + Optional blobPackFileSizeThreshold); /** Create a fragment from the given arrow stream. 
*/ private static native List createWithFfiStream( @@ -342,5 +342,5 @@ private static native List createWithFfiStream( LanceNamespace namespaceClient, List tableId, Optional allowExternalBlobOutsideBases, - Optional blobMaxPackFileBytes); + Optional blobPackFileSizeThreshold); } diff --git a/java/src/main/java/org/lance/WriteDatasetBuilder.java b/java/src/main/java/org/lance/WriteDatasetBuilder.java index c0e5f8839c2..3c06b43fd54 100644 --- a/java/src/main/java/org/lance/WriteDatasetBuilder.java +++ b/java/src/main/java/org/lance/WriteDatasetBuilder.java @@ -78,6 +78,8 @@ public class WriteDatasetBuilder { private Optional dataStorageVersion = Optional.empty(); private Optional> initialBases = Optional.empty(); private Optional> targetBases = Optional.empty(); + private Optional allowExternalBlobOutsideBases = Optional.empty(); + private Optional blobPackFileSizeThreshold = Optional.empty(); private Session session; /** Creates a new builder instance. Package-private, use Dataset.write() instead. */ @@ -282,6 +284,30 @@ public WriteDatasetBuilder targetBases(List targetBases) { return this; } + /** + * Sets whether to allow external blob URIs outside registered base paths. + * + * @param allowExternalBlobOutsideBases Whether to allow external blob URIs outside bases + * @return this builder instance + */ + public WriteDatasetBuilder allowExternalBlobOutsideBases(boolean allowExternalBlobOutsideBases) { + this.allowExternalBlobOutsideBases = Optional.of(allowExternalBlobOutsideBases); + return this; + } + + /** + * Sets the maximum size in bytes for blob v2 pack (.blob) sidecar files. + * + *

When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. + * + * @param blobPackFileSizeThreshold maximum pack file size in bytes + * @return this builder instance + */ + public WriteDatasetBuilder blobPackFileSizeThreshold(long blobPackFileSizeThreshold) { + this.blobPackFileSizeThreshold = Optional.of(blobPackFileSizeThreshold); + return this; + } + /** * Sets the session to share caches with other datasets. * @@ -414,6 +440,8 @@ private Dataset executeWithNamespaceClient() { initialBases.ifPresent(paramsBuilder::withInitialBases); targetBases.ifPresent(paramsBuilder::withTargetBases); + allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases); + blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold); WriteParams params = paramsBuilder.build(); @@ -446,6 +474,8 @@ private Dataset executeWithUri() { dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion); initialBases.ifPresent(paramsBuilder::withInitialBases); targetBases.ifPresent(paramsBuilder::withTargetBases); + allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases); + blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold); WriteParams params = paramsBuilder.build(); diff --git a/java/src/main/java/org/lance/WriteParams.java b/java/src/main/java/org/lance/WriteParams.java index a98f5940713..970007f442f 100644 --- a/java/src/main/java/org/lance/WriteParams.java +++ b/java/src/main/java/org/lance/WriteParams.java @@ -41,7 +41,7 @@ public enum WriteMode { private final Optional> initialBases; private final Optional> targetBases; private final Optional allowExternalBlobOutsideBases; - private final Optional blobMaxPackFileBytes; + private final Optional blobPackFileSizeThreshold; private WriteParams( Optional maxRowsPerFile, @@ -55,7 +55,7 @@ private WriteParams( Optional> initialBases, Optional> targetBases, Optional allowExternalBlobOutsideBases, - 
Optional blobMaxPackFileBytes) { + Optional blobPackFileSizeThreshold) { this.maxRowsPerFile = maxRowsPerFile; this.maxRowsPerGroup = maxRowsPerGroup; this.maxBytesPerFile = maxBytesPerFile; @@ -67,7 +67,7 @@ private WriteParams( this.initialBases = initialBases; this.targetBases = targetBases; this.allowExternalBlobOutsideBases = allowExternalBlobOutsideBases; - this.blobMaxPackFileBytes = blobMaxPackFileBytes; + this.blobPackFileSizeThreshold = blobPackFileSizeThreshold; } public Optional getMaxRowsPerFile() { @@ -134,8 +134,8 @@ public Optional getAllowExternalBlobOutsideBases() { * * @return Optional containing the max pack file size in bytes, or empty if not set */ - public Optional getBlobMaxPackFileBytes() { - return blobMaxPackFileBytes; + public Optional getBlobPackFileSizeThreshold() { + return blobPackFileSizeThreshold; } @Override @@ -162,7 +162,7 @@ public static class Builder { private Optional> initialBases = Optional.empty(); private Optional> targetBases = Optional.empty(); private Optional allowExternalBlobOutsideBases = Optional.empty(); - private Optional blobMaxPackFileBytes = Optional.empty(); + private Optional blobPackFileSizeThreshold = Optional.empty(); public Builder withMaxRowsPerFile(int maxRowsPerFile) { this.maxRowsPerFile = Optional.of(maxRowsPerFile); @@ -237,8 +237,8 @@ public Builder withAllowExternalBlobOutsideBases(boolean allow) { * @param maxBytes maximum pack file size in bytes * @return this builder */ - public Builder withBlobMaxPackFileBytes(long maxBytes) { - this.blobMaxPackFileBytes = Optional.of(maxBytes); + public Builder withBlobPackFileSizeThreshold(long maxBytes) { + this.blobPackFileSizeThreshold = Optional.of(maxBytes); return this; } @@ -255,7 +255,7 @@ public WriteParams build() { initialBases, targetBases, allowExternalBlobOutsideBases, - blobMaxPackFileBytes); + blobPackFileSizeThreshold); } } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index a2a7667a69d..b8257692d03 100644 
--- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -5962,7 +5962,7 @@ def write_dataset( target_bases: Optional[List[str]] = None, external_blob_mode: Literal["reference", "ingest"] = "reference", allow_external_blob_outside_bases: bool = False, - blob_max_pack_file_bytes: Optional[int] = None, + blob_pack_file_size_threshold: Optional[int] = None, namespace_client: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, ) -> LanceDataset: @@ -6068,7 +6068,7 @@ def write_dataset( If False, external blob URIs must map to the dataset root or a registered base path. If True, external blob URIs outside registered bases are allowed. This option only applies when ``external_blob_mode="reference"``. - blob_max_pack_file_bytes: optional, int, default None + blob_pack_file_size_threshold: optional, int, default None Maximum size in bytes for blob v2 pack (.blob) sidecar files. When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. namespace_client : optional, LanceNamespace @@ -6199,7 +6199,7 @@ def write_dataset( "target_bases": target_bases, "external_blob_mode": external_blob_mode, "allow_external_blob_outside_bases": allow_external_blob_outside_bases, - "blob_max_pack_file_bytes": blob_max_pack_file_bytes, + "blob_pack_file_size_threshold": blob_pack_file_size_threshold, } # Add namespace_client and table_id for storage options provider and managed diff --git a/python/src/dataset.rs b/python/src/dataset.rs index d70dff1e6fd..2a4a28d6d4d 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -3500,8 +3500,8 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult(options, "blob_max_pack_file_bytes")? { - p = p.with_blob_max_pack_file_bytes(max_bytes); + if let Some(max_bytes) = get_dict_opt::(options, "blob_pack_file_size_threshold")? 
{ + p = p.with_blob_pack_file_size_threshold(max_bytes); } // Handle properties diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 4311af529c7..7a736ec4928 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -308,14 +308,14 @@ impl BlobPreprocessor { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, - max_pack_file_bytes: Option, + pack_file_size_threshold: Option, ) -> Self { let mut pack_writer = PackWriter::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), ); - if let Some(max_bytes) = max_pack_file_bytes { + if let Some(max_bytes) = pack_file_size_threshold { pack_writer.max_pack_size = max_bytes; } let arrow_schema = arrow_schema::Schema::from(schema); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index b6e732c757e..d95d10c73bb 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -302,7 +302,7 @@ pub struct WriteParams { /// Maximum size in bytes for blob v2 pack (.blob) sidecar files. /// When a pack file reaches this size, a new one is started. /// If not set, defaults to 1 GiB. - pub blob_max_pack_file_bytes: Option, + pub blob_pack_file_size_threshold: Option, } impl Default for WriteParams { @@ -330,7 +330,7 @@ impl Default for WriteParams { target_base_names_or_paths: None, allow_external_blob_outside_bases: false, external_blob_mode: ExternalBlobMode::Reference, - blob_max_pack_file_bytes: None, + blob_pack_file_size_threshold: None, } } } @@ -427,9 +427,9 @@ impl WriteParams { } /// Set the maximum size in bytes for blob v2 pack (.blob) sidecar files. 
- pub fn with_blob_max_pack_file_bytes(self, max_bytes: usize) -> Self { + pub fn with_blob_pack_file_size_threshold(self, max_bytes: usize) -> Self { Self { - blob_max_pack_file_bytes: Some(max_bytes), + blob_pack_file_size_threshold: Some(max_bytes), ..self } } @@ -505,7 +505,7 @@ pub async fn do_write_fragments( params.external_blob_mode, source_store_registry, source_store_params, - params.blob_max_pack_file_bytes, + params.blob_pack_file_size_threshold, ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; @@ -1014,7 +1014,7 @@ struct WriterOptions { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, - blob_max_pack_file_bytes: Option, + blob_pack_file_size_threshold: Option, } async fn open_writer_with_options( @@ -1032,7 +1032,7 @@ async fn open_writer_with_options( external_blob_mode, source_store_registry, source_store_params, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, } = options; let data_file_key = generate_random_filename(); @@ -1080,7 +1080,7 @@ async fn open_writer_with_options( external_blob_mode, source_store_registry, source_store_params, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, )) } else { None @@ -1123,7 +1123,7 @@ struct WriterGenerator { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, - blob_max_pack_file_bytes: Option, + blob_pack_file_size_threshold: Option, /// Counter for round-robin selection next_base_index: AtomicUsize, } @@ -1141,7 +1141,7 @@ impl WriterGenerator { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, - blob_max_pack_file_bytes: Option, + blob_pack_file_size_threshold: Option, ) -> Self { Self { object_store, @@ -1154,7 +1154,7 @@ impl WriterGenerator { external_blob_mode, source_store_registry, source_store_params, - blob_max_pack_file_bytes, + blob_pack_file_size_threshold, next_base_index: 
AtomicUsize::new(0), } } @@ -1188,7 +1188,7 @@ impl WriterGenerator { external_blob_mode: self.external_blob_mode, source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), - blob_max_pack_file_bytes: self.blob_max_pack_file_bytes, + blob_pack_file_size_threshold: self.blob_pack_file_size_threshold, }, ) .await? @@ -1206,7 +1206,7 @@ impl WriterGenerator { external_blob_mode: self.external_blob_mode, source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), - blob_max_pack_file_bytes: self.blob_max_pack_file_bytes, + blob_pack_file_size_threshold: self.blob_pack_file_size_threshold, }, ) .await?