From 00c8817ab489671830821052813633198c05b6b7 Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Sun, 15 Feb 2026 19:42:37 -0800 Subject: [PATCH 1/2] feat: Expose used MemoryPool in ResourcesExhausted error messages --- datafusion-cli/tests/cli_integration.rs | 4 +- .../cli_top_memory_consumers@no_track.snap | 2 +- .../cli_top_memory_consumers@top2.snap | 2 +- ...cli_top_memory_consumers@top3_default.snap | 2 +- ...consumers_with_mem_pool_type@no_track.snap | 2 +- ...ory_consumers_with_mem_pool_type@top2.snap | 2 +- .../memory_pool_tracking.rs | 3 +- datafusion/execution/src/memory_pool/mod.rs | 8 +- datafusion/execution/src/memory_pool/pool.rs | 317 +++++++++++++----- 9 files changed, 256 insertions(+), 86 deletions(-) diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 3cecba75e21b0..be4a2ad4fe197 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -261,11 +261,11 @@ fn bind_to_settings(snapshot_name: &str) -> SettingsBindDropGuard { "Consumer(can spill: bool) consumed XB, peak XB", ); settings.add_filter( - r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool", + r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total memory pool: '.*?'", "Error: Failed to allocate ", ); settings.add_filter( - r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool", + r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total memory pool: '.*?'", "Resources exhausted: Failed to allocate", ); diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap index fe454595eb4bc..c34e1202f55da 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap @@ -16,6 +16,6 @@ exit_code: 1 [CLI_VERSION] Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'. caused by -Resources exhausted: Failed to allocate +Resources exhausted: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: greedy(used: 10.0 MB, pool_size: 10.0 MB) ----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap index bb30e387166bc..ebf7a540d8d44 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap @@ -19,6 +19,6 @@ caused by Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as: Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. -Error: Failed to allocate +Error: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: greedy(used: 10.0 MB, pool_size: 10.0 MB) ----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap index 891d72e3cc639..9e279ca93ddcd 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap @@ -18,6 +18,6 @@ Resources exhausted: Additional allocation failed for ExternalSorter[0] with top Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. -Error: Failed to allocate +Error: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: greedy(used: 10.0 MB, pool_size: 10.0 MB) ----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap index 25267ea1617e5..9a228fcfb6e93 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap @@ -18,6 +18,6 @@ exit_code: 1 [CLI_VERSION] Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'. caused by -Resources exhausted: Failed to allocate +Resources exhausted: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 10.0 MB) ----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap index 6515050047107..d7f964a339313 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap @@ -21,6 +21,6 @@ caused by Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as: Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. -Error: Failed to allocate +Error: Failed to allocate additional 128.0 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 10.0 MB) ----- stderr ----- diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs index af3031c690fa3..d849a033bc66b 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs @@ -119,7 +119,8 @@ async fn automatic_usage_example() -> Result<()> { ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, peak 69.0 KB, ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, peak 67.6 KB, ExternalSorter[8]#140(can spill: true) consumed 67.2 KB, peak 67.2 KB. - Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool + Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated + for this reservation - 7.1 MB remain available for the total memory pool */ } } diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index a544cdfdb02e8..829e313d2381e 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -19,6 +19,7 @@ //! help with allocation accounting. use datafusion_common::{Result, internal_datafusion_err}; +use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::{cmp::Ordering, sync::Arc, sync::atomic}; @@ -181,7 +182,10 @@ pub use pool::*; /// /// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers, /// providing better error messages on the largest memory users. -pub trait MemoryPool: Send + Sync + std::fmt::Debug { +pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display { + /// Return pool name + fn name(&self) -> &str; + /// Registers a new [`MemoryConsumer`] /// /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory @@ -232,7 +236,7 @@ pub enum MemoryLimit { /// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to /// a particular `MemoryConsumer`; /// -/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefor not cloneable, +/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefore not cloneable, /// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`], /// but note that this `MemoryConsumer` may be treated as a separate entity based on the used pool, /// and is only guaranteed to share the name and inner properties. diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 19aaa0371ada3..7d073faa7d333 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -22,6 +22,7 @@ use datafusion_common::HashMap; use datafusion_common::{DataFusionError, Result, resources_datafusion_err}; use log::debug; use parking_lot::Mutex; +use std::fmt::{Display, Formatter}; use std::{ num::NonZeroUsize, sync::atomic::{AtomicUsize, Ordering}, @@ -34,6 +35,10 @@ pub struct UnboundedMemoryPool { } impl MemoryPool for UnboundedMemoryPool { + fn name(&self) -> &str { + "unbounded" + } + fn grow(&self, _reservation: &MemoryReservation, additional: usize) { self.used.fetch_add(additional, Ordering::Relaxed); } @@ -56,6 +61,17 @@ impl MemoryPool for UnboundedMemoryPool { } } +impl Display for UnboundedMemoryPool { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let used = self.used.load(Ordering::Relaxed); + write!( + f, + "{}", + format_args!("{}(used: {})", &self.name(), human_readable_size(used)) + ) + } +} + /// A [`MemoryPool`] that implements a greedy first-come first-serve limit. /// /// This pool works well for queries that do not need to spill or have @@ -79,6 +95,10 @@ impl GreedyMemoryPool { } impl MemoryPool for GreedyMemoryPool { + fn name(&self) -> &str { + "greedy" + } + fn grow(&self, _reservation: &MemoryReservation, additional: usize) { self.used.fetch_add(additional, Ordering::Relaxed); } @@ -98,6 +118,7 @@ impl MemoryPool for GreedyMemoryPool { reservation, additional, self.pool_size.saturating_sub(used), + self, ) })?; Ok(()) @@ -112,6 +133,22 @@ impl MemoryPool for GreedyMemoryPool { } } +impl Display for GreedyMemoryPool { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let used = self.used.load(Ordering::Relaxed); + write!( + f, + "{}", + format_args!( + "{}(used: {}, pool_size: {})", + &self.name(), + human_readable_size(used), + human_readable_size(self.pool_size) + ) + ) + } +} + /// A [`MemoryPool`] that prevents spillable reservations from using more than /// an even fraction of the available memory sans any unspillable reservations /// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`) @@ -170,6 +207,10 @@ impl FairSpillPool { } impl MemoryPool for FairSpillPool { + fn name(&self) -> &str { + "fair" + } + fn register(&self, consumer: &MemoryConsumer) { if consumer.can_spill { self.state.lock().num_spill += 1; @@ -217,6 +258,7 @@ impl MemoryPool for FairSpillPool { reservation, additional, available, + self, )); } state.spillable += additional; @@ -231,6 +273,7 @@ impl MemoryPool for FairSpillPool { reservation, additional, available, + self, )); } state.unspillable += additional; @@ -249,6 +292,20 @@ impl MemoryPool for FairSpillPool { } } +impl Display for FairSpillPool { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + format_args!( + "{}(pool_size: {})", + &self.name(), + human_readable_size(self.pool_size), + ) + ) + } +} + /// Constructs a resources error based upon the individual [`MemoryReservation`]. /// /// The error references the `bytes already allocated` for the reservation, @@ -259,13 +316,15 @@ fn insufficient_capacity_err( reservation: &MemoryReservation, additional: usize, available: usize, + pool: &impl MemoryPool, ) -> DataFusionError { resources_datafusion_err!( - "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool", + "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total memory pool: {}", human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size()), - human_readable_size(available) + human_readable_size(available), + pool ) } @@ -362,6 +421,21 @@ pub struct TrackConsumersPool { tracked_consumers: Mutex>, } +impl Display for TrackConsumersPool { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + format_args!( + "{}(inner_pool: {}, num_of_top_consumers: {})", + &self.name(), + &self.inner, + &self.top, + ) + ) + } +} + impl TrackConsumersPool { /// Creates a new [`TrackConsumersPool`]. /// @@ -407,6 +481,11 @@ impl TrackConsumersPool { } } + /// Returns a reference to the wrapped inner [`MemoryPool`]. + pub fn inner(&self) -> &I { + &self.inner + } + /// Returns a snapshot of all currently tracked consumers. pub fn metrics(&self) -> Vec { self.tracked_consumers @@ -452,6 +531,10 @@ impl TrackConsumersPool { } impl MemoryPool for TrackConsumersPool { + fn name(&self) -> &str { + "track_consumers" + } + fn register(&self, consumer: &MemoryConsumer) { self.inner.register(consumer); @@ -545,7 +628,7 @@ fn provide_top_memory_consumers_to_error_msg( #[cfg(test)] mod tests { use super::*; - use insta::{Settings, allow_duplicates, assert_snapshot}; + use insta::{Settings, allow_duplicates, assert_snapshot, with_settings}; use std::sync::Arc; fn make_settings() -> Settings { @@ -575,10 +658,10 @@ mod tests { assert_eq!(pool.reserved(), 4000); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"); r1.shrink(1990); r2.shrink(2000); @@ -603,12 +686,12 @@ mod tests { .register(&pool); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"); //Shrinking r2 to zero doesn't allow a3 to allocate more than 45 r2.free(); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"); // But dropping r2 does drop(r2); @@ -621,7 +704,7 @@ mod tests { let r4 = MemoryConsumer::new("s4").register(&pool); let err = r4.try_grow(30).unwrap_err().strip_backtrace(); - assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool"); + assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)"); } #[test] @@ -669,7 +752,7 @@ mod tests { r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B, r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B, r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B. - Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool + Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total memory pool: greedy(used: 95.0 B, pool_size: 100.0 B) "); } @@ -692,7 +775,7 @@ mod tests { assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B. - Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool + Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total memory pool: greedy(used: 0.0 B, pool_size: 100.0 B) "); // API: multiple registrations using the same hashed consumer, @@ -710,7 +793,7 @@ mod tests { Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B, foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B. - Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool + Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: greedy(used: 10.0 B, pool_size: 100.0 B) "); // Test: will accumulate size changes per consumer, not per reservation @@ -723,7 +806,7 @@ mod tests { Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. - Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool + Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B) "); // Test: different hashed consumer, (even with the same name), @@ -739,78 +822,86 @@ mod tests { foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B, foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B. - Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool + Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B) "); } #[test] fn test_tracked_consumers_pool_deregister() { - fn test_per_pool_type(pool: Arc) { - // Baseline: see the 2 memory consumers - let setting = make_settings(); - let _bound = setting.bind_to_scope(); - let r0 = MemoryConsumer::new("r0").register(&pool); - r0.grow(10); - let r1_consumer = MemoryConsumer::new("r1"); - let r1 = r1_consumer.register(&pool); - r1.grow(20); - - let res = r0.try_grow(150); - assert!(res.is_err()); - let error = res.unwrap_err().strip_backtrace(); - allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: - r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. - Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool - ")); - - // Test: unregister one - // only the remaining one should be listed - drop(r1); - let res = r0.try_grow(150); - assert!(res.is_err()); - let error = res.unwrap_err().strip_backtrace(); - allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. - Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool - ")); - - // Test: actual message we see is the `available is 70`. When it should be `available is 90`. - // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). - let res = r0.try_grow(150); - assert!(res.is_err()); - let error = res.unwrap_err().strip_backtrace(); - allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. - Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool - ")); - - // Test: the registration needs to free itself (or be dropped), - // for the proper error message - let res = r0.try_grow(150); - assert!(res.is_err()); - let error = res.unwrap_err().strip_backtrace(); - allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. - Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool - ")); + fn test_per_pool_type(pool: Arc>) { + // `snapshot_suffix` ties each insta snapshot to this pool's inner backend; filters + // normalize inner pool `Display` so fair vs greedy share the same `@` reference text. + with_settings!({ + snapshot_suffix => pool.inner().name().to_string(), + filters => vec![ + ( + r"([^\s]+)\#\d+\(can spill: (true|false)\)", + "$1#[ID](can spill: $2)", + ), + ( + r"for the total memory pool: [^\n]+", + "for the total memory pool: [INNER_POOL]", + ), + ], + }, { + let memory_pool: Arc = Arc::>::clone(&pool); + let r0 = MemoryConsumer::new("r0").register(&memory_pool); + r0.grow(10); + let r1 = MemoryConsumer::new("r1").register(&memory_pool); + r1.grow(20); + + // Baseline: see the 2 memory consumers + let error = r0.try_grow(150).unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: + r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, + r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: [INNER_POOL] + "); + + // Test: unregister one — only the remaining consumer should be listed + drop(r1); + let error = r0.try_grow(150).unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: + r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL] + "); + + // Test: actual message we see is the `available is 70`. When it should be `available is 90`. + // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). + let error = r0.try_grow(150).unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: + r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL] + "); + + // Test: the registration needs to free itself (or be dropped), + // for the proper error message + let error = r0.try_grow(150).unwrap_err().strip_backtrace(); + assert_snapshot!(error, @r" + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: + r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL] + "); + } + ); } - let tracked_spill_pool: Arc = Arc::new(TrackConsumersPool::new( - FairSpillPool::new(100), - NonZeroUsize::new(3).unwrap(), - )); - test_per_pool_type(tracked_spill_pool); + allow_duplicates! { + let tracked_spill_pool = Arc::new(TrackConsumersPool::new( + FairSpillPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + test_per_pool_type(tracked_spill_pool); - let tracked_greedy_pool: Arc = Arc::new(TrackConsumersPool::new( - GreedyMemoryPool::new(100), - NonZeroUsize::new(3).unwrap(), - )); - test_per_pool_type(tracked_greedy_pool); + let tracked_greedy_pool = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + test_per_pool_type(tracked_greedy_pool); + } } #[test] @@ -894,4 +985,78 @@ mod tests { r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B. "); } + + #[test] + fn test_memory_pool_display_fmt() { + let top = NonZeroUsize::new(5).unwrap(); + + // UnboundedMemoryPool Display with default allocation: 0.0B + let unbounded = UnboundedMemoryPool::default(); + assert_eq!( + unbounded.to_string(), + "unbounded(used: 0.0 B)", + "UnboundedMemoryPool Display" + ); + + // UnboundedMemoryPool Display with reservations + let unbounded_arc: Arc = Arc::new(UnboundedMemoryPool::default()); + let r = MemoryConsumer::new("u").register(&unbounded_arc); + r.grow(2048); + assert_eq!( + unbounded_arc.as_ref().to_string(), + "unbounded(used: 2.0 KB)", + "UnboundedMemoryPool Display with reservations" + ); + + // GreedyMemoryPool Display with default allocation: 100.0B + let greedy = GreedyMemoryPool::new(100); + assert_eq!( + greedy.to_string(), + "greedy(used: 0.0 B, pool_size: 100.0 B)", + "GreedyMemoryPool Display" + ); + + // GreedyMemoryPool Display with reservations + let greedy_arc: Arc = Arc::new(GreedyMemoryPool::new(100)); + let r = MemoryConsumer::new("g").register(&greedy_arc); + r.grow(50); + assert_eq!( + greedy_arc.as_ref().to_string(), + "greedy(used: 50.0 B, pool_size: 100.0 B)", + "GreedyMemoryPool Display with reservations" + ); + + // FairSpillPool Display with default allocation: 4.0KB and without reservations + let fair = FairSpillPool::new(4096); + assert_eq!( + fair.to_string(), + "fair(pool_size: 4.0 KB)", + "FairSpillPool Display" + ); + + // TrackConsumersPool Display with default allocation: 128.0B and without reservations + let tracked_greedy = TrackConsumersPool::new(GreedyMemoryPool::new(128), top); + assert_eq!( + tracked_greedy.to_string(), + "track_consumers(inner_pool: greedy(used: 0.0 B, pool_size: 128.0 B), num_of_top_consumers: 5)", + "TrackConsumersPool Display" + ); + + // TrackConsumersPool Display with default allocation: 256.0B and without reservations + let tracked_fair = TrackConsumersPool::new(FairSpillPool::new(256), top); + assert_eq!( + tracked_fair.to_string(), + "track_consumers(inner_pool: fair(pool_size: 256.0 B), num_of_top_consumers: 5)", + "TrackConsumersPool Display" + ); + + // TrackConsumersPool Display without reservations + let tracked_unbounded = + TrackConsumersPool::new(UnboundedMemoryPool::default(), top); + assert_eq!( + tracked_unbounded.to_string(), + "track_consumers(inner_pool: unbounded(used: 0.0 B), num_of_top_consumers: 5)", + "TrackConsumersPool Display" + ); + } } From 5b181a11e27dc6d7516b93831851ce13d26cac79 Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Mon, 20 Apr 2026 22:11:17 -0700 Subject: [PATCH 2/2] Addressing review comment --- datafusion/execution/src/memory_pool/pool.rs | 37 +++++++------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 7d073faa7d333..aac95b9d6a81f 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -64,11 +64,7 @@ impl MemoryPool for UnboundedMemoryPool { impl Display for UnboundedMemoryPool { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let used = self.used.load(Ordering::Relaxed); - write!( - f, - "{}", - format_args!("{}(used: {})", &self.name(), human_readable_size(used)) - ) + write!(f, "{}(used: {})", &self.name(), human_readable_size(used)) } } @@ -138,13 +134,10 @@ impl Display for GreedyMemoryPool { let used = self.used.load(Ordering::Relaxed); write!( f, - "{}", - format_args!( - "{}(used: {}, pool_size: {})", - &self.name(), - human_readable_size(used), - human_readable_size(self.pool_size) - ) + "{}(used: {}, pool_size: {})", + &self.name(), + human_readable_size(used), + human_readable_size(self.pool_size) ) } } @@ -296,12 +289,9 @@ impl Display for FairSpillPool { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{}", - format_args!( - "{}(pool_size: {})", - &self.name(), - human_readable_size(self.pool_size), - ) + "{}(pool_size: {})", + &self.name(), + human_readable_size(self.pool_size), ) } } @@ -425,13 +415,10 @@ impl Display for TrackConsumersPool { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{}", - format_args!( - "{}(inner_pool: {}, num_of_top_consumers: {})", - &self.name(), - &self.inner, - &self.top, - ) + "{}(inner_pool: {}, num_of_top_consumers: {})", + &self.name(), + &self.inner, + &self.top, ) } }