From a7d0de3bb896579aa14b96f6564f15159def70c5 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 2 Feb 2026 09:38:11 -0500
Subject: [PATCH 1/2] Add ParquetMetaData caching to ArrowReader.

---
 .../iceberg/src/arrow/delete_file_loader.rs |   8 +-
 crates/iceberg/src/arrow/delete_filter.rs   |   7 +-
 crates/iceberg/src/arrow/reader.rs          | 157 ++++++++++++++++--
 3 files changed, 155 insertions(+), 17 deletions(-)

diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index e12daf5324..1a09700e97 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use futures::{StreamExt, TryStreamExt};
 
 use crate::arrow::ArrowReader;
+use crate::arrow::reader::ParquetMetadataCache;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
@@ -43,12 +44,16 @@ pub trait DeleteFileLoader {
 #[derive(Clone, Debug)]
 pub(crate) struct BasicDeleteFileLoader {
     file_io: FileIO,
+    metadata_cache: ParquetMetadataCache,
 }
 
 #[allow(unused_variables)]
 impl BasicDeleteFileLoader {
     pub fn new(file_io: FileIO) -> Self {
-        BasicDeleteFileLoader { file_io }
+        BasicDeleteFileLoader {
+            file_io,
+            metadata_cache: ParquetMetadataCache::default(),
+        }
     }
     /// Loads a RecordBatchStream for a given datafile.
     pub(crate) async fn parquet_to_batch_stream(
@@ -64,6 +69,7 @@ impl BasicDeleteFileLoader {
             self.file_io.clone(),
             false,
             None,
+            &self.metadata_cache,
         )
         .await?
         .build()?
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 4af9f6b6ff..6e7ebb6ee6 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -100,9 +100,10 @@ impl DeleteFilter {
 
         // Mark as loading to prevent duplicate work
         let notifier = Arc::new(Notify::new());
-        state
-            .equality_deletes
-            .insert(file_path.to_string(), EqDelState::Loading(notifier.clone()));
+        state.equality_deletes.insert(
+            file_path.to_string(),
+            EqDelState::Loading(Arc::clone(&notifier)),
+        );
 
         Some(notifier)
     }
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index aa45a12973..4010c94694 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -20,7 +20,7 @@
 use std::collections::{HashMap, HashSet};
 use std::ops::Range;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 
 use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
 use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
@@ -35,7 +35,7 @@ use fnv::FnvHashSet;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
 use parquet::arrow::arrow_reader::{
-    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
+    ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -43,6 +43,7 @@ use parquet::file::metadata::{
     PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
 };
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
+use tokio::sync::Notify;
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
@@ -60,6 +61,80 @@ use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
+/// State for cached Parquet metadata
+/// Modeled after PosDelState in delete_filter.rs
+enum MetadataCacheState {
+    /// Metadata is currently being loaded by another task
+    Loading(Arc<Notify>),
+    /// Metadata has been loaded
+    Loaded(Arc<ParquetMetaData>),
+}
+
+/// Cache for Parquet file metadata, shared across concurrent FileScanTasks
+/// Prevents redundant metadata loading when multiple tasks reference the same file
+#[derive(Clone, Default)]
+pub(crate) struct ParquetMetadataCache {
+    state: Arc<RwLock<HashMap<String, MetadataCacheState>>>,
+}
+
+impl std::fmt::Debug for ParquetMetadataCache {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetMetadataCache")
+            .field("cached_files", &self.state.read().unwrap().len())
+            .finish()
+    }
+}
+
+/// Action to take when trying to get or load metadata
+/// Modeled after PosDelLoadAction in delete_filter.rs
+pub(crate) enum MetadataLoadAction {
+    /// Metadata is already loaded
+    Loaded(Arc<ParquetMetaData>),
+    /// Metadata is being loaded by another task - wait for notification
+    WaitFor(Arc<Notify>),
+    /// Caller should load the metadata
+    Load,
+}
+
+impl ParquetMetadataCache {
+    /// Try to get cached metadata or determine if we should load it
+    pub(crate) fn try_get_or_start_loading(&self, file_path: &str) -> MetadataLoadAction {
+        let mut state = self.state.write().unwrap();
+
+        if let Some(cached) = state.get(file_path) {
+            return match cached {
+                MetadataCacheState::Loaded(metadata) => {
+                    MetadataLoadAction::Loaded(Arc::clone(metadata))
+                }
+                MetadataCacheState::Loading(notify) => {
+                    MetadataLoadAction::WaitFor(Arc::clone(notify))
+                }
+            };
+        }
+
+        // Mark as loading to prevent duplicate work
+        let notify = Arc::new(Notify::new());
+        state.insert(
+            file_path.to_string(),
+            MetadataCacheState::Loading(Arc::clone(&notify)),
+        );
+
+        MetadataLoadAction::Load
+    }
+
+    /// Store loaded metadata and notify waiters
+    pub(crate) fn finish_loading(&self, file_path: &str, metadata: Arc<ParquetMetaData>) {
+        let mut state = self.state.write().unwrap();
+
+        // Get the notify handle before replacing
+        if let Some(MetadataCacheState::Loading(notify)) = state.get(file_path) {
+            notify.notify_waiters();
+        }
+
+        state.insert(file_path.to_string(), MetadataCacheState::Loaded(metadata));
+    }
+}
+
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
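A note on the handshake above: tokio's Notify::notify_waiters wakes only tasks that are already parked in notified() and stores no permit for later arrivals, which is why callers of this cache re-check the map in a loop after waking rather than assume the entry has reached Loaded. A minimal, self-contained sketch of that behavior, assuming tokio with the rt, macros, and sync features enabled; everything below is illustrative and not part of the patch:

use std::sync::Arc;

use tokio::sync::Notify;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let notify = Arc::new(Notify::new());

    let waiter = {
        let notify = Arc::clone(&notify);
        tokio::spawn(async move {
            // Parks until notify_waiters() fires. No permit is stored, so a
            // notification sent before this task registers would be missed.
            notify.notified().await;
            println!("woken; re-check the cache map for a Loaded entry");
        })
    };

    // On a current-thread runtime this yield lets the waiter park first,
    // so the wakeup below cannot be missed in this sketch.
    tokio::task::yield_now().await;
    notify.notify_waiters();
    waiter.await.unwrap();
}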
@@ -120,6 +195,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            metadata_cache: ParquetMetadataCache::default(),
         }
     }
 }
@@ -136,6 +212,7 @@ pub struct ArrowReader {
 
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    metadata_cache: ParquetMetadataCache,
 }
 
 impl ArrowReader {
@@ -147,6 +224,7 @@ impl ArrowReader {
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
+        let metadata_cache = self.metadata_cache.clone();
 
         // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
         let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
@@ -154,6 +232,7 @@ impl ArrowReader {
             tasks
                 .and_then(move |task| {
                     let file_io = file_io.clone();
+                    let metadata_cache = metadata_cache.clone();
 
                     Self::process_file_scan_task(
                         task,
@@ -162,6 +241,7 @@ impl ArrowReader {
                         self.delete_file_loader.clone(),
                         row_group_filtering_enabled,
                         row_selection_enabled,
+                        metadata_cache,
                     )
                 })
                 .map_err(|err| {
@@ -175,6 +255,7 @@ impl ArrowReader {
             tasks
                 .map_ok(move |task| {
                     let file_io = file_io.clone();
+                    let metadata_cache = metadata_cache.clone();
 
                     Self::process_file_scan_task(
                         task,
@@ -183,6 +264,7 @@ impl ArrowReader {
                         self.delete_file_loader.clone(),
                         row_group_filtering_enabled,
                         row_selection_enabled,
+                        metadata_cache,
                     )
                 })
                 .map_err(|err| {
@@ -205,6 +287,7 @@ impl ArrowReader {
         delete_file_loader: CachingDeleteFileLoader,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
+        metadata_cache: ParquetMetadataCache,
     ) -> Result<ArrowRecordBatchStream> {
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
@@ -219,6 +302,7 @@ impl ArrowReader {
             file_io.clone(),
             should_load_page_index,
             None,
+            &metadata_cache,
         )
         .await?;
 
@@ -271,6 +355,7 @@ impl ArrowReader {
                 file_io.clone(),
                 should_load_page_index,
                 Some(options),
+                &metadata_cache,
             )
             .await?
         } else {
@@ -474,21 +559,67 @@ impl ArrowReader {
         file_io: FileIO,
         should_load_page_index: bool,
         arrow_reader_options: Option<ArrowReaderOptions>,
+        metadata_cache: &ParquetMetadataCache,
     ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
-        // Get the metadata for the Parquet file we need to read and build
-        // a reader for the data within
         let parquet_file = file_io.new_input(data_file_path)?;
-        let (parquet_metadata, parquet_reader) =
+
+        // Get or load ParquetMetaData from cache
+        let parquet_metadata: Arc<ParquetMetaData> = loop {
+            match metadata_cache.try_get_or_start_loading(data_file_path) {
+                MetadataLoadAction::Loaded(metadata) => break metadata,
+                MetadataLoadAction::WaitFor(notify) => {
+                    notify.notified().await;
+                    // Loop again to get the loaded result
+                }
+                MetadataLoadAction::Load => {
+                    // We're responsible for loading
+                    let (file_metadata, parquet_reader) =
+                        try_join!(parquet_file.metadata(), parquet_file.reader())?;
+                    let mut temp_reader = ArrowFileReader::new(file_metadata, parquet_reader)
+                        .with_preload_column_index(true)
+                        .with_preload_offset_index(true)
+                        .with_preload_page_index(should_load_page_index);
+
+                    // Load metadata via ArrowReaderMetadata
+                    let temp_arrow_metadata =
+                        ArrowReaderMetadata::load_async(&mut temp_reader, Default::default())
+                            .await
+                            .map_err(|e| {
+                                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata")
+                                    .with_source(e)
+                            })?;
+
+                    let metadata = Arc::clone(temp_arrow_metadata.metadata());
+                    metadata_cache.finish_loading(data_file_path, Arc::clone(&metadata));
+                    break metadata;
+                }
+            }
+        };
+
+        // Create ArrowReaderMetadata from cached ParquetMetaData with the requested options
+        let options = arrow_reader_options.unwrap_or_default();
+        let arrow_reader_metadata = ArrowReaderMetadata::try_new(parquet_metadata, options)
+            .map_err(|e| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to create ArrowReaderMetadata from ParquetMetaData",
+                )
+                .with_source(e)
+            })?;
+
+        // Create a fresh reader for data access (still need file handle)
+        let (file_metadata, parquet_reader) =
             try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
+        let parquet_file_reader = ArrowFileReader::new(file_metadata, parquet_reader)
             .with_preload_column_index(true)
             .with_preload_offset_index(true)
             .with_preload_page_index(should_load_page_index);
 
-        // Create the record batch stream builder, which wraps the parquet file reader
-        let options = arrow_reader_options.unwrap_or_default();
-        let record_batch_stream_builder =
-            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
+        // Use new_with_metadata to avoid reloading metadata
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            parquet_file_reader,
+            arrow_reader_metadata,
+        );
 
         Ok(record_batch_stream_builder)
     }
@@ -679,7 +810,7 @@ impl ArrowReader {
                     scale: requested_scale,
                 }),
             ) if requested_precision >= file_precision && file_scale == requested_scale => true,
-            // Uuid will be store as Fixed(16) in parquet file, so the read back type will be Fixed(16).
+            // Uuid will be stored as Fixed(16) in the Parquet file, so the read-back type will be Fixed(16).
            (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
             _ => false,
         }
@@ -965,7 +1096,7 @@ impl ArrowReader {
     }
 }
 
-/// Build the map of parquet field id to Parquet column index in the schema.
+/// Build the map of Parquet field id to Parquet column index in the schema.
 /// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
 fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
     let mut column_map = HashMap::new();
@@ -3954,7 +4085,7 @@ message schema {
         let table_location = tmp_dir.path().to_str().unwrap().to_string();
         let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
 
-        // Create 3 parquet files with different data
+        // Create 3 Parquet files with different data
         let props = WriterProperties::builder()
             .set_compression(Compression::SNAPPY)
             .build();
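Taken in isolation, the parquet-rs pattern patch 1 leans on is: parse the footer once with ArrowReaderMetadata::load_async, keep the Arc<ParquetMetaData>, then stamp out builders with ArrowReaderMetadata::try_new plus ParquetRecordBatchStreamBuilder::new_with_metadata, which skips re-reading and re-parsing the footer. A standalone sketch against a local file, assuming the parquet crate with its async feature plus tokio; the path is illustrative:

use std::sync::Arc;

use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::file::metadata::ParquetMetaData;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = "test.parquet"; // illustrative

    // Read and parse the footer exactly once.
    let mut file = tokio::fs::File::open(path).await?;
    let loaded = ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::new()).await?;
    let cached: Arc<ParquetMetaData> = Arc::clone(loaded.metadata());

    // Build any number of readers from the cached footer without re-reading it.
    for _ in 0..2 {
        let meta = ArrowReaderMetadata::try_new(Arc::clone(&cached), ArrowReaderOptions::new())?;
        let input = tokio::fs::File::open(path).await?;
        let _stream = ParquetRecordBatchStreamBuilder::new_with_metadata(input, meta).build()?;
    }
    Ok(())
}

Note that try_new only re-derives reader state from metadata already in hand; it cannot fetch anything that was never parsed, such as a page index, which is what patch 2's (file_path, should_load_page_index) cache key accounts for.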
From 84a5723ea4e42e0163c71a1e22c05b185f155a4b Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 2 Feb 2026 10:18:17 -0500
Subject: [PATCH 2/2] Add test.

---
 crates/iceberg/src/arrow/reader.rs | 121 +++++++++++++++++++++++++----
 1 file changed, 105 insertions(+), 16 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 4010c94694..104de492ae 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -71,16 +71,17 @@ enum MetadataCacheState {
 }
 
 /// Cache for Parquet file metadata, shared across concurrent FileScanTasks
-/// Prevents redundant metadata loading when multiple tasks reference the same file
+/// Prevents redundant metadata loading when multiple tasks reference the same file.
+/// Cache key is (file_path, should_load_page_index) since metadata content differs.
 #[derive(Clone, Default)]
 pub(crate) struct ParquetMetadataCache {
-    state: Arc<RwLock<HashMap<String, MetadataCacheState>>>,
+    state: Arc<RwLock<HashMap<(String, bool), MetadataCacheState>>>,
 }
 
 impl std::fmt::Debug for ParquetMetadataCache {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("ParquetMetadataCache")
-            .field("cached_files", &self.state.read().unwrap().len())
+            .field("cached_entries", &self.state.read().unwrap().len())
             .finish()
     }
 }
@@ -98,10 +99,15 @@
 impl ParquetMetadataCache {
     /// Try to get cached metadata or determine if we should load it
-    pub(crate) fn try_get_or_start_loading(&self, file_path: &str) -> MetadataLoadAction {
+    pub(crate) fn try_get_or_start_loading(
+        &self,
+        file_path: &str,
+        load_page_index: bool,
+    ) -> MetadataLoadAction {
         let mut state = self.state.write().unwrap();
+        let key = (file_path.to_string(), load_page_index);
 
-        if let Some(cached) = state.get(file_path) {
+        if let Some(cached) = state.get(&key) {
             return match cached {
                 MetadataCacheState::Loaded(metadata) => {
                     MetadataLoadAction::Loaded(Arc::clone(metadata))
                 }
@@ -114,24 +120,33 @@ impl ParquetMetadataCache {
 
         // Mark as loading to prevent duplicate work
         let notify = Arc::new(Notify::new());
-        state.insert(
-            file_path.to_string(),
-            MetadataCacheState::Loading(Arc::clone(&notify)),
-        );
+        state.insert(key, MetadataCacheState::Loading(Arc::clone(&notify)));
 
         MetadataLoadAction::Load
     }
 
     /// Store loaded metadata and notify waiters
-    pub(crate) fn finish_loading(&self, file_path: &str, metadata: Arc<ParquetMetaData>) {
+    pub(crate) fn finish_loading(
+        &self,
+        file_path: &str,
+        load_page_index: bool,
+        metadata: Arc<ParquetMetaData>,
+    ) {
         let mut state = self.state.write().unwrap();
+        let key = (file_path.to_string(), load_page_index);
 
         // Get the notify handle before replacing
-        if let Some(MetadataCacheState::Loading(notify)) = state.get(file_path) {
+        if let Some(MetadataCacheState::Loading(notify)) = state.get(&key) {
             notify.notify_waiters();
         }
 
-        state.insert(file_path.to_string(), MetadataCacheState::Loaded(metadata));
+        state.insert(key, MetadataCacheState::Loaded(metadata));
+    }
+
+    /// Returns the number of cached entries (for testing)
+    #[cfg(test)]
+    pub(crate) fn len(&self) -> usize {
+        self.state.read().unwrap().len()
     }
 }
@@ -565,7 +580,7 @@ impl ArrowReader {
 
         // Get or load ParquetMetaData from cache
         let parquet_metadata: Arc<ParquetMetaData> = loop {
-            match metadata_cache.try_get_or_start_loading(data_file_path) {
+            match metadata_cache.try_get_or_start_loading(data_file_path, should_load_page_index) {
                 MetadataLoadAction::Loaded(metadata) => break metadata,
                 MetadataLoadAction::WaitFor(notify) => {
                     notify.notified().await;
@@ -590,7 +605,11 @@ impl ArrowReader {
                             })?;
 
                     let metadata = Arc::clone(temp_arrow_metadata.metadata());
-                    metadata_cache.finish_loading(data_file_path, Arc::clone(&metadata));
+                    metadata_cache.finish_loading(
+                        data_file_path,
+                        should_load_page_index,
+                        Arc::clone(&metadata),
+                    );
                     break metadata;
                 }
             }
@@ -1913,7 +1932,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow_array::cast::AsArray;
-    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
+    use arrow_array::{ArrayRef, Int32Array, LargeStringArray, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
     use futures::TryStreamExt;
     use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
@@ -1927,7 +1946,9 @@ mod tests {
     use tempfile::TempDir;
 
     use crate::ErrorKind;
-    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
+    use crate::arrow::reader::{
+        CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY, ParquetMetadataCache,
+    };
     use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::delete_vector::DeleteVector;
     use crate::expr::visitors::bound_predicate_visitor::visit;
@@ -4370,4 +4391,72 @@ message schema {
         assert_eq!(name_col.value(2), "Charlie");
         assert_eq!(name_col.value(3), "Dave");
     }
+
+    #[tokio::test]
+    async fn test_parquet_metadata_cache_prevents_duplicate_loads() {
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        // Create a Parquet file
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let id_data = Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef;
+        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![id_data]).unwrap();
+
+        let file_path = format!("{table_location}/test.parquet");
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Create a shared cache
+        let cache = ParquetMetadataCache::default();
+        assert_eq!(cache.len(), 0);
+
+        // First call should load and cache metadata
+        let _builder1 = ArrowReader::create_parquet_record_batch_stream_builder(
+            &file_path,
+            file_io.clone(),
+            false, // should_load_page_index
+            None,
+            &cache,
+        )
+        .await
+        .unwrap();
+        assert_eq!(cache.len(), 1);
+
+        // Second call with same parameters should hit cache
+        let _builder2 = ArrowReader::create_parquet_record_batch_stream_builder(
+            &file_path,
+            file_io.clone(),
+            false,
+            None,
+            &cache,
+        )
+        .await
+        .unwrap();
+        assert_eq!(cache.len(), 1); // Still 1 - cache hit
+
+        // Third call with different should_load_page_index creates new cache entry
+        let _builder3 = ArrowReader::create_parquet_record_batch_stream_builder(
+            &file_path,
+            file_io.clone(),
+            true, // Different should_load_page_index
+            None,
+            &cache,
+        )
+        .await
+        .unwrap();
+        assert_eq!(cache.len(), 2); // Now 2 - different key
+    }
 }
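For comparison, the same at-most-once loading guarantee could be built from tokio::sync::OnceCell keyed the same way, trading the explicit Loading/Loaded state machine and re-check loop for get_or_init. A sketch with a String payload standing in for Arc<ParquetMetaData>; all names are illustrative:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use tokio::sync::OnceCell;

#[derive(Clone, Default)]
struct MetadataCells {
    cells: Arc<Mutex<HashMap<(String, bool), Arc<OnceCell<Arc<String>>>>>>,
}

impl MetadataCells {
    async fn get_or_load(&self, path: &str, page_index: bool) -> Arc<String> {
        // Clone the per-key cell out under the lock, then initialize outside it,
        // so the map lock is never held across an await point.
        let cell = {
            let mut map = self.cells.lock().unwrap();
            Arc::clone(map.entry((path.to_string(), page_index)).or_default())
        };
        cell.get_or_init(|| async {
            // The expensive footer parse would go here; it runs at most once per key.
            Arc::new(format!("metadata for {path} (page index: {page_index})"))
        })
        .await
        .clone()
    }
}

#[tokio::main]
async fn main() {
    let cache = MetadataCells::default();
    let a = cache.get_or_load("test.parquet", false).await;
    let b = cache.get_or_load("test.parquet", false).await;
    assert!(Arc::ptr_eq(&a, &b)); // the second call reused the first load
}

Either shape keeps locks out of await points; the hand-rolled version in the patch was presumably chosen to mirror the existing PosDelState pattern in delete_filter.rs.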