diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index e12daf5324..fa47076fef 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -64,6 +64,7 @@ impl BasicDeleteFileLoader { self.file_io.clone(), false, None, + None, ) .await? .build()? diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5b3a7bb862..c4c2fa0036 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -67,6 +67,7 @@ pub struct ArrowReaderBuilder { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + metadata_size_hint: Option, } impl ArrowReaderBuilder { @@ -80,6 +81,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + metadata_size_hint: None, } } @@ -108,6 +110,15 @@ impl ArrowReaderBuilder { self } + /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata + /// + /// This hint can help reduce the number of fetch requests. For more details see the + /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint). + pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { + self.metadata_size_hint = Some(metadata_size_hint); + self + } + /// Build the ArrowReader. pub fn build(self) -> ArrowReader { ArrowReader { @@ -120,6 +131,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + metadata_size_hint: self.metadata_size_hint, } } } @@ -136,6 +148,7 @@ pub struct ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, + metadata_size_hint: Option, } impl ArrowReader { @@ -147,6 +160,7 @@ impl ArrowReader { let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; + let metadata_size_hint = self.metadata_size_hint; // Fast-path for single concurrency to avoid overhead of try_flatten_unordered let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 { @@ -162,6 +176,7 @@ impl ArrowReader { self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, + metadata_size_hint, ) }) .map_err(|err| { @@ -183,6 +198,7 @@ impl ArrowReader { self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, + metadata_size_hint, ) }) .map_err(|err| { @@ -205,6 +221,7 @@ impl ArrowReader { delete_file_loader: CachingDeleteFileLoader, row_group_filtering_enabled: bool, row_selection_enabled: bool, + metadata_size_hint: Option, ) -> Result { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); @@ -219,6 +236,7 @@ impl ArrowReader { file_io.clone(), should_load_page_index, None, + metadata_size_hint, ) .await?; @@ -271,6 +289,7 @@ impl ArrowReader { file_io.clone(), should_load_page_index, Some(options), + metadata_size_hint, ) .await? } else { @@ -474,17 +493,22 @@ impl ArrowReader { file_io: FileIO, should_load_page_index: bool, arrow_reader_options: Option, + metadata_size_hint: Option, ) -> Result> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within let parquet_file = file_io.new_input(data_file_path)?; let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?; - let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader) + let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader) .with_preload_column_index(true) .with_preload_offset_index(true) .with_preload_page_index(should_load_page_index); + if let Some(hint) = metadata_size_hint { + parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint); + } + // Create the record batch stream builder, which wraps the parquet file reader let options = arrow_reader_options.unwrap_or_default(); let record_batch_stream_builder =