diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 878094950808a..6627a287dfcd4 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -19,7 +19,7 @@ use std::{num::NonZeroUsize, sync::Arc}; use datafusion::{ execution::{ - disk_manager::DiskManagerConfig, + disk_manager::DiskManagerBuilder, memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool}, runtime_env::RuntimeEnvBuilder, }, @@ -110,7 +110,7 @@ impl CommonOpt { }; rt_builder = rt_builder .with_memory_pool(pool) - .with_disk_manager(DiskManagerConfig::NewOs); + .with_disk_manager_builder(DiskManagerBuilder::default()); } Ok(rt_builder) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 091b177d0819a..fdecb185e33e4 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -28,7 +28,6 @@ use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; use datafusion::execution::runtime_env::RuntimeEnvBuilder; -use datafusion::execution::DiskManager; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::ParquetMetadataFunc; @@ -43,7 +42,7 @@ use datafusion_cli::{ use clap::Parser; use datafusion::common::config_err; use datafusion::config::ConfigOptions; -use datafusion::execution::disk_manager::DiskManagerConfig; +use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode}; use mimalloc::MiMalloc; #[global_allocator] @@ -200,15 +199,10 @@ async fn main_inner() -> Result<()> { // set disk limit if let Some(disk_limit) = args.disk_limit { - let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - - DiskManager::set_arc_max_temp_directory_size( - &mut disk_manager, - disk_limit.try_into().unwrap(), - )?; - - let disk_config = DiskManagerConfig::new_existing(disk_manager); - rt_builder = rt_builder.with_disk_manager(disk_config); + let 
builder = DiskManagerBuilder::default() + .with_mode(DiskManagerMode::OsTmpDirectory) + .with_max_temp_directory_size(disk_limit.try_into().unwrap()); + rt_builder = rt_builder.with_disk_manager_builder(builder); } let runtime_env = rt_builder.build_arc()?; diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 464d65acc2ee6..d2d3a5e0c22fa 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -25,6 +25,7 @@ use arrow_schema::SchemaRef; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{instant::Instant, Result}; +use datafusion_execution::disk_manager::DiskManagerBuilder; use datafusion_execution::memory_pool::{ human_readable_size, MemoryPool, UnboundedMemoryPool, }; @@ -32,10 +33,7 @@ use datafusion_expr::display_schema; use datafusion_physical_plan::spill::get_record_batch_memory_size; use std::time::Duration; -use datafusion_execution::{ - disk_manager::DiskManagerConfig, memory_pool::FairSpillPool, - runtime_env::RuntimeEnvBuilder, -}; +use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder}; use rand::prelude::IndexedRandom; use rand::Rng; use rand::{rngs::StdRng, SeedableRng}; @@ -548,7 +546,7 @@ impl SortFuzzerTestGenerator { let runtime = RuntimeEnvBuilder::new() .with_memory_pool(memory_pool) - .with_disk_manager(DiskManagerConfig::NewOs) + .with_disk_manager_builder(DiskManagerBuilder::default()) .build_arc()?; let ctx = SessionContext::new_with_config_rt(config, runtime); diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 3b3ac7bdfabea..7695cc0969d87 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -31,7 +31,6 @@ use datafusion::assert_batches_eq; use datafusion::datasource::memory::MemorySourceConfig; use 
datafusion::datasource::source::DataSourceExec; use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -41,11 +40,12 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_catalog::streaming::StreamingTable; use datafusion_catalog::Session; use datafusion_common::{assert_contains, Result}; +use datafusion_execution::disk_manager::{DiskManagerBuilder, DiskManagerMode}; use datafusion_execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_execution::{DiskManager, TaskContext}; +use datafusion_execution::TaskContext; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::join_selection::JoinSelection; @@ -204,7 +204,7 @@ async fn sort_merge_join_spill() { ) .with_memory_limit(1_000) .with_config(config) - .with_disk_manager_config(DiskManagerConfig::NewOs) + .with_disk_manager_builder(DiskManagerBuilder::default()) .with_scenario(Scenario::AccessLogStreaming) .run() .await @@ -288,7 +288,7 @@ async fn sort_spill_reservation() { .with_memory_limit(mem_limit) // use a single partition so only a sort is needed .with_scenario(scenario) - .with_disk_manager_config(DiskManagerConfig::NewOs) + .with_disk_manager_builder(DiskManagerBuilder::default()) .with_expected_plan( // It is important that this plan only has a SortExec, not // also merge, so we can ensure the sort could finish @@ -550,9 +550,10 @@ async fn setup_context( disk_limit: u64, memory_pool_limit: usize, ) -> Result { - let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; - - DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, 
disk_limit)?; + let disk_manager = DiskManagerBuilder::default() + .with_mode(DiskManagerMode::OsTmpDirectory) + .with_max_temp_directory_size(disk_limit) + .build()?; let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit))) @@ -561,7 +562,7 @@ async fn setup_context( let runtime = Arc::new(RuntimeEnv { memory_pool: runtime.memory_pool.clone(), - disk_manager, + disk_manager: Arc::new(disk_manager), cache_manager: runtime.cache_manager.clone(), object_store_registry: runtime.object_store_registry.clone(), }); @@ -641,7 +642,7 @@ struct TestCase { scenario: Scenario, /// How should the disk manager (that allows spilling) be /// configured? Defaults to `Disabled` - disk_manager_config: DiskManagerConfig, + disk_manager_builder: DiskManagerBuilder, /// Expected explain plan, if non-empty expected_plan: Vec, /// Is the plan expected to pass? Defaults to false @@ -657,7 +658,8 @@ impl TestCase { config: SessionConfig::new(), memory_pool: None, scenario: Scenario::AccessLog, - disk_manager_config: DiskManagerConfig::Disabled, + disk_manager_builder: DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Disabled), expected_plan: vec![], expected_success: false, } @@ -714,11 +716,11 @@ impl TestCase { /// Specify if the disk manager should be enabled. 
If true, /// operators that support it can spill - pub fn with_disk_manager_config( + pub fn with_disk_manager_builder( mut self, - disk_manager_config: DiskManagerConfig, + disk_manager_builder: DiskManagerBuilder, ) -> Self { - self.disk_manager_config = disk_manager_config; + self.disk_manager_builder = disk_manager_builder; self } @@ -737,7 +739,7 @@ impl TestCase { memory_pool, config, scenario, - disk_manager_config, + disk_manager_builder, expected_plan, expected_success, } = self; @@ -746,7 +748,7 @@ impl TestCase { let mut builder = RuntimeEnvBuilder::new() // disk manager setting controls the spilling - .with_disk_manager(disk_manager_config) + .with_disk_manager_builder(disk_manager_builder) .with_memory_limit(memory_limit, MEMORY_FRACTION); if let Some(pool) = memory_pool { diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 6556052a3db1b..1810601fd362a 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -32,7 +32,95 @@ use crate::memory_pool::human_readable_size; const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB +/// Builder pattern for the [DiskManager] structure +#[derive(Clone, Debug)] +pub struct DiskManagerBuilder { + /// The storage mode of the disk manager + mode: DiskManagerMode, + /// The maximum amount of data (in bytes) stored inside the temporary directories. 
+ /// Defaults to 100GB + max_temp_directory_size: u64, +} + +impl Default for DiskManagerBuilder { + fn default() -> Self { + Self { + mode: DiskManagerMode::OsTmpDirectory, + max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + } + } +} + +impl DiskManagerBuilder { + pub fn set_mode(&mut self, mode: DiskManagerMode) { + self.mode = mode; + } + + pub fn with_mode(mut self, mode: DiskManagerMode) -> Self { + self.set_mode(mode); + self + } + + pub fn set_max_temp_directory_size(&mut self, value: u64) { + self.max_temp_directory_size = value; + } + + pub fn with_max_temp_directory_size(mut self, value: u64) -> Self { + self.set_max_temp_directory_size(value); + self + } + + /// Create a DiskManager given the builder + pub fn build(self) -> Result { + match self.mode { + DiskManagerMode::OsTmpDirectory => Ok(DiskManager { + local_dirs: Mutex::new(Some(vec![])), + max_temp_directory_size: self.max_temp_directory_size, + used_disk_space: Arc::new(AtomicU64::new(0)), + }), + DiskManagerMode::Directories(conf_dirs) => { + let local_dirs = create_local_dirs(conf_dirs)?; + debug!( + "Created local dirs {local_dirs:?} as DataFusion working directory" + ); + Ok(DiskManager { + local_dirs: Mutex::new(Some(local_dirs)), + max_temp_directory_size: self.max_temp_directory_size, + used_disk_space: Arc::new(AtomicU64::new(0)), + }) + } + DiskManagerMode::Disabled => Ok(DiskManager { + local_dirs: Mutex::new(None), + max_temp_directory_size: self.max_temp_directory_size, + used_disk_space: Arc::new(AtomicU64::new(0)), + }), + } + } +} + +#[derive(Clone, Debug)] +pub enum DiskManagerMode { + /// Create a new [DiskManager] that creates temporary files within + /// a temporary directory chosen by the OS + OsTmpDirectory, + + /// Create a new [DiskManager] that creates temporary files within + /// the specified directories. One of the directories will be chosen + /// at random for each temporary file created. 
+ Directories(Vec), + + /// Disable disk manager, attempts to create temporary files will error + Disabled, +} + +impl Default for DiskManagerMode { + fn default() -> Self { + Self::OsTmpDirectory + } +} + /// Configuration for temporary disk access +#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")] #[derive(Debug, Clone)] pub enum DiskManagerConfig { /// Use the provided [DiskManager] instance @@ -50,12 +138,14 @@ pub enum DiskManagerConfig { Disabled, } +#[allow(deprecated)] impl Default for DiskManagerConfig { fn default() -> Self { Self::NewOs } } +#[allow(deprecated)] impl DiskManagerConfig { /// Create temporary files in a temporary directory chosen by the OS pub fn new() -> Self { @@ -91,7 +181,14 @@ pub struct DiskManager { } impl DiskManager { + /// Creates a builder for [DiskManager] + pub fn builder() -> DiskManagerBuilder { + DiskManagerBuilder::default() + } + /// Create a DiskManager given the configuration + #[allow(deprecated)] + #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")] pub fn try_new(config: DiskManagerConfig) -> Result> { match config { DiskManagerConfig::Existing(manager) => Ok(manager), @@ -305,8 +402,7 @@ mod tests { #[test] fn lazy_temp_dir_creation() -> Result<()> { // A default configuration should not create temp files until requested - let config = DiskManagerConfig::new(); - let dm = DiskManager::try_new(config)?; + let dm = Arc::new(DiskManagerBuilder::default().build()?); assert_eq!(0, local_dir_snapshot(&dm).len()); @@ -338,11 +434,14 @@ mod tests { let local_dir2 = TempDir::new()?; let local_dir3 = TempDir::new()?; let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()]; - let config = DiskManagerConfig::new_specified( - local_dirs.iter().map(|p| p.into()).collect(), + let dm = Arc::new( + DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Directories( + local_dirs.iter().map(|p| p.into()).collect(), + )) + .build()?, ); - let dm = 
DiskManager::try_new(config)?; assert!(dm.tmp_files_enabled()); let actual = dm.create_tmp_file("Testing")?; @@ -354,8 +453,12 @@ mod tests { #[test] fn test_disabled_disk_manager() { - let config = DiskManagerConfig::Disabled; - let manager = DiskManager::try_new(config).unwrap(); + let manager = Arc::new( + DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Disabled) + .build() + .unwrap(), + ); assert!(!manager.tmp_files_enabled()); assert_eq!( manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(), @@ -366,11 +469,9 @@ mod tests { #[test] fn test_disk_manager_create_spill_folder() { let dir = TempDir::new().unwrap(); - let config = DiskManagerConfig::new_specified(vec![dir.path().to_owned()]); - - DiskManager::try_new(config) - .unwrap() - .create_tmp_file("Testing") + DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()])) + .build() .unwrap(); } @@ -393,8 +494,7 @@ mod tests { #[test] fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> { // Test for the case using OS arranged temporary directory - let config = DiskManagerConfig::new(); - let dm = DiskManager::try_new(config)?; + let dm = Arc::new(DiskManagerBuilder::default().build()?); let temp_file = dm.create_tmp_file("Testing")?; let temp_file_path = temp_file.path().to_owned(); assert!(temp_file_path.exists()); @@ -410,10 +510,13 @@ mod tests { let local_dir2 = TempDir::new()?; let local_dir3 = TempDir::new()?; let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()]; - let config = DiskManagerConfig::new_specified( - local_dirs.iter().map(|p| p.into()).collect(), + let dm = Arc::new( + DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Directories( + local_dirs.iter().map(|p| p.into()).collect(), + )) + .build()?, ); - let dm = DiskManager::try_new(config)?; let temp_file = dm.create_tmp_file("Testing")?; let temp_file_path = temp_file.path().to_owned(); assert!(temp_file_path.exists()); 
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index cb085108819eb..b086430a4ef71 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -18,8 +18,10 @@ //! Execution [`RuntimeEnv`] environment that manages access to object //! store, memory manager, disk manager. +#[allow(deprecated)] +use crate::disk_manager::DiskManagerConfig; use crate::{ - disk_manager::{DiskManager, DiskManagerConfig}, + disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode}, memory_pool::{ GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool, }, @@ -170,8 +172,11 @@ pub type RuntimeConfig = RuntimeEnvBuilder; /// /// See example on [`RuntimeEnv`] pub struct RuntimeEnvBuilder { + #[allow(deprecated)] /// DiskManager to manage temporary disk file usage pub disk_manager: DiskManagerConfig, + /// DiskManager builder to manage temporary disk file usage + pub disk_manager_builder: Option, /// [`MemoryPool`] from which to allocate memory /// /// Defaults to using an [`UnboundedMemoryPool`] if `None` @@ -193,18 +198,27 @@ impl RuntimeEnvBuilder { pub fn new() -> Self { Self { disk_manager: Default::default(), + disk_manager_builder: Default::default(), memory_pool: Default::default(), cache_manager: Default::default(), object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()), } } + #[allow(deprecated)] + #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")] /// Customize disk manager pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self { self.disk_manager = disk_manager; self } + /// Customize the disk manager builder + pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self { + self.disk_manager_builder = Some(disk_manager); + self + } + /// Customize memory policy pub fn with_memory_pool(mut self, memory_pool: Arc) -> Self { self.memory_pool = Some(memory_pool); @@ -242,13 +256,17 @@ 
impl RuntimeEnvBuilder { /// Use the specified path to create any needed temporary files pub fn with_temp_file_path(self, path: impl Into) -> Self { - self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])) + self.with_disk_manager_builder( + DiskManagerBuilder::default() + .with_mode(DiskManagerMode::Directories(vec![path.into()])), + ) } /// Build a RuntimeEnv pub fn build(self) -> Result { let Self { disk_manager, + disk_manager_builder, memory_pool, cache_manager, object_store_registry, @@ -258,7 +276,12 @@ impl RuntimeEnvBuilder { Ok(RuntimeEnv { memory_pool, - disk_manager: DiskManager::try_new(disk_manager)?, + disk_manager: if let Some(builder) = disk_manager_builder { + Arc::new(builder.build()?) + } else { + #[allow(deprecated)] + DiskManager::try_new(disk_manager)? + }, cache_manager: CacheManager::try_new(&cache_manager)?, object_store_registry, }) @@ -279,9 +302,11 @@ impl RuntimeEnvBuilder { }; Self { + #[allow(deprecated)] disk_manager: DiskManagerConfig::Existing(Arc::clone( &runtime_env.disk_manager, )), + disk_manager_builder: None, memory_pool: Some(Arc::clone(&runtime_env.memory_pool)), cache_manager: cache_config, object_store_registry: Arc::clone(&runtime_env.object_store_registry), diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index f3e0628a2ecde..cadd2b53ab117 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -2587,7 +2587,7 @@ mod tests { JoinSide, }; use datafusion_execution::config::SessionConfig; - use datafusion_execution::disk_manager::DiskManagerConfig; + use datafusion_execution::disk_manager::{DiskManagerBuilder, DiskManagerMode}; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_execution::TaskContext; use datafusion_expr::Operator; @@ -4125,7 +4125,9 @@ mod tests { // Disable DiskManager to prevent spilling let runtime = 
RuntimeEnvBuilder::new() .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::Disabled) + .with_disk_manager_builder( + DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled), + ) .build_arc()?; let session_config = SessionConfig::default().with_batch_size(50); @@ -4205,7 +4207,9 @@ mod tests { // Disable DiskManager to prevent spilling let runtime = RuntimeEnvBuilder::new() .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::Disabled) + .with_disk_manager_builder( + DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled), + ) .build_arc()?; let session_config = SessionConfig::default().with_batch_size(50); @@ -4263,7 +4267,9 @@ mod tests { // Enable DiskManager to allow spilling let runtime = RuntimeEnvBuilder::new() .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::NewOs) + .with_disk_manager_builder( + DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory), + ) .build_arc()?; for batch_size in [1, 50] { @@ -4366,7 +4372,9 @@ mod tests { // Enable DiskManager to allow spilling let runtime = RuntimeEnvBuilder::new() .with_memory_limit(500, 1.0) - .with_disk_manager(DiskManagerConfig::NewOs) + .with_disk_manager_builder( + DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory), + ) .build_arc()?; for batch_size in [1, 50] { diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index 0a7e546b4b18d..e30a1046ab274 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -92,7 +92,8 @@ mod test { }; use datafusion_common::test_util::batches_to_string; use datafusion_execution::{ - config::SessionConfig, disk_manager::DiskManagerConfig, + config::SessionConfig, + disk_manager::{DiskManagerBuilder, DiskManagerMode}, runtime_env::RuntimeEnvBuilder, }; use datafusion_physical_plan::collect; @@ -112,7 +113,9 @@ mod test { fn get_ctx() -> Arc { let rt = RuntimeEnvBuilder::new() - 
.with_disk_manager(DiskManagerConfig::Disabled) + .with_disk_manager_builder( + DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled), + ) .build_arc() .unwrap(); let session_config = SessionConfig::new().with_target_partitions(1);