From 5096883a82ab94d451974d04660e33deb1b560ea Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 2 Oct 2023 19:56:37 -0700 Subject: [PATCH 1/3] Remove input_schema --- .../aggregate_statistics.rs | 14 -------------- .../combine_partial_final_agg.rs | 5 ----- .../enforce_distribution.rs | 5 ----- .../core/src/physical_optimizer/test_utils.rs | 2 -- .../physical_optimizer/topk_aggregation.rs | 1 - datafusion/core/src/physical_planner.rs | 8 -------- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 2 -- .../physical-plan/src/aggregates/mod.rs | 19 +------------------ datafusion/proto/src/physical_plan/mod.rs | 1 - .../tests/cases/roundtrip_physical_plan.rs | 3 --- 10 files changed, 1 insertion(+), 59 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 396e66972f304..a63857d578922 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -404,7 +404,6 @@ mod tests { async fn test_count_partial_direct_child() -> Result<()> { // basic test case with the aggregation applied on a source with exact statistics let source = mock_data()?; - let schema = source.schema(); let agg = TestAggregate::new_count_star(); let partial_agg = AggregateExec::try_new( @@ -414,7 +413,6 @@ mod tests { vec![None], vec![None], source, - Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -424,7 +422,6 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), - Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -446,7 +443,6 @@ mod tests { vec![None], vec![None], source, - Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -456,7 +452,6 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), - Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -467,7 +462,6 @@ mod tests { #[tokio::test] async fn test_count_partial_indirect_child() -> Result<()> { let source = mock_data()?; - let schema = source.schema(); let agg = TestAggregate::new_count_star(); let partial_agg = AggregateExec::try_new( @@ -477,7 +471,6 @@ mod tests { vec![None], vec![None], source, - Arc::clone(&schema), )?; // We introduce an intermediate optimization step between the partial and final aggregtator @@ -490,7 +483,6 @@ mod tests { vec![None], vec![None], Arc::new(coalesce), - Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -511,7 +503,6 @@ mod tests { vec![None], vec![None], source, - Arc::clone(&schema), )?; // We introduce an intermediate optimization step between the partial and final aggregtator @@ -524,7 +515,6 @@ mod tests { vec![None], vec![None], Arc::new(coalesce), - Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -556,7 +546,6 @@ mod tests { vec![None], vec![None], filter, - Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -566,7 +555,6 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), - Arc::clone(&schema), )?; let conf = ConfigOptions::new(); @@ -603,7 +591,6 @@ mod tests { vec![None], vec![None], filter, - Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -613,7 +600,6 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), - Arc::clone(&schema), )?; let conf = ConfigOptions::new(); diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 40b2bcc3e140e..56abc65e2342d 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -91,7 +91,6 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.filter_expr().to_vec(), input_agg_exec.order_by_expr().to_vec(), input_agg_exec.input().clone(), - input_agg_exec.input_schema().clone(), ) .ok() .map(Arc::new) @@ -265,7 +264,6 @@ mod tests { group_by: PhysicalGroupBy, aggr_expr: Vec>, ) -> Arc { - let schema = input.schema(); Arc::new( AggregateExec::try_new( AggregateMode::Partial, @@ -274,7 +272,6 @@ mod tests { vec![], vec![], input, - schema, ) .unwrap(), ) @@ -285,7 +282,6 @@ mod tests { group_by: PhysicalGroupBy, aggr_expr: Vec>, ) -> Arc { - let schema = input.schema(); Arc::new( AggregateExec::try_new( AggregateMode::Final, @@ -294,7 +290,6 @@ mod tests { vec![], vec![], input, - schema, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index b3fb41ea100fe..21ab323f82fcb 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -538,7 +538,6 @@ fn reorder_aggregate_keys( agg_exec.filter_expr().to_vec(), agg_exec.order_by_expr().to_vec(), agg_exec.input().clone(), - agg_exec.input_schema.clone(), )?)) } else { None @@ -570,7 +569,6 @@ fn reorder_aggregate_keys( agg_exec.filter_expr().to_vec(), agg_exec.order_by_expr().to_vec(), partial_agg, - agg_exec.input_schema().clone(), )?); // Need to create a new projection to change the expr ordering back @@ -1933,7 +1931,6 @@ mod tests { input: Arc, alias_pairs: Vec<(String, String)>, ) -> Arc { - let schema = schema(); let mut group_by_expr: Vec<(Arc, String)> = vec![]; for (column, alias) in alias_pairs.iter() { group_by_expr @@ -1969,11 +1966,9 @@ mod tests { vec![], vec![], input, - schema.clone(), ) .unwrap(), ), - schema, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index e021cda2c8682..769eddd8a043b 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -326,7 +326,6 @@ pub fn repartition_exec(input: Arc) -> Arc } pub fn aggregate_exec(input: Arc) -> Arc { - let schema = input.schema(); Arc::new( AggregateExec::try_new( AggregateMode::Final, @@ -335,7 +334,6 @@ pub fn aggregate_exec(input: Arc) -> Arc { vec![], vec![], input, - schema, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 572e796a8ba73..025bbe19f2ae1 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -75,7 +75,6 @@ impl TopKAggregation { aggr.filter_expr().to_vec(), aggr.order_by_expr().to_vec(), aggr.input().clone(), - aggr.input_schema().clone(), ) .expect("Unable to copy Aggregate!") .with_limit(Some(limit)); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2328ffce235d6..ef015d218a0b6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -802,7 +802,6 @@ impl DefaultPhysicalPlanner { filters.clone(), order_bys, input_exec, - physical_input_schema.clone(), )?); // update group column indices based on partial aggregate plan evaluation @@ -847,7 +846,6 @@ impl DefaultPhysicalPlanner { filters, updated_order_bys, initial_aggr, - physical_input_schema.clone(), )?)) } LogicalPlan::Projection(Projection { input, expr, .. }) => { @@ -2413,9 +2411,6 @@ mod tests { "SUM(aggregate_test_100.c2)", final_hash_agg.schema().field(1).name() ); - // we need access to the input to the partial aggregate so that other projects can - // implement serde - assert_eq!("c2", final_hash_agg.input_schema().field(1).name()); Ok(()) } @@ -2441,9 +2436,6 @@ mod tests { "SUM(aggregate_test_100.c3)", final_hash_agg.schema().field(2).name() ); - // we need access to the input to the partial aggregate so that other projects can - // implement serde - assert_eq!("c3", final_hash_agg.input_schema().field(2).name()); Ok(()) } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index a0e9a50a22aee..644829570f7f8 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -116,7 +116,6 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str vec![None], vec![None], running_source, - schema.clone(), ) .unwrap(), ) as Arc; @@ -129,7 +128,6 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str vec![None], vec![None], usual_source, - schema.clone(), ) .unwrap(), ) as Arc; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index b6e4b0a44dec4..0ed6de42b0030 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -283,10 +283,6 @@ pub struct AggregateExec { pub input: Arc, /// Schema after the aggregate is applied schema: SchemaRef, - /// Input schema before any aggregation is applied. For partial aggregate this will be the - /// same as input.schema() but for the final aggregate it will be the same as the input - /// to the partial aggregate - pub input_schema: SchemaRef, /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr /// The key is the column from the input schema and the values are the columns from the output schema columns_map: HashMap>, @@ -612,7 +608,6 @@ impl AggregateExec { // Ordering requirement of each aggregate expression mut order_by_expr: Vec>, input: Arc, - input_schema: SchemaRef, ) -> Result { let schema = create_schema( &input.schema(), @@ -705,7 +700,6 @@ impl AggregateExec { order_by_expr, input, schema, - input_schema, columns_map, metrics: ExecutionPlanMetricsSet::new(), aggregation_ordering, @@ -754,9 +748,8 @@ impl AggregateExec { &self.input } - /// Get the input schema before any aggregates are applied pub fn input_schema(&self) -> SchemaRef { - self.input_schema.clone() + self.input.schema().clone() } fn execute_typed( @@ -982,7 +975,6 @@ impl ExecutionPlan for AggregateExec { self.filter_expr.clone(), self.order_by_expr.clone(), children[0].clone(), - self.input_schema.clone(), )?; me.limit = self.limit; Ok(Arc::new(me)) @@ -1532,7 +1524,6 @@ mod tests { vec![None], vec![None], input, - input_schema.clone(), )?); let result = @@ -1611,7 +1602,6 @@ mod tests { vec![None], vec![None], merge, - input_schema, )?); let result = @@ -1677,7 +1667,6 @@ mod tests { vec![None], vec![None], input, - input_schema.clone(), )?); let result = @@ -1725,7 +1714,6 @@ mod tests { vec![None], vec![None], merge, - input_schema, )?); let result = @@ -1988,7 +1976,6 @@ mod tests { vec![None; 3], vec![None; 3], input.clone(), - input_schema.clone(), )?); let stream = partial_aggregate.execute_typed(0, task_ctx.clone())?; @@ -2044,7 +2031,6 @@ mod tests { vec![None], vec![None], blocking_exec, - schema, )?); let fut = crate::collect(aggregate_exec, task_ctx); @@ -2083,7 +2069,6 @@ mod tests { vec![None], vec![None], blocking_exec, - schema, )?); let fut = crate::collect(aggregate_exec, task_ctx); @@ -2185,7 +2170,6 @@ mod tests { vec![None], vec![Some(ordering_req.clone())], memory_exec, - schema.clone(), )?); let coalesce = if use_coalesce_batches { let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)); @@ -2201,7 +2185,6 @@ mod tests { vec![None], vec![Some(ordering_req)], coalesce, - schema, )?) as Arc; let result = crate::collect(aggregate_final, task_ctx).await?; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 8257f9aa34580..3212b387ba8e6 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -500,7 +500,6 @@ impl AsExecutionPlan for PhysicalPlanNode { physical_filter_expr, physical_order_by_expr, input, - Arc::new((&input_schema).try_into()?), )?)) } PhysicalPlanType::HashJoin(hashjoin) => { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 77e77630bcb2f..562d63193bc1c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -302,7 +302,6 @@ fn rountrip_aggregate() -> Result<()> { vec![None], vec![None], Arc::new(EmptyExec::new(false, schema.clone())), - schema, )?)) } @@ -370,7 +369,6 @@ fn roundtrip_aggregate_udaf() -> Result<()> { vec![None], vec![None], Arc::new(EmptyExec::new(false, schema.clone())), - schema, )?), ctx, ) @@ -584,7 +582,6 @@ fn roundtrip_distinct_count() -> Result<()> { vec![None], vec![None], Arc::new(EmptyExec::new(false, schema.clone())), - schema, )?)) } From c06693b31130428c8e4455dbd9bec6c822fce92c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 3 Oct 2023 23:04:58 -0700 Subject: [PATCH 2/3] Revert "Remove input_schema" This reverts commit 5096883a82ab94d451974d04660e33deb1b560ea. --- .../aggregate_statistics.rs | 14 ++++++++++++++ .../combine_partial_final_agg.rs | 5 +++++ .../enforce_distribution.rs | 5 +++++ .../core/src/physical_optimizer/test_utils.rs | 2 ++ .../physical_optimizer/topk_aggregation.rs | 1 + datafusion/core/src/physical_planner.rs | 8 ++++++++ .../core/tests/fuzz_cases/aggregate_fuzz.rs | 2 ++ .../physical-plan/src/aggregates/mod.rs | 19 ++++++++++++++++++- datafusion/proto/src/physical_plan/mod.rs | 1 + .../tests/cases/roundtrip_physical_plan.rs | 3 +++ 10 files changed, 59 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index a63857d578922..396e66972f304 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -404,6 +404,7 @@ mod tests { async fn test_count_partial_direct_child() -> Result<()> { // basic test case with the aggregation applied on a source with exact statistics let source = mock_data()?; + let schema = source.schema(); let agg = TestAggregate::new_count_star(); let partial_agg = AggregateExec::try_new( @@ -413,6 +414,7 @@ mod tests { vec![None], vec![None], source, + Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -422,6 +424,7 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), + Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -443,6 +446,7 @@ mod tests { vec![None], vec![None], source, + Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -452,6 +456,7 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), + Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -462,6 +467,7 @@ mod tests { #[tokio::test] async fn test_count_partial_indirect_child() -> Result<()> { let source = mock_data()?; + let schema = source.schema(); let agg = TestAggregate::new_count_star(); let partial_agg = AggregateExec::try_new( @@ -471,6 +477,7 @@ mod tests { vec![None], vec![None], source, + Arc::clone(&schema), )?; // We introduce an intermediate optimization step between the partial and final aggregtator @@ -483,6 +490,7 @@ mod tests { vec![None], vec![None], Arc::new(coalesce), + Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -503,6 +511,7 @@ mod tests { vec![None], vec![None], source, + Arc::clone(&schema), )?; // We introduce an intermediate optimization step between the partial and final aggregtator @@ -515,6 +524,7 @@ mod tests { vec![None], vec![None], Arc::new(coalesce), + Arc::clone(&schema), )?; assert_count_optim_success(final_agg, agg).await?; @@ -546,6 +556,7 @@ mod tests { vec![None], vec![None], filter, + Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -555,6 +566,7 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), + Arc::clone(&schema), )?; let conf = ConfigOptions::new(); @@ -591,6 +603,7 @@ mod tests { vec![None], vec![None], filter, + Arc::clone(&schema), )?; let final_agg = AggregateExec::try_new( @@ -600,6 +613,7 @@ mod tests { vec![None], vec![None], Arc::new(partial_agg), + Arc::clone(&schema), )?; let conf = ConfigOptions::new(); diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 56abc65e2342d..40b2bcc3e140e 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -91,6 +91,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.filter_expr().to_vec(), input_agg_exec.order_by_expr().to_vec(), input_agg_exec.input().clone(), + input_agg_exec.input_schema().clone(), ) .ok() .map(Arc::new) @@ -264,6 +265,7 @@ mod tests { group_by: PhysicalGroupBy, aggr_expr: Vec>, ) -> Arc { + let schema = input.schema(); Arc::new( AggregateExec::try_new( AggregateMode::Partial, @@ -272,6 +274,7 @@ mod tests { vec![], vec![], input, + schema, ) .unwrap(), ) @@ -282,6 +285,7 @@ mod tests { group_by: PhysicalGroupBy, aggr_expr: Vec>, ) -> Arc { + let schema = input.schema(); Arc::new( AggregateExec::try_new( AggregateMode::Final, @@ -290,6 +294,7 @@ mod tests { vec![], vec![], input, + schema, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 21ab323f82fcb..b3fb41ea100fe 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -538,6 +538,7 @@ fn reorder_aggregate_keys( agg_exec.filter_expr().to_vec(), agg_exec.order_by_expr().to_vec(), agg_exec.input().clone(), + agg_exec.input_schema.clone(), )?)) } else { None @@ -569,6 +570,7 @@ fn reorder_aggregate_keys( agg_exec.filter_expr().to_vec(), agg_exec.order_by_expr().to_vec(), partial_agg, + agg_exec.input_schema().clone(), )?); // Need to create a new projection to change the expr ordering back @@ -1931,6 +1933,7 @@ mod tests { input: Arc, alias_pairs: Vec<(String, String)>, ) -> Arc { + let schema = schema(); let mut group_by_expr: Vec<(Arc, String)> = vec![]; for (column, alias) in alias_pairs.iter() { group_by_expr @@ -1966,9 +1969,11 @@ mod tests { vec![], vec![], input, + schema.clone(), ) .unwrap(), ), + schema, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 769eddd8a043b..e021cda2c8682 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -326,6 +326,7 @@ pub fn repartition_exec(input: Arc) -> Arc } pub fn aggregate_exec(input: Arc) -> Arc { + let schema = input.schema(); Arc::new( AggregateExec::try_new( AggregateMode::Final, @@ -334,6 +335,7 @@ pub fn aggregate_exec(input: Arc) -> Arc { vec![], vec![], input, + schema, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 025bbe19f2ae1..572e796a8ba73 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -75,6 +75,7 @@ impl TopKAggregation { aggr.filter_expr().to_vec(), aggr.order_by_expr().to_vec(), aggr.input().clone(), + aggr.input_schema().clone(), ) .expect("Unable to copy Aggregate!") .with_limit(Some(limit)); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ef015d218a0b6..2328ffce235d6 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -802,6 +802,7 @@ impl DefaultPhysicalPlanner { filters.clone(), order_bys, input_exec, + physical_input_schema.clone(), )?); // update group column indices based on partial aggregate plan evaluation @@ -846,6 +847,7 @@ impl DefaultPhysicalPlanner { filters, updated_order_bys, initial_aggr, + physical_input_schema.clone(), )?)) } LogicalPlan::Projection(Projection { input, expr, .. }) => { @@ -2411,6 +2413,9 @@ mod tests { "SUM(aggregate_test_100.c2)", final_hash_agg.schema().field(1).name() ); + // we need access to the input to the partial aggregate so that other projects can + // implement serde + assert_eq!("c2", final_hash_agg.input_schema().field(1).name()); Ok(()) } @@ -2436,6 +2441,9 @@ mod tests { "SUM(aggregate_test_100.c3)", final_hash_agg.schema().field(2).name() ); + // we need access to the input to the partial aggregate so that other projects can + // implement serde + assert_eq!("c3", final_hash_agg.input_schema().field(2).name()); Ok(()) } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 644829570f7f8..a0e9a50a22aee 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -116,6 +116,7 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str vec![None], vec![None], running_source, + schema.clone(), ) .unwrap(), ) as Arc; @@ -128,6 +129,7 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str vec![None], vec![None], usual_source, + schema.clone(), ) .unwrap(), ) as Arc; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 0ed6de42b0030..b6e4b0a44dec4 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -283,6 +283,10 @@ pub struct AggregateExec { pub input: Arc, /// Schema after the aggregate is applied schema: SchemaRef, + /// Input schema before any aggregation is applied. For partial aggregate this will be the + /// same as input.schema() but for the final aggregate it will be the same as the input + /// to the partial aggregate + pub input_schema: SchemaRef, /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr /// The key is the column from the input schema and the values are the columns from the output schema columns_map: HashMap>, @@ -608,6 +612,7 @@ impl AggregateExec { // Ordering requirement of each aggregate expression mut order_by_expr: Vec>, input: Arc, + input_schema: SchemaRef, ) -> Result { let schema = create_schema( &input.schema(), @@ -700,6 +705,7 @@ impl AggregateExec { order_by_expr, input, schema, + input_schema, columns_map, metrics: ExecutionPlanMetricsSet::new(), aggregation_ordering, @@ -748,8 +754,9 @@ impl AggregateExec { &self.input } + /// Get the input schema before any aggregates are applied pub fn input_schema(&self) -> SchemaRef { - self.input.schema().clone() + self.input_schema.clone() } fn execute_typed( @@ -975,6 +982,7 @@ impl ExecutionPlan for AggregateExec { self.filter_expr.clone(), self.order_by_expr.clone(), children[0].clone(), + self.input_schema.clone(), )?; me.limit = self.limit; Ok(Arc::new(me)) @@ -1524,6 +1532,7 @@ mod tests { vec![None], vec![None], input, + input_schema.clone(), )?); let result = @@ -1602,6 +1611,7 @@ mod tests { vec![None], vec![None], merge, + input_schema, )?); let result = @@ -1667,6 +1677,7 @@ mod tests { vec![None], vec![None], input, + input_schema.clone(), )?); let result = @@ -1714,6 +1725,7 @@ mod tests { vec![None], vec![None], merge, + input_schema, )?); let result = @@ -1976,6 +1988,7 @@ mod tests { vec![None; 3], vec![None; 3], input.clone(), + input_schema.clone(), )?); let stream = partial_aggregate.execute_typed(0, task_ctx.clone())?; @@ -2031,6 +2044,7 @@ mod tests { vec![None], vec![None], blocking_exec, + schema, )?); let fut = crate::collect(aggregate_exec, task_ctx); @@ -2069,6 +2083,7 @@ mod tests { vec![None], vec![None], blocking_exec, + schema, )?); let fut = crate::collect(aggregate_exec, task_ctx); @@ -2170,6 +2185,7 @@ mod tests { vec![None], vec![Some(ordering_req.clone())], memory_exec, + schema.clone(), )?); let coalesce = if use_coalesce_batches { let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)); @@ -2185,6 +2201,7 @@ mod tests { vec![None], vec![Some(ordering_req)], coalesce, + schema, )?) as Arc; let result = crate::collect(aggregate_final, task_ctx).await?; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 3212b387ba8e6..8257f9aa34580 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -500,6 +500,7 @@ impl AsExecutionPlan for PhysicalPlanNode { physical_filter_expr, physical_order_by_expr, input, + Arc::new((&input_schema).try_into()?), )?)) } PhysicalPlanType::HashJoin(hashjoin) => { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 562d63193bc1c..77e77630bcb2f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -302,6 +302,7 @@ fn rountrip_aggregate() -> Result<()> { vec![None], vec![None], Arc::new(EmptyExec::new(false, schema.clone())), + schema, )?)) } @@ -369,6 +370,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { vec![None], vec![None], Arc::new(EmptyExec::new(false, schema.clone())), + schema, )?), ctx, ) @@ -582,6 +584,7 @@ fn roundtrip_distinct_count() -> Result<()> { vec![None], vec![None], Arc::new(EmptyExec::new(false, schema.clone())), + schema, )?)) } From 070c5a2554c09ff120e01705729632c2d12ce53d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 3 Oct 2023 23:17:50 -0700 Subject: [PATCH 3/3] Add comment --- datafusion/physical-plan/src/aggregates/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index b6e4b0a44dec4..1571198ed8353 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -285,7 +285,9 @@ pub struct AggregateExec { schema: SchemaRef, /// Input schema before any aggregation is applied. For partial aggregate this will be the /// same as input.schema() but for the final aggregate it will be the same as the input - /// to the partial aggregate + /// to the partial aggregate, i.e., partial and final aggregates have same `input_schema`. + /// We need the input schema of partial aggregate to be able to deserialize aggregate + /// expressions from protobuf for final aggregate. pub input_schema: SchemaRef, /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr /// The key is the column from the input schema and the values are the columns from the output schema