From e823b168afbdad88d2651c8ed2c575ce1d5736d9 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 10 Jun 2021 22:52:16 +0800 Subject: [PATCH 1/2] refactor hash aggregates --- datafusion/src/physical_plan/mod.rs | 3 +- datafusion/src/physical_plan/planner.rs | 54 +++++++++++-------------- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index af6969c43cbd6..710a4ebb25d7a 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -343,7 +343,8 @@ pub enum Partitioning { RoundRobinBatch(usize), /// Allocate rows based on a hash of one of more expressions and the specified /// number of partitions - /// This partitioning scheme is not yet fully supported. See [ARROW-11011](https://issues.apache.org/jira/browse/ARROW-11011) + /// FIXME: This partitioning scheme is not yet fully supported. + /// See https://github.com/apache/arrow-datafusion/issues/131 Hash(Vec>, usize), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize), diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index d7451c7870961..37ca10aaa0d72 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -222,11 +222,15 @@ impl DefaultPhysicalPlanner { .flat_map(|x| x.0.data_type(physical_input_schema.as_ref())) .any(|x| matches!(x, DataType::Dictionary(_, _))); - if !groups.is_empty() + let can_repartition = !groups.is_empty() && ctx_state.config.concurrency > 1 && ctx_state.config.repartition_aggregations - && !contains_dict - { + && !contains_dict; + + let (initial_aggr, next_partition_mode): ( + Arc, + AggregateMode, + ) = if can_repartition { // Divide partial hash aggregates into multiple partitions by hash key let hash_repartition = Arc::new(RepartitionExec::try_new( initial_aggr, @@ -235,35 +239,25 @@ impl DefaultPhysicalPlanner { ctx_state.config.concurrency, ), )?); - - // Combine hashaggregates within the partition - Ok(Arc::new(HashAggregateExec::try_new( - AggregateMode::FinalPartitioned, - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) - .collect(), - aggregates, - hash_repartition, - input_schema, - )?)) + // Combine hash aggregates within the partition + (hash_repartition, AggregateMode::FinalPartitioned) } else { - // construct a second aggregation, keeping the final column name equal to the first aggregation - // and the expressions corresponding to the respective aggregate + // construct a second aggregation, keeping the final column name equal to the + // first aggregation and the expressions corresponding to the respective aggregate + (initial_aggr, AggregateMode::Final) + }; - Ok(Arc::new(HashAggregateExec::try_new( - AggregateMode::Final, - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) - .collect(), - aggregates, - initial_aggr, - input_schema, - )?)) - } + Ok(Arc::new(HashAggregateExec::try_new( + next_partition_mode, + final_group + .iter() + .enumerate() + .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) + .collect(), + aggregates, + initial_aggr, + input_schema, + )?)) } LogicalPlan::Projection { input, expr, .. } => { let input_exec = self.create_initial_plan(input, ctx_state)?; From a9a2ca76fd5b8a436f0d58690a3b7ce25d45cf08 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 11 Jun 2021 16:28:03 +0800 Subject: [PATCH 2/2] remove stale comments --- datafusion/src/physical_plan/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 710a4ebb25d7a..ebc6fd6ce94a2 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -341,10 +341,8 @@ pub async fn collect_partitioned( pub enum Partitioning { /// Allocate batches using a round-robin algorithm and the specified number of partitions RoundRobinBatch(usize), - /// Allocate rows based on a hash of one of more expressions and the specified - /// number of partitions - /// FIXME: This partitioning scheme is not yet fully supported. - /// See https://github.com/apache/arrow-datafusion/issues/131 + /// Allocate rows based on a hash of one of more expressions and the specified number of + /// partitions Hash(Vec>, usize), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize),