From 37089964705ccd19940b3db86b847dd706aebaae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 10:07:25 -0600 Subject: [PATCH 1/8] Projection::new --- datafusion/expr/src/logical_plan/builder.rs | 37 +++++++++---------- datafusion/expr/src/logical_plan/plan.rs | 18 +++++++++ datafusion/expr/src/utils.rs | 12 +++--- .../optimizer/src/common_subexpr_eliminate.rs | 24 ++++++------ datafusion/optimizer/src/limit_push_down.rs | 12 +++--- .../optimizer/src/projection_push_down.rs | 24 ++++++------ .../src/single_distinct_to_groupby.rs | 12 +++--- 7 files changed, 76 insertions(+), 63 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index a3abc694dc92..fcfd6501c416 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -349,12 +349,12 @@ impl LogicalPlanBuilder { input_schema.metadata().clone(), )?; - Ok(LogicalPlan::Projection(Projection { + Ok(LogicalPlan::Projection(Projection::new( expr, input, - schema: DFSchemaRef::new(new_schema), + DFSchemaRef::new(new_schema), alias, - })) + ))) } _ => { let new_inputs = curr_plan @@ -421,12 +421,12 @@ impl LogicalPlanBuilder { schema.metadata().clone(), )?; - Ok(Self::from(LogicalPlan::Projection(Projection { - expr: new_expr, - input: Arc::new(sort_plan), - schema: DFSchemaRef::new(new_schema), - alias: None, - }))) + Ok(Self::from(LogicalPlan::Projection(Projection::new( + new_expr, + Arc::new(sort_plan), + DFSchemaRef::new(new_schema), + None, + )))) } /// Apply a union, preserving duplicate rows @@ -884,12 +884,9 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection { - expr: alias_expr, - input, - schema, - alias, - })) + Ok(LogicalPlan::Projection(Projection::new( + alias_expr, input, schema, alias, + ))) } /// Union two logical plans with an optional alias. @@ -983,12 +980,12 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection { - expr: projected_expr, - input: Arc::new(plan.clone()), - schema: DFSchemaRef::new(schema), + Ok(LogicalPlan::Projection(Projection::new( + projected_expr, + Arc::new(plan.clone()), + DFSchemaRef::new(schema), alias, - })) + ))) } /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0e52ff253508..ca9543b8ab0a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -995,6 +995,24 @@ pub struct Projection { pub alias: Option, } +impl Projection { + /// Create a new Projection + pub fn new( + expr: Vec, + input: Arc, + schema: DFSchemaRef, + alias: Option, + ) -> Self { + assert_eq!(expr.len(), schema.fields().len()); + Self { + expr, + input, + schema, + alias, + } + } +} + /// Aliased subquery #[derive(Clone)] pub struct SubqueryAlias { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 3b55c4851008..91ca2b7b1ba2 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,12 +341,12 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::new( + expr.to_vec(), + Arc::new(inputs[0].clone()), + schema.clone(), + alias.clone(), + ))) } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index d1e3fea70228..34c53b0dc17c 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,12 +107,12 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection { - expr: new_expr.pop().unwrap(), - input: Arc::new(new_input), - schema: schema.clone(), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::new( + new_expr.pop().unwrap(), + Arc::new(new_input), + schema.clone(), + alias.clone(), + ))) } LogicalPlan::Filter(Filter { predicate, input }) => { let schema = plan.schema().as_ref().clone(); @@ -292,12 +292,12 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection { - expr: project_exprs, - input: Arc::new(input), - schema: Arc::new(schema), - alias: None, - })) + Ok(LogicalPlan::Projection(Projection::new( + project_exprs, + Arc::new(input), + Arc::new(schema), + None, + ))) } #[inline] diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 9726be087f4a..8df321723bf8 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,17 +162,17 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection { - expr: expr.clone(), - input: Arc::new(limit_push_down( + Ok(LogicalPlan::Projection(Projection::new( + expr.clone(), + Arc::new(limit_push_down( _optimizer, ancestor, input.as_ref(), _optimizer_config, )?), - schema: schema.clone(), - alias: alias.clone(), - })) + schema.clone(), + alias.clone(), + ))) } ( LogicalPlan::Union(Union { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 1dfbd817f338..c9d34553efb3 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,14 +192,12 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection { - expr: new_expr, - input: Arc::new(new_input), - schema: DFSchemaRef::new(DFSchema::new_with_metadata( - new_fields, metadata, - )?), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::new( + new_expr, + Arc::new(new_input), + DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), + alias.clone(), + ))) } } LogicalPlan::Join(Join { @@ -845,12 +843,12 @@ mod tests { input_schema.metadata().clone(), ) .unwrap(); - let plan = LogicalPlan::Projection(Projection { + let plan = LogicalPlan::Projection(Projection::new( expr, - input: Arc::new(table_scan), - schema: Arc::new(projected_schema), - alias: None, - }); + Arc::new(table_scan), + Arc::new(projected_schema), + None, + )); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 67ebd4aeabb2..86f78d503fae 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection { - expr: alias_expr, - input: Arc::new(final_agg), - schema: schema.clone(), - alias: None, - })) + Ok(LogicalPlan::Projection(Projection::new( + alias_expr, + Arc::new(final_agg), + schema.clone(), + None, + ))) } else { optimize_children(plan) } From 22469577713584f1aaa796a3bdcebe6d86d203ee Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 10:37:47 -0600 Subject: [PATCH 2/8] try_new and validation --- datafusion/expr/src/logical_plan/builder.rs | 41 +++++++----------- datafusion/expr/src/logical_plan/plan.rs | 42 +++++++++++++++---- datafusion/expr/src/utils.rs | 6 +-- .../optimizer/src/common_subexpr_eliminate.rs | 12 +++--- datafusion/optimizer/src/limit_push_down.rs | 6 +-- .../optimizer/src/projection_push_down.rs | 24 ++++------- .../src/single_distinct_to_groupby.rs | 6 +-- 7 files changed, 74 insertions(+), 63 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index fcfd6501c416..6ec48348cda1 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -335,8 +335,6 @@ impl LogicalPlanBuilder { .iter() .all(|c| input.schema().field_from_column(c).is_ok()) => { - let input_schema = input.schema(); - let missing_exprs = missing_cols .iter() .map(|c| normalize_col(Expr::Column(c.clone()), &input)) @@ -344,17 +342,9 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); - let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&expr, &input)?, - input_schema.metadata().clone(), - )?; - - Ok(LogicalPlan::Projection(Projection::new( - expr, - input, - DFSchemaRef::new(new_schema), - alias, - ))) + Ok(LogicalPlan::Projection(Projection::try_new( + expr, input, None, alias, + )?)) } _ => { let new_inputs = curr_plan @@ -416,17 +406,13 @@ impl LogicalPlanBuilder { .iter() .map(|f| Expr::Column(f.qualified_column())) .collect(); - let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&new_expr, &self.plan)?, - schema.metadata().clone(), - )?; - Ok(Self::from(LogicalPlan::Projection(Projection::new( + Ok(Self::from(LogicalPlan::Projection(Projection::try_new( new_expr, Arc::new(sort_plan), - DFSchemaRef::new(new_schema), None, - )))) + None, + )?))) } /// Apply a union, preserving duplicate rows @@ -884,9 +870,12 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection::new( - alias_expr, input, schema, alias, - ))) + Ok(LogicalPlan::Projection(Projection::try_new( + alias_expr, + input, + Some(schema), + alias, + )?)) } /// Union two logical plans with an optional alias. @@ -980,12 +969,12 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( projected_expr, Arc::new(plan.clone()), - DFSchemaRef::new(schema), + Some(DFSchemaRef::new(schema)), alias, - ))) + )?)) } /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ca9543b8ab0a..6479e6d957c3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -17,9 +17,10 @@ use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; +use crate::utils::exprlist_to_fields; use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{Column, DFSchemaRef, DataFusionError}; +use datafusion_common::{Column, DFSchema, DFSchemaRef, DataFusionError}; use std::collections::HashSet; ///! Logical plan types use std::fmt::{self, Debug, Display, Formatter}; @@ -997,19 +998,28 @@ pub struct Projection { impl Projection { /// Create a new Projection - pub fn new( + pub fn try_new( expr: Vec, input: Arc, - schema: DFSchemaRef, + schema: Option, alias: Option, - ) -> Self { - assert_eq!(expr.len(), schema.fields().len()); - Self { + ) -> Result { + let schema = match schema { + Some(x) => x, + _ => Arc::new(DFSchema::new_with_metadata( + exprlist_to_fields(&expr, &input)?, + input.schema().metadata().clone(), + )?), + }; + if expr.len() != schema.fields().len() { + return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); + } + Ok(Self { expr, input, schema, alias, - } + }) } } @@ -1380,6 +1390,8 @@ mod tests { use crate::logical_plan::table_scan; use crate::{col, lit}; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::DFSchema; + use std::collections::HashMap; fn employee_schema() -> Schema { Schema::new(vec![ @@ -1691,6 +1703,22 @@ mod tests { ); } + #[test] + fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { + let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); + let p = Projection::try_new( + vec![col("a")], + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: empty_schema.clone(), + })), + Some(empty_schema), + None, + ); + assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap())); + Ok(()) + } + fn test_plan() -> LogicalPlan { let schema = Schema::new(vec![ Field::new("id", DataType::Int32, false), diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 91ca2b7b1ba2..187d1781c43c 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,12 +341,12 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( expr.to_vec(), Arc::new(inputs[0].clone()), - schema.clone(), + Some(schema.clone()), alias.clone(), - ))) + )?)) } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 34c53b0dc17c..5bdfabf0d539 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,12 +107,12 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( new_expr.pop().unwrap(), Arc::new(new_input), - schema.clone(), + Some(schema.clone()), alias.clone(), - ))) + )?)) } LogicalPlan::Filter(Filter { predicate, input }) => { let schema = plan.schema().as_ref().clone(); @@ -292,12 +292,12 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( project_exprs, Arc::new(input), - Arc::new(schema), + Some(Arc::new(schema)), None, - ))) + )?)) } #[inline] diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 8df321723bf8..189b7829b3f4 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,7 +162,7 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( expr.clone(), Arc::new(limit_push_down( _optimizer, @@ -170,9 +170,9 @@ fn limit_push_down( input.as_ref(), _optimizer_config, )?), - schema.clone(), + Some(schema.clone()), alias.clone(), - ))) + )?)) } ( LogicalPlan::Union(Union { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index c9d34553efb3..03614cf86097 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,12 +192,14 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( new_expr, Arc::new(new_input), - DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), + Some(DFSchemaRef::new(DFSchema::new_with_metadata( + new_fields, metadata, + )?)), alias.clone(), - ))) + )?)) } } LogicalPlan::Join(Join { @@ -536,9 +538,7 @@ mod tests { use datafusion_expr::{ col, lit, logical_plan::{builder::LogicalPlanBuilder, JoinType}, - max, min, - utils::exprlist_to_fields, - Expr, + max, min, Expr, }; use std::collections::HashMap; @@ -837,18 +837,12 @@ mod tests { // that the Column references are unqualified (e.g. their // relation is `None`). PlanBuilder resolves the expressions let expr = vec![col("a"), col("b")]; - let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap(); - let projected_schema = DFSchema::new_with_metadata( - projected_fields, - input_schema.metadata().clone(), - ) - .unwrap(); - let plan = LogicalPlan::Projection(Projection::new( + let plan = LogicalPlan::Projection(Projection::try_new( expr, Arc::new(table_scan), - Arc::new(projected_schema), None, - )); + None, + )?); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 86f78d503fae..4eb623ee1e53 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( alias_expr, Arc::new(final_agg), - schema.clone(), + Some(schema.clone()), None, - ))) + )?)) } else { optimize_children(plan) } From 6720cd1f085a4da633b804067c2f7c280f1786c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 10:48:28 -0600 Subject: [PATCH 3/8] better variable name --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6479e6d957c3..5ecf9cddf3c5 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1005,7 +1005,7 @@ impl Projection { alias: Option, ) -> Result { let schema = match schema { - Some(x) => x, + Some(provided) => provided, _ => Arc::new(DFSchema::new_with_metadata( exprlist_to_fields(&expr, &input)?, input.schema().metadata().clone(), From e41ab4b3cdbc4ebfca56a21fce7fbef433725d5f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 11:19:44 -0600 Subject: [PATCH 4/8] split into try_new and try_new_with_schema --- datafusion/expr/src/logical_plan/builder.rs | 14 ++++------ datafusion/expr/src/logical_plan/plan.rs | 26 ++++++++++++------- datafusion/expr/src/utils.rs | 4 +-- .../optimizer/src/common_subexpr_eliminate.rs | 8 +++--- datafusion/optimizer/src/limit_push_down.rs | 4 +-- .../optimizer/src/projection_push_down.rs | 7 +++-- .../src/single_distinct_to_groupby.rs | 4 +-- 7 files changed, 34 insertions(+), 33 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6ec48348cda1..60351ad747a8 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -343,7 +343,7 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); Ok(LogicalPlan::Projection(Projection::try_new( - expr, input, None, alias, + expr, input, alias, )?)) } _ => { @@ -411,7 +411,6 @@ impl LogicalPlanBuilder { new_expr, Arc::new(sort_plan), None, - None, )?))) } @@ -870,11 +869,8 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection::try_new( - alias_expr, - input, - Some(schema), - alias, + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + alias_expr, input, schema, alias, )?)) } @@ -969,10 +965,10 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( projected_expr, Arc::new(plan.clone()), - Some(DFSchemaRef::new(schema)), + DFSchemaRef::new(schema), alias, )?)) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 5ecf9cddf3c5..0e438fa5119d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1001,16 +1001,22 @@ impl Projection { pub fn try_new( expr: Vec, input: Arc, - schema: Option, alias: Option, ) -> Result { - let schema = match schema { - Some(provided) => provided, - _ => Arc::new(DFSchema::new_with_metadata( - exprlist_to_fields(&expr, &input)?, - input.schema().metadata().clone(), - )?), - }; + let schema = Arc::new(DFSchema::new_with_metadata( + exprlist_to_fields(&expr, &input)?, + input.schema().metadata().clone(), + )?); + Self::try_new_with_schema(expr, input, schema, alias) + } + + /// Create a new Projection using the specified output schema + pub fn try_new_with_schema( + expr: Vec, + input: Arc, + schema: DFSchemaRef, + alias: Option, + ) -> Result { if expr.len() != schema.fields().len() { return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); } @@ -1706,13 +1712,13 @@ mod tests { #[test] fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); - let p = Projection::try_new( + let p = Projection::try_new_with_schema( vec![col("a")], Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: empty_schema.clone(), })), - Some(empty_schema), + empty_schema, None, ); assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap())); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 187d1781c43c..f2f5d6002cc8 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,10 +341,10 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( expr.to_vec(), Arc::new(inputs[0].clone()), - Some(schema.clone()), + schema.clone(), alias.clone(), )?)) } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 5bdfabf0d539..3964bee6b72b 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,10 +107,10 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( new_expr.pop().unwrap(), Arc::new(new_input), - Some(schema.clone()), + schema.clone(), alias.clone(), )?)) } @@ -292,10 +292,10 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( project_exprs, Arc::new(input), - Some(Arc::new(schema)), + Arc::new(schema), None, )?)) } diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 189b7829b3f4..2269cbb26d9d 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,7 +162,7 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( expr.clone(), Arc::new(limit_push_down( _optimizer, @@ -170,7 +170,7 @@ fn limit_push_down( input.as_ref(), _optimizer_config, )?), - Some(schema.clone()), + schema.clone(), alias.clone(), )?)) } diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 03614cf86097..48025e8f3c15 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,12 +192,12 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( new_expr, Arc::new(new_input), - Some(DFSchemaRef::new(DFSchema::new_with_metadata( + DFSchemaRef::new(DFSchema::new_with_metadata( new_fields, metadata, - )?)), + )?), alias.clone(), )?)) } @@ -841,7 +841,6 @@ mod tests { expr, Arc::new(table_scan), None, - None, )?); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 4eb623ee1e53..1769314ebc0c 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,10 +141,10 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( alias_expr, Arc::new(final_agg), - Some(schema.clone()), + schema.clone(), None, )?)) } else { From 5e813b2f368e0b52a36dbf006ceae68a5ba96d67 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 11:36:48 -0600 Subject: [PATCH 5/8] fmt --- datafusion/optimizer/src/projection_push_down.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 48025e8f3c15..aa3cdfb4252c 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -195,9 +195,7 @@ fn optimize_plan( Ok(LogicalPlan::Projection(Projection::try_new_with_schema( new_expr, Arc::new(new_input), - DFSchemaRef::new(DFSchema::new_with_metadata( - new_fields, metadata, - )?), + DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), alias.clone(), )?)) } From 28f07a03a49c8b1170649cf455cebab02f4db244 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 08:57:30 -0600 Subject: [PATCH 6/8] fix invalid projection in common_subexpr_eliminate --- datafusion/expr/src/logical_plan/plan.rs | 2 +- datafusion/optimizer/src/common_subexpr_eliminate.rs | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3dbeb7685879..93c18f4b96e3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1764,7 +1764,7 @@ mod tests { } #[test] - fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { + fn projection_expr_schema_mismatch() -> Result<()> { let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); let p = Projection::try_new_with_schema( vec![col("a")], diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 3964bee6b72b..db45a5af0c8d 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -289,13 +289,9 @@ fn build_project_plan( } } - let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; - schema.merge(input.schema()); - - Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + Ok(LogicalPlan::Projection(Projection::try_new( project_exprs, Arc::new(input), - Arc::new(schema), None, )?)) } From 616ae6010972a39e957405203aa28d88345c2e8b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 09:05:50 -0600 Subject: [PATCH 7/8] Revert "fix invalid projection in common_subexpr_eliminate" This reverts commit 28f07a03a49c8b1170649cf455cebab02f4db244. --- datafusion/expr/src/logical_plan/plan.rs | 2 +- datafusion/optimizer/src/common_subexpr_eliminate.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 93c18f4b96e3..3dbeb7685879 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1764,7 +1764,7 @@ mod tests { } #[test] - fn projection_expr_schema_mismatch() -> Result<()> { + fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); let p = Projection::try_new_with_schema( vec![col("a")], diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index db45a5af0c8d..3964bee6b72b 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -289,9 +289,13 @@ fn build_project_plan( } } - Ok(LogicalPlan::Projection(Projection::try_new( + let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; + schema.merge(input.schema()); + + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( project_exprs, Arc::new(input), + Arc::new(schema), None, )?)) } From dcf0be3ee8c0dca6cea94f65e1ef07aab1f26e1e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 09:24:53 -0600 Subject: [PATCH 8/8] fix merge conflict --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3dbeb7685879..93c18f4b96e3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1764,7 +1764,7 @@ mod tests { } #[test] - fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { + fn projection_expr_schema_mismatch() -> Result<()> { let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); let p = Projection::try_new_with_schema( vec![col("a")],