Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl BasicDeleteFileLoader {
self.file_io.clone(),
false,
None,
None,
)
.await?
.build()?
Expand Down
26 changes: 25 additions & 1 deletion crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct ArrowReaderBuilder {
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_size_hint: Option<usize>,
}

impl ArrowReaderBuilder {
Expand All @@ -80,6 +81,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
metadata_size_hint: None,
}
}

Expand Down Expand Up @@ -108,6 +110,15 @@ impl ArrowReaderBuilder {
self
}

/// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
///
/// This hint can help reduce the number of fetch requests. For more details see the
/// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
self.metadata_size_hint = Some(metadata_size_hint);
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
Expand All @@ -120,6 +131,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
metadata_size_hint: self.metadata_size_hint,
}
}
}
Expand All @@ -136,6 +148,7 @@ pub struct ArrowReader {

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_size_hint: Option<usize>,
}

impl ArrowReader {
Expand All @@ -147,6 +160,7 @@ impl ArrowReader {
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;
let metadata_size_hint = self.metadata_size_hint;

// Fast-path for single concurrency to avoid overhead of try_flatten_unordered
let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
Expand All @@ -162,6 +176,7 @@ impl ArrowReader {
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
metadata_size_hint,
)
})
.map_err(|err| {
Expand All @@ -183,6 +198,7 @@ impl ArrowReader {
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
metadata_size_hint,
)
})
.map_err(|err| {
Expand All @@ -205,6 +221,7 @@ impl ArrowReader {
delete_file_loader: CachingDeleteFileLoader,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_size_hint: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let should_load_page_index =
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
Expand All @@ -219,6 +236,7 @@ impl ArrowReader {
file_io.clone(),
should_load_page_index,
None,
metadata_size_hint,
)
.await?;

Expand Down Expand Up @@ -271,6 +289,7 @@ impl ArrowReader {
file_io.clone(),
should_load_page_index,
Some(options),
metadata_size_hint,
)
.await?
} else {
Expand Down Expand Up @@ -474,17 +493,22 @@ impl ArrowReader {
file_io: FileIO,
should_load_page_index: bool,
arrow_reader_options: Option<ArrowReaderOptions>,
metadata_size_hint: Option<usize>,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(data_file_path)?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
let mut parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
.with_preload_column_index(true)
.with_preload_offset_index(true)
.with_preload_page_index(should_load_page_index);

if let Some(hint) = metadata_size_hint {
parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
}

// Create the record batch stream builder, which wraps the parquet file reader
let options = arrow_reader_options.unwrap_or_default();
let record_batch_stream_builder =
Expand Down
Loading