From 41b5a00dfb252a734f352bd00b40d38e35da4507 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 23 Oct 2023 11:22:09 +0300 Subject: [PATCH] Simplifications --- .../physical-plan/src/aggregates/mod.rs | 81 +++++++++---------- datafusion/physical-plan/src/projection.rs | 20 ++--- 2 files changed, 45 insertions(+), 56 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 7191d51fb7f0a..1fa129680ceae 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -218,6 +218,23 @@ impl PhysicalGroupBy { pub fn is_single(&self) -> bool { self.null_expr.is_empty() } + + /// Calculate GROUP BY expressions according to input schema. + pub fn input_exprs(&self) -> Vec> { + self.expr + .iter() + .map(|(expr, _alias)| expr.clone()) + .collect() + } + + /// Return grouping expressions as they occur in the output schema. + fn output_exprs(&self) -> Vec> { + self.expr + .iter() + .enumerate() + .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _) + .collect() + } } impl PartialEq for PhysicalGroupBy { @@ -319,11 +336,7 @@ fn get_working_mode( // Since direction of the ordering is not important for GROUP BY columns, // we convert PhysicalSortExpr to PhysicalExpr in the existing ordering. let ordering_exprs = convert_to_expr(output_ordering); - let groupby_exprs = group_by - .expr - .iter() - .map(|(item, _)| item.clone()) - .collect::>(); + let groupby_exprs = group_by.input_exprs(); // Find where each expression of the GROUP BY clause occurs in the existing // ordering (if it occurs): let mut ordered_indices = @@ -363,7 +376,7 @@ fn calc_aggregation_ordering( ) -> Option { get_working_mode(input, group_by).map(|(mode, order_indices)| { let existing_ordering = input.output_ordering().unwrap_or(&[]); - let out_group_expr = output_group_expr_helper(group_by); + let out_group_expr = group_by.output_exprs(); // Calculate output ordering information for the operator: let out_ordering = order_indices .iter() @@ -381,18 +394,6 @@ fn calc_aggregation_ordering( }) } -/// This function returns grouping expressions as they occur in the output schema. -fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec> { - // Update column indices. Since the group by columns come first in the output schema, their - // indices are simply 0..self.group_expr(len). - group_by - .expr() - .iter() - .enumerate() - .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _) - .collect() -} - /// This function returns the ordering requirement of the first non-reversible /// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves /// as the initial requirement while calculating the finest requirement among all @@ -591,11 +592,7 @@ fn group_by_contains_all_requirements( group_by: &PhysicalGroupBy, requirement: &LexOrdering, ) -> bool { - let physical_exprs = group_by - .expr() - .iter() - .map(|(expr, _alias)| expr.clone()) - .collect::>(); + let physical_exprs = group_by.input_exprs(); // When we have multiple groups (grouping set) // since group by may be calculated on the subset of the group_by.expr() // it is not guaranteed to have all of the requirements among group by expressions. @@ -735,7 +732,7 @@ impl AggregateExec { /// Grouping expressions as they occur in the output schema pub fn output_group_expr(&self) -> Vec> { - output_group_expr_helper(&self.group_by) + self.group_by.output_exprs() } /// Aggregate expressions @@ -894,28 +891,24 @@ impl ExecutionPlan for AggregateExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - match &self.mode { - AggregateMode::Partial | AggregateMode::Single => { - // Partial and Single Aggregation will not change the output partitioning but need to respect the Alias - let input_partition = self.input.output_partitioning(); - match input_partition { - Partitioning::Hash(exprs, part) => { - let normalized_exprs = exprs - .into_iter() - .map(|expr| { - normalize_out_expr_with_columns_map( - expr, - &self.columns_map, - ) - }) - .collect::>(); - Partitioning::Hash(normalized_exprs, part) - } - _ => input_partition, - } + let input_partition = self.input.output_partitioning(); + if self.mode.is_first_stage() { + // First stage Aggregation will not change the output partitioning but need to respect the Alias + let input_partition = self.input.output_partitioning(); + if let Partitioning::Hash(exprs, part) = input_partition { + let normalized_exprs = exprs + .into_iter() + .map(|expr| { + normalize_out_expr_with_columns_map(expr, &self.columns_map) + }) + .collect::>(); + Partitioning::Hash(normalized_exprs, part) + } else { + input_partition } + } else { // Final Aggregation's output partitioning is the same as its real input - _ => self.input.output_partitioning(), + input_partition } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 029dd24d7d115..a374154c995cf 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -224,18 +224,14 @@ impl ExecutionPlan for ProjectionExec { fn output_partitioning(&self) -> Partitioning { // Output partition need to respect the alias let input_partition = self.input.output_partitioning(); - match input_partition { - Partitioning::Hash(exprs, part) => { - let normalized_exprs = exprs - .into_iter() - .map(|expr| { - normalize_out_expr_with_columns_map(expr, &self.columns_map) - }) - .collect::>(); - - Partitioning::Hash(normalized_exprs, part) - } - _ => input_partition, + if let Partitioning::Hash(exprs, part) = input_partition { + let normalized_exprs = exprs + .into_iter() + .map(|expr| normalize_out_expr_with_columns_map(expr, &self.columns_map)) + .collect::>(); + Partitioning::Hash(normalized_exprs, part) + } else { + input_partition } }