From bf857444b6fd1be0b5e45b8a63df967b8c1c77ba Mon Sep 17 00:00:00 2001 From: gboucher90 Date: Tue, 10 Mar 2026 21:54:04 +0100 Subject: [PATCH] Recreate group_values after spill merge to fix duplicate group keys (#20724) When switching to streaming merge after spill, group_ordering is set to Full but group_values is not recreated. The existing GroupValuesColumn uses vectorized_intern which can produce non-monotonic group indices, violating GroupOrderingFull's assumption and causing duplicate groups in the output. Fix: recreate group_values with the correct streaming mode after updating group_ordering in update_merged_stream(). --- datafusion/physical-plan/src/aggregates/row_hash.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 7cc59b44a301..a6fc27572370 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1233,6 +1233,18 @@ impl GroupedHashAggregateStream { // on the grouping columns. self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); + // Recreate group_values to use streaming mode (GroupValuesColumn + // with scalarized_intern) which preserves input row order, as required + // by GroupOrderingFull. This is only needed for multi-column group by, + // since single-column uses GroupValuesPrimitive which is always safe. + let group_schema = self + .spill_state + .merging_group_by + .group_schema(&self.spill_state.spill_schema)?; + if group_schema.fields().len() > 1 { + self.group_values = new_group_values(group_schema, &self.group_ordering)?; + } + // Use `OutOfMemoryMode::ReportError` from this point on // to ensure we don't spill the spilled data to disk again. self.oom_mode = OutOfMemoryMode::ReportError;