From 64631cceb5f6ee972f56ae84224f5967d50f6d99 Mon Sep 17 00:00:00 2001 From: gboucher90 Date: Tue, 10 Mar 2026 21:47:10 +0100 Subject: [PATCH 1/4] Add regression test for duplicate group keys after hash aggregation spill (#20724) Add a deterministic test that reproduces the bug where hash aggregation produces duplicate group keys after spilling to disk. The root cause: update_merged_stream() sets GroupOrdering::Full but does not recreate group_values, leaving GroupValuesColumn (vectorized_intern) active. Under hash collisions, vectorized_intern produces non-monotonic group indices, causing GroupOrderingFull to prematurely emit groups. Introduces a new force_hash_partial_collisions feature that truncates hashes to 5 bits (32 distinct values) instead of all zeros. Full collisions paradoxically do not trigger the bug because all rows take the same code path producing monotonic indices. Partial collisions create the necessary mix of fast-path and slow-path rows. --- .github/workflows/extended.yml | 7 +- .github/workflows/rust.yml | 2 + datafusion/common/Cargo.toml | 1 + datafusion/common/src/hash_utils.rs | 57 ++++++++-- datafusion/core/Cargo.toml | 7 ++ datafusion/core/tests/memory_limit/mod.rs | 125 +++++++++++++++++++++- datafusion/physical-plan/Cargo.toml | 1 + 7 files changed, 189 insertions(+), 11 deletions(-) diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml index 3837feb62226..7697826837b4 100644 --- a/.github/workflows/extended.yml +++ b/.github/workflows/extended.yml @@ -154,11 +154,16 @@ jobs: uses: ./.github/actions/setup-builder with: rust-version: stable - - name: Run tests + - name: Run tests (force_hash_collisions) run: | cd datafusion cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --exclude datafusion-cli --workspace --lib --tests --features=force_hash_collisions,avro cargo clean + - name: Run tests (force_hash_partial_collisions, #20724) + run: | + cd datafusion + cargo test --profile ci -p datafusion --test core_integration --features=force_hash_partial_collisions -- memory_limit::test_no_duplicate_groups_after_spill --exact + cargo clean sqllogictest-sqlite: name: "Run sqllogictests with the sqlite test suite" diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index af37b470a498..9844a00559f4 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -210,6 +210,8 @@ jobs: run: cargo check --profile ci --no-default-features -p datafusion --features=encoding_expressions - name: Check datafusion (force_hash_collisions) run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_collisions + - name: Check datafusion (force_hash_partial_collisions) + run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_partial_collisions - name: Check datafusion (math_expressions) run: cargo check --profile ci --no-default-features -p datafusion --features=math_expressions - name: Check datafusion (parquet) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index e4ba71e45c66..d9a460de42e6 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -49,6 +49,7 @@ parquet_encryption = [ "dep:hex", ] force_hash_collisions = [] +force_hash_partial_collisions = [] recursive_protection = ["dep:recursive"] parquet = ["dep:parquet"] sql = ["sqlparser"] diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 3be6118c55ff..13c8dfae37d9 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -22,12 +22,18 @@ use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano}; use arrow::array::*; use arrow::compute::take; use arrow::datatypes::*; -#[cfg(not(feature = "force_hash_collisions"))] +#[cfg(not(all( + feature = "force_hash_collisions", + not(feature = "force_hash_partial_collisions") +)))] use arrow::{downcast_dictionary_array, downcast_primitive_array}; use itertools::Itertools; use std::collections::HashMap; -#[cfg(not(feature = "force_hash_collisions"))] +#[cfg(not(all( + feature = "force_hash_collisions", + not(feature = "force_hash_partial_collisions") +)))] use crate::cast::{ as_binary_view_array, as_boolean_array, as_fixed_size_list_array, as_generic_binary_array, as_large_list_array, as_large_list_view_array, @@ -935,8 +941,11 @@ fn hash_run_array( /// Internal helper function that hashes a single array and either initializes or combines /// the hash values in the buffer. -#[cfg(not(feature = "force_hash_collisions"))] -fn hash_single_array( +#[cfg(not(all( + feature = "force_hash_collisions", + not(feature = "force_hash_partial_collisions") +)))] +fn hash_single_array_impl( array: &dyn Array, random_state: &RandomState, hashes_buffer: &mut [u64], @@ -1007,17 +1016,47 @@ fn hash_single_array( Ok(()) } -/// Test version of `hash_single_array` that forces all hashes to collide to zero. -#[cfg(feature = "force_hash_collisions")] +/// Dispatches to the appropriate `hash_single_array` implementation based on +/// the enabled feature flags. +#[cfg(not(any( + feature = "force_hash_collisions", + feature = "force_hash_partial_collisions" +)))] +fn hash_single_array( + array: &dyn Array, + random_state: &RandomState, + hashes_buffer: &mut [u64], + rehash: bool, +) -> Result<()> { + hash_single_array_impl(array, random_state, hashes_buffer, rehash) +} + +/// Test version: forces full hash collisions by setting all hashes to 0. +#[cfg(all( + feature = "force_hash_collisions", + not(feature = "force_hash_partial_collisions") +))] fn hash_single_array( _array: &dyn Array, _random_state: &RandomState, hashes_buffer: &mut [u64], _rehash: bool, ) -> Result<()> { - for hash in hashes_buffer.iter_mut() { - *hash = 0 - } + hashes_buffer.iter_mut().for_each(|x| *x = 0); + Ok(()) +} + +/// Test version: truncates real hashes to 5 bits (32 distinct values) to create +/// partial collisions that expose non-monotonic group index bugs (#20724). +#[cfg(feature = "force_hash_partial_collisions")] +fn hash_single_array( + array: &dyn Array, + random_state: &RandomState, + hashes_buffer: &mut [u64], + rehash: bool, +) -> Result<()> { + hash_single_array_impl(array, random_state, hashes_buffer, rehash)?; + hashes_buffer.iter_mut().for_each(|h| *h &= 0x1F); Ok(()) } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 9beb94497a5f..fce0d1c7cfe9 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -71,6 +71,13 @@ default = [ encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] +# Used for testing ONLY: truncates hashes to 5 bits (32 distinct values) to create partial collisions. +# Unlike force_hash_collisions (all hashes = 0), this creates a mix of colliding and non-colliding keys, +# which triggers non-monotonic group indices in vectorized_intern (#20724). +force_hash_partial_collisions = [ + "datafusion-physical-plan/force_hash_partial_collisions", + "datafusion-common/force_hash_partial_collisions", +] math_expressions = ["datafusion-functions/math_expressions"] parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"] parquet_encryption = [ diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index ff8c512cbd22..98230dad8710 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -24,7 +24,9 @@ use std::sync::{Arc, LazyLock}; #[cfg(feature = "extended_tests")] mod memory_limit_validation; mod repartition_mem_limit; -use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray}; +use arrow::array::{ + ArrayRef, DictionaryArray, Int32Array, Int64Array, RecordBatch, StringViewArray, +}; use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; use arrow_schema::{DataType, Field, Schema}; @@ -56,6 +58,7 @@ use datafusion_physical_plan::collect as collect_batches; use datafusion_physical_plan::common::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; +use std::collections::HashSet; use test_utils::AccessLogGenerator; use async_trait::async_trait; @@ -1172,3 +1175,123 @@ impl TableProvider for SortedTableProvider { Ok(DataSourceExec::from_data_source(mem_conf)) } } + +// ============================================================================ +// Regression tests for https://github.com/apache/datafusion/issues/20724 +// +// When hash aggregation spills and switches to streaming merge, +// `group_values` must be recreated with the streaming variant. +// Otherwise `vectorized_intern` can produce non-monotonic group indices +// under hash collisions, causing `GroupOrderingFull` to prematurely +// emit groups → duplicate keys in output. +// ============================================================================ + +/// Helper: set up a session that forces spilling during aggregation. +async fn setup_spill_agg_context( + memory_limit: usize, + batch_size: usize, +) -> Result { + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))) + .with_disk_manager_builder( + DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory), + ) + .build_arc() + .unwrap(); + + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(64 * 1024) + .with_sort_in_place_threshold_bytes(0) + .with_spill_compression(SpillCompression::Uncompressed) + .with_batch_size(batch_size) + .with_target_partitions(1); + + Ok(SessionContext::new_with_config_rt(config, runtime)) +} + +/// Regression test for https://github.com/apache/datafusion/issues/20724 +/// +/// When hash aggregation spills and switches to streaming merge, +/// `group_values` (GroupValuesColumn) is not recreated with the +/// streaming variant (). This means `vectorized_intern` is used +/// post-spill, which can produce non-monotonic group indices under hash +/// collisions, causing `GroupOrderingFull` to prematurely emit groups +/// and create duplicate keys in the output. +/// +/// Requirements to trigger: +/// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`) +/// - `force_hash_partial_collisions` feature → truncated hashes create the mix +/// of colliding/non-colliding keys needed for non-monotonic indices +/// - `batch_size=50` → not a multiple of rows-per-group in the merged stream, +/// so groups span batch boundaries and premature emission causes duplicates +#[tokio::test] +async fn test_no_duplicate_groups_after_spill() -> Result<()> { + let num_keys: i64 = 5000; + let rows_per_key: i64 = 4; + let total_rows = (num_keys * rows_per_key) as usize; + + let schema = Arc::new(Schema::new(vec![ + Field::new("key_a", DataType::Int64, false), + Field::new("key_b", DataType::Int64, false), + Field::new("value", DataType::Int64, false), + ])); + + let mut keys_a = Vec::with_capacity(total_rows); + let mut keys_b = Vec::with_capacity(total_rows); + let mut vals = Vec::with_capacity(total_rows); + for r in 0..rows_per_key { + for k in 0..num_keys { + keys_a.push(k / 100); + keys_b.push(k % 100); + vals.push(r * num_keys + k); + } + } + + let mut batches = Vec::new(); + for start in (0..total_rows).step_by(500) { + let end = (start + 500).min(total_rows); + batches.push(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(keys_a[start..end].to_vec())), + Arc::new(Int64Array::from(keys_b[start..end].to_vec())), + Arc::new(Int64Array::from(vals[start..end].to_vec())), + ], + )?); + } + + let ctx = setup_spill_agg_context(128 * 1024, 50).await?; + let table = MemTable::try_new(schema, vec![batches])?; + ctx.register_table("t", Arc::new(table))?; + + let df = ctx + .sql("SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b") + .await?; + let results = + collect_batches(df.create_physical_plan().await?, ctx.task_ctx()).await?; + + let mut seen = HashSet::new(); + for batch in &results { + let ka = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let kb = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + assert!( + seen.insert((ka.value(i), kb.value(i))), + "DUPLICATE group key ({}, {}). \ + Bug #20724: group_values not recreated for streaming merge.", + ka.value(i), + kb.value(i), + ); + } + } + assert_eq!(seen.len(), num_keys as usize); + Ok(()) +} diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 6a28486cca5d..acb6c2ac4fcf 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -39,6 +39,7 @@ workspace = true [features] force_hash_collisions = [] +force_hash_partial_collisions = ["datafusion-common/force_hash_partial_collisions"] test_utils = ["arrow/test_utils"] tokio_coop = [] tokio_coop_fallback = [] From c5eced19369a13d2dec22ee25353c75bbbb110f8 Mon Sep 17 00:00:00 2001 From: gboucher90 Date: Tue, 10 Mar 2026 21:54:04 +0100 Subject: [PATCH 2/4] Recreate group_values after spill merge to fix duplicate group keys (#20724) When switching to streaming merge after spill, group_ordering is set to Full but group_values is not recreated. The existing GroupValuesColumn uses vectorized_intern which can produce non-monotonic group indices, violating GroupOrderingFull's assumption and causing duplicate groups in the output. Fix: recreate group_values with the correct streaming mode after updating group_ordering in update_merged_stream(). --- datafusion/physical-plan/src/aggregates/row_hash.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 4a1b0e5c8c02..8a45e4b503d5 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1267,6 +1267,18 @@ impl GroupedHashAggregateStream { // on the grouping columns. self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); + // Recreate group_values to use streaming mode (GroupValuesColumn + // with scalarized_intern) which preserves input row order, as required + // by GroupOrderingFull. This is only needed for multi-column group by, + // since single-column uses GroupValuesPrimitive which is always safe. + let group_schema = self + .spill_state + .merging_group_by + .group_schema(&self.spill_state.spill_schema)?; + if group_schema.fields().len() > 1 { + self.group_values = new_group_values(group_schema, &self.group_ordering)?; + } + // Use `OutOfMemoryMode::ReportError` from this point on // to ensure we don't spill the spilled data to disk again. self.oom_mode = OutOfMemoryMode::ReportError; From 31e4e15cbf909746f3a57f9a2f0bb5b09fe25c54 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 12 Mar 2026 07:34:31 -0400 Subject: [PATCH 3/4] [datafusion]: keep PR 20858 to code fix only --- .github/workflows/extended.yml | 7 +- .github/workflows/rust.yml | 2 - datafusion/common/Cargo.toml | 1 - datafusion/common/src/hash_utils.rs | 57 ++-------- datafusion/core/Cargo.toml | 7 -- datafusion/core/tests/memory_limit/mod.rs | 125 +--------------------- datafusion/physical-plan/Cargo.toml | 1 - 7 files changed, 11 insertions(+), 189 deletions(-) diff --git a/.github/workflows/extended.yml b/.github/workflows/extended.yml index 7697826837b4..3837feb62226 100644 --- a/.github/workflows/extended.yml +++ b/.github/workflows/extended.yml @@ -154,16 +154,11 @@ jobs: uses: ./.github/actions/setup-builder with: rust-version: stable - - name: Run tests (force_hash_collisions) + - name: Run tests run: | cd datafusion cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --exclude datafusion-cli --workspace --lib --tests --features=force_hash_collisions,avro cargo clean - - name: Run tests (force_hash_partial_collisions, #20724) - run: | - cd datafusion - cargo test --profile ci -p datafusion --test core_integration --features=force_hash_partial_collisions -- memory_limit::test_no_duplicate_groups_after_spill --exact - cargo clean sqllogictest-sqlite: name: "Run sqllogictests with the sqlite test suite" diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 9844a00559f4..af37b470a498 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -210,8 +210,6 @@ jobs: run: cargo check --profile ci --no-default-features -p datafusion --features=encoding_expressions - name: Check datafusion (force_hash_collisions) run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_collisions - - name: Check datafusion (force_hash_partial_collisions) - run: cargo check --profile ci --no-default-features -p datafusion --features=force_hash_partial_collisions - name: Check datafusion (math_expressions) run: cargo check --profile ci --no-default-features -p datafusion --features=math_expressions - name: Check datafusion (parquet) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index d9a460de42e6..e4ba71e45c66 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -49,7 +49,6 @@ parquet_encryption = [ "dep:hex", ] force_hash_collisions = [] -force_hash_partial_collisions = [] recursive_protection = ["dep:recursive"] parquet = ["dep:parquet"] sql = ["sqlparser"] diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 13c8dfae37d9..3be6118c55ff 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -22,18 +22,12 @@ use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano}; use arrow::array::*; use arrow::compute::take; use arrow::datatypes::*; -#[cfg(not(all( - feature = "force_hash_collisions", - not(feature = "force_hash_partial_collisions") -)))] +#[cfg(not(feature = "force_hash_collisions"))] use arrow::{downcast_dictionary_array, downcast_primitive_array}; use itertools::Itertools; use std::collections::HashMap; -#[cfg(not(all( - feature = "force_hash_collisions", - not(feature = "force_hash_partial_collisions") -)))] +#[cfg(not(feature = "force_hash_collisions"))] use crate::cast::{ as_binary_view_array, as_boolean_array, as_fixed_size_list_array, as_generic_binary_array, as_large_list_array, as_large_list_view_array, @@ -941,11 +935,8 @@ fn hash_run_array( /// Internal helper function that hashes a single array and either initializes or combines /// the hash values in the buffer. -#[cfg(not(all( - feature = "force_hash_collisions", - not(feature = "force_hash_partial_collisions") -)))] -fn hash_single_array_impl( +#[cfg(not(feature = "force_hash_collisions"))] +fn hash_single_array( array: &dyn Array, random_state: &RandomState, hashes_buffer: &mut [u64], @@ -1016,47 +1007,17 @@ fn hash_single_array_impl( Ok(()) } -/// Dispatches to the appropriate `hash_single_array` implementation based on -/// the enabled feature flags. -#[cfg(not(any( - feature = "force_hash_collisions", - feature = "force_hash_partial_collisions" -)))] -fn hash_single_array( - array: &dyn Array, - random_state: &RandomState, - hashes_buffer: &mut [u64], - rehash: bool, -) -> Result<()> { - hash_single_array_impl(array, random_state, hashes_buffer, rehash) -} - -/// Test version: forces full hash collisions by setting all hashes to 0. -#[cfg(all( - feature = "force_hash_collisions", - not(feature = "force_hash_partial_collisions") -))] +/// Test version of `hash_single_array` that forces all hashes to collide to zero. +#[cfg(feature = "force_hash_collisions")] fn hash_single_array( _array: &dyn Array, _random_state: &RandomState, hashes_buffer: &mut [u64], _rehash: bool, ) -> Result<()> { - hashes_buffer.iter_mut().for_each(|x| *x = 0); - Ok(()) -} - -/// Test version: truncates real hashes to 5 bits (32 distinct values) to create -/// partial collisions that expose non-monotonic group index bugs (#20724). -#[cfg(feature = "force_hash_partial_collisions")] -fn hash_single_array( - array: &dyn Array, - random_state: &RandomState, - hashes_buffer: &mut [u64], - rehash: bool, -) -> Result<()> { - hash_single_array_impl(array, random_state, hashes_buffer, rehash)?; - hashes_buffer.iter_mut().for_each(|h| *h &= 0x1F); + for hash in hashes_buffer.iter_mut() { + *hash = 0 + } Ok(()) } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index fce0d1c7cfe9..9beb94497a5f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -71,13 +71,6 @@ default = [ encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] -# Used for testing ONLY: truncates hashes to 5 bits (32 distinct values) to create partial collisions. -# Unlike force_hash_collisions (all hashes = 0), this creates a mix of colliding and non-colliding keys, -# which triggers non-monotonic group indices in vectorized_intern (#20724). -force_hash_partial_collisions = [ - "datafusion-physical-plan/force_hash_partial_collisions", - "datafusion-common/force_hash_partial_collisions", -] math_expressions = ["datafusion-functions/math_expressions"] parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"] parquet_encryption = [ diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 98230dad8710..ff8c512cbd22 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -24,9 +24,7 @@ use std::sync::{Arc, LazyLock}; #[cfg(feature = "extended_tests")] mod memory_limit_validation; mod repartition_mem_limit; -use arrow::array::{ - ArrayRef, DictionaryArray, Int32Array, Int64Array, RecordBatch, StringViewArray, -}; +use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray}; use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; use arrow_schema::{DataType, Field, Schema}; @@ -58,7 +56,6 @@ use datafusion_physical_plan::collect as collect_batches; use datafusion_physical_plan::common::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; -use std::collections::HashSet; use test_utils::AccessLogGenerator; use async_trait::async_trait; @@ -1175,123 +1172,3 @@ impl TableProvider for SortedTableProvider { Ok(DataSourceExec::from_data_source(mem_conf)) } } - -// ============================================================================ -// Regression tests for https://github.com/apache/datafusion/issues/20724 -// -// When hash aggregation spills and switches to streaming merge, -// `group_values` must be recreated with the streaming variant. -// Otherwise `vectorized_intern` can produce non-monotonic group indices -// under hash collisions, causing `GroupOrderingFull` to prematurely -// emit groups → duplicate keys in output. -// ============================================================================ - -/// Helper: set up a session that forces spilling during aggregation. -async fn setup_spill_agg_context( - memory_limit: usize, - batch_size: usize, -) -> Result { - let runtime = RuntimeEnvBuilder::new() - .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))) - .with_disk_manager_builder( - DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory), - ) - .build_arc() - .unwrap(); - - let config = SessionConfig::new() - .with_sort_spill_reservation_bytes(64 * 1024) - .with_sort_in_place_threshold_bytes(0) - .with_spill_compression(SpillCompression::Uncompressed) - .with_batch_size(batch_size) - .with_target_partitions(1); - - Ok(SessionContext::new_with_config_rt(config, runtime)) -} - -/// Regression test for https://github.com/apache/datafusion/issues/20724 -/// -/// When hash aggregation spills and switches to streaming merge, -/// `group_values` (GroupValuesColumn) is not recreated with the -/// streaming variant (). This means `vectorized_intern` is used -/// post-spill, which can produce non-monotonic group indices under hash -/// collisions, causing `GroupOrderingFull` to prematurely emit groups -/// and create duplicate keys in the output. -/// -/// Requirements to trigger: -/// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`) -/// - `force_hash_partial_collisions` feature → truncated hashes create the mix -/// of colliding/non-colliding keys needed for non-monotonic indices -/// - `batch_size=50` → not a multiple of rows-per-group in the merged stream, -/// so groups span batch boundaries and premature emission causes duplicates -#[tokio::test] -async fn test_no_duplicate_groups_after_spill() -> Result<()> { - let num_keys: i64 = 5000; - let rows_per_key: i64 = 4; - let total_rows = (num_keys * rows_per_key) as usize; - - let schema = Arc::new(Schema::new(vec![ - Field::new("key_a", DataType::Int64, false), - Field::new("key_b", DataType::Int64, false), - Field::new("value", DataType::Int64, false), - ])); - - let mut keys_a = Vec::with_capacity(total_rows); - let mut keys_b = Vec::with_capacity(total_rows); - let mut vals = Vec::with_capacity(total_rows); - for r in 0..rows_per_key { - for k in 0..num_keys { - keys_a.push(k / 100); - keys_b.push(k % 100); - vals.push(r * num_keys + k); - } - } - - let mut batches = Vec::new(); - for start in (0..total_rows).step_by(500) { - let end = (start + 500).min(total_rows); - batches.push(RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int64Array::from(keys_a[start..end].to_vec())), - Arc::new(Int64Array::from(keys_b[start..end].to_vec())), - Arc::new(Int64Array::from(vals[start..end].to_vec())), - ], - )?); - } - - let ctx = setup_spill_agg_context(128 * 1024, 50).await?; - let table = MemTable::try_new(schema, vec![batches])?; - ctx.register_table("t", Arc::new(table))?; - - let df = ctx - .sql("SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b") - .await?; - let results = - collect_batches(df.create_physical_plan().await?, ctx.task_ctx()).await?; - - let mut seen = HashSet::new(); - for batch in &results { - let ka = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let kb = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - for i in 0..batch.num_rows() { - assert!( - seen.insert((ka.value(i), kb.value(i))), - "DUPLICATE group key ({}, {}). \ - Bug #20724: group_values not recreated for streaming merge.", - ka.value(i), - kb.value(i), - ); - } - } - assert_eq!(seen.len(), num_keys as usize); - Ok(()) -} diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index acb6c2ac4fcf..6a28486cca5d 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -39,7 +39,6 @@ workspace = true [features] force_hash_collisions = [] -force_hash_partial_collisions = ["datafusion-common/force_hash_partial_collisions"] test_utils = ["arrow/test_utils"] tokio_coop = [] tokio_coop_fallback = [] From 54aaed96d0587fe0741afc4d3681017aea19d737 Mon Sep 17 00:00:00 2001 From: Guillaume Boucher Date: Thu, 12 Mar 2026 13:21:10 +0100 Subject: [PATCH 4/4] update comment Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/aggregates/row_hash.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 8a45e4b503d5..25116716bd37 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1267,10 +1267,10 @@ impl GroupedHashAggregateStream { // on the grouping columns. self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new()); - // Recreate group_values to use streaming mode (GroupValuesColumn - // with scalarized_intern) which preserves input row order, as required - // by GroupOrderingFull. This is only needed for multi-column group by, - // since single-column uses GroupValuesPrimitive which is always safe. + // Recreate `group_values` for streaming merge so group ids are assigned + // in first-seen order, as required by `GroupOrderingFull`. + // The pre-spill multi-column collector may use `vectorized_intern`, which + // can assign new group ids out of input order under hash collisions. let group_schema = self .spill_state .merging_group_by