From e44eb0e9bdfa5928e718b89272be96743d1cd445 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 3 Jul 2023 19:10:02 +0800 Subject: [PATCH 1/4] revert: from_plan keep same schema Project in #6595 --- datafusion/common/src/dfschema.rs | 8 ++------ datafusion/expr/src/utils.rs | 11 +++++++---- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index c490852c6ee3d..cb07f15b9d26f 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -384,12 +384,8 @@ impl DFSchema { let self_fields = self.fields().iter(); let other_fields = other.fields().iter(); self_fields.zip(other_fields).all(|(f1, f2)| { - // TODO: resolve field when exist alias - // f1.qualifier() == f2.qualifier() - // && f1.name() == f2.name() - // column(t1.a) field is "t1"."a" - // column(x) as t1.a field is ""."t1.a" - f1.qualified_name() == f2.qualified_name() + f1.qualifier() == f2.qualifier() + && f1.name() == f2.name() && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) }) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 069ce6df71bc6..d33919c013c88 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -730,10 +730,13 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new( - expr.to_vec(), - Arc::new(inputs[0].clone()), - )?)), + LogicalPlan::Projection(Projection { schema, .. 
}) => { + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr.to_vec(), + Arc::new(inputs[0].clone()), + schema.clone(), + )?)) + } LogicalPlan::Dml(DmlStatement { table_name, table_schema, From 78d36f3f9f925f18642130c108094c87754f4cd0 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 3 Jul 2023 21:48:52 +0800 Subject: [PATCH 2/4] revert: from_plan keep same schema Agg/Window in #6820 --- datafusion/expr/src/utils.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index d33919c013c88..10a8c518bb5c7 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -821,19 +821,23 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window(Window { window_expr, .. }) => { - Ok(LogicalPlan::Window(Window::try_new( - expr[0..window_expr.len()].to_vec(), - Arc::new(inputs[0].clone()), - )?)) - } - LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => { - Ok(LogicalPlan::Aggregate(Aggregate::try_new( - Arc::new(inputs[0].clone()), - expr[0..group_expr.len()].to_vec(), - expr[group_expr.len()..].to_vec(), - )?)) - } + LogicalPlan::Window(Window { + window_expr, + schema, + .. + }) => Ok(LogicalPlan::Window(Window { + input: Arc::new(inputs[0].clone()), + window_expr: expr[0..window_expr.len()].to_vec(), + schema: schema.clone(), + })), + LogicalPlan::Aggregate(Aggregate { + group_expr, schema, .. + }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( + Arc::new(inputs[0].clone()), + expr[0..group_expr.len()].to_vec(), + expr[group_expr.len()..].to_vec(), + schema.clone(), + )?)), LogicalPlan::Sort(SortPlan { fetch, .. 
}) => Ok(LogicalPlan::Sort(SortPlan { expr: expr.to_vec(), input: Arc::new(inputs[0].clone()), From a4d53604f046d9cb5464977d57a56481ff057e49 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 3 Jul 2023 22:44:22 +0800 Subject: [PATCH 3/4] revert type coercion --- datafusion/optimizer/src/analyzer/type_coercion.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 8edf734b474f9..5d1fef53520ba 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -43,7 +43,7 @@ use datafusion_expr::utils::from_plan; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, LogicalPlan, Operator, - WindowFrame, WindowFrameBound, WindowFrameUnits, + Projection, WindowFrame, WindowFrameBound, WindowFrameUnits, }; use datafusion_expr::{ExprSchemable, Signature}; @@ -109,7 +109,14 @@ fn analyze_internal( }) .collect::>>()?; - from_plan(plan, &new_expr, &new_inputs) + // TODO: from_plan can't change the schema, so we need to do this here + match &plan { + LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new( + new_expr, + Arc::new(new_inputs[0].clone()), + )?)), + _ => from_plan(plan, &new_expr, &new_inputs), + } } pub(crate) struct TypeCoercionRewriter { From 53b56907f078f8ccf1b2bb17a66d0faa6cef9cc8 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 3 Jul 2023 22:49:20 +0800 Subject: [PATCH 4/4] add comment --- datafusion/expr/src/utils.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 10a8c518bb5c7..3111579246f2c 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -724,6 +724,9 @@ where /// // create new plan using rewritten_exprs in same position /// let 
new_plan = from_plan(&plan, rewritten_exprs, new_inputs); /// ``` +/// +/// Note: for some plan nodes [`from_plan`] reuses the schema of the original plan; it does not recompute the schema from the new expressions! +/// This applies to `Projection`, `Aggregate` and `Window`. pub fn from_plan( plan: &LogicalPlan, expr: &[Expr],