Skip to content
4 changes: 2 additions & 2 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{num::NonZeroUsize, sync::Arc};

use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
disk_manager::DiskManagerBuilder,
memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool},
runtime_env::RuntimeEnvBuilder,
},
Expand Down Expand Up @@ -110,7 +110,7 @@ impl CommonOpt {
};
rt_builder = rt_builder
.with_memory_pool(pool)
.with_disk_manager(DiskManagerConfig::NewOs);
.with_disk_manager_builder(DiskManagerBuilder::default());
}
Ok(rt_builder)
}
Expand Down
16 changes: 5 additions & 11 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use datafusion::execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::DiskManager;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
Expand All @@ -43,7 +42,7 @@ use datafusion_cli::{
use clap::Parser;
use datafusion::common::config_err;
use datafusion::config::ConfigOptions;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use mimalloc::MiMalloc;

#[global_allocator]
Expand Down Expand Up @@ -200,15 +199,10 @@ async fn main_inner() -> Result<()> {

// set disk limit
if let Some(disk_limit) = args.disk_limit {
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

DiskManager::set_arc_max_temp_directory_size(
&mut disk_manager,
disk_limit.try_into().unwrap(),
)?;

let disk_config = DiskManagerConfig::new_existing(disk_manager);
rt_builder = rt_builder.with_disk_manager(disk_config);
let builder = DiskManagerBuilder::default()
.with_mode(DiskManagerMode::OsTmpDirectory)
.with_max_temp_directory_size(disk_limit.try_into().unwrap());
rt_builder = rt_builder.with_disk_manager_builder(builder);
}

let runtime_env = rt_builder.build_arc()?;
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,15 @@ use arrow_schema::SchemaRef;
use datafusion::datasource::MemTable;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{instant::Instant, Result};
use datafusion_execution::disk_manager::DiskManagerBuilder;
use datafusion_execution::memory_pool::{
human_readable_size, MemoryPool, UnboundedMemoryPool,
};
use datafusion_expr::display_schema;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use std::time::Duration;

use datafusion_execution::{
disk_manager::DiskManagerConfig, memory_pool::FairSpillPool,
runtime_env::RuntimeEnvBuilder,
};
use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder};
use rand::prelude::IndexedRandom;
use rand::Rng;
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -548,7 +546,7 @@ impl SortFuzzerTestGenerator {

let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(memory_pool)
.with_disk_manager(DiskManagerConfig::NewOs)
.with_disk_manager_builder(DiskManagerBuilder::default())
.build_arc()?;

let ctx = SessionContext::new_with_config_rt(config, runtime);
Expand Down
32 changes: 17 additions & 15 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use datafusion::assert_batches_eq;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand All @@ -41,11 +40,12 @@ use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_catalog::streaming::StreamingTable;
use datafusion_catalog::Session;
use datafusion_common::{assert_contains, Result};
use datafusion_execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
use datafusion_execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::{DiskManager, TaskContext};
use datafusion_execution::TaskContext;
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
Expand Down Expand Up @@ -204,7 +204,7 @@ async fn sort_merge_join_spill() {
)
.with_memory_limit(1_000)
.with_config(config)
.with_disk_manager_config(DiskManagerConfig::NewOs)
.with_disk_manager_builder(DiskManagerBuilder::default())
.with_scenario(Scenario::AccessLogStreaming)
.run()
.await
Expand Down Expand Up @@ -288,7 +288,7 @@ async fn sort_spill_reservation() {
.with_memory_limit(mem_limit)
// use a single partition so only a sort is needed
.with_scenario(scenario)
.with_disk_manager_config(DiskManagerConfig::NewOs)
.with_disk_manager_builder(DiskManagerBuilder::default())
.with_expected_plan(
// It is important that this plan only has a SortExec, not
// also merge, so we can ensure the sort could finish
Expand Down Expand Up @@ -550,9 +550,10 @@ async fn setup_context(
disk_limit: u64,
memory_pool_limit: usize,
) -> Result<SessionContext> {
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, disk_limit)?;
let disk_manager = DiskManagerBuilder::default()
.with_mode(DiskManagerMode::OsTmpDirectory)
.with_max_temp_directory_size(disk_limit)
.build()?;

let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
Expand All @@ -561,7 +562,7 @@ async fn setup_context(

let runtime = Arc::new(RuntimeEnv {
memory_pool: runtime.memory_pool.clone(),
disk_manager,
disk_manager: Arc::new(disk_manager),
cache_manager: runtime.cache_manager.clone(),
object_store_registry: runtime.object_store_registry.clone(),
});
Expand Down Expand Up @@ -641,7 +642,7 @@ struct TestCase {
scenario: Scenario,
/// How should the disk manager (that allows spilling) be
/// configured? Defaults to `Disabled`
disk_manager_config: DiskManagerConfig,
disk_manager_builder: DiskManagerBuilder,
/// Expected explain plan, if non-empty
expected_plan: Vec<String>,
/// Is the plan expected to pass? Defaults to false
Expand All @@ -657,7 +658,8 @@ impl TestCase {
config: SessionConfig::new(),
memory_pool: None,
scenario: Scenario::AccessLog,
disk_manager_config: DiskManagerConfig::Disabled,
disk_manager_builder: DiskManagerBuilder::default()
.with_mode(DiskManagerMode::Disabled),
expected_plan: vec![],
expected_success: false,
}
Expand Down Expand Up @@ -714,11 +716,11 @@ impl TestCase {

/// Specify if the disk manager should be enabled. If true,
/// operators that support it can spill
pub fn with_disk_manager_config(
pub fn with_disk_manager_builder(
mut self,
disk_manager_config: DiskManagerConfig,
disk_manager_builder: DiskManagerBuilder,
) -> Self {
self.disk_manager_config = disk_manager_config;
self.disk_manager_builder = disk_manager_builder;
self
}

Expand All @@ -737,7 +739,7 @@ impl TestCase {
memory_pool,
config,
scenario,
disk_manager_config,
disk_manager_builder,
expected_plan,
expected_success,
} = self;
Expand All @@ -746,7 +748,7 @@ impl TestCase {

let mut builder = RuntimeEnvBuilder::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_disk_manager_builder(disk_manager_builder)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

if let Some(pool) = memory_pool {
Expand Down
Loading