Merged
12 changes: 12 additions & 0 deletions datafusion/physical-plan/src/aggregates/row_hash.rs

@@ -1267,6 +1267,18 @@ impl GroupedHashAggregateStream {
// on the grouping columns.
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());

// Recreate `group_values` for streaming merge so group ids are assigned
// in first-seen order, as required by `GroupOrderingFull`.
// The pre-spill multi-column collector may use `vectorized_intern`, which
// can assign new group ids out of input order under hash collisions.
let group_schema = self
.spill_state
.merging_group_by
.group_schema(&self.spill_state.spill_schema)?;
if group_schema.fields().len() > 1 {
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
Contributor Author (comment on lines +1278 to +1279):
Note: this only recreates `group_values` for multi-column GROUP BY (`fields().len() > 1`), since the single-column path uses `GroupValuesPrimitive`, which does not have the `vectorized_intern` bug. It also avoids the memory overhead that was causing `aggregate_source_not_yielding_with_spill` to fail within its tight 2600-byte budget.

}

// Use `OutOfMemoryMode::ReportError` from this point on
// to ensure we don't spill the spilled data to disk again.
self.oom_mode = OutOfMemoryMode::ReportError;
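The invariant the diff comment describes can be illustrated with a minimal sketch. This is not DataFusion's actual `GroupValues` API; `intern_first_seen` is a hypothetical stand-in showing the property `GroupOrderingFull` relies on: group ids are assigned strictly in first-seen input order, so on input sorted by the grouping columns the ids are monotonically non-decreasing and a group can be emitted as soon as a larger id appears.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of a sequential interner: each new key gets the
/// next id in first-seen order. A vectorized interner that resolves hash
/// collisions out of band can violate this ordering, which is why the
/// diff recreates `group_values` before the streaming merge.
fn intern_first_seen(keys: &[&str]) -> Vec<usize> {
    let mut ids: HashMap<String, usize> = HashMap::new();
    let mut out = Vec::with_capacity(keys.len());
    for k in keys {
        let next = ids.len();
        let id = *ids.entry((*k).to_string()).or_insert(next);
        out.push(id);
    }
    out
}

fn main() {
    // Sorted input: ids come out non-decreasing, so an ordering tracker
    // knows group 0 is complete the moment id 1 shows up.
    let ids = intern_first_seen(&["a", "a", "b", "c", "c"]);
    assert_eq!(ids, vec![0, 0, 1, 2, 2]);
    println!("{:?}", ids);
}
```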