From c51e16e02774cf19465a5edca4457e1017a1425a Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sat, 2 Aug 2025 21:12:37 -0700
Subject: [PATCH 01/19] make CachedParquetMetaData pub

---
 datafusion/datasource-parquet/src/reader.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 6ad9428770e90..75c1dfa09e901 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -282,7 +282,7 @@ impl AsyncFileReader for CachedParquetFileReader {
 }
 
 /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
-struct CachedParquetMetaData(Arc<ParquetMetaData>);
+pub struct CachedParquetMetaData(Arc<ParquetMetaData>);
 
 impl FileMetadata for CachedParquetMetaData {
     fn as_any(&self) -> &dyn Any {
From 74359e0c22056de46810054aded90f46af9b9b00 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sat, 2 Aug 2025 21:18:13 -0700
Subject: [PATCH 02/19] make parquet reader pub

---
 datafusion/datasource-parquet/src/mod.rs    | 3 +--
 datafusion/datasource-parquet/src/reader.rs | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 0b4e862403837..ef7a3b1a9fe59 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -24,7 +24,7 @@ pub mod file_format;
 mod metrics;
 mod opener;
 mod page_filter;
-mod reader;
+pub mod reader;
 mod row_filter;
 mod row_group_filter;
 pub mod source;
@@ -34,7 +34,6 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use file_format::*;
 pub use metrics::ParquetFileMetrics;
 pub use page_filter::PagePruningAccessPlanFilter;
-pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
 pub use row_filter::build_row_filter;
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;

diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 75c1dfa09e901..219128736923d 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -91,7 +91,7 @@ impl DefaultParquetFileReaderFactory {
 /// This implementation does not coalesce I/O operations or cache bytes. Such
 /// optimizations can be done either at the object store level or by providing a
 /// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader {
+pub struct ParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     pub inner: ParquetObjectReader,
 }
@@ -212,7 +212,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
 /// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
 /// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
 /// updates the cache.
-pub(crate) struct CachedParquetFileReader {
+pub struct CachedParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     pub inner: ParquetObjectReader,
     file_meta: FileMeta,

From c67ba4a142dc7f63591fd65cbceaedef4bfcf705 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sat, 2 Aug 2025 21:21:21 -0700
Subject: [PATCH 03/19] undo

---
 datafusion/datasource-parquet/src/mod.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index ef7a3b1a9fe59..f04af0bcd6dd5 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -34,6 +34,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use file_format::*;
 pub use metrics::ParquetFileMetrics;
 pub use page_filter::PagePruningAccessPlanFilter;
+pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
 pub use row_filter::build_row_filter;
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;

From 84f91b07089863e01c4b54aff312c01e98d38278 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sat, 2 Aug 2025 21:44:01 -0700
Subject: [PATCH 04/19] add impl

---
 datafusion/datasource-parquet/src/reader.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 219128736923d..cc58da25ac7f9 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -253,10 +253,10 @@ impl AsyncFileReader for CachedParquetFileReader {
 
         // lookup if the metadata is already cached
         if let Some(metadata) = metadata_cache.get(object_meta) {
-            if let Some(parquet_metadata) =
+            if let Some(cached_parquet_metadata) =
                 metadata.as_any().downcast_ref::<CachedParquetMetaData>()
             {
-                return Ok(Arc::clone(&parquet_metadata.0));
+                return Ok(Arc::clone(cached_parquet_metadata.parquet_metadata()));
             }
         }
@@ -284,6 +284,12 @@ impl AsyncFileReader for CachedParquetFileReader {
 /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
 pub struct CachedParquetMetaData(Arc<ParquetMetaData>);
 
+impl CachedParquetMetaData {
+    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.0
+    }
+}
+
 impl FileMetadata for CachedParquetMetaData {
     fn as_any(&self) -> &dyn Any {
         self
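Note on patches 01-04: with `CachedParquetMetaData` public and the `parquet_metadata()` accessor in place, code outside the crate can recover the parsed footer from a cached `FileMetadata` entry. A minimal sketch of that use, assuming the module paths shown in the diffs above (they are adjusted again in PATCH 14, so treat the imports as assumptions):

```rust
// Hypothetical downstream helper, not part of this patch series.
use std::sync::Arc;

use datafusion_datasource_parquet::reader::CachedParquetMetaData;
use datafusion_execution::cache::cache_manager::FileMetadata;

// Given an entry from the runtime's file-metadata cache, try to view it as
// Parquet metadata and report how many row groups the file has.
fn row_group_count(entry: &Arc<dyn FileMetadata>) -> Option<usize> {
    entry
        .as_any()
        .downcast_ref::<CachedParquetMetaData>()
        .map(|cached| cached.parquet_metadata().num_row_groups())
}
```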
From e3efe06c2599db62445fa1b82681a62d9eadd1b3 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sun, 3 Aug 2025 04:15:18 -0700
Subject: [PATCH 05/19] cached meta in reader

---
 .../src/datasource/file_format/parquet.rs     | 90 +++++++++++++++----
 .../core/tests/parquet/custom_reader.rs       |  2 +
 .../datasource-parquet/src/file_format.rs     | 47 ++++++++--
 3 files changed, 117 insertions(+), 22 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 9b343923f0145..12146005c15cc 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -195,9 +195,16 @@ mod tests {
         let format = ParquetFormat::default().with_force_view_types(force_views);
         let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
 
-        let stats =
-            fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None, None)
-                .await?;
+        let stats = fetch_statistics(
+            store.as_ref(),
+            schema.clone(),
+            &meta[0],
+            None,
+            None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -205,8 +212,16 @@ mod tests {
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
 
-        let stats =
-            fetch_statistics(store.as_ref(), schema, &meta[1], None, None).await?;
+        let stats = fetch_statistics(
+            store.as_ref(),
+            schema,
+            &meta[1],
+            None,
+            None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
         let c2_stats = &stats.column_statistics[1];
@@ -383,6 +398,8 @@ mod tests {
             &meta[0],
             Some(9),
             None,
+            false,
+            None,
         )
         .await
         .expect("error reading metadata with hint");
@@ -409,6 +426,8 @@ mod tests {
             &meta[0],
             Some(9),
             None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
        )
        .await?;
@@ -425,9 +444,16 @@ mod tests {
         // Use the file size as the hint so we can get the full metadata from the first fetch
         let size_hint = meta[0].size as usize;
 
-        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None)
-            .await
-            .expect("error reading metadata with hint");
+        fetch_parquet_metadata(
+            store.upcast().as_ref(),
+            &meta[0],
+            Some(size_hint),
+            None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
 
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
@@ -445,6 +471,8 @@ mod tests {
             &meta[0],
             Some(size_hint),
             None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
@@ -461,9 +489,16 @@ mod tests {
         // Use the a size hint larger than the file size to make sure we don't panic
         let size_hint = (meta[0].size + 100) as usize;
 
-        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None)
-            .await
-            .expect("error reading metadata with hint");
+        fetch_parquet_metadata(
+            store.upcast().as_ref(),
+            &meta[0],
+            Some(size_hint),
+            None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
 
         assert_eq!(store.request_count(), 1);
@@ -500,8 +535,15 @@ mod tests {
         let schema = format.infer_schema(&state, &store, &files).await.unwrap();
 
         // Fetch statistics for first file
-        let pq_meta =
-            fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?;
+        let pq_meta = fetch_parquet_metadata(
+            store.as_ref(),
+            &files[0],
+            None,
+            None,
+            format.options().global.cache_metadata,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
@@ -559,8 +601,15 @@ mod tests {
         };
 
         // Fetch statistics for first file
-        let pq_meta =
-            fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?;
+        let pq_meta = fetch_parquet_metadata(
+            store.as_ref(),
+            &files[0],
+            None,
+            None,
+            format.options().global.cache_metadata,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
@@ -586,8 +635,15 @@ mod tests {
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
         // Fetch statistics for second file
-        let pq_meta =
-            fetch_parquet_metadata(store.as_ref(), &files[1], None, None).await?;
+        let pq_meta = fetch_parquet_metadata(
+            store.as_ref(),
+            &files[1],
+            None,
+            None,
+            format.options().global.cache_metadata,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 5fc3513ff745b..e12c1dc4b2fb5 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -242,6 +242,8 @@ impl AsyncFileReader for ParquetFileReader {
                 &self.meta,
                 self.metadata_size_hint,
                 None,
+                false,
+                None,
             )
             .await
             .map_err(|e| {
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 7210cc09a0b39..0805a783f4cf5 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -63,7 +63,7 @@ use datafusion_physical_plan::Accumulator;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
-use crate::reader::CachedParquetFileReaderFactory;
+use crate::reader::{CachedParquetFileReaderFactory, CachedParquetMetaData};
 use crate::source::{parse_coerce_int96_string, ParquetSource};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -83,6 +83,7 @@ use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
 use parquet::basic::Type;
 
+use datafusion_execution::cache::cache_manager::FileMetadataCache;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
@@ -310,6 +311,8 @@ async fn fetch_schema_with_location(
     metadata_size_hint: Option<usize>,
     file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
+    cache_metadata: bool,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<(Path, Schema)> {
     let loc_path = file.location.clone();
     let schema = fetch_schema(
@@ -318,6 +321,8 @@ async fn fetch_schema_with_location(
         metadata_size_hint,
         file_decryption_properties,
         coerce_int96,
+        cache_metadata,
+        file_metadata_cache,
     )
     .await?;
     Ok((loc_path, schema))
@@ -371,6 +376,8 @@ impl FileFormat for ParquetFormat {
                     self.metadata_size_hint(),
                     file_decryption_properties.as_ref(),
                     coerce_int96,
+                    self.options.global.cache_metadata,
+                    state.runtime_env().cache_manager.get_file_metadata_cache(),
                 )
             })
             .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
@@ -414,7 +421,7 @@ impl FileFormat for ParquetFormat {
 
     async fn infer_stats(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
@@ -429,6 +436,8 @@ impl FileFormat for ParquetFormat {
             object,
             self.metadata_size_hint(),
             file_decryption_properties.as_ref(),
+            self.options.global.cache_metadata,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
         Ok(stats)
@@ -974,7 +983,23 @@ pub async fn fetch_parquet_metadata(
     meta: &ObjectMeta,
     size_hint: Option<usize>,
     #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
+    cache_metadata: bool,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<ParquetMetaData> {
+    // Check cache first if caching is enabled
+    if cache_metadata {
+        if let Some(cache) = &file_metadata_cache {
+            if let Some(cached_metadata) = cache.get(meta) {
+                if let Some(parquet_metadata) = cached_metadata
+                    .as_any()
+                    .downcast_ref::<CachedParquetMetaData>()
+                {
+                    return Ok(parquet_metadata.parquet_metadata().as_ref().clone());
+                }
+            }
+        }
+    }
+
     let file_size = meta.size;
     let fetch = ObjectStoreFetch::new(store, meta);
@@ -996,12 +1021,16 @@ async fn fetch_schema(
     metadata_size_hint: Option<usize>,
     file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
+    cache_metadata: bool,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Schema> {
     let metadata = fetch_parquet_metadata(
         store,
         file,
         metadata_size_hint,
         file_decryption_properties,
+        cache_metadata,
+        file_metadata_cache,
     )
     .await?;
     let file_metadata = metadata.file_metadata();
@@ -1026,10 +1055,18 @@ pub async fn fetch_statistics(
     file: &ObjectMeta,
     metadata_size_hint: Option<usize>,
     decryption_properties: Option<&FileDecryptionProperties>,
+    cache_metadata: bool,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Statistics> {
-    let metadata =
-        fetch_parquet_metadata(store, file, metadata_size_hint, decryption_properties)
-            .await?;
+    let metadata = fetch_parquet_metadata(
+        store,
+        file,
+        metadata_size_hint,
+        decryption_properties,
+        cache_metadata,
+        file_metadata_cache,
+    )
+    .await?;
     statistics_from_parquet_meta_calc(&metadata, table_schema)
 }

From f95ea46df740e0d74c08a2630fe328d5466ae6b0 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sun, 3 Aug 2025 04:36:17 -0700
Subject: [PATCH 06/19] update function docs

---
 docs/source/user-guide/sql/window_functions.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/source/user-guide/sql/window_functions.md b/docs/source/user-guide/sql/window_functions.md
index 73e9731cdbc03..dc06f3d051bb5 100644
--- a/docs/source/user-guide/sql/window_functions.md
+++ b/docs/source/user-guide/sql/window_functions.md
@@ -331,6 +331,8 @@ FROM employees;
 +-------------+--------+---------+
 ```
 
+#
+
 ## Analytical Functions
 
 - [first_value](#first_value)
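For orientation, this is the call shape PATCH 05 gives `fetch_parquet_metadata` (it is reworked again in PATCH 12 and PATCH 15): the caller decides whether the footer goes through the runtime's `FileMetadataCache`. A hedged sketch, assuming `store: Arc<dyn ObjectStore>`, `meta: ObjectMeta`, and a session `state` are in scope:

```rust
// Mid-series API sketch (PATCH 05): the last two arguments control caching.
let metadata = fetch_parquet_metadata(
    store.as_ref(),  // object store holding the file
    &meta,           // ObjectMeta describing the Parquet file
    Some(64 * 1024), // footer prefetch hint, in bytes (illustrative value)
    None,            // no decryption properties
    true,            // cache_metadata: consult and populate the cache
    state.runtime_env().cache_manager.get_file_metadata_cache(),
)
.await?;
```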
From 0d6b5ffc873069ce9202960b60672fdede16c4f9 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sun, 3 Aug 2025 04:48:41 -0700
Subject: [PATCH 07/19] cache after fetch

---
 datafusion/datasource-parquet/src/file_format.rs | 14 ++++++++++++--
 datafusion/datasource-parquet/src/reader.rs      |  4 ++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 0805a783f4cf5..5cc7c50f973ec 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1008,10 +1008,20 @@ pub async fn fetch_parquet_metadata(
     #[cfg(feature = "parquet_encryption")]
     let reader = reader.with_decryption_properties(decryption_properties);
 
-    reader
+    let metadata = reader
         .load_and_finish(fetch, file_size)
         .await
-        .map_err(DataFusionError::from)
+        .map_err(DataFusionError::from)?;
+
+    if cache_metadata {
+        if let Some(cache) = file_metadata_cache {
+            let cached_metadata =
+                Arc::new(CachedParquetMetaData::new(Arc::new(metadata.clone())));
+            cache.put(meta, cached_metadata);
+        }
+    }
+
+    Ok(metadata)
 }
 
 /// Read and parse the schema of the Parquet file at location `path`
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index cc58da25ac7f9..56f303c4159df 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -285,6 +285,10 @@
 pub struct CachedParquetMetaData(Arc<ParquetMetaData>);
 
 impl CachedParquetMetaData {
+    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
+        Self(metadata)
+    }
+
     pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
         &self.0
     }

From c12bc46844391a5d3723848dd3827fb10d58e5dc Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sun, 3 Aug 2025 04:55:35 -0700
Subject: [PATCH 08/19] cache meta

---
 datafusion/datasource-parquet/src/file_format.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 5cc7c50f973ec..72bf7e3a6a084 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1004,7 +1004,6 @@ pub async fn fetch_parquet_metadata(
     let fetch = ObjectStoreFetch::new(store, meta);
 
     let reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint);
-
     #[cfg(feature = "parquet_encryption")]
     let reader = reader.with_decryption_properties(decryption_properties);
 
@@ -1015,9 +1014,10 @@ pub async fn fetch_parquet_metadata(
     if cache_metadata {
         if let Some(cache) = file_metadata_cache {
-            let cached_metadata =
-                Arc::new(CachedParquetMetaData::new(Arc::new(metadata.clone())));
-            cache.put(meta, cached_metadata);
+            cache.put(
+                meta,
+                Arc::new(CachedParquetMetaData::new(Arc::new(metadata.clone()))),
+            );
         }
     }

From c20b142b0672192f7490376d731661e7ffd09d82 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Sun, 3 Aug 2025 06:50:04 -0700
Subject: [PATCH 09/19] return arc

---
 datafusion/core/tests/parquet/custom_reader.rs   |  2 +-
 datafusion/datasource-parquet/src/file_format.rs | 16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index e12c1dc4b2fb5..e7beb363393ac 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -251,7 +251,7 @@ impl AsyncFileReader for ParquetFileReader {
                     "AsyncChunkReader::get_metadata error: {e}"
                 ))
             })?;
-            Ok(Arc::new(metadata))
+            Ok(metadata)
         })
     }
 }
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 72bf7e3a6a084..2716cf4a3fa00 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -985,7 +985,7 @@ pub async fn fetch_parquet_metadata(
     #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
     cache_metadata: bool,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
-) -> Result<ParquetMetaData> {
+) -> Result<Arc<ParquetMetaData>> {
     // Check cache first if caching is enabled
     if cache_metadata {
         if let Some(cache) = &file_metadata_cache {
@@ -994,7 +994,7 @@ pub async fn fetch_parquet_metadata(
                     .as_any()
                     .downcast_ref::<CachedParquetMetaData>()
                 {
-                    return Ok(parquet_metadata.parquet_metadata().as_ref().clone());
+                    return Ok(Arc::clone(parquet_metadata.parquet_metadata()));
                 }
             }
         }
@@ -1007,16 +1007,18 @@ pub async fn fetch_parquet_metadata(
     #[cfg(feature = "parquet_encryption")]
     let reader = reader.with_decryption_properties(decryption_properties);
 
-    let metadata = reader
-        .load_and_finish(fetch, file_size)
-        .await
-        .map_err(DataFusionError::from)?;
+    let metadata = Arc::new(
+        reader
+            .load_and_finish(fetch, file_size)
+            .await
+            .map_err(DataFusionError::from)?,
+    );
 
     if cache_metadata {
         if let Some(cache) = file_metadata_cache {
             cache.put(
                 meta,
-                Arc::new(CachedParquetMetaData::new(Arc::new(metadata.clone()))),
+                Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
             );
         }
     }
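The switch to `Arc<ParquetMetaData>` above means a cache hit hands back a shared pointer to the already-parsed footer instead of deep-copying it. A self-contained toy illustration of that sharing (standard library only; the `Vec<u8>` is just a stand-in for `ParquetMetaData`):

```rust
use std::sync::Arc;

fn main() {
    // Stand-in for a parsed Parquet footer.
    let footer = Arc::new(vec![1u8, 2, 3]);

    // What the cache keeps, and what fetch_parquet_metadata now returns:
    // both are O(1) pointer clones of the same allocation.
    let cached = Arc::clone(&footer);
    let caller = Arc::clone(&footer);

    assert_eq!(Arc::strong_count(&footer), 3);
    assert_eq!(*cached, *caller);
}
```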
From 230256ce6411e68e05a44594f0c9cefe254e2343 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Tue, 5 Aug 2025 22:12:55 -0700
Subject: [PATCH 10/19] add cache metadata tests

---
 .../src/datasource/file_format/parquet.rs     | 212 ++++++++++++++++--
 .../datasource-parquet/src/file_format.rs     |   5 +
 2 files changed, 202 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 12146005c15cc..82a079248a985 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -391,6 +391,9 @@ mod tests {
         )));
         let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
+        let session = SessionContext::new();
+        let ctx = session.state();
+
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
         fetch_parquet_metadata(
@@ -403,11 +406,47 @@ mod tests {
         )
         .await
         .expect("error reading metadata with hint");
-
         assert_eq!(store.request_count(), 2);
 
-        let session = SessionContext::new();
-        let ctx = session.state();
+        // Increases by 2 because cache has no entries yet
+        fetch_parquet_metadata(
+            store.as_ref() as &dyn ObjectStore,
+            &meta[0],
+            Some(9),
+            None,
+            true,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 4);
+
+        // No increase because cache has an entry
+        fetch_parquet_metadata(
+            store.as_ref() as &dyn ObjectStore,
+            &meta[0],
+            Some(9),
+            None,
+            true,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 4);
+
+        // Increase by 2 because `cache_metadata` is false
+        fetch_parquet_metadata(
+            store.as_ref() as &dyn ObjectStore,
+            &meta[0],
+            Some(9),
+            None,
+            false,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 6);
 
         let force_views = match force_views {
             ForceViews::Yes => true,
             ForceViews::No => false,
@@ -415,10 +454,33 @@ mod tests {
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(9))
             .with_force_view_types(force_views);
+        let _schema = format
+            .infer_schema(&ctx, &store.upcast(), &meta)
+            .await
+            .unwrap();
+        // increase by 4, no cache being used.
+        assert_eq!(store.request_count(), 10);
+
+        let format = format.with_cache_metadata(true);
+        let _schema = format
+            .infer_schema(&ctx, &store.upcast(), &meta)
+            .await
+            .unwrap();
+        // increase by 2, partial cache being used.
+        assert_eq!(store.request_count(), 12);
+        let _schema = format
+            .infer_schema(&ctx, &store.upcast(), &meta)
+            .await
+            .unwrap();
+        // no increase, full cache being used.
+        assert_eq!(store.request_count(), 12);
+        let format = format.with_cache_metadata(false);
         let schema = format
             .infer_schema(&ctx, &store.upcast(), &meta)
             .await
             .unwrap();
+        // Increase by 4, no cache being used.
+        assert_eq!(store.request_count(), 16);
 
         let stats = fetch_statistics(
             store.upcast().as_ref(),
@@ -449,15 +511,55 @@ mod tests {
             &meta[0],
             Some(size_hint),
             None,
-            format.options().global.cache_metadata,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            false,
+            None,
         )
         .await
         .expect("error reading metadata with hint");
-        // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
+        let session = SessionContext::new();
+        let ctx = session.state();
+        // Increases by 1 because cache has no entries yet and new session context
+        fetch_parquet_metadata(
+            store.upcast().as_ref(),
+            &meta[0],
+            Some(size_hint),
+            None,
+            true,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 2);
+
+        // No increase because cache has an entry
+        fetch_parquet_metadata(
+            store.upcast().as_ref(),
+            &meta[0],
+            Some(size_hint),
+            None,
+            true,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 2);
+
+        // Increase by 1 because `cache_metadata` is false
+        fetch_parquet_metadata(
+            store.upcast().as_ref(),
+            &meta[0],
+            Some(size_hint),
+            None,
+            false,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 3);
+
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(size_hint))
             .with_force_view_types(force_views);
@@ -588,18 +690,29 @@ mod tests {
         // Use the a size hint larger than the file size to make sure we don't panic
         let size_hint = (meta[0].size + 100) as usize;
 
+        fetch_parquet_metadata(
+            store.upcast().as_ref(),
+            &meta[0],
+            Some(size_hint),
+            None,
+            false,
+            None,
+        )
+        .await
+        .expect("error reading metadata with hint");
+        assert_eq!(store.request_count(), 1);
+
+        // No increase because cache has an entry
         fetch_parquet_metadata(
             store.upcast().as_ref(),
             &meta[0],
             Some(size_hint),
             None,
-            format.options().global.cache_metadata,
+            true,
             ctx.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await
         .expect("error reading metadata with hint");
-
         assert_eq!(store.request_count(), 1);
 
         Ok(())
@@ -640,23 +753,62 @@ mod tests {
         // Use store_parquet to write each batch to its own file
         // . batch1 written into first file and includes:
         //   - column c_dic that has 4 rows with no null. Stats min and max of dictionary column is available.
-        let store = Arc::new(LocalFileSystem::new()) as _;
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
         let (files, _file_names) = store_parquet(vec![batch1], false).await?;
 
         let state = SessionContext::new().state();
         let format = ParquetFormat::default();
-        let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+        let _schema = format
+            .infer_schema(&state, &store.upcast(), &files)
+            .await
+            .unwrap();
+        assert_eq!(store.request_count(), 2);
+
+        let format = format.with_cache_metadata(true);
+        let _schema = format
+            .infer_schema(&state, &store.upcast(), &files)
+            .await
+            .unwrap();
+        // Increase by 2, no cache entries yet.
+        assert_eq!(store.request_count(), 4);
+        let _schema = format
+            .infer_schema(&state, &store.upcast(), &files)
+            .await
+            .unwrap();
+        // No increase, cache being used.
+        assert_eq!(store.request_count(), 4);
+        let format = format.with_cache_metadata(false);
+        let schema = format
+            .infer_schema(&state, &store.upcast(), &files)
+            .await
+            .unwrap();
+        // Increase by 2, no cache being used.
+        assert_eq!(store.request_count(), 6);
 
         // Fetch statistics for first file
+        let _pq_meta = fetch_parquet_metadata(
+            store.as_ref(),
+            &files[0],
+            None,
+            None,
+            true,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
+        assert_eq!(store.request_count(), 6);
+        // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
             store.as_ref(),
             &files[0],
             None,
             None,
-            format.options().global.cache_metadata,
+            true,
             state.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
+        assert_eq!(store.request_count(), 6);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
@@ -732,7 +884,9 @@ mod tests {
         //   - column c1 that has 3 rows with one null. Stats min and max of string column is missing for this test even the column has values
         // . batch2 written into second file and includes:
         //   - column c2 that has 3 rows with one null. Stats min and max of int are available and 1 and 2 respectively
-        let store = Arc::new(LocalFileSystem::new()) as _;
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
         let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
 
         let force_views = match force_views {
@@ -745,7 +899,11 @@ mod tests {
         let mut state = SessionContext::new().state();
         state = set_view_state(state, force_views);
         let format = ParquetFormat::default().with_force_view_types(force_views);
-        let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+        let schema = format
+            .infer_schema(&state, &store.upcast(), &files)
+            .await
+            .unwrap();
+        assert_eq!(store.request_count(), 4);
 
         let null_i64 = ScalarValue::Int64(None);
         let null_utf8 = if force_views {
@@ -759,15 +917,27 @@ mod tests {
         };
 
         // Fetch statistics for first file
+        let _pq_meta = fetch_parquet_metadata(
+            store.as_ref(),
+            &files[0],
+            None,
+            None,
+            true,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
+        assert_eq!(store.request_count(), 6);
+        // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
             store.as_ref(),
             &files[0],
             None,
             None,
-            format.options().global.cache_metadata,
+            true,
             state.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
+        assert_eq!(store.request_count(), 6);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
@@ -805,15 +975,27 @@ mod tests {
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
         // Fetch statistics for second file
+        let _pq_meta = fetch_parquet_metadata(
+            store.as_ref(),
+            &files[1],
+            None,
+            None,
+            true,
+            state.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
+        assert_eq!(store.request_count(), 8);
+        // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
             store.as_ref(),
             &files[1],
             None,
             None,
-            format.options().global.cache_metadata,
+            true,
             state.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
+        assert_eq!(store.request_count(), 8);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 2716cf4a3fa00..fe68436376e70 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -286,6 +286,11 @@ impl ParquetFormat {
         self.options.global.coerce_int96 = time_unit;
         self
     }
+
+    pub fn with_cache_metadata(mut self, cache_metadata: bool) -> Self {
+        self.options.global.cache_metadata = cache_metadata;
+        self
+    }
 }
 
 /// Clears all metadata (Schema level and field level) on an iterator

From 17366f528021e785baacbf7469942eafb66eb922 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Tue, 5 Aug 2025 22:24:18 -0700
Subject: [PATCH 11/19] add parquet cache tests

---
 .../src/datasource/file_format/parquet.rs | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 82a079248a985..9b54414042394 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -482,6 +482,21 @@ mod tests {
         // Increase by 4, no cache being used.
         assert_eq!(store.request_count(), 16);
 
+        // increase by 2, no cache being used
+        let _stats = fetch_statistics(
+            store.upcast().as_ref(),
+            schema.clone(),
+            &meta[0],
+            Some(9),
+            None,
+            format.options().global.cache_metadata,
+            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+        )
+        .await?;
+        assert_eq!(store.request_count(), 18);
+
+        // No increase, cache being used
+        let format = format.with_cache_metadata(true);
         let stats = fetch_statistics(
             store.upcast().as_ref(),
             schema.clone(),
             &meta[0],
             Some(9),
             None,
@@ -492,6 +507,7 @@ mod tests {
             ctx.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
+        assert_eq!(store.request_count(), 18);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -563,10 +579,25 @@ mod tests {
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(size_hint))
             .with_force_view_types(force_views);
+        let _schema = format
+            .infer_schema(&ctx, &store.upcast(), &meta)
+            .await
+            .unwrap();
+        // Increase by 2, no cache being used.
+        assert_eq!(store.request_count(), 5);
+        let format = format.with_cache_metadata(true);
+        let _schema = format
+            .infer_schema(&ctx, &store.upcast(), &meta)
+            .await
+            .unwrap();
+        // increase by 1, partial cache being used.
+        assert_eq!(store.request_count(), 6);
         let schema = format
             .infer_schema(&ctx, &store.upcast(), &meta)
             .await
             .unwrap();
+        // no increase, full cache being used.
+        assert_eq!(store.request_count(), 6);
         let stats = fetch_statistics(
             store.upcast().as_ref(),
             schema.clone(),
             &meta[0],
             Some(size_hint),
             None,
@@ -577,6 +608,8 @@ mod tests {
             ctx.runtime_env().cache_manager.get_file_metadata_cache(),
         )
         .await?;
+        // No increase, cache being used
+        assert_eq!(store.request_count(), 6);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];

From 15914ebc077b84080d88450aedb47648974531d2 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Fri, 8 Aug 2025 01:21:29 -0700
Subject: [PATCH 12/19] remove cache_metadata option

---
 .../src/datasource/file_format/parquet.rs     | 246 +++++-------------
 .../core/tests/parquet/custom_reader.rs       |   1 -
 .../datasource-parquet/src/file_format.rs     |  47 +---
 3 files changed, 72 insertions(+), 222 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 9b54414042394..444ed5698e7dd 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -180,8 +180,8 @@ mod tests {
         let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
 
-        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
-        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;
 
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
@@ -193,7 +193,7 @@ mod tests {
             ForceViews::No => false,
         };
         let format = ParquetFormat::default().with_force_view_types(force_views);
-        let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
+        let schema = format.infer_schema(&ctx, &store, &meta).await?;
 
         let stats = fetch_statistics(
             store.as_ref(),
@@ -201,8 +201,7 @@ mod tests {
             &meta[0],
             None,
             None,
-            format.options().global.cache_metadata,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
 
@@ -218,8 +217,7 @@ mod tests {
             &meta[1],
             None,
             None,
-            format.options().global.cache_metadata,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
        )
        .await?;
        assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -253,11 +253,9 @@ mod tests {
         let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
 
         let batch1 =
-            RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())])
-                .unwrap();
+            RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())])?;
         let batch2 =
-            RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())])
-                .unwrap();
+            RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())])?;
 
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
@@ -267,7 +263,7 @@ mod tests {
         let session = SessionContext::new();
         let ctx = session.state();
         let format = ParquetFormat::default();
-        let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
+        let schema = format.infer_schema(&ctx, &store, &meta).await?;
 
         let order: Vec<_> = ["a", "b", "c", "d"]
             .into_iter()
@@ -383,8 +379,8 @@ mod tests {
         let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
 
-        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
-        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;
 
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
             LocalFileSystem::new(),
         )));
@@ -397,7 +393,6 @@ mod tests {
             &meta[0],
             Some(9),
             None,
-            false,
             None,
         )
         .await
@@ -409,8 +404,7 @@ mod tests {
             &meta[0],
             Some(9),
             None,
-            true,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 4);
 
         // No increase because cache has an entry
         fetch_parquet_metadata(
             store.as_ref() as &dyn ObjectStore,
             &meta[0],
             Some(9),
             None,
-            true,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 4);
 
-        // Increase by 2 because `cache_metadata` is false
+        // Increase by 2 because `get_file_metadata_cache()` is None
         fetch_parquet_metadata(
             store.as_ref() as &dyn ObjectStore,
             &meta[0],
             Some(9),
             None,
-            false,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            None,
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 6);
 
@@ -446,60 +436,24 @@ mod tests {
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(9))
             .with_force_view_types(force_views);
-        let _schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
-        // increase by 4, no cache being used.
-        assert_eq!(store.request_count(), 10);
-
-        let format = format.with_cache_metadata(true);
-        let _schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
         // increase by 2, partial cache being used.
-        assert_eq!(store.request_count(), 12);
-        let _schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
+        let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
+        assert_eq!(store.request_count(), 8);
         // no increase, full cache being used.
-        assert_eq!(store.request_count(), 12);
-        let format = format.with_cache_metadata(false);
-        let schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
-        // Increase by 4, no cache being used.
-        assert_eq!(store.request_count(), 16);
-
-        // increase by 2, no cache being used
-        let _stats = fetch_statistics(
-            store.upcast().as_ref(),
-            schema.clone(),
-            &meta[0],
-            Some(9),
-            None,
-            format.options().global.cache_metadata,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
-        )
-        .await?;
-        assert_eq!(store.request_count(), 18);
+        let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
+        assert_eq!(store.request_count(), 8);
 
         // No increase, cache being used
-        let format = format.with_cache_metadata(true);
         let stats = fetch_statistics(
             store.upcast().as_ref(),
             schema.clone(),
             &meta[0],
             Some(9),
             None,
-            format.options().global.cache_metadata,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        assert_eq!(store.request_count(), 18);
+        assert_eq!(store.request_count(), 8);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -479,7 +483,6 @@ mod tests {
             &meta[0],
             Some(size_hint),
             None,
-            false,
             None,
         )
         .await
         .expect("error reading metadata with hint");
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
         let session = SessionContext::new();
         let ctx = session.state();
         // Increases by 1 because cache has no entries yet and new session context
         fetch_parquet_metadata(
             store.upcast().as_ref(),
             &meta[0],
             Some(size_hint),
             None,
-            true,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 2);
 
         // No increase because cache has an entry
         fetch_parquet_metadata(
             store.upcast().as_ref(),
             &meta[0],
             Some(size_hint),
             None,
-            true,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 2);
 
-        // Increase by 1 because `cache_metadata` is false
+        // Increase by 1 because `get_file_metadata_cache` is None
         fetch_parquet_metadata(
             store.upcast().as_ref(),
             &meta[0],
             Some(size_hint),
             None,
-            false,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            None,
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 3);
 
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(size_hint))
             .with_force_view_types(force_views);
-        let _schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
-        // Increase by 2, no cache being used.
-        assert_eq!(store.request_count(), 5);
-        let format = format.with_cache_metadata(true);
-        let _schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
         // increase by 1, partial cache being used.
-        assert_eq!(store.request_count(), 6);
-        let schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
-            .await
-            .unwrap();
+        let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
+        assert_eq!(store.request_count(), 4);
         // no increase, full cache being used.
-        assert_eq!(store.request_count(), 6);
+        let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
+        assert_eq!(store.request_count(), 4);
 
+        // No increase, cache being used
         let stats = fetch_statistics(
             store.upcast().as_ref(),
             schema.clone(),
             &meta[0],
             Some(size_hint),
             None,
-            format.options().global.cache_metadata,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        // No increase, cache being used
-        assert_eq!(store.request_count(), 6);
+        assert_eq!(store.request_count(), 4);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -562,7 +566,6 @@ mod tests {
             &meta[0],
             Some(size_hint),
             None,
-            false,
             None,
         )
         .await
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 1);
 
         // No increase because cache has an entry
         fetch_parquet_metadata(
             store.upcast().as_ref(),
             &meta[0],
             Some(size_hint),
             None,
-            true,
-            ctx.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await
         .expect("error reading metadata with hint");
@@ -600,11 +600,10 @@ mod tests {
         // Data for column c_dic: ["a", "b", "c", "d"]
         let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
         let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
-        let dic_array =
-            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
+        let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
         let c_dic: ArrayRef = Arc::new(dic_array);
 
-        let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)]).unwrap();
+        let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)])?;
 
         // Use store_parquet to write each batch to its own file
         // . batch1 written into first file and includes:
@@ -615,55 +614,22 @@ mod tests {
         let state = SessionContext::new().state();
 
         let format = ParquetFormat::default();
-        let _schema = format
-            .infer_schema(&state, &store.upcast(), &files)
-            .await
-            .unwrap();
+        let _schema = format.infer_schema(&state, &store.upcast(), &files).await?;
         assert_eq!(store.request_count(), 2);
-
-        let format = format.with_cache_metadata(true);
-        let _schema = format
-            .infer_schema(&state, &store.upcast(), &files)
-            .await
-            .unwrap();
-        // Increase by 2, no cache entries yet.
-        assert_eq!(store.request_count(), 4);
-        let _schema = format
-            .infer_schema(&state, &store.upcast(), &files)
-            .await
-            .unwrap();
         // No increase, cache being used.
-        assert_eq!(store.request_count(), 4);
-        let format = format.with_cache_metadata(false);
-        let schema = format
-            .infer_schema(&state, &store.upcast(), &files)
-            .await
-            .unwrap();
-        // Increase by 2, no cache being used.
-        assert_eq!(store.request_count(), 6);
+        let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
+        assert_eq!(store.request_count(), 2);
 
-        // Fetch statistics for first file
-        let _pq_meta = fetch_parquet_metadata(
-            store.as_ref(),
-            &files[0],
-            None,
-            None,
-            true,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
-        )
-        .await?;
-        assert_eq!(store.request_count(), 6);
         // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
             store.as_ref(),
             &files[0],
             None,
             None,
-            true,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        assert_eq!(store.request_count(), 6);
+        assert_eq!(store.request_count(), 2);
 
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
@@ -656,11 +656,11 @@ mod tests {
         // Data for column c1: ["Foo", null, "bar"]
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
 
         // Data for column c2: [1, 2, null]
         let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
-        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;
 
         // Use store_parquet to write each batch to its own file
         // . batch1 written into first file and includes:
@@ -680,10 +680,7 @@ mod tests {
         let mut state = SessionContext::new().state();
         state = set_view_state(state, force_views);
         let format = ParquetFormat::default().with_force_view_types(force_views);
-        let schema = format
-            .infer_schema(&state, &store.upcast(), &files)
-            .await
-            .unwrap();
+        let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
         assert_eq!(store.request_count(), 4);
 
         let null_i64 = ScalarValue::Int64(None);
@@ -690,28 +690,16 @@ mod tests {
             Utf8(None)
         };
 
-        // Fetch statistics for first file
-        let _pq_meta = fetch_parquet_metadata(
-            store.as_ref(),
-            &files[0],
-            None,
-            None,
-            true,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
-        )
-        .await?;
-        assert_eq!(store.request_count(), 6);
         // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
             store.as_ref(),
             &files[0],
             None,
             None,
-            true,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        assert_eq!(store.request_count(), 6);
+        assert_eq!(store.request_count(), 4);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
@@ -724,28 +712,16 @@ mod tests {
         assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
-        // Fetch statistics for second file
-        let _pq_meta = fetch_parquet_metadata(
-            store.as_ref(),
-            &files[1],
-            None,
-            None,
-            true,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
-        )
-        .await?;
-        assert_eq!(store.request_count(), 8);
         // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
             store.as_ref(),
             &files[1],
             None,
             None,
-            true,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
        )
        .await?;
-        assert_eq!(store.request_count(), 8);
+        assert_eq!(store.request_count(), 4);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
@@ -1164,22 +1164,20 @@ mod tests {
     async fn test_read_parquet_page_index() -> Result<()> {
         let testdata = datafusion_common::test_util::parquet_test_data();
         let path = format!("{testdata}/alltypes_tiny_pages.parquet");
-        let file = File::open(path).await.unwrap();
+        let file = File::open(path).await?;
         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder =
             ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
-                .await
-                .unwrap()
+                .await?
                 .metadata()
                 .clone();
         check_page_index_validation(builder.column_index(), builder.offset_index());
 
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
-        let file = File::open(path).await.unwrap();
+        let file = File::open(path).await?;
 
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
-            .await
-            .unwrap()
+            .await?
             .metadata()
             .clone();
         check_page_index_validation(builder.column_index(), builder.offset_index());
@@ -1259,7 +1259,7 @@ mod tests {
     /// Test that 0-byte files don't break while reading
     #[tokio::test]
     async fn test_read_empty_parquet() -> Result<()> {
-        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let tmp_dir = tempfile::TempDir::new()?;
         let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
         File::create(&path).await?;
@@ -1283,12 +1283,10 @@ mod tests {
     /// Test that 0-byte files don't break while reading
     #[tokio::test]
     async fn test_read_partitioned_empty_parquet() -> Result<()> {
-        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let tmp_dir = tempfile::TempDir::new()?;
         let partition_dir = tmp_dir.path().join("col1=a");
-        std::fs::create_dir(&partition_dir).unwrap();
-        File::create(partition_dir.join("empty.parquet"))
-            .await
-            .unwrap();
+        std::fs::create_dir(&partition_dir)?;
+        File::create(partition_dir.join("empty.parquet")).await?;
 
         let ctx = SessionContext::new();
@@ -1412,7 +1412,7 @@ mod tests {
         )
         .expect("Failed to create empty RecordBatch");
 
-        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let tmp_dir = tempfile::TempDir::new()?;
         let path = format!("{}/empty2.parquet", tmp_dir.path().to_string_lossy());
 
         let ctx = SessionContext::new();
@@ -1607,7 +1607,7 @@ mod tests {
         // create data
         let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
         let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
-        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;
 
         // write stream
         FileSink::write_all(
@@ -1686,7 +1686,7 @@ mod tests {
         // create data with 2 partitions
         let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
         let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
-        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;
 
         // write stream
         FileSink::write_all(
@@ -1779,8 +1779,7 @@ mod tests {
             // create data
             let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
             let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
-            let batch =
-                RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;
 
             // create task context
             let task_context = build_ctx(object_store_url.as_ref());
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index e7beb363393ac..42778d725fe8c 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -242,7 +242,6 @@ impl AsyncFileReader for ParquetFileReader {
                 &self.meta,
                 self.metadata_size_hint,
                 None,
-                false,
                 None,
             )
             .await
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index e17844fb271ee..1e6cab7945e43 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -286,11 +286,6 @@ impl ParquetFormat {
         self.options.global.coerce_int96 = time_unit;
         self
     }
-
-    pub fn with_cache_metadata(mut self, cache_metadata: bool) -> Self {
-        self.options.global.cache_metadata = cache_metadata;
-        self
-    }
 }
 
 /// Clears all metadata (Schema level and field level) on an iterator
@@ -316,7 +311,6 @@ async fn fetch_schema_with_location(
     metadata_size_hint: Option<usize>,
     file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
-    cache_metadata: bool,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<(Path, Schema)> {
     let loc_path = file.location.clone();
     let schema = fetch_schema(
@@ -326,7 +320,6 @@ async fn fetch_schema_with_location(
         metadata_size_hint,
         file_decryption_properties,
         coerce_int96,
-        cache_metadata,
         file_metadata_cache,
     )
     .await?;
@@ -381,8 +374,7 @@ impl FileFormat for ParquetFormat {
                     self.metadata_size_hint(),
                     file_decryption_properties.as_ref(),
                     coerce_int96,
-                    self.options.global.cache_metadata,
-                    state.runtime_env().cache_manager.get_file_metadata_cache(),
+                    Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
                 )
             })
             .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
@@ -441,8 +433,7 @@ impl FileFormat for ParquetFormat {
             object,
             self.metadata_size_hint(),
             file_decryption_properties.as_ref(),
-            self.options.global.cache_metadata,
-            state.runtime_env().cache_manager.get_file_metadata_cache(),
+            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
         Ok(stats)
@@ -982,19 +973,15 @@ pub async fn fetch_parquet_metadata(
     meta: &ObjectMeta,
     size_hint: Option<usize>,
     #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
-    cache_metadata: bool,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Arc<ParquetMetaData>> {
-    // Check cache first if caching is enabled
-    if cache_metadata {
-        if let Some(cache) = &file_metadata_cache {
-            if let Some(cached_metadata) = cache.get(meta) {
-                if let Some(parquet_metadata) = cached_metadata
-                    .as_any()
-                    .downcast_ref::<CachedParquetMetaData>()
-                {
-                    return Ok(Arc::clone(parquet_metadata.parquet_metadata()));
-                }
+    if let Some(cache) = &file_metadata_cache {
+        if let Some(cached_metadata) = cache.get(meta) {
+            if let Some(parquet_metadata) = cached_metadata
+                .as_any()
+                .downcast_ref::<CachedParquetMetaData>()
+            {
+                return Ok(Arc::clone(parquet_metadata.parquet_metadata()));
             }
         }
     }
@@ -1013,13 +1000,11 @@ pub async fn fetch_parquet_metadata(
-    if cache_metadata {
-        if let Some(cache) = file_metadata_cache {
-            cache.put(
-                meta,
-                Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
-            );
-        }
+    if let Some(cache) = file_metadata_cache {
+        cache.put(
+            meta,
+            Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
+        );
     }
 
     Ok(metadata)
@@ -1032,7 +1017,6 @@ async fn fetch_schema(
     metadata_size_hint: Option<usize>,
     file_decryption_properties: Option<&FileDecryptionProperties>,
     coerce_int96: Option<TimeUnit>,
-    cache_metadata: bool,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Schema> {
     let metadata = fetch_parquet_metadata(
@@ -1040,7 +1024,6 @@ async fn fetch_schema(
         store,
         file,
         metadata_size_hint,
         file_decryption_properties,
-        cache_metadata,
         file_metadata_cache,
     )
     .await?;
@@ -1066,7 +1049,6 @@ pub async fn fetch_statistics(
     file: &ObjectMeta,
     metadata_size_hint: Option<usize>,
     decryption_properties: Option<&FileDecryptionProperties>,
-    cache_metadata: bool,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Statistics> {
     let metadata = fetch_parquet_metadata(
@@ -1074,7 +1056,6 @@ pub async fn fetch_statistics(
         store,
         file,
         metadata_size_hint,
         decryption_properties,
-        cache_metadata,
         file_metadata_cache,
     )
     .await?;
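After PATCH 12 the `cache_metadata: bool` knob is gone: passing `Some(cache)` opts into caching and `None` opts out. A hedged sketch of the two call shapes, assuming `store`, `meta`, and a session `state` are in scope (the first argument changes once more to a `MetadataFetch` in PATCH 15):

```rust
// Opt in: the first call fills the FileMetadataCache, later calls hit it.
let cached = fetch_parquet_metadata(
    store.as_ref(),
    &meta,
    None, // no size hint
    None, // no decryption properties
    Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;

// Opt out: always re-reads the footer from object storage.
let uncached = fetch_parquet_metadata(store.as_ref(), &meta, None, None, None).await?;
```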
Option, - cache_metadata: bool, file_metadata_cache: Option>, ) -> Result { let metadata = fetch_parquet_metadata( @@ -1040,7 +1024,6 @@ async fn fetch_schema( file, metadata_size_hint, file_decryption_properties, - cache_metadata, file_metadata_cache, ) .await?; @@ -1066,7 +1049,6 @@ pub async fn fetch_statistics( file: &ObjectMeta, metadata_size_hint: Option, decryption_properties: Option<&FileDecryptionProperties>, - cache_metadata: bool, file_metadata_cache: Option>, ) -> Result { let metadata = fetch_parquet_metadata( @@ -1074,7 +1056,6 @@ pub async fn fetch_statistics( file, metadata_size_hint, decryption_properties, - cache_metadata, file_metadata_cache, ) .await?; From a6d0bc0864a1ae58abcb42b1c3c9d210ed175274 Mon Sep 17 00:00:00 2001 From: Shehab <11789402+shehabgamin@users.noreply.github.com> Date: Fri, 8 Aug 2025 02:10:22 -0700 Subject: [PATCH 13/19] parquet metadata cache impacts test --- datafusion/core/tests/parquet/page_pruning.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index f851c199eb4d6..5b37c55c09e41 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -903,8 +903,8 @@ async fn without_pushdown_filter() { ) .unwrap(); - // Same amount of bytes are scanned when defaulting to cache parquet metadata - assert!(bytes_scanned_with_filter == bytes_scanned_without_filter); + // Without filter will not read pageIndex. + assert!(bytes_scanned_with_filter > bytes_scanned_without_filter); } #[tokio::test] From f3ad6f17dd2d3aca45a2f0433e79e1e556c32893 Mon Sep 17 00:00:00 2001 From: Shehab <11789402+shehabgamin@users.noreply.github.com> Date: Fri, 8 Aug 2025 18:45:30 -0700 Subject: [PATCH 14/19] add comment for reader --- datafusion/datasource-parquet/src/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index f04af0bcd6dd5..ad59e7261cba3 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -24,7 +24,7 @@ pub mod file_format; mod metrics; mod opener; mod page_filter; -pub mod reader; +mod reader; mod row_filter; mod row_group_filter; pub mod source; @@ -34,7 +34,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use file_format::*; pub use metrics::ParquetFileMetrics; pub use page_filter::PagePruningAccessPlanFilter; -pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; +pub use reader::*; // Expose so downstream crates can use it pub use row_filter::build_row_filter; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; From 5b4129a8169c1bbf8892e9746cbffa8e0560be8f Mon Sep 17 00:00:00 2001 From: Shehab <11789402+shehabgamin@users.noreply.github.com> Date: Fri, 8 Aug 2025 19:41:55 -0700 Subject: [PATCH 15/19] refactor fetch parquet metadata --- .../src/datasource/file_format/parquet.rs | 28 ++++----- .../core/tests/parquet/custom_reader.rs | 4 +- .../datasource-parquet/src/file_format.rs | 57 ++++++++++++------- datafusion/datasource-parquet/src/reader.rs | 44 ++++++-------- 4 files changed, 70 insertions(+), 63 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 444ed5698e7dd..f4094a366fde2 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ 
b/datafusion/core/src/datasource/file_format/parquet.rs @@ -134,7 +134,7 @@ mod tests { use datafusion_datasource::{ListingTableUrl, PartitionedFile}; use datafusion_datasource_parquet::{ fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc, - ParquetFormat, ParquetFormatFactory, ParquetSink, + ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink, }; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; @@ -393,7 +393,7 @@ mod tests { // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch // for the remaining metadata fetch_parquet_metadata( - store.as_ref() as &dyn ObjectStore, + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), &meta[0], Some(9), None, @@ -405,7 +405,7 @@ mod tests { // Increases by 2 because cache has no entries yet fetch_parquet_metadata( - store.as_ref() as &dyn ObjectStore, + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), &meta[0], Some(9), None, @@ -417,7 +417,7 @@ mod tests { // No increase because cache has an entry fetch_parquet_metadata( - store.as_ref() as &dyn ObjectStore, + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), &meta[0], Some(9), None, @@ -429,7 +429,7 @@ mod tests { // Increase by 2 because `get_file_metadata_cache()` is None fetch_parquet_metadata( - store.as_ref() as &dyn ObjectStore, + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), &meta[0], Some(9), None, @@ -479,7 +479,7 @@ mod tests { let size_hint = meta[0].size as usize; fetch_parquet_metadata( - store.upcast().as_ref(), + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), &meta[0], Some(size_hint), None, @@ -494,7 +494,7 @@ mod tests { let ctx = session.state(); // Increases by 1 because cache has no entries yet and new session context fetch_parquet_metadata( - store.upcast().as_ref(), + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), &meta[0], Some(size_hint), None, @@ -506,7 +506,7 @@ mod tests { // No increase because cache has an entry fetch_parquet_metadata( - store.upcast().as_ref(), + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), &meta[0], Some(size_hint), None, @@ -518,7 +518,7 @@ mod tests { // Increase by 1 because `get_file_metadata_cache` is None fetch_parquet_metadata( - store.upcast().as_ref(), + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), &meta[0], Some(size_hint), None, @@ -562,7 +562,7 @@ mod tests { // Use the a size hint larger than the file size to make sure we don't panic let size_hint = (meta[0].size + 100) as usize; fetch_parquet_metadata( - store.upcast().as_ref(), + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), &meta[0], Some(size_hint), None, @@ -574,7 +574,7 @@ mod tests { // No increase because cache has an entry fetch_parquet_metadata( - store.upcast().as_ref(), + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), &meta[0], Some(size_hint), None, @@ -623,7 +623,7 @@ mod tests { // No increase in request count because cache is not empty let pq_meta = fetch_parquet_metadata( - store.as_ref(), + ObjectStoreFetch::new(store.as_ref(), &files[0]), &files[0], None, None, @@ -692,7 +692,7 @@ mod tests { // No increase in request count because cache is not empty let pq_meta = fetch_parquet_metadata( - store.as_ref(), + ObjectStoreFetch::new(store.as_ref(), &files[0]), &files[0], None, None, @@ -726,7 +726,7 @@ mod tests { // No increase in request count because cache is not empty 
         let pq_meta = fetch_parquet_metadata(
-            store.as_ref(),
+            ObjectStoreFetch::new(store.as_ref(), &files[1]),
             &files[1],
             None,
             None,
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 42778d725fe8c..f7e48fa9cb910 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -38,6 +38,7 @@ use datafusion_common::Result;
 use bytes::Bytes;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource_parquet::ObjectStoreFetch;
 use futures::future::BoxFuture;
 use futures::{FutureExt, TryFutureExt};
 use insta::assert_snapshot;
@@ -237,8 +238,9 @@ impl AsyncFileReader for ParquetFileReader {
         _options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
+            let fetch = ObjectStoreFetch::new(self.store.as_ref(), &self.meta);
             let metadata = fetch_parquet_metadata(
-                self.store.as_ref(),
+                fetch,
                 &self.meta,
                 self.metadata_size_hint,
                 None,
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 4fb25a040eebe..7dcaa2c3c93a1 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1009,13 +1009,13 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema {
 }
 
 /// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
-struct ObjectStoreFetch<'a> {
+pub struct ObjectStoreFetch<'a> {
     store: &'a dyn ObjectStore,
     meta: &'a ObjectMeta,
 }
 
 impl<'a> ObjectStoreFetch<'a> {
-    fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
+    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
         Self { store, meta }
     }
 }
@@ -1038,30 +1038,45 @@ impl MetadataFetch for ObjectStoreFetch<'_> {
 /// through [`ParquetFileReaderFactory`].
 ///
 /// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
-pub async fn fetch_parquet_metadata(
-    store: &dyn ObjectStore,
-    meta: &ObjectMeta,
+pub async fn fetch_parquet_metadata<F: MetadataFetch>(
+    fetch: F,
+    object_meta: &ObjectMeta,
     size_hint: Option<usize>,
     #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Arc<ParquetMetaData>> {
-    if let Some(cache) = &file_metadata_cache {
-        if let Some(cached_metadata) = cache.get(meta) {
-            if let Some(parquet_metadata) = cached_metadata
+    if let Some(file_metadata_cache) = &file_metadata_cache {
+        if let Some(file_metadata) = file_metadata_cache.get(object_meta) {
+            if let Some(cached_parquet_metadata) = file_metadata
                 .as_any()
-                .downcast_ref::<CachedParquetMetaData>()
-            {
-                return Ok(Arc::clone(parquet_metadata.parquet_metadata()));
+                .downcast_ref::<CachedParquetMetaData>(
+            ) {
+                return Ok(Arc::clone(cached_parquet_metadata.parquet_metadata()));
             }
         }
     }
 
-    let file_size = meta.size;
-    let fetch = ObjectStoreFetch::new(store, meta);
+    let file_size = object_meta.size;
+    let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint);
 
-    let reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint);
     #[cfg(feature = "parquet_encryption")]
-    let reader = reader.with_decryption_properties(decryption_properties);
+    {
+        if let Some(props) = decryption_properties {
+            reader = reader.with_decryption_properties(Some(props));
+        } else if file_metadata_cache.is_some() {
+            // Need to retrieve the entire metadata for the caching to be effective.
+            reader = reader.with_page_indexes(true);
+        }
+    }
+
+    #[cfg(not(feature = "parquet_encryption"))]
+    {
+        // Need to retrieve the entire metadata for the caching to be effective.
+        if file_metadata_cache.is_some() {
+            // Need to retrieve the entire metadata for the caching to be effective.
+            reader = reader.with_page_indexes(true);
+        }
+    }
 
     let metadata = Arc::new(
         reader
@@ -1070,9 +1085,9 @@ pub async fn fetch_parquet_metadata(
         .map_err(DataFusionError::from)?,
     );
 
-    if let Some(cache) = file_metadata_cache {
-        cache.put(
-            meta,
+    if let Some(file_metadata_cache) = file_metadata_cache {
+        file_metadata_cache.put(
+            object_meta,
             Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
         );
     }
@@ -1089,8 +1104,9 @@ async fn fetch_schema(
     coerce_int96: Option<TimeUnit>,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Schema> {
+    let fetch = ObjectStoreFetch::new(store, file);
     let metadata = fetch_parquet_metadata(
-        store,
+        fetch,
         file,
         metadata_size_hint,
         file_decryption_properties,
@@ -1121,8 +1137,9 @@ pub async fn fetch_statistics(
     decryption_properties: Option<&FileDecryptionProperties>,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Statistics> {
+    let fetch = ObjectStoreFetch::new(store, file);
     let metadata = fetch_parquet_metadata(
-        store,
+        fetch,
         file,
         metadata_size_hint,
         decryption_properties,
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 8584a0215e065..e61586019d68a 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -18,7 +18,7 @@
 //! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
 //! low level control of parquet file readers
 
-use crate::ParquetFileMetrics;
+use crate::{fetch_parquet_metadata, ParquetFileMetrics};
 use bytes::Bytes;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
@@ -28,7 +28,7 @@ use futures::FutureExt;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use parquet::file::metadata::ParquetMetaData;
 use std::any::Any;
 use std::fmt::Debug;
 use std::ops::Range;
@@ -250,32 +250,20 @@ impl AsyncFileReader for CachedParquetFileReader {
 
         async move {
             let object_meta = &file_meta.object_meta;
-
-            // lookup if the metadata is already cached
-            if let Some(metadata) = metadata_cache.get(object_meta) {
-                if let Some(cached_parquet_metadata) =
-                    metadata.as_any().downcast_ref::<CachedParquetMetaData>()
-                {
-                    return Ok(Arc::clone(cached_parquet_metadata.parquet_metadata()));
-                }
-            }
-
-            let mut reader = ParquetMetaDataReader::new();
-            // the page index can only be loaded with unencrypted files
-            if let Some(file_decryption_properties) =
-                options.and_then(|o| o.file_decryption_properties())
-            {
-                reader =
-                    reader.with_decryption_properties(Some(file_decryption_properties));
-            } else {
-                reader = reader.with_page_indexes(true);
-            }
-            reader.try_load(&mut self.inner, object_meta.size).await?;
-            let metadata = Arc::new(reader.finish()?);
-            let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata)));
-
-            metadata_cache.put(object_meta, cached_metadata);
-            Ok(metadata)
+            fetch_parquet_metadata(
+                &mut self.inner,
+                object_meta,
+                None,
+                options.and_then(|o| o.file_decryption_properties()),
+                Some(metadata_cache),
+            )
+            .await
+            .map_err(|e| {
+                parquet::errors::ParquetError::General(format!(
+                    "Failed to fetch metadata for file {}: {e}",
+                    object_meta.location,
+                ))
+            })
         }
         .boxed()
     }

From c39b29f95951eee8d1651e49795681b286e5e2f5 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Fri, 8 Aug 2025 20:20:14 -0700
Subject: [PATCH 16/19] parquet metadata cleanup

---
 datafusion/core/tests/parquet/page_pruning.rs    | 4 ++--
 datafusion/datasource-parquet/src/file_format.rs | 8 +++-----
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 5b37c55c09e41..27bee10234b57 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -903,8 +903,8 @@ async fn without_pushdown_filter() {
     )
     .unwrap();
 
-    // Without a filter, the page index is not read.
-    assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
+    // Same amount of bytes are scanned when defaulting to cache parquet metadata
+    assert_eq!(bytes_scanned_with_filter, bytes_scanned_without_filter);
 }
 
 #[tokio::test]
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 7dcaa2c3c93a1..f67fde35cbc84 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -80,7 +80,7 @@ use parquet::arrow::arrow_writer::{
     compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
     ArrowLeafColumn, ArrowWriterOptions,
 };
-use parquet::arrow::async_reader::MetadataFetch;
+use parquet::arrow::async_reader::{MetadataFetch, ParquetObjectReader};
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
 use parquet::basic::Type;
 
@@ -1061,9 +1061,8 @@ pub async fn fetch_parquet_metadata<F: MetadataFetch>(
 
     #[cfg(feature = "parquet_encryption")]
     {
-        if let Some(props) = decryption_properties {
-            reader = reader.with_decryption_properties(Some(props));
-        } else if file_metadata_cache.is_some() {
+        reader = reader.with_decryption_properties(decryption_properties);
+        if file_metadata_cache.is_some() {
             // Need to retrieve the entire metadata for the caching to be effective.
             reader = reader.with_page_indexes(true);
         }
@@ -1071,7 +1070,6 @@ pub async fn fetch_parquet_metadata<F: MetadataFetch>(
 
     #[cfg(not(feature = "parquet_encryption"))]
     {
-        // Need to retrieve the entire metadata for the caching to be effective.
         if file_metadata_cache.is_some() {
             // Need to retrieve the entire metadata for the caching to be effective.
             reader = reader.with_page_indexes(true);

From 2313aed8137a35faf62cb30ad3d993f84a5260f7 Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Fri, 8 Aug 2025 20:26:18 -0700
Subject: [PATCH 17/19] parquet metadata cleanup

---
 datafusion/datasource-parquet/src/file_format.rs | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index f67fde35cbc84..dc4d1b4315920 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -80,7 +80,7 @@ use parquet::arrow::arrow_writer::{
     compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
     ArrowLeafColumn, ArrowWriterOptions,
 };
-use parquet::arrow::async_reader::{MetadataFetch, ParquetObjectReader};
+use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
 use parquet::basic::Type;
 
@@ -1062,18 +1062,11 @@ pub async fn fetch_parquet_metadata<F: MetadataFetch>(
     #[cfg(feature = "parquet_encryption")]
     {
         reader = reader.with_decryption_properties(decryption_properties);
-        if file_metadata_cache.is_some() {
-            // Need to retrieve the entire metadata for the caching to be effective.
-            reader = reader.with_page_indexes(true);
-        }
     }
 
-    #[cfg(not(feature = "parquet_encryption"))]
-    {
-        if file_metadata_cache.is_some() {
-            // Need to retrieve the entire metadata for the caching to be effective.
-            reader = reader.with_page_indexes(true);
-        }
+    if file_metadata_cache.is_some() {
+        // Need to retrieve the entire metadata for the caching to be effective.
+        reader = reader.with_page_indexes(true);
     }
 
     let metadata = Arc::new(

From 3bb321b33c3d6a67f795bdb386c2e32923dad29e Mon Sep 17 00:00:00 2001
From: Shehab <11789402+shehabgamin@users.noreply.github.com>
Date: Fri, 8 Aug 2025 21:24:26 -0700
Subject: [PATCH 18/19] refactor parquet metadata cache

---
 .../src/datasource/file_format/parquet.rs     | 34 +++++++-------
 .../datasource-parquet/src/file_format.rs     | 46 +++++++++++--------
 2 files changed, 45 insertions(+), 35 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index f4094a366fde2..b7d66e4f27898 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -403,7 +403,7 @@ mod tests {
         .expect("error reading metadata with hint");
         assert_eq!(store.request_count(), 2);
 
-        // Increases by 2 because cache has no entries yet
+        // Increases by 3 because cache has no entries yet
         fetch_parquet_metadata(
             ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
             &meta[0],
             Some(9),
             None,
@@ -413,7 +413,7 @@ mod tests {
         )
         .await
         .expect("error reading metadata with hint");
-        assert_eq!(store.request_count(), 4);
+        assert_eq!(store.request_count(), 5);
 
         // No increase because cache has an entry
         fetch_parquet_metadata(
             ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
             &meta[0],
             Some(9),
             None,
@@ -425,7 +425,7 @@ mod tests {
         )
         .await
         .expect("error reading metadata with hint");
-        assert_eq!(store.request_count(), 4);
+        assert_eq!(store.request_count(), 5);
 
         // Increase by 2 because `get_file_metadata_cache()` is None
         fetch_parquet_metadata(
             ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
             &meta[0],
             Some(9),
             None,
@@ -437,7 +437,7 @@ mod tests {
         )
         .await
         .expect("error reading metadata with hint");
-        assert_eq!(store.request_count(), 6);
+        assert_eq!(store.request_count(), 7);
 
         let force_views = match force_views {
             ForceViews::Yes => true,
             ForceViews::No => false,
         };
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(9))
             .with_force_view_types(force_views);
-        // increase by 2, partial cache being used.
+        // Increase by 3, partial cache being used.
         let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
-        assert_eq!(store.request_count(), 8);
+        assert_eq!(store.request_count(), 10);
-        // no increase, full cache being used.
+        // No increase, full cache being used.
         let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
-        assert_eq!(store.request_count(), 8);
+        assert_eq!(store.request_count(), 10);
 
         // No increase, cache being used
         let stats = fetch_statistics(
@@ -463,7 +463,7 @@ mod tests {
             Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        assert_eq!(store.request_count(), 8);
+        assert_eq!(store.request_count(), 10);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
@@ -531,10 +531,10 @@ mod tests {
         let format = ParquetFormat::default()
             .with_metadata_size_hint(Some(size_hint))
             .with_force_view_types(force_views);
-        // increase by 1, partial cache being used.
+        // Increase by 1, partial cache being used.
         let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
         assert_eq!(store.request_count(), 4);
-        // no increase, full cache being used.
+        // No increase, full cache being used.
         let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
         assert_eq!(store.request_count(), 4);
 
         // No increase, cache being used
@@ -616,10 +616,10 @@ mod tests {
         let state = SessionContext::new().state();
         let format = ParquetFormat::default();
         let _schema = format.infer_schema(&state, &store.upcast(), &files).await?;
-        assert_eq!(store.request_count(), 2);
+        assert_eq!(store.request_count(), 3);
 
         // No increase, cache being used.
         let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
-        assert_eq!(store.request_count(), 2);
+        assert_eq!(store.request_count(), 3);
 
         // No increase in request count because cache is not empty
         let pq_meta = fetch_parquet_metadata(
@@ -630,7 +630,7 @@ mod tests {
             Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        assert_eq!(store.request_count(), 2);
+        assert_eq!(store.request_count(), 3);
 
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
@@ -681,7 +681,7 @@ mod tests {
         state = set_view_state(state, force_views);
         let format = ParquetFormat::default().with_force_view_types(force_views);
         let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
-        assert_eq!(store.request_count(), 4);
+        assert_eq!(store.request_count(), 6);
 
         let null_i64 = ScalarValue::Int64(None);
         let null_utf8 = if force_views {
@@ -699,7 +699,7 @@ mod tests {
             Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
         )
        .await?;
-        assert_eq!(store.request_count(), 4);
+        assert_eq!(store.request_count(), 6);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
@@ -733,7 +733,7 @@ mod tests {
             Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
         )
         .await?;
-        assert_eq!(store.request_count(), 4);
+        assert_eq!(store.request_count(), 6);
         let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index dc4d1b4315920..ab4d84ee368e4 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1045,42 +1045,52 @@ pub async fn fetch_parquet_metadata<F: MetadataFetch>(
     #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
 ) -> Result<Arc<ParquetMetaData>> {
-    if let Some(file_metadata_cache) = &file_metadata_cache {
-        if let Some(file_metadata) = file_metadata_cache.get(object_meta) {
-            if let Some(cached_parquet_metadata) = file_metadata
-                .as_any()
-                .downcast_ref::<CachedParquetMetaData>(
-            ) {
-                return Ok(Arc::clone(cached_parquet_metadata.parquet_metadata()));
-            }
+    let cache_metadata =
+        !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
+
+    if cache_metadata {
+        if let Some(parquet_metadata) = file_metadata_cache
+            .as_ref()
+            .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
+            .and_then(|file_metadata| {
+                file_metadata
+                    .as_any()
+                    .downcast_ref::<CachedParquetMetaData>()
+                    .map(|cached_parquet_metadata| {
+                        Arc::clone(cached_parquet_metadata.parquet_metadata())
+                    })
+            })
+        {
+            return Ok(parquet_metadata);
         }
     }
 
-    let file_size = object_meta.size;
     let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint);
 
     #[cfg(feature = "parquet_encryption")]
-    {
-        reader = reader.with_decryption_properties(decryption_properties);
+    if let Some(decryption_properties) = decryption_properties {
+        reader = reader.with_decryption_properties(Some(decryption_properties));
     }
 
-    if file_metadata_cache.is_some() {
+    if cache_metadata && file_metadata_cache.is_some() {
         // Need to retrieve the entire metadata for the caching to be effective.
reader = reader.with_page_indexes(true); } let metadata = Arc::new( reader - .load_and_finish(fetch, file_size) + .load_and_finish(fetch, object_meta.size) .await .map_err(DataFusionError::from)?, ); - if let Some(file_metadata_cache) = file_metadata_cache { - file_metadata_cache.put( - object_meta, - Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), - ); + if cache_metadata { + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put( + object_meta, + Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), + ); + } } Ok(metadata) From 7d9128b884a4d7324c7dd4e2d869cd130199f7c8 Mon Sep 17 00:00:00 2001 From: Shehab <11789402+shehabgamin@users.noreply.github.com> Date: Fri, 8 Aug 2025 21:48:15 -0700 Subject: [PATCH 19/19] feature flag --- datafusion/datasource-parquet/src/reader.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index e61586019d68a..df375818689ca 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -249,19 +249,25 @@ impl AsyncFileReader for CachedParquetFileReader { let metadata_cache = Arc::clone(&self.metadata_cache); async move { - let object_meta = &file_meta.object_meta; + #[cfg(feature = "parquet_encryption")] + let file_decryption_properties = + options.and_then(|o| o.file_decryption_properties()); + + #[cfg(not(feature = "parquet_encryption"))] + let file_decryption_properties = None; + fetch_parquet_metadata( &mut self.inner, - object_meta, + &file_meta.object_meta, None, - options.and_then(|o| o.file_decryption_properties()), + file_decryption_properties, Some(metadata_cache), ) .await .map_err(|e| { parquet::errors::ParquetError::General(format!( "Failed to fetch metadata for file {}: {e}", - object_meta.location, + file_meta.object_meta.location, )) }) }
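
Note on the final design: metadata is only cached when it is complete and safely reusable. `cache_metadata` evaluates to false whenever decryption properties are supplied, so footers of encrypted files never land in the shared cache, and when caching is in effect the reader eagerly loads the page indexes via `with_page_indexes(true)`, because a cached ParquetMetaData without page indexes would defeat later page-level pruning. On a cold cache the page-index load typically costs an extra range request per file, which lines up with the request-count bumps in the PATCH 18/19 test updates above.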
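
For downstream crates, a minimal sketch of how the now-public pieces fit together after this series. This is a sketch under assumptions, not documented API: the import paths mirror the test imports above, and `read_cached_metadata` is a hypothetical helper.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_datasource_parquet::{fetch_parquet_metadata, ObjectStoreFetch};
use object_store::{path::Path, ObjectStore};
use parquet::file::metadata::ParquetMetaData;

/// Hypothetical helper: fetch Parquet metadata for `location`, going through
/// the session's FileMetadataCache so repeated calls for the same file are
/// served without touching the object store again.
async fn read_cached_metadata(
    ctx: &SessionContext,
    store: &dyn ObjectStore,
    location: &str,
) -> Result<Arc<ParquetMetaData>> {
    // Resolve the file's ObjectMeta (size, e-tag, last-modified).
    let object_meta = store.head(&Path::from(location)).await?;

    fetch_parquet_metadata(
        // MetadataFetch adapter that issues range reads against the store
        ObjectStoreFetch::new(store, &object_meta),
        &object_meta,
        None, // metadata size hint
        None, // decryption properties; Some(..) would bypass the cache
        Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
    )
    .await
}

The first call reads the footer plus page indexes and populates the cache; subsequent calls return the cached Arc<ParquetMetaData> directly, which is exactly what the request-count assertions in the tests above exercise.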