From 156ee11b99b1bc64536e51053c7371a7ab5c3869 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 16 Sep 2025 14:46:18 +0300 Subject: [PATCH 01/17] perf: Optimize `multi_group_by` when there are a lot of unique groups This optimization is fairly simple: if the row indices to append are continues (i.e. `append_row_indices[i] + 1 == append_row_indices[i + 1]`) we will call an optimized function for that case the optimized function should copy all the data in a single pass making it very fast as opposed to item by item --- .../group_values/multi_group_by/bytes.rs | 101 +++++++++++++++++- .../group_values/multi_group_by/mod.rs | 75 +++++++++++-- .../group_values/multi_group_by/primitive.rs | 29 +++++ .../aggregates/group_values/null_builder.rs | 7 ++ 4 files changed, 201 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index be1f68ea453fa..199100eebcc64 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -22,7 +22,7 @@ use arrow::array::{ GenericBinaryArray, GenericByteArray, GenericStringArray, OffsetSizeTrait, }; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; -use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType}; +use arrow::datatypes::{ArrowNativeType, ByteArrayType, DataType, GenericBinaryType}; use datafusion_common::utils::proxy::VecAllocExt; use datafusion_common::{DataFusionError, Result}; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; @@ -171,6 +171,74 @@ where Ok(()) } + fn append_array_slice_inner( + &mut self, + array: &ArrayRef, + start: usize, + length: usize, + ) -> Result<()> + where + B: ByteArrayType, + { + let array = array.as_bytes::(); + + let offsets = array.offsets(); + let bytes = array.value_data(); + + // Is the slice all nulls + let all_nulls: bool; + + // 1. Append the nulls + if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) { + let nulls_slice = nulls.slice(start, length); + all_nulls = nulls_slice.null_count() == length; + self.nulls.append_buffer(&nulls_slice); + } else { + all_nulls = false; + self.nulls.append_n(length, false); + } + + let values_start = offsets[start].as_usize(); + let values_end = offsets[start + length].as_usize(); + + // 2. Append the offsets + + // Must be before adding the bytes to the buffer + let mut last_offset = self.buffer.len(); + + if all_nulls { + // If all nulls, we can just repeat the last offset + for _ in 0..length { + self.offsets.push(O::usize_as(last_offset)); + } + } else { + for start_and_end_values in offsets[start..=start + length].windows(2) { + let length = start_and_end_values[1] - start_and_end_values[0]; + last_offset += length.as_usize(); + self.offsets.push(O::usize_as(last_offset)); + } + } + + // 3. Append the bytes + + // Only if not all nulls append the actual bytes + if !all_nulls { + // Note: if the array have nulls we might copy some bytes that are not used. 
+ + // Add all the bytes for the values directly to the byte buffer + self.buffer.append_slice(&bytes[values_start..values_end]); + + if self.buffer.len() > self.max_buffer_size { + return Err(DataFusionError::Execution(format!( + "offset overflow, buffer size > {}", + self.max_buffer_size + ))); + } + } + + Ok(()) + } + fn do_equal_to_inner( &self, lhs_row: usize, @@ -327,6 +395,37 @@ where Ok(()) } + fn append_array_slice( + &mut self, + column: &ArrayRef, + start: usize, + length: usize, + ) -> Result<()> { + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + column.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.append_array_slice_inner::>( + column, start, length, + )? + } + OutputType::Utf8 => { + debug_assert!(matches!( + column.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.append_array_slice_inner::>( + column, start, length, + )? + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + }; + + Ok(()) + } + fn len(&self) -> usize { self.offsets.len() - 1 } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 5d96ac6dccedc..d2155f5734fe2 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -88,6 +88,24 @@ pub trait GroupColumn: Send + Sync { /// The vectorized version `append_val` fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>; + /// Append slice of values from `array`, starting at `start` for `length` rows + /// + /// This is a special case of `vectorized_append` when the rows are continuous + /// + /// You should implement this to optimize large copies of contiguous values. + /// + /// This does not get the sliced array even though it would be more user-friendly + /// to allow optimization that avoid the additional computation that can happen in a slice + fn append_array_slice( + &mut self, + array: &ArrayRef, + start: usize, + length: usize, + ) -> Result<()> { + let rows = (start..start + length).collect::>(); + self.vectorized_append(array, &rows) + } + /// Returns the number of rows stored in this builder fn len(&self) -> usize; @@ -233,6 +251,16 @@ struct VectorizedOperationBuffers { /// The `vectorized append` row indices buffer append_row_indices: Vec, + /// The last row index in `append_row_indices` + /// + /// Will be None if `append_row_indices` is empty + last_append_row_index: Option, + + /// If all the values in `append_row_indices` are continuous + /// i.e. 
`append_row_indices[i] + 1 == append_row_indices[i + 1]` + /// this is used to optimize the `vectorized_append` operation + are_row_indices_continuous: bool, + /// The `vectorized_equal_to` row indices buffer equal_to_row_indices: Vec, @@ -250,12 +278,28 @@ struct VectorizedOperationBuffers { impl VectorizedOperationBuffers { fn clear(&mut self) { - self.append_row_indices.clear(); + self.clear_append_row_indices(); self.equal_to_row_indices.clear(); self.equal_to_group_indices.clear(); self.equal_to_results.clear(); self.remaining_row_indices.clear(); } + + fn add_append_row_index(&mut self, row: usize) { + self.are_row_indices_continuous = self.are_row_indices_continuous + && self + .last_append_row_index + .is_none_or(|last| last + 1 == row); + self.last_append_row_index = Some(row); + + self.append_row_indices.push(row); + } + + fn clear_append_row_indices(&mut self) { + self.append_row_indices.clear(); + self.are_row_indices_continuous = true; + self.last_append_row_index = None; + } } impl GroupValuesColumn { @@ -498,7 +542,7 @@ impl GroupValuesColumn { batch_hashes: &[u64], groups: &mut [usize], ) { - self.vectorized_operation_buffers.append_row_indices.clear(); + self.vectorized_operation_buffers.clear_append_row_indices(); self.vectorized_operation_buffers .equal_to_row_indices .clear(); @@ -528,9 +572,7 @@ impl GroupValuesColumn { ); // Add row index to `vectorized_append_row_indices` - self.vectorized_operation_buffers - .append_row_indices - .push(row); + self.vectorized_operation_buffers.add_append_row_index(row); // Set group index to row in `groups` groups[row] = current_group_idx; @@ -578,11 +620,24 @@ impl GroupValuesColumn { } let iter = self.group_values.iter_mut().zip(cols.iter()); - for (group_column, col) in iter { - group_column.vectorized_append( - col, - &self.vectorized_operation_buffers.append_row_indices, - )?; + if self.vectorized_operation_buffers.are_row_indices_continuous + && !self + .vectorized_operation_buffers + .append_row_indices + .is_empty() + { + let start = self.vectorized_operation_buffers.append_row_indices[0]; + let length = self.vectorized_operation_buffers.append_row_indices.len(); + for (group_column, col) in iter { + group_column.append_array_slice(col, start, length)?; + } + } else { + for (group_column, col) in iter { + group_column.vectorized_append( + col, + &self.vectorized_operation_buffers.append_row_indices, + )?; + } } Ok(()) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index afec25fd3d666..a312bba71b279 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -175,6 +175,35 @@ impl GroupColumn Ok(()) } + fn append_array_slice( + &mut self, + array: &ArrayRef, + start: usize, + length: usize, + ) -> Result<()> { + let array = array.as_primitive::(); + + if NULLABLE { + if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) { + self.nulls.append_buffer(&nulls.slice(start, length)); + } else { + self.nulls.append_n(length, false); + } + } else { + assert_eq!( + array.null_count(), + 0, + "unexpected nulls in non nullable input" + ); + self.nulls.append_n(length, false); + } + + self.group_values + .extend_from_slice(&array.values()[start..start + length]); + + Ok(()) + } + fn len(&self) -> usize { self.group_values.len() } diff --git 
a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index 23ffc69f218bf..78f66b6647067 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -61,6 +61,13 @@ impl MaybeNullBufferBuilder { } } + /// Append [`NullBuffer`] to this [`NullBufferBuilder`] + /// + /// This is useful when you want to concatenate two null buffers. + pub fn append_buffer(&mut self, other: &NullBuffer) { + self.nulls.append_buffer(other); + } + /// return the number of heap allocated bytes used by this structure to store boolean values pub fn allocated_size(&self) -> usize { // NullBufferBuilder builder::allocated_size returns capacity in bits From 31331964f43ed65301a0b21c75107342db774fd5 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 16 Sep 2025 16:04:27 +0300 Subject: [PATCH 02/17] updated based on cr --- .../group_values/multi_group_by/mod.rs | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index d2155f5734fe2..e490487f604b6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -251,15 +251,10 @@ struct VectorizedOperationBuffers { /// The `vectorized append` row indices buffer append_row_indices: Vec, - /// The last row index in `append_row_indices` - /// - /// Will be None if `append_row_indices` is empty - last_append_row_index: Option, - /// If all the values in `append_row_indices` are continuous /// i.e. 
`append_row_indices[i] + 1 == append_row_indices[i + 1]` /// this is used to optimize the `vectorized_append` operation - are_row_indices_continuous: bool, + are_row_indices_consecutive: bool, /// The `vectorized_equal_to` row indices buffer equal_to_row_indices: Vec, @@ -286,19 +281,18 @@ impl VectorizedOperationBuffers { } fn add_append_row_index(&mut self, row: usize) { - self.are_row_indices_continuous = self.are_row_indices_continuous + self.are_row_indices_consecutive = self.are_row_indices_consecutive && self - .last_append_row_index + .append_row_indices + .last() .is_none_or(|last| last + 1 == row); - self.last_append_row_index = Some(row); self.append_row_indices.push(row); } fn clear_append_row_indices(&mut self) { self.append_row_indices.clear(); - self.are_row_indices_continuous = true; - self.last_append_row_index = None; + self.are_row_indices_consecutive = true; } } @@ -620,7 +614,9 @@ impl GroupValuesColumn { } let iter = self.group_values.iter_mut().zip(cols.iter()); - if self.vectorized_operation_buffers.are_row_indices_continuous + if self + .vectorized_operation_buffers + .are_row_indices_consecutive && !self .vectorized_operation_buffers .append_row_indices From 00b5f079f1c4886c1234e00dc31a710dcbd07831 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 16 Sep 2025 16:21:39 +0300 Subject: [PATCH 03/17] update comment --- .../src/aggregates/group_values/multi_group_by/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index e490487f604b6..cf0f676a04fc9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -251,7 +251,7 @@ struct VectorizedOperationBuffers { /// The `vectorized append` row indices buffer append_row_indices: Vec, - /// If all the values in `append_row_indices` are continuous + /// If all the values in `append_row_indices` are consecutive /// i.e. 
`append_row_indices[i] + 1 == append_row_indices[i + 1]` /// this is used to optimize the `vectorized_append` operation are_row_indices_consecutive: bool, From 9476184e8635bb8de535d7f17b7b7ec24e57b046 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 16 Sep 2025 21:34:36 +0300 Subject: [PATCH 04/17] added fuzz test to test all values are unique in aggregate group by --- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 308 +++++++++++++++++- 1 file changed, 305 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 01554c1af7fa1..43a8fc6ab8a51 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -24,11 +24,12 @@ use crate::fuzz_cases::aggregation_fuzzer::{ }; use arrow::array::{ - types::Int64Type, Array, ArrayRef, AsArray, Int32Array, Int64Array, RecordBatch, - StringArray, + make_array, types::Int64Type, Array, ArrayRef, AsArray, Int32Array, Int64Array, + RecordBatch, StringArray, StringViewArray, UInt64Array, UInt8Array, }; +use arrow::buffer::NullBuffer; use arrow::compute::concat_batches; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, UInt64Type}; use arrow::util::pretty::pretty_format_batches; use arrow_schema::{Field, Schema, SchemaRef}; use datafusion::datasource::memory::MemorySourceConfig; @@ -767,3 +768,304 @@ async fn test_single_mode_aggregate_single_mode_aggregate_with_spill() -> Result Ok(()) } + +/// This test where we group by multiple columns and a very extreme case where all the values are unique +/// i.e. there are the same number of groups as input rows +#[tokio::test] +async fn test_group_by_multiple_columns_all_unique() -> Result<()> { + let scan_schema = Arc::new(Schema::new(vec![ + Field::new("col_1", DataType::UInt8, true), + Field::new("idx", DataType::UInt64, false), + Field::new("grouping_key_1", DataType::Utf8, true), + Field::new("grouping_key_2", DataType::Utf8, false), + Field::new("grouping_key_3", DataType::Utf8View, true), + Field::new("grouping_key_4", DataType::Utf8View, false), + Field::new("grouping_key_5", DataType::Int32, true), + Field::new("grouping_key_6", DataType::Int32, false), + ])); + + let nullable_grouping_expr_indices = scan_schema + .fields() + .iter() + .enumerate() + .skip(1) + .filter(|(_, f)| f.is_nullable()) + .map(|(index, _)| index) + .collect::>(); + + let record_batch_size = 100; + let number_of_record_batches = 1000; + + #[derive(Clone)] + enum NullableDataGenConfig { + /// All the values in the array are nulls + All, + + /// No nulls values + None, + + /// Random nulls + Random, + } + + /// Generate an infinite iterator of (column_index, null_config) pairs + /// where the column_index is which column we're going to change based on the null_config + /// + /// Each (column_index, null_config) pair will be repeated [`NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG`] times + /// to make sure that even if concatenating batches together, we still have a safe margin for the all and none null cases + /// + /// For example, if the nullable_column_indices are `[1, 3, 5]` + /// and [`NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG`] is 2 + /// this will generate the following sequence: + /// ```text + /// (1, AllNulls), (1, AllNulls), + /// (1, NoNulls), (1, NoNulls), + /// (1, RandomNulls), (1, RandomNulls), + /// (3, AllNulls), (3, AllNulls), + /// (3, NoNulls), (3, NoNulls), + /// (3, RandomNulls), (3, 
RandomNulls), + /// (5, AllNulls), (5, AllNulls), + /// (5, NoNulls), (5, NoNulls), + /// (5, RandomNulls), (5, RandomNulls), + /// ``` + fn generate_infinite_null_state_iter( + nullable_column_indices: Vec, + ) -> impl Iterator { + /// How many consecutive batches to generate with the same null configuration + /// to make sure that even if concatenating batches together, we still have a safe margin for the all and none null cases + const NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG: usize = 4; + + nullable_column_indices + .into_iter() + // Create a pair of (column_index, null_config) for each null config + .flat_map(|index| { + [ + (index, NullableDataGenConfig::All), + (index, NullableDataGenConfig::None), + (index, NullableDataGenConfig::Random), + ] + }) + // Repeat the same column index for NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG times + // to make sure we have multiple consecutive batches with the same null configuration + .flat_map(|index| { + std::iter::repeat_n(index, NUMBER_OF_CONSECUTIVE_WITH_SAME_NULL_CONFIG) + }) + // Make it infinite so we can just take as many as we need + .cycle() + } + + let schema = Arc::clone(&scan_schema); + + let input = (0..number_of_record_batches) + .zip(generate_infinite_null_state_iter( + nullable_grouping_expr_indices, + )) + .map(move |(batch_index, null_generate_config)| { + let unique_iterator = (batch_index * record_batch_size) + ..(batch_index * record_batch_size) + record_batch_size; + + // Only one column at a time to have nulls to make sure all the group keys together are still unique + + let mut columns = vec![ + // col_1: nullable uint8 + // The values does not really matter as we just count on it + Arc::new(UInt8Array::from_iter_values(std::iter::repeat_n( + 0, + record_batch_size, + ))), + // idx: non-nullable uint64 + Arc::new(UInt64Array::from_iter_values( + unique_iterator.clone().map(|x| x as u64), + )) as ArrayRef, + // grouping_key_1: nullable string + Arc::new(StringArray::from_iter_values( + unique_iterator.clone().map(|x| x.to_string()), + )) as ArrayRef, + // grouping_key_2: non-nullable string + Arc::new(StringArray::from_iter_values( + unique_iterator.clone().map(|x| x.to_string()), + )) as ArrayRef, + // grouping_key_3: nullable string view + Arc::new(StringViewArray::from_iter_values( + unique_iterator.clone().map(|x| x.to_string()), + )) as ArrayRef, + // grouping_key_4: non-nullable string view + Arc::new(StringViewArray::from_iter_values( + unique_iterator.clone().map(|x| x.to_string()), + )) as ArrayRef, + // grouping_key_5: nullable int32 + Arc::new(Int32Array::from_iter_values( + unique_iterator.clone().map(|x| x as i32), + )) as ArrayRef, + // grouping_key_6: non-nullable int32 + Arc::new(Int32Array::from_iter_values( + unique_iterator.clone().map(|x| x as i32), + )) as ArrayRef, + ]; + + // Apply the null configuration to the selected column + let (column_to_set_nulls, null_config) = null_generate_config; + + match null_config { + // We should already have no nulls by default + NullableDataGenConfig::None => { + assert_eq!(columns[column_to_set_nulls].null_count(), 0); + } + // Change all values to nulls + NullableDataGenConfig::All => { + columns[column_to_set_nulls] = set_nulls( + &columns[column_to_set_nulls], + NullBuffer::new_null(columns[column_to_set_nulls].len()), + ); + } + NullableDataGenConfig::Random => { + let mut rnd = rng(); + + let null_buffer: NullBuffer = (0..columns[column_to_set_nulls].len()) + .map(|_| { + // ~30% nulls + rnd.random::() > 0.3 + }) + .collect(); + + 
columns[column_to_set_nulls] = + set_nulls(&columns[column_to_set_nulls], null_buffer); + } + } + + RecordBatch::try_new(Arc::clone(&schema), columns).map_err(Into::into) + }) + .collect::>>()?; + + let input_row_count: usize = input.iter().map(|b| b.num_rows()).sum(); + + let results = { + let session_config = SessionConfig::new().with_batch_size(record_batch_size); + let ctx = SessionContext::new_with_config(session_config); + + let df = run_sql_on_input( + &ctx, + Arc::clone(&scan_schema), + vec![input.clone()], + r#" + SELECT + idx, + grouping_key_1, + grouping_key_2, + grouping_key_3, + grouping_key_4, + grouping_key_5, + grouping_key_6, + COUNT(col_1) + FROM t + GROUP BY + idx, + grouping_key_1, + grouping_key_2, + grouping_key_3, + grouping_key_4, + grouping_key_5, + grouping_key_6 + "#, + ) + .await?; + + df.collect().await? + }; + + assert_eq!( + results.iter().map(|b| b.num_rows()).sum::(), + input_row_count, + "Must be exactly the same number of output rows as input rows" + ); + + // Sort the results for easier assertion + // We are sorting in different plan to make sure it does not affect the aggregation + let sorted_results = { + let session_config = SessionConfig::new().with_batch_size(record_batch_size); + let ctx = SessionContext::new_with_config(session_config); + + let df = run_sql_on_input( + &ctx, + results[0].schema(), + vec![results], + "SELECT * FROM t ORDER BY idx", + ) + .await?; + + df.collect().await? + }; + + // Assert the input is sorted + assert_eq!( + sorted_results + .iter() + .flat_map(|b| b + .column_by_name("idx") + .unwrap() + .as_primitive::() + .iter()) + .collect::>(), + (0..input_row_count) + .map(|x| Some(x as u64)) + .collect::>(), + "Output is not sorted by idx" + ); + + // The expected output batches are the input batches without the `col_1` and with a count column with value 1 added at the end + let expected_output_batches = input.into_iter().map(|batch| { + // Remove the first column (col_1) which we are counting + let indices = (1..batch.schema().fields().len()).collect::>(); + let batch = batch.project(&indices).unwrap(); + + // Add the expected count column with all values set to 1 + let count_result = vec![1; batch.num_rows()]; + let count_array = Int64Array::from_iter_values(count_result); + let (_, mut arrays, _) = batch.into_parts(); + + arrays.push(Arc::new(count_array) as ArrayRef); + + RecordBatch::try_new(sorted_results[0].schema(), arrays).unwrap() + }); + + for (output_batch, expected_batch) in + sorted_results.iter().zip(expected_output_batches) + { + assert_eq!(output_batch, &expected_batch); + } + + Ok(()) +} + +/// Set the null buffer of an array to the specified null buffer +/// it is important that we keep the underlying values as is and only modify the null buffer +/// to also test the case for bytes for example that null values does not point to an empty bytes +fn set_nulls(array: &ArrayRef, null_buffer: NullBuffer) -> ArrayRef { + let null_count = null_buffer.null_count(); + + let array_data = array + .to_data() + .into_builder() + .nulls(Some(null_buffer)) + .build() + .unwrap(); + + let array = make_array(array_data); + + assert_eq!(array.null_count(), null_count); + + array +} + +async fn run_sql_on_input( + ctx: &SessionContext, + schema: SchemaRef, + partitions: Vec>, + sql: &str, +) -> Result { + let provider = MemTable::try_new(schema, partitions)?; + + ctx.register_table("t", Arc::new(provider))?; + + ctx.sql(sql).await +} From a052b39a6d20e7085624703994eda89019012da1 Mon Sep 17 00:00:00 2001 From: Raz Luvaton 
<16746759+rluvaton@users.noreply.github.com> Date: Tue, 16 Sep 2025 22:32:58 +0300 Subject: [PATCH 05/17] added comment --- datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 31e5566f3b382..8f7b9cf8cb1a0 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -947,6 +947,8 @@ async fn test_group_by_multiple_columns_all_unique() -> Result<()> { &ctx, Arc::clone(&scan_schema), vec![input.clone()], + // It is important to get the grouping keys back so we can test that we don't + // mess them up r#" SELECT idx, From 390bd68c788b7ca5b12d6f21c363863e3670c318 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 17 Sep 2025 12:22:48 +0300 Subject: [PATCH 06/17] don't always call `append_array_slice` if it is not supported this is done to make sure we don't allocate the indices again when it is not supported --- .../group_values/multi_group_by/bytes.rs | 4 +++ .../group_values/multi_group_by/mod.rs | 25 ++++++++++++++----- .../group_values/multi_group_by/primitive.rs | 4 +++ 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 199100eebcc64..1c291e04bc5ae 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -395,6 +395,10 @@ where Ok(()) } + fn support_append_array_slice(&self) -> bool { + true + } + fn append_array_slice( &mut self, column: &ArrayRef, diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index cf0f676a04fc9..9cf685d523495 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -88,6 +88,10 @@ pub trait GroupColumn: Send + Sync { /// The vectorized version `append_val` fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>; + fn support_append_array_slice(&self) -> bool { + false + } + /// Append slice of values from `array`, starting at `start` for `length` rows /// /// This is a special case of `vectorized_append` when the rows are continuous @@ -98,12 +102,14 @@ pub trait GroupColumn: Send + Sync { /// to allow optimization that avoid the additional computation that can happen in a slice fn append_array_slice( &mut self, - array: &ArrayRef, - start: usize, - length: usize, + _array: &ArrayRef, + _start: usize, + _length: usize, ) -> Result<()> { - let rows = (start..start + length).collect::>(); - self.vectorized_append(array, &rows) + assert!(!self.support_append_array_slice(), "support_append_array_slice() return true while append_array_slice() is not implemented"); + not_impl_err!( + "append_array_slice is not implemented for this GroupColumn, please implement it as well as support_append_array_slice" + ) } /// Returns the number of rows stored in this builder @@ -625,7 +631,14 @@ impl GroupValuesColumn { let start = self.vectorized_operation_buffers.append_row_indices[0]; let length = self.vectorized_operation_buffers.append_row_indices.len(); for (group_column, col) 
in iter { - group_column.append_array_slice(col, start, length)?; + if group_column.support_append_array_slice() { + group_column.append_array_slice(col, start, length)?; + } else { + group_column.vectorized_append( + col, + &self.vectorized_operation_buffers.append_row_indices, + )?; + } } } else { for (group_column, col) in iter { diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index a312bba71b279..64a547b151c3d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -175,6 +175,10 @@ impl GroupColumn Ok(()) } + fn support_append_array_slice(&self) -> bool { + true + } + fn append_array_slice( &mut self, array: &ArrayRef, From 0a78ede35ae915b9140237f8d12f9461bd08b4c6 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 17 Sep 2025 12:29:23 +0300 Subject: [PATCH 07/17] add comment --- .../src/aggregates/group_values/multi_group_by/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 9cf685d523495..1ca9b9ea7cf16 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -88,6 +88,8 @@ pub trait GroupColumn: Send + Sync { /// The vectorized version `append_val` fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>; + /// Whether this builder supports [`Self::append_array_slice`] optimization + /// In case it returns true, [`Self::append_array_slice`] must be implemented fn support_append_array_slice(&self) -> bool { false } @@ -100,6 +102,8 @@ pub trait GroupColumn: Send + Sync { /// /// This does not get the sliced array even though it would be more user-friendly /// to allow optimization that avoid the additional computation that can happen in a slice + /// + /// Note: in order for this to be used, [`Self::support_append_array_slice`] must return true fn append_array_slice( &mut self, _array: &ArrayRef, From c1bcb173f062710593f15209737cac53548d80e8 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 18 Sep 2025 21:31:08 +0300 Subject: [PATCH 08/17] add benchmark --- datafusion/core/benches/aggregate_query_sql.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 057a0e1d1b54c..a090ece4d6c3e 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -153,6 +153,19 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + // Due to the large number of distinct values in u64_wide, + // this query test the actual grouping performance for more than 1 column + "SELECT u64_wide, utf8 \ + FROM t GROUP BY u64_wide, utf8", + ) + }) + }); + c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { b.iter(|| { query( From cc2e7258c522711d760695578508b962da2c271d Mon Sep 17 00:00:00 2001 From: Raz Luvaton 
<16746759+rluvaton@users.noreply.github.com> Date: Thu, 18 Sep 2025 21:31:25 +0300 Subject: [PATCH 09/17] try optimization --- .../group_values/multi_group_by/bytes.rs | 87 ++++++++++++++++--- 1 file changed, 76 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 1c291e04bc5ae..a022ff2067c78 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -73,6 +73,12 @@ where } } + + #[inline] + fn next_offset(&self) -> O { + O::from_usize(self.buffer.len()).expect("byte array offset overflow") + } + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool where B: ByteArrayType, @@ -181,8 +187,8 @@ where B: ByteArrayType, { let array = array.as_bytes::(); + let offsets = &array.value_offsets()[start..=start + length]; - let offsets = array.offsets(); let bytes = array.value_data(); // Is the slice all nulls @@ -198,24 +204,83 @@ where self.nulls.append_n(length, false); } - let values_start = offsets[start].as_usize(); - let values_end = offsets[start + length].as_usize(); + let values_start = offsets[0].as_usize(); + let values_end = offsets[offsets.len() - 1].as_usize(); // 2. Append the offsets // Must be before adding the bytes to the buffer - let mut last_offset = self.buffer.len(); + + self.offsets.reserve(length); if all_nulls { + let last_offset = self.buffer.len(); + // If all nulls, we can just repeat the last offset - for _ in 0..length { - self.offsets.push(O::usize_as(last_offset)); - } + self.offsets.extend( + std::iter::repeat_n(O::usize_as(last_offset), length) + ); } else { - for start_and_end_values in offsets[start..=start + length].windows(2) { - let length = start_and_end_values[1] - start_and_end_values[0]; - last_offset += length.as_usize(); - self.offsets.push(O::usize_as(last_offset)); + + // If the offsets are contiguous, we can append them directly avoiding the need to align + // for example, when the first appended array is not sliced (starts at offset 0) + if self.next_offset() == self.offsets[0] { + // if B::Offset is the same type as O, we can directly extend the offsets + // transmuting the slice + if size_of::() == size_of::() + && align_of::() == align_of::() + { + let offsets: &[O] = unsafe { + std::mem::transmute::<&[B::Offset], &[O]>(offsets) + }; + self.offsets.extend_from_slice(&offsets[1..]); + } else { + // Otherwise we need to convert the offsets + self.offsets + .extend(offsets[1..].iter().map(|o| O::from_usize(o.as_usize()).unwrap())); + } + } else { + // if B::Offset is the same type as O, we can directly extend the offsets + // transmuting the slice + if size_of::() == size_of::() + && align_of::() == align_of::() + { + let offsets: &[O] = unsafe { + std::mem::transmute::<&[B::Offset], &[O]>(offsets) + }; + + // Shifting all the offsets + let shift: O = self.next_offset() - offsets[0]; + + // Creating intermediate offsets instead of pushing each offset is faster + // (even if we make MutableBuffer to avoid updating length on each push + // and reserve the necessary capacity, it's still slower) + let mut intermediate = Vec::with_capacity(offsets.len() - 1); + + for &offset in &offsets[1..] 
{ + intermediate.push(offset + shift) + } + + self.offsets.extend_from_slice(&intermediate); + } else { + // Otherwise we need to convert the offsets + self.offsets + .extend(offsets[1..].iter().map(|o| O::from_usize(o.as_usize()).unwrap())); + + // Shifting all the offsets + let shift: O = self.next_offset() - O::usize_as(offsets[0].as_usize()); + + // Creating intermediate offsets instead of pushing each offset is faster + // (even if we make MutableBuffer to avoid updating length on each push + // and reserve the necessary capacity, it's still slower) + let mut intermediate = Vec::with_capacity(offsets.len() - 1); + + for &offset in &offsets[1..] { + intermediate.push(O::usize_as(offset.as_usize()) + shift) + } + + self.offsets.extend_from_slice(&intermediate); + } } } From 35c5a02f4c3ff9629ee571bbc710954c7c163914 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 18 Sep 2025 21:31:30 +0300 Subject: [PATCH 10/17] Revert "try optimization" This reverts commit cc2e7258c522711d760695578508b962da2c271d. --- .../group_values/multi_group_by/bytes.rs | 87 +++---------------- 1 file changed, 11 insertions(+), 76 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index a022ff2067c78..1c291e04bc5ae 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -73,12 +73,6 @@ where } } - - #[inline] - fn next_offset(&self) -> O { - O::from_usize(self.buffer.len()).expect("byte array offset overflow") - } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool where B: ByteArrayType, @@ -187,8 +181,8 @@ where B: ByteArrayType, { let array = array.as_bytes::(); - let offsets = &array.value_offsets()[start..=start + length]; + let offsets = array.offsets(); let bytes = array.value_data(); // Is the slice all nulls @@ -204,83 +198,24 @@ where self.nulls.append_n(length, false); } - let values_start = offsets[0].as_usize(); - let values_end = offsets[offsets.len() - 1].as_usize(); + let values_start = offsets[start].as_usize(); + let values_end = offsets[start + length].as_usize(); // 2. 
Append the offsets // Must be before adding the bytes to the buffer - - self.offsets.reserve(length); + let mut last_offset = self.buffer.len(); if all_nulls { - let last_offset = self.buffer.len(); - // If all nulls, we can just repeat the last offset - self.offsets.extend( - std::iter::repeat_n(O::usize_as(last_offset), length) - ); + for _ in 0..length { + self.offsets.push(O::usize_as(last_offset)); + } } else { - - // If the offsets are contiguous, we can append them directly avoiding the need to align - // for example, when the first appended array is not sliced (starts at offset 0) - if self.next_offset() == self.offsets[0] { - // if B::Offset is the same type as O, we can directly extend the offsets - // transmuting the slice - if size_of::() == size_of::() - && align_of::() == align_of::() - { - let offsets: &[O] = unsafe { - std::mem::transmute::<&[B::Offset], &[O]>(offsets) - }; - self.offsets.extend_from_slice(&offsets[1..]); - } else { - // Otherwise we need to convert the offsets - self.offsets - .extend(offsets[1..].iter().map(|o| O::from_usize(o.as_usize()).unwrap())); - } - } else { - // if B::Offset is the same type as O, we can directly extend the offsets - // transmuting the slice - if size_of::() == size_of::() - && align_of::() == align_of::() - { - let offsets: &[O] = unsafe { - std::mem::transmute::<&[B::Offset], &[O]>(offsets) - }; - - // Shifting all the offsets - let shift: O = self.next_offset() - offsets[0]; - - // Creating intermediate offsets instead of pushing each offset is faster - // (even if we make MutableBuffer to avoid updating length on each push - // and reserve the necessary capacity, it's still slower) - let mut intermediate = Vec::with_capacity(offsets.len() - 1); - - for &offset in &offsets[1..] { - intermediate.push(offset + shift) - } - - self.offsets.extend_from_slice(&intermediate); - } else { - // Otherwise we need to convert the offsets - self.offsets - .extend(offsets[1..].iter().map(|o| O::from_usize(o.as_usize()).unwrap())); - - // Shifting all the offsets - let shift: O = self.next_offset() - O::usize_as(offsets[0].as_usize()); - - // Creating intermediate offsets instead of pushing each offset is faster - // (even if we make MutableBuffer to avoid updating length on each push - // and reserve the necessary capacity, it's still slower) - let mut intermediate = Vec::with_capacity(offsets.len() - 1); - - for &offset in &offsets[1..] 
{ - intermediate.push(O::usize_as(offset.as_usize()) + shift) - } - - self.offsets.extend_from_slice(&intermediate); - } + for start_and_end_values in offsets[start..=start + length].windows(2) { + let length = start_and_end_values[1] - start_and_end_values[0]; + last_offset += length.as_usize(); + self.offsets.push(O::usize_as(last_offset)); } } From 093b069947012e298ebab5cb1450547c221b3e17 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 18 Sep 2025 21:33:31 +0300 Subject: [PATCH 11/17] reserve and use extend --- .../src/aggregates/group_values/multi_group_by/bytes.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 1c291e04bc5ae..9edeeb246b991 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -208,10 +208,10 @@ where if all_nulls { // If all nulls, we can just repeat the last offset - for _ in 0..length { - self.offsets.push(O::usize_as(last_offset)); - } + self.offsets.extend(std::iter::repeat_n(O::usize_as(last_offset), length)); } else { + self.offsets.reserve(length); + for start_and_end_values in offsets[start..=start + length].windows(2) { let length = start_and_end_values[1] - start_and_end_values[0]; last_offset += length.as_usize(); From 1ce9617a346d2b9e7be3abcf712ccf9e8aaabe9b Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 8 Oct 2025 14:41:33 +0300 Subject: [PATCH 12/17] support `append_array_slice` for boolean group values --- .../group_values/multi_group_by/boolean.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs index 91a9e21aeb686..576446f73dec8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs @@ -157,6 +157,34 @@ impl GroupColumn for BooleanGroupValueBuilder { Ok(()) } + fn support_append_array_slice(&self) -> bool { + true + } + + fn append_array_slice(&mut self, array: &ArrayRef, start: usize, length: usize) -> Result<()> { + let array = array.as_boolean(); + + if NULLABLE { + if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) { + self.nulls.append_buffer(&nulls.slice(start, length)); + } else { + self.nulls.append_n(length, false); + } + } else { + assert_eq!( + array.null_count(), + 0, + "unexpected nulls in non nullable input" + ); + self.nulls.append_n(length, false); + } + + self.buffer + .append_buffer(&array.values().slice(start, length)); + + Ok(()) + } + fn len(&self) -> usize { self.buffer.len() } From 045020e13dfb3b165558e8a25e2938800cf636b8 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 8 Oct 2025 14:45:05 +0300 Subject: [PATCH 13/17] avoid push and use extend instead to make the compiler vectorize the offset extending --- .../group_values/multi_group_by/bytes.rs | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs 
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 752120c7ac57e..7e0aab0bd1f84 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -24,7 +24,7 @@ use arrow::array::{ use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{ArrowNativeType, ByteArrayType, DataType, GenericBinaryType}; use datafusion_common::utils::proxy::VecAllocExt; -use datafusion_common::{exec_datafusion_err, Result}; +use datafusion_common::{exec_datafusion_err, exec_err, Result}; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; use itertools::izip; use std::mem::size_of; @@ -210,13 +210,14 @@ where // If all nulls, we can just repeat the last offset self.offsets.extend(std::iter::repeat_n(O::usize_as(last_offset), length)); } else { - self.offsets.reserve(length); - - for start_and_end_values in offsets[start..=start + length].windows(2) { - let length = start_and_end_values[1] - start_and_end_values[0]; - last_offset += length.as_usize(); - self.offsets.push(O::usize_as(last_offset)); - } + self.offsets.extend( + offsets[start..=start + length].windows(2) + .map(|start_and_end_values| { + let length = start_and_end_values[1] - start_and_end_values[0]; + last_offset += length.as_usize(); + O::usize_as(last_offset) + }) + ); } // 3. Append the bytes @@ -229,10 +230,7 @@ where self.buffer.append_slice(&bytes[values_start..values_end]); if self.buffer.len() > self.max_buffer_size { - return Err(DataFusionError::Execution(format!( - "offset overflow, buffer size > {}", - self.max_buffer_size - ))); + return exec_err!( "offset overflow, buffer size > {}", self.max_buffer_size); } } From e8a94e18da40a5df127427b78f2dd8d3b0586767 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:05:59 +0300 Subject: [PATCH 14/17] fix: actually generate a lot of unique values in benchmark table also added benchmark for testing pure grouping performance for more than 1 column. 
---- I run this query for the data: ``` SELECT COUNT(*) AS total_count, COUNT(DISTINCT u64_wide) AS unique_count, COUNT(DISTINCT u64_wide) * 1.0 / COUNT(*) AS cardinality FROM t; ``` Before: ``` | total_count | unique_count | cardinality | | ----------- | ------------ | ----------- | | 65536 | 2048 | 0.03125 | ``` After: ``` | total_count | unique_count | cardinality | | ----------- | ------------ | ----------- | | 65536 | 65536 | 1.0 | ``` --- datafusion/core/benches/aggregate_query_sql.rs | 13 +++++++++++++ datafusion/core/benches/data_utils/mod.rs | 7 ++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 057a0e1d1b54c..a090ece4d6c3e 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -153,6 +153,19 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + // Due to the large number of distinct values in u64_wide, + // this query test the actual grouping performance for more than 1 column + "SELECT u64_wide, utf8 \ + FROM t GROUP BY u64_wide, utf8", + ) + }) + }); + c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { b.iter(|| { query( diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index c0477b1306f75..a97a694f4a959 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -81,10 +81,7 @@ fn create_data(size: usize, null_density: f64) -> Vec> { .collect() } -fn create_integer_data(size: usize, value_density: f64) -> Vec> { - // use random numbers to avoid spurious compiler optimizations wrt to branching - let mut rng = StdRng::seed_from_u64(42); - +fn create_integer_data(rng: &mut StdRng, size: usize, value_density: f64) -> Vec> { (0..size) .map(|_| { if rng.random::() > value_density { @@ -116,7 +113,7 @@ fn create_record_batch( let values = create_data(batch_size, 0.5); // Integer values between [0, u64::MAX]. - let integer_values_wide = create_integer_data(batch_size, 9.0); + let integer_values_wide = create_integer_data(rng, batch_size, 9.0); // Integer values between [0, 9]. 
let integer_values_narrow = (0..batch_size) From 155868df9ad86f0c555ea0a812aff1ab76848547 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:10:47 +0300 Subject: [PATCH 15/17] format --- .../core/benches/aggregate_query_sql.rs | 25 +++++++++++-------- datafusion/core/benches/data_utils/mod.rs | 6 ++++- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index a090ece4d6c3e..26355fcadf565 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -153,18 +153,21 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - // Due to the large number of distinct values in u64_wide, - // this query test the actual grouping performance for more than 1 column - "SELECT u64_wide, utf8 \ + c.bench_function( + "aggregate_query_group_by_wide_u64_and_string_without_aggregate_expressions", + |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + // Due to the large number of distinct values in u64_wide, + // this query test the actual grouping performance for more than 1 column + "SELECT u64_wide, utf8 \ FROM t GROUP BY u64_wide, utf8", - ) - }) - }); + ) + }) + }, + ); c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { b.iter(|| { diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index a97a694f4a959..fffe2e2d17522 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -81,7 +81,11 @@ fn create_data(size: usize, null_density: f64) -> Vec> { .collect() } -fn create_integer_data(rng: &mut StdRng, size: usize, value_density: f64) -> Vec> { +fn create_integer_data( + rng: &mut StdRng, + size: usize, + value_density: f64, +) -> Vec> { (0..size) .map(|_| { if rng.random::() > value_density { From f4d0373a9a4559f9eea9f515a5796efea7b4caa1 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:27:29 +0300 Subject: [PATCH 16/17] added multi group by benchmark on primitive only columns --- datafusion/core/benches/aggregate_query_sql.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 26355fcadf565..9da341ce2e926 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -169,6 +169,22 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); + c.bench_function( + "aggregate_query_group_by_wide_u64_and_f32_without_aggregate_expressions", + |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + // Due to the large number of distinct values in u64_wide, + // this query test the actual grouping performance for more than 1 column + "SELECT u64_wide, f32 \ + FROM t GROUP BY u64_wide, f32", + ) + }) + }, + ); + c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { b.iter(|| { query( From 7d8db1a5b523422d6f14b0edb9a6c46efe2c11ce Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 20 Nov 2025 20:41:28 +0200 Subject: [PATCH 17/17] optimize bytes --- .../group_values/multi_group_by/boolean.rs | 9 +++++-- .../group_values/multi_group_by/bytes.rs | 25 
+++++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs index f87ac7f62a789..76b8f193573c7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs @@ -162,7 +162,12 @@ impl GroupColumn for BooleanGroupValueBuilder { true } - fn append_array_slice(&mut self, array: &ArrayRef, start: usize, length: usize) -> Result<()> { + fn append_array_slice( + &mut self, + array: &ArrayRef, + start: usize, + length: usize, + ) -> Result<()> { let array = array.as_boolean(); if NULLABLE { @@ -181,7 +186,7 @@ impl GroupColumn for BooleanGroupValueBuilder { } self.buffer - .append_buffer(&array.values().slice(start, length)); + .append_buffer(&array.values().slice(start, length)); Ok(()) } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 308b6a325f6fd..9a3a6528f2af6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -205,20 +205,20 @@ where // 2. Append the offsets - // Must be before adding the bytes to the buffer - let mut last_offset = self.buffer.len(); - if all_nulls { + let last_offset = self.buffer.len(); + // If all nulls, we can just repeat the last offset - self.offsets.extend(std::iter::repeat_n(O::usize_as(last_offset), length)); + self.offsets + .extend(std::iter::repeat_n(O::usize_as(last_offset), length)); } else { + let new_base = self.buffer.len(); + let old_base = offsets[start].as_usize(); + self.offsets.extend( - offsets[start..=start + length].windows(2) - .map(|start_and_end_values| { - let length = start_and_end_values[1] - start_and_end_values[0]; - last_offset += length.as_usize(); - O::usize_as(last_offset) - }) + offsets[start + 1..start + length + 1] + .iter() + .map(|&offset| O::usize_as(new_base + offset.as_usize() - old_base)), ); } @@ -232,7 +232,10 @@ where self.buffer.append_slice(&bytes[values_start..values_end]); if self.buffer.len() > self.max_buffer_size { - return exec_err!( "offset overflow, buffer size > {}", self.max_buffer_size); + return exec_err!( + "offset overflow, buffer size > {}", + self.max_buffer_size + ); } }
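
The fast path introduced in the first commit boils down to two pieces: track whether the row indices pushed into the append buffer stay consecutive, and if they do, replace the per-row gather with one contiguous slice copy. A minimal standalone sketch of that idea follows, using a plain `Vec<u64>` instead of the real `GroupColumn` builders; `AppendIndices` and `append_rows` are illustrative names, not DataFusion APIs:

```rust
/// Tracks row indices and whether they still form one consecutive run,
/// mirroring the idea behind `are_row_indices_consecutive`.
struct AppendIndices {
    indices: Vec<usize>,
    consecutive: bool,
}

impl AppendIndices {
    fn new() -> Self {
        Self { indices: Vec::new(), consecutive: true }
    }

    /// Push a row index, updating the "still consecutive" flag.
    fn push(&mut self, row: usize) {
        self.consecutive = self.consecutive
            && self.indices.last().map_or(true, |&last| last + 1 == row);
        self.indices.push(row);
    }

    fn clear(&mut self) {
        self.indices.clear();
        self.consecutive = true;
    }
}

/// Append the selected rows of `values` into `dest`: one bulk copy when the
/// indices form a single contiguous run, a per-row gather otherwise.
fn append_rows(dest: &mut Vec<u64>, values: &[u64], rows: &AppendIndices) {
    if rows.consecutive && !rows.indices.is_empty() {
        let start = rows.indices[0];
        dest.extend_from_slice(&values[start..start + rows.indices.len()]);
    } else {
        dest.extend(rows.indices.iter().map(|&r| values[r]));
    }
}

fn main() {
    let values: Vec<u64> = (0..100).collect();

    // Consecutive indices (the "every row is a new group" case): slice copy.
    let mut run = AppendIndices::new();
    (10..15).for_each(|r| run.push(r));
    let mut dest = Vec::new();
    append_rows(&mut dest, &values, &run);
    assert_eq!(dest, vec![10, 11, 12, 13, 14]);

    // Non-consecutive indices: falls back to the per-row path.
    run.clear();
    [1, 4, 7].into_iter().for_each(|r| run.push(r));
    append_rows(&mut dest, &values, &run);
    assert_eq!(dest, vec![10, 11, 12, 13, 14, 1, 4, 7]);
    println!("{dest:?}");
}
```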
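
Commits 06-07 gate the fast path behind a capability probe so a column that has no optimized slice append simply reuses the existing row-index buffer instead of re-allocating a fresh index vector inside a trait default. A sketch of that trait shape, with illustrative names (`ColumnBuilder`, `supports_append_slice`) rather than the real `GroupColumn` trait:

```rust
trait ColumnBuilder {
    /// Fallback path: append the given rows one by one.
    fn append_rows(&mut self, values: &[u64], rows: &[usize]);

    /// Whether `append_slice` has an optimized implementation.
    fn supports_append_slice(&self) -> bool {
        false
    }

    /// Optimized bulk append of `length` rows starting at `start`.
    /// Only reached when `supports_append_slice` returns true.
    fn append_slice(&mut self, _values: &[u64], _start: usize, _length: usize) {
        unimplemented!("append_slice requires supports_append_slice() == true")
    }
}

struct PrimitiveBuilder {
    data: Vec<u64>,
}

impl ColumnBuilder for PrimitiveBuilder {
    fn append_rows(&mut self, values: &[u64], rows: &[usize]) {
        self.data.extend(rows.iter().map(|&r| values[r]));
    }

    fn supports_append_slice(&self) -> bool {
        true
    }

    fn append_slice(&mut self, values: &[u64], start: usize, length: usize) {
        self.data.extend_from_slice(&values[start..start + length]);
    }
}

/// Caller side: take the bulk path only when the rows form one contiguous run
/// and the builder opts in; otherwise stay on the row-index path.
fn append(builder: &mut dyn ColumnBuilder, values: &[u64], rows: &[usize], consecutive: bool) {
    if consecutive && !rows.is_empty() && builder.supports_append_slice() {
        builder.append_slice(values, rows[0], rows.len());
    } else {
        builder.append_rows(values, rows);
    }
}

fn main() {
    let values: Vec<u64> = (10..20).collect();
    let mut b = PrimitiveBuilder { data: Vec::new() };
    append(&mut b, &values, &[2, 3, 4], true);
    assert_eq!(b.data, vec![12, 13, 14]);
    println!("{:?}", b.data);
}
```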
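
The byte-array tuning in the last commit (the `new_base`/`old_base` change) relies on the fact that, for a contiguous slice, the source offsets differ from the destination offsets only by a constant shift, so they can be rebased in one pass instead of re-accumulating per-value lengths. A simplified sketch with plain `usize` offsets rather than the generic `OffsetSizeTrait` code, and with illustrative names:

```rust
/// Toy variable-length byte builder: `offsets[i]..offsets[i + 1]` is the byte
/// range of value `i`, like an Arrow string/binary array layout.
struct ByteBuilder {
    offsets: Vec<usize>,
    buffer: Vec<u8>,
}

impl ByteBuilder {
    fn new() -> Self {
        Self { offsets: vec![0], buffer: Vec::new() }
    }

    /// Append `length` values taken from `src_bytes`/`src_offsets` starting at `start`.
    fn append_slice(
        &mut self,
        src_offsets: &[usize],
        src_bytes: &[u8],
        start: usize,
        length: usize,
    ) {
        let old_base = src_offsets[start];
        let new_base = self.buffer.len();

        // Rebase the offsets in one pass: same relative positions, new origin.
        self.offsets.extend(
            src_offsets[start + 1..=start + length]
                .iter()
                .map(|&o| new_base + o - old_base),
        );

        // Copy all value bytes of the run with a single bulk append.
        let end = src_offsets[start + length];
        self.buffer.extend_from_slice(&src_bytes[old_base..end]);
    }
}

fn main() {
    // Source column: ["aa", "b", "ccc", "dd"]
    let src_bytes = b"aabcccdd".to_vec();
    let src_offsets = vec![0, 2, 3, 6, 8];

    let mut builder = ByteBuilder::new();
    builder.append_slice(&src_offsets, &src_bytes, 1, 2); // take "b", "ccc"
    assert_eq!(builder.offsets, vec![0, 1, 4]);
    assert_eq!(builder.buffer, b"bccc".to_vec());
    println!("{:?} {:?}", builder.offsets, builder.buffer);
}
```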