diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 4a1b0e5c8c02..25116716bd37 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1267,6 +1267,18 @@ impl GroupedHashAggregateStream { // on the grouping columns. self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); + // Recreate `group_values` for streaming merge so group ids are assigned + // in first-seen order, as required by `GroupOrderingFull`. + // The pre-spill multi-column collector may use `vectorized_intern`, which + // can assign new group ids out of input order under hash collisions. + let group_schema = self + .spill_state + .merging_group_by + .group_schema(&self.spill_state.spill_schema)?; + if group_schema.fields().len() > 1 { + self.group_values = new_group_values(group_schema, &self.group_ordering)?; + } + // Use `OutOfMemoryMode::ReportError` from this point on // to ensure we don't spill the spilled data to disk again. self.oom_mode = OutOfMemoryMode::ReportError;