diff --git a/datafusion/core/src/execution/disk_manager.rs b/datafusion/core/src/execution/disk_manager.rs index 80d594341875a..b8e5fac3bc257 100644 --- a/datafusion/core/src/execution/disk_manager.rs +++ b/datafusion/core/src/execution/disk_manager.rs @@ -74,7 +74,7 @@ pub struct DiskManager { /// TempDirs to put temporary files in. /// /// If `Some(vec![])` a new OS specified temporary directory will be created - /// If `None` an error will be returned + /// If `None` an error will be returned (configured not to spill) local_dirs: Mutex>>, } @@ -103,12 +103,16 @@ impl DiskManager { } /// Return a temporary file from a randomized choice in the configured locations - pub fn create_tmp_file(&self) -> Result { + /// + /// If the file can not be created for some reason, returns an + /// error message referencing the request description + pub fn create_tmp_file(&self, request_description: &str) -> Result { let mut guard = self.local_dirs.lock(); let local_dirs = guard.as_mut().ok_or_else(|| { - DataFusionError::ResourcesExhausted( - "Cannot spill to temporary file as DiskManager is disabled".to_string(), - ) + DataFusionError::ResourcesExhausted(format!( + "Memory Exhausted while {} (DiskManager is disabled)", + request_description + )) })?; // Create a temporary directory if needed @@ -116,8 +120,9 @@ impl DiskManager { let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?; debug!( - "Created directory '{:?}' as DataFusion tempfile directory", - tempdir.path().to_string_lossy() + "Created directory '{:?}' as DataFusion tempfile directory for {}", + tempdir.path().to_string_lossy(), + request_description, ); local_dirs.push(tempdir); @@ -160,7 +165,7 @@ mod tests { assert_eq!(0, local_dir_snapshot(&dm).len()); // can still create a tempfile however: - let actual = dm.create_tmp_file()?; + let actual = dm.create_tmp_file("Testing")?; // Now the tempdir has been created on demand assert_eq!(1, local_dir_snapshot(&dm).len()); @@ -192,7 +197,7 @@ mod tests { ); let dm = DiskManager::try_new(config)?; - let actual = dm.create_tmp_file()?; + let actual = dm.create_tmp_file("Testing")?; // the file should be in one of the specified local directories assert_path_in_dirs(actual.path(), local_dirs.into_iter()); @@ -204,10 +209,9 @@ mod tests { fn test_disabled_disk_manager() { let config = DiskManagerConfig::Disabled; let manager = DiskManager::try_new(config).unwrap(); - let e = manager.create_tmp_file().unwrap_err().to_string(); assert_eq!( - e, - "Resources exhausted: Cannot spill to temporary file as DiskManager is disabled" + manager.create_tmp_file("Testing").unwrap_err().to_string(), + "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)", ) } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 0ea2c5c5f0647..0dfac78767343 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -291,7 +291,7 @@ impl MemoryConsumer for ExternalSorter { .metrics_set .new_intermediate_tracking(partition, self.runtime.clone()); - let spillfile = self.runtime.disk_manager.create_tmp_file()?; + let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?; let stream = in_mem_partial_sort( &mut in_mem_batches, self.schema.clone(),