diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 09143a6adf0..5bb30513354 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3469,6 +3469,7 @@ dependencies = [ "arrow-buffer", "arrow-cast", "arrow-data", + "arrow-ipc", "arrow-ord", "arrow-schema", "arrow-select", diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index aa56d54de95..cc9e53fe813 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -413,6 +413,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( storage_options_obj: JObject, // Map initial_bases: JObject, target_bases: JObject, + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -430,6 +432,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, ) ) } @@ -449,6 +453,8 @@ fn inner_create_with_ffi_schema<'local>( storage_options_obj: JObject, // Map initial_bases: JObject, target_bases: JObject, + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> Result> { let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; @@ -468,6 +474,8 @@ fn inner_create_with_ffi_schema<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, reader, None, // No namespace for schema-only creation false, // No managed versioning for schema-only creation @@ -523,6 +531,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( storage_options_obj: JObject, // Map initial_bases: JObject, // Optional> target_bases: JObject, // Optional> + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) namespace_client_managed_versioning: jboolean, // Whether namespace manages versioning @@ -543,6 +553,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, namespace_obj, table_id_obj, namespace_client_managed_versioning != 0, @@ -555,19 +567,21 @@ fn inner_create_with_ffi_stream<'local>( env: &mut JNIEnv<'local>, arrow_array_stream_addr: jlong, path: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - enable_v2_manifest_paths: JObject, // Optional - storage_options_obj: JObject, // Map - initial_bases: JObject, // Optional> - target_bases: JObject, // Optional> - namespace_obj: JObject, // LanceNamespace (can be null) - table_id_obj: JObject, // List (can be null) - namespace_client_managed_versioning: bool, // Whether namespace manages versioning + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional + storage_options_obj: JObject, // Map + initial_bases: JObject, // Optional> + target_bases: JObject, // Optional> + allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional + namespace_obj: JObject, // LanceNamespace (can be null) + table_id_obj: JObject, // List (can be null) + namespace_client_managed_versioning: bool, // Whether namespace manages versioning ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -588,6 +602,8 @@ fn inner_create_with_ffi_stream<'local>( storage_options_obj, initial_bases, target_bases, + allow_external_blob_outside_bases, + blob_pack_file_size_threshold, reader, namespace_info, namespace_client_managed_versioning, @@ -613,6 +629,8 @@ fn create_dataset<'local>( storage_options_obj: JObject, initial_bases: JObject, target_bases: JObject, + allow_external_blob_outside_bases: JObject, + blob_pack_file_size_threshold: JObject, reader: impl RecordBatchReader + Send + 'static, namespace_info: Option<(Arc, Vec)>, namespace_client_managed_versioning: bool, @@ -631,7 +649,8 @@ fn create_dataset<'local>( &storage_options_obj, &initial_bases, &target_bases, - &JObject::null(), // allow_external_blob_outside_bases not used for Dataset.write() + &allow_external_blob_outside_bases, + &blob_pack_file_size_threshold, )?; // Set up namespace commit handler and storage options provider if namespace is provided diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 3b2a7ba6b22..ed3ab0f1ec3 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -99,6 +99,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> JObject<'local> { ok_or_throw_with_return!( env, @@ -117,6 +118,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_pack_file_size_threshold, ), JObject::default() ) @@ -138,6 +140,7 @@ fn inner_create_with_ffi_array<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> Result> { let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray; let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; @@ -165,6 +168,7 @@ fn inner_create_with_ffi_array<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_pack_file_size_threshold, reader, ) } @@ -185,6 +189,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> JObject<'a> { ok_or_throw_with_return!( env, @@ -202,6 +207,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_pack_file_size_threshold, ), JObject::null() ) @@ -222,6 +228,7 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -239,6 +246,7 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj, table_id_obj, allow_external_blob_outside_bases, + blob_pack_file_size_threshold, reader, ) } @@ -257,6 +265,7 @@ fn create_fragment<'a>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) allow_external_blob_outside_bases: JObject, // Optional + blob_pack_file_size_threshold: JObject, // Optional source: impl StreamingWriteSource, ) -> Result> { let path_str = dataset_uri.extract(env)?; @@ -274,6 +283,7 @@ fn create_fragment<'a>( &JObject::null(), // not used when creating fragments &JObject::null(), // not used when creating fragments &allow_external_blob_outside_bases, + &blob_pack_file_size_threshold, )?; // Set up storage options provider if namespace is provided diff --git a/java/lance-jni/src/utils.rs b/java/lance-jni/src/utils.rs index 3d7ef2ebdaf..17a4bc9702f 100644 --- a/java/lance-jni/src/utils.rs +++ b/java/lance-jni/src/utils.rs @@ -52,6 +52,7 @@ pub fn extract_write_params( initial_bases: &JObject, // Optional target_bases: &JObject, // Optional allow_external_blob_outside_bases: &JObject, // Optional + blob_pack_file_size_threshold: &JObject, // Optional ) -> Result { let mut write_params = WriteParams::default(); @@ -101,6 +102,9 @@ pub fn extract_write_params( if let Some(allow) = env.get_boolean_opt(allow_external_blob_outside_bases)? { write_params.allow_external_blob_outside_bases = allow; } + if let Some(max_bytes) = env.get_long_opt(blob_pack_file_size_threshold)? { + write_params.blob_pack_file_size_threshold = Some(max_bytes as usize); + } // Create storage options accessor from static storage_options let accessor = if storage_options.is_empty() { diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index bc7500270d7..533b6d6771c 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -150,7 +150,9 @@ public static Dataset create( params.getEnableV2ManifestPaths(), params.getStorageOptions(), params.getInitialBases(), - params.getTargetBases()); + params.getTargetBases(), + params.getAllowExternalBlobOutsideBases(), + params.getBlobPackFileSizeThreshold()); dataset.allocator = allocator; return dataset; } @@ -196,7 +198,9 @@ private static native Dataset createWithFfiSchema( Optional enableV2ManifestPaths, Map storageOptions, Optional> initialBases, - Optional> targetBases); + Optional> targetBases, + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold); /** * Creates a dataset from an FFI arrow stream. @@ -231,6 +235,8 @@ private static native Dataset createWithFfiStream( Map storageOptions, Optional> initialBases, Optional> targetBases, + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold, LanceNamespace namespaceClient, List tableId, boolean namespaceClientManagedVersioning); @@ -280,6 +286,8 @@ static Dataset create( params.getStorageOptions(), params.getInitialBases(), params.getTargetBases(), + params.getAllowExternalBlobOutsideBases(), + params.getBlobPackFileSizeThreshold(), namespaceClient, tableId, namespaceClientManagedVersioning); diff --git a/java/src/main/java/org/lance/Fragment.java b/java/src/main/java/org/lance/Fragment.java index 89a61560561..43091269382 100644 --- a/java/src/main/java/org/lance/Fragment.java +++ b/java/src/main/java/org/lance/Fragment.java @@ -280,7 +280,8 @@ static List create( params.getStorageOptions(), namespaceClient, tableId, - params.getAllowExternalBlobOutsideBases()); + params.getAllowExternalBlobOutsideBases(), + params.getBlobPackFileSizeThreshold()); } } @@ -306,7 +307,8 @@ static List create( params.getStorageOptions(), namespaceClient, tableId, - params.getAllowExternalBlobOutsideBases()); + params.getAllowExternalBlobOutsideBases(), + params.getBlobPackFileSizeThreshold()); } /** Create a fragment from the given arrow array and schema. */ @@ -323,7 +325,8 @@ private static native List createWithFfiArray( Map storageOptions, LanceNamespace namespaceClient, List tableId, - Optional allowExternalBlobOutsideBases); + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold); /** Create a fragment from the given arrow stream. */ private static native List createWithFfiStream( @@ -338,5 +341,6 @@ private static native List createWithFfiStream( Map storageOptions, LanceNamespace namespaceClient, List tableId, - Optional allowExternalBlobOutsideBases); + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold); } diff --git a/java/src/main/java/org/lance/WriteDatasetBuilder.java b/java/src/main/java/org/lance/WriteDatasetBuilder.java index c0e5f8839c2..3c06b43fd54 100644 --- a/java/src/main/java/org/lance/WriteDatasetBuilder.java +++ b/java/src/main/java/org/lance/WriteDatasetBuilder.java @@ -78,6 +78,8 @@ public class WriteDatasetBuilder { private Optional dataStorageVersion = Optional.empty(); private Optional> initialBases = Optional.empty(); private Optional> targetBases = Optional.empty(); + private Optional allowExternalBlobOutsideBases = Optional.empty(); + private Optional blobPackFileSizeThreshold = Optional.empty(); private Session session; /** Creates a new builder instance. Package-private, use Dataset.write() instead. */ @@ -282,6 +284,30 @@ public WriteDatasetBuilder targetBases(List targetBases) { return this; } + /** + * Sets whether to allow external blob URIs outside registered base paths. + * + * @param allowExternalBlobOutsideBases Whether to allow external blob URIs outside bases + * @return this builder instance + */ + public WriteDatasetBuilder allowExternalBlobOutsideBases(boolean allowExternalBlobOutsideBases) { + this.allowExternalBlobOutsideBases = Optional.of(allowExternalBlobOutsideBases); + return this; + } + + /** + * Sets the maximum size in bytes for blob v2 pack (.blob) sidecar files. + * + *

When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. + * + * @param blobPackFileSizeThreshold maximum pack file size in bytes + * @return this builder instance + */ + public WriteDatasetBuilder blobPackFileSizeThreshold(long blobPackFileSizeThreshold) { + this.blobPackFileSizeThreshold = Optional.of(blobPackFileSizeThreshold); + return this; + } + /** * Sets the session to share caches with other datasets. * @@ -414,6 +440,8 @@ private Dataset executeWithNamespaceClient() { initialBases.ifPresent(paramsBuilder::withInitialBases); targetBases.ifPresent(paramsBuilder::withTargetBases); + allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases); + blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold); WriteParams params = paramsBuilder.build(); @@ -446,6 +474,8 @@ private Dataset executeWithUri() { dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion); initialBases.ifPresent(paramsBuilder::withInitialBases); targetBases.ifPresent(paramsBuilder::withTargetBases); + allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases); + blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold); WriteParams params = paramsBuilder.build(); diff --git a/java/src/main/java/org/lance/WriteParams.java b/java/src/main/java/org/lance/WriteParams.java index 6f5d4545ca9..970007f442f 100644 --- a/java/src/main/java/org/lance/WriteParams.java +++ b/java/src/main/java/org/lance/WriteParams.java @@ -41,6 +41,7 @@ public enum WriteMode { private final Optional> initialBases; private final Optional> targetBases; private final Optional allowExternalBlobOutsideBases; + private final Optional blobPackFileSizeThreshold; private WriteParams( Optional maxRowsPerFile, @@ -53,7 +54,8 @@ private WriteParams( Map storageOptions, Optional> initialBases, Optional> targetBases, - Optional allowExternalBlobOutsideBases) { + Optional allowExternalBlobOutsideBases, + Optional blobPackFileSizeThreshold) { this.maxRowsPerFile = maxRowsPerFile; this.maxRowsPerGroup = maxRowsPerGroup; this.maxBytesPerFile = maxBytesPerFile; @@ -65,6 +67,7 @@ private WriteParams( this.initialBases = initialBases; this.targetBases = targetBases; this.allowExternalBlobOutsideBases = allowExternalBlobOutsideBases; + this.blobPackFileSizeThreshold = blobPackFileSizeThreshold; } public Optional getMaxRowsPerFile() { @@ -124,6 +127,17 @@ public Optional getAllowExternalBlobOutsideBases() { return allowExternalBlobOutsideBases; } + /** + * Get the maximum size in bytes for blob v2 pack (.blob) sidecar files. + * + *

When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. + * + * @return Optional containing the max pack file size in bytes, or empty if not set + */ + public Optional getBlobPackFileSizeThreshold() { + return blobPackFileSizeThreshold; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -148,6 +162,7 @@ public static class Builder { private Optional> initialBases = Optional.empty(); private Optional> targetBases = Optional.empty(); private Optional allowExternalBlobOutsideBases = Optional.empty(); + private Optional blobPackFileSizeThreshold = Optional.empty(); public Builder withMaxRowsPerFile(int maxRowsPerFile) { this.maxRowsPerFile = Optional.of(maxRowsPerFile); @@ -214,6 +229,19 @@ public Builder withAllowExternalBlobOutsideBases(boolean allow) { return this; } + /** + * Set the maximum size in bytes for blob v2 pack (.blob) sidecar files. + * + *

When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB. + * + * @param maxBytes maximum pack file size in bytes + * @return this builder + */ + public Builder withBlobPackFileSizeThreshold(long maxBytes) { + this.blobPackFileSizeThreshold = Optional.of(maxBytes); + return this; + } + public WriteParams build() { return new WriteParams( maxRowsPerFile, @@ -226,7 +254,8 @@ public WriteParams build() { storageOptions, initialBases, targetBases, - allowExternalBlobOutsideBases); + allowExternalBlobOutsideBases, + blobPackFileSizeThreshold); } } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 377ac546c3a..b8257692d03 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -5962,6 +5962,7 @@ def write_dataset( target_bases: Optional[List[str]] = None, external_blob_mode: Literal["reference", "ingest"] = "reference", allow_external_blob_outside_bases: bool = False, + blob_pack_file_size_threshold: Optional[int] = None, namespace_client: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, ) -> LanceDataset: @@ -6067,6 +6068,9 @@ def write_dataset( If False, external blob URIs must map to the dataset root or a registered base path. If True, external blob URIs outside registered bases are allowed. This option only applies when ``external_blob_mode="reference"``. + blob_pack_file_size_threshold: optional, int, default None + Maximum size in bytes for blob v2 pack (.blob) sidecar files. When a pack + file reaches this size, a new one is started. If not set, defaults to 1 GiB. namespace_client : optional, LanceNamespace A namespace client from which to fetch table location and storage options. Must be provided together with `table_id`. Cannot be used with `uri`. @@ -6195,6 +6199,7 @@ def write_dataset( "target_bases": target_bases, "external_blob_mode": external_blob_mode, "allow_external_blob_outside_bases": allow_external_blob_outside_bases, + "blob_pack_file_size_threshold": blob_pack_file_size_threshold, } # Add namespace_client and table_id for storage options provider and managed diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 8838b89bc0a..2a4a28d6d4d 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -3500,6 +3500,9 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult(options, "blob_pack_file_size_threshold")? { + p = p.with_blob_pack_file_size_threshold(max_bytes); + } // Handle properties if let Some(props) = diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 33c365ee534..7a736ec4928 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -308,12 +308,16 @@ impl BlobPreprocessor { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + pack_file_size_threshold: Option, ) -> Self { - let pack_writer = PackWriter::new( + let mut pack_writer = PackWriter::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), ); + if let Some(max_bytes) = pack_file_size_threshold { + pack_writer.max_pack_size = max_bytes; + } let arrow_schema = arrow_schema::Schema::from(schema); let fields = arrow_schema.fields(); let blob_v2_cols = fields.iter().map(|field| field.is_blob_v2()).collect(); @@ -2399,6 +2403,7 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), + None, ); let mut blob_builder = BlobArrayBuilder::new(1); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 46c4e47c81d..d95d10c73bb 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -298,6 +298,11 @@ pub struct WriteParams { /// The strategy used when writing external blob URIs. pub external_blob_mode: ExternalBlobMode, + + /// Maximum size in bytes for blob v2 pack (.blob) sidecar files. + /// When a pack file reaches this size, a new one is started. + /// If not set, defaults to 1 GiB. + pub blob_pack_file_size_threshold: Option, } impl Default for WriteParams { @@ -325,6 +330,7 @@ impl Default for WriteParams { target_base_names_or_paths: None, allow_external_blob_outside_bases: false, external_blob_mode: ExternalBlobMode::Reference, + blob_pack_file_size_threshold: None, } } } @@ -419,6 +425,14 @@ impl WriteParams { ..self } } + + /// Set the maximum size in bytes for blob v2 pack (.blob) sidecar files. + pub fn with_blob_pack_file_size_threshold(self, max_bytes: usize) -> Self { + Self { + blob_pack_file_size_threshold: Some(max_bytes), + ..self + } + } } /// Writes the given data to the dataset and returns fragments. @@ -491,6 +505,7 @@ pub async fn do_write_fragments( params.external_blob_mode, source_store_registry, source_store_params, + params.blob_pack_file_size_threshold, ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; @@ -999,6 +1014,7 @@ struct WriterOptions { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + blob_pack_file_size_threshold: Option, } async fn open_writer_with_options( @@ -1016,6 +1032,7 @@ async fn open_writer_with_options( external_blob_mode, source_store_registry, source_store_params, + blob_pack_file_size_threshold, } = options; let data_file_key = generate_random_filename(); @@ -1063,6 +1080,7 @@ async fn open_writer_with_options( external_blob_mode, source_store_registry, source_store_params, + blob_pack_file_size_threshold, )) } else { None @@ -1105,6 +1123,7 @@ struct WriterGenerator { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + blob_pack_file_size_threshold: Option, /// Counter for round-robin selection next_base_index: AtomicUsize, } @@ -1122,6 +1141,7 @@ impl WriterGenerator { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + blob_pack_file_size_threshold: Option, ) -> Self { Self { object_store, @@ -1134,6 +1154,7 @@ impl WriterGenerator { external_blob_mode, source_store_registry, source_store_params, + blob_pack_file_size_threshold, next_base_index: AtomicUsize::new(0), } } @@ -1167,6 +1188,7 @@ impl WriterGenerator { external_blob_mode: self.external_blob_mode, source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), + blob_pack_file_size_threshold: self.blob_pack_file_size_threshold, }, ) .await? @@ -1184,6 +1206,7 @@ impl WriterGenerator { external_blob_mode: self.external_blob_mode, source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), + blob_pack_file_size_threshold: self.blob_pack_file_size_threshold, }, ) .await? @@ -1820,6 +1843,7 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), + None, ); // Create a writer @@ -1937,6 +1961,7 @@ mod tests { ExternalBlobMode::Reference, Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), + None, ); // Create test batch