Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ballista/rust/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::ExecutorSpecification;
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion::prelude::ExecutionConfig;

/// Ballista executor
pub struct Executor {
Expand Down Expand Up @@ -87,9 +88,8 @@ impl Executor {
))
}?;

let runtime_config =
RuntimeConfig::new().with_local_dirs(vec![self.work_dir.clone()]);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let config = ExecutionConfig::new().with_temp_file_path(self.work_dir.clone());
let runtime = Arc::new(RuntimeEnv::new(config.runtime)?);

let partitions = exec.execute_shuffle_write(part, runtime).await?;

Expand Down
94 changes: 90 additions & 4 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ use crate::{
},
};
use log::debug;
use std::fs;
use std::path::Path;
use std::string::String;
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Mutex,
};
use std::{fs, path::PathBuf};

use futures::{StreamExt, TryStreamExt};
use tokio::task::{self, JoinHandle};
Expand Down Expand Up @@ -94,7 +94,12 @@ use chrono::{DateTime, Utc};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

use super::options::{AvroReadOptions, CsvReadOptions};
use super::{
disk_manager::DiskManagerConfig,
memory_manager::MemoryManagerConfig,
options::{AvroReadOptions, CsvReadOptions},
DiskManager, MemoryManager,
};

/// ExecutionContext is the main interface for executing queries with DataFusion. The context
/// provides the following functionality:
Expand Down Expand Up @@ -195,6 +200,11 @@ impl ExecutionContext {
}
}

/// Hand out a shared handle to the [RuntimeEnv] that this
/// [ExecutionContext] runs its queries with.
///
/// # Panics
///
/// Panics if `lock()` on the context state returns an error (the result is
/// unwrapped).
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
    // Hold the state lock only long enough to copy the `Arc` handle.
    let state = self.state.lock().unwrap();
    Arc::clone(&state.runtime_env)
}

/// Creates a dataframe that will execute a SQL query.
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
Expand Down Expand Up @@ -718,7 +728,7 @@ impl ExecutionContext {
let path = path.as_ref();
// create directory to contain the CSV files (one per partition)
let fs_path = Path::new(path);
let runtime = self.state.lock().unwrap().runtime_env.clone();
let runtime = self.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
Expand Down Expand Up @@ -758,7 +768,7 @@ impl ExecutionContext {
let path = path.as_ref();
// create directory to contain the Parquet files (one per partition)
let fs_path = Path::new(path);
let runtime = self.state.lock().unwrap().runtime_env.clone();
let runtime = self.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
Expand Down Expand Up @@ -1057,6 +1067,48 @@ impl ExecutionConfig {
self.runtime = config;
self
}

/// Use an existing [MemoryManager]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the new public API

pub fn with_existing_memory_manager(mut self, existing: Arc<MemoryManager>) -> Self {
    // Wrap the caller's manager in a config and hand it to the runtime
    // configuration carried by this builder.
    let manager_config = MemoryManagerConfig::new_existing(existing);
    self.runtime = self.runtime.with_memory_manager(manager_config);
    self
}

/// Cap the total memory used while running a DataFusion plan at
/// `max_memory * memory_fraction` bytes.
///
/// Note DataFusion does not yet respect this limit in all cases.
///
/// # Errors
///
/// Propagates any error returned by `MemoryManagerConfig::try_new_limit`
/// for the supplied `max_memory` / `memory_fraction`.
pub fn with_memory_limit(
    mut self,
    max_memory: usize,
    memory_fraction: f64,
) -> Result<Self> {
    // Build (and validate) the limit first; bail out before touching the
    // runtime configuration if it is rejected.
    let limit_config = MemoryManagerConfig::try_new_limit(max_memory, memory_fraction)?;
    self.runtime = self.runtime.with_memory_manager(limit_config);
    Ok(self)
}

/// Use an existing [DiskManager] instead of creating a new one
pub fn with_existing_disk_manager(mut self, existing: Arc<DiskManager>) -> Self {
    // Wrap the caller's manager in a config and hand it to the runtime
    // configuration carried by this builder.
    let manager_config = DiskManagerConfig::new_existing(existing);
    self.runtime = self.runtime.with_disk_manager(manager_config);
    self
}

/// Create any needed temporary files under the given `path`
pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
    // A single user-chosen directory is passed as a one-element list.
    let temp_dirs = vec![path.into()];
    let disk_config = DiskManagerConfig::new_specified(temp_dirs);
    self.runtime = self.runtime.with_disk_manager(disk_config);
    self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
Expand Down Expand Up @@ -1246,6 +1298,40 @@ mod tests {
use tempfile::TempDir;
use test::*;

#[tokio::test]
async fn shared_memory_and_disk_manager() {
    // Two contexts configured with the same managers must end up holding
    // the very same MemoryManager / DiskManager instances.
    let ctx1 = ExecutionContext::new();

    // Grab handles to the managers owned by the first context ...
    let memory_manager = Arc::clone(&ctx1.runtime_env().memory_manager);
    let disk_manager = Arc::clone(&ctx1.runtime_env().disk_manager);

    // ... and build a second context around those same handles.
    let shared_config = ExecutionConfig::new()
        .with_existing_memory_manager(Arc::clone(&memory_manager))
        .with_existing_disk_manager(Arc::clone(&disk_manager));
    let ctx2 = ExecutionContext::with_config(shared_config);

    // Pointer identity (not value equality) is what is being checked.
    assert!(Arc::ptr_eq(
        &memory_manager,
        &ctx1.runtime_env().memory_manager
    ));
    assert!(Arc::ptr_eq(
        &memory_manager,
        &ctx2.runtime_env().memory_manager
    ));

    assert!(Arc::ptr_eq(
        &disk_manager,
        &ctx1.runtime_env().disk_manager
    ));
    assert!(Arc::ptr_eq(
        &disk_manager,
        &ctx2.runtime_env().disk_manager
    ));
}

#[test]
fn optimize_explain() {
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
Expand Down
106 changes: 80 additions & 26 deletions datafusion/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,86 @@
//! hashed among the directories listed in RuntimeConfig::local_dirs.

use crate::error::{DataFusionError, Result};
use log::info;
use log::{debug, info};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::hash_map::DefaultHasher;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::{Builder, TempDir};

/// Configuration for temporary disk access
///
/// Each variant describes one way of obtaining the [DiskManager] that
/// `DiskManager::try_new` returns: reuse an existing instance, or create a
/// new one backed by OS-chosen or caller-chosen directories.
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
/// Use the provided [DiskManager] instance
/// (`DiskManager::try_new` returns it as-is)
Existing(Arc<DiskManager>),

/// Create a new [DiskManager] that creates temporary files within
/// a temporary directory chosen by the OS
/// (this is the `Default` variant)
NewOs,

/// Create a new [DiskManager] that creates temporary files within
/// the specified directories
/// (one new temporary directory is created inside each listed path)
NewSpecified(Vec<PathBuf>),
}

impl Default for DiskManagerConfig {
fn default() -> Self {
Self::NewOs
}
}

impl DiskManagerConfig {
/// Create temporary files in a temporary directory chosen by the OS
pub fn new() -> Self {
Self::default()
}

/// Create temporary files using the provided disk manager
pub fn new_existing(existing: Arc<DiskManager>) -> Self {
Self::Existing(existing)
}

/// Create temporary files in the specified directories
pub fn new_specified(paths: Vec<PathBuf>) -> Self {
Self::NewSpecified(paths)
}
}

/// Manages files generated during query execution, e.g. spill files generated
/// while processing a dataset larger than available memory.
///
/// Instances are obtained from `DiskManager::try_new`, driven by a
/// [DiskManagerConfig].
#[derive(Debug)]
pub struct DiskManager {
// Temporary directories in which this manager places its files.
// NOTE(review): these are `tempfile::TempDir` handles, which presumably
// remove the directory when dropped — confirm against the tempfile docs.
local_dirs: Vec<TempDir>,
}

impl DiskManager {
/// Create local dirs inside user provided dirs through conf
pub fn new(conf_dirs: &[String]) -> Result<Self> {
let local_dirs = create_local_dirs(conf_dirs)?;
info!(
"Created local dirs {:?} as DataFusion working directory",
local_dirs
);
Ok(Self { local_dirs })
/// Build (or reuse) a [DiskManager] as described by `config`.
///
/// * `Existing` - the supplied manager is returned unchanged.
/// * `NewOs` - a single temporary directory is created with
///   `tempfile::tempdir()`.
/// * `NewSpecified` - `create_local_dirs` creates the working directories
///   from the given paths.
///
/// # Errors
///
/// Returns `DataFusionError::IoError` if the OS temporary directory cannot
/// be created, and propagates any error from `create_local_dirs`.
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
    // Work out the backing directories first; the `Existing` case has
    // nothing to create and returns straight away.
    let local_dirs = match config {
        DiskManagerConfig::Existing(manager) => return Ok(manager),
        DiskManagerConfig::NewOs => {
            let os_dir = tempfile::tempdir().map_err(DataFusionError::IoError)?;

            debug!(
                "Created directory {:?} as DataFusion working directory",
                os_dir
            );
            vec![os_dir]
        }
        DiskManagerConfig::NewSpecified(conf_dirs) => {
            let created = create_local_dirs(conf_dirs)?;
            info!(
                "Created local dirs {:?} as DataFusion working directory",
                created
            );
            created
        }
    };

    Ok(Arc::new(Self { local_dirs }))
}

/// Create a file in conf dirs in randomized manner and return the file path
Expand All @@ -52,20 +108,18 @@ impl DiskManager {
}

/// Setup local dirs by creating one new dir in each of the given dirs
fn create_local_dirs(local_dir: &[String]) -> Result<Vec<TempDir>> {
local_dir
fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
local_dirs
.iter()
.map(|root| create_dir(root, "datafusion-"))
.map(|root| {
Builder::new()
.prefix("datafusion-")
.tempdir_in(root)
.map_err(DataFusionError::IoError)
})
.collect()
}

/// Create a new temporary directory inside `root` whose name starts with
/// `prefix`, mapping any I/O failure to `DataFusionError::IoError`.
fn create_dir(root: &str, prefix: &str) -> Result<TempDir> {
    let mut builder = Builder::new();
    builder.prefix(prefix);
    builder.tempdir_in(root).map_err(DataFusionError::IoError)
}

fn get_file(file_name: &str, local_dirs: &[TempDir]) -> String {
let mut hasher = DefaultHasher::new();
file_name.hash(&mut hasher);
Expand Down Expand Up @@ -98,22 +152,22 @@ fn rand_name() -> String {

#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
use crate::execution::disk_manager::{get_file, DiskManager};
use tempfile::TempDir;

#[test]
fn file_in_right_dir() -> Result<()> {
let local_dir1 = TempDir::new()?;
let local_dir2 = TempDir::new()?;
let local_dir3 = TempDir::new()?;
let local_dirs = vec![
local_dir1.path().to_str().unwrap().to_string(),
local_dir2.path().to_str().unwrap().to_string(),
local_dir3.path().to_str().unwrap().to_string(),
];
let config = DiskManagerConfig::new_specified(vec![
local_dir1.path().into(),
local_dir2.path().into(),
local_dir3.path().into(),
]);

let dm = DiskManager::new(&local_dirs)?;
let dm = DiskManager::try_new(config)?;
let actual = dm.create_tmp_file()?;
let name = actual.rsplit_once(std::path::MAIN_SEPARATOR).unwrap().1;

Expand Down
Loading