From 87c541a0ee92e3a2e544630dfd8ec898b44722d0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 24 Jan 2022 09:46:23 -0500 Subject: [PATCH 1/2] Improve configuration and resource use of `MemoryManager` and `DiskManager` --- ballista/rust/executor/src/executor.rs | 9 +- datafusion/src/execution/context.rs | 94 +++++++++++- datafusion/src/execution/disk_manager.rs | 106 +++++++++---- datafusion/src/execution/memory_manager.rs | 165 +++++++++++++++++---- datafusion/src/execution/mod.rs | 3 + datafusion/src/execution/runtime_env.rs | 81 +++++----- datafusion/src/physical_plan/sorts/sort.rs | 10 +- 7 files changed, 356 insertions(+), 112 deletions(-) diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index e7479bd5fe9fc..75822f5e671d9 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -24,9 +24,10 @@ use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; use ballista_core::serde::scheduler::ExecutorSpecification; use datafusion::error::DataFusionError; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; +use datafusion::prelude::ExecutionConfig; /// Ballista executor pub struct Executor { @@ -87,9 +88,9 @@ impl Executor { )) }?; - let runtime_config = - RuntimeConfig::new().with_local_dirs(vec![self.work_dir.clone()]); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let config = ExecutionConfig::new() + .with_temp_file_path(self.work_dir.clone()); + let runtime = Arc::new(RuntimeEnv::new(config.runtime)?); let partitions = exec.execute_shuffle_write(part, runtime).await?; diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index ceea83d952e02..61cbf3abc8a13 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -39,7 +39,6 @@ use crate::{ }, }; use log::debug; -use std::fs; use std::path::Path; use std::string::String; use std::sync::Arc; @@ -47,6 +46,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Mutex, }; +use std::{fs, path::PathBuf}; use futures::{StreamExt, TryStreamExt}; use tokio::task::{self, JoinHandle}; @@ -94,7 +94,12 @@ use chrono::{DateTime, Utc}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; -use super::options::{AvroReadOptions, CsvReadOptions}; +use super::{ + disk_manager::DiskManagerConfig, + memory_manager::MemoryManagerConfig, + options::{AvroReadOptions, CsvReadOptions}, + DiskManager, MemoryManager, +}; /// ExecutionContext is the main interface for executing queries with DataFusion. The context /// provides the following functionality: @@ -195,6 +200,11 @@ impl ExecutionContext { } } + /// Return the [RuntimeEnv] used to run queries with this [ExecutionContext] + pub fn runtime_env(&self) -> Arc { + self.state.lock().unwrap().runtime_env.clone() + } + /// Creates a dataframe that will execute a SQL query. /// /// This method is `async` because queries of type `CREATE EXTERNAL TABLE` @@ -718,7 +728,7 @@ impl ExecutionContext { let path = path.as_ref(); // create directory to contain the CSV files (one per partition) let fs_path = Path::new(path); - let runtime = self.state.lock().unwrap().runtime_env.clone(); + let runtime = self.runtime_env(); match fs::create_dir(fs_path) { Ok(()) => { let mut tasks = vec![]; @@ -758,7 +768,7 @@ impl ExecutionContext { let path = path.as_ref(); // create directory to contain the Parquet files (one per partition) let fs_path = Path::new(path); - let runtime = self.state.lock().unwrap().runtime_env.clone(); + let runtime = self.runtime_env(); match fs::create_dir(fs_path) { Ok(()) => { let mut tasks = vec![]; @@ -1057,6 +1067,48 @@ impl ExecutionConfig { self.runtime = config; self } + + /// Use an an existing [MemoryManager] + pub fn with_existing_memory_manager(mut self, existing: Arc) -> Self { + self.runtime = self + .runtime + .with_memory_manager(MemoryManagerConfig::new_existing(existing)); + self + } + + /// Specify the total memory to use while running the DataFusion + /// plan to `max_memory * memory_fraction` in bytes. + /// + /// Note DataFusion does not yet respect this limit in all cases. + pub fn with_memory_limit( + mut self, + max_memory: usize, + memory_fraction: f64, + ) -> Result { + self.runtime = + self.runtime + .with_memory_manager(MemoryManagerConfig::try_new_limit( + max_memory, + memory_fraction, + )?); + Ok(self) + } + + /// Use an an existing [DiskManager] + pub fn with_existing_disk_manager(mut self, existing: Arc) -> Self { + self.runtime = self + .runtime + .with_disk_manager(DiskManagerConfig::new_existing(existing)); + self + } + + /// Use the specified path to create any needed temporary files + pub fn with_temp_file_path(mut self, path: impl Into) -> Self { + self.runtime = self + .runtime + .with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])); + self + } } /// Holds per-execution properties and data (such as starting timestamps, etc). @@ -1246,6 +1298,40 @@ mod tests { use tempfile::TempDir; use test::*; + #[tokio::test] + async fn shared_memory_and_disk_manager() { + // Demonstrate the ability to share DiskManager and + // MemoryManager between two different executions. + let ctx1 = ExecutionContext::new(); + + // configure with same memory / disk manager + let memory_manager = ctx1.runtime_env().memory_manager.clone(); + let disk_manager = ctx1.runtime_env().disk_manager.clone(); + let config = ExecutionConfig::new() + .with_existing_memory_manager(memory_manager.clone()) + .with_existing_disk_manager(disk_manager.clone()); + + let ctx2 = ExecutionContext::with_config(config); + + assert!(std::ptr::eq( + Arc::as_ptr(&memory_manager), + Arc::as_ptr(&ctx1.runtime_env().memory_manager) + )); + assert!(std::ptr::eq( + Arc::as_ptr(&memory_manager), + Arc::as_ptr(&ctx2.runtime_env().memory_manager) + )); + + assert!(std::ptr::eq( + Arc::as_ptr(&disk_manager), + Arc::as_ptr(&ctx1.runtime_env().disk_manager) + )); + assert!(std::ptr::eq( + Arc::as_ptr(&disk_manager), + Arc::as_ptr(&ctx2.runtime_env().disk_manager) + )); + } + #[test] fn optimize_explain() { let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs index c4a6b1d6d8993..c98df3bc77e83 100644 --- a/datafusion/src/execution/disk_manager.rs +++ b/datafusion/src/execution/disk_manager.rs @@ -19,30 +19,86 @@ //! hashed among the directories listed in RuntimeConfig::local_dirs. use crate::error::{DataFusionError, Result}; -use log::info; +use log::{debug, info}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::collections::hash_map::DefaultHasher; use std::fs::File; use std::hash::{Hash, Hasher}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tempfile::{Builder, TempDir}; +/// Configuration for temporary disk access +#[derive(Debug, Clone)] +pub enum DiskManagerConfig { + /// Use the provided [DiskManager] instance + Existing(Arc), + + /// Create a new [DiskManager] that creates temporary files within + /// a temporary directory chosen by the OS + NewOs, + + /// Create a new [DiskManager] that creates temporary files within + /// the specified directories + NewSpecified(Vec), +} + +impl Default for DiskManagerConfig { + fn default() -> Self { + Self::NewOs + } +} + +impl DiskManagerConfig { + /// Create temporary files in a temporary directory chosen by the OS + pub fn new() -> Self { + Self::default() + } + + /// Create temporary files using the provided disk manager + pub fn new_existing(existing: Arc) -> Self { + Self::Existing(existing) + } + + /// Create temporary files in the specified directories + pub fn new_specified(paths: Vec) -> Self { + Self::NewSpecified(paths) + } +} + /// Manages files generated during query execution, e.g. spill files generated /// while processing dataset larger than available memory. +#[derive(Debug)] pub struct DiskManager { local_dirs: Vec, } impl DiskManager { - /// Create local dirs inside user provided dirs through conf - pub fn new(conf_dirs: &[String]) -> Result { - let local_dirs = create_local_dirs(conf_dirs)?; - info!( - "Created local dirs {:?} as DataFusion working directory", - local_dirs - ); - Ok(Self { local_dirs }) + /// Create a DiskManager given the configuration + pub fn try_new(config: DiskManagerConfig) -> Result> { + match config { + DiskManagerConfig::Existing(manager) => Ok(manager), + DiskManagerConfig::NewOs => { + let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?; + + debug!( + "Created directory {:?} as DataFusion working directory", + tempdir + ); + Ok(Arc::new(Self { + local_dirs: vec![tempdir], + })) + } + DiskManagerConfig::NewSpecified(conf_dirs) => { + let local_dirs = create_local_dirs(conf_dirs)?; + info!( + "Created local dirs {:?} as DataFusion working directory", + local_dirs + ); + Ok(Arc::new(Self { local_dirs })) + } + } } /// Create a file in conf dirs in randomized manner and return the file path @@ -52,20 +108,18 @@ impl DiskManager { } /// Setup local dirs by creating one new dir in each of the given dirs -fn create_local_dirs(local_dir: &[String]) -> Result> { - local_dir +fn create_local_dirs(local_dirs: Vec) -> Result> { + local_dirs .iter() - .map(|root| create_dir(root, "datafusion-")) + .map(|root| { + Builder::new() + .prefix("datafusion-") + .tempdir_in(root) + .map_err(DataFusionError::IoError) + }) .collect() } -fn create_dir(root: &str, prefix: &str) -> Result { - Builder::new() - .prefix(prefix) - .tempdir_in(root) - .map_err(DataFusionError::IoError) -} - fn get_file(file_name: &str, local_dirs: &[TempDir]) -> String { let mut hasher = DefaultHasher::new(); file_name.hash(&mut hasher); @@ -98,8 +152,8 @@ fn rand_name() -> String { #[cfg(test)] mod tests { + use super::*; use crate::error::Result; - use crate::execution::disk_manager::{get_file, DiskManager}; use tempfile::TempDir; #[test] @@ -107,13 +161,13 @@ mod tests { let local_dir1 = TempDir::new()?; let local_dir2 = TempDir::new()?; let local_dir3 = TempDir::new()?; - let local_dirs = vec![ - local_dir1.path().to_str().unwrap().to_string(), - local_dir2.path().to_str().unwrap().to_string(), - local_dir3.path().to_str().unwrap().to_string(), - ]; + let config = DiskManagerConfig::new_specified(vec![ + local_dir1.path().into(), + local_dir2.path().into(), + local_dir3.path().into(), + ]); - let dm = DiskManager::new(&local_dirs)?; + let dm = DiskManager::try_new(config)?; let actual = dm.create_tmp_file()?; let name = actual.rsplit_once(std::path::MAIN_SEPARATOR).unwrap().1; diff --git a/datafusion/src/execution/memory_manager.rs b/datafusion/src/execution/memory_manager.rs index caa597bea603a..32f79750a70dc 100644 --- a/datafusion/src/execution/memory_manager.rs +++ b/datafusion/src/execution/memory_manager.rs @@ -17,7 +17,7 @@ //! Manages all available memory during query execution -use crate::error::Result; +use crate::error::{DataFusionError, Result}; use async_trait::async_trait; use hashbrown::HashMap; use log::info; @@ -28,6 +28,84 @@ use std::sync::{Arc, Condvar, Mutex, Weak}; static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0); +#[derive(Debug, Clone)] +/// Configuration information for memory management +pub enum MemoryManagerConfig { + /// Use the existing [MemoryManager] + Existing(Arc), + + /// Create a new [MemoryManager] that will use up to some + /// fraction of total system memory. + New { + /// Max execution memory allowed for DataFusion. Defaults to + /// `usize::MAX`, which will not attempt to limit the memory + /// used during plan execution. + max_memory: usize, + + /// The fraction of `max_memory` that the memory manager will + /// use for execution. + /// + /// The purpose of this config is to set aside memory for + /// untracked data structures, and imprecise size estimation + /// during memory acquisition. Defaults to 0.7 + memory_fraction: f64, + }, +} + +impl Default for MemoryManagerConfig { + fn default() -> Self { + Self::New { + max_memory: usize::MAX, + memory_fraction: 0.7, + } + } +} + +impl MemoryManagerConfig { + /// Create a new memory [MemoryManager] with no limit on the + /// memory used + pub fn new() -> Self { + Default::default() + } + + /// Create a configuration based on an existing [MemoryManager] + pub fn new_existing(existing: Arc) -> Self { + Self::Existing(existing) + } + + /// Create a new [MemoryManager] with a `max_memory` and `fraction` + pub fn try_new_limit(max_memory: usize, memory_fraction: f64) -> Result { + if max_memory == 0 { + return Err(DataFusionError::Plan(format!( + "invalid max_memory. Expected greater than 0, got {}", + max_memory + ))); + } + if !(memory_fraction > 0f64 && memory_fraction <= 1f64) { + return Err(DataFusionError::Plan(format!( + "invalid fraction. Expected greater than 0 and less than 1.0, got {}", + memory_fraction + ))); + } + + Ok(Self::New { + max_memory, + memory_fraction, + }) + } + + /// return the maximum size of the memory, in bytes, this config will allow + fn pool_size(&self) -> usize { + match self { + MemoryManagerConfig::Existing(existing) => existing.pool_size, + MemoryManagerConfig::New { + max_memory, + memory_fraction, + } => (*max_memory as f64 * *memory_fraction) as usize, + } + } +} + fn next_id() -> usize { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) } @@ -165,6 +243,7 @@ The memory management architecture is the following: */ /// Manage memory usage during physical plan execution +#[derive(Debug)] pub struct MemoryManager { requesters: Arc>>>, trackers: Arc>>>, @@ -174,19 +253,27 @@ pub struct MemoryManager { } impl MemoryManager { - /// Create new memory manager based on max available pool_size + /// Create new memory manager based on the configuration #[allow(clippy::mutex_atomic)] - pub fn new(pool_size: usize) -> Self { - info!( - "Creating memory manager with initial size {}", - human_readable_size(pool_size) - ); - Self { - requesters: Arc::new(Mutex::new(HashMap::new())), - trackers: Arc::new(Mutex::new(HashMap::new())), - pool_size, - requesters_total: Arc::new(Mutex::new(0)), - cv: Condvar::new(), + pub fn new(config: MemoryManagerConfig) -> Arc { + let pool_size = config.pool_size(); + + match config { + MemoryManagerConfig::Existing(manager) => manager, + MemoryManagerConfig::New { .. } => { + info!( + "Creating memory manager with initial size {}", + human_readable_size(pool_size) + ); + + Arc::new(Self { + requesters: Arc::new(Mutex::new(HashMap::new())), + trackers: Arc::new(Mutex::new(HashMap::new())), + pool_size, + requesters_total: Arc::new(Mutex::new(0)), + cv: Condvar::new(), + }) + } } } @@ -328,10 +415,8 @@ fn human_readable_size(size: usize) -> String { #[cfg(test)] mod tests { + use super::*; use crate::error::Result; - use crate::execution::memory_manager::{ - ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, - }; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use async_trait::async_trait; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -438,11 +523,10 @@ mod tests { } #[tokio::test] - async fn basic_functionalities() -> Result<()> { + async fn basic_functionalities() { let config = RuntimeConfig::new() - .with_memory_fraction(1.0) - .with_max_execution_memory(100); - let runtime = Arc::new(RuntimeEnv::new(config)?); + .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap()); + let runtime = Arc::new(RuntimeEnv::new(config).unwrap()); let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5)); runtime.register_consumer(&(tracker1.clone() as Arc)); @@ -463,8 +547,8 @@ mod tests { runtime.register_consumer(&(requester1.clone() as Arc)); // first requester entered, should be able to use any of the remaining 80 - requester1.do_with_mem(40).await?; - requester1.do_with_mem(10).await?; + requester1.do_with_mem(40).await.unwrap(); + requester1.do_with_mem(10).await.unwrap(); assert_eq!(requester1.get_spills(), 0); assert_eq!(requester1.mem_used(), 50); assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 50); @@ -472,17 +556,46 @@ mod tests { let requester2 = Arc::new(DummyRequester::new(0, runtime.clone())); runtime.register_consumer(&(requester2.clone() as Arc)); - requester2.do_with_mem(20).await?; - requester2.do_with_mem(30).await?; + requester2.do_with_mem(20).await.unwrap(); + requester2.do_with_mem(30).await.unwrap(); assert_eq!(requester2.get_spills(), 1); assert_eq!(requester2.mem_used(), 30); - requester1.do_with_mem(10).await?; + requester1.do_with_mem(10).await.unwrap(); assert_eq!(requester1.get_spills(), 1); assert_eq!(requester1.mem_used(), 10); assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 40); + } - Ok(()) + #[tokio::test] + #[should_panic(expected = "invalid max_memory. Expected greater than 0, got 0")] + async fn test_try_new_with_limit_0() { + MemoryManagerConfig::try_new_limit(0, 1.0).unwrap(); + } + + #[tokio::test] + #[should_panic( + expected = "invalid fraction. Expected greater than 0 and less than 1.0, got -9.6" + )] + async fn test_try_new_with_limit_neg_fraction() { + MemoryManagerConfig::try_new_limit(100, -9.6).unwrap(); + } + + #[tokio::test] + #[should_panic( + expected = "invalid fraction. Expected greater than 0 and less than 1.0, got 9.6" + )] + async fn test_try_new_with_limit_too_large() { + MemoryManagerConfig::try_new_limit(100, 9.6).unwrap(); + } + + #[tokio::test] + async fn test_try_new_with_limit_pool_size() { + let config = MemoryManagerConfig::try_new_limit(100, 0.5).unwrap(); + assert_eq!(config.pool_size(), 50); + + let config = MemoryManagerConfig::try_new_limit(100000, 0.1).unwrap(); + assert_eq!(config.pool_size(), 10000); } } diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs index ebc7c011970b3..acc44bc61209e 100644 --- a/datafusion/src/execution/mod.rs +++ b/datafusion/src/execution/mod.rs @@ -23,3 +23,6 @@ pub(crate) mod disk_manager; pub(crate) mod memory_manager; pub mod options; pub mod runtime_env; + +pub use disk_manager::DiskManager; +pub use memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager}; diff --git a/datafusion/src/execution/runtime_env.rs b/datafusion/src/execution/runtime_env.rs index 1e1aecd33c1d2..cdcd1f71b4f5f 100644 --- a/datafusion/src/execution/runtime_env.rs +++ b/datafusion/src/execution/runtime_env.rs @@ -18,17 +18,25 @@ //! Execution runtime environment that tracks memory, disk and various configurations //! that are used during physical plan execution. -use crate::error::Result; -use crate::execution::disk_manager::DiskManager; -use crate::execution::memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager}; +use crate::{ + error::Result, + execution::{ + disk_manager::{DiskManager, DiskManagerConfig}, + memory_manager::{ + MemoryConsumer, MemoryConsumerId, MemoryManager, MemoryManagerConfig, + }, + }, +}; + use std::fmt::{Debug, Formatter}; use std::sync::Arc; #[derive(Clone)] -/// Execution runtime environment +/// Execution runtime environment. This structure is passed to the +/// physical plans when they are run. pub struct RuntimeEnv { - /// Runtime configuration - pub config: RuntimeConfig, + /// Default batch size while creating new batches + pub batch_size: usize, /// Runtime memory management pub memory_manager: Arc, /// Manage temporary files during query execution @@ -44,20 +52,22 @@ impl Debug for RuntimeEnv { impl RuntimeEnv { /// Create env based on configuration pub fn new(config: RuntimeConfig) -> Result { - let memory_manager = Arc::new(MemoryManager::new( - (config.max_memory as f64 * config.memory_fraction) as usize, - )); - let disk_manager = Arc::new(DiskManager::new(&config.local_dirs)?); - Ok(Self { - config, + let RuntimeConfig { + batch_size, memory_manager, disk_manager, + } = config; + + Ok(Self { + batch_size, + memory_manager: MemoryManager::new(memory_manager), + disk_manager: DiskManager::try_new(disk_manager)?, }) } /// Get execution batch size based on config pub fn batch_size(&self) -> usize { - self.config.batch_size + self.batch_size } /// Register the consumer to get it tracked @@ -84,16 +94,10 @@ pub struct RuntimeConfig { /// for buffer-in-memory batches since creating tiny batches would results /// in too much metadata memory consumption. pub batch_size: usize, - /// Max execution memory allowed for DataFusion. - /// Defaults to `usize::MAX` - pub max_memory: usize, - /// The fraction of total memory used for execution. - /// The purpose of this config is to set aside memory for untracked data structures, - /// and imprecise size estimation during memory acquisition. - /// Defaults to 0.7 - pub memory_fraction: f64, - /// Local dirs to store temporary files during execution. - pub local_dirs: Vec, + /// DiskManager to manage temporary disk file usage + pub disk_manager: DiskManagerConfig, + /// MemoryManager to limit access to memory + pub memory_manager: MemoryManagerConfig, } impl RuntimeConfig { @@ -110,40 +114,25 @@ impl RuntimeConfig { self } - /// Customize exec size - pub fn with_max_execution_memory(mut self, max_memory: usize) -> Self { - assert!(max_memory > 0); - self.max_memory = max_memory; + /// Customize disk manager + pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self { + self.disk_manager = disk_manager; self } - /// Customize exec memory fraction - pub fn with_memory_fraction(mut self, fraction: f64) -> Self { - assert!(fraction > 0f64 && fraction <= 1f64); - self.memory_fraction = fraction; - self - } - - /// Customize exec size - pub fn with_local_dirs(mut self, local_dirs: Vec) -> Self { - assert!(!local_dirs.is_empty()); - self.local_dirs = local_dirs; + /// Customize memory manager + pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig) -> Self { + self.memory_manager = memory_manager; self } } impl Default for RuntimeConfig { fn default() -> Self { - let tmp_dir = tempfile::tempdir().unwrap(); - let path = tmp_dir.path().to_str().unwrap().to_string(); - std::mem::forget(tmp_dir); - Self { batch_size: 8192, - // Effectively "no limit" - max_memory: usize::MAX, - memory_fraction: 0.7, - local_dirs: vec![path], + disk_manager: DiskManagerConfig::default(), + memory_manager: MemoryManagerConfig::default(), } } } diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index e09146499c26c..f3df46ee98bbf 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -690,7 +690,7 @@ async fn do_sort( mod tests { use super::*; use crate::datasource::object_store::local::LocalFileSystem; - use crate::execution::runtime_env::RuntimeConfig; + use crate::execution::context::ExecutionConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; @@ -774,11 +774,9 @@ mod tests { #[tokio::test] async fn test_sort_spill() -> Result<()> { - let config = RuntimeConfig::new() - .with_memory_fraction(1.0) - // trigger spill there will be 4 batches with 5.5KB for each - .with_max_execution_memory(12288); - let runtime = Arc::new(RuntimeEnv::new(config)?); + // trigger spill there will be 4 batches with 5.5KB for each + let config = ExecutionConfig::new().with_memory_limit(12288, 1.0)?; + let runtime = Arc::new(RuntimeEnv::new(config.runtime)?); let schema = test_util::aggr_test_schema(); let partitions = 4; From 15771d674ed2c138d26f5eb115171649b7d3f039 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 24 Jan 2022 13:34:09 -0500 Subject: [PATCH 2/2] fmt --- ballista/rust/executor/src/executor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index 75822f5e671d9..6bf1aeb4f1827 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -88,8 +88,7 @@ impl Executor { )) }?; - let config = ExecutionConfig::new() - .with_temp_file_path(self.work_dir.clone()); + let config = ExecutionConfig::new().with_temp_file_path(self.work_dir.clone()); let runtime = Arc::new(RuntimeEnv::new(config.runtime)?); let partitions = exec.execute_shuffle_write(part, runtime).await?;