Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,10 @@ impl ShuffleWriterExec {
std::fs::create_dir_all(&path)?;

path.push(format!("data-{}.arrow", input_partition));
let path = path.to_str().unwrap();
info!("Writing results to {}", path);
info!("Writing results to {:?}", path);

let mut writer =
IPCWriter::new(path, stream.schema().as_ref())?;
IPCWriter::new(&path, stream.schema().as_ref())?;

writer.write(&output_batch)?;
writers[output_partition] = Some(writer);
Expand All @@ -287,7 +286,7 @@ impl ShuffleWriterExec {
Some(w) => {
w.finish()?;
info!(
"Finished writing shuffle partition {} at {}. Batches: {}. Rows: {}. Bytes: {}.",
"Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.",
i,
w.path(),
w.num_batches,
Expand All @@ -297,7 +296,7 @@ impl ShuffleWriterExec {

part_locs.push(ShuffleWritePartition {
partition_id: i as u64,
path: w.path().to_owned(),
path: w.path().to_string_lossy().to_string(),
num_batches: w.num_batches,
num_rows: w.num_rows,
num_bytes: w.num_bytes,
Expand Down
74 changes: 30 additions & 44 deletions datafusion/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@

use crate::error::{DataFusionError, Result};
use log::{debug, info};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::hash_map::DefaultHasher;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::{Builder, TempDir};
use tempfile::{Builder, NamedTempFile, TempDir};

/// Configuration for temporary disk access
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -101,8 +97,8 @@ impl DiskManager {
}
}

/// Create a file in conf dirs in randomized manner and return the file path
pub fn create_tmp_file(&self) -> Result<String> {
/// Return a temporary file from a randomized choice in the configured locations
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
create_tmp_file(&self.local_dirs)
}
}
Expand All @@ -120,34 +116,15 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
.collect()
}

fn get_file(file_name: &str, local_dirs: &[TempDir]) -> String {
let mut hasher = DefaultHasher::new();
file_name.hash(&mut hasher);
let hash = hasher.finish();
let dir = &local_dirs[hash.rem_euclid(local_dirs.len() as u64) as usize];
let mut path = PathBuf::new();
path.push(dir);
path.push(file_name);
path.to_str().unwrap().to_string()
}
fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary files are now generated using the `tempfile` crate rather than manual string manipulation.

let dir_index = thread_rng().gen_range(0..local_dirs.len());
let dir = local_dirs.get(dir_index).ok_or_else(|| {
DataFusionError::Internal("No directories available to DiskManager".into())
})?;

fn create_tmp_file(local_dirs: &[TempDir]) -> Result<String> {
let name = rand_name();
let mut path = get_file(&*name, local_dirs);
while Path::new(path.as_str()).exists() {
path = get_file(&rand_name(), local_dirs);
}
File::create(&path)?;
Ok(path)
}

/// Return a random string suitable for use as a database name
fn rand_name() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect()
Builder::new()
.tempfile_in(dir)
.map_err(DataFusionError::IoError)
}

#[cfg(test)]
Expand All @@ -161,19 +138,28 @@ mod tests {
let local_dir1 = TempDir::new()?;
let local_dir2 = TempDir::new()?;
let local_dir3 = TempDir::new()?;
let config = DiskManagerConfig::new_specified(vec![
local_dir1.path().into(),
local_dir2.path().into(),
local_dir3.path().into(),
]);
let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
let config = DiskManagerConfig::new_specified(
local_dirs.iter().map(|p| p.into()).collect(),
);

let dm = DiskManager::try_new(config)?;
let actual = dm.create_tmp_file()?;
let name = actual.rsplit_once(std::path::MAIN_SEPARATOR).unwrap().1;

let expected = get_file(name, &dm.local_dirs);
// file should be located in dir by its name hash
assert_eq!(actual, expected);
// the file should be in one of the specified local directories
let found = local_dirs.iter().any(|p| {
actual
.path()
.ancestors()
.any(|candidate_path| *p == candidate_path)
});

assert!(
found,
"Can't find {:?} in specified local dirs: {:?}",
actual, local_dirs
);

Ok(())
}
}
11 changes: 6 additions & 5 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use std::fs;
use std::fs::{metadata, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -387,7 +388,7 @@ mod tests {
/// Write in Arrow IPC format.
pub struct IPCWriter {
/// path
pub path: String,
pub path: PathBuf,
/// Inner writer
pub writer: FileWriter<File>,
/// batches written
Expand All @@ -400,18 +401,18 @@ pub struct IPCWriter {

impl IPCWriter {
/// Create new writer
pub fn new(path: &str, schema: &Schema) -> Result<Self> {
pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
let file = File::create(path).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create partition file at {}: {:?}",
"Failed to create partition file at {:?}: {:?}",
path, e
))
})?;
Ok(Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
path: path.to_owned(),
path: path.into(),
writer: FileWriter::try_new(file, schema)?,
})
}
Expand All @@ -432,7 +433,7 @@ impl IPCWriter {
}

/// Path written to
pub fn path(&self) -> &str {
pub fn path(&self) -> &Path {
&self.path
}
}
Expand Down
34 changes: 19 additions & 15 deletions datafusion/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
use tokio::task;

Expand All @@ -68,7 +70,7 @@ struct ExternalSorter {
id: MemoryConsumerId,
schema: SchemaRef,
in_mem_batches: Mutex<Vec<RecordBatch>>,
spills: Mutex<Vec<String>>,
spills: Mutex<Vec<NamedTempFile>>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
runtime: Arc<RuntimeEnv>,
Expand Down Expand Up @@ -221,22 +223,25 @@ impl MemoryConsumer for ExternalSorter {

let baseline_metrics = self.metrics.new_intermediate_baseline(partition);

let path = self.runtime.disk_manager.create_tmp_file()?;
let spillfile = self.runtime.disk_manager.create_tmp_file()?;
let stream = in_mem_partial_sort(
&mut *in_mem_batches,
self.schema.clone(),
&*self.expr,
baseline_metrics,
);

let total_size =
spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone())
.await?;
let total_size = spill_partial_sorted_stream(
&mut stream?,
spillfile.path(),
self.schema.clone(),
)
.await?;

let mut spills = self.spills.lock().await;
let used = self.used.swap(0, Ordering::SeqCst);
self.inner_metrics.record_spill(total_size);
spills.push(path);
spills.push(spillfile);
Ok(used)
}

Expand Down Expand Up @@ -281,12 +286,12 @@ fn in_mem_partial_sort(

async fn spill_partial_sorted_stream(
in_mem_stream: &mut SendableRecordBatchStream,
path: String,
path: &Path,
schema: SchemaRef,
) -> Result<usize> {
let (sender, receiver) = tokio::sync::mpsc::channel(2);
let path_clone = path.clone();
let handle = task::spawn_blocking(move || write_sorted(receiver, path_clone, schema));
let path: PathBuf = path.into();
let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema));
while let Some(item) = in_mem_stream.next().await {
sender.send(item).await.ok();
}
Expand All @@ -301,17 +306,16 @@ async fn spill_partial_sorted_stream(
}

fn read_spill_as_stream(
path: String,
path: NamedTempFile,
Copy link
Contributor Author

@alamb alamb Jan 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since ownership of NamedTempFile is passed into the actual task doing the reading, when it is done, the temp file is cleaned up 🧹

schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let (sender, receiver): (
TKSender<ArrowResult<RecordBatch>>,
TKReceiver<ArrowResult<RecordBatch>>,
) = tokio::sync::mpsc::channel(2);
let path_clone = path.clone();
let join_handle = task::spawn_blocking(move || {
if let Err(e) = read_spill(sender, path_clone) {
error!("Failure while reading spill file: {}. Error: {}", path, e);
if let Err(e) = read_spill(sender, path.path()) {
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
}
});
Ok(RecordBatchReceiverStream::create(
Expand All @@ -323,7 +327,7 @@ fn read_spill_as_stream(

fn write_sorted(
mut receiver: TKReceiver<ArrowResult<RecordBatch>>,
path: String,
path: PathBuf,
schema: SchemaRef,
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
Expand All @@ -338,7 +342,7 @@ fn write_sorted(
Ok(writer.num_bytes as usize)
}

fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: String) -> Result<()> {
fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(&path)?);
let reader = FileReader::try_new(file)?;
for batch in reader {
Expand Down