diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index a45d57e8e952..8a6ad448d895 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -17,13 +17,18 @@ //! Functions that are query-able and searchable via the `\h` command +use datafusion_common::instant::Instant; use std::fmt; use std::fs::File; use std::str::FromStr; use std::sync::Arc; -use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow::array::{ + DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray, + TimestampMillisecondArray, UInt64Array, +}; +use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; use datafusion::catalog::{Session, TableFunctionImpl}; @@ -697,3 +702,179 @@ impl TableFunctionImpl for StatisticsCacheFunc { Ok(Arc::new(statistics_cache)) } } + +// Implementation of the `list_files_cache` table function in datafusion-cli. +/// +/// This function returns the cached results of running a LIST command on a particular object store path for a table. The object metadata is returned as a List of Structs, with one Struct for each object. +/// DataFusion uses these cached results to plan queries against external tables. +/// # Schema +/// ```sql +/// > describe select * from list_files_cache(); +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// | column_name | data_type | is_nullable | +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// | table | Utf8 | NO | +/// | path | Utf8 | NO | +/// | metadata_size_bytes | UInt64 | NO | +/// | expires_in | Duration(ms) | YES | +/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES | +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// ``` +#[derive(Debug)] +struct ListFilesCacheTable { + schema: SchemaRef, + batch: RecordBatch, +} + +#[async_trait] +impl TableProvider for ListFilesCacheTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> datafusion::logical_expr::TableType { + datafusion::logical_expr::TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(MemorySourceConfig::try_new_exec( + &[vec![self.batch.clone()]], + TableProvider::schema(self), + projection.cloned(), + )?) + } +} + +#[derive(Debug)] +pub struct ListFilesCacheFunc { + cache_manager: Arc, +} + +impl ListFilesCacheFunc { + pub fn new(cache_manager: Arc) -> Self { + Self { cache_manager } + } +} + +impl TableFunctionImpl for ListFilesCacheFunc { + fn call(&self, exprs: &[Expr]) -> Result> { + if !exprs.is_empty() { + return plan_err!("list_files_cache should have no arguments"); + } + + let nested_fields = Fields::from(vec![ + Field::new("file_path", DataType::Utf8, false), + Field::new( + "file_modified", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("file_size_bytes", DataType::UInt64, false), + Field::new("e_tag", DataType::Utf8, true), + Field::new("version", DataType::Utf8, true), + ]); + + let metadata_field = + Field::new("metadata", DataType::Struct(nested_fields.clone()), true); + + let schema = Arc::new(Schema::new(vec![ + Field::new("table", DataType::Utf8, false), + Field::new("path", DataType::Utf8, false), + Field::new("metadata_size_bytes", DataType::UInt64, false), + // expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type. + Field::new( + "expires_in", + DataType::Duration(TimeUnit::Millisecond), + true, + ), + Field::new( + "metadata_list", + DataType::List(Arc::new(metadata_field.clone())), + true, + ), + ])); + + let mut table_arr = vec![]; + let mut path_arr = vec![]; + let mut metadata_size_bytes_arr = vec![]; + let mut expires_arr = vec![]; + + let mut file_path_arr = vec![]; + let mut file_modified_arr = vec![]; + let mut file_size_bytes_arr = vec![]; + let mut etag_arr = vec![]; + let mut version_arr = vec![]; + let mut offsets: Vec = vec![0]; + + if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() { + let now = Instant::now(); + let mut current_offset: i32 = 0; + + for (path, entry) in list_files_cache.list_entries() { + table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string())); + path_arr.push(path.path.to_string()); + metadata_size_bytes_arr.push(entry.size_bytes as u64); + // calculates time left before entry expires + expires_arr.push( + entry + .expires + .map(|t| t.duration_since(now).as_millis() as i64), + ); + + for meta in entry.metas.iter() { + file_path_arr.push(meta.location.to_string()); + file_modified_arr.push(meta.last_modified.timestamp_millis()); + file_size_bytes_arr.push(meta.size); + etag_arr.push(meta.e_tag.clone()); + version_arr.push(meta.version.clone()); + } + current_offset += entry.metas.len() as i32; + offsets.push(current_offset); + } + } + + let struct_arr = StructArray::new( + nested_fields, + vec![ + Arc::new(StringArray::from(file_path_arr)), + Arc::new(TimestampMillisecondArray::from(file_modified_arr)), + Arc::new(UInt64Array::from(file_size_bytes_arr)), + Arc::new(StringArray::from(etag_arr)), + Arc::new(StringArray::from(version_arr)), + ], + None, + ); + + let offsets_buffer: OffsetBuffer = + OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets))); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(table_arr)), + Arc::new(StringArray::from(path_arr)), + Arc::new(UInt64Array::from(metadata_size_bytes_arr)), + Arc::new(DurationMillisecondArray::from(expires_arr)), + Arc::new(GenericListArray::new( + Arc::new(metadata_field), + offsets_buffer, + Arc::new(struct_arr), + None, + )), + ], + )?; + + let list_files_cache = ListFilesCacheTable { schema, batch }; + Ok(Arc::new(list_files_cache)) + } +} diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 8f69ae477904..9e53260e4277 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{ - MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, + ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, }; use datafusion_cli::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, @@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> { )), ); + ctx.register_udtf( + "list_files_cache", + Arc::new(ListFilesCacheFunc::new( + ctx.task_ctx().runtime_env().cache_manager.clone(), + )), + ); + let mut print_options = PrintOptions { format: args.format, quiet: args.quiet, @@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use datafusion::{ common::test_util::batches_to_string, execution::cache::{ - cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache, + DefaultListFilesCache, cache_manager::CacheManagerConfig, + cache_unit::DefaultFileStatisticsCache, }, - prelude::ParquetReadOptions, + prelude::{ParquetReadOptions, col, lit, split_part}, }; use insta::assert_snapshot; + use object_store::memory::InMemory; + use url::Url; fn assert_conversion(input: &str, expected: Result) { let result = extract_memory_pool_size(input); @@ -741,4 +753,99 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_list_files_cache() -> Result<(), DataFusionError> { + let list_files_cache = Arc::new(DefaultListFilesCache::new( + 1024, + Some(Duration::from_secs(1)), + )); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default() + .with_list_files_cache(Some(list_files_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + ctx.register_object_store( + &Url::parse("mem://test_table").unwrap(), + Arc::new(InMemory::new()), + ); + + ctx.register_udtf( + "list_files_cache", + Arc::new(ListFilesCacheFunc::new( + ctx.task_ctx().runtime_env().cache_manager.clone(), + )), + ); + + ctx.sql( + "CREATE EXTERNAL TABLE src_table + STORED AS PARQUET + LOCATION '../parquet-testing/data/alltypes_plain.parquet'", + ) + .await? + .collect() + .await?; + + ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?; + + ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?; + + ctx.sql( + "CREATE EXTERNAL TABLE test_table + STORED AS PARQUET + LOCATION 'mem://test_table/' + ", + ) + .await? + .collect() + .await?; + + let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()"; + let df = ctx + .sql(sql) + .await? + .unnest_columns(&["metadata_list"])? + .with_column_renamed("metadata_list", "metadata")? + .unnest_columns(&["metadata"])?; + + assert_eq!( + 2, + df.clone() + .filter(col("expires_in").is_not_null())? + .count() + .await? + ); + + let df = df + .with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")? + .with_column_renamed(r#""metadata.e_tag""#, "etag")? + .with_column( + "filename", + split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)), + )? + .select_columns(&[ + "metadata_size_bytes", + "filename", + "file_size_bytes", + "etag", + ])? + .sort(vec![col("filename").sort(true, false)])?; + let rbs = df.collect().await?; + assert_snapshot!(batches_to_string(&rbs),@r" + +---------------------+-----------+-----------------+------+ + | metadata_size_bytes | filename | file_size_bytes | etag | + +---------------------+-----------+-----------------+------+ + | 212 | 0.parquet | 3645 | 0 | + | 212 | 1.parquet | 3645 | 1 | + +---------------------+-----------+-----------------+------+ + "); + + Ok(()) + } } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 9fb2dd2dce29..a175d47f4de6 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics, }; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; @@ -565,7 +566,11 @@ impl TableProvider for ListingTable { // Invalidate cache entries for this table if they exist if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() { - let _ = lfc.remove(table_path.prefix()); + let key = TableScopedPath { + table: table_path.get_table_ref().clone(), + path: table_path.prefix().clone(), + }; + let _ = lfc.remove(&key); } // Sink related option, apart from format diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 3ca388af0c4c..86af691fd724 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory { ))? .create(session_state, &cmd.options)?; - let mut table_path = ListingTableUrl::parse(&cmd.location)?; + let mut table_path = + ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone()); let file_extension = match table_path.is_collection() { // Setting the extension to be empty instead of allowing the default extension seems // odd, but was done to ensure existing behavior isn't modified. It seems like this @@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory { } None => format!("*.{}", cmd.file_type.to_lowercase()), }; - table_path = table_path.with_glob(glob.as_ref())?; + table_path = table_path + .with_glob(glob.as_ref())? + .with_table_ref(cmd.name.clone()); } let schema = options.infer_schema(session_state, &table_path).await?; let df_schema = Arc::clone(&schema).to_dfschema()?; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a769bb01b435..6df90b205c8e 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1315,7 +1315,7 @@ impl SessionContext { let table = table_ref.table().to_owned(); let maybe_schema = { let state = self.state.read(); - let resolved = state.resolve_table_ref(table_ref); + let resolved = state.resolve_table_ref(table_ref.clone()); state .catalog_list() .catalog(&resolved.catalog) @@ -1327,6 +1327,11 @@ impl SessionContext { && table_provider.table_type() == table_type { schema.deregister_table(&table)?; + if table_type == TableType::Base + && let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache() + { + lfc.drop_table_entries(&Some(table_ref))?; + } return Ok(true); } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 155d6efe462c..2428275ac3c3 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, TableReference}; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; @@ -41,6 +42,8 @@ pub struct ListingTableUrl { prefix: Path, /// An optional glob expression used to filter files glob: Option, + + table_ref: Option, } impl ListingTableUrl { @@ -145,7 +148,12 @@ impl ListingTableUrl { /// to create a [`ListingTableUrl`]. pub fn try_new(url: Url, glob: Option) -> Result { let prefix = Path::from_url_path(url.path())?; - Ok(Self { url, prefix, glob }) + Ok(Self { + url, + prefix, + glob, + table_ref: None, + }) } /// Returns the URL scheme @@ -255,7 +263,14 @@ impl ListingTableUrl { }; let list: BoxStream<'a, Result> = if self.is_collection() { - list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await? + list_with_cache( + ctx, + store, + self.table_ref.as_ref(), + &self.prefix, + prefix.as_ref(), + ) + .await? } else { match store.head(&full_prefix).await { Ok(meta) => futures::stream::once(async { Ok(meta) }) @@ -264,7 +279,14 @@ impl ListingTableUrl { // If the head command fails, it is likely that object doesn't exist. // Retry as though it were a prefix (aka a collection) Err(object_store::Error::NotFound { .. }) => { - list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await? + list_with_cache( + ctx, + store, + self.table_ref.as_ref(), + &self.prefix, + prefix.as_ref(), + ) + .await? } Err(e) => return Err(e.into()), } @@ -323,6 +345,15 @@ impl ListingTableUrl { Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?; Self::try_new(self.url, Some(glob)) } + + pub fn with_table_ref(mut self, table_ref: TableReference) -> Self { + self.table_ref = Some(table_ref); + self + } + + pub fn get_table_ref(&self) -> &Option { + &self.table_ref + } } /// Lists files with cache support, using prefix-aware lookups. @@ -345,6 +376,7 @@ impl ListingTableUrl { async fn list_with_cache<'b>( ctx: &'b dyn Session, store: &'b dyn ObjectStore, + table_ref: Option<&TableReference>, table_base_path: &Path, prefix: Option<&Path>, ) -> Result>> { @@ -367,9 +399,14 @@ async fn list_with_cache<'b>( // Convert prefix to Option for cache lookup let prefix_filter = prefix.cloned(); + let table_scoped_base_path = TableScopedPath { + table: table_ref.cloned(), + path: table_base_path.clone(), + }; + // Try cache lookup with optional prefix filter let vec = if let Some(res) = - cache.get_with_extra(table_base_path, &prefix_filter) + cache.get_with_extra(&table_scoped_base_path, &prefix_filter) { debug!("Hit list files cache"); res.as_ref().clone() @@ -380,7 +417,7 @@ async fn list_with_cache<'b>( .list(Some(table_base_path)) .try_collect::>() .await?; - cache.put(table_base_path, Arc::new(vec.clone())); + cache.put(&table_scoped_base_path, Arc::new(vec.clone())); // If a prefix filter was requested, apply it to the results if prefix.is_some() { diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index c76a68c651eb..162074d909ea 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,7 +16,10 @@ // under the License. use crate::cache::cache_unit::DefaultFilesMetadataCache; +use crate::cache::list_files_cache::ListFilesEntry; +use crate::cache::list_files_cache::TableScopedPath; use crate::cache::{CacheAccessor, DefaultListFilesCache}; +use datafusion_common::TableReference; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; use object_store::ObjectMeta; @@ -80,7 +83,7 @@ pub struct FileStatisticsCacheEntry { /// /// See [`crate::runtime_env::RuntimeEnv`] for more details. pub trait ListFilesCache: - CacheAccessor>, Extra = Option> + CacheAccessor>, Extra = Option> { /// Returns the cache's memory limit in bytes. fn cache_limit(&self) -> usize; @@ -93,6 +96,11 @@ pub trait ListFilesCache: /// Updates the cache with a new TTL (time-to-live). fn update_cache_ttl(&self, ttl: Option); + + /// Retrieves the information about the entries currently cached. + fn list_entries(&self) -> HashMap; + + fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; } /// Generic file-embedded metadata used with [`FileMetadataCache`]. diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 661bc47b5468..858219e5b883 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -17,10 +17,12 @@ use std::mem::size_of; use std::{ + collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; +use datafusion_common::TableReference; use datafusion_common::instant::Instant; use object_store::{ObjectMeta, path::Path}; @@ -103,10 +105,11 @@ impl DefaultListFilesCache { } } -struct ListFilesEntry { - metas: Arc>, - size_bytes: usize, - expires: Option, +#[derive(Clone, PartialEq, Debug)] +pub struct ListFilesEntry { + pub metas: Arc>, + pub size_bytes: usize, + pub expires: Option, } impl ListFilesEntry { @@ -146,9 +149,15 @@ pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB /// The default cache TTL for the [`DefaultListFilesCache`] pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct TableScopedPath { + pub table: Option, + pub path: Path, +} + /// Handles the inner state of the [`DefaultListFilesCache`] struct. pub struct DefaultListFilesCacheState { - lru_queue: LruQueue, + lru_queue: LruQueue, memory_limit: usize, memory_used: usize, ttl: Option, @@ -196,17 +205,17 @@ impl DefaultListFilesCacheState { /// ``` fn get_with_prefix( &mut self, - table_base: &Path, + table_scoped_base_path: &TableScopedPath, prefix: Option<&Path>, now: Instant, ) -> Option>> { - let entry = self.lru_queue.get(table_base)?; + let entry = self.lru_queue.get(table_scoped_base_path)?; // Check expiration if let Some(exp) = entry.expires && now > exp { - self.remove(table_base); + self.remove(table_scoped_base_path); return None; } @@ -216,6 +225,7 @@ impl DefaultListFilesCacheState { }; // Build the full prefix path: table_base/prefix + let table_base = &table_scoped_base_path.path; let mut parts: Vec<_> = table_base.parts().collect(); parts.extend(prefix.parts()); let full_prefix = Path::from_iter(parts); @@ -241,7 +251,7 @@ impl DefaultListFilesCacheState { /// If the entry has expired by `now` it is removed from the cache. /// /// The LRU queue is not updated. - fn contains_key(&mut self, k: &Path, now: Instant) -> bool { + fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool { let Some(entry) = self.lru_queue.peek(k) else { return false; }; @@ -262,7 +272,7 @@ impl DefaultListFilesCacheState { /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. fn put( &mut self, - key: &Path, + key: &TableScopedPath, value: Arc>, now: Instant, ) -> Option>> { @@ -304,7 +314,7 @@ impl DefaultListFilesCacheState { } /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &Path) -> Option>> { + fn remove(&mut self, k: &TableScopedPath) -> Option>> { if let Some(entry) = self.lru_queue.remove(k) { self.memory_used -= entry.size_bytes; Some(entry.metas) @@ -347,15 +357,41 @@ impl ListFilesCache for DefaultListFilesCache { state.ttl = ttl; state.evict_entries(); } + + fn list_entries(&self) -> HashMap { + let state = self.state.lock().unwrap(); + let mut entries = HashMap::::new(); + for (path, entry) in state.lru_queue.list_entries() { + entries.insert(path.clone(), entry.clone()); + } + entries + } + + fn drop_table_entries( + &self, + table_ref: &Option, + ) -> datafusion_common::Result<()> { + let mut state = self.state.lock().unwrap(); + let mut table_paths = vec![]; + for (path, _) in state.lru_queue.list_entries() { + if path.table == *table_ref { + table_paths.push(path.clone()); + } + } + for path in table_paths { + state.remove(&path); + } + Ok(()) + } } -impl CacheAccessor>> for DefaultListFilesCache { +impl CacheAccessor>> for DefaultListFilesCache { type Extra = Option; /// Gets all files for the given table base path. /// /// This is equivalent to calling `get_with_extra(k, &None)`. - fn get(&self, k: &Path) -> Option>> { + fn get(&self, k: &TableScopedPath) -> Option>> { self.get_with_extra(k, &None) } @@ -374,17 +410,17 @@ impl CacheAccessor>> for DefaultListFilesCache { /// can serve queries for any partition subset without additional storage calls. fn get_with_extra( &self, - table_base: &Path, + table_scoped_path: &TableScopedPath, prefix: &Self::Extra, ) -> Option>> { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); - state.get_with_prefix(table_base, prefix.as_ref(), now) + state.get_with_prefix(table_scoped_path, prefix.as_ref(), now) } fn put( &self, - key: &Path, + key: &TableScopedPath, value: Arc>, ) -> Option>> { let mut state = self.state.lock().unwrap(); @@ -394,19 +430,19 @@ impl CacheAccessor>> for DefaultListFilesCache { fn put_with_extra( &self, - key: &Path, + key: &TableScopedPath, value: Arc>, _e: &Self::Extra, ) -> Option>> { self.put(key, value) } - fn remove(&self, k: &Path) -> Option>> { + fn remove(&self, k: &TableScopedPath) -> Option>> { let mut state = self.state.lock().unwrap(); state.remove(k) } - fn contains_key(&self, k: &Path) -> bool { + fn contains_key(&self, k: &TableScopedPath) -> bool { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); state.contains_key(k, now) @@ -431,7 +467,6 @@ impl CacheAccessor>> for DefaultListFilesCache { mod tests { use super::*; use chrono::DateTime; - use std::thread; struct MockTimeProvider { base: Instant, @@ -499,43 +534,79 @@ mod tests { #[test] fn test_basic_operations() { let cache = DefaultListFilesCache::default(); + let table_ref = Some(TableReference::from("table")); let path = Path::from("test_path"); + let key = TableScopedPath { + table: table_ref.clone(), + path, + }; // Initially cache is empty - assert!(cache.get(&path).is_none()); - assert!(!cache.contains_key(&path)); + assert!(cache.get(&key).is_none()); + assert!(!cache.contains_key(&key)); assert_eq!(cache.len(), 0); // Put an entry let meta = create_test_object_meta("file1", 50); let value = Arc::new(vec![meta.clone()]); - cache.put(&path, Arc::clone(&value)); + cache.put(&key, Arc::clone(&value)); // Entry should be retrievable - assert!(cache.contains_key(&path)); + assert!(cache.contains_key(&key)); assert_eq!(cache.len(), 1); - let retrieved = cache.get(&path).unwrap(); + let retrieved = cache.get(&key).unwrap(); assert_eq!(retrieved.len(), 1); assert_eq!(retrieved[0].location, meta.location); // Remove the entry - let removed = cache.remove(&path).unwrap(); + let removed = cache.remove(&key).unwrap(); assert_eq!(removed.len(), 1); - assert!(!cache.contains_key(&path)); + assert!(!cache.contains_key(&key)); assert_eq!(cache.len(), 0); // Put multiple entries - let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, _) = create_test_list_files_entry("path2", 3, 50); - cache.put(&path1, value1); - cache.put(&path2, value2); + let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); + let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref, + path: path2, + }; + cache.put(&key1, Arc::clone(&value1)); + cache.put(&key2, Arc::clone(&value2)); assert_eq!(cache.len(), 2); + // List cache entries + assert_eq!( + cache.list_entries(), + HashMap::from([ + ( + key1.clone(), + ListFilesEntry { + metas: value1, + size_bytes: size1, + expires: None, + } + ), + ( + key2.clone(), + ListFilesEntry { + metas: value2, + size_bytes: size2, + expires: None, + } + ) + ]) + ); + // Clear all entries cache.clear(); assert_eq!(cache.len(), 0); - assert!(!cache.contains_key(&path1)); - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); } #[test] @@ -547,24 +618,42 @@ mod tests { // Set cache limit to exactly fit all three entries let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + // All three entries should fit - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // Adding a new entry should evict path1 (LRU) let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - cache.put(&path4, value4); + let key4 = TableScopedPath { + table: table_ref, + path: path4, + }; + cache.put(&key4, value4); assert_eq!(cache.len(), 3); - assert!(!cache.contains_key(&path1)); // Evicted - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path4)); + assert!(!cache.contains_key(&key1)); // Evicted + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key4)); } #[test] @@ -576,24 +665,42 @@ mod tests { // Set cache limit to fit exactly three entries let cache = DefaultListFilesCache::new(size * 3, None); - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Access path1 to move it to front (MRU) // Order is now: path2 (LRU), path3, path1 (MRU) - cache.get(&path1); + cache.get(&key1); // Adding a new entry should evict path2 (the LRU) let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - cache.put(&path4, value4); + let key4 = TableScopedPath { + table: table_ref, + path: path4, + }; + cache.put(&key4, value4); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); // Still present (recently accessed) - assert!(!cache.contains_key(&path2)); // Evicted (was LRU) - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path4)); + assert!(cache.contains_key(&key1)); // Still present (recently accessed) + assert!(!cache.contains_key(&key2)); // Evicted (was LRU) + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key4)); } #[test] @@ -604,19 +711,32 @@ mod tests { // Set cache limit to fit both entries let cache = DefaultListFilesCache::new(size * 2, None); - cache.put(&path1, value1); - cache.put(&path2, value2); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + cache.put(&key1, value1); + cache.put(&key2, value2); assert_eq!(cache.len(), 2); // Try to add an entry that's too large to fit in the cache let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); - cache.put(&path_large, value_large); + let key_large = TableScopedPath { + table: table_ref, + path: path_large, + }; + cache.put(&key_large, value_large); // Large entry should not be added - assert!(!cache.contains_key(&path_large)); + assert!(!cache.contains_key(&key_large)); assert_eq!(cache.len(), 2); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); } #[test] @@ -628,21 +748,38 @@ mod tests { // Set cache limit for exactly 3 entries let cache = DefaultListFilesCache::new(size * 3, None); - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Add a large entry that requires evicting 2 entries let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200); - cache.put(&path_large, value_large); + let key_large = TableScopedPath { + table: table_ref, + path: path_large, + }; + cache.put(&key_large, value_large); // path1 and path2 should be evicted (both LRU), path3 and path_large remain assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); // Evicted - assert!(!cache.contains_key(&path2)); // Evicted - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path_large)); + assert!(!cache.contains_key(&key1)); // Evicted + assert!(!cache.contains_key(&key2)); // Evicted + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key_large)); } #[test] @@ -653,10 +790,23 @@ mod tests { let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; // Add three entries - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Resize cache to only fit one entry @@ -664,70 +814,136 @@ mod tests { // Should keep only the most recent entry (path3, the MRU) assert_eq!(cache.len(), 1); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key3)); // Earlier entries (LRU) should be evicted - assert!(!cache.contains_key(&path1)); - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); } #[test] fn test_entry_update_with_size_change() { let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100); let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100); let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; // Add three entries - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3_v1); + cache.put(&key1, value1); + cache.put(&key2, Arc::clone(&value2)); + cache.put(&key3, value3_v1); assert_eq!(cache.len(), 3); // Update path3 with same size - should not cause eviction let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100); - cache.put(&path3, value3_v2); + cache.put(&key3, value3_v2); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // Update path3 with larger size that requires evicting path1 (LRU) - let (_, value3_v3, _) = create_test_list_files_entry("path3", 1, 200); - cache.put(&path3, value3_v3); + let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200); + cache.put(&key3, Arc::clone(&value3_v3)); assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); // Evicted (was LRU) - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(!cache.contains_key(&key1)); // Evicted (was LRU) + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + + // List cache entries + assert_eq!( + cache.list_entries(), + HashMap::from([ + ( + key2, + ListFilesEntry { + metas: value2, + size_bytes: size2, + expires: None, + } + ), + ( + key3, + ListFilesEntry { + metas: value3_v3, + size_bytes: size3_v3, + expires: None, + } + ) + ]) + ); } #[test] fn test_cache_with_ttl() { let ttl = Duration::from_millis(100); - let cache = DefaultListFilesCache::new(10000, Some(ttl)); - let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, _) = create_test_list_files_entry("path2", 2, 50); + let mock_time = Arc::new(MockTimeProvider::new()); + let cache = DefaultListFilesCache::new(10000, Some(ttl)) + .with_time_provider(Arc::clone(&mock_time) as Arc); - cache.put(&path1, value1); - cache.put(&path2, value2); + let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); + let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50); - // Entries should be accessible immediately - assert!(cache.get(&path1).is_some()); - assert!(cache.get(&path2).is_some()); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert_eq!(cache.len(), 2); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref, + path: path2, + }; + cache.put(&key1, Arc::clone(&value1)); + cache.put(&key2, Arc::clone(&value2)); + // Entries should be accessible immediately + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key2).is_some()); + // List cache entries + assert_eq!( + cache.list_entries(), + HashMap::from([ + ( + key1.clone(), + ListFilesEntry { + metas: value1, + size_bytes: size1, + expires: mock_time.now().checked_add(ttl), + } + ), + ( + key2.clone(), + ListFilesEntry { + metas: value2, + size_bytes: size2, + expires: mock_time.now().checked_add(ttl), + } + ) + ]) + ); // Wait for TTL to expire - thread::sleep(Duration::from_millis(150)); + mock_time.inc(Duration::from_millis(150)); // Entries should now return None and be removed when observed through get or contains_key - assert!(cache.get(&path1).is_none()); + assert!(cache.get(&key1).is_none()); assert_eq!(cache.len(), 1); // path1 was removed by get() - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key2)); assert_eq!(cache.len(), 0); // path2 was removed by contains_key() } @@ -743,21 +959,34 @@ mod tests { let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); - cache.put(&path1, value1); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; + cache.put(&key1, value1); mock_time.inc(Duration::from_millis(50)); - cache.put(&path2, value2); + cache.put(&key2, value2); mock_time.inc(Duration::from_millis(50)); // path3 should evict path1 due to size limit - cache.put(&path3, value3); - assert!(!cache.contains_key(&path1)); // Evicted by LRU - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + cache.put(&key3, value3); + assert!(!cache.contains_key(&key1)); // Evicted by LRU + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); mock_time.inc(Duration::from_millis(151)); - assert!(!cache.contains_key(&path2)); // Expired - assert!(cache.contains_key(&path3)); // Still valid + assert!(!cache.contains_key(&key2)); // Expired + assert!(cache.contains_key(&key3)); // Still valid } #[test] @@ -843,7 +1072,12 @@ mod tests { // Add entry and verify memory tracking let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100); - cache.put(&path1, value1); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + cache.put(&key1, value1); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size1); @@ -851,14 +1085,18 @@ mod tests { // Add another entry let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200); - cache.put(&path2, value2); + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + cache.put(&key2, value2); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size1 + size2); } // Remove first entry and verify memory decreases - cache.remove(&path1); + cache.remove(&key1); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size2); @@ -902,12 +1140,17 @@ mod tests { ]); // Cache the full table listing - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for partition a=1 using get_with_extra // New API: get_with_extra(table_base, Some(relative_prefix)) let prefix_a1 = Some(Path::from("a=1")); - let result = cache.get_with_extra(&table_base, &prefix_a1); + let result = cache.get_with_extra(&key, &prefix_a1); // Should return filtered results (only files from a=1) assert!(result.is_some()); @@ -921,7 +1164,7 @@ mod tests { // Query for partition a=2 let prefix_a2 = Some(Path::from("a=2")); - let result_2 = cache.get_with_extra(&table_base, &prefix_a2); + let result_2 = cache.get_with_extra(&key, &prefix_a2); assert!(result_2.is_some()); let filtered_2 = result_2.unwrap(); @@ -947,16 +1190,21 @@ mod tests { create_object_meta_with_path("my_table/a=2/file3.parquet"), create_object_meta_with_path("my_table/a=2/file4.parquet"), ]); - cache.put(&table_base, full_files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, full_files); // Query with no prefix filter (None) should return all 4 files - let result = cache.get_with_extra(&table_base, &None); + let result = cache.get_with_extra(&key, &None); assert!(result.is_some()); let files = result.unwrap(); assert_eq!(files.len(), 4); // Also test using get() which delegates to get_with_extra(&None) - let result_get = cache.get(&table_base); + let result_get = cache.get(&key); assert!(result_get.is_some()); assert_eq!(result_get.unwrap().len(), 4); } @@ -967,14 +1215,19 @@ mod tests { let cache = DefaultListFilesCache::new(100000, None); let table_base = Path::from("my_table"); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; // Query for full table should miss (nothing cached) - let result = cache.get_with_extra(&table_base, &None); + let result = cache.get_with_extra(&key, &None); assert!(result.is_none()); // Query with prefix should also miss let prefix = Some(Path::from("a=1")); - let result_2 = cache.get_with_extra(&table_base, &prefix); + let result_2 = cache.get_with_extra(&key, &prefix); assert!(result_2.is_none()); } @@ -988,11 +1241,16 @@ mod tests { create_object_meta_with_path("my_table/a=1/file1.parquet"), create_object_meta_with_path("my_table/a=2/file2.parquet"), ]); - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for partition a=3 which doesn't exist let prefix_a3 = Some(Path::from("a=3")); - let result = cache.get_with_extra(&table_base, &prefix_a3); + let result = cache.get_with_extra(&key, &prefix_a3); // Should return None since no files match assert!(result.is_none()); @@ -1018,23 +1276,28 @@ mod tests { "events/year=2025/month=01/day=01/file4.parquet", ), ]); - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for year=2024/month=01 (should get 2 files) let prefix_month = Some(Path::from("year=2024/month=01")); - let result = cache.get_with_extra(&table_base, &prefix_month); + let result = cache.get_with_extra(&key, &prefix_month); assert!(result.is_some()); assert_eq!(result.unwrap().len(), 2); // Query for year=2024 (should get 3 files) let prefix_year = Some(Path::from("year=2024")); - let result_year = cache.get_with_extra(&table_base, &prefix_year); + let result_year = cache.get_with_extra(&key, &prefix_year); assert!(result_year.is_some()); assert_eq!(result_year.unwrap().len(), 3); // Query for specific day (should get 1 file) let prefix_day = Some(Path::from("year=2024/month=01/day=01")); - let result_day = cache.get_with_extra(&table_base, &prefix_day); + let result_day = cache.get_with_extra(&key, &prefix_day); assert!(result_day.is_some()); assert_eq!(result_day.unwrap().len(), 1); } @@ -1055,18 +1318,63 @@ mod tests { create_object_meta_with_path("table_b/part=2/file2.parquet"), ]); - cache.put(&table_a, files_a); - cache.put(&table_b, files_b); + let table_ref_a = Some(TableReference::from("table_a")); + let table_ref_b = Some(TableReference::from("table_b")); + let key_a = TableScopedPath { + table: table_ref_a, + path: table_a, + }; + let key_b = TableScopedPath { + table: table_ref_b, + path: table_b, + }; + cache.put(&key_a, files_a); + cache.put(&key_b, files_b); // Query table_a should only return table_a files - let result_a = cache.get(&table_a); + let result_a = cache.get(&key_a); assert!(result_a.is_some()); assert_eq!(result_a.unwrap().len(), 1); // Query table_b with prefix should only return matching table_b files let prefix = Some(Path::from("part=1")); - let result_b = cache.get_with_extra(&table_b, &prefix); + let result_b = cache.get_with_extra(&key_b, &prefix); assert!(result_b.is_some()); assert_eq!(result_b.unwrap().len(), 1); } + + #[test] + fn test_drop_table_entries() { + let cache = DefaultListFilesCache::default(); + + let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + let table_ref1 = Some(TableReference::from("table1")); + let key1 = TableScopedPath { + table: table_ref1.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref1.clone(), + path: path2, + }; + + let table_ref2 = Some(TableReference::from("table2")); + let key3 = TableScopedPath { + table: table_ref2.clone(), + path: path3, + }; + + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); + + cache.drop_table_entries(&table_ref1).unwrap(); + + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + } } diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 8172069fdbab..93b9f0520b2a 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -24,6 +24,7 @@ mod list_files_cache; pub use file_metadata_cache::DefaultFilesMetadataCache; pub use list_files_cache::DefaultListFilesCache; +pub use list_files_cache::TableScopedPath; /// A trait that can be implemented to provide custom cache behavior for the caches managed by /// [`cache_manager::CacheManager`]. diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md index f3b0163534c4..ea353d5c8dcc 100644 --- a/docs/source/user-guide/cli/functions.md +++ b/docs/source/user-guide/cli/functions.md @@ -170,5 +170,55 @@ The columns of the returned table are: | table_size_bytes | Utf8 | Size of the table, in bytes | | statistics_size_bytes | UInt64 | Size of the cached statistics in memory | +## `list_files_cache` + +The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. Cache entries are scoped to tables. + +You can inspect the cache by querying the `list_files_cache` function. For example, + +```sql +> set datafusion.runtime.list_files_cache_ttl = "30s"; +> create external table overturemaps +stored as parquet +location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/type=infrastructure'; +0 row(s) fetched. +> select table, path, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache() limit 10; ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +| table | path | metadata_size_bytes | expires_in | file_size_bytes | e_tag | ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 999055952 | "35fc8fbe8400960b54c66fbb408c48e8-60" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 975592768 | "8a16e10b722681cdc00242564b502965-59" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1082925747 | "24cd13ddb5e0e438952d2499f5dabe06-65" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1008425557 | "37663e31c7c64d4ef355882bcd47e361-61" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1065561905 | "4e7c50d2d1b3c5ed7b82b4898f5ac332-64" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1045655427 | "8fff7e6a72d375eba668727c55d4f103-63" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1086822683 | "b67167d8022d778936c330a52a5f1922-65" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1016732378 | "6d70857a0473ed9ed3fc6e149814168b-61" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 991363784 | "c9cafb42fcbb413f851691c895dd7c2b-60" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1032469715 | "7540252d0d67158297a67038a3365e0f-62" | ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +``` + +The columns of the returned table are: +| column_name | data_type | Description | +| ------------------- | ------------ | ----------------------------------------------------------------------------------------- | +| table | Utf8 | Name of the table | +| path | Utf8 | File path relative to the object store / filesystem root | +| metadata_size_bytes | UInt64 | Size of the cached metadata in memory (not its thrift encoded form) | +| expires_in | Duration(ms) | Last modified time of the file | +| metadata_list | List(Struct) | List of metadatas, one for each file under the path. | + +A metadata struct in the metadata_list contains the following fields: + +```text +{ + "file_path": "release/2025-12-17.0/theme=base/type=infrastructure/part-00000-d556e455-e0c5-4940-b367-daff3287a952-c000.zstd.parquet", + "file_modified": "2025-12-17T22:20:29", + "file_size_bytes": 999055952, + "e_tag": "35fc8fbe8400960b54c66fbb408c48e8-60", + "version": null +} +``` + [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag