diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index c490852c6ee3d..cb07f15b9d26f 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -384,12 +384,8 @@ impl DFSchema { let self_fields = self.fields().iter(); let other_fields = other.fields().iter(); self_fields.zip(other_fields).all(|(f1, f2)| { - // TODO: resolve field when exist alias - // f1.qualifier() == f2.qualifier() - // && f1.name() == f2.name() - // column(t1.a) field is "t1"."a" - // column(x) as t1.a field is ""."t1.a" - f1.qualified_name() == f2.qualified_name() + f1.qualifier() == f2.qualifier() + && f1.name() == f2.name() && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) }) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 069ce6df71bc6..3111579246f2c 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -724,16 +724,22 @@ where /// // create new plan using rewritten_exprs in same position /// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs); /// ``` +/// +/// Notice: sometimes [from_plan] will use schema of original plan, it don't change schema! +/// Such as `Projection/Aggregate/Window` pub fn from_plan( plan: &LogicalPlan, expr: &[Expr], inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new( - expr.to_vec(), - Arc::new(inputs[0].clone()), - )?)), + LogicalPlan::Projection(Projection { schema, .. }) => { + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr.to_vec(), + Arc::new(inputs[0].clone()), + schema.clone(), + )?)) + } LogicalPlan::Dml(DmlStatement { table_name, table_schema, @@ -818,19 +824,23 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window(Window { window_expr, .. }) => { - Ok(LogicalPlan::Window(Window::try_new( - expr[0..window_expr.len()].to_vec(), - Arc::new(inputs[0].clone()), - )?)) - } - LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => { - Ok(LogicalPlan::Aggregate(Aggregate::try_new( - Arc::new(inputs[0].clone()), - expr[0..group_expr.len()].to_vec(), - expr[group_expr.len()..].to_vec(), - )?)) - } + LogicalPlan::Window(Window { + window_expr, + schema, + .. + }) => Ok(LogicalPlan::Window(Window { + input: Arc::new(inputs[0].clone()), + window_expr: expr[0..window_expr.len()].to_vec(), + schema: schema.clone(), + })), + LogicalPlan::Aggregate(Aggregate { + group_expr, schema, .. + }) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( + Arc::new(inputs[0].clone()), + expr[0..group_expr.len()].to_vec(), + expr[group_expr.len()..].to_vec(), + schema.clone(), + )?)), LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan { expr: expr.to_vec(), input: Arc::new(inputs[0].clone()), diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 8edf734b474f9..5d1fef53520ba 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -43,7 +43,7 @@ use datafusion_expr::utils::from_plan; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, LogicalPlan, Operator, - WindowFrame, WindowFrameBound, WindowFrameUnits, + Projection, WindowFrame, WindowFrameBound, WindowFrameUnits, }; use datafusion_expr::{ExprSchemable, Signature}; @@ -109,7 +109,14 @@ fn analyze_internal( }) .collect::>>()?; - from_plan(plan, &new_expr, &new_inputs) + // TODO: from_plan can't change the schema, so we need to do this here + match &plan { + LogicalPlan::Projection(_) => Ok(LogicalPlan::Projection(Projection::try_new( + new_expr, + Arc::new(new_inputs[0].clone()), + )?)), + _ => from_plan(plan, &new_expr, &new_inputs), + } } pub(crate) struct TypeCoercionRewriter {