Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ config_namespace! {

/// Parquet options
pub parquet: ParquetOptions, default = Default::default()

/// Aggregate options
pub aggregate: AggregateOptions, default = Default::default()
}
}

Expand Down Expand Up @@ -260,6 +263,23 @@ config_namespace! {
}
}

config_namespace! {
/// Options related to aggregate execution
pub struct AggregateOptions {
/// Specifies the threshold for using `ScalarValue`s to update
/// accumulators during high-cardinality aggregations for each input batch.
///
/// The aggregation is considered high-cardinality if the number of affected groups
/// is greater than or equal to `batch_size / scalar_update_factor`. In such cases,
/// `ScalarValue`s are utilized for updating accumulators, rather than the default
/// batch-slice approach. This can lead to performance improvements.
///
/// By adjusting the `scalar_update_factor`, you can balance the trade-off between
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for the text helping users understand the tradeoff

/// more efficient accumulator updates and the number of groups affected.
pub scalar_update_factor: usize, default = 10
}
}

config_namespace! {
/// Options related to query optimization
pub struct OptimizerOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub(crate) struct BoundedAggregateStream {
random_state: RandomState,
/// size to be used for resulting RecordBatches
batch_size: usize,
/// threshold for using `ScalarValue`s to update
/// accumulators during high-cardinality aggregations for each input batch.
scalar_update_factor: usize,
/// if the result is chunked into batches,
/// last offset is preserved for continuation.
row_group_skip_position: usize,
Expand All @@ -126,6 +129,7 @@ impl BoundedAggregateStream {
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
batch_size: usize,
scalar_update_factor: usize,
context: Arc<TaskContext>,
partition: usize,
// Stores algorithm mode and output ordering
Expand Down Expand Up @@ -228,6 +232,7 @@ impl BoundedAggregateStream {
baseline_metrics,
random_state: Default::default(),
batch_size,
scalar_update_factor,
row_group_skip_position: 0,
indices: [normal_agg_indices, row_agg_indices],
is_end: false,
Expand Down Expand Up @@ -747,7 +752,7 @@ impl BoundedAggregateStream {
if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 10
&& groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor
{
self.update_accumulators_using_scalar(
&groups_with_rows,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ impl AggregateExec {
context: Arc<TaskContext>,
) -> Result<StreamType> {
let batch_size = context.session_config().batch_size();
let scalar_update_factor = context.session_config().agg_scalar_update_factor();
let input = self.input.execute(partition, Arc::clone(&context))?;
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

Expand All @@ -448,6 +449,7 @@ impl AggregateExec {
input,
baseline_metrics,
batch_size,
scalar_update_factor,
context,
partition,
aggregation_ordering.clone(),
Expand All @@ -463,6 +465,7 @@ impl AggregateExec {
input,
baseline_metrics,
batch_size,
scalar_update_factor,
context,
partition,
)?,
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ pub(crate) struct GroupedHashAggregateStream {
random_state: RandomState,
/// size to be used for resulting RecordBatches
batch_size: usize,
/// threshold for using `ScalarValue`s to update
/// accumulators during high-cardinality aggregations for each input batch.
scalar_update_factor: usize,
/// if the result is chunked into batches,
/// last offset is preserved for continuation.
row_group_skip_position: usize,
Expand All @@ -119,6 +122,7 @@ impl GroupedHashAggregateStream {
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
batch_size: usize,
scalar_update_factor: usize,
context: Arc<TaskContext>,
partition: usize,
) -> Result<Self> {
Expand Down Expand Up @@ -219,6 +223,7 @@ impl GroupedHashAggregateStream {
baseline_metrics,
random_state: Default::default(),
batch_size,
scalar_update_factor,
row_group_skip_position: 0,
indices: [normal_agg_indices, row_agg_indices],
})
Expand Down Expand Up @@ -555,7 +560,7 @@ impl GroupedHashAggregateStream {
if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 10
&& groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor
{
self.update_accumulators_using_scalar(
&groups_with_rows,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ datafusion.catalog.format NULL
datafusion.catalog.has_header false
datafusion.catalog.information_schema true
datafusion.catalog.location NULL
datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
Expand Down
13 changes: 13 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ impl SessionConfig {
self.options.execution.batch_size
}

/// Returns the currently configured `scalar_update_factor` for aggregates.
///
/// Aggregate executors use this value as the divisor in the
/// high-cardinality heuristic (`batch_size / scalar_update_factor`) that
/// decides when to switch to `ScalarValue`-based accumulator updates.
pub fn agg_scalar_update_factor(&self) -> usize {
self.options.execution.aggregate.scalar_update_factor
}

/// Customize the `scalar_update_factor` used by aggregate executors.
///
/// The factor is the divisor in the high-cardinality heuristic
/// (`batch_size / scalar_update_factor`) that decides when accumulators
/// are updated with `ScalarValue`s instead of batch slices.
///
/// # Panics
///
/// Panics if `n` is zero, since the factor is used as a divisor.
pub fn with_agg_scalar_update_factor(mut self, n: usize) -> Self {
    assert!(n > 0, "scalar_update_factor must be greater than zero");
    self.options.execution.aggregate.scalar_update_factor = n;
    self
}

/// Convert configuration options to name-value pairs with values
/// converted to strings.
///
Expand Down
Loading