From 285ed357004ac8480a85bd8850634d50562b56db Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Fri, 5 Dec 2025 12:51:56 +0000 Subject: [PATCH 1/7] add config options for list_files_cache_limit and list_files_cache_ttl --- datafusion/core/src/execution/context/mod.rs | 44 ++++++++- datafusion/core/tests/sql/runtime_config.rs | 92 +++++++++++++++++++ .../execution/src/cache/cache_manager.rs | 24 ++++- .../execution/src/cache/list_files_cache.rs | 15 ++- datafusion/execution/src/runtime_env.rs | 2 +- 5 files changed, 167 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 8849615927fba..6b27f3ae0c1e9 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -20,6 +20,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Weak}; +use std::time::Duration; use super::options::ReadOptions; use crate::datasource::dynamic_file::DynamicListTableFactory; @@ -72,7 +73,10 @@ use datafusion_common::{ tree_node::{TreeNodeRecursion, TreeNodeVisitor}, DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference, }; -use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT; +use datafusion_execution::cache::cache_manager::{ + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, + DEFAULT_METADATA_CACHE_LIMIT, +}; pub use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::{ DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE, @@ -1161,6 +1165,14 @@ impl SessionContext { let limit = Self::parse_memory_limit(value)?; builder.with_metadata_cache_limit(limit) } + "list_files_cache_limit" => { + let limit = Self::parse_memory_limit(value)?; + builder.with_object_list_cache_limit(limit) + } + "list_files_cache_ttl" => { + let duration = Self::parse_duration(value)?; + builder.with_object_list_cache_ttl(Some(duration)) + } _ => return plan_err!("Unknown runtime configuration: {variable}"), // Remember to update `reset_runtime_variable()` when adding new options }; @@ -1192,6 +1204,14 @@ impl SessionContext { "metadata_cache_limit" => { builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT); } + "list_files_cache_limit" => { + builder = builder + .with_object_list_cache_limit(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT); + } + "list_files_cache_ttl" => { + builder = + builder.with_object_list_cache_ttl(DEFAULT_LIST_FILES_CACHE_TTL); + } _ => return plan_err!("Unknown runtime configuration: {variable}"), }; @@ -1232,6 +1252,28 @@ impl SessionContext { } } + fn parse_duration(duration: &str) -> Result { + let mut minutes = None; + let mut seconds = None; + + for duration in duration.split_inclusive(&['m', 's']) { + let (number, unit) = duration.split_at(duration.len() - 1); + let number: u64 = number.parse().map_err(|_| { + plan_datafusion_err!("Failed to parse number from duration '{duration}'") + })?; + + match unit { + "m" if minutes.is_none() && seconds.is_none() => minutes = Some(number), + "s" if seconds.is_none() => seconds = Some(number), + _ => plan_err!("Invalid duration, unit must be either 'm' (minutes), or 's' (seconds), and be in the correct order")?, + } + } + + Ok(Duration::from_secs( + minutes.unwrap_or_default() * 60 + seconds.unwrap_or_default(), + )) + } + async fn create_custom_table( &self, cmd: &CreateExternalTable, diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 9627d7bccdb04..d6dc6983998d8 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -18,9 +18,14 @@ //! Tests for runtime configuration SQL interface use std::sync::Arc; +use std::time::Duration; use datafusion::execution::context::SessionContext; use datafusion::execution::context::TaskContext; +use datafusion::prelude::SessionConfig; +use datafusion_execution::cache::cache_manager::CacheManagerConfig; +use datafusion_execution::cache::DefaultListFilesCache; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_plan::common::collect; #[tokio::test] @@ -233,6 +238,93 @@ async fn test_test_metadata_cache_limit() { assert_eq!(get_limit(&ctx), 123 * 1024); } +#[tokio::test] +async fn test_list_files_cache_limit() { + let list_files_cache = Arc::new(DefaultListFilesCache::default()); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.list_files_cache_limit = '{limit}'").as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> usize { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_list_files_cache() + .unwrap() + .cache_limit() + }; + + update_limit(&ctx, "100M").await; + assert_eq!(get_limit(&ctx), 100 * 1024 * 1024); + + update_limit(&ctx, "2G").await; + assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024); + + update_limit(&ctx, "123K").await; + assert_eq!(get_limit(&ctx), 123 * 1024); +} + +#[tokio::test] +async fn test_list_files_cache_ttl() { + let list_files_cache = Arc::new(DefaultListFilesCache::default()); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.list_files_cache_ttl = '{limit}'").as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> Duration { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_list_files_cache() + .unwrap() + .cache_ttl() + .unwrap() + }; + + update_limit(&ctx, "1m").await; + assert_eq!(get_limit(&ctx), Duration::from_secs(60)); + + update_limit(&ctx, "30s").await; + assert_eq!(get_limit(&ctx), Duration::from_secs(30)); + + update_limit(&ctx, "1m30s").await; + assert_eq!(get_limit(&ctx), Duration::from_secs(90)); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 0eab4a60318eb..cbdecc90db326 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -26,7 +26,9 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::Duration; -use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT; +pub use super::list_files_cache::{ + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, +}; /// A cache for [`Statistics`]. /// @@ -56,6 +58,9 @@ pub trait ListFilesCache: /// Updates the cache with a new memory limit in bytes. fn update_cache_limit(&self, limit: usize); + + /// Updates the cache with a new TTL (time-to-live). + fn update_cache_ttl(&self, ttl: Option); } /// Generic file-embedded metadata used with [`FileMetadataCache`]. @@ -153,7 +158,16 @@ impl CacheManager { let file_statistic_cache = config.table_files_statistics_cache.as_ref().map(Arc::clone); - let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone); + let list_files_cache = config + .list_files_cache + .as_ref() + .and_then(|c| { + // the cache memory limit or ttl might have changed, ensure they are updated + c.update_cache_limit(config.list_files_cache_limit); + c.update_cache_ttl(config.list_files_cache_ttl); + Some(c) + }) + .map(Arc::clone); let file_metadata_cache = config .file_metadata_cache @@ -242,7 +256,7 @@ impl Default for CacheManagerConfig { table_files_statistics_cache: Default::default(), list_files_cache: Default::default(), list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, - list_files_cache_ttl: None, + list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL, file_metadata_cache: Default::default(), metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT, } @@ -283,8 +297,8 @@ impl CacheManagerConfig { /// Sets the TTL (time-to-live) for entries in the list files cache. /// /// Default: None (infinite). - pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self { - self.list_files_cache_ttl = Some(ttl); + pub fn with_list_files_cache_ttl(mut self, ttl: Option) -> Self { + self.list_files_cache_ttl = ttl; self } diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 944cfca8ea2f8..683d87de5b414 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -110,7 +110,10 @@ fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { } /// The default memory limit for the [`DefaultListFilesCache`] -pub(super) const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB +pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +/// The default cache TTL for the [`DefaultListFilesCache`] +pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite /// Handles the inner state of the [`DefaultListFilesCache`] struct. pub struct DefaultListFilesCacheState { @@ -126,7 +129,7 @@ impl Default for DefaultListFilesCacheState { lru_queue: LruQueue::new(), memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, memory_used: 0, - ttl: None, + ttl: DEFAULT_LIST_FILES_CACHE_TTL, } } } @@ -243,7 +246,7 @@ impl DefaultListFilesCacheState { impl ListFilesCache for DefaultListFilesCache { fn cache_limit(&self) -> usize { let state = self.state.lock().unwrap(); - state.memory_limit + dbg!(state.memory_limit) } fn cache_ttl(&self) -> Option { @@ -256,6 +259,12 @@ impl ListFilesCache for DefaultListFilesCache { state.memory_limit = limit; state.evict_entries(); } + + fn update_cache_ttl(&self, ttl: Option) { + let mut state = self.state.lock().unwrap(); + state.ttl = ttl; + state.evict_entries(); + } } impl CacheAccessor>> for DefaultListFilesCache { diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 6613018765924..a7356b6a9f75a 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -394,7 +394,7 @@ impl RuntimeEnvBuilder { } /// Specifies the duration entries in the object list cache will be considered valid. - pub fn with_object_list_cache_ttl(mut self, ttl: Duration) -> Self { + pub fn with_object_list_cache_ttl(mut self, ttl: Option) -> Self { self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl); self } From 0b0750bbbb6867eebf18a2c0d2fb7d53c580d4ba Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Fri, 5 Dec 2025 13:18:28 +0000 Subject: [PATCH 2/7] clippy --- datafusion/execution/src/cache/cache_manager.rs | 3 +-- datafusion/execution/src/cache/list_files_cache.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index cbdecc90db326..05d7e73501ec1 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -161,11 +161,10 @@ impl CacheManager { let list_files_cache = config .list_files_cache .as_ref() - .and_then(|c| { + .inspect(|c| { // the cache memory limit or ttl might have changed, ensure they are updated c.update_cache_limit(config.list_files_cache_limit); c.update_cache_ttl(config.list_files_cache_ttl); - Some(c) }) .map(Arc::clone); diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 683d87de5b414..912cc09a4c26b 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -246,7 +246,7 @@ impl DefaultListFilesCacheState { impl ListFilesCache for DefaultListFilesCache { fn cache_limit(&self) -> usize { let state = self.state.lock().unwrap(); - dbg!(state.memory_limit) + state.memory_limit } fn cache_ttl(&self) -> Option { From 8289b58b94ce6b7134083a1272a8b628b4587d97 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Fri, 5 Dec 2025 16:09:26 +0000 Subject: [PATCH 3/7] add sqllogictests --- datafusion/sqllogictest/test_files/set_variable.slt | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 82bd71d72b9e5..d3a860d8f6557 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -351,6 +351,18 @@ RESET datafusion.runtime.memory_limit statement ok EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 +statement ok +SET datafusion.runtime.list_files_cache_limit = '1K' + +statement ok +RESET datafusion.runtime.list_files_cache_limit + +statement ok +SET datafusion.runtime.list_files_cache_ttl = '1m' + +statement ok +RESET datafusion.runtime.list_files_cache_ttl + # reset invalid variable - typo in namespace statement error DataFusion error: Invalid or Unsupported Configuration: Could not find config namespace "dataexplosion" RESET dataexplosion.execution.batch_size From 10b5d63492f661a4917b26e313e515ab556c6aea Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sat, 6 Dec 2025 11:35:31 +0000 Subject: [PATCH 4/7] add config entries for list_files_cache runtime options --- datafusion/execution/src/runtime_env.rs | 36 +++++++++++++++++++ .../sqllogictest/test_files/set_variable.slt | 2 ++ docs/source/user-guide/configs.md | 2 ++ 3 files changed, 40 insertions(+) diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index a7356b6a9f75a..5e8bf38b3bee8 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -101,6 +101,8 @@ fn create_runtime_config_entries( max_temp_directory_size: Option, temp_directory: Option, metadata_cache_limit: Option, + list_files_cache_limit: Option, + list_files_cache_ttl: Option, ) -> Vec { vec![ ConfigEntry { @@ -123,6 +125,16 @@ fn create_runtime_config_entries( value: metadata_cache_limit, description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", }, + ConfigEntry { + key: "datafusion.runtime.list_files_cache_limit".to_string(), + value: list_files_cache_limit, + description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }, + ConfigEntry { + key: "datafusion.runtime.list_files_cache_ttl".to_string(), + value: list_files_cache_ttl, + description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.", + }, ] } @@ -227,6 +239,14 @@ impl RuntimeEnv { } } + fn format_duration(duration: Duration) -> String { + let total = duration.as_secs(); + let mins = total / 60; + let secs = total % 60; + + format!("{mins}m{secs}s") + } + let memory_limit_value = match self.memory_pool.memory_limit() { MemoryLimit::Finite(size) => Some(format_byte_size( size.try_into() @@ -259,11 +279,25 @@ impl RuntimeEnv { .expect("Metadata cache size conversion failed"), ); + let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit(); + let list_files_cache_value = format_byte_size( + list_files_cache_limit + .try_into() + .expect("List files cache size conversion failed"), + ); + + let list_files_cache_ttl = self + .cache_manager + .get_list_files_cache_ttl() + .map(format_duration); + create_runtime_config_entries( memory_limit_value, Some(max_temp_dir_value), temp_dir_value, Some(metadata_cache_value), + Some(list_files_cache_value), + list_files_cache_ttl, ) } } @@ -473,6 +507,8 @@ impl RuntimeEnvBuilder { Some("100G".to_string()), None, Some("50M".to_owned()), + Some("1M".to_owned()), + None, ) } diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index d3a860d8f6557..8957404799b73 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -423,6 +423,8 @@ datafusion.runtime.metadata_cache_limit 200M query T SELECT name FROM information_schema.df_settings WHERE name LIKE 'datafusion.runtime.%' ORDER BY name ---- +datafusion.runtime.list_files_cache_limit +datafusion.runtime.list_files_cache_ttl datafusion.runtime.max_temp_directory_size datafusion.runtime.memory_limit datafusion.runtime.metadata_cache_limit diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 77d6ff8be97ed..bb9b95f4cdfad 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -199,6 +199,8 @@ The following runtime configuration settings are available: | key | default | description | | ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. | | datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | | datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | | datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | From 1af1720f1e101bb8bcb6c488dd9cc1739556afb1 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sat, 6 Dec 2025 11:57:37 +0000 Subject: [PATCH 5/7] fix sqllogictests for information_schema --- datafusion/sqllogictest/test_files/information_schema.slt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5e478de0416ce..6920f6a105462 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -316,6 +316,8 @@ datafusion.optimizer.repartition_sorts true datafusion.optimizer.repartition_windows true datafusion.optimizer.skip_failed_rules false datafusion.optimizer.top_down_join_key_reordering true +datafusion.runtime.list_files_cache_limit 1M +datafusion.runtime.list_files_cache_ttl NULL datafusion.runtime.max_temp_directory_size 100G datafusion.runtime.memory_limit unlimited datafusion.runtime.metadata_cache_limit 50M @@ -444,6 +446,8 @@ datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a datafusion.optimizer.repartition_windows true Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys +datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. +datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.memory_limit unlimited Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.metadata_cache_limit 50M Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. From eaff9b59a42fb0f9d97c2ba9045728b4a504bf65 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Mon, 8 Dec 2025 20:25:02 +0000 Subject: [PATCH 6/7] ensure duration is greater than 0 seconds --- datafusion/core/src/execution/context/mod.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 6b27f3ae0c1e9..4d9ae253ed522 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1269,9 +1269,15 @@ impl SessionContext { } } - Ok(Duration::from_secs( + let duration = Duration::from_secs( minutes.unwrap_or_default() * 60 + seconds.unwrap_or_default(), - )) + ); + + if duration.is_zero() { + return plan_err!("Duration must be greater than 0 seconds"); + } + + Ok(duration) } async fn create_custom_table( From 2f71c706706f3f63ce9c646eebe111ca447aa666 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 14 Dec 2025 13:39:46 +0000 Subject: [PATCH 7/7] add test cases for parse_duration --- datafusion/core/src/execution/context/mod.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5fc7ff5f00c5b..5bba78e44d271 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -2721,4 +2721,24 @@ mod tests { Ok(()) } + + #[test] + fn test_parse_duration() { + // Valid durations + for (duration, want) in [ + ("1s", Duration::from_secs(1)), + ("1m", Duration::from_secs(60)), + ("1m0s", Duration::from_secs(60)), + ("1m1s", Duration::from_secs(61)), + ] { + let have = SessionContext::parse_duration(duration).unwrap(); + assert_eq!(want, have); + } + + // Invalid durations + for duration in ["0s", "0m", "1s0m", "1s1m"] { + let have = SessionContext::parse_duration(duration); + assert!(have.is_err()); + } + } }