From 660de37759b36bfcba53de902733ca9ae2fede59 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 25 Jan 2022 16:32:57 -0500 Subject: [PATCH 1/2] Use NamedTempFile rather than `String` in DiskManager --- datafusion/src/execution/disk_manager.rs | 74 +++++++++------------- datafusion/src/physical_plan/common.rs | 11 ++-- datafusion/src/physical_plan/sorts/sort.rs | 34 +++++----- 3 files changed, 55 insertions(+), 64 deletions(-) diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs index c98df3bc77e83..79b70f1f8b9ad 100644 --- a/datafusion/src/execution/disk_manager.rs +++ b/datafusion/src/execution/disk_manager.rs @@ -20,14 +20,10 @@ use crate::error::{DataFusionError, Result}; use log::{debug, info}; -use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use std::collections::hash_map::DefaultHasher; -use std::fs::File; -use std::hash::{Hash, Hasher}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; -use tempfile::{Builder, TempDir}; +use tempfile::{Builder, NamedTempFile, TempDir}; /// Configuration for temporary disk access #[derive(Debug, Clone)] @@ -101,8 +97,8 @@ impl DiskManager { } } - /// Create a file in conf dirs in randomized manner and return the file path - pub fn create_tmp_file(&self) -> Result { + /// Return a temporary file from a randomized choice in the configured locations + pub fn create_tmp_file(&self) -> Result { create_tmp_file(&self.local_dirs) } } @@ -120,34 +116,15 @@ fn create_local_dirs(local_dirs: Vec) -> Result> { .collect() } -fn get_file(file_name: &str, local_dirs: &[TempDir]) -> String { - let mut hasher = DefaultHasher::new(); - file_name.hash(&mut hasher); - let hash = hasher.finish(); - let dir = &local_dirs[hash.rem_euclid(local_dirs.len() as u64) as usize]; - let mut path = PathBuf::new(); - path.push(dir); - path.push(file_name); - path.to_str().unwrap().to_string() -} +fn create_tmp_file(local_dirs: &[TempDir]) -> Result { + let dir_index = thread_rng().gen_range(0..local_dirs.len()); + let dir = local_dirs.get(dir_index).ok_or_else(|| { + DataFusionError::Internal("No directories available to DiskManager".into()) + })?; -fn create_tmp_file(local_dirs: &[TempDir]) -> Result { - let name = rand_name(); - let mut path = get_file(&*name, local_dirs); - while Path::new(path.as_str()).exists() { - path = get_file(&rand_name(), local_dirs); - } - File::create(&path)?; - Ok(path) -} - -/// Return a random string suitable for use as a database name -fn rand_name() -> String { - thread_rng() - .sample_iter(&Alphanumeric) - .take(10) - .map(char::from) - .collect() + Builder::new() + .tempfile_in(dir) + .map_err(DataFusionError::IoError) } #[cfg(test)] @@ -161,19 +138,28 @@ mod tests { let local_dir1 = TempDir::new()?; let local_dir2 = TempDir::new()?; let local_dir3 = TempDir::new()?; - let config = DiskManagerConfig::new_specified(vec![ - local_dir1.path().into(), - local_dir2.path().into(), - local_dir3.path().into(), - ]); + let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()]; + let config = DiskManagerConfig::new_specified( + local_dirs.iter().map(|p| p.into()).collect(), + ); let dm = DiskManager::try_new(config)?; let actual = dm.create_tmp_file()?; - let name = actual.rsplit_once(std::path::MAIN_SEPARATOR).unwrap().1; - let expected = get_file(name, &dm.local_dirs); - // file should be located in dir by it's name hash - assert_eq!(actual, expected); + // the file should be in one of the specified local directories + let found = local_dirs.iter().any(|p| { + actual + .path() + .ancestors() + .any(|candidate_path| *p == candidate_path) + }); + + assert!( + found, + "Can't find {:?} in specified local dirs: {:?}", + actual, local_dirs + ); + Ok(()) } } diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index cabb13a55ed9e..390f004fb4697 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -33,6 +33,7 @@ use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt}; use pin_project_lite::pin_project; use std::fs; use std::fs::{metadata, File}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::task::JoinHandle; @@ -387,7 +388,7 @@ mod tests { /// Write in Arrow IPC format. pub struct IPCWriter { /// path - pub path: String, + pub path: PathBuf, /// Inner writer pub writer: FileWriter, /// bathes written @@ -400,10 +401,10 @@ pub struct IPCWriter { impl IPCWriter { /// Create new writer - pub fn new(path: &str, schema: &Schema) -> Result { + pub fn new(path: &Path, schema: &Schema) -> Result { let file = File::create(path).map_err(|e| { DataFusionError::Execution(format!( - "Failed to create partition file at {}: {:?}", + "Failed to create partition file at {:?}: {:?}", path, e )) })?; @@ -411,7 +412,7 @@ impl IPCWriter { num_batches: 0, num_rows: 0, num_bytes: 0, - path: path.to_owned(), + path: path.into(), writer: FileWriter::try_new(file, schema)?, }) } @@ -432,7 +433,7 @@ impl IPCWriter { } /// Path write to - pub fn path(&self) -> &str { + pub fn path(&self) -> &Path { &self.path } } diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 2933a5ba6e3fb..0f5c3bdc07f8c 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -50,8 +50,10 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::File; use std::io::BufReader; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use tempfile::NamedTempFile; use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; use tokio::task; @@ -68,7 +70,7 @@ struct ExternalSorter { id: MemoryConsumerId, schema: SchemaRef, in_mem_batches: Mutex>, - spills: Mutex>, + spills: Mutex>, /// Sort expressions expr: Vec, runtime: Arc, @@ -221,7 +223,7 @@ impl MemoryConsumer for ExternalSorter { let baseline_metrics = self.metrics.new_intermediate_baseline(partition); - let path = self.runtime.disk_manager.create_tmp_file()?; + let spillfile = self.runtime.disk_manager.create_tmp_file()?; let stream = in_mem_partial_sort( &mut *in_mem_batches, self.schema.clone(), @@ -229,14 +231,17 @@ impl MemoryConsumer for ExternalSorter { baseline_metrics, ); - let total_size = - spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone()) - .await?; + let total_size = spill_partial_sorted_stream( + &mut stream?, + spillfile.path(), + self.schema.clone(), + ) + .await?; let mut spills = self.spills.lock().await; let used = self.used.swap(0, Ordering::SeqCst); self.inner_metrics.record_spill(total_size); - spills.push(path); + spills.push(spillfile); Ok(used) } @@ -281,12 +286,12 @@ fn in_mem_partial_sort( async fn spill_partial_sorted_stream( in_mem_stream: &mut SendableRecordBatchStream, - path: String, + path: &Path, schema: SchemaRef, ) -> Result { let (sender, receiver) = tokio::sync::mpsc::channel(2); - let path_clone = path.clone(); - let handle = task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)); + let path: PathBuf = path.into(); + let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema)); while let Some(item) = in_mem_stream.next().await { sender.send(item).await.ok(); } @@ -301,17 +306,16 @@ async fn spill_partial_sorted_stream( } fn read_spill_as_stream( - path: String, + path: NamedTempFile, schema: SchemaRef, ) -> Result { let (sender, receiver): ( TKSender>, TKReceiver>, ) = tokio::sync::mpsc::channel(2); - let path_clone = path.clone(); let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_spill(sender, path_clone) { - error!("Failure while reading spill file: {}. Error: {}", path, e); + if let Err(e) = read_spill(sender, path.path()) { + error!("Failure while reading spill file: {:?}. Error: {}", path, e); } }); Ok(RecordBatchReceiverStream::create( @@ -323,7 +327,7 @@ fn read_spill_as_stream( fn write_sorted( mut receiver: TKReceiver>, - path: String, + path: PathBuf, schema: SchemaRef, ) -> Result { let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; @@ -338,7 +342,7 @@ fn write_sorted( Ok(writer.num_bytes as usize) } -fn read_spill(sender: TKSender>, path: String) -> Result<()> { +fn read_spill(sender: TKSender>, path: &Path) -> Result<()> { let file = BufReader::new(File::open(&path)?); let reader = FileReader::try_new(file)?; for batch in reader { From 6b231ae5f51bb29753acab269fa2545d4e0eaf74 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 25 Jan 2022 16:53:11 -0500 Subject: [PATCH 2/2] fix ballista --- ballista/rust/core/src/execution_plans/shuffle_writer.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 6a6bc3e92aaa2..4f027c1f28bd5 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -265,11 +265,10 @@ impl ShuffleWriterExec { std::fs::create_dir_all(&path)?; path.push(format!("data-{}.arrow", input_partition)); - let path = path.to_str().unwrap(); - info!("Writing results to {}", path); + info!("Writing results to {:?}", path); let mut writer = - IPCWriter::new(path, stream.schema().as_ref())?; + IPCWriter::new(&path, stream.schema().as_ref())?; writer.write(&output_batch)?; writers[output_partition] = Some(writer); @@ -287,7 +286,7 @@ impl ShuffleWriterExec { Some(w) => { w.finish()?; info!( - "Finished writing shuffle partition {} at {}. Batches: {}. Rows: {}. Bytes: {}.", + "Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.", i, w.path(), w.num_batches, @@ -297,7 +296,7 @@ impl ShuffleWriterExec { part_locs.push(ShuffleWritePartition { partition_id: i as u64, - path: w.path().to_owned(), + path: w.path().to_string_lossy().to_string(), num_batches: w.num_batches, num_rows: w.num_rows, num_bytes: w.num_bytes,