From 1fc70ac200796f1bdb42da8f964b923e94fe940d Mon Sep 17 00:00:00 2001 From: jizezhang Date: Tue, 6 Jan 2026 05:23:39 -0800 Subject: [PATCH 1/2] feat: add list_files_cache table function for `datafusion-cli` (#19388) ## Which issue does this PR close? - Closes https://github.com/apache/datafusion/issues/19055. ## Rationale for this change ## What changes are included in this PR? ``` > CREATE EXTERNAL TABLE nyc_taxi_rides STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/' ; 0 row(s) fetched. Elapsed 10.061 seconds. > SELECT metadata_size_bytes, expires_in, unnest(metadata_list) FROM list_files_cache(); +---------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | metadata_size_bytes | expires_in | UNNEST(list_files_cache().metadata_list) | +---------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200901.parquet, file_modified: 2025-05-30T09:44:23, file_size_bytes: 222192983, e_tag: "e8d016c3c7af80bf911d96387febe2c1-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200902.parquet, file_modified: 2025-05-30T09:46:00, file_size_bytes: 211023080, e_tag: "1021626ff5ef606422aa7121edd69f3b-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200903.parquet, file_modified: 2025-05-30T09:47:20, file_size_bytes: 229202874, e_tag: "96e7494b217099c6a07e9c4298cbe783-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200904.parquet, file_modified: 2025-05-30T09:44:37, file_size_bytes: 225659965, 
e_tag: "728c45fabdcd8e40bdef4dfc28df9b0f-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200905.parquet, file_modified: 2025-05-30T09:46:12, file_size_bytes: 232847306, e_tag: "f59e45bd8bd1d77cd7ae8ab6ab468bcc-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200906.parquet, file_modified: 2025-05-30T09:47:26, file_size_bytes: 224226575, e_tag: "8ebb698eea85f9af87065ac333efc449-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200907.parquet, file_modified: 2025-05-30T09:44:52, file_size_bytes: 217168413, e_tag: "7d7ee77f6cac4adc18aa3a9e74600dd3-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200908.parquet, file_modified: 2025-05-30T09:46:23, file_size_bytes: 217303109, e_tag: "e9883055d92a33b941aab971423e681b-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200909.parquet, file_modified: 2025-05-30T09:47:28, file_size_bytes: 223333499, e_tag: "6f0917e6003b38df9060d71c004eb961-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200910.parquet, file_modified: 2025-05-30T09:44:54, file_size_bytes: 246300471, e_tag: "8928b29da44e041021e10077683b7817-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200911.parquet, file_modified: 2025-05-30T09:46:37, file_size_bytes: 227920860, e_tag: "4cd26a1a7f82af080c33e890dc1fef27-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-200912.parquet, file_modified: 2025-05-30T09:44:24, file_size_bytes: 233873308, e_tag: "23f4584e494e3c065c777c270c9eedbc-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201001.parquet, file_modified: 2025-05-30T09:45:18, file_size_bytes: 235166925, e_tag: "effcc8cc41b40cf7ac466f911d7b9459-14", version: NULL} | | 18138 | 
NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201002.parquet, file_modified: 2025-05-30T09:46:59, file_size_bytes: 177367931, e_tag: "ce8b7817ecc47da86ccbfa6b51ffa06b-10", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201003.parquet, file_modified: 2025-05-30T09:44:26, file_size_bytes: 205857224, e_tag: "94a078b61e3b652387e6f2a673dc3f4e-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201004.parquet, file_modified: 2025-05-30T09:45:04, file_size_bytes: 243024246, e_tag: "a1efbebfdabc204e0041d8714aaec01a-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201005.parquet, file_modified: 2025-05-30T09:46:47, file_size_bytes: 248130090, e_tag: "d3cf585e00ce627a807348c84a42d0a6-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201006.parquet, file_modified: 2025-05-30T09:44:25, file_size_bytes: 237068130, e_tag: "831db33281a5c017f8ffc466bd47546b-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201007.parquet, file_modified: 2025-05-30T09:45:35, file_size_bytes: 234826090, e_tag: "790e05983e6592e4920c88fbd2bfe774-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201008.parquet, file_modified: 2025-05-30T09:47:14, file_size_bytes: 197990272, e_tag: "d87ddb446e5cbc0f6831fafd95cfd027-11", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201009.parquet, file_modified: 2025-05-30T09:44:27, file_size_bytes: 243408943, e_tag: "abfbe3b29942bcd68d131d95540278d3-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201010.parquet, file_modified: 2025-05-30T09:45:47, file_size_bytes: 225277041, e_tag: "f768c7b77497b2bf3efd5cb2a4362977-13", version: NULL} | | 18138 | NULL | {file_path: 
nyc_taxi_rides/data/tripdata_parquet/data-201011.parquet, file_modified: 2025-05-30T09:47:23, file_size_bytes: 220010577, e_tag: "c6830cbe1f3ae918f9280db3aa847b03-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201012.parquet, file_modified: 2025-05-30T09:44:24, file_size_bytes: 219773352, e_tag: "264f7ea433076690a3bbe5566168e5c5-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201101.parquet, file_modified: 2025-05-30T09:45:52, file_size_bytes: 212535107, e_tag: "ca3bdc2707b29667c78c39517781eac4-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201102.parquet, file_modified: 2025-05-30T09:47:23, file_size_bytes: 223138164, e_tag: "e2b3c0fd0c0d66ac6363600de0c8b2ad-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201103.parquet, file_modified: 2025-05-30T09:44:26, file_size_bytes: 252843261, e_tag: "fd5d4e01568cd6e7ef1e00de76441e5b-15", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201104.parquet, file_modified: 2025-05-30T09:46:10, file_size_bytes: 233123935, e_tag: "2b510cc2c0c73d9ec7374c9e6d56c388-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201105.parquet, file_modified: 2025-05-30T09:44:24, file_size_bytes: 246843111, e_tag: "abc2f58bd520b2013aa1a333d317c70c-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201106.parquet, file_modified: 2025-05-30T09:44:58, file_size_bytes: 238786647, e_tag: "0e456698dc42a850ff7b764506cb511d-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201107.parquet, file_modified: 2025-05-30T09:46:40, file_size_bytes: 233249259, e_tag: "28177227cbff94a6a819a0568a14e9b2-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201108.parquet, file_modified: 
2025-05-30T09:44:25, file_size_bytes: 212681184, e_tag: "fdcb442e1010630c0553a7018762a8ba-12", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201109.parquet, file_modified: 2025-05-30T09:45:13, file_size_bytes: 232399266, e_tag: "ccca37be5a3579a8bc644490226ed29a-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201110.parquet, file_modified: 2025-05-30T09:46:52, file_size_bytes: 248471033, e_tag: "eebe34c1bb74f63433eb607810969553-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201111.parquet, file_modified: 2025-05-30T09:44:26, file_size_bytes: 231103826, e_tag: "7c76b9fc111462b76336d63bce3253c7-13", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201112.parquet, file_modified: 2025-05-30T09:45:40, file_size_bytes: 236102882, e_tag: "26c10d1d85c4565cbb9e8fc6a7bc745c-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201201.parquet, file_modified: 2025-05-30T09:47:21, file_size_bytes: 236184052, e_tag: "8cdc15a22462579dcf90d669cea0f04b-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201202.parquet, file_modified: 2025-05-30T09:44:27, file_size_bytes: 238377570, e_tag: "4e6734c5c2e77c68dde5155a45dac81c-14", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201203.parquet, file_modified: 2025-05-30T09:46:06, file_size_bytes: 258226172, e_tag: "b7b07fa0f4fefcf0ba0fc69ba344b5c8-15", version: NULL} | | 18138 | NULL | {file_path: nyc_taxi_rides/data/tripdata_parquet/data-201204.parquet, file_modified: 2025-05-30T09:44:25, file_size_bytes: 248190698, e_tag: "968c13850fa9a7cb46337bc8fc9d13fa-14", version: NULL} | | . | | . | | . 
| +---------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 96 row(s) fetched. (First 40 displayed. Use --maxrows to adjust) Elapsed 0.057 seconds. ``` ## Are these changes tested? Yes ## Are there any user-facing changes? This will enable a new user-facing table function to datafusion cli. --- datafusion-cli/src/functions.rs | 164 +++++++++++++++++- datafusion-cli/src/main.rs | 144 ++++++++++++++- .../execution/src/cache/cache_manager.rs | 4 + .../execution/src/cache/list_files_cache.rs | 127 +++++++++++--- docs/source/user-guide/cli/functions.md | 38 ++++ 5 files changed, 446 insertions(+), 31 deletions(-) diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index a45d57e8e952..aa83fec1118e 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -17,13 +17,18 @@ //! 
Functions that are query-able and searchable via the `\h` command +use datafusion_common::instant::Instant; use std::fmt; use std::fs::File; use std::str::FromStr; use std::sync::Arc; -use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow::array::{ + DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray, + TimestampMillisecondArray, UInt64Array, +}; +use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; use datafusion::catalog::{Session, TableFunctionImpl}; @@ -697,3 +702,158 @@ impl TableFunctionImpl for StatisticsCacheFunc { Ok(Arc::new(statistics_cache)) } } + +#[derive(Debug)] +struct ListFilesCacheTable { + schema: SchemaRef, + batch: RecordBatch, +} + +#[async_trait] +impl TableProvider for ListFilesCacheTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> datafusion::logical_expr::TableType { + datafusion::logical_expr::TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(MemorySourceConfig::try_new_exec( + &[vec![self.batch.clone()]], + TableProvider::schema(self), + projection.cloned(), + )?) 
+ } +} + +#[derive(Debug)] +pub struct ListFilesCacheFunc { + cache_manager: Arc, +} + +impl ListFilesCacheFunc { + pub fn new(cache_manager: Arc) -> Self { + Self { cache_manager } + } +} + +impl TableFunctionImpl for ListFilesCacheFunc { + fn call(&self, exprs: &[Expr]) -> Result> { + if !exprs.is_empty() { + return plan_err!("list_files_cache should have no arguments"); + } + + let nested_fields = Fields::from(vec![ + Field::new("file_path", DataType::Utf8, false), + Field::new( + "file_modified", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("file_size_bytes", DataType::UInt64, false), + Field::new("e_tag", DataType::Utf8, true), + Field::new("version", DataType::Utf8, true), + ]); + + let metadata_field = + Field::new("metadata", DataType::Struct(nested_fields.clone()), true); + + let schema = Arc::new(Schema::new(vec![ + Field::new("path", DataType::Utf8, false), + Field::new("metadata_size_bytes", DataType::UInt64, false), + // expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type. 
+ Field::new( + "expires_in", + DataType::Duration(TimeUnit::Millisecond), + true, + ), + Field::new( + "metadata_list", + DataType::List(Arc::new(metadata_field.clone())), + true, + ), + ])); + + let mut path_arr = vec![]; + let mut metadata_size_bytes_arr = vec![]; + let mut expires_arr = vec![]; + + let mut file_path_arr = vec![]; + let mut file_modified_arr = vec![]; + let mut file_size_bytes_arr = vec![]; + let mut etag_arr = vec![]; + let mut version_arr = vec![]; + let mut offsets: Vec = vec![0]; + + if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() { + let now = Instant::now(); + let mut current_offset: i32 = 0; + + for (path, entry) in list_files_cache.list_entries() { + path_arr.push(path.to_string()); + metadata_size_bytes_arr.push(entry.size_bytes as u64); + // calculates time left before entry expires + expires_arr.push( + entry + .expires + .map(|t| t.duration_since(now).as_millis() as i64), + ); + + for meta in entry.metas.iter() { + file_path_arr.push(meta.location.to_string()); + file_modified_arr.push(meta.last_modified.timestamp_millis()); + file_size_bytes_arr.push(meta.size); + etag_arr.push(meta.e_tag.clone()); + version_arr.push(meta.version.clone()); + } + current_offset += entry.metas.len() as i32; + offsets.push(current_offset); + } + } + + let struct_arr = StructArray::new( + nested_fields, + vec![ + Arc::new(StringArray::from(file_path_arr)), + Arc::new(TimestampMillisecondArray::from(file_modified_arr)), + Arc::new(UInt64Array::from(file_size_bytes_arr)), + Arc::new(StringArray::from(etag_arr)), + Arc::new(StringArray::from(version_arr)), + ], + None, + ); + + let offsets_buffer: OffsetBuffer = + OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets))); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(path_arr)), + Arc::new(UInt64Array::from(metadata_size_bytes_arr)), + Arc::new(DurationMillisecondArray::from(expires_arr)), + Arc::new(GenericListArray::new( + 
Arc::new(metadata_field), + offsets_buffer, + Arc::new(struct_arr), + None, + )), + ], + )?; + + let list_files_cache = ListFilesCacheTable { schema, batch }; + Ok(Arc::new(list_files_cache)) + } +} diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 8f69ae477904..46d88152fac1 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{ - MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, + ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, }; use datafusion_cli::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, @@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> { )), ); + ctx.register_udtf( + "list_files_cache", + Arc::new(ListFilesCacheFunc::new( + ctx.task_ctx().runtime_env().cache_manager.clone(), + )), + ); + let mut print_options = PrintOptions { format: args.format, quiet: args.quiet, @@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use datafusion::{ common::test_util::batches_to_string, execution::cache::{ - cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache, + DefaultListFilesCache, cache_manager::CacheManagerConfig, + cache_unit::DefaultFileStatisticsCache, }, - prelude::ParquetReadOptions, + prelude::{ParquetReadOptions, col, lit, split_part}, }; use insta::assert_snapshot; + use object_store::memory::InMemory; + use url::Url; fn assert_conversion(input: &str, expected: Result) { let result = extract_memory_pool_size(input); @@ -741,4 +753,130 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_list_files_cache() -> Result<(), DataFusionError> { + let list_files_cache = Arc::new(DefaultListFilesCache::new( + 1024, + 
Some(Duration::from_secs(1)), + )); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default() + .with_list_files_cache(Some(list_files_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + ctx.register_object_store( + &Url::parse("mem://test_table").unwrap(), + Arc::new(InMemory::new()), + ); + + ctx.register_udtf( + "list_files_cache", + Arc::new(ListFilesCacheFunc::new( + ctx.task_ctx().runtime_env().cache_manager.clone(), + )), + ); + + ctx.sql( + "CREATE EXTERNAL TABLE src_table + STORED AS PARQUET + LOCATION '../parquet-testing/data/alltypes_plain.parquet'", + ) + .await? + .collect() + .await?; + + ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?; + + ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?; + + ctx.sql( + "CREATE EXTERNAL TABLE test_table + STORED AS PARQUET + LOCATION 'mem://test_table/' + ", + ) + .await? + .collect() + .await?; + + let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()"; + let df = ctx + .sql(sql) + .await? + .unnest_columns(&["metadata_list"])? + .with_column_renamed("metadata_list", "metadata")? + .unnest_columns(&["metadata"])?; + + assert_eq!( + 2, + df.clone() + .filter(col("expires_in").is_not_null())? + .count() + .await? + ); + + let df = df + .with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")? + .with_column_renamed(r#""metadata.e_tag""#, "etag")? + .with_column( + "filename", + split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)), + )? + .select_columns(&[ + "metadata_size_bytes", + "filename", + "file_size_bytes", + "etag", + ])? 
+ .sort(vec![col("filename").sort(true, false)])?; + let rbs = df.collect().await?; + assert_snapshot!(batches_to_string(&rbs),@r" + +---------------------+-----------+-----------------+------+ + | metadata_size_bytes | filename | file_size_bytes | etag | + +---------------------+-----------+-----------------+------+ + | 212 | 0.parquet | 3645 | 0 | + | 212 | 1.parquet | 3645 | 1 | + +---------------------+-----------+-----------------+------+ + "); + + Ok(()) + } + + #[tokio::test] + async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> { + let rt = RuntimeEnvBuilder::new() + .with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None)) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + ctx.register_udtf( + "list_files_cache", + Arc::new(ListFilesCacheFunc::new( + ctx.task_ctx().runtime_env().cache_manager.clone(), + )), + ); + + let rbs = ctx + .sql("SELECT * FROM list_files_cache()") + .await? + .collect() + .await?; + assert_snapshot!(batches_to_string(&rbs),@r" + +------+---------------------+------------+---------------+ + | path | metadata_size_bytes | expires_in | metadata_list | + +------+---------------------+------------+---------------+ + +------+---------------------+------------+---------------+ + "); + + Ok(()) + } } diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index c76a68c651eb..31a2323524dd 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,6 +16,7 @@ // under the License. use crate::cache::cache_unit::DefaultFilesMetadataCache; +use crate::cache::list_files_cache::ListFilesEntry; use crate::cache::{CacheAccessor, DefaultListFilesCache}; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; @@ -93,6 +94,9 @@ pub trait ListFilesCache: /// Updates the cache with a new TTL (time-to-live). 
fn update_cache_ttl(&self, ttl: Option); + + /// Retrieves the information about the entries currently cached. + fn list_entries(&self) -> HashMap; } /// Generic file-embedded metadata used with [`FileMetadataCache`]. diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 661bc47b5468..c4a92c49478d 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -17,6 +17,7 @@ use std::mem::size_of; use std::{ + collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; @@ -103,10 +104,11 @@ impl DefaultListFilesCache { } } -struct ListFilesEntry { - metas: Arc>, - size_bytes: usize, - expires: Option, +#[derive(Clone, PartialEq, Debug)] +pub struct ListFilesEntry { + pub metas: Arc>, + pub size_bytes: usize, + pub expires: Option, } impl ListFilesEntry { @@ -347,6 +349,15 @@ impl ListFilesCache for DefaultListFilesCache { state.ttl = ttl; state.evict_entries(); } + + fn list_entries(&self) -> HashMap { + let state = self.state.lock().unwrap(); + let mut entries = HashMap::::new(); + for (path, entry) in state.lru_queue.list_entries() { + entries.insert(path.clone(), entry.clone()); + } + entries + } } impl CacheAccessor>> for DefaultListFilesCache { @@ -431,7 +442,6 @@ impl CacheAccessor>> for DefaultListFilesCache { mod tests { use super::*; use chrono::DateTime; - use std::thread; struct MockTimeProvider { base: Instant, @@ -525,12 +535,35 @@ mod tests { assert_eq!(cache.len(), 0); // Put multiple entries - let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, _) = create_test_list_files_entry("path2", 3, 50); - cache.put(&path1, value1); - cache.put(&path2, value2); + let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); + let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50); + cache.put(&path1, Arc::clone(&value1)); + cache.put(&path2, 
Arc::clone(&value2)); assert_eq!(cache.len(), 2); + // List cache entries + assert_eq!( + cache.list_entries(), + HashMap::from([ + ( + path1.clone(), + ListFilesEntry { + metas: value1, + size_bytes: size1, + expires: None, + } + ), + ( + path2.clone(), + ListFilesEntry { + metas: value2, + size_bytes: size2, + expires: None, + } + ) + ]) + ); + // Clear all entries cache.clear(); assert_eq!(cache.len(), 0); @@ -673,14 +706,14 @@ mod tests { #[test] fn test_entry_update_with_size_change() { let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100); let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100); let cache = DefaultListFilesCache::new(size * 3, None); // Add three entries cache.put(&path1, value1); - cache.put(&path2, value2); + cache.put(&path2, Arc::clone(&value2)); cache.put(&path3, value3_v1); assert_eq!(cache.len(), 3); @@ -694,35 +727,77 @@ mod tests { assert!(cache.contains_key(&path3)); // Update path3 with larger size that requires evicting path1 (LRU) - let (_, value3_v3, _) = create_test_list_files_entry("path3", 1, 200); - cache.put(&path3, value3_v3); + let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200); + cache.put(&path3, Arc::clone(&value3_v3)); assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); // Evicted (was LRU) - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(!cache.contains_key(&path1)); + + // List cache entries + assert_eq!( + cache.list_entries(), + HashMap::from([ + ( + path2, + ListFilesEntry { + metas: value2, + size_bytes: size2, + expires: None, + } + ), + ( + path3, + ListFilesEntry { + metas: value3_v3, + size_bytes: size3_v3, + expires: None, + } + ) + ]) + ); } #[test] fn test_cache_with_ttl() { let ttl = Duration::from_millis(100); - let cache = 
DefaultListFilesCache::new(10000, Some(ttl)); - let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, _) = create_test_list_files_entry("path2", 2, 50); + let mock_time = Arc::new(MockTimeProvider::new()); + let cache = DefaultListFilesCache::new(10000, Some(ttl)) + .with_time_provider(Arc::clone(&mock_time) as Arc); - cache.put(&path1, value1); - cache.put(&path2, value2); + let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); + let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50); + + cache.put(&path1, Arc::clone(&value1)); + cache.put(&path2, Arc::clone(&value2)); // Entries should be accessible immediately assert!(cache.get(&path1).is_some()); assert!(cache.get(&path2).is_some()); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert_eq!(cache.len(), 2); - + // List cache entries + assert_eq!( + cache.list_entries(), + HashMap::from([ + ( + path1.clone(), + ListFilesEntry { + metas: value1, + size_bytes: size1, + expires: mock_time.now().checked_add(ttl), + } + ), + ( + path2.clone(), + ListFilesEntry { + metas: value2, + size_bytes: size2, + expires: mock_time.now().checked_add(ttl), + } + ) + ]) + ); // Wait for TTL to expire - thread::sleep(Duration::from_millis(150)); + mock_time.inc(Duration::from_millis(150)); // Entries should now return None and be removed when observed through get or contains_key assert!(cache.get(&path1).is_none()); diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md index f3b0163534c4..11f61297ac8d 100644 --- a/docs/source/user-guide/cli/functions.md +++ b/docs/source/user-guide/cli/functions.md @@ -170,5 +170,43 @@ The columns of the returned table are: | table_size_bytes | Utf8 | Size of the table, in bytes | | statistics_size_bytes | UInt64 | Size of the cached statistics in memory | +## `list_files_cache` + +The `list_files_cache` function shows information about the 
`ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. + +You can inspect the cache by querying the `list_files_cache` function. For example, + +```sql +> select split_part(path, '/', -1) as folder, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache(); ++----------+---------------------+-----------------------------------+-----------------+-------------------------------+ +| folder | metadata_size_bytes | expires_in | file_size_bytes | e_tag | ++----------+---------------------+-----------------------------------+-----------------+-------------------------------+ +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1233969 | 7041136-643a7bfeeec9b-12d431 | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1234756 | 7041137-643a7bfeef2df-12d744 | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1232554 | 7041139-643a7bfeef86a-12ceaa | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1238676 | 704113a-643a7bfeef914-12e694 | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1232186 | 704113b-643a7bfeefb22-12cd3a | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1237506 | 7041138-643a7bfeef775-12e202 | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1228756 | 7041134-643a7bfeec2d8-12bfd4 | +| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1228509 | 7041135-643a7bfeed599-12bedd | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20124715 | 704114a-643a7c00bb560-133142b | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20131024 | 7041149-643a7c00b90b7-1332cd0 | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20179217 | 
704114b-643a7c00bb93e-133e911 | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20296819 | 704114f-643a7c00ccefd-135b473 | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20110730 | 7041148-643a7c00b9832-132dd8a | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20128346 | 704114c-643a7c00bc00a-133225a | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20130133 | 7041147-643a7c00b3901-1332955 | +| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20139830 | 7041146-643a7c00abbe8-1334f36 | ++----------+---------------------+-----------------------------------+-----------------+-------------------------------+ +``` + +The columns of the returned table are: +| column_name | data_type | Description | +| ------------------- | ------------ | ----------------------------------------------------------------------------------------- | +| path | Utf8 | File path relative to the object store / filesystem root | +| metadata_size_bytes | UInt64 | Size of the cached file-listing metadata in memory | +| expires_in | Duration(ms) | Time remaining until the cache entry expires; NULL when no TTL is configured | +| metadata_list | List(Struct) | List of metadata entries, one for each file under the path. | + [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag From aa3d413f0754f12bd83df8cb7accbd31ab8feaaa Mon Sep 17 00:00:00 2001 From: jizezhang Date: Thu, 8 Jan 2026 06:34:10 -0800 Subject: [PATCH 2/2] Make default ListingFilesCache table scoped (#19616) ## Which issue does this PR close? - Builds on https://github.com/apache/datafusion/pull/19388 - Closes https://github.com/apache/datafusion/issues/19573 ## Rationale for this change This PR explores one way to make `ListFilesCache` table scoped.
A session level cache is still used, but the cache key is made a "table-scoped" path, for which a new struct ``` pub struct TableScopedPath(pub Option, pub Path); ``` is defined. `TableReference` comes from `CreateExternalTable` passed to `ListingTableFactory::create`. Additionally, when a table is dropped, all entries related to a table is dropped by modifying `SessionContext::find_and_deregister` method. Testing (change on adding `list_files_cache()` for cli is included for easier testing). - Testing cache reuse on a single table. ``` > \object_store_profiling summary ObjectStore Profile mode set to Summary > create external table test stored as parquet location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/'; 0 row(s) fetched. Elapsed 14.878 seconds. Object Store Profiling Instrumented Object Store: instrument_mode: Summary, inner: AmazonS3(overturemaps-us-west-2) Summaries: +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Operation | Metric | min | max | avg | sum | count | +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Get | duration | 0.030597s | 0.209235s | 0.082396s | 36.254189s | 440 | | Get | size | 204782 B | 857230 B | 497304.88 B | 218814144 B | 440 | | List | duration | 0.192037s | 0.192037s | 0.192037s | 0.192037s | 1 | | List | size | | | | | 1 | +-----------+----------+-----------+-----------+-------------+-------------+-------+ > select table, path, unnest(metadata_list) from list_files_cache() limit 1; +-------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | table | path | UNNEST(list_files_cache().metadata_list) | 
+-------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | test | release/2025-12-17.0/theme=base | {file_path: release/2025-12-17.0/theme=base/type=bathymetry/part-00000-dd0f2f50-b436-4710-996f-f1b06181a3a1-c000.zstd.parquet, file_modified: 2025-12-17T22:32:50, file_size_bytes: 40280159, e_tag: "15090401f8f936c3f83bb498cb99a41d-3", version: NULL} | +-------+---------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.058 seconds. Object Store Profiling > select count(*) from test where type = 'infrastructure'; +-----------+ | count(*) | +-----------+ | 142969564 | +-----------+ 1 row(s) fetched. Elapsed 0.028 seconds. Object Store Profiling ``` - Test separate cache entries are created for two tables with same path ``` > create external table test2 stored as parquet location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/'; 0 row(s) fetched. Elapsed 14.798 seconds. 
Object Store Profiling Instrumented Object Store: instrument_mode: Summary, inner: AmazonS3(overturemaps-us-west-2) Summaries: +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Operation | Metric | min | max | avg | sum | count | +-----------+----------+-----------+-----------+-------------+-------------+-------+ | Get | duration | 0.030238s | 0.350465s | 0.073670s | 32.414654s | 440 | | Get | size | 204782 B | 857230 B | 497304.88 B | 218814144 B | 440 | | List | duration | 0.133334s | 0.133334s | 0.133334s | 0.133334s | 1 | | List | size | | | | | 1 | +-----------+----------+-----------+-----------+-------------+-------------+-------+ > select count(*) from test2 where type = 'bathymetry'; +----------+ | count(*) | +----------+ | 59963 | +----------+ 1 row(s) fetched. Elapsed 0.009 seconds. Object Store Profiling > select table, path from list_files_cache(); +-------+---------------------------------+ | table | path | +-------+---------------------------------+ | test | release/2025-12-17.0/theme=base | | test2 | release/2025-12-17.0/theme=base | +-------+---------------------------------+ 2 row(s) fetched. Elapsed 0.004 seconds. ``` - Test cache associated with a table is dropped when table is dropped, and the other table with same path is unaffected. ``` > drop table test; 0 row(s) fetched. Elapsed 0.015 seconds. Object Store Profiling > select table, path from list_files_cache(); +-------+---------------------------------+ | table | path | +-------+---------------------------------+ | test2 | release/2025-12-17.0/theme=base | +-------+---------------------------------+ 1 row(s) fetched. Elapsed 0.005 seconds. Object Store Profiling > select count(*) from list_files_cache() where table = 'test'; +----------+ | count(*) | +----------+ | 0 | +----------+ 1 row(s) fetched. Elapsed 0.014 seconds. 
> select count(*) from test2 where type = 'infrastructure'; +-----------+ | count(*) | +-----------+ | 142969564 | +-----------+ 1 row(s) fetched. Elapsed 0.013 seconds. Object Store Profiling ``` - Test that dropping a view does not remove cache ``` > create view test2_view as (select * from test2 where type = 'infrastructure'); 0 row(s) fetched. Elapsed 0.103 seconds. Object Store Profiling > select count(*) from test2_view; +-----------+ | count(*) | +-----------+ | 142969564 | +-----------+ 1 row(s) fetched. Elapsed 0.094 seconds. Object Store Profiling > drop view test2_view; 0 row(s) fetched. Elapsed 0.002 seconds. Object Store Profiling > select table, path from list_files_cache(); +-------+---------------------------------+ | table | path | +-------+---------------------------------+ | test2 | release/2025-12-17.0/theme=base | +-------+---------------------------------+ 1 row(s) fetched. Elapsed 0.007 seconds. ``` ## What changes are included in this PR? ## Are these changes tested? ## Are there any user-facing changes? --- datafusion-cli/src/functions.rs | 23 +- datafusion-cli/src/main.rs | 31 -- datafusion/catalog-listing/src/table.rs | 7 +- .../src/datasource/listing_table_factory.rs | 7 +- datafusion/core/src/execution/context/mod.rs | 7 +- datafusion/datasource/src/url.rs | 49 +- .../execution/src/cache/cache_manager.rs | 8 +- .../execution/src/cache/list_files_cache.rs | 469 +++++++++++++----- datafusion/execution/src/cache/mod.rs | 1 + docs/source/user-guide/cli/functions.md | 56 ++- 10 files changed, 474 insertions(+), 184 deletions(-) diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index aa83fec1118e..8a6ad448d895 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -703,6 +703,23 @@ impl TableFunctionImpl for StatisticsCacheFunc { } } +// Implementation of the `list_files_cache` table function in datafusion-cli. 
+/// +/// This function returns the cached results of running a LIST command on a particular object store path for a table. The object metadata is returned as a List of Structs, with one Struct for each object. +/// DataFusion uses these cached results to plan queries against external tables. +/// # Schema +/// ```sql +/// > describe select * from list_files_cache(); +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// | column_name | data_type | is_nullable | +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// | table | Utf8 | NO | +/// | path | Utf8 | NO | +/// | metadata_size_bytes | UInt64 | NO | +/// | expires_in | Duration(ms) | YES | +/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES | +/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ +/// ``` #[derive(Debug)] struct ListFilesCacheTable { schema: SchemaRef, @@ -771,6 +788,7 @@ impl TableFunctionImpl for ListFilesCacheFunc { Field::new("metadata", DataType::Struct(nested_fields.clone()), true); let schema = Arc::new(Schema::new(vec![ + Field::new("table", DataType::Utf8, false), Field::new("path", DataType::Utf8, false), Field::new("metadata_size_bytes", DataType::UInt64, false), // expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type. 
@@ -786,6 +804,7 @@ impl TableFunctionImpl for ListFilesCacheFunc { ), ])); + let mut table_arr = vec![]; let mut path_arr = vec![]; let mut metadata_size_bytes_arr = vec![]; let mut expires_arr = vec![]; @@ -802,7 +821,8 @@ impl TableFunctionImpl for ListFilesCacheFunc { let mut current_offset: i32 = 0; for (path, entry) in list_files_cache.list_entries() { - path_arr.push(path.to_string()); + table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string())); + path_arr.push(path.path.to_string()); metadata_size_bytes_arr.push(entry.size_bytes as u64); // calculates time left before entry expires expires_arr.push( @@ -841,6 +861,7 @@ impl TableFunctionImpl for ListFilesCacheFunc { let batch = RecordBatch::try_new( schema.clone(), vec![ + Arc::new(StringArray::from(table_arr)), Arc::new(StringArray::from(path_arr)), Arc::new(UInt64Array::from(metadata_size_bytes_arr)), Arc::new(DurationMillisecondArray::from(expires_arr)), diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 46d88152fac1..9e53260e4277 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -848,35 +848,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> { - let rt = RuntimeEnvBuilder::new() - .with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None)) - .build_arc() - .unwrap(); - - let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); - - ctx.register_udtf( - "list_files_cache", - Arc::new(ListFilesCacheFunc::new( - ctx.task_ctx().runtime_env().cache_manager.clone(), - )), - ); - - let rbs = ctx - .sql("SELECT * FROM list_files_cache()") - .await? 
- .collect() - .await?; - assert_snapshot!(batches_to_string(&rbs),@r" - +------+---------------------+------------+---------------+ - | path | metadata_size_bytes | expires_in | metadata_list | - +------+---------------------+------------+---------------+ - +------+---------------------+------------+---------------+ - "); - - Ok(()) - } } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 9fb2dd2dce29..a175d47f4de6 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics, }; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; @@ -565,7 +566,11 @@ impl TableProvider for ListingTable { // Invalidate cache entries for this table if they exist if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() { - let _ = lfc.remove(table_path.prefix()); + let key = TableScopedPath { + table: table_path.get_table_ref().clone(), + path: table_path.prefix().clone(), + }; + let _ = lfc.remove(&key); } // Sink related option, apart from format diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 3ca388af0c4c..86af691fd724 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory { ))? 
.create(session_state, &cmd.options)?; - let mut table_path = ListingTableUrl::parse(&cmd.location)?; + let mut table_path = + ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone()); let file_extension = match table_path.is_collection() { // Setting the extension to be empty instead of allowing the default extension seems // odd, but was done to ensure existing behavior isn't modified. It seems like this @@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory { } None => format!("*.{}", cmd.file_type.to_lowercase()), }; - table_path = table_path.with_glob(glob.as_ref())?; + table_path = table_path + .with_glob(glob.as_ref())? + .with_table_ref(cmd.name.clone()); } let schema = options.infer_schema(session_state, &table_path).await?; let df_schema = Arc::clone(&schema).to_dfschema()?; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a769bb01b435..6df90b205c8e 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1315,7 +1315,7 @@ impl SessionContext { let table = table_ref.table().to_owned(); let maybe_schema = { let state = self.state.read(); - let resolved = state.resolve_table_ref(table_ref); + let resolved = state.resolve_table_ref(table_ref.clone()); state .catalog_list() .catalog(&resolved.catalog) @@ -1327,6 +1327,11 @@ impl SessionContext { && table_provider.table_type() == table_type { schema.deregister_table(&table)?; + if table_type == TableType::Base + && let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache() + { + lfc.drop_table_entries(&Some(table_ref))?; + } return Ok(true); } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 155d6efe462c..2428275ac3c3 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use datafusion_common::{DataFusionError, Result}; +use 
datafusion_common::{DataFusionError, Result, TableReference}; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; @@ -41,6 +42,8 @@ pub struct ListingTableUrl { prefix: Path, /// An optional glob expression used to filter files glob: Option, + + table_ref: Option, } impl ListingTableUrl { @@ -145,7 +148,12 @@ impl ListingTableUrl { /// to create a [`ListingTableUrl`]. pub fn try_new(url: Url, glob: Option) -> Result { let prefix = Path::from_url_path(url.path())?; - Ok(Self { url, prefix, glob }) + Ok(Self { + url, + prefix, + glob, + table_ref: None, + }) } /// Returns the URL scheme @@ -255,7 +263,14 @@ impl ListingTableUrl { }; let list: BoxStream<'a, Result> = if self.is_collection() { - list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await? + list_with_cache( + ctx, + store, + self.table_ref.as_ref(), + &self.prefix, + prefix.as_ref(), + ) + .await? } else { match store.head(&full_prefix).await { Ok(meta) => futures::stream::once(async { Ok(meta) }) @@ -264,7 +279,14 @@ impl ListingTableUrl { // If the head command fails, it is likely that object doesn't exist. // Retry as though it were a prefix (aka a collection) Err(object_store::Error::NotFound { .. }) => { - list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await? + list_with_cache( + ctx, + store, + self.table_ref.as_ref(), + &self.prefix, + prefix.as_ref(), + ) + .await? } Err(e) => return Err(e.into()), } @@ -323,6 +345,15 @@ impl ListingTableUrl { Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?; Self::try_new(self.url, Some(glob)) } + + pub fn with_table_ref(mut self, table_ref: TableReference) -> Self { + self.table_ref = Some(table_ref); + self + } + + pub fn get_table_ref(&self) -> &Option { + &self.table_ref + } } /// Lists files with cache support, using prefix-aware lookups. 
@@ -345,6 +376,7 @@ impl ListingTableUrl { async fn list_with_cache<'b>( ctx: &'b dyn Session, store: &'b dyn ObjectStore, + table_ref: Option<&TableReference>, table_base_path: &Path, prefix: Option<&Path>, ) -> Result>> { @@ -367,9 +399,14 @@ async fn list_with_cache<'b>( // Convert prefix to Option for cache lookup let prefix_filter = prefix.cloned(); + let table_scoped_base_path = TableScopedPath { + table: table_ref.cloned(), + path: table_base_path.clone(), + }; + // Try cache lookup with optional prefix filter let vec = if let Some(res) = - cache.get_with_extra(table_base_path, &prefix_filter) + cache.get_with_extra(&table_scoped_base_path, &prefix_filter) { debug!("Hit list files cache"); res.as_ref().clone() @@ -380,7 +417,7 @@ async fn list_with_cache<'b>( .list(Some(table_base_path)) .try_collect::>() .await?; - cache.put(table_base_path, Arc::new(vec.clone())); + cache.put(&table_scoped_base_path, Arc::new(vec.clone())); // If a prefix filter was requested, apply it to the results if prefix.is_some() { diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 31a2323524dd..162074d909ea 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -17,7 +17,9 @@ use crate::cache::cache_unit::DefaultFilesMetadataCache; use crate::cache::list_files_cache::ListFilesEntry; +use crate::cache::list_files_cache::TableScopedPath; use crate::cache::{CacheAccessor, DefaultListFilesCache}; +use datafusion_common::TableReference; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; use object_store::ObjectMeta; @@ -81,7 +83,7 @@ pub struct FileStatisticsCacheEntry { /// /// See [`crate::runtime_env::RuntimeEnv`] for more details. pub trait ListFilesCache: - CacheAccessor>, Extra = Option> + CacheAccessor>, Extra = Option> { /// Returns the cache's memory limit in bytes. 
fn cache_limit(&self) -> usize; @@ -96,7 +98,9 @@ pub trait ListFilesCache: fn update_cache_ttl(&self, ttl: Option<Duration>); /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap<Path, ListFilesEntry>; + fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>; + + fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>; } /// Generic file-embedded metadata used with [`FileMetadataCache`]. diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index c4a92c49478d..858219e5b883 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -22,6 +22,7 @@ use std::{ time::Duration, }; +use datafusion_common::TableReference; use datafusion_common::instant::Instant; use object_store::{ObjectMeta, path::Path}; @@ -148,9 +149,15 @@ pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB /// The default cache TTL for the [`DefaultListFilesCache`] pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct TableScopedPath { + pub table: Option<TableReference>, + pub path: Path, +} + /// Handles the inner state of the [`DefaultListFilesCache`] struct. 
pub struct DefaultListFilesCacheState { - lru_queue: LruQueue, + lru_queue: LruQueue, memory_limit: usize, memory_used: usize, ttl: Option, @@ -198,17 +205,17 @@ impl DefaultListFilesCacheState { /// ``` fn get_with_prefix( &mut self, - table_base: &Path, + table_scoped_base_path: &TableScopedPath, prefix: Option<&Path>, now: Instant, ) -> Option>> { - let entry = self.lru_queue.get(table_base)?; + let entry = self.lru_queue.get(table_scoped_base_path)?; // Check expiration if let Some(exp) = entry.expires && now > exp { - self.remove(table_base); + self.remove(table_scoped_base_path); return None; } @@ -218,6 +225,7 @@ impl DefaultListFilesCacheState { }; // Build the full prefix path: table_base/prefix + let table_base = &table_scoped_base_path.path; let mut parts: Vec<_> = table_base.parts().collect(); parts.extend(prefix.parts()); let full_prefix = Path::from_iter(parts); @@ -243,7 +251,7 @@ impl DefaultListFilesCacheState { /// If the entry has expired by `now` it is removed from the cache. /// /// The LRU queue is not updated. - fn contains_key(&mut self, k: &Path, now: Instant) -> bool { + fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool { let Some(entry) = self.lru_queue.peek(k) else { return false; }; @@ -264,7 +272,7 @@ impl DefaultListFilesCacheState { /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. fn put( &mut self, - key: &Path, + key: &TableScopedPath, value: Arc>, now: Instant, ) -> Option>> { @@ -306,7 +314,7 @@ impl DefaultListFilesCacheState { } /// Removes an entry from the cache and returns it, if it exists. 
- fn remove(&mut self, k: &Path) -> Option>> { + fn remove(&mut self, k: &TableScopedPath) -> Option>> { if let Some(entry) = self.lru_queue.remove(k) { self.memory_used -= entry.size_bytes; Some(entry.metas) @@ -350,23 +358,40 @@ impl ListFilesCache for DefaultListFilesCache { state.evict_entries(); } - fn list_entries(&self) -> HashMap { + fn list_entries(&self) -> HashMap { let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); + let mut entries = HashMap::::new(); for (path, entry) in state.lru_queue.list_entries() { entries.insert(path.clone(), entry.clone()); } entries } + + fn drop_table_entries( + &self, + table_ref: &Option, + ) -> datafusion_common::Result<()> { + let mut state = self.state.lock().unwrap(); + let mut table_paths = vec![]; + for (path, _) in state.lru_queue.list_entries() { + if path.table == *table_ref { + table_paths.push(path.clone()); + } + } + for path in table_paths { + state.remove(&path); + } + Ok(()) + } } -impl CacheAccessor>> for DefaultListFilesCache { +impl CacheAccessor>> for DefaultListFilesCache { type Extra = Option; /// Gets all files for the given table base path. /// /// This is equivalent to calling `get_with_extra(k, &None)`. - fn get(&self, k: &Path) -> Option>> { + fn get(&self, k: &TableScopedPath) -> Option>> { self.get_with_extra(k, &None) } @@ -385,17 +410,17 @@ impl CacheAccessor>> for DefaultListFilesCache { /// can serve queries for any partition subset without additional storage calls. 
fn get_with_extra( &self, - table_base: &Path, + table_scoped_path: &TableScopedPath, prefix: &Self::Extra, ) -> Option>> { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); - state.get_with_prefix(table_base, prefix.as_ref(), now) + state.get_with_prefix(table_scoped_path, prefix.as_ref(), now) } fn put( &self, - key: &Path, + key: &TableScopedPath, value: Arc>, ) -> Option>> { let mut state = self.state.lock().unwrap(); @@ -405,19 +430,19 @@ impl CacheAccessor>> for DefaultListFilesCache { fn put_with_extra( &self, - key: &Path, + key: &TableScopedPath, value: Arc>, _e: &Self::Extra, ) -> Option>> { self.put(key, value) } - fn remove(&self, k: &Path) -> Option>> { + fn remove(&self, k: &TableScopedPath) -> Option>> { let mut state = self.state.lock().unwrap(); state.remove(k) } - fn contains_key(&self, k: &Path) -> bool { + fn contains_key(&self, k: &TableScopedPath) -> bool { let mut state = self.state.lock().unwrap(); let now = self.time_provider.now(); state.contains_key(k, now) @@ -509,36 +534,49 @@ mod tests { #[test] fn test_basic_operations() { let cache = DefaultListFilesCache::default(); + let table_ref = Some(TableReference::from("table")); let path = Path::from("test_path"); + let key = TableScopedPath { + table: table_ref.clone(), + path, + }; // Initially cache is empty - assert!(cache.get(&path).is_none()); - assert!(!cache.contains_key(&path)); + assert!(cache.get(&key).is_none()); + assert!(!cache.contains_key(&key)); assert_eq!(cache.len(), 0); // Put an entry let meta = create_test_object_meta("file1", 50); let value = Arc::new(vec![meta.clone()]); - cache.put(&path, Arc::clone(&value)); + cache.put(&key, Arc::clone(&value)); // Entry should be retrievable - assert!(cache.contains_key(&path)); + assert!(cache.contains_key(&key)); assert_eq!(cache.len(), 1); - let retrieved = cache.get(&path).unwrap(); + let retrieved = cache.get(&key).unwrap(); assert_eq!(retrieved.len(), 1); assert_eq!(retrieved[0].location, 
meta.location); // Remove the entry - let removed = cache.remove(&path).unwrap(); + let removed = cache.remove(&key).unwrap(); assert_eq!(removed.len(), 1); - assert!(!cache.contains_key(&path)); + assert!(!cache.contains_key(&key)); assert_eq!(cache.len(), 0); // Put multiple entries let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50); - cache.put(&path1, Arc::clone(&value1)); - cache.put(&path2, Arc::clone(&value2)); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref, + path: path2, + }; + cache.put(&key1, Arc::clone(&value1)); + cache.put(&key2, Arc::clone(&value2)); assert_eq!(cache.len(), 2); // List cache entries @@ -546,7 +584,7 @@ mod tests { cache.list_entries(), HashMap::from([ ( - path1.clone(), + key1.clone(), ListFilesEntry { metas: value1, size_bytes: size1, @@ -554,7 +592,7 @@ mod tests { } ), ( - path2.clone(), + key2.clone(), ListFilesEntry { metas: value2, size_bytes: size2, @@ -567,8 +605,8 @@ mod tests { // Clear all entries cache.clear(); assert_eq!(cache.len(), 0); - assert!(!cache.contains_key(&path1)); - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); } #[test] @@ -580,24 +618,42 @@ mod tests { // Set cache limit to exactly fit all three entries let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + // All three entries should fit - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); 
assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // Adding a new entry should evict path1 (LRU) let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - cache.put(&path4, value4); + let key4 = TableScopedPath { + table: table_ref, + path: path4, + }; + cache.put(&key4, value4); assert_eq!(cache.len(), 3); - assert!(!cache.contains_key(&path1)); // Evicted - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path4)); + assert!(!cache.contains_key(&key1)); // Evicted + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key4)); } #[test] @@ -609,24 +665,42 @@ mod tests { // Set cache limit to fit exactly three entries let cache = DefaultListFilesCache::new(size * 3, None); - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Access path1 to move it to front (MRU) // Order is now: path2 (LRU), path3, path1 (MRU) - cache.get(&path1); + cache.get(&key1); // Adding a new entry should evict path2 (the LRU) let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - cache.put(&path4, value4); + let key4 = TableScopedPath { + table: table_ref, + path: path4, + }; + cache.put(&key4, value4); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); // Still present (recently accessed) - 
assert!(!cache.contains_key(&path2)); // Evicted (was LRU) - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path4)); + assert!(cache.contains_key(&key1)); // Still present (recently accessed) + assert!(!cache.contains_key(&key2)); // Evicted (was LRU) + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key4)); } #[test] @@ -637,19 +711,32 @@ mod tests { // Set cache limit to fit both entries let cache = DefaultListFilesCache::new(size * 2, None); - cache.put(&path1, value1); - cache.put(&path2, value2); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + cache.put(&key1, value1); + cache.put(&key2, value2); assert_eq!(cache.len(), 2); // Try to add an entry that's too large to fit in the cache let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); - cache.put(&path_large, value_large); + let key_large = TableScopedPath { + table: table_ref, + path: path_large, + }; + cache.put(&key_large, value_large); // Large entry should not be added - assert!(!cache.contains_key(&path_large)); + assert!(!cache.contains_key(&key_large)); assert_eq!(cache.len(), 2); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); } #[test] @@ -661,21 +748,38 @@ mod tests { // Set cache limit for exactly 3 entries let cache = DefaultListFilesCache::new(size * 3, None); - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref.clone(), + path: path3, + }; + cache.put(&key1, value1); + 
cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Add a large entry that requires evicting 2 entries let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200); - cache.put(&path_large, value_large); + let key_large = TableScopedPath { + table: table_ref, + path: path_large, + }; + cache.put(&key_large, value_large); // path1 and path2 should be evicted (both LRU), path3 and path_large remain assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); // Evicted - assert!(!cache.contains_key(&path2)); // Evicted - assert!(cache.contains_key(&path3)); - assert!(cache.contains_key(&path_large)); + assert!(!cache.contains_key(&key1)); // Evicted + assert!(!cache.contains_key(&key2)); // Evicted + assert!(cache.contains_key(&key3)); + assert!(cache.contains_key(&key_large)); } #[test] @@ -686,10 +790,23 @@ mod tests { let cache = DefaultListFilesCache::new(size * 3, None); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; // Add three entries - cache.put(&path1, value1); - cache.put(&path2, value2); - cache.put(&path3, value3); + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Resize cache to only fit one entry @@ -697,10 +814,10 @@ mod tests { // Should keep only the most recent entry (path3, the MRU) assert_eq!(cache.len(), 1); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key3)); // Earlier entries (LRU) should be evicted - assert!(!cache.contains_key(&path1)); - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); } #[test] @@ -711,34 +828,49 @@ mod tests { let cache = DefaultListFilesCache::new(size * 3, None); + let 
table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; // Add three entries - cache.put(&path1, value1); - cache.put(&path2, Arc::clone(&value2)); - cache.put(&path3, value3_v1); + cache.put(&key1, value1); + cache.put(&key2, Arc::clone(&value2)); + cache.put(&key3, value3_v1); assert_eq!(cache.len(), 3); // Update path3 with same size - should not cause eviction let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100); - cache.put(&path3, value3_v2); + cache.put(&key3, value3_v2); assert_eq!(cache.len(), 3); - assert!(cache.contains_key(&path1)); - assert!(cache.contains_key(&path2)); - assert!(cache.contains_key(&path3)); + assert!(cache.contains_key(&key1)); + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // Update path3 with larger size that requires evicting path1 (LRU) let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200); - cache.put(&path3, Arc::clone(&value3_v3)); + cache.put(&key3, Arc::clone(&value3_v3)); assert_eq!(cache.len(), 2); - assert!(!cache.contains_key(&path1)); + assert!(!cache.contains_key(&key1)); // Evicted (was LRU) + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); // List cache entries assert_eq!( cache.list_entries(), HashMap::from([ ( - path2, + key2, ListFilesEntry { metas: value2, size_bytes: size2, @@ -746,7 +878,7 @@ mod tests { } ), ( - path3, + key3, ListFilesEntry { metas: value3_v3, size_bytes: size3_v3, @@ -768,18 +900,27 @@ mod tests { let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50); - cache.put(&path1, Arc::clone(&value1)); - cache.put(&path2, Arc::clone(&value2)); + let table_ref = Some(TableReference::from("table")); 
+ let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref, + path: path2, + }; + cache.put(&key1, Arc::clone(&value1)); + cache.put(&key2, Arc::clone(&value2)); // Entries should be accessible immediately - assert!(cache.get(&path1).is_some()); - assert!(cache.get(&path2).is_some()); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key2).is_some()); // List cache entries assert_eq!( cache.list_entries(), HashMap::from([ ( - path1.clone(), + key1.clone(), ListFilesEntry { metas: value1, size_bytes: size1, @@ -787,7 +928,7 @@ mod tests { } ), ( - path2.clone(), + key2.clone(), ListFilesEntry { metas: value2, size_bytes: size2, @@ -800,9 +941,9 @@ mod tests { mock_time.inc(Duration::from_millis(150)); // Entries should now return None and be removed when observed through get or contains_key - assert!(cache.get(&path1).is_none()); + assert!(cache.get(&key1).is_none()); assert_eq!(cache.len(), 1); // path1 was removed by get() - assert!(!cache.contains_key(&path2)); + assert!(!cache.contains_key(&key2)); assert_eq!(cache.len(), 0); // path2 was removed by contains_key() } @@ -818,21 +959,34 @@ mod tests { let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); - cache.put(&path1, value1); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + let key3 = TableScopedPath { + table: table_ref, + path: path3, + }; + cache.put(&key1, value1); mock_time.inc(Duration::from_millis(50)); - cache.put(&path2, value2); + cache.put(&key2, value2); mock_time.inc(Duration::from_millis(50)); // path3 should evict path1 due to size limit - cache.put(&path3, value3); - assert!(!cache.contains_key(&path1)); // Evicted by LRU - assert!(cache.contains_key(&path2)); - 
assert!(cache.contains_key(&path3)); + cache.put(&key3, value3); + assert!(!cache.contains_key(&key1)); // Evicted by LRU + assert!(cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); mock_time.inc(Duration::from_millis(151)); - assert!(!cache.contains_key(&path2)); // Expired - assert!(cache.contains_key(&path3)); // Still valid + assert!(!cache.contains_key(&key2)); // Expired + assert!(cache.contains_key(&key3)); // Still valid } #[test] @@ -918,7 +1072,12 @@ mod tests { // Add entry and verify memory tracking let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100); - cache.put(&path1, value1); + let table_ref = Some(TableReference::from("table")); + let key1 = TableScopedPath { + table: table_ref.clone(), + path: path1, + }; + cache.put(&key1, value1); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size1); @@ -926,14 +1085,18 @@ mod tests { // Add another entry let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200); - cache.put(&path2, value2); + let key2 = TableScopedPath { + table: table_ref.clone(), + path: path2, + }; + cache.put(&key2, value2); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size1 + size2); } // Remove first entry and verify memory decreases - cache.remove(&path1); + cache.remove(&key1); { let state = cache.state.lock().unwrap(); assert_eq!(state.memory_used, size2); @@ -977,12 +1140,17 @@ mod tests { ]); // Cache the full table listing - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for partition a=1 using get_with_extra // New API: get_with_extra(table_base, Some(relative_prefix)) let prefix_a1 = Some(Path::from("a=1")); - let result = cache.get_with_extra(&table_base, &prefix_a1); + let result = cache.get_with_extra(&key, &prefix_a1); // Should return filtered results (only files from 
a=1) assert!(result.is_some()); @@ -996,7 +1164,7 @@ mod tests { // Query for partition a=2 let prefix_a2 = Some(Path::from("a=2")); - let result_2 = cache.get_with_extra(&table_base, &prefix_a2); + let result_2 = cache.get_with_extra(&key, &prefix_a2); assert!(result_2.is_some()); let filtered_2 = result_2.unwrap(); @@ -1022,16 +1190,21 @@ mod tests { create_object_meta_with_path("my_table/a=2/file3.parquet"), create_object_meta_with_path("my_table/a=2/file4.parquet"), ]); - cache.put(&table_base, full_files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, full_files); // Query with no prefix filter (None) should return all 4 files - let result = cache.get_with_extra(&table_base, &None); + let result = cache.get_with_extra(&key, &None); assert!(result.is_some()); let files = result.unwrap(); assert_eq!(files.len(), 4); // Also test using get() which delegates to get_with_extra(&None) - let result_get = cache.get(&table_base); + let result_get = cache.get(&key); assert!(result_get.is_some()); assert_eq!(result_get.unwrap().len(), 4); } @@ -1042,14 +1215,19 @@ mod tests { let cache = DefaultListFilesCache::new(100000, None); let table_base = Path::from("my_table"); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; // Query for full table should miss (nothing cached) - let result = cache.get_with_extra(&table_base, &None); + let result = cache.get_with_extra(&key, &None); assert!(result.is_none()); // Query with prefix should also miss let prefix = Some(Path::from("a=1")); - let result_2 = cache.get_with_extra(&table_base, &prefix); + let result_2 = cache.get_with_extra(&key, &prefix); assert!(result_2.is_none()); } @@ -1063,11 +1241,16 @@ mod tests { create_object_meta_with_path("my_table/a=1/file1.parquet"), create_object_meta_with_path("my_table/a=2/file2.parquet"), ]); - 
cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for partition a=3 which doesn't exist let prefix_a3 = Some(Path::from("a=3")); - let result = cache.get_with_extra(&table_base, &prefix_a3); + let result = cache.get_with_extra(&key, &prefix_a3); // Should return None since no files match assert!(result.is_none()); @@ -1093,23 +1276,28 @@ mod tests { "events/year=2025/month=01/day=01/file4.parquet", ), ]); - cache.put(&table_base, files); + let table_ref = Some(TableReference::from("table")); + let key = TableScopedPath { + table: table_ref, + path: table_base, + }; + cache.put(&key, files); // Query for year=2024/month=01 (should get 2 files) let prefix_month = Some(Path::from("year=2024/month=01")); - let result = cache.get_with_extra(&table_base, &prefix_month); + let result = cache.get_with_extra(&key, &prefix_month); assert!(result.is_some()); assert_eq!(result.unwrap().len(), 2); // Query for year=2024 (should get 3 files) let prefix_year = Some(Path::from("year=2024")); - let result_year = cache.get_with_extra(&table_base, &prefix_year); + let result_year = cache.get_with_extra(&key, &prefix_year); assert!(result_year.is_some()); assert_eq!(result_year.unwrap().len(), 3); // Query for specific day (should get 1 file) let prefix_day = Some(Path::from("year=2024/month=01/day=01")); - let result_day = cache.get_with_extra(&table_base, &prefix_day); + let result_day = cache.get_with_extra(&key, &prefix_day); assert!(result_day.is_some()); assert_eq!(result_day.unwrap().len(), 1); } @@ -1130,18 +1318,63 @@ mod tests { create_object_meta_with_path("table_b/part=2/file2.parquet"), ]); - cache.put(&table_a, files_a); - cache.put(&table_b, files_b); + let table_ref_a = Some(TableReference::from("table_a")); + let table_ref_b = Some(TableReference::from("table_b")); + let key_a = TableScopedPath { + table: table_ref_a, + 
path: table_a, + }; + let key_b = TableScopedPath { + table: table_ref_b, + path: table_b, + }; + cache.put(&key_a, files_a); + cache.put(&key_b, files_b); // Query table_a should only return table_a files - let result_a = cache.get(&table_a); + let result_a = cache.get(&key_a); assert!(result_a.is_some()); assert_eq!(result_a.unwrap().len(), 1); // Query table_b with prefix should only return matching table_b files let prefix = Some(Path::from("part=1")); - let result_b = cache.get_with_extra(&table_b, &prefix); + let result_b = cache.get_with_extra(&key_b, &prefix); assert!(result_b.is_some()); assert_eq!(result_b.unwrap().len(), 1); } + + #[test] + fn test_drop_table_entries() { + let cache = DefaultListFilesCache::default(); + + let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100); + let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + + let table_ref1 = Some(TableReference::from("table1")); + let key1 = TableScopedPath { + table: table_ref1.clone(), + path: path1, + }; + let key2 = TableScopedPath { + table: table_ref1.clone(), + path: path2, + }; + + let table_ref2 = Some(TableReference::from("table2")); + let key3 = TableScopedPath { + table: table_ref2.clone(), + path: path3, + }; + + cache.put(&key1, value1); + cache.put(&key2, value2); + cache.put(&key3, value3); + + cache.drop_table_entries(&table_ref1).unwrap(); + + assert!(!cache.contains_key(&key1)); + assert!(!cache.contains_key(&key2)); + assert!(cache.contains_key(&key3)); + } } diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 8172069fdbab..93b9f0520b2a 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -24,6 +24,7 @@ mod list_files_cache; pub use file_metadata_cache::DefaultFilesMetadataCache; pub use list_files_cache::DefaultListFilesCache; +pub use list_files_cache::TableScopedPath; /// A 
trait that can be implemented to provide custom cache behavior for the caches managed by /// [`cache_manager::CacheManager`]. diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md index 11f61297ac8d..ea353d5c8dcc 100644 --- a/docs/source/user-guide/cli/functions.md +++ b/docs/source/user-guide/cli/functions.md @@ -172,41 +172,53 @@ The columns of the returned table are: ## `list_files_cache` -The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. +The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. Cache entries are scoped to tables. You can inspect the cache by querying the `list_files_cache` function. 
For example, ```sql -> select split_part(path, '/', -1) as folder, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache(); -+----------+---------------------+-----------------------------------+-----------------+-------------------------------+ -| folder | metadata_size_bytes | expires_in | file_size_bytes | e_tag | -+----------+---------------------+-----------------------------------+-----------------+-------------------------------+ -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1233969 | 7041136-643a7bfeeec9b-12d431 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1234756 | 7041137-643a7bfeef2df-12d744 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1232554 | 7041139-643a7bfeef86a-12ceaa | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1238676 | 704113a-643a7bfeef914-12e694 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1232186 | 704113b-643a7bfeefb22-12cd3a | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1237506 | 7041138-643a7bfeef775-12e202 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1228756 | 7041134-643a7bfeec2d8-12bfd4 | -| customer | 1592 | 0 days 0 hours 0 mins 18.488 secs | 1228509 | 7041135-643a7bfeed599-12bedd | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20124715 | 704114a-643a7c00bb560-133142b | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20131024 | 7041149-643a7c00b90b7-1332cd0 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20179217 | 704114b-643a7c00bb93e-133e911 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20296819 | 704114f-643a7c00ccefd-135b473 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20110730 | 7041148-643a7c00b9832-132dd8a | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20128346 | 704114c-643a7c00bc00a-133225a | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20130133 | 
7041147-643a7c00b3901-1332955 | -| lineitem | 1600 | 0 days 0 hours 0 mins 16.758 secs | 20139830 | 7041146-643a7c00abbe8-1334f36 | -+----------+---------------------+-----------------------------------+-----------------+-------------------------------+ +> set datafusion.runtime.list_files_cache_ttl = "30s"; +> create external table overturemaps +stored as parquet +location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/type=infrastructure'; +0 row(s) fetched. +> select table, path, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache() limit 10; ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +| table | path | metadata_size_bytes | expires_in | file_size_bytes | e_tag | ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 999055952 | "35fc8fbe8400960b54c66fbb408c48e8-60" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 975592768 | "8a16e10b722681cdc00242564b502965-59" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1082925747 | "24cd13ddb5e0e438952d2499f5dabe06-65" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1008425557 | "37663e31c7c64d4ef355882bcd47e361-61" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1065561905 | "4e7c50d2d1b3c5ed7b82b4898f5ac332-64" | +| overturemaps | 
release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1045655427 | "8fff7e6a72d375eba668727c55d4f103-63" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1086822683 | "b67167d8022d778936c330a52a5f1922-65" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1016732378 | "6d70857a0473ed9ed3fc6e149814168b-61" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 991363784 | "c9cafb42fcbb413f851691c895dd7c2b-60" | +| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750 | 0 days 0 hours 0 mins 25.264 secs | 1032469715 | "7540252d0d67158297a67038a3365e0f-62" | ++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+ ``` The columns of the returned table are: | column_name | data_type | Description | | ------------------- | ------------ | ----------------------------------------------------------------------------------------- | +| table | Utf8 | Name of the table | | path | Utf8 | File path relative to the object store / filesystem root | | metadata_size_bytes | UInt64 | Size of the cached metadata in memory (not its thrift encoded form) | | expires_in | Duration(ms) | Time remaining until the cache entry expires | | metadata_list | List(Struct) | List of metadatas, one for each file under the path. 
| +A metadata struct in the metadata_list contains the following fields: + +```text +{ + "file_path": "release/2025-12-17.0/theme=base/type=infrastructure/part-00000-d556e455-e0c5-4940-b367-daff3287a952-c000.zstd.parquet", + "file_modified": "2025-12-17T22:20:29", + "file_size_bytes": 999055952, + "e_tag": "35fc8fbe8400960b54c66fbb408c48e8-60", + "version": null +} +``` + [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag