From 1452333cf0933d4d8da032af68bc5a3a05c62483 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Tue, 29 Jul 2025 19:03:35 +0100 Subject: [PATCH 1/4] feat: Cache Parquet metadata --- datafusion/common/src/config.rs | 6 + .../common/src/file_options/parquet_writer.rs | 3 + .../src/datasource/file_format/options.rs | 15 ++ .../datasource-parquet/src/file_format.rs | 18 ++- datafusion/datasource-parquet/src/reader.rs | 131 +++++++++++++++++- .../execution/src/cache/cache_manager.rs | 37 +++++ datafusion/execution/src/cache/cache_unit.rs | 121 +++++++++++++++- datafusion/execution/src/runtime_env.rs | 1 + .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 18 +++ .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/file_formats.rs | 2 + .../test_files/information_schema.slt | 2 + .../sqllogictest/test_files/parquet.slt | 119 ++++++++++++++++ docs/source/user-guide/configs.md | 1 + 18 files changed, 480 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 5796edc283e01..3bcb0839b5a2b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -549,6 +549,12 @@ config_namespace! { /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true + /// (reading) Whether or not to enable the caching of embedded metadata of Parquet files + /// (footer and page metadata). Enabling it can offer substantial performance improvements + /// for repeated queries over large files. By default, the cache is automatically + /// invalidated when the underlying file is modified. 
+ pub cache_metadata: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index cde0ea1299795..9ea2b6af82ec6 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -245,6 +245,7 @@ impl ParquetOptions { binary_as_string: _, // not used for writer props coerce_int96: _, // not used for writer props skip_arrow_metadata: _, + cache_metadata: _, } = self; let mut builder = WriterProperties::builder() @@ -522,6 +523,7 @@ mod tests { binary_as_string: defaults.binary_as_string, skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, + cache_metadata: defaults.cache_metadata, } } @@ -634,6 +636,7 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + cache_metadata: global_options_defaults.cache_metadata, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 02b792823a827..459e92a7a9768 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> { pub file_sort_order: Vec>, /// Properties for decryption of Parquet files that use modular encryption pub file_decryption_properties: Option, /// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and /// page metadata). Enabling it can offer substantial performance improvements for repeated /// queries over large files. By default, the cache is automatically invalidated when the /// underlying file is modified. + pub cache_metadata: Option, } impl Default for ParquetReadOptions<'_> { @@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> { schema: None, file_sort_order: vec![], file_decryption_properties: None, + cache_metadata: None, } } } @@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> { self.file_decryption_properties = Some(file_decryption_properties); self } + + /// Specify whether or not to enable metadata caching + pub fn cache_metadata(mut self, cache_metadata: bool) -> Self { + self.cache_metadata = Some(cache_metadata); + self + } } /// Options that control the reading of ARROW files. 
@@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { if let Some(file_decryption_properties) = &self.file_decryption_properties { options.crypto.file_decryption = Some(file_decryption_properties.clone()); } + if let Some(cache_metadata) = self.cache_metadata { + options.global.cache_metadata = cache_metadata; + } let mut file_format = ParquetFormat::new().with_options(options); if let Some(parquet_pruning) = self.parquet_pruning { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 43b0886193e74..7210cc09a0b39 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; +use crate::reader::CachedParquetFileReaderFactory; use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; @@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat { async fn create_physical_plan( &self, - _state: &dyn Session, + state: &dyn Session, conf: FileScanConfig, ) -> Result> { let mut metadata_size_hint = None; @@ -446,6 +447,21 @@ impl FileFormat for ParquetFormat { let mut source = ParquetSource::new(self.options.clone()); + // Use the CachedParquetFileReaderFactory when metadata caching is enabled + if self.options.global.cache_metadata { + if let Some(metadata_cache) = + state.runtime_env().cache_manager.get_file_metadata_cache() + { + let store = state + .runtime_env() + .object_store(conf.object_store_url.clone())?; + let cached_parquet_read_factory = + Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); + source = + source.with_parquet_file_reader_factory(cached_parquet_read_factory); + } + } + if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) } diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 27ec843c1991d..bd9a3bc1e8045 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -21,12 +21,14 @@ use crate::ParquetFileMetrics; use bytes::Bytes; use datafusion_datasource::file_meta::FileMeta; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::future::BoxFuture; +use futures::FutureExt; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { })) } } + +/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page +/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data. +/// This reader always loads the entire metadata (including page index, unless the file is +/// encrypted), even if not required by the current query, to ensure it is always available for +/// those that need it. 
+#[derive(Debug)] +pub struct CachedParquetFileReaderFactory { + store: Arc, + metadata_cache: FileMetadataCache, +} + +impl CachedParquetFileReaderFactory { + pub fn new(store: Arc, metadata_cache: FileMetadataCache) -> Self { + Self { + store, + metadata_cache, + } + } +} + +impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> datafusion_common::Result> { + let file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + let store = Arc::clone(&self.store); + + let mut inner = + ParquetObjectReader::new(store, file_meta.object_meta.location.clone()) + .with_file_size(file_meta.object_meta.size); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint) + }; + + Ok(Box::new(CachedParquetFileReader { + inner, + file_metrics, + file_meta, + metadata_cache: Arc::clone(&self.metadata_cache), + })) + } +} + +/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata +/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then +/// updates the cache. +pub(crate) struct CachedParquetFileReader { + pub file_metrics: ParquetFileMetrics, + pub inner: ParquetObjectReader, + file_meta: FileMeta, + metadata_cache: FileMetadataCache, +} + +impl AsyncFileReader for CachedParquetFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + let bytes_scanned = range.end - range.start; + self.file_metrics.bytes_scanned.add(bytes_scanned as usize); + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> + where + Self: Send, + { + let total: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + self.file_metrics.bytes_scanned.add(total as usize); + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result>> { + let file_meta = self.file_meta.clone(); + let metadata_cache = Arc::clone(&self.metadata_cache); + + async move { + let object_meta = &file_meta.object_meta; + + // lookup if the metadata is already cached + if let Some(metadata) = + metadata_cache.get_with_extra(&object_meta.location, object_meta) + { + if let Ok(parquet_metadata) = Arc::downcast::(metadata) { + return Ok(Arc::clone(&parquet_metadata)); + } + } + + let mut reader = ParquetMetaDataReader::new(); + // the page index can only be loaded with unencrypted files + if let Some(file_decryption_properties) = + options.and_then(|o| o.file_decryption_properties()) + { + reader = + reader.with_decryption_properties(Some(file_decryption_properties)); + } else { + reader = reader.with_page_indexes(true); + } + reader.try_load(&mut self.inner, object_meta.size).await?; + let metadata = Arc::new(reader.finish()?); + + metadata_cache.put_with_extra( + &object_meta.location, + Arc::clone(&metadata) as Arc, + object_meta, + ); + Ok(metadata) + } + .boxed() + } +} diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index c2403e34c6657..8f86a593e8932 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // 
under the License. +use crate::cache::cache_unit::DefaultFilesMetadataCache; use crate::cache::CacheAccessor; use datafusion_common::{Result, Statistics}; use object_store::path::Path; use object_store::ObjectMeta; +use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -32,6 +34,13 @@ pub type FileStatisticsCache = pub type ListFilesCache = Arc>, Extra = ObjectMeta>>; +/// Represents generic file-embedded metadata. +pub type FileMetadata = dyn Any + Send + Sync; + +/// Cache to store file-embedded metadata. +pub type FileMetadataCache = + Arc, Extra = ObjectMeta>>; + impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Cache name: {} with length: {}", self.name(), self.len()) @@ -44,10 +53,17 @@ impl Debug for dyn CacheAccessor>, Extra = ObjectMeta> } } +impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Cache name: {} with length: {}", self.name(), self.len()) + } +} + #[derive(Default, Debug)] pub struct CacheManager { file_statistic_cache: Option, list_files_cache: Option, + file_metadata_cache: Option, } impl CacheManager { @@ -59,6 +75,13 @@ impl CacheManager { if let Some(lc) = &config.list_files_cache { manager.list_files_cache = Some(Arc::clone(lc)) } + if let Some(mc) = &config.file_metadata_cache { + manager.file_metadata_cache = Some(Arc::clone(mc)); + } else { + manager.file_metadata_cache = + Some(Arc::new(DefaultFilesMetadataCache::default())); + } + Ok(Arc::new(manager)) } @@ -71,6 +94,11 @@ impl CacheManager { pub fn get_list_files_cache(&self) -> Option { self.list_files_cache.clone() } + + /// Get the file embedded metadata cache. + pub fn get_file_metadata_cache(&self) -> Option { + self.file_metadata_cache.clone() + } } #[derive(Clone, Default)] @@ -86,6 +114,10 @@ pub struct CacheManagerConfig { /// location. /// Default is disable. pub list_files_cache: Option, + /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a + /// data file (e.g., Parquet footer and page metadata). + /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. + pub file_metadata_cache: Option, } impl CacheManagerConfig { @@ -101,4 +133,9 @@ impl CacheManagerConfig { self.list_files_cache = cache; self } + + pub fn with_file_metadata_cache(mut self, cache: Option) -> Self { + self.file_metadata_cache = cache; + self + } } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index a9291659a3efa..38d49b306832b 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -17,6 +17,7 @@ use std::sync::Arc; +use crate::cache::cache_manager::FileMetadata; use crate::cache::CacheAccessor; use datafusion_common::Statistics; @@ -157,9 +158,79 @@ impl CacheAccessor>> for DefaultListFilesCache { } } +/// Cache of file-embedded metadata. +/// The metadata for a file is invalidated when the file size or last modification time has +/// changed. 
+#[derive(Default)] +pub struct DefaultFilesMetadataCache { + metadata: DashMap)>, +} + +impl CacheAccessor> for DefaultFilesMetadataCache { + type Extra = ObjectMeta; + + fn get(&self, _k: &Path) -> Option> { + panic!("get in DefaultFilesMetadataCache is not supported, please use get_with_extra") + } + + fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { + self.metadata + .get(k) + .map(|s| { + let (extra, metadata) = s.value(); + if extra.size != e.size || extra.last_modified != e.last_modified { + None + } else { + Some(Arc::clone(metadata)) + } + }) + .unwrap_or(None) + } + + fn put(&self, _key: &Path, _value: Arc) -> Option> { + panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra") + } + + fn put_with_extra( + &self, + key: &Path, + value: Arc, + e: &Self::Extra, + ) -> Option> { + self.metadata + .insert(key.clone(), (e.clone(), value)) + .map(|x| x.1) + } + + fn remove(&mut self, k: &Path) -> Option> { + self.metadata.remove(k).map(|x| x.1 .1) + } + + fn contains_key(&self, k: &Path) -> bool { + self.metadata.contains_key(k) + } + + fn len(&self) -> usize { + self.metadata.len() + } + + fn clear(&self) { + self.metadata.clear() + } + + fn name(&self) -> String { + "DefaultFilesMetadataCache".to_string() + } +} + #[cfg(test)] mod tests { - use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache}; + use std::sync::Arc; + + use crate::cache::cache_manager::FileMetadata; + use crate::cache::cache_unit::{ + DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache, + }; use crate::cache::CacheAccessor; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; @@ -232,4 +303,52 @@ mod tests { meta.clone() ); } + + #[test] + fn test_file_metadata_cache() { + let object_meta = ObjectMeta { + location: Path::from("test"), + last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + }; + let metadata: Arc = Arc::new("retrieved_metadata".to_owned()); + + let cache = DefaultFilesMetadataCache::default(); + assert!(cache + .get_with_extra(&object_meta.location, &object_meta) + .is_none()); + + cache.put_with_extra(&object_meta.location, metadata, &object_meta); + assert!(cache + .get_with_extra(&object_meta.location, &object_meta) + .is_some()); + + // file size changed + let mut object_meta2 = object_meta.clone(); + object_meta2.size = 2048; + assert!(cache + .get_with_extra(&object_meta2.location, &object_meta2) + .is_none()); + + // file last_modified changed + let mut object_meta2 = object_meta.clone(); + object_meta2.last_modified = + DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00") + .unwrap() + .into(); + assert!(cache + .get_with_extra(&object_meta2.location, &object_meta2) + .is_none()); + + // different file + let mut object_meta2 = object_meta; + object_meta2.location = Path::from("test2"); + assert!(cache + .get_with_extra(&object_meta2.location, &object_meta2) + .is_none()); + } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index b086430a4ef71..4c75a53e9fba7 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -299,6 +299,7 @@ impl RuntimeEnvBuilder { .cache_manager .get_file_statistic_cache(), list_files_cache: runtime_env.cache_manager.get_list_files_cache(), + file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(), }; Self { diff --git 
a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 8cb2726058997..ffdc29e429e8f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -504,6 +504,7 @@ message ParquetOptions { bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false bool skip_arrow_metadata = 30; // default = false + bool cache_metadata = 33; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 0823e150268de..98df86a21f534 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -988,6 +988,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v), }).unwrap_or(None), skip_arrow_metadata: value.skip_arrow_metadata, + cache_metadata: value.cache_metadata, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index f35fd15946958..89e85b0dc8f1c 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5066,6 +5066,9 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { len += 1; } + if self.cache_metadata { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5168,6 +5171,9 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; } + if self.cache_metadata { + struct_ser.serialize_field("cacheMetadata", &self.cache_metadata)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5314,6 +5320,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "binaryAsString", "skip_arrow_metadata", "skipArrowMetadata", + "cache_metadata", + "cacheMetadata", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5362,6 +5370,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { SchemaForceViewTypes, BinaryAsString, SkipArrowMetadata, + CacheMetadata, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5414,6 +5423,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), + "cacheMetadata" | "cache_metadata" => Ok(GeneratedField::CacheMetadata), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5464,6 +5474,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; let mut skip_arrow_metadata__ = None; + let mut cache_metadata__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5585,6 +5596,12 @@ impl<'de> serde::Deserialize<'de> 
for ParquetOptions { } skip_arrow_metadata__ = Some(map_.next_value()?); } + GeneratedField::CacheMetadata => { + if cache_metadata__.is_some() { + return Err(serde::de::Error::duplicate_field("cacheMetadata")); + } + cache_metadata__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5700,6 +5717,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), + cache_metadata: cache_metadata__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index ac4a9ea4be696..6ed32d7de053a 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -764,6 +764,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = false + #[prost(bool, tag = "33")] + pub cache_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index b6cbe5759cfcc..0bd6f09bb3d19 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -836,6 +836,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { binary_as_string: value.binary_as_string, skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), + cache_metadata: value.cache_metadata, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index ac4a9ea4be696..6ed32d7de053a 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -764,6 +764,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = false + #[prost(bool, tag = "33")] + pub cache_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 620442c79e72c..1e0d76bc672b7 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -414,6 +414,7 @@ impl TableParquetOptionsProto { coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { parquet_options::CoerceInt96Opt::CoerceInt96(compression) }), + cache_metadata: global_options.global.cache_metadata, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -513,6 +514,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions { coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => 
coerce_int96.clone(), }), + cache_metadata: proto.cache_metadata, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 86dfbd7c84963..50d2c78cbe7ea 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -231,6 +231,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false +datafusion.execution.parquet.cache_metadata false datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -344,6 +345,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files +datafusion.execution.parquet.cache_metadata false (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. 
diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 51e40e3e685d0..0beb2e8f5d20a 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -750,3 +750,122 @@ drop table int96_from_spark; statement ok set datafusion.execution.parquet.coerce_int96 = ns; + + +### Tests for metadata caching + +# Create temporary data +query I +COPY ( + SELECT 'k-' || i as k, i as v + FROM generate_series(1, 20000) t(i) + ORDER BY k +) +TO 'test_files/scratch/parquet/cache_metadata.parquet' +OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048); +---- +20000 + +# Enable the cache +statement ok +set datafusion.execution.parquet.cache_metadata = true; + +statement ok +CREATE EXTERNAL TABLE t +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet/cache_metadata.parquet'; + +query TI +select * from t where k = 'k-1000' or k = 'k-9999' order by k +---- +k-1000 1000 +k-9999 9999 + +query IT +select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v +---- +1 k-1 +2 k-2 +9999 k-9999 +10000 k-10000 + +# Updating the file should invalidate the cache. Otherwise, the following queries would fail +# (e.g., with "Arrow: Parquet argument error: External: incomplete frame"). +query I +COPY ( + SELECT 'k-' || i as k, 20000 - i as v + FROM generate_series(1, 20000) t(i) + ORDER BY k +) +TO 'test_files/scratch/parquet/cache_metadata.parquet' +OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048); +---- +20000 + +query TI +select * from t where k = 'k-1000' or k = 'k-9999' order by k +---- +k-1000 19000 +k-9999 10001 + +query IT +select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v +---- +1 k-19999 +2 k-19998 +9999 k-10001 +10000 k-10000 + +statement ok +DROP TABLE t; + +# Partitioned files should be independently cached. Otherwise, the following queries might fail. +statement ok +COPY ( + SELECT i % 10 as part, 'k-' || i as k, i as v + FROM generate_series(0, 9) t(i) + ORDER BY k +) +TO 'test_files/scratch/parquet/cache_metadata_partitioned.parquet' +PARTITIONED BY (part); + +statement ok +CREATE EXTERNAL TABLE t +STORED AS PARQUET +PARTITIONED BY (part) +LOCATION 'test_files/scratch/parquet/cache_metadata_partitioned.parquet'; + +query TTI +select part, k, v from t where k = 'k-0' +---- +0 k-0 0 + +query TTI +select part, k, v from t where k = 'k-5' +---- +5 k-5 5 + +query TTI +select part, k, v from t where k = 'k-9' +---- +9 k-9 9 + +query TTI +select part, k, v from t order by k +---- +0 k-0 0 +1 k-1 1 +2 k-2 2 +3 k-3 3 +4 k-4 4 +5 k-5 5 +6 k-6 6 +7 k-7 7 +8 k-8 8 +9 k-9 9 + +statement ok +DROP TABLE t; + +statement ok +set datafusion.execution.parquet.cache_metadata = false; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 96b7ee672bdb6..86eee5b68b9ae 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. 
| | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | +| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | From 6e8385e887d62bed6b89a084791982c7305c24bc Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Thu, 31 Jul 2025 18:57:30 +0100 Subject: [PATCH 2/4] Convert FileMetadata and FileMetadataCache to traits --- datafusion/datasource-parquet/src/reader.rs | 23 +++++++--- .../execution/src/cache/cache_manager.rs | 21 +++++---- datafusion/execution/src/cache/cache_unit.rs | 43 +++++++++++++------ 3 files changed, 60 insertions(+), 27 deletions(-) diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index bd9a3bc1e8045..0d0202192067b 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -161,11 +161,14 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { #[derive(Debug)] pub struct CachedParquetFileReaderFactory { store: Arc, - metadata_cache: FileMetadataCache, + metadata_cache: Arc, } impl CachedParquetFileReaderFactory { - pub fn new(store: Arc, metadata_cache: FileMetadataCache) -> Self { + pub fn new( + store: Arc, + metadata_cache: Arc, + ) -> Self { Self { store, metadata_cache, @@ -212,7 +215,7 @@ pub(crate) struct CachedParquetFileReader { pub file_metrics: ParquetFileMetrics, pub inner: ParquetObjectReader, file_meta: FileMeta, - metadata_cache: FileMetadataCache, + metadata_cache: Arc, } impl AsyncFileReader for CachedParquetFileReader { @@ -251,8 +254,10 @@ impl AsyncFileReader for CachedParquetFileReader { if let Some(metadata) = metadata_cache.get_with_extra(&object_meta.location, object_meta) { - if let Ok(parquet_metadata) = Arc::downcast::(metadata) { - return Ok(Arc::clone(&parquet_metadata)); + if let Ok(parquet_metadata) = + Arc::downcast::(metadata) + { + return Ok(Arc::clone(&parquet_metadata.0)); } } @@ -268,10 +273,11 @@ impl AsyncFileReader for CachedParquetFileReader { } reader.try_load(&mut self.inner, object_meta.size).await?; let metadata = Arc::new(reader.finish()?); + let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata))); metadata_cache.put_with_extra( &object_meta.location, - Arc::clone(&metadata) as Arc, + cached_metadata, object_meta, ); Ok(metadata) @@ -279,3 +285,8 @@ impl AsyncFileReader for CachedParquetFileReader { .boxed() } } + +/// Wrapper to implement [`FileMetadata`] for 
[`ParquetMetaData`]. +struct CachedParquetMetaData(Arc); + +impl FileMetadata for CachedParquetMetaData {} diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 8f86a593e8932..273ee59cf8339 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -35,11 +35,13 @@ pub type ListFilesCache = Arc>, Extra = ObjectMeta>>; /// Represents generic file-embedded metadata. -pub type FileMetadata = dyn Any + Send + Sync; +pub trait FileMetadata: Any + Send + Sync {} /// Cache to store file-embedded metadata. -pub type FileMetadataCache = - Arc, Extra = ObjectMeta>>; +pub trait FileMetadataCache: + CacheAccessor, Extra = ObjectMeta> +{ +} impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -53,7 +55,7 @@ impl Debug for dyn CacheAccessor>, Extra = ObjectMeta> } } -impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { +impl Debug for dyn FileMetadataCache { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Cache name: {} with length: {}", self.name(), self.len()) } @@ -63,7 +65,7 @@ impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { pub struct CacheManager { file_statistic_cache: Option, list_files_cache: Option, - file_metadata_cache: Option, + file_metadata_cache: Option>, } impl CacheManager { @@ -96,7 +98,7 @@ impl CacheManager { } /// Get the file embedded metadata cache. - pub fn get_file_metadata_cache(&self) -> Option { + pub fn get_file_metadata_cache(&self) -> Option> { self.file_metadata_cache.clone() } } @@ -117,7 +119,7 @@ pub struct CacheManagerConfig { /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a /// data file (e.g., Parquet footer and page metadata). /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. - pub file_metadata_cache: Option, + pub file_metadata_cache: Option>, } impl CacheManagerConfig { @@ -134,7 +136,10 @@ impl CacheManagerConfig { self } - pub fn with_file_metadata_cache(mut self, cache: Option) -> Self { + pub fn with_file_metadata_cache( + mut self, + cache: Option>, + ) -> Self { self.file_metadata_cache = cache; self } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 38d49b306832b..a3987a07f4200 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use crate::cache::cache_manager::FileMetadata; +use crate::cache::cache_manager::{FileMetadata, FileMetadataCache}; use crate::cache::CacheAccessor; use datafusion_common::Statistics; @@ -163,17 +163,19 @@ impl CacheAccessor>> for DefaultListFilesCache { /// changed. 
#[derive(Default)] pub struct DefaultFilesMetadataCache { - metadata: DashMap)>, + metadata: DashMap)>, } -impl CacheAccessor> for DefaultFilesMetadataCache { +impl FileMetadataCache for DefaultFilesMetadataCache {} + +impl CacheAccessor> for DefaultFilesMetadataCache { type Extra = ObjectMeta; - fn get(&self, _k: &Path) -> Option> { + fn get(&self, _k: &Path) -> Option> { panic!("get in DefaultFilesMetadataCache is not supported, please use get_with_extra") } - fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { + fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { self.metadata .get(k) .map(|s| { @@ -187,22 +189,26 @@ impl CacheAccessor> for DefaultFilesMetadataCache { .unwrap_or(None) } - fn put(&self, _key: &Path, _value: Arc) -> Option> { + fn put( + &self, + _key: &Path, + _value: Arc, + ) -> Option> { panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra") } fn put_with_extra( &self, key: &Path, - value: Arc, + value: Arc, e: &Self::Extra, - ) -> Option> { + ) -> Option> { self.metadata .insert(key.clone(), (e.clone(), value)) .map(|x| x.1) } - fn remove(&mut self, k: &Path) -> Option> { + fn remove(&mut self, k: &Path) -> Option> { self.metadata.remove(k).map(|x| x.1 .1) } @@ -304,6 +310,12 @@ mod tests { ); } + pub struct TestFileMetadata { + metadata: String, + } + + impl FileMetadata for TestFileMetadata {} + #[test] fn test_file_metadata_cache() { let object_meta = ObjectMeta { @@ -315,7 +327,10 @@ mod tests { e_tag: None, version: None, }; - let metadata: Arc = Arc::new("retrieved_metadata".to_owned()); + + let metadata: Arc = Arc::new(TestFileMetadata { + metadata: "retrieved_metadata".to_owned(), + }); let cache = DefaultFilesMetadataCache::default(); assert!(cache @@ -323,9 +338,11 @@ mod tests { .is_none()); cache.put_with_extra(&object_meta.location, metadata, &object_meta); - assert!(cache - .get_with_extra(&object_meta.location, &object_meta) - .is_some()); + let value = cache.get_with_extra(&object_meta.location, &object_meta); + assert!(value.is_some()); + let test_file_metadata = Arc::downcast::(value.unwrap()); + assert!(test_file_metadata.is_ok()); + assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata"); // file size changed let mut object_meta2 = object_meta.clone(); From 292ac29b068e106212d6bfd040b706131cf42325 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Thu, 31 Jul 2025 20:14:31 +0100 Subject: [PATCH 3/4] Use as_any to respect MSRV --- datafusion/datasource-parquet/src/reader.rs | 11 ++++++++--- datafusion/execution/src/cache/cache_manager.rs | 6 +++++- datafusion/execution/src/cache/cache_unit.rs | 6 +++++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 0d0202192067b..5550c908a9289 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -29,6 +29,7 @@ use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use std::any::Any; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -254,8 +255,8 @@ impl AsyncFileReader for CachedParquetFileReader { if let Some(metadata) = metadata_cache.get_with_extra(&object_meta.location, object_meta) { - if let Ok(parquet_metadata) = - Arc::downcast::(metadata) + if let Some(parquet_metadata) = + 
metadata.as_any().downcast_ref::() { return Ok(Arc::clone(&parquet_metadata.0)); } @@ -289,4 +290,8 @@ impl AsyncFileReader for CachedParquetFileReader { /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. struct CachedParquetMetaData(Arc); -impl FileMetadata for CachedParquetMetaData {} +impl FileMetadata for CachedParquetMetaData { + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 273ee59cf8339..419c9534306e9 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -35,7 +35,11 @@ pub type ListFilesCache = Arc>, Extra = ObjectMeta>>; /// Represents generic file-embedded metadata. -pub trait FileMetadata: Any + Send + Sync {} +pub trait FileMetadata: Any + Send + Sync { + /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific + /// implementation. + fn as_any(&self) -> &dyn Any; +} /// Cache to store file-embedded metadata. pub trait FileMetadataCache: diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index a3987a07f4200..45866d3162ee4 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -314,7 +314,11 @@ mod tests { metadata: String, } - impl FileMetadata for TestFileMetadata {} + impl FileMetadata for TestFileMetadata { + fn as_any(&self) -> &dyn std::any::Any { + self + } + } #[test] fn test_file_metadata_cache() { From 881917a56be39c280ab2a4077997a70fdf9752ab Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Fri, 1 Aug 2025 19:13:38 +0100 Subject: [PATCH 4/4] Use ObjectMeta as the key of FileMetadataCache --- datafusion/datasource-parquet/src/reader.rs | 10 +-- .../execution/src/cache/cache_manager.rs | 2 +- datafusion/execution/src/cache/cache_unit.rs | 90 +++++++++++-------- 3 files changed, 56 insertions(+), 46 deletions(-) diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 5550c908a9289..6ad9428770e90 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -252,9 +252,7 @@ impl AsyncFileReader for CachedParquetFileReader { let object_meta = &file_meta.object_meta; // lookup if the metadata is already cached - if let Some(metadata) = - metadata_cache.get_with_extra(&object_meta.location, object_meta) - { + if let Some(metadata) = metadata_cache.get(object_meta) { if let Some(parquet_metadata) = metadata.as_any().downcast_ref::() { @@ -276,11 +274,7 @@ impl AsyncFileReader for CachedParquetFileReader { let metadata = Arc::new(reader.finish()?); let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata))); - metadata_cache.put_with_extra( - &object_meta.location, - cached_metadata, - object_meta, - ); + metadata_cache.put(object_meta, cached_metadata); Ok(metadata) } .boxed() diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 419c9534306e9..37f1baa17f681 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -43,7 +43,7 @@ pub trait FileMetadata: Any + Send + Sync { /// Cache to store file-embedded metadata. 
pub trait FileMetadataCache: - CacheAccessor, Extra = ObjectMeta> + CacheAccessor, Extra = ObjectMeta> { } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 45866d3162ee4..70d007bf5b88e 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -161,6 +161,8 @@ impl CacheAccessor>> for DefaultListFilesCache { /// Cache of file-embedded metadata. /// The metadata for a file is invalidated when the file size or last modification time has /// changed. +/// Users should use the `get` and `put` methods. The `get_with_extra` and `put_with_extra` methods +/// simply call `get` and `put`, respectively. #[derive(Default)] pub struct DefaultFilesMetadataCache { metadata: DashMap)>, } impl FileMetadataCache for DefaultFilesMetadataCache {} -impl CacheAccessor> for DefaultFilesMetadataCache { +impl CacheAccessor> for DefaultFilesMetadataCache { type Extra = ObjectMeta; - fn get(&self, _k: &Path) -> Option> { - panic!("get in DefaultFilesMetadataCache is not supported, please use get_with_extra") - } - - fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { + fn get(&self, k: &ObjectMeta) -> Option> { self.metadata - .get(k) + .get(&k.location) .map(|s| { let (extra, metadata) = s.value(); - if extra.size != e.size || extra.last_modified != e.last_modified { + if extra.size != k.size || extra.last_modified != k.last_modified { None } else { Some(Arc::clone(metadata)) @@ -189,31 +187,45 @@ impl CacheAccessor> for DefaultFilesMetadataCache { .unwrap_or(None) } - fn put( + fn get_with_extra( &self, - _key: &Path, - _value: Arc, + k: &ObjectMeta, + _e: &Self::Extra, ) -> Option> { - panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra") + self.get(k) } - fn put_with_extra( + fn put( &self, - key: &Path, + key: &ObjectMeta, value: Arc, - e: &Self::Extra, ) -> Option> { self.metadata - .insert(key.clone(), (e.clone(), value)) + .insert(key.location.clone(), (key.clone(), value)) .map(|x| x.1) } - fn remove(&mut self, k: &Path) -> Option> { - self.metadata.remove(k).map(|x| x.1 .1) + fn put_with_extra( + &self, + key: &ObjectMeta, + value: Arc, + _e: &Self::Extra, + ) -> Option> { + self.put(key, value) + } + + fn remove(&mut self, k: &ObjectMeta) -> Option> { + self.metadata.remove(&k.location).map(|x| x.1 .1) } - fn contains_key(&self, k: &Path) -> bool { - self.metadata.contains_key(k) + fn contains_key(&self, k: &ObjectMeta) -> bool { + self.metadata + .get(&k.location) + .map(|s| { + let (extra, _) = s.value(); + extra.size == k.size && extra.last_modified == k.last_modified + }) + .unwrap_or(false) } fn len(&self) -> usize { @@ -221,7 +233,7 @@ impl CacheAccessor> for DefaultFilesMetadataCache { } fn clear(&self) { - self.metadata.clear() + self.metadata.clear(); } fn name(&self) -> String { @@ -336,13 +348,15 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let cache = DefaultFilesMetadataCache::default(); - assert!(cache - .get_with_extra(&object_meta.location, &object_meta) - .is_none()); + let mut cache = DefaultFilesMetadataCache::default(); + assert!(cache.get(&object_meta).is_none()); - cache.put_with_extra(&object_meta.location, metadata, &object_meta); - let value = cache.get_with_extra(&object_meta.location, &object_meta); + // put + cache.put(&object_meta, metadata); + + // get and contains_key of a valid entry + assert!(cache.contains_key(&object_meta)); 
+ let value = cache.get(&object_meta); assert!(value.is_some()); let test_file_metadata = Arc::downcast::(value.unwrap()); assert!(test_file_metadata.is_ok()); @@ -351,9 +365,8 @@ mod tests { // file size changed let mut object_meta2 = object_meta.clone(); object_meta2.size = 2048; - assert!(cache - .get_with_extra(&object_meta2.location, &object_meta2) - .is_none()); + assert!(cache.get(&object_meta2).is_none()); + assert!(!cache.contains_key(&object_meta2)); // file last_modified changed let mut object_meta2 = object_meta.clone(); @@ -361,15 +374,18 @@ mod tests { DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00") .unwrap() .into(); - assert!(cache - .get_with_extra(&object_meta2.location, &object_meta2) - .is_none()); + assert!(cache.get(&object_meta2).is_none()); + assert!(!cache.contains_key(&object_meta2)); // different file - let mut object_meta2 = object_meta; + let mut object_meta2 = object_meta.clone(); object_meta2.location = Path::from("test2"); - assert!(cache - .get_with_extra(&object_meta2.location, &object_meta2) - .is_none()); + assert!(cache.get(&object_meta2).is_none()); + assert!(!cache.contains_key(&object_meta2)); + + // remove + cache.remove(&object_meta); + assert!(cache.get(&object_meta).is_none()); + assert!(!cache.contains_key(&object_meta)); } }
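For reference, a minimal sketch of enabling the cache through the `ParquetReadOptions::cache_metadata` builder added in patch 1; the table name and file path are illustrative:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Opt in to metadata caching for this table. The footer and page metadata
    // are read once, stored in the CacheManager's FileMetadataCache, and reused
    // until the file's size or last-modified time changes.
    let options = ParquetReadOptions::default().cache_metadata(true);
    ctx.register_parquet("t", "data/large.parquet", options)
        .await?;

    // Repeated queries against the same file now skip the metadata read.
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}

The same switch is exercised at the SQL level in the parquet.slt tests above via `set datafusion.execution.parquet.cache_metadata = true`.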
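And a sketch of wiring an explicit metadata cache through `CacheManagerConfig::with_file_metadata_cache`, assuming the usual `datafusion::execution` re-export of the `datafusion-execution` crate; any implementation of the `FileMetadataCache` trait from patch 2 could stand in for the default one used here:

use std::sync::Arc;

use datafusion::execution::cache::cache_manager::{CacheManagerConfig, FileMetadataCache};
use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

fn build_context() -> datafusion::error::Result<SessionContext> {
    // An explicit cache instance; CacheManager::try_new would create a
    // DefaultFilesMetadataCache on its own if None were passed instead.
    let metadata_cache: Arc<dyn FileMetadataCache> =
        Arc::new(DefaultFilesMetadataCache::default());

    let runtime = RuntimeEnvBuilder::new()
        .with_cache_manager(
            CacheManagerConfig::default().with_file_metadata_cache(Some(metadata_cache)),
        )
        .build_arc()?;

    Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
}

Keying the cache on ObjectMeta (patch 4) is what lets the same `get`/`put` calls double as the invalidation check: a changed size or last-modified time simply misses the stale entry.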