diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 05cc842f156a..9eb379142ea6 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -335,8 +335,6 @@ impl LogicalPlanBuilder { .iter() .all(|c| input.schema().field_from_column(c).is_ok()) => { - let input_schema = input.schema(); - let missing_exprs = missing_cols .iter() .map(|c| normalize_col(Expr::Column(c.clone()), &input)) @@ -344,17 +342,9 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); - let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&expr, &input)?, - input_schema.metadata().clone(), - )?; - - Ok(LogicalPlan::Projection(Projection { - expr, - input, - schema: DFSchemaRef::new(new_schema), - alias, - })) + Ok(LogicalPlan::Projection(Projection::try_new( + expr, input, alias, + )?)) } _ => { let new_inputs = curr_plan @@ -416,17 +406,12 @@ impl LogicalPlanBuilder { .iter() .map(|f| Expr::Column(f.qualified_column())) .collect(); - let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&new_expr, &self.plan)?, - schema.metadata().clone(), - )?; - Ok(Self::from(LogicalPlan::Projection(Projection { - expr: new_expr, - input: Arc::new(sort_plan), - schema: DFSchemaRef::new(new_schema), - alias: None, - }))) + Ok(Self::from(LogicalPlan::Projection(Projection::try_new( + new_expr, + Arc::new(sort_plan), + None, + )?))) } /// Apply a union, preserving duplicate rows @@ -884,12 +869,9 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection { - expr: alias_expr, - input, - schema, - alias, - })) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + alias_expr, input, schema, alias, + )?)) } /// Union two logical plans with an optional alias. @@ -983,12 +965,12 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection { - expr: projected_expr, - input: Arc::new(plan.clone()), - schema: DFSchemaRef::new(schema), + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + projected_expr, + Arc::new(plan.clone()), + DFSchemaRef::new(schema), alias, - })) + )?)) } /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b0c0114490c4..93c18f4b96e3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -17,9 +17,10 @@ use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; +use crate::utils::exprlist_to_fields; use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{Column, DFSchemaRef, DataFusionError}; +use datafusion_common::{Column, DFSchema, DFSchemaRef, DataFusionError}; use std::collections::HashSet; ///! Logical plan types use std::fmt::{self, Debug, Display, Formatter}; @@ -1042,6 +1043,39 @@ pub struct Projection { pub alias: Option, } +impl Projection { + /// Create a new Projection + pub fn try_new( + expr: Vec, + input: Arc, + alias: Option, + ) -> Result { + let schema = Arc::new(DFSchema::new_with_metadata( + exprlist_to_fields(&expr, &input)?, + input.schema().metadata().clone(), + )?); + Self::try_new_with_schema(expr, input, schema, alias) + } + + /// Create a new Projection using the specified output schema + pub fn try_new_with_schema( + expr: Vec, + input: Arc, + schema: DFSchemaRef, + alias: Option, + ) -> Result { + if expr.len() != schema.fields().len() { + return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); + } + Ok(Self { + expr, + input, + schema, + alias, + }) + } +} + /// Aliased subquery #[derive(Clone)] pub struct SubqueryAlias { @@ -1409,7 +1443,9 @@ mod tests { use crate::logical_plan::table_scan; use crate::{col, in_subquery, lit}; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::DFSchema; use datafusion_common::Result; + use std::collections::HashMap; fn employee_schema() -> Schema { Schema::new(vec![ @@ -1727,6 +1763,22 @@ mod tests { ); } + #[test] + fn projection_expr_schema_mismatch() -> Result<()> { + let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); + let p = Projection::try_new_with_schema( + vec![col("a")], + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: empty_schema.clone(), + })), + empty_schema, + None, + ); + assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap())); + Ok(()) + } + fn test_plan() -> LogicalPlan { let schema = Schema::new(vec![ Field::new("id", DataType::Int32, false), diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 3b55c4851008..f2f5d6002cc8 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,12 +341,12 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr.to_vec(), + Arc::new(inputs[0].clone()), + schema.clone(), + alias.clone(), + )?)) } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index d1e3fea70228..3964bee6b72b 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,12 +107,12 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection { - expr: new_expr.pop().unwrap(), - input: Arc::new(new_input), - schema: schema.clone(), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + new_expr.pop().unwrap(), + Arc::new(new_input), + schema.clone(), + alias.clone(), + )?)) } LogicalPlan::Filter(Filter { predicate, input }) => { let schema = plan.schema().as_ref().clone(); @@ -292,12 +292,12 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection { - expr: project_exprs, - input: Arc::new(input), - schema: Arc::new(schema), - alias: None, - })) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + project_exprs, + Arc::new(input), + Arc::new(schema), + None, + )?)) } #[inline] diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index f92061b411a0..66cdc144cc39 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,17 +162,17 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection { - expr: expr.clone(), - input: Arc::new(limit_push_down( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr.clone(), + Arc::new(limit_push_down( _optimizer, ancestor, input.as_ref(), _optimizer_config, )?), - schema: schema.clone(), - alias: alias.clone(), - })) + schema.clone(), + alias.clone(), + )?)) } ( LogicalPlan::Union(Union { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 1dfbd817f338..aa3cdfb4252c 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,14 +192,12 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection { - expr: new_expr, - input: Arc::new(new_input), - schema: DFSchemaRef::new(DFSchema::new_with_metadata( - new_fields, metadata, - )?), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + new_expr, + Arc::new(new_input), + DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), + alias.clone(), + )?)) } } LogicalPlan::Join(Join { @@ -538,9 +536,7 @@ mod tests { use datafusion_expr::{ col, lit, logical_plan::{builder::LogicalPlanBuilder, JoinType}, - max, min, - utils::exprlist_to_fields, - Expr, + max, min, Expr, }; use std::collections::HashMap; @@ -839,18 +835,11 @@ mod tests { // that the Column references are unqualified (e.g. their // relation is `None`). PlanBuilder resolves the expressions let expr = vec![col("a"), col("b")]; - let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap(); - let projected_schema = DFSchema::new_with_metadata( - projected_fields, - input_schema.metadata().clone(), - ) - .unwrap(); - let plan = LogicalPlan::Projection(Projection { + let plan = LogicalPlan::Projection(Projection::try_new( expr, - input: Arc::new(table_scan), - schema: Arc::new(projected_schema), - alias: None, - }); + Arc::new(table_scan), + None, + )?); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 67ebd4aeabb2..1769314ebc0c 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection { - expr: alias_expr, - input: Arc::new(final_agg), - schema: schema.clone(), - alias: None, - })) + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + alias_expr, + Arc::new(final_agg), + schema.clone(), + None, + )?)) } else { optimize_children(plan) }