From 37089964705ccd19940b3db86b847dd706aebaae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 10:07:25 -0600 Subject: [PATCH 01/13] Projection::new --- datafusion/expr/src/logical_plan/builder.rs | 37 +++++++++---------- datafusion/expr/src/logical_plan/plan.rs | 18 +++++++++ datafusion/expr/src/utils.rs | 12 +++--- .../optimizer/src/common_subexpr_eliminate.rs | 24 ++++++------ datafusion/optimizer/src/limit_push_down.rs | 12 +++--- .../optimizer/src/projection_push_down.rs | 24 ++++++------ .../src/single_distinct_to_groupby.rs | 12 +++--- 7 files changed, 76 insertions(+), 63 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index a3abc694dc92a..fcfd6501c4168 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -349,12 +349,12 @@ impl LogicalPlanBuilder { input_schema.metadata().clone(), )?; - Ok(LogicalPlan::Projection(Projection { + Ok(LogicalPlan::Projection(Projection::new( expr, input, - schema: DFSchemaRef::new(new_schema), + DFSchemaRef::new(new_schema), alias, - })) + ))) } _ => { let new_inputs = curr_plan @@ -421,12 +421,12 @@ impl LogicalPlanBuilder { schema.metadata().clone(), )?; - Ok(Self::from(LogicalPlan::Projection(Projection { - expr: new_expr, - input: Arc::new(sort_plan), - schema: DFSchemaRef::new(new_schema), - alias: None, - }))) + Ok(Self::from(LogicalPlan::Projection(Projection::new( + new_expr, + Arc::new(sort_plan), + DFSchemaRef::new(new_schema), + None, + )))) } /// Apply a union, preserving duplicate rows @@ -884,12 +884,9 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection { - expr: alias_expr, - input, - schema, - alias, - })) + Ok(LogicalPlan::Projection(Projection::new( + alias_expr, input, schema, alias, + ))) } /// Union two logical plans with an optional alias. @@ -983,12 +980,12 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection { - expr: projected_expr, - input: Arc::new(plan.clone()), - schema: DFSchemaRef::new(schema), + Ok(LogicalPlan::Projection(Projection::new( + projected_expr, + Arc::new(plan.clone()), + DFSchemaRef::new(schema), alias, - })) + ))) } /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0e52ff2535087..ca9543b8ab0a3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -995,6 +995,24 @@ pub struct Projection { pub alias: Option, } +impl Projection { + /// Create a new Projection + pub fn new( + expr: Vec, + input: Arc, + schema: DFSchemaRef, + alias: Option, + ) -> Self { + assert_eq!(expr.len(), schema.fields().len()); + Self { + expr, + input, + schema, + alias, + } + } +} + /// Aliased subquery #[derive(Clone)] pub struct SubqueryAlias { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 3b55c48510087..91ca2b7b1ba25 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,12 +341,12 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::new( + expr.to_vec(), + Arc::new(inputs[0].clone()), + schema.clone(), + alias.clone(), + ))) } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index d1e3fea702284..34c53b0dc17cb 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,12 +107,12 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection { - expr: new_expr.pop().unwrap(), - input: Arc::new(new_input), - schema: schema.clone(), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::new( + new_expr.pop().unwrap(), + Arc::new(new_input), + schema.clone(), + alias.clone(), + ))) } LogicalPlan::Filter(Filter { predicate, input }) => { let schema = plan.schema().as_ref().clone(); @@ -292,12 +292,12 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection { - expr: project_exprs, - input: Arc::new(input), - schema: Arc::new(schema), - alias: None, - })) + Ok(LogicalPlan::Projection(Projection::new( + project_exprs, + Arc::new(input), + Arc::new(schema), + None, + ))) } #[inline] diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 9726be087f4a1..8df321723bf81 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,17 +162,17 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection { - expr: expr.clone(), - input: Arc::new(limit_push_down( + Ok(LogicalPlan::Projection(Projection::new( + expr.clone(), + Arc::new(limit_push_down( _optimizer, ancestor, input.as_ref(), _optimizer_config, )?), - schema: schema.clone(), - alias: alias.clone(), - })) + schema.clone(), + alias.clone(), + ))) } ( LogicalPlan::Union(Union { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 1dfbd817f3389..c9d34553efb3c 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,14 +192,12 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection { - expr: new_expr, - input: Arc::new(new_input), - schema: DFSchemaRef::new(DFSchema::new_with_metadata( - new_fields, metadata, - )?), - alias: alias.clone(), - })) + Ok(LogicalPlan::Projection(Projection::new( + new_expr, + Arc::new(new_input), + DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), + alias.clone(), + ))) } } LogicalPlan::Join(Join { @@ -845,12 +843,12 @@ mod tests { input_schema.metadata().clone(), ) .unwrap(); - let plan = LogicalPlan::Projection(Projection { + let plan = LogicalPlan::Projection(Projection::new( expr, - input: Arc::new(table_scan), - schema: Arc::new(projected_schema), - alias: None, - }); + Arc::new(table_scan), + Arc::new(projected_schema), + None, + )); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 67ebd4aeabb23..86f78d503fae1 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection { - expr: alias_expr, - input: Arc::new(final_agg), - schema: schema.clone(), - alias: None, - })) + Ok(LogicalPlan::Projection(Projection::new( + alias_expr, + Arc::new(final_agg), + schema.clone(), + None, + ))) } else { optimize_children(plan) } From 22469577713584f1aaa796a3bdcebe6d86d203ee Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 10:37:47 -0600 Subject: [PATCH 02/13] try_new and validation --- datafusion/expr/src/logical_plan/builder.rs | 41 +++++++----------- datafusion/expr/src/logical_plan/plan.rs | 42 +++++++++++++++---- datafusion/expr/src/utils.rs | 6 +-- .../optimizer/src/common_subexpr_eliminate.rs | 12 +++--- datafusion/optimizer/src/limit_push_down.rs | 6 +-- .../optimizer/src/projection_push_down.rs | 24 ++++------- .../src/single_distinct_to_groupby.rs | 6 +-- 7 files changed, 74 insertions(+), 63 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index fcfd6501c4168..6ec48348cda1c 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -335,8 +335,6 @@ impl LogicalPlanBuilder { .iter() .all(|c| input.schema().field_from_column(c).is_ok()) => { - let input_schema = input.schema(); - let missing_exprs = missing_cols .iter() .map(|c| normalize_col(Expr::Column(c.clone()), &input)) @@ -344,17 +342,9 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); - let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&expr, &input)?, - input_schema.metadata().clone(), - )?; - - Ok(LogicalPlan::Projection(Projection::new( - expr, - input, - DFSchemaRef::new(new_schema), - alias, - ))) + Ok(LogicalPlan::Projection(Projection::try_new( + expr, input, None, alias, + )?)) } _ => { let new_inputs = curr_plan @@ -416,17 +406,13 @@ impl LogicalPlanBuilder { .iter() .map(|f| Expr::Column(f.qualified_column())) .collect(); - let new_schema = DFSchema::new_with_metadata( - exprlist_to_fields(&new_expr, &self.plan)?, - schema.metadata().clone(), - )?; - Ok(Self::from(LogicalPlan::Projection(Projection::new( + Ok(Self::from(LogicalPlan::Projection(Projection::try_new( new_expr, Arc::new(sort_plan), - DFSchemaRef::new(new_schema), None, - )))) + None, + )?))) } /// Apply a union, preserving duplicate rows @@ -884,9 +870,12 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection::new( - alias_expr, input, schema, alias, - ))) + Ok(LogicalPlan::Projection(Projection::try_new( + alias_expr, + input, + Some(schema), + alias, + )?)) } /// Union two logical plans with an optional alias. @@ -980,12 +969,12 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( projected_expr, Arc::new(plan.clone()), - DFSchemaRef::new(schema), + Some(DFSchemaRef::new(schema)), alias, - ))) + )?)) } /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ca9543b8ab0a3..6479e6d957c33 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -17,9 +17,10 @@ use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; +use crate::utils::exprlist_to_fields; use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{Column, DFSchemaRef, DataFusionError}; +use datafusion_common::{Column, DFSchema, DFSchemaRef, DataFusionError}; use std::collections::HashSet; ///! Logical plan types use std::fmt::{self, Debug, Display, Formatter}; @@ -997,19 +998,28 @@ pub struct Projection { impl Projection { /// Create a new Projection - pub fn new( + pub fn try_new( expr: Vec, input: Arc, - schema: DFSchemaRef, + schema: Option, alias: Option, - ) -> Self { - assert_eq!(expr.len(), schema.fields().len()); - Self { + ) -> Result { + let schema = match schema { + Some(x) => x, + _ => Arc::new(DFSchema::new_with_metadata( + exprlist_to_fields(&expr, &input)?, + input.schema().metadata().clone(), + )?), + }; + if expr.len() != schema.fields().len() { + return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); + } + Ok(Self { expr, input, schema, alias, - } + }) } } @@ -1380,6 +1390,8 @@ mod tests { use crate::logical_plan::table_scan; use crate::{col, lit}; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::DFSchema; + use std::collections::HashMap; fn employee_schema() -> Schema { Schema::new(vec![ @@ -1691,6 +1703,22 @@ mod tests { ); } + #[test] + fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { + let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); + let p = Projection::try_new( + vec![col("a")], + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: empty_schema.clone(), + })), + Some(empty_schema), + None, + ); + assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap())); + Ok(()) + } + fn test_plan() -> LogicalPlan { let schema = Schema::new(vec![ Field::new("id", DataType::Int32, false), diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 91ca2b7b1ba25..187d1781c43cc 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,12 +341,12 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( expr.to_vec(), Arc::new(inputs[0].clone()), - schema.clone(), + Some(schema.clone()), alias.clone(), - ))) + )?)) } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 34c53b0dc17cb..5bdfabf0d5393 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,12 +107,12 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( new_expr.pop().unwrap(), Arc::new(new_input), - schema.clone(), + Some(schema.clone()), alias.clone(), - ))) + )?)) } LogicalPlan::Filter(Filter { predicate, input }) => { let schema = plan.schema().as_ref().clone(); @@ -292,12 +292,12 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( project_exprs, Arc::new(input), - Arc::new(schema), + Some(Arc::new(schema)), None, - ))) + )?)) } #[inline] diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 8df321723bf81..189b7829b3f4a 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,7 +162,7 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( expr.clone(), Arc::new(limit_push_down( _optimizer, @@ -170,9 +170,9 @@ fn limit_push_down( input.as_ref(), _optimizer_config, )?), - schema.clone(), + Some(schema.clone()), alias.clone(), - ))) + )?)) } ( LogicalPlan::Union(Union { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index c9d34553efb3c..03614cf860979 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,12 +192,14 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( new_expr, Arc::new(new_input), - DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), + Some(DFSchemaRef::new(DFSchema::new_with_metadata( + new_fields, metadata, + )?)), alias.clone(), - ))) + )?)) } } LogicalPlan::Join(Join { @@ -536,9 +538,7 @@ mod tests { use datafusion_expr::{ col, lit, logical_plan::{builder::LogicalPlanBuilder, JoinType}, - max, min, - utils::exprlist_to_fields, - Expr, + max, min, Expr, }; use std::collections::HashMap; @@ -837,18 +837,12 @@ mod tests { // that the Column references are unqualified (e.g. their // relation is `None`). PlanBuilder resolves the expressions let expr = vec![col("a"), col("b")]; - let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap(); - let projected_schema = DFSchema::new_with_metadata( - projected_fields, - input_schema.metadata().clone(), - ) - .unwrap(); - let plan = LogicalPlan::Projection(Projection::new( + let plan = LogicalPlan::Projection(Projection::try_new( expr, Arc::new(table_scan), - Arc::new(projected_schema), None, - )); + None, + )?); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 86f78d503fae1..4eb623ee1e536 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,12 +141,12 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection::new( + Ok(LogicalPlan::Projection(Projection::try_new( alias_expr, Arc::new(final_agg), - schema.clone(), + Some(schema.clone()), None, - ))) + )?)) } else { optimize_children(plan) } From 6720cd1f085a4da633b804067c2f7c280f1786c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 10:48:28 -0600 Subject: [PATCH 03/13] better variable name --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6479e6d957c33..5ecf9cddf3c51 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1005,7 +1005,7 @@ impl Projection { alias: Option, ) -> Result { let schema = match schema { - Some(x) => x, + Some(provided) => provided, _ => Arc::new(DFSchema::new_with_metadata( exprlist_to_fields(&expr, &input)?, input.schema().metadata().clone(), From e41ab4b3cdbc4ebfca56a21fce7fbef433725d5f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 11:19:44 -0600 Subject: [PATCH 04/13] split into try_new and try_new_with_schema --- datafusion/expr/src/logical_plan/builder.rs | 14 ++++------ datafusion/expr/src/logical_plan/plan.rs | 26 ++++++++++++------- datafusion/expr/src/utils.rs | 4 +-- .../optimizer/src/common_subexpr_eliminate.rs | 8 +++--- datafusion/optimizer/src/limit_push_down.rs | 4 +-- .../optimizer/src/projection_push_down.rs | 7 +++-- .../src/single_distinct_to_groupby.rs | 4 +-- 7 files changed, 34 insertions(+), 33 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6ec48348cda1c..60351ad747a82 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -343,7 +343,7 @@ impl LogicalPlanBuilder { expr.extend(missing_exprs); Ok(LogicalPlan::Projection(Projection::try_new( - expr, input, None, alias, + expr, input, alias, )?)) } _ => { @@ -411,7 +411,6 @@ impl LogicalPlanBuilder { new_expr, Arc::new(sort_plan), None, - None, )?))) } @@ -870,11 +869,8 @@ pub fn project_with_column_index_alias( x => x.alias(schema.field(i).name()), }) .collect::>(); - Ok(LogicalPlan::Projection(Projection::try_new( - alias_expr, - input, - Some(schema), - alias, + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + alias_expr, input, schema, alias, )?)) } @@ -969,10 +965,10 @@ pub fn project_with_alias( None => input_schema, }; - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( projected_expr, Arc::new(plan.clone()), - Some(DFSchemaRef::new(schema)), + DFSchemaRef::new(schema), alias, )?)) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 5ecf9cddf3c51..0e438fa5119d3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1001,16 +1001,22 @@ impl Projection { pub fn try_new( expr: Vec, input: Arc, - schema: Option, alias: Option, ) -> Result { - let schema = match schema { - Some(provided) => provided, - _ => Arc::new(DFSchema::new_with_metadata( - exprlist_to_fields(&expr, &input)?, - input.schema().metadata().clone(), - )?), - }; + let schema = Arc::new(DFSchema::new_with_metadata( + exprlist_to_fields(&expr, &input)?, + input.schema().metadata().clone(), + )?); + Self::try_new_with_schema(expr, input, schema, alias) + } + + /// Create a new Projection using the specified output schema + pub fn try_new_with_schema( + expr: Vec, + input: Arc, + schema: DFSchemaRef, + alias: Option, + ) -> Result { if expr.len() != schema.fields().len() { return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); } @@ -1706,13 +1712,13 @@ mod tests { #[test] fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); - let p = Projection::try_new( + let p = Projection::try_new_with_schema( vec![col("a")], Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: empty_schema.clone(), })), - Some(empty_schema), + empty_schema, None, ); assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap())); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 187d1781c43cc..f2f5d6002cc87 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -341,10 +341,10 @@ pub fn from_plan( ) -> Result { match plan { LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( expr.to_vec(), Arc::new(inputs[0].clone()), - Some(schema.clone()), + schema.clone(), alias.clone(), )?)) } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 5bdfabf0d5393..3964bee6b72b1 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -107,10 +107,10 @@ fn optimize( optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( new_expr.pop().unwrap(), Arc::new(new_input), - Some(schema.clone()), + schema.clone(), alias.clone(), )?)) } @@ -292,10 +292,10 @@ fn build_project_plan( let mut schema = DFSchema::new_with_metadata(fields, HashMap::new())?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( project_exprs, Arc::new(input), - Some(Arc::new(schema)), + Arc::new(schema), None, )?)) } diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 189b7829b3f4a..2269cbb26d9d6 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -162,7 +162,7 @@ fn limit_push_down( ancestor, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( expr.clone(), Arc::new(limit_push_down( _optimizer, @@ -170,7 +170,7 @@ fn limit_push_down( input.as_ref(), _optimizer_config, )?), - Some(schema.clone()), + schema.clone(), alias.clone(), )?)) } diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 03614cf860979..48025e8f3c150 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -192,12 +192,12 @@ fn optimize_plan( Ok(new_input) } else { let metadata = new_input.schema().metadata().clone(); - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( new_expr, Arc::new(new_input), - Some(DFSchemaRef::new(DFSchema::new_with_metadata( + DFSchemaRef::new(DFSchema::new_with_metadata( new_fields, metadata, - )?)), + )?), alias.clone(), )?)) } @@ -841,7 +841,6 @@ mod tests { expr, Arc::new(table_scan), None, - None, )?); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 4eb623ee1e536..1769314ebc0cf 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -141,10 +141,10 @@ fn optimize(plan: &LogicalPlan) -> Result { schema: final_agg_schema, }); - Ok(LogicalPlan::Projection(Projection::try_new( + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( alias_expr, Arc::new(final_agg), - Some(schema.clone()), + schema.clone(), None, )?)) } else { From 5e813b2f368e0b52a36dbf006ceae68a5ba96d67 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Jul 2022 11:36:48 -0600 Subject: [PATCH 05/13] fmt --- datafusion/optimizer/src/projection_push_down.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 48025e8f3c150..aa3cdfb4252ce 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -195,9 +195,7 @@ fn optimize_plan( Ok(LogicalPlan::Projection(Projection::try_new_with_schema( new_expr, Arc::new(new_input), - DFSchemaRef::new(DFSchema::new_with_metadata( - new_fields, metadata, - )?), + DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?), alias.clone(), )?)) } From faa5aa84b8f85eeb19682144261069c43649fc62 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 08:16:47 -0600 Subject: [PATCH 06/13] skip optimizer rules that fail --- datafusion/optimizer/src/optimizer.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 1fee75df6e6f7..658761c9982f5 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -20,7 +20,7 @@ use chrono::{DateTime, Utc}; use datafusion_common::Result; use datafusion_expr::logical_plan::LogicalPlan; -use log::{debug, trace}; +use log::{debug, error, trace}; use std::sync::Arc; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which @@ -97,10 +97,21 @@ impl Optimizer { debug!("Input logical plan:\n{}\n", plan.display_indent()); trace!("Full input logical plan:\n{:?}", plan); for rule in &self.rules { - new_plan = rule.optimize(&new_plan, optimizer_config)?; - observer(&new_plan, rule.as_ref()); - debug!("After apply {} rule:\n", rule.name()); - debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); + match rule.optimize(&new_plan, optimizer_config) { + Ok(plan) => { + new_plan = plan; + observer(&new_plan, rule.as_ref()); + debug!("After apply {} rule:\n", rule.name()); + debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); + } + Err(e) => { + error!( + "Skipping optimizer rule {} due to error: {}", + rule.name(), + e + ); + } + } } debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); trace!("Full Optimized logical plan:\n {:?}", new_plan); From 6500eacde68d4283d8afa453eaf577f118b5f82c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 08:31:38 -0600 Subject: [PATCH 07/13] unit tests --- datafusion/optimizer/src/optimizer.rs | 84 ++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 658761c9982f5..0a9851413545b 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -45,6 +45,8 @@ pub struct OptimizerConfig { /// to use a literal value instead pub query_execution_start_time: DateTime, next_id: usize, + /// Option to skip rules that produce errors + skip_failing_rules: bool, } impl OptimizerConfig { @@ -53,9 +55,16 @@ impl OptimizerConfig { Self { query_execution_start_time: chrono::Utc::now(), next_id: 0, // useful for generating things like unique subquery aliases + skip_failing_rules: true, } } + /// Specify whether the optimizer should skip rules that produce errors, or fail the query + pub fn with_skip_failing_rules(mut self, b: bool) -> Self { + self.skip_failing_rules = b; + self + } + pub fn next_id(&mut self) -> usize { self.next_id += 1; self.next_id @@ -97,19 +106,24 @@ impl Optimizer { debug!("Input logical plan:\n{}\n", plan.display_indent()); trace!("Full input logical plan:\n{:?}", plan); for rule in &self.rules { - match rule.optimize(&new_plan, optimizer_config) { + let result = rule.optimize(&new_plan, optimizer_config); + match result { Ok(plan) => { new_plan = plan; observer(&new_plan, rule.as_ref()); debug!("After apply {} rule:\n", rule.name()); debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); } - Err(e) => { - error!( - "Skipping optimizer rule {} due to error: {}", - rule.name(), - e - ); + Err(ref e) => { + if optimizer_config.skip_failing_rules { + error!( + "Skipping optimizer rule {} due to error: {}", + rule.name(), + e + ); + } else { + return result; + } } } } @@ -118,3 +132,59 @@ impl Optimizer { Ok(new_plan) } } + +#[cfg(test)] +mod tests { + use crate::optimizer::Optimizer; + use crate::{OptimizerConfig, OptimizerRule}; + use datafusion_common::{DFSchema, DataFusionError}; + use datafusion_expr::logical_plan::EmptyRelation; + use datafusion_expr::LogicalPlan; + use std::sync::Arc; + + #[test] + fn skip_failing_rule() -> Result<(), DataFusionError> { + let opt = Optimizer::new(vec![Arc::new(BadRule {})]); + let mut config = OptimizerConfig::new().with_skip_failing_rules(true); + let plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + opt.optimize(&plan, &mut config, &observe)?; + Ok(()) + } + + #[test] + fn no_skip_failing_rule() -> Result<(), DataFusionError> { + let opt = Optimizer::new(vec![Arc::new(BadRule {})]); + let mut config = OptimizerConfig::new().with_skip_failing_rules(false); + let plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + let result = opt.optimize(&plan, &mut config, &observe); + assert_eq!( + "Error during planning: rule failed", + format!("{}", result.err().unwrap()) + ); + Ok(()) + } + + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + + struct BadRule {} + + impl OptimizerRule for BadRule { + fn optimize( + &self, + _plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> datafusion_common::Result { + Err(DataFusionError::Plan("rule failed".to_string())) + } + + fn name(&self) -> &str { + "bad rule" + } + } +} From 8f0c3e45bba2f233abc2625ff675a4f9785c20d5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 09:20:40 -0600 Subject: [PATCH 08/13] fix merge conflict --- datafusion/expr/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3dbeb7685879e..93c18f4b96e31 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1764,7 +1764,7 @@ mod tests { } #[test] - fn projection_expr_schema_mismatch() -> Result<(), DataFusionError> { + fn projection_expr_schema_mismatch() -> Result<()> { let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); let p = Projection::try_new_with_schema( vec![col("a")], From 2cb9101a69515c1af8836d5d4d1f0e4ff21c6ccc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 11:16:22 -0600 Subject: [PATCH 09/13] user configurable --- datafusion/core/src/config.rs | 13 ++++++++++++- datafusion/core/src/execution/context.rs | 8 ++++++-- docs/source/user-guide/configs.md | 1 + 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index f732d6431e143..054c66bf5d65d 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -43,6 +43,10 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches"; pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str = "datafusion.execution.coalesce_target_batch_size"; +/// Configuration option "datafusion.optimizer.skip_failed_rules" +pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str = + "datafusion.optimizer.skip_failed_rules"; + /// Definition of a configuration option pub struct ConfigDefinition { /// key used to identifier this configuration option @@ -156,11 +160,18 @@ impl BuiltInConfigs { format!("Target batch size when coalescing batches. Uses in conjunction with the \ configuration setting '{}'.", OPT_COALESCE_BATCHES), 4096, + ), + ConfigDefinition::new_bool( + OPT_OPTIMIZER_SKIP_FAILED_RULES, + "When set to true, the logical plan optimizer will produce warning \ + messages if any optimization rules produce errors and then proceed to the next \ + rule. When set to false, any rules that produce errors will cause the query to fail.", + true )], } } - /// Generate documentation that can be included int he user guide + /// Generate documentation that can be included in the user guide pub fn generate_config_markdown() -> String { use std::fmt::Write as _; let configs = Self::new(); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 42de53a888aa7..5cb45be2f0659 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -83,7 +83,7 @@ use crate::physical_optimizer::repartition::Repartition; use crate::config::{ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, - OPT_FILTER_NULL_JOIN_KEYS, + OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::logical_plan::plan::Explain; @@ -1371,7 +1371,11 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let mut optimizer_config = OptimizerConfig::new(); + let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules( + self.config + .config_options + .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES), + ); optimizer_config.query_execution_start_time = self.execution_props.query_execution_start_time; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 41f6df85056ac..6794067bf82ba 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -43,3 +43,4 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | | datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | | datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. | From 4dd1111ad0f00a9cbb0e56a339e51ea073c3f8cd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 12:57:16 -0600 Subject: [PATCH 10/13] Update datafusion/optimizer/src/optimizer.rs Co-authored-by: Andrew Lamb --- datafusion/optimizer/src/optimizer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 0a9851413545b..32edb74947ad0 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -116,7 +116,7 @@ impl Optimizer { } Err(ref e) => { if optimizer_config.skip_failing_rules { - error!( + warn!( "Skipping optimizer rule {} due to error: {}", rule.name(), e From 7fd096a07df62e122f5057ab755aa0f1bcbcb77f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 12:57:21 -0600 Subject: [PATCH 11/13] Update datafusion/optimizer/src/optimizer.rs Co-authored-by: Andrew Lamb --- datafusion/optimizer/src/optimizer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 32edb74947ad0..f9e838161cd23 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -117,7 +117,7 @@ impl Optimizer { Err(ref e) => { if optimizer_config.skip_failing_rules { warn!( - "Skipping optimizer rule {} due to error: {}", + "Skipping optimizer rule {} due to unexpected error: {}", rule.name(), e ); From 1cc773961833d6c69db3e3fa7c00725047b5c508 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 12:57:30 -0600 Subject: [PATCH 12/13] Update datafusion/optimizer/src/optimizer.rs Co-authored-by: Andrew Lamb --- datafusion/optimizer/src/optimizer.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index f9e838161cd23..448042a32a454 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -116,6 +116,9 @@ impl Optimizer { } Err(ref e) => { if optimizer_config.skip_failing_rules { + // Note to future readers: if you see this warning it signals a + // bug in the DataFusion optimizer. Please consider filing a ticket + // https://github.com/apache/arrow-datafusion warn!( "Skipping optimizer rule {} due to unexpected error: {}", rule.name(), From 58b29e007535e889e0ca31bc860019aac3e2e087 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 14 Jul 2022 13:11:05 -0600 Subject: [PATCH 13/13] fix errors from applying suggestions --- datafusion/optimizer/src/optimizer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 448042a32a454..9d76cf5e7e6e9 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -20,7 +20,7 @@ use chrono::{DateTime, Utc}; use datafusion_common::Result; use datafusion_expr::logical_plan::LogicalPlan; -use log::{debug, error, trace}; +use log::{debug, trace, warn}; use std::sync::Arc; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which @@ -116,9 +116,9 @@ impl Optimizer { } Err(ref e) => { if optimizer_config.skip_failing_rules { - // Note to future readers: if you see this warning it signals a - // bug in the DataFusion optimizer. Please consider filing a ticket - // https://github.com/apache/arrow-datafusion + // Note to future readers: if you see this warning it signals a + // bug in the DataFusion optimizer. Please consider filing a ticket + // https://github.com/apache/arrow-datafusion warn!( "Skipping optimizer rule {} due to unexpected error: {}", rule.name(),