From 80c9f258cdeb2b95f0259ef337353d67136d1b6d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 25 Feb 2026 17:10:40 -0500 Subject: [PATCH 1/5] Simplify logic for memory pressure partial emit from ordered group by --- .../physical-plan/src/aggregates/row_hash.rs | 62 ++++++++++++------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 35f32ac7ae03..b8c83385a01e 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1028,36 +1028,52 @@ impl GroupedHashAggregateStream { Ok(None) } OutOfMemoryMode::EmitEarly if self.group_values.len() > 1 => { - let n = if self.group_values.len() >= self.batch_size { - // Try to emit an integer multiple of batch size if possible - self.group_values.len() / self.batch_size * self.batch_size - } else { - // Otherwise emit whatever we can - self.group_values.len() - }; - // Clamp to the sort boundary when using partial group ordering, // otherwise remove_groups panics (#20445). - let n = match &self.group_ordering { - GroupOrdering::None => n, - _ => match self.group_ordering.emit_to() { - Some(EmitTo::First(max)) => n.min(max), - _ => 0, - }, - }; - - if n > 0 - && let Some(batch) = self.emit(EmitTo::First(n), false)? - { - Ok(Some(ExecutionState::ProducingOutput(batch))) - } else { - Err(oom) + if let Some(emit_to) = self.emit_target_for_oom() { + if let Some(batch) = self.emit(EmitTo::First(n), false)? { + return Ok(Some(ExecutionState::ProducingOutput(batch))); + } } + Err(oom) } - _ => Err(oom), } } + + /// Returns how many groups to try and emit in order to avoid an out-of-memory + /// condition. + /// + /// Returns `None` if emitting is not possible. 
+ /// + /// Returns Some(EmitTo) with the number of groups to emit if it is possible + /// to emit some groups to free memory + fn emit_target_for_oom(&self) -> Option<EmitTo> { + let n = if self.group_values.len() >= self.batch_size { + // Try to emit an integer multiple of batch size if possible + self.group_values.len() / self.batch_size * self.batch_size + } else { + // Otherwise emit whatever we can + self.group_values.len() + }; + + // Special case for GroupOrdering::None since emit_to() returns None for + // that case, but we can still emit some groups to try to resolve the OOM + if matches!(&self.group_ordering, GroupOrdering::None) { + return Some(EmitTo::First(n)); + }; + + self.group_ordering.emit_to() + .map(|emit_to| match emit_to { + // If the ordering allows emitting some groups, + // emit as many as we can to try to resolve the OOM, + EmitTo::First(max)=> EmitTo::First(n.min(max)), + // if the ordering allows emitting all groups, we can emit n + // groups to try to resolve the OOM + EmitTo::All => EmitTo::First(n), + }) + } + fn update_memory_reservation(&mut self) -> Result<()> { let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>(); let new_size = acc From 42ded352bfca476410fba2bd34d60ad7d84aad16 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 25 Feb 2026 17:18:36 -0500 Subject: [PATCH 2/5] Fixup --- .../physical-plan/src/aggregates/row_hash.rs | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index b8c83385a01e..eb841be18c7f 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1030,17 +1030,19 @@ impl GroupedHashAggregateStream { OutOfMemoryMode::EmitEarly if self.group_values.len() > 1 => { // Clamp to the sort boundary when using partial group ordering, // otherwise remove_groups panics (#20445). 
- if let Some(emit_to) = self.emit_target_for_oom() { - if let Some(batch) = self.emit(EmitTo::First(n), false)? { - return Ok(Some(ExecutionState::ProducingOutput(batch))); - } + if let Some(emit_to) = self.emit_target_for_oom() + && let Some(batch) = self.emit(emit_to, false)? + { + return Ok(Some(ExecutionState::ProducingOutput(batch))); } Err(oom) } + OutOfMemoryMode::EmitEarly + | OutOfMemoryMode::Spill + | OutOfMemoryMode::ReportError => Err(oom), } } - /// Returns how many groups to try and emit in order to avoid an out-of-memory /// condition. /// @@ -1063,15 +1065,14 @@ impl GroupedHashAggregateStream { return Some(EmitTo::First(n)); }; - self.group_ordering.emit_to() - .map(|emit_to| match emit_to { - // If the ordering allows emitting some groups, - // emit as many as we can to try to resolve the OOM, - EmitTo::First(max)=> EmitTo::First(n.min(max)), - // if the ordering allows emitting all groups, we can emit n - // groups to try to resolve the OOM - EmitTo::All => EmitTo::First(n), - }) + self.group_ordering.emit_to().map(|emit_to| match emit_to { + // If the ordering allows emitting some groups, + // emit as many as we can to try to resolve the OOM, + EmitTo::First(max) => EmitTo::First(n.min(max)), + // if the ordering allows emitting all groups, we can emit n + // groups to try to resolve the OOM + EmitTo::All => EmitTo::First(n), + }) } fn update_memory_reservation(&mut self) -> Result<()> { let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>(); let new_size = acc From 1ba7407b41f95f1d7476e305d250bec65345e212 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 25 Feb 2026 17:20:08 -0500 Subject: [PATCH 3/5] improve --- datafusion/physical-plan/src/aggregates/row_hash.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index eb841be18c7f..ed845cd04ff8 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1043,13 
+1043,13 @@ impl GroupedHashAggregateStream { } } - /// Returns how many groups to try and emit in order to avoid an out-of-memory - /// condition. + /// Returns the number of groups groups that can be emitted to avoid an + /// out-of-memory condition. /// /// Returns `None` if emitting is not possible. /// - /// Returns Some(EmitTo) with the number of groups to emit if it is possible - /// to emit some groups to free memory + /// Returns `Some(EmitTo)` with the number of groups to emit if it is possible + /// to emit some groups to free memory. fn emit_target_for_oom(&self) -> Option<EmitTo> { let n = if self.group_values.len() >= self.batch_size { // Try to emit an integer multiple of batch size if possible @@ -1065,6 +1065,10 @@ impl GroupedHashAggregateStream { return Some(EmitTo::First(n)); }; + // For the case of GroupOrdering::Partial or GroupOrdering::Full, use + // the ordering's emit_to() method to determine how many groups can be + // emitted while respecting the ordering guarantees, clamped to the + // batch size. 
self.group_ordering.emit_to().map(|emit_to| match emit_to { // If the ordering allows emitting some groups, // emit as many as we can to try to resolve the OOM, From e4514e617b89ed7d7721abe7f9d461e242d6ab71 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Mar 2026 07:14:18 -0400 Subject: [PATCH 4/5] Move logic into GroupOrdering --- .../physical-plan/src/aggregates/order/mod.rs | 81 ++++++++++++++++++- .../physical-plan/src/aggregates/row_hash.rs | 48 +++-------- 2 files changed, 89 insertions(+), 40 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index bbcb30d877cf..134d0bfc3615 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -52,7 +52,8 @@ impl GroupOrdering { } } - // How many groups be emitted, or None if no data can be emitted + /// Returns how many groups can be emitted while respecting the current ordering + /// guarantees, or `None` if no data can be emitted pub fn emit_to(&self) -> Option<EmitTo> { match self { GroupOrdering::None => None, @@ -61,6 +62,28 @@ impl GroupOrdering { } } + /// Returns the emit strategy to use under memory pressure (OOM). + /// + /// Returns the strategy that must be used when emitting up to `n` groups + /// while respecting the current ordering guarantees. + /// + /// Returns `None` if no data can be emitted. 
+ pub fn oom_emit_to(&self, n: usize) -> Option<EmitTo> { + if n == 0 { + return None; + } + + match self { + GroupOrdering::None => Some(EmitTo::First(n)), + GroupOrdering::Partial(_) | GroupOrdering::Full(_) => { + self.emit_to().map(|emit_to| match emit_to { + EmitTo::First(max) => EmitTo::First(n.min(max)), + EmitTo::All => EmitTo::First(n), + }) + } + } + } + /// Updates the state the input is done pub fn input_done(&mut self) { match self { @@ -122,3 +145,59 @@ impl GroupOrdering { } } } + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::Arc; + + use arrow::array::{ArrayRef, Int32Array}; + + #[test] + fn test_oom_emit_to_none_ordering() { + let group_ordering = GroupOrdering::None; + + assert_eq!(group_ordering.oom_emit_to(0), None); + assert_eq!(group_ordering.oom_emit_to(5), Some(EmitTo::First(5))); + } + + #[test] + fn test_oom_emit_to_partial_clamps_to_boundary() -> Result<()> { + let mut group_ordering = + GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?); + + let batch_group_values: Vec<ArrayRef> = vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + ]; + let group_indices = vec![0, 1, 2]; + + group_ordering.new_groups(&batch_group_values, &group_indices, 3)?; + + assert_eq!(group_ordering.emit_to(), Some(EmitTo::First(2))); + assert_eq!(group_ordering.oom_emit_to(1), Some(EmitTo::First(1))); + assert_eq!(group_ordering.oom_emit_to(3), Some(EmitTo::First(2))); + + Ok(()) + } + + #[test] + fn test_oom_emit_to_partial_without_boundary() -> Result<()> { + let mut group_ordering = + GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?); + + let batch_group_values: Vec<ArrayRef> = vec![ + Arc::new(Int32Array::from(vec![1, 1, 1])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + ]; + let group_indices = vec![0, 1, 2]; + + group_ordering.new_groups(&batch_group_values, &group_indices, 3)?; + + assert_eq!(group_ordering.emit_to(), None); + assert_eq!(group_ordering.oom_emit_to(3), None); + + Ok(()) + } 
+} diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 1d9e5266a43d..b857fdca3f21 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1029,9 +1029,15 @@ impl GroupedHashAggregateStream { Ok(None) } OutOfMemoryMode::EmitEarly if self.group_values.len() > 1 => { - // Clamp to the sort boundary when using partial group ordering, - // otherwise remove_groups panics (#20445). - if let Some(emit_to) = self.emit_target_for_oom() + let n = if self.group_values.len() >= self.batch_size { + // Try to emit an integer multiple of batch size if possible + self.group_values.len() / self.batch_size * self.batch_size + } else { + // Otherwise emit whatever we can + self.group_values.len() + }; + + if let Some(emit_to) = self.group_ordering.oom_emit_to(n) && let Some(batch) = self.emit(emit_to, false)? { return Ok(Some(ExecutionState::ProducingOutput(batch))); @@ -1044,42 +1050,6 @@ impl GroupedHashAggregateStream { } } - /// Returns the number of groups groups that can be emitted to avoid an - /// out-of-memory condition. - /// - /// Returns `None` if emitting is not possible. - /// - /// Returns `Some(EmitTo)` with the number of groups to emit if it is possible - /// to emit some groups to free memory. 
- fn emit_target_for_oom(&self) -> Option<EmitTo> { - let n = if self.group_values.len() >= self.batch_size { - // Try to emit an integer multiple of batch size if possible - self.group_values.len() / self.batch_size * self.batch_size - } else { - // Otherwise emit whatever we can - self.group_values.len() - }; - - // Special case for GroupOrdering::None since emit_to() returns None for - // that case, but we can still emit some groups to try to resolve the OOM - if matches!(&self.group_ordering, GroupOrdering::None) { - return Some(EmitTo::First(n)); - }; - - // For the case of GroupOrdering::Partial or GroupOrdering::Full, use - // the ordering's emit_to() method to determine how many groups can be - // emitted while respecting the ordering guarantees, clamped to the - // batch size. - self.group_ordering.emit_to().map(|emit_to| match emit_to { - // If the ordering allows emitting some groups, - // emit as many as we can to try to resolve the OOM, - EmitTo::First(max) => EmitTo::First(n.min(max)), - // if the ordering allows emitting all groups, we can emit n - // groups to try to resolve the OOM - EmitTo::All => EmitTo::First(n), - }) - } - fn update_memory_reservation(&mut self) -> Result<()> { let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>(); let groups_and_acc_size = acc From cb46642a0f67bc5325e56878039aac55e2c4506c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 17 Mar 2026 09:27:35 -0400 Subject: [PATCH 5/5] updates --- .../physical-plan/src/aggregates/order/mod.rs | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index 134d0bfc3615..183e2b0098bb 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -162,19 +162,31 @@ mod tests { assert_eq!(group_ordering.oom_emit_to(5), Some(EmitTo::First(5))); } - #[test] - fn 
test_oom_emit_to_partial_clamps_to_boundary() -> Result<()> { + /// Creates a partially ordered grouping state with three groups. + /// + /// `sort_key_values` controls whether a sort boundary exists in the batch: + /// distinct values such as `[1, 2, 3]` create boundaries, while repeated + /// values such as `[1, 1, 1]` do not. + fn partial_ordering(sort_key_values: Vec<i32>) -> Result<GroupOrdering> { let mut group_ordering = GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?); let batch_group_values: Vec<ArrayRef> = vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(sort_key_values)), Arc::new(Int32Array::from(vec![10, 20, 30])), ]; let group_indices = vec![0, 1, 2]; group_ordering.new_groups(&batch_group_values, &group_indices, 3)?; + Ok(group_ordering) + } + + #[test] + fn test_oom_emit_to_partial_clamps_to_boundary() -> Result<()> { + let group_ordering = partial_ordering(vec![1, 2, 3])?; + + // Can emit both `1` and `2` groups because we have seen `3` assert_eq!(group_ordering.emit_to(), Some(EmitTo::First(2))); assert_eq!(group_ordering.oom_emit_to(1), Some(EmitTo::First(1))); assert_eq!(group_ordering.oom_emit_to(3), Some(EmitTo::First(2))); @@ -184,17 +196,9 @@ mod tests { #[test] fn test_oom_emit_to_partial_without_boundary() -> Result<()> { - let mut group_ordering = - GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?); - - let batch_group_values: Vec<ArrayRef> = vec![ - Arc::new(Int32Array::from(vec![1, 1, 1])), - Arc::new(Int32Array::from(vec![10, 20, 30])), - ]; - let group_indices = vec![0, 1, 2]; - - group_ordering.new_groups(&batch_group_values, &group_indices, 3)?; + let group_ordering = partial_ordering(vec![1, 1, 1])?; + // Can't emit the last `1` group as it may have more values assert_eq!(group_ordering.emit_to(), None); assert_eq!(group_ordering.oom_emit_to(3), None);