From 80c7921e5037575cb16f0c7f6354345fa073768b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 6 May 2022 09:14:54 -0600 Subject: [PATCH 1/5] Fix issue 2430 --- datafusion/core/src/logical_plan/builder.rs | 11 +++-- datafusion/core/src/logical_plan/expr.rs | 26 ++++++++++-- .../src/optimizer/projection_push_down.rs | 2 +- datafusion/core/src/sql/planner.rs | 31 ++++++++++---- datafusion/core/src/sql/utils.rs | 42 +++++++++++++++++-- datafusion/core/tests/sql/group_by.rs | 26 ++++++++++++ datafusion/expr/src/logical_plan/plan.rs | 23 ++++++++-- 7 files changed, 136 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 1fbb1f5f9dfea..2a6f679dbb11c 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -557,7 +557,7 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&expr, input_schema)?, + exprlist_to_fields(&expr, &input)?, input_schema.metadata().clone(), )?; @@ -629,7 +629,7 @@ impl LogicalPlanBuilder { .map(|f| Expr::Column(f.qualified_column())) .collect(); let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&new_expr, schema)?, + exprlist_to_fields(&new_expr, &self.plan)?, schema.metadata().clone(), )?; @@ -843,8 +843,7 @@ impl LogicalPlanBuilder { let window_expr = normalize_cols(window_expr, &self.plan)?; let all_expr = window_expr.iter(); validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?; - let mut window_fields: Vec = - exprlist_to_fields(all_expr, self.plan.schema())?; + let mut window_fields: Vec = exprlist_to_fields(all_expr, &self.plan)?; window_fields.extend_from_slice(self.plan.schema().fields()); Ok(Self::from(LogicalPlan::Window(Window { input: Arc::new(self.plan.clone()), @@ -869,7 +868,7 @@ impl LogicalPlanBuilder { let all_expr = group_expr.iter().chain(aggr_expr.iter()); validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?; let aggr_schema = DFSchema::new_with_metadata( - exprlist_to_fields(all_expr, self.plan.schema())?, + exprlist_to_fields(all_expr, &self.plan)?, self.plan.schema().metadata().clone(), )?; Ok(Self::from(LogicalPlan::Aggregate(Aggregate { @@ -1126,7 +1125,7 @@ pub fn project_with_alias( } validate_unique_names("Projections", projected_expr.iter(), input_schema)?; let input_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&projected_expr, input_schema)?, + exprlist_to_fields(&projected_expr, &plan)?, plan.schema().metadata().clone(), )?; let schema = match alias { diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs index 673345c69b61c..02a426e8cfc98 100644 --- a/datafusion/core/src/logical_plan/expr.rs +++ b/datafusion/core/src/logical_plan/expr.rs @@ -25,11 +25,11 @@ use crate::logical_plan::{DFField, DFSchema}; use arrow::datatypes::DataType; pub use datafusion_common::{Column, ExprSchema}; pub use datafusion_expr::expr_fn::*; -use datafusion_expr::AccumulatorFunctionImplementation; use datafusion_expr::BuiltinScalarFunction; pub use datafusion_expr::Expr; use datafusion_expr::StateTypeFunction; pub use datafusion_expr::{lit, lit_timestamp_nano, Literal}; +use datafusion_expr::{AccumulatorFunctionImplementation, LogicalPlan}; use datafusion_expr::{AggregateUDF, ScalarUDF}; use datafusion_expr::{ ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility, @@ -138,9 +138,29 @@ pub fn create_udaf( /// Create field meta-data from an expression, for use in a result set schema pub fn exprlist_to_fields<'a>( expr: impl IntoIterator, - input_schema: &DFSchema, + plan: &LogicalPlan, ) -> Result> { - expr.into_iter().map(|e| e.to_field(input_schema)).collect() + let input_schema = &plan.schema(); + let exprs: Vec = expr.into_iter().cloned().collect(); + match plan { + LogicalPlan::Aggregate(agg) => { + let mut fields = vec![]; + let group_columns: Vec = agg + .columns_in_group_expr()? + .iter() + .map(|col| Expr::Column(col.clone())) + .collect(); + for e in &exprs { + if group_columns.iter().any(|col| col == e) { + fields.push(e.to_field(agg.input.schema())?); + } else { + fields.push(e.to_field(input_schema)?); + } + } + Ok(fields) + } + _ => exprs.iter().map(|e| e.to_field(input_schema)).collect(), + } } /// Calls a named built in function diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 5062082e86433..0979d8f5b2188 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -810,7 +810,7 @@ mod tests { // that the Column references are unqualified (e.g. their // relation is `None`). PlanBuilder resolves the expressions let expr = vec![col("a"), col("b")]; - let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap(); + let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap(); let projected_schema = DFSchema::new_with_metadata( projected_fields, input_schema.metadata().clone(), diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index d4002997b79b4..fe737d6e81f77 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -37,7 +37,7 @@ use crate::logical_plan::{ use crate::optimizer::utils::exprlist_to_columns; use crate::prelude::JoinType; use crate::scalar::ScalarValue; -use crate::sql::utils::{make_decimal_type, normalize_ident}; +use crate::sql::utils::{make_decimal_type, normalize_ident, resolve_columns}; use crate::{ error::{DataFusionError, Result}, physical_plan::aggregates, @@ -1144,30 +1144,45 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { group_by_exprs: Vec, aggr_exprs: Vec, ) -> Result<(LogicalPlan, Vec, Option)> { + // create the aggregate plan + let plan = LogicalPlanBuilder::from(input.clone()) + .aggregate(group_by_exprs.clone(), aggr_exprs.clone())? + .build()?; + + // in this next section of code we are re-writing the projection to refer to columns + // output by the aggregate plan. For example, if the projection contains the expression + // `SUM(a)` then we replace that with a reference to a column `#SUM(a)` produced by + // the aggregate plan. + + // combine the original grouping and aggregate expressions into one list (note that + // we do not add the "having" expression since that is not part of the projection) let aggr_projection_exprs = group_by_exprs .iter() .chain(aggr_exprs.iter()) .cloned() .collect::>(); - let plan = LogicalPlanBuilder::from(input.clone()) - .aggregate(group_by_exprs, aggr_exprs)? - .build()?; + // now attempt to resolve columns and replace with fully-qualified columns + let aggr_projection_exprs = aggr_projection_exprs + .iter() + .map(|expr| resolve_columns(expr, &input)) + .collect::>>()?; - // After aggregation, these are all of the columns that will be - // available to next phases of planning. + // next we replace any expressions that are not a column with a column referencing + // an output column from the aggregate schema let column_exprs_post_aggr = aggr_projection_exprs .iter() .map(|expr| expr_as_column_expr(expr, &input)) .collect::>>()?; - // Rewrite the SELECT expression to use the columns produced by the - // aggregation. + // next we re-write the projection let select_exprs_post_aggr = select_exprs .iter() .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input)) .collect::>>()?; + // finally, we have some validation that the re-written projection can be resolved + // from the aggregate output columns check_columns_satisfy_exprs( &column_exprs_post_aggr, &select_exprs_post_aggr, diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index cd1fb316b76d5..ea68cee0cf07a 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -139,6 +139,10 @@ where exprs } +// pub(crate) fn resolve_columns(plan: &LogicalPlan) -> Result { +// +// } + /// Convert any `Expr` to an `Expr::Column`. pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { match expr { @@ -155,6 +159,22 @@ pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result Result { + clone_with_replacement(expr, &|nested_expr| { + match nested_expr { + Expr::Column(col) => { + let field = plan.schema().field_from_column(col)?; + Ok(Some(Expr::Column(field.qualified_column()))) + } + _ => { + // keep recursing + Ok(None) + } + } + }) +} + /// Rebuilds an `Expr` as a projection on top of a collection of `Expr`'s. /// /// For example, the expression `a + b < 1` would require, as input, the 2 @@ -174,6 +194,12 @@ pub(crate) fn rebase_expr( base_exprs: &[Expr], plan: &LogicalPlan, ) -> Result { + // make a best effort attempt to replace columns with fully qualified columns + let base_exprs = base_exprs + .iter() + .map(|expr| resolve_columns(expr, plan)) + .collect::>>()?; + clone_with_replacement(expr, &|nested_expr| { if base_exprs.contains(nested_expr) { Ok(Some(expr_as_column_expr(nested_expr, plan)?)) @@ -238,6 +264,11 @@ where { let replacement_opt = replacement_fn(expr)?; + println!( + "clone_with_replacement: {:?} replacement = {:?}", + expr, replacement_opt + ); + match replacement_opt { // If we were provided a replacement, use the replacement. Do not // descend further. @@ -286,10 +317,13 @@ where .map(|e| clone_with_replacement(e, replacement_fn)) .collect::>>()?, }), - Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias( - Box::new(clone_with_replacement(&**nested_expr, replacement_fn)?), - alias_name.clone(), - )), + Expr::Alias(nested_expr, alias_name) => { + println!("alias case: alias={}, expr={:?}", alias_name, nested_expr); + Ok(Expr::Alias( + Box::new(clone_with_replacement(&**nested_expr, replacement_fn)?), + alias_name.clone(), + )) + } Expr::Between { expr: nested_expr, negated, diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs index 41f2471f6c9e7..e3da1b02195a5 100644 --- a/datafusion/core/tests/sql/group_by.rs +++ b/datafusion/core/tests/sql/group_by.rs @@ -211,6 +211,32 @@ async fn csv_query_having_without_group_by() -> Result<()> { Ok(()) } +#[tokio::test] +async fn csv_query_group_by_substr() -> Result<()> { + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx).await?; + // there is an input column "c1" as well a projection expression aliased as "c1" + let sql = "SELECT substr(c1, 1, 1) c1 \ + FROM aggregate_test_100 \ + GROUP BY substr(c1, 1, 1) \ + "; + let actual = execute_to_batches(&ctx, sql).await; + #[rustfmt::skip] + let expected = vec![ + "+----+", + "| c1 |", + "+----+", + "| a |", + "| b |", + "| c |", + "| d |", + "| e |", + "+----+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn csv_query_group_by_avg() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 579898dbe207a..3201e5f0437c7 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::expr::find_columns_referenced_by_expr; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{Column, DFSchemaRef, DataFusionError}; +use datafusion_common::{Column, DFSchemaRef, DataFusionError, Result}; use std::collections::HashSet; ///! Logical plan types use std::fmt::{self, Debug, Display, Formatter}; @@ -281,7 +282,7 @@ impl LogicalPlan { } /// returns all `Using` join columns in a logical plan - pub fn using_columns(&self) -> Result>, DataFusionError> { + pub fn using_columns(&self) -> Result>> { struct UsingJoinColumnVisitor { using_columns: Vec>, } @@ -289,7 +290,10 @@ impl LogicalPlan { impl PlanVisitor for UsingJoinColumnVisitor { type Error = DataFusionError; - fn pre_visit(&mut self, plan: &LogicalPlan) -> Result { + fn pre_visit( + &mut self, + plan: &LogicalPlan, + ) -> std::result::Result { if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, on, @@ -1126,6 +1130,19 @@ pub struct Aggregate { pub schema: DFSchemaRef, } +impl Aggregate { + /// Return all columns referenced in the grouping expressions + pub fn columns_in_group_expr(&self) -> Result> { + let mut cols = vec![]; + for e in &self.group_expr { + for col in find_columns_referenced_by_expr(e) { + cols.push(col) + } + } + Ok(cols) + } +} + /// Sorts its input according to a list of sort expressions. #[derive(Clone)] pub struct Sort { From 5f9734f1359fbf029377cb67f4f763ca376c4522 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 6 May 2022 09:16:33 -0600 Subject: [PATCH 2/5] remove debug println --- datafusion/core/src/sql/utils.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index ea68cee0cf07a..79e3d8f8e2b7e 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -139,10 +139,6 @@ where exprs } -// pub(crate) fn resolve_columns(plan: &LogicalPlan) -> Result { -// -// } - /// Convert any `Expr` to an `Expr::Column`. pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { match expr { @@ -264,11 +260,6 @@ where { let replacement_opt = replacement_fn(expr)?; - println!( - "clone_with_replacement: {:?} replacement = {:?}", - expr, replacement_opt - ); - match replacement_opt { // If we were provided a replacement, use the replacement. Do not // descend further. @@ -317,13 +308,10 @@ where .map(|e| clone_with_replacement(e, replacement_fn)) .collect::>>()?, }), - Expr::Alias(nested_expr, alias_name) => { - println!("alias case: alias={}, expr={:?}", alias_name, nested_expr); - Ok(Expr::Alias( - Box::new(clone_with_replacement(&**nested_expr, replacement_fn)?), - alias_name.clone(), - )) - } + Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias( + Box::new(clone_with_replacement(&**nested_expr, replacement_fn)?), + alias_name.clone(), + )), Expr::Between { expr: nested_expr, negated, From a28f1f3a140d977cbc69c38f47372c2251dff1a4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 6 May 2022 09:25:42 -0600 Subject: [PATCH 3/5] revert some changes that were not needed --- datafusion/core/src/logical_plan/expr.rs | 20 +------------------- datafusion/expr/src/logical_plan/plan.rs | 23 +++-------------------- 2 files changed, 4 insertions(+), 39 deletions(-) diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs index 02a426e8cfc98..9b4f14e9bad79 100644 --- a/datafusion/core/src/logical_plan/expr.rs +++ b/datafusion/core/src/logical_plan/expr.rs @@ -142,25 +142,7 @@ pub fn exprlist_to_fields<'a>( ) -> Result> { let input_schema = &plan.schema(); let exprs: Vec = expr.into_iter().cloned().collect(); - match plan { - LogicalPlan::Aggregate(agg) => { - let mut fields = vec![]; - let group_columns: Vec = agg - .columns_in_group_expr()? - .iter() - .map(|col| Expr::Column(col.clone())) - .collect(); - for e in &exprs { - if group_columns.iter().any(|col| col == e) { - fields.push(e.to_field(agg.input.schema())?); - } else { - fields.push(e.to_field(input_schema)?); - } - } - Ok(fields) - } - _ => exprs.iter().map(|e| e.to_field(input_schema)).collect(), - } + exprs.iter().map(|e| e.to_field(input_schema)).collect() } /// Calls a named built in function diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3201e5f0437c7..579898dbe207a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::expr::find_columns_referenced_by_expr; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{Column, DFSchemaRef, DataFusionError, Result}; +use datafusion_common::{Column, DFSchemaRef, DataFusionError}; use std::collections::HashSet; ///! Logical plan types use std::fmt::{self, Debug, Display, Formatter}; @@ -282,7 +281,7 @@ impl LogicalPlan { } /// returns all `Using` join columns in a logical plan - pub fn using_columns(&self) -> Result>> { + pub fn using_columns(&self) -> Result>, DataFusionError> { struct UsingJoinColumnVisitor { using_columns: Vec>, } @@ -290,10 +289,7 @@ impl LogicalPlan { impl PlanVisitor for UsingJoinColumnVisitor { type Error = DataFusionError; - fn pre_visit( - &mut self, - plan: &LogicalPlan, - ) -> std::result::Result { + fn pre_visit(&mut self, plan: &LogicalPlan) -> Result { if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, on, @@ -1130,19 +1126,6 @@ pub struct Aggregate { pub schema: DFSchemaRef, } -impl Aggregate { - /// Return all columns referenced in the grouping expressions - pub fn columns_in_group_expr(&self) -> Result> { - let mut cols = vec![]; - for e in &self.group_expr { - for col in find_columns_referenced_by_expr(e) { - cols.push(col) - } - } - Ok(cols) - } -} - /// Sorts its input according to a list of sort expressions. #[derive(Clone)] pub struct Sort { From 64200ef8ffc70291515f91657e24e7fda830958c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 6 May 2022 09:30:01 -0600 Subject: [PATCH 4/5] revert some changes that were not needed --- datafusion/core/src/sql/utils.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index 79e3d8f8e2b7e..4acaa21ef73b4 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -190,12 +190,6 @@ pub(crate) fn rebase_expr( base_exprs: &[Expr], plan: &LogicalPlan, ) -> Result { - // make a best effort attempt to replace columns with fully qualified columns - let base_exprs = base_exprs - .iter() - .map(|expr| resolve_columns(expr, plan)) - .collect::>>()?; - clone_with_replacement(expr, &|nested_expr| { if base_exprs.contains(nested_expr) { Ok(Some(expr_as_column_expr(nested_expr, plan)?)) From 3ea0f830d1903b558dfeb7b3029b1eaf6f98e7e9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 6 May 2022 09:38:07 -0600 Subject: [PATCH 5/5] revert some changes that were not needed --- datafusion/core/src/logical_plan/builder.rs | 11 ++++++----- datafusion/core/src/logical_plan/expr.rs | 8 +++----- datafusion/core/src/optimizer/projection_push_down.rs | 2 +- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 2a6f679dbb11c..1fbb1f5f9dfea 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -557,7 +557,7 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&expr, &input)?, + exprlist_to_fields(&expr, input_schema)?, input_schema.metadata().clone(), )?; @@ -629,7 +629,7 @@ impl LogicalPlanBuilder { .map(|f| Expr::Column(f.qualified_column())) .collect(); let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&new_expr, &self.plan)?, + exprlist_to_fields(&new_expr, schema)?, schema.metadata().clone(), )?; @@ -843,7 +843,8 @@ impl LogicalPlanBuilder { let window_expr = normalize_cols(window_expr, &self.plan)?; let all_expr = window_expr.iter(); validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?; - let mut window_fields: Vec = exprlist_to_fields(all_expr, &self.plan)?; + let mut window_fields: Vec = + exprlist_to_fields(all_expr, self.plan.schema())?; window_fields.extend_from_slice(self.plan.schema().fields()); Ok(Self::from(LogicalPlan::Window(Window { input: Arc::new(self.plan.clone()), @@ -868,7 +869,7 @@ impl LogicalPlanBuilder { let all_expr = group_expr.iter().chain(aggr_expr.iter()); validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?; let aggr_schema = DFSchema::new_with_metadata( - exprlist_to_fields(all_expr, &self.plan)?, + exprlist_to_fields(all_expr, self.plan.schema())?, self.plan.schema().metadata().clone(), )?; Ok(Self::from(LogicalPlan::Aggregate(Aggregate { @@ -1125,7 +1126,7 @@ pub fn project_with_alias( } validate_unique_names("Projections", projected_expr.iter(), input_schema)?; let input_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&projected_expr, &plan)?, + exprlist_to_fields(&projected_expr, input_schema)?, plan.schema().metadata().clone(), )?; let schema = match alias { diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs index 9b4f14e9bad79..673345c69b61c 100644 --- a/datafusion/core/src/logical_plan/expr.rs +++ b/datafusion/core/src/logical_plan/expr.rs @@ -25,11 +25,11 @@ use crate::logical_plan::{DFField, DFSchema}; use arrow::datatypes::DataType; pub use datafusion_common::{Column, ExprSchema}; pub use datafusion_expr::expr_fn::*; +use datafusion_expr::AccumulatorFunctionImplementation; use datafusion_expr::BuiltinScalarFunction; pub use datafusion_expr::Expr; use datafusion_expr::StateTypeFunction; pub use datafusion_expr::{lit, lit_timestamp_nano, Literal}; -use datafusion_expr::{AccumulatorFunctionImplementation, LogicalPlan}; use datafusion_expr::{AggregateUDF, ScalarUDF}; use datafusion_expr::{ ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility, @@ -138,11 +138,9 @@ pub fn create_udaf( /// Create field meta-data from an expression, for use in a result set schema pub fn exprlist_to_fields<'a>( expr: impl IntoIterator, - plan: &LogicalPlan, + input_schema: &DFSchema, ) -> Result> { - let input_schema = &plan.schema(); - let exprs: Vec = expr.into_iter().cloned().collect(); - exprs.iter().map(|e| e.to_field(input_schema)).collect() + expr.into_iter().map(|e| e.to_field(input_schema)).collect() } /// Calls a named built in function diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 0979d8f5b2188..5062082e86433 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -810,7 +810,7 @@ mod tests { // that the Column references are unqualified (e.g. their // relation is `None`). PlanBuilder resolves the expressions let expr = vec![col("a"), col("b")]; - let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap(); + let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap(); let projected_schema = DFSchema::new_with_metadata( projected_fields, input_schema.metadata().clone(),