From a951430c750563b0d842460dc0ece98e9cd0d825 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 16:18:16 +0100 Subject: [PATCH 1/7] add temp_dir runtime config variable --- datafusion/core/src/execution/context/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index ea8850d3b66cc..2809c951128a2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1067,6 +1067,14 @@ impl SessionContext { .with_runtime_env(Arc::new(builder.build()?)) .build(); } + "temp_dir" => { + let mut state = self.state.write(); + let builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()) + .with_temp_file_path(value); + *state = SessionStateBuilder::from(state.clone()) + .with_runtime_env(Arc::new(builder.build()?)) + .build(); + } _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {variable}" From 04b4ce4c5420032a7f5e60bedfd9884ff7edf925 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 16:23:41 +0100 Subject: [PATCH 2/7] rename temp_dir to temp_directory --- datafusion/core/src/execution/context/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 2809c951128a2..5ade8c0f4e7a1 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1067,7 +1067,7 @@ impl SessionContext { .with_runtime_env(Arc::new(builder.build()?)) .build(); } - "temp_dir" => { + "temp_directory" => { let mut state = self.state.write(); let builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()) .with_temp_file_path(value); From 0a7003b6d4d19f67742b342bb5044827dd0bfde6 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 16:56:29 +0100 Subject: [PATCH 3/7] add max_temp_directory_size config option --- datafusion/core/src/execution/context/mod.rs | 9 ++++ datafusion/core/tests/sql/runtime_config.rs | 46 ++++++++++++++++++++ datafusion/execution/src/runtime_env.rs | 12 +++-- 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5ade8c0f4e7a1..9fffe14b1e846 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1075,6 +1075,15 @@ impl SessionContext { .with_runtime_env(Arc::new(builder.build()?)) .build(); } + "max_temp_directory_size" => { + let mut state = self.state.write(); + let directory_size = Self::parse_memory_limit(value)?; + let builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()) + .with_max_temp_directory_size(directory_size as u64); + *state = SessionStateBuilder::from(state.clone()) + .with_runtime_env(Arc::new(builder.build()?)) + .build(); + } _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {variable}" diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 18e07bb61ed94..477cc3371e0cf 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use datafusion::execution::context::SessionContext; use datafusion::execution::context::TaskContext; +use datafusion_common::DataFusionError; use datafusion_physical_plan::common::collect; #[tokio::test] @@ -152,6 +153,51 @@ async fn test_invalid_memory_limit() { assert!(error_message.contains("Unsupported unit 'X'")); } +#[tokio::test] +async fn test_max_temp_directory_enforcement() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET datafusion.runtime.max_temp_directory_size = '0K'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!( + result.is_err(), + "Should fail due to max temp directory size limit" + ); + + ctx.sql("SET datafusion.runtime.memory_limit = '100M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!(result.is_ok(), "Should not fail due to memory limit"); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index b086430a4ef71..f31d2136815de 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -255,13 +255,19 @@ impl RuntimeEnvBuilder { } /// Use the specified path to create any needed temporary files - pub fn with_temp_file_path(self, path: impl Into) -> Self { + pub fn with_temp_file_path(mut self, path: impl Into) -> Self { + let builder = self.disk_manager_builder.take().unwrap_or_default(); self.with_disk_manager_builder( - DiskManagerBuilder::default() - .with_mode(DiskManagerMode::Directories(vec![path.into()])), + builder.with_mode(DiskManagerMode::Directories(vec![path.into()])), ) } + /// Specify a limit on the size of the temporary file directory in bytes + pub fn with_max_temp_directory_size(mut self, size: u64) -> Self { + let builder = self.disk_manager_builder.take().unwrap_or_default(); + self.with_disk_manager_builder(builder.with_max_temp_directory_size(size)) + } + /// Build a RuntimeEnv pub fn build(self) -> Result { let Self { From d3e41478de6281d761fbc084cd0a3827e7d6484b Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 17:00:28 +0100 Subject: [PATCH 4/7] tidy up --- datafusion/core/src/execution/context/mod.rs | 37 +++++++------------- 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9fffe14b1e846..32231e583fb81 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1055,41 +1055,30 @@ impl SessionContext { fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> { let key = variable.strip_prefix("datafusion.runtime.").unwrap(); - match key { + let mut state = self.state.write(); + + let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); + builder = match key { "memory_limit" => { let memory_limit = Self::parse_memory_limit(value)?; - - let mut state = self.state.write(); - let mut builder = - RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); - builder = builder.with_memory_limit(memory_limit, 1.0); - *state = SessionStateBuilder::from(state.clone()) - .with_runtime_env(Arc::new(builder.build()?)) - .build(); - } - "temp_directory" => { - let mut state = self.state.write(); - let builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()) - .with_temp_file_path(value); - *state = SessionStateBuilder::from(state.clone()) - .with_runtime_env(Arc::new(builder.build()?)) - .build(); + builder.with_memory_limit(memory_limit, 1.0) } "max_temp_directory_size" => { - let mut state = self.state.write(); let directory_size = Self::parse_memory_limit(value)?; - let builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()) - .with_max_temp_directory_size(directory_size as u64); - *state = SessionStateBuilder::from(state.clone()) - .with_runtime_env(Arc::new(builder.build()?)) - .build(); + builder.with_max_temp_directory_size(directory_size as u64) } + "temp_directory" => builder.with_temp_file_path(value), _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {variable}" ))) } - } + }; + + *state = SessionStateBuilder::from(state.clone()) + .with_runtime_env(Arc::new(builder.build()?)) + .build(); + Ok(()) } From 999a4686f09090a28e3210404a9c718294d2376c Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 17:39:48 +0100 Subject: [PATCH 5/7] update test for acceptable max temp directory size --- datafusion/core/tests/sql/runtime_config.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 477cc3371e0cf..966a7a0aabee3 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -186,7 +186,7 @@ async fn test_max_temp_directory_enforcement() { "Should fail due to max temp directory size limit" ); - ctx.sql("SET datafusion.runtime.memory_limit = '100M'") + ctx.sql("SET datafusion.runtime.max_temp_directory_size = '1M'") .await .unwrap() .collect() @@ -195,7 +195,10 @@ async fn test_max_temp_directory_enforcement() { let result = ctx.sql(query).await.unwrap().collect().await; - assert!(result.is_ok(), "Should not fail due to memory limit"); + assert!( + result.is_ok(), + "Should not fail due to max temp directory size limit" + ); } #[tokio::test] From 8df221fa9364501dadeebdff0094b8e0c681a617 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 20:16:48 +0100 Subject: [PATCH 6/7] add new runtime config entries to docs --- datafusion/core/tests/sql/runtime_config.rs | 1 - datafusion/execution/src/runtime_env.rs | 23 +++++++++++++++------ docs/source/user-guide/configs.md | 8 ++++--- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 966a7a0aabee3..ccdcf49576467 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use datafusion::execution::context::SessionContext; use datafusion::execution::context::TaskContext; -use datafusion_common::DataFusionError; use datafusion_physical_plan::common::collect; #[tokio::test] diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index f31d2136815de..70b0f0a831704 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -321,12 +321,23 @@ impl RuntimeEnvBuilder { /// Returns a list of all available runtime configurations with their current values and descriptions pub fn entries(&self) -> Vec { - // Memory pool configuration - vec![ConfigEntry { - key: "datafusion.runtime.memory_limit".to_string(), - value: None, // Default is system-dependent - description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", - }] + vec![ + ConfigEntry { + key: "datafusion.runtime.memory_limit".to_string(), + value: None, // Default is system-dependent + description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }, + ConfigEntry { + key: "datafusion.runtime.max_temp_directory_size".to_string(), + value: Some("100G".to_string()), + description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }, + ConfigEntry { + key: "datafusion.runtime.temp_directory".to_string(), + value: None, // Default is system-dependent + description: "The path to the temporary file directory.", + } + ] } /// Generate documentation that can be included in the user guide diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 96b7ee672bdb6..890184ed291f6 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -158,6 +158,8 @@ SET datafusion.runtime.memory_limit = '2G'; The following runtime configuration settings are available: -| key | default | description | -| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| key | default | description | +| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | From 1db685d956f74027769666ab0eb39dfa0bdfca47 Mon Sep 17 00:00:00 2001 From: Subhan <68732277+delamarch3@users.noreply.github.com> Date: Sun, 27 Jul 2025 20:24:49 +0100 Subject: [PATCH 7/7] update test name --- datafusion/core/tests/sql/runtime_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index ccdcf49576467..b05c36e335f3d 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -153,7 +153,7 @@ async fn test_invalid_memory_limit() { } #[tokio::test] -async fn test_max_temp_directory_enforcement() { +async fn test_max_temp_directory_size_enforcement() { let ctx = SessionContext::new(); ctx.sql("SET datafusion.runtime.memory_limit = '1M'")