From e019ccccbcdf22bda62e382242db4b585bdc9e58 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Fri, 9 Aug 2024 23:22:13 +0800 Subject: [PATCH 01/13] Improve type coercion for `UNION` --- datafusion/expr/src/logical_plan/builder.rs | 69 +------ datafusion/optimizer/src/analyzer/mod.rs | 1 + .../optimizer/src/analyzer/type_coercion.rs | 23 ++- datafusion/optimizer/src/analyzer/union.rs | 175 ++++++++++++++++++ 4 files changed, 196 insertions(+), 72 deletions(-) create mode 100644 datafusion/optimizer/src/analyzer/union.rs diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 4ef346656ff40..fddfb2c8aa3c7 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -20,7 +20,6 @@ use std::any::Any; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; -use std::iter::zip; use std::sync::Arc; use crate::dml::CopyTo; @@ -36,7 +35,7 @@ use crate::logical_plan::{ Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values, Window, }; -use crate::type_coercion::binary::{comparison_coercion, values_coercion}; +use crate::type_coercion::binary::values_coercion; use crate::utils::{ can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard, expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair, @@ -1366,68 +1365,12 @@ pub fn project_with_column_index( /// Union two logical plans. pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result { - let left_col_num = left_plan.schema().fields().len(); - - // check union plan length same. 
- let right_col_num = right_plan.schema().fields().len(); - if right_col_num != left_col_num { - return plan_err!( - "Union queries must have the same number of columns, (left is {left_col_num}, right is {right_col_num})"); - } - - // create union schema - let union_qualified_fields = - zip(left_plan.schema().iter(), right_plan.schema().iter()) - .map( - |((left_qualifier, left_field), (_right_qualifier, right_field))| { - let nullable = left_field.is_nullable() || right_field.is_nullable(); - let data_type = comparison_coercion( - left_field.data_type(), - right_field.data_type(), - ) - .ok_or_else(|| { - plan_datafusion_err!( - "UNION Column {} (type: {}) is not compatible with column {} (type: {})", - right_field.name(), - right_field.data_type(), - left_field.name(), - left_field.data_type() - ) - })?; - Ok(( - left_qualifier.cloned(), - Arc::new(Field::new(left_field.name(), data_type, nullable)), - )) - }, - ) - .collect::>>()?; - let union_schema = - DFSchema::new_with_metadata(union_qualified_fields, HashMap::new())?; - - let inputs = vec![left_plan, right_plan] - .into_iter() - .map(|p| { - let plan = coerce_plan_expr_for_schema(&p, &union_schema)?; - match plan { - LogicalPlan::Projection(Projection { expr, input, .. }) => { - Ok(Arc::new(project_with_column_index( - expr, - input, - Arc::new(union_schema.clone()), - )?)) - } - other_plan => Ok(Arc::new(other_plan)), - } - }) - .collect::>>()?; - - if inputs.is_empty() { - return plan_err!("Empty UNION"); - } - + // Use the schema from the left input temporarily, and later rely on the analyzer + // to coerce it to a common schema. 
+ let schema = left_plan.schema().clone(); Ok(LogicalPlan::Union(Union { - inputs, - schema: Arc::new(union_schema), + inputs: vec![Arc::new(left_plan), Arc::new(right_plan)], + schema: schema, })) } diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 91ee8a9e1033a..65f7c4a783e78 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -42,6 +42,7 @@ pub mod function_rewrite; pub mod inline_table_scan; pub mod subquery; pub mod type_coercion; +mod union; /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 2bb859d84ad79..a84186de3a123 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -47,10 +47,11 @@ use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8}; use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not, - AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, Operator, ScalarUDF, - WindowFrame, WindowFrameBound, WindowFrameUnits, + AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, Join, LogicalPlan, Operator, + ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, }; +use crate::analyzer::union::coerce_union; use crate::analyzer::AnalyzerRule; use crate::utils::NamePreserver; @@ -120,8 +121,8 @@ fn analyze_internal( expr.rewrite(&mut expr_rewrite)? .map_data(|expr| original_name.restore(expr)) })? - // coerce join expressions specially - .map_data(|plan| expr_rewrite.coerce_joins(plan))? + // some plans need to be rewritten after the expressions have been updated + .map_data(|plan| expr_rewrite.coerce_plan(plan))? 
// recompute the schema after the expressions have been rewritten as the types may have changed .map_data(|plan| plan.recompute_schema()) } @@ -135,6 +136,14 @@ impl<'a> TypeCoercionRewriter<'a> { Self { schema } } + fn coerce_plan(&mut self, plan: LogicalPlan) -> Result { + match plan { + LogicalPlan::Join(join) => self.coerce_join(join), + LogicalPlan::Union(union) => coerce_union(union), + _ => Ok(plan), + } + } + /// Coerce join equality expressions and join filter /// /// Joins must be treated specially as their equality expressions are stored @@ -143,11 +152,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// /// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored /// as a list of `(t1.a, t2.b), (t1.x, t2.y)` - fn coerce_joins(&mut self, plan: LogicalPlan) -> Result { - let LogicalPlan::Join(mut join) = plan else { - return Ok(plan); - }; - + fn coerce_join(&mut self, mut join: Join) -> Result { join.on = join .on .into_iter() diff --git a/datafusion/optimizer/src/analyzer/union.rs b/datafusion/optimizer/src/analyzer/union.rs new file mode 100644 index 0000000000000..f00c14684bec8 --- /dev/null +++ b/datafusion/optimizer/src/analyzer/union.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! type coercion for UNION + +use itertools::izip; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::datatypes::Field; + +use datafusion_common::{ + plan_datafusion_err, plan_err, DFSchema, Result, TableReference, +}; +use datafusion_expr::expr::Alias; +use datafusion_expr::type_coercion::binary::comparison_coercion; +use datafusion_expr::{Expr, ExprSchemable, LogicalPlan, Projection, Union}; + +/// Coerce the schema of the inputs to a common schema +fn coerce_union_schema(inputs: Vec>) -> Result { + let base_schema = inputs[0].schema(); + let mut union_datatypes = base_schema + .fields() + .iter() + .map(|f| f.data_type().clone()) + .collect::>(); + let mut union_nullabilities = base_schema + .fields() + .iter() + .map(|f| f.is_nullable()) + .collect::>(); + + for (i, plan) in inputs.iter().enumerate().skip(1) { + let plan_schema = plan.schema(); + if plan_schema.fields().len() != base_schema.fields().len() { + return plan_err!( + "Union schemas have different number of fields, + query 1 is {}, query {} is {}", + base_schema.fields().len(), + i + 1, + plan_schema.fields().len() + ); + } + // coerce data type and nullablity for each field + for (union_datatype, union_nullable, plan_field) in izip!( + union_datatypes.iter_mut(), + union_nullabilities.iter_mut(), + plan_schema.fields() + ) { + let coerced_type = + comparison_coercion(union_datatype, plan_field.data_type()).ok_or_else( + || { + plan_datafusion_err!( + "UNION Column {} (type: {}) is not compatible with other type: {}", + plan_field.name(), + plan_field.data_type(), + union_datatype + ) + }, + )?; + *union_datatype = coerced_type; + *union_nullable = *union_nullable || plan_field.is_nullable(); + } + } + let union_qualified_fields = izip!( + base_schema.iter(), + union_datatypes.into_iter(), + union_nullabilities + ) + .map(|((qualifier, field), datatype, nullable)| { + let 
field = Arc::new(Field::new(field.name().clone(), datatype, nullable)); + (qualifier.cloned(), field) + }) + .collect::>(); + DFSchema::new_with_metadata(union_qualified_fields, HashMap::new()) +} + +/// Make sure that the schemas of all inputs are compatible with each other, +/// which includes having the same field types and names. +pub(crate) fn coerce_union(union_plan: Union) -> Result { + let union_schema = coerce_union_schema(union_plan.inputs.clone())?; + let new_inputs = union_plan + .inputs + .iter() + .map(|plan| coerce_plan_expr_for_schema(&plan, &union_schema).map(Arc::new)) + .collect::>>()?; + Ok(LogicalPlan::Union(Union { + inputs: new_inputs, + schema: Arc::new(union_schema), + })) +} + +/// Returns plan with expressions coerced to types compatible with +/// schema types +pub(crate) fn coerce_plan_expr_for_schema( + plan: &LogicalPlan, + schema: &DFSchema, +) -> Result { + match plan { + // special case Projection to avoid adding multiple projections + LogicalPlan::Projection(Projection { expr, input, .. }) => { + let new_exprs = + coerce_exprs_for_schema(expr.clone(), input.schema(), schema)?; + let projection = Projection::try_new(new_exprs, Arc::clone(input))?; + Ok(LogicalPlan::Projection(projection)) + } + _ => { + let exprs: Vec = plan.schema().iter().map(Expr::from).collect(); + + let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?; + let add_project = new_exprs.iter().any(|expr| expr.try_as_col().is_none()); + if add_project { + let projection = Projection::try_new(new_exprs, Arc::new(plan.clone()))?; + Ok(LogicalPlan::Projection(projection)) + } else { + Ok(plan.clone()) + } + } + } +} + +fn same_qualified_name( + src_expr: &Expr, + dst_qualifier: Option<&TableReference>, + dst_field: &Field, +) -> bool { + match &src_expr { + Expr::Column(c) => { + c.relation.as_ref() == dst_qualifier && c.name == *dst_field.name() + } + Expr::Alias(Alias { relation, name, .. 
}) => { + relation.as_ref() == dst_qualifier || name == dst_field.name() + } + _ => { + dst_qualifier.is_none() + && src_expr.schema_name().to_string() == *dst_field.name() + } + } +} + +fn coerce_exprs_for_schema( + exprs: Vec, + src_schema: &DFSchema, + dst_schema: &DFSchema, +) -> Result> { + exprs + .into_iter() + .enumerate() + .map(|(idx, expr)| { + let (dst_qualifier, dst_field) = dst_schema.qualified_field(idx); + let mut new_expr = + expr.unalias().cast_to(dst_field.data_type(), src_schema)?; + // Make sure the new expression has the same qualified name as the dst_field + if !same_qualified_name(&new_expr, dst_qualifier, dst_field) { + new_expr = + new_expr.alias_qualified(dst_qualifier.cloned(), dst_field.name()); + } + Ok(new_expr) + }) + .collect::>() +} From 9fca95786acd3b77761a68dc8f880a79b701ed37 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Sat, 10 Aug 2024 00:39:07 +0800 Subject: [PATCH 02/13] fix clippy --- datafusion/expr/src/logical_plan/builder.rs | 8 ++++---- datafusion/optimizer/src/analyzer/union.rs | 7 ++++++- datafusion/sqllogictest/test_files/dictionary.slt | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index fddfb2c8aa3c7..c44ddb2440889 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1365,12 +1365,12 @@ pub fn project_with_column_index( /// Union two logical plans. pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result { - // Use the schema from the left input temporarily, and later rely on the analyzer - // to coerce it to a common schema. - let schema = left_plan.schema().clone(); + // Temporarily use the schema from the left input and later rely on the analyzer to + // coerce the two schemas into a common one. 
+ let schema = Arc::clone(left_plan.schema()); Ok(LogicalPlan::Union(Union { inputs: vec![Arc::new(left_plan), Arc::new(right_plan)], - schema: schema, + schema, })) } diff --git a/datafusion/optimizer/src/analyzer/union.rs b/datafusion/optimizer/src/analyzer/union.rs index f00c14684bec8..530362a0d333e 100644 --- a/datafusion/optimizer/src/analyzer/union.rs +++ b/datafusion/optimizer/src/analyzer/union.rs @@ -96,7 +96,7 @@ pub(crate) fn coerce_union(union_plan: Union) -> Result { let new_inputs = union_plan .inputs .iter() - .map(|plan| coerce_plan_expr_for_schema(&plan, &union_schema).map(Arc::new)) + .map(|plan| coerce_plan_expr_for_schema(plan, &union_schema).map(Arc::new)) .collect::>>()?; Ok(LogicalPlan::Union(Union { inputs: new_inputs, @@ -173,3 +173,8 @@ fn coerce_exprs_for_schema( }) .collect::>() } + +#[cfg(test)] +mod test { + use super::*; +} diff --git a/datafusion/sqllogictest/test_files/dictionary.slt b/datafusion/sqllogictest/test_files/dictionary.slt index ec8a514885647..92d6d3ba68b4e 100644 --- a/datafusion/sqllogictest/test_files/dictionary.slt +++ b/datafusion/sqllogictest/test_files/dictionary.slt @@ -208,7 +208,7 @@ true false NULL true true false true NULL # Reproducer for https://github.com/apache/datafusion/issues/8738 # This query should work correctly -query P?TT rowsort +query P?TR rowsort SELECT "data"."timestamp" as "time", "data"."tag_id", @@ -264,7 +264,7 @@ ORDER BY # deterministic sort (so we can avoid rowsort) -query P?TT +query P?TR SELECT "data"."timestamp" as "time", "data"."tag_id", From e69a1a1646bc9127fc8a159315b979a058e7413f Mon Sep 17 00:00:00 2001 From: jonahgao Date: Sun, 11 Aug 2024 23:40:11 +0800 Subject: [PATCH 03/13] fix test --- datafusion/expr/src/logical_plan/builder.rs | 17 ------- datafusion/optimizer/src/analyzer/union.rs | 50 +++++++++++++++++++-- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 
c44ddb2440889..0914a76452be4 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1824,23 +1824,6 @@ mod tests { Ok(()) } - #[test] - fn plan_builder_union_different_num_columns_error() -> Result<()> { - let plan1 = - table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; - let plan2 = - table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?; - - let expected = "Error during planning: Union queries must have the same number of columns, (left is 1, right is 2)"; - let err_msg1 = plan1.clone().union(plan2.clone().build()?).unwrap_err(); - let err_msg2 = plan1.union_distinct(plan2.build()?).unwrap_err(); - - assert_eq!(err_msg1.strip_backtrace(), expected); - assert_eq!(err_msg2.strip_backtrace(), expected); - - Ok(()) - } - #[test] fn plan_builder_simple_distinct() -> Result<()> { let plan = diff --git a/datafusion/optimizer/src/analyzer/union.rs b/datafusion/optimizer/src/analyzer/union.rs index 530362a0d333e..1d8d8cc83c4c1 100644 --- a/datafusion/optimizer/src/analyzer/union.rs +++ b/datafusion/optimizer/src/analyzer/union.rs @@ -48,8 +48,8 @@ fn coerce_union_schema(inputs: Vec>) -> Result { let plan_schema = plan.schema(); if plan_schema.fields().len() != base_schema.fields().len() { return plan_err!( - "Union schemas have different number of fields, - query 1 is {}, query {} is {}", + "Union schemas have different number of fields,\ + query 1 has {}, query {} has {}", base_schema.fields().len(), i + 1, plan_schema.fields().len() @@ -176,5 +176,49 @@ fn coerce_exprs_for_schema( #[cfg(test)] mod test { - use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{Result, TableReference}; + use datafusion_expr::builder::table_scan; + use std::sync::Arc; + + use crate::analyzer::TypeCoercion; + use crate::test::assert_analyzer_check_err; + + fn employee_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int32, false), + 
Field::new("first_name", DataType::Utf8, false), + Field::new("last_name", DataType::Utf8, false), + Field::new("state", DataType::Utf8, false), + Field::new("salary", DataType::Int32, false), + ]) + } + + #[test] + fn union_different_num_columns_error() -> Result<()> { + let plan1 = + table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; + let plan2 = + table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))? + .build()?; + + let expected = "type_coercion\n\ + caused by\n\ + Error during planning: Union schemas have different number of fields,\ + query 1 has 1, query 2 has 2"; + let union_plan = plan1.clone().union(plan2.clone())?.build()?; + assert_analyzer_check_err( + vec![Arc::new(TypeCoercion::new())], + union_plan, + expected, + ); + + let union_distinct = plan1.union_distinct(plan2)?.build()?; + assert_analyzer_check_err( + vec![Arc::new(TypeCoercion::new())], + union_distinct, + expected, + ); + Ok(()) + } } From b67f11c6591d10ccabaee4f406780160cba14fc6 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 12 Aug 2024 23:11:26 +0800 Subject: [PATCH 04/13] fix sqllogictests --- datafusion/expr/src/logical_plan/builder.rs | 25 ------ datafusion/optimizer/src/analyzer/union.rs | 78 ++++++++++++------- .../sqllogictest/test_files/dictionary.slt | 4 +- datafusion/sqllogictest/test_files/union.slt | 11 +++ 4 files changed, 62 insertions(+), 56 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 0914a76452be4..1887e96edf086 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1338,31 +1338,6 @@ pub(crate) fn validate_unique_names<'a>( }) } -pub fn project_with_column_index( - expr: Vec, - input: Arc, - schema: DFSchemaRef, -) -> Result { - let alias_expr = expr - .into_iter() - .enumerate() - .map(|(i, e)| match e { - Expr::Alias(Alias { ref name, .. 
}) if name != schema.field(i).name() => { - e.unalias().alias(schema.field(i).name()) - } - Expr::Column(Column { - relation: _, - ref name, - }) if name != schema.field(i).name() => e.alias(schema.field(i).name()), - Expr::Alias { .. } | Expr::Column { .. } => e, - _ => e.alias(schema.field(i).name()), - }) - .collect::>(); - - Projection::try_new_with_schema(alias_expr, input, schema) - .map(LogicalPlan::Projection) -} - /// Union two logical plans. pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result { // Temporarily use the schema from the left input and later rely on the analyzer to diff --git a/datafusion/optimizer/src/analyzer/union.rs b/datafusion/optimizer/src/analyzer/union.rs index 1d8d8cc83c4c1..51f05faada520 100644 --- a/datafusion/optimizer/src/analyzer/union.rs +++ b/datafusion/optimizer/src/analyzer/union.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::datatypes::Field; use datafusion_common::{ - plan_datafusion_err, plan_err, DFSchema, Result, TableReference, + plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, Result, }; use datafusion_expr::expr::Alias; use datafusion_expr::type_coercion::binary::comparison_coercion; @@ -96,7 +96,19 @@ pub(crate) fn coerce_union(union_plan: Union) -> Result { let new_inputs = union_plan .inputs .iter() - .map(|plan| coerce_plan_expr_for_schema(plan, &union_schema).map(Arc::new)) + .map(|p| { + let plan = coerce_plan_expr_for_schema(&p, &union_schema)?; + match plan { + LogicalPlan::Projection(Projection { expr, input, .. 
}) => { + Ok(Arc::new(project_with_column_index( + expr, + input, + Arc::new(union_schema.clone()), + )?)) + } + other_plan => Ok(Arc::new(other_plan)), + } + }) .collect::>>()?; Ok(LogicalPlan::Union(Union { inputs: new_inputs, @@ -133,25 +145,6 @@ pub(crate) fn coerce_plan_expr_for_schema( } } -fn same_qualified_name( - src_expr: &Expr, - dst_qualifier: Option<&TableReference>, - dst_field: &Field, -) -> bool { - match &src_expr { - Expr::Column(c) => { - c.relation.as_ref() == dst_qualifier && c.name == *dst_field.name() - } - Expr::Alias(Alias { relation, name, .. }) => { - relation.as_ref() == dst_qualifier || name == dst_field.name() - } - _ => { - dst_qualifier.is_none() - && src_expr.schema_name().to_string() == *dst_field.name() - } - } -} - fn coerce_exprs_for_schema( exprs: Vec, src_schema: &DFSchema, @@ -161,19 +154,46 @@ fn coerce_exprs_for_schema( .into_iter() .enumerate() .map(|(idx, expr)| { - let (dst_qualifier, dst_field) = dst_schema.qualified_field(idx); - let mut new_expr = - expr.unalias().cast_to(dst_field.data_type(), src_schema)?; - // Make sure the new expression has the same qualified name as the dst_field - if !same_qualified_name(&new_expr, dst_qualifier, dst_field) { - new_expr = - new_expr.alias_qualified(dst_qualifier.cloned(), dst_field.name()); + let new_type = dst_schema.field(idx).data_type(); + if new_type != &expr.get_type(src_schema)? { + match expr { + Expr::Alias(Alias { expr, name, .. }) => { + Ok(expr.cast_to(new_type, src_schema)?.alias(name)) + } + _ => expr.cast_to(new_type, src_schema), + } + } else { + Ok(expr) } - Ok(new_expr) }) .collect::>() } +fn project_with_column_index( + expr: Vec, + input: Arc, + schema: DFSchemaRef, +) -> Result { + let alias_expr = expr + .into_iter() + .enumerate() + .map(|(i, e)| match e { + Expr::Alias(Alias { ref name, .. 
}) if name != schema.field(i).name() => { + e.unalias().alias(schema.field(i).name()) + } + Expr::Column(Column { + relation: _, + ref name, + }) if name != schema.field(i).name() => e.alias(schema.field(i).name()), + Expr::Alias { .. } | Expr::Column { .. } => e, + _ => e.alias(schema.field(i).name()), + }) + .collect::>(); + + Projection::try_new_with_schema(alias_expr, input, schema) + .map(LogicalPlan::Projection) +} + #[cfg(test)] mod test { use arrow::datatypes::{DataType, Field, Schema}; diff --git a/datafusion/sqllogictest/test_files/dictionary.slt b/datafusion/sqllogictest/test_files/dictionary.slt index 92d6d3ba68b4e..ec8a514885647 100644 --- a/datafusion/sqllogictest/test_files/dictionary.slt +++ b/datafusion/sqllogictest/test_files/dictionary.slt @@ -208,7 +208,7 @@ true false NULL true true false true NULL # Reproducer for https://github.com/apache/datafusion/issues/8738 # This query should work correctly -query P?TR rowsort +query P?TT rowsort SELECT "data"."timestamp" as "time", "data"."tag_id", @@ -264,7 +264,7 @@ ORDER BY # deterministic sort (so we can avoid rowsort) -query P?TR +query P?TT SELECT "data"."timestamp" as "time", "data"."tag_id", diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 476ebe7ebebe1..57f718142e7aa 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -642,3 +642,14 @@ DROP TABLE t1; statement ok DROP TABLE t2; + +# Test issue: https://github.com/apache/datafusion/issues/11742 +query R rowsort +WITH + tt(v1) AS (VALUES (1::INT),(NULL::INT)) +SELECT NVL(v1, 0.5) FROM tt + UNION ALL +SELECT NULL WHERE FALSE; +---- +0.5 +1 \ No newline at end of file From 5ad7243b93e710059156afc382a5b4a9d1c9831d Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 12 Aug 2024 23:20:39 +0800 Subject: [PATCH 05/13] fix EliminateNestedUnion tests --- datafusion/optimizer/src/analyzer/union.rs | 1 + 
datafusion/optimizer/src/eliminate_nested_union.rs | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/analyzer/union.rs b/datafusion/optimizer/src/analyzer/union.rs index 51f05faada520..aa6716c86a15b 100644 --- a/datafusion/optimizer/src/analyzer/union.rs +++ b/datafusion/optimizer/src/analyzer/union.rs @@ -169,6 +169,7 @@ fn coerce_exprs_for_schema( .collect::>() } +/// See https://github.com/apache/datafusion/pull/2108 fn project_with_column_index( expr: Vec, input: Arc, diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index cc8cf1f56c184..5f41e4f137b15 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -114,8 +114,11 @@ fn extract_plan_from_distinct(plan: Arc) -> Arc { #[cfg(test)] mod tests { use super::*; + use crate::analyzer::type_coercion::TypeCoercion; + use crate::analyzer::Analyzer; use crate::test::*; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::config::ConfigOptions; use datafusion_expr::{col, logical_plan::table_scan}; fn schema() -> Schema { @@ -127,7 +130,14 @@ mod tests { } fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq(Arc::new(EliminateNestedUnion::new()), plan, expected) + let options = ConfigOptions::default(); + let analyzed_plan = Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]) + .execute_and_check(plan, &options, |_, _| {})?; + assert_optimized_plan_eq( + Arc::new(EliminateNestedUnion::new()), + analyzed_plan, + expected, + ) } #[test] From d96892cb5381f2b5de3799ec8ec7b69c4ccf6e97 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Tue, 13 Aug 2024 16:01:28 +0800 Subject: [PATCH 06/13] Move tests to slt --- datafusion/optimizer/src/analyzer/mod.rs | 2 +- .../optimizer/src/analyzer/type_coercion.rs | 4 +- .../analyzer/{union.rs => union_coercion.rs} | 61 
+----- datafusion/sql/tests/sql_integration.rs | 142 ------------- .../sqllogictest/test_files/type_coercion.slt | 196 ++++++++++++++++++ 5 files changed, 206 insertions(+), 199 deletions(-) rename datafusion/optimizer/src/analyzer/{union.rs => union_coercion.rs} (76%) diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 65f7c4a783e78..435b86fd973be 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -42,7 +42,7 @@ pub mod function_rewrite; pub mod inline_table_scan; pub mod subquery; pub mod type_coercion; -mod union; +mod union_coercion; /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index a84186de3a123..462696929d7a8 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -51,7 +51,7 @@ use datafusion_expr::{ ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, }; -use crate::analyzer::union::coerce_union; +use crate::analyzer::union_coercion::coerce_union; use crate::analyzer::AnalyzerRule; use crate::utils::NamePreserver; @@ -121,7 +121,7 @@ fn analyze_internal( expr.rewrite(&mut expr_rewrite)? .map_data(|expr| original_name.restore(expr)) })? - // some plans need to be rewritten after the expressions have been updated + // some plans need extra coercion after their expressions are coerced .map_data(|plan| expr_rewrite.coerce_plan(plan))? 
// recompute the schema after the expressions have been rewritten as the types may have changed .map_data(|plan| plan.recompute_schema()) diff --git a/datafusion/optimizer/src/analyzer/union.rs b/datafusion/optimizer/src/analyzer/union_coercion.rs similarity index 76% rename from datafusion/optimizer/src/analyzer/union.rs rename to datafusion/optimizer/src/analyzer/union_coercion.rs index aa6716c86a15b..66a80ddb232c7 100644 --- a/datafusion/optimizer/src/analyzer/union.rs +++ b/datafusion/optimizer/src/analyzer/union_coercion.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! type coercion for UNION +//! Helper functions of TypeCoercion that are applied to UNION. use itertools::izip; use std::collections::HashMap; @@ -30,7 +30,7 @@ use datafusion_expr::expr::Alias; use datafusion_expr::type_coercion::binary::comparison_coercion; use datafusion_expr::{Expr, ExprSchemable, LogicalPlan, Projection, Union}; -/// Coerce the schema of the inputs to a common schema +/// Get a common schema that is compatible with all inputs of UNION. fn coerce_union_schema(inputs: Vec>) -> Result { let base_schema = inputs[0].schema(); let mut union_datatypes = base_schema @@ -48,8 +48,8 @@ fn coerce_union_schema(inputs: Vec>) -> Result { let plan_schema = plan.schema(); if plan_schema.fields().len() != base_schema.fields().len() { return plan_err!( - "Union schemas have different number of fields,\ - query 1 has {}, query {} has {}", + "Union schemas have different number of fields: \ + query 1 has {} fields whereas query {} has {} fields", base_schema.fields().len(), i + 1, plan_schema.fields().len() @@ -89,15 +89,14 @@ fn coerce_union_schema(inputs: Vec>) -> Result { DFSchema::new_with_metadata(union_qualified_fields, HashMap::new()) } -/// Make sure that the schemas of all inputs are compatible with each other, -/// which includes having the same field types and names. 
+/// Coerce the union's inputs to a common schema pub(crate) fn coerce_union(union_plan: Union) -> Result { let union_schema = coerce_union_schema(union_plan.inputs.clone())?; let new_inputs = union_plan .inputs .iter() .map(|p| { - let plan = coerce_plan_expr_for_schema(&p, &union_schema)?; + let plan = coerce_plan_expr_for_schema(p, &union_schema)?; match plan { LogicalPlan::Projection(Projection { expr, input, .. }) => { Ok(Arc::new(project_with_column_index( @@ -196,50 +195,4 @@ fn project_with_column_index( } #[cfg(test)] -mod test { - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{Result, TableReference}; - use datafusion_expr::builder::table_scan; - use std::sync::Arc; - - use crate::analyzer::TypeCoercion; - use crate::test::assert_analyzer_check_err; - - fn employee_schema() -> Schema { - Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("first_name", DataType::Utf8, false), - Field::new("last_name", DataType::Utf8, false), - Field::new("state", DataType::Utf8, false), - Field::new("salary", DataType::Int32, false), - ]) - } - - #[test] - fn union_different_num_columns_error() -> Result<()> { - let plan1 = - table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; - let plan2 = - table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))? 
- .build()?; - - let expected = "type_coercion\n\ - caused by\n\ - Error during planning: Union schemas have different number of fields,\ - query 1 has 1, query 2 has 2"; - let union_plan = plan1.clone().union(plan2.clone())?.build()?; - assert_analyzer_check_err( - vec![Arc::new(TypeCoercion::new())], - union_plan, - expected, - ); - - let union_distinct = plan1.union_distinct(plan2)?.build()?; - assert_analyzer_check_err( - vec![Arc::new(TypeCoercion::new())], - union_distinct, - expected, - ); - Ok(()) - } -} +mod test {} diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 4d7e60805657c..6c41f82269a5d 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2174,148 +2174,6 @@ fn union_all() { quick_test(sql, expected); } -#[test] -fn union_with_different_column_names() { - let sql = "SELECT order_id from orders UNION ALL SELECT customer_id FROM orders"; - let expected = "Union\ - \n Projection: orders.order_id\ - \n TableScan: orders\ - \n Projection: orders.customer_id AS order_id\ - \n TableScan: orders"; - quick_test(sql, expected); -} - -#[test] -fn union_values_with_no_alias() { - let sql = "SELECT 1, 2 UNION ALL SELECT 3, 4"; - let expected = "Union\ - \n Projection: Int64(1) AS Int64(1), Int64(2) AS Int64(2)\ - \n EmptyRelation\ - \n Projection: Int64(3) AS Int64(1), Int64(4) AS Int64(2)\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_incompatible_data_type() { - let sql = "SELECT interval '1 year 1 day' UNION ALL SELECT 1"; - let err = logical_plan(sql) - .expect_err("query should have failed") - .strip_backtrace(); - assert_eq!( - "Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with column IntervalMonthDayNano(\"IntervalMonthDayNano { months: 12, days: 1, nanoseconds: 0 }\") (type: Interval(MonthDayNano))", - err - ); -} - -#[test] -fn union_with_different_decimal_data_types() { - let sql = "SELECT 
1 a UNION ALL SELECT 1.1 a"; - let expected = "Union\ - \n Projection: CAST(Int64(1) AS Float64) AS a\ - \n EmptyRelation\ - \n Projection: Float64(1.1) AS a\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_null() { - let sql = "SELECT NULL a UNION ALL SELECT 1.1 a"; - let expected = "Union\ - \n Projection: CAST(NULL AS Float64) AS a\ - \n EmptyRelation\ - \n Projection: Float64(1.1) AS a\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_float_and_string() { - let sql = "SELECT 'a' a UNION ALL SELECT 1.1 a"; - let expected = "Union\ - \n Projection: Utf8(\"a\") AS a\ - \n EmptyRelation\ - \n Projection: CAST(Float64(1.1) AS Utf8) AS a\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_multiply_cols() { - let sql = "SELECT 'a' a, 1 b UNION ALL SELECT 1.1 a, 1.1 b"; - let expected = "Union\ - \n Projection: Utf8(\"a\") AS a, CAST(Int64(1) AS Float64) AS b\ - \n EmptyRelation\ - \n Projection: CAST(Float64(1.1) AS Utf8) AS a, Float64(1.1) AS b\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn sorted_union_with_different_types_and_group_by() { - let sql = "SELECT a FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT a FROM (select 1.1 a) x GROUP BY 1) ORDER BY 1"; - let expected = "Sort: x.a ASC NULLS LAST\ - \n Union\ - \n Projection: CAST(x.a AS Float64) AS a\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n SubqueryAlias: x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation\ - \n Projection: x.a\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n SubqueryAlias: x\ - \n Projection: Float64(1.1) AS a\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_binary_expr_and_cast() { - let sql = "SELECT cast(0.0 + a as integer) FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT 2.1 + a FROM (select 1 a) x GROUP BY 1)"; - let expected = "Union\ - \n Projection: CAST(Float64(0) + x.a AS Float64) AS Float64(0) + x.a\ - \n Aggregate: 
groupBy=[[CAST(Float64(0) + x.a AS Int32)]], aggr=[[]]\ - \n SubqueryAlias: x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation\ - \n Projection: Float64(2.1) + x.a AS Float64(0) + x.a\ - \n Aggregate: groupBy=[[Float64(2.1) + x.a]], aggr=[[]]\ - \n SubqueryAlias: x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_aliases() { - let sql = "SELECT a as a1 FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT a as a1 FROM (select 1.1 a) x GROUP BY 1)"; - let expected = "Union\ - \n Projection: CAST(x.a AS Float64) AS a1\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n SubqueryAlias: x\ - \n Projection: Int64(1) AS a\ - \n EmptyRelation\ - \n Projection: x.a AS a1\ - \n Aggregate: groupBy=[[x.a]], aggr=[[]]\ - \n SubqueryAlias: x\ - \n Projection: Float64(1.1) AS a\ - \n EmptyRelation"; - quick_test(sql, expected); -} - -#[test] -fn union_with_incompatible_data_types() { - let sql = "SELECT 'a' a UNION ALL SELECT true a"; - let err = logical_plan(sql) - .expect_err("query should have failed") - .strip_backtrace(); - assert_eq!( - "Error during planning: UNION Column a (type: Boolean) is not compatible with column a (type: Utf8)", - err - ); -} - #[test] fn empty_over() { let sql = "SELECT order_id, MAX(order_id) OVER () from orders"; diff --git a/datafusion/sqllogictest/test_files/type_coercion.slt b/datafusion/sqllogictest/test_files/type_coercion.slt index aa1e6826eca55..316280b552c03 100644 --- a/datafusion/sqllogictest/test_files/type_coercion.slt +++ b/datafusion/sqllogictest/test_files/type_coercion.slt @@ -49,3 +49,199 @@ select interval '1 month' - '2023-05-01'::date; # interval - timestamp query error DataFusion error: Error during planning: Cannot coerce arithmetic expression Interval\(MonthDayNano\) \- Timestamp\(Nanosecond, None\) to valid types SELECT interval '1 month' - '2023-05-01 12:30:00'::timestamp; + + +#################################### +## Test type coercion with UNIONs ## 
+#################################### + +# Disable optimizer to test only the analyzer with type coercion +statement ok +set datafusion.optimizer.max_passes = 0; + +statement ok +set datafusion.explain.logical_plan_only = true; + +# Create test table +statement ok +CREATE TABLE orders( + order_id INT UNSIGNED NOT NULL, + customer_id INT UNSIGNED NOT NULL, + o_item_id VARCHAR NOT NULL, + qty INT NOT NULL, + price DOUBLE NOT NULL, + delivered BOOLEAN NOT NULL +); + +# union_different_num_columns_error() / UNION +statement error +SELECT order_id FROM orders UNION SELECT customer_id, o_item_id FROM orders +---- +DataFusion error: type_coercion +caused by +Error during planning: Union schemas have different number of fields: query 1 has 1 fields whereas query 2 has 2 fields + + +# union_different_num_columns_error() / UNION ALL +statement error +SELECT order_id FROM orders UNION ALL SELECT customer_id, o_item_id FROM orders +---- +DataFusion error: type_coercion +caused by +Error during planning: Union schemas have different number of fields: query 1 has 1 fields whereas query 2 has 2 fields + + +# union_with_different_column_names() +query TT +EXPLAIN SELECT order_id from orders UNION ALL SELECT customer_id FROM orders +---- +logical_plan +01)Union +02)--Projection: orders.order_id +03)----TableScan: orders +04)--Projection: orders.customer_id AS order_id +05)----TableScan: orders + +# union_values_with_no_alias() +query TT +EXPLAIN SELECT 1, 2 UNION ALL SELECT 3, 4 +---- +logical_plan +01)Union +02)--Projection: Int64(1) AS Int64(1), Int64(2) AS Int64(2) +03)----EmptyRelation +04)--Projection: Int64(3) AS Int64(1), Int64(4) AS Int64(2) +05)----EmptyRelation + +# union_with_incompatible_data_type() +statement error +SELECT interval '1 year 1 day' UNION ALL SELECT 1 +---- +DataFusion error: type_coercion +caused by +Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with other type: Interval(MonthDayNano) + + +# 
union_with_different_decimal_data_types() +query TT +EXPLAIN SELECT 1 a UNION ALL SELECT 1.1 a +---- +logical_plan +01)Union +02)--Projection: CAST(Int64(1) AS Float64) AS a +03)----EmptyRelation +04)--Projection: Float64(1.1) AS a +05)----EmptyRelation + +# union_with_null() +query TT +EXPLAIN SELECT NULL a UNION ALL SELECT 1.1 a +---- +logical_plan +01)Union +02)--Projection: CAST(NULL AS Float64) AS a +03)----EmptyRelation +04)--Projection: Float64(1.1) AS a +05)----EmptyRelation + +# union_with_float_and_string() +query TT +EXPLAIN SELECT 'a' a UNION ALL SELECT 1.1 a +---- +logical_plan +01)Union +02)--Projection: Utf8("a") AS a +03)----EmptyRelation +04)--Projection: CAST(Float64(1.1) AS Utf8) AS a +05)----EmptyRelation + +# union_with_multiply_cols() +query TT +EXPLAIN SELECT 'a' a, 1 b UNION ALL SELECT 1.1 a, 1.1 b +---- +logical_plan +01)Union +02)--Projection: Utf8("a") AS a, CAST(Int64(1) AS Float64) AS b +03)----EmptyRelation +04)--Projection: CAST(Float64(1.1) AS Utf8) AS a, Float64(1.1) AS b +05)----EmptyRelation + +# sorted_union_with_different_types_and_group_by() +query TT +EXPLAIN SELECT a FROM (select 1 a) x GROUP BY 1 + UNION ALL +(SELECT a FROM (select 1.1 a) x GROUP BY 1) ORDER BY 1 +---- +logical_plan +01)Sort: x.a ASC NULLS LAST +02)--Union +03)----Projection: CAST(x.a AS Float64) AS a +04)------Aggregate: groupBy=[[x.a]], aggr=[[]] +05)--------SubqueryAlias: x +06)----------Projection: Int64(1) AS a +07)------------EmptyRelation +08)----Projection: x.a +09)------Aggregate: groupBy=[[x.a]], aggr=[[]] +10)--------SubqueryAlias: x +11)----------Projection: Float64(1.1) AS a +12)------------EmptyRelation + +# union_with_binary_expr_and_cast() +query TT +EXPLAIN SELECT cast(0.0 + a as integer) FROM (select 1 a) x GROUP BY 1 + UNION ALL +(SELECT 2.1 + a FROM (select 1 a) x GROUP BY 1) +---- +logical_plan +01)Union +02)--Projection: CAST(Float64(0) + x.a AS Float64) AS Float64(0) + x.a +03)----Aggregate: groupBy=[[CAST(Float64(0) + CAST(x.a AS 
Float64) AS Int32)]], aggr=[[]] +04)------SubqueryAlias: x +05)--------Projection: Int64(1) AS a +06)----------EmptyRelation +07)--Projection: Float64(2.1) + x.a AS Float64(0) + x.a +08)----Aggregate: groupBy=[[Float64(2.1) + CAST(x.a AS Float64)]], aggr=[[]] +09)------SubqueryAlias: x +10)--------Projection: Int64(1) AS a +11)----------EmptyRelation + +# union_with_aliases() +query TT +EXPLAIN SELECT a as a1 FROM (select 1 a) x GROUP BY 1 + UNION ALL +(SELECT a as a1 FROM (select 1.1 a) x GROUP BY 1) +---- +logical_plan +01)Union +02)--Projection: CAST(x.a AS Float64) AS a1 +03)----Aggregate: groupBy=[[x.a]], aggr=[[]] +04)------SubqueryAlias: x +05)--------Projection: Int64(1) AS a +06)----------EmptyRelation +07)--Projection: x.a AS a1 +08)----Aggregate: groupBy=[[x.a]], aggr=[[]] +09)------SubqueryAlias: x +10)--------Projection: Float64(1.1) AS a +11)----------EmptyRelation + +# union_with_incompatible_data_types() +statement error +SELECT 'a' a UNION ALL SELECT true a +---- +DataFusion error: type_coercion +caused by +Error during planning: UNION Column a (type: Boolean) is not compatible with other type: Utf8 + + +statement ok +SET datafusion.optimizer.max_passes = 3; + +statement ok +SET datafusion.explain.logical_plan_only = false; + +statement error DataFusion error: Execution error: Table 'order' doesn't exist\. 
+DROP TABLE order; + +######################################## +## Test type coercion with UNIONs end ## +######################################## From 6c05d17a9c73689b449bf661f6cf416bdce836f2 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Tue, 13 Aug 2024 16:24:40 +0800 Subject: [PATCH 07/13] Move union_coercion to type_coercion.rs --- datafusion/optimizer/src/analyzer/mod.rs | 1 - .../optimizer/src/analyzer/type_coercion.rs | 128 ++++++++++- .../optimizer/src/analyzer/union_coercion.rs | 198 ------------------ 3 files changed, 121 insertions(+), 206 deletions(-) delete mode 100644 datafusion/optimizer/src/analyzer/union_coercion.rs diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 435b86fd973be..91ee8a9e1033a 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -42,7 +42,6 @@ pub mod function_rewrite; pub mod inline_table_scan; pub mod subquery; pub mod type_coercion; -mod union_coercion; /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 462696929d7a8..5b30e0d287197 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -17,20 +17,24 @@ //! 
Optimizer rule for type validation and coercion +use std::collections::HashMap; use std::sync::Arc; -use arrow::datatypes::{DataType, IntervalUnit}; +use itertools::izip; + +use arrow::datatypes::{DataType, Field, IntervalUnit}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, - DataFusionError, Result, ScalarValue, + exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, + DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; use datafusion_expr::expr::{ - self, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, ScalarFunction, - WindowFunction, + self, Alias, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, + ScalarFunction, WindowFunction, }; +use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; use datafusion_expr::expr_schema::cast_subquery; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::Subquery; @@ -48,10 +52,9 @@ use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not, AggregateUDF, Expr, ExprFunctionExt, ExprSchemable, Join, LogicalPlan, Operator, - ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, + Projection, ScalarUDF, Union, WindowFrame, WindowFrameBound, WindowFrameUnits, }; -use crate::analyzer::union_coercion::coerce_union; use crate::analyzer::AnalyzerRule; use crate::utils::NamePreserver; @@ -779,6 +782,117 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { Ok(Case::new(case_expr, when_then, else_expr)) } +/// Get a common schema that is compatible with all inputs of UNION. 
+fn coerce_union_schema(inputs: Vec>) -> Result { + let base_schema = inputs[0].schema(); + let mut union_datatypes = base_schema + .fields() + .iter() + .map(|f| f.data_type().clone()) + .collect::>(); + let mut union_nullabilities = base_schema + .fields() + .iter() + .map(|f| f.is_nullable()) + .collect::>(); + + for (i, plan) in inputs.iter().enumerate().skip(1) { + let plan_schema = plan.schema(); + if plan_schema.fields().len() != base_schema.fields().len() { + return plan_err!( + "Union schemas have different number of fields: \ + query 1 has {} fields whereas query {} has {} fields", + base_schema.fields().len(), + i + 1, + plan_schema.fields().len() + ); + } + // coerce data type and nullablity for each field + for (union_datatype, union_nullable, plan_field) in izip!( + union_datatypes.iter_mut(), + union_nullabilities.iter_mut(), + plan_schema.fields() + ) { + let coerced_type = + comparison_coercion(union_datatype, plan_field.data_type()).ok_or_else( + || { + plan_datafusion_err!( + "UNION Column {} (type: {}) is not compatible with other type: {}", + plan_field.name(), + plan_field.data_type(), + union_datatype + ) + }, + )?; + *union_datatype = coerced_type; + *union_nullable = *union_nullable || plan_field.is_nullable(); + } + } + let union_qualified_fields = izip!( + base_schema.iter(), + union_datatypes.into_iter(), + union_nullabilities + ) + .map(|((qualifier, field), datatype, nullable)| { + let field = Arc::new(Field::new(field.name().clone(), datatype, nullable)); + (qualifier.cloned(), field) + }) + .collect::>(); + DFSchema::new_with_metadata(union_qualified_fields, HashMap::new()) +} + +/// Coerce the union's inputs to a common schema +fn coerce_union(union_plan: Union) -> Result { + let union_schema = coerce_union_schema(union_plan.inputs.clone())?; + let new_inputs = union_plan + .inputs + .iter() + .map(|p| { + let plan = coerce_plan_expr_for_schema(p, &union_schema)?; + match plan { + LogicalPlan::Projection(Projection { expr, input, .. 
}) => { + Ok(Arc::new(project_with_column_index( + expr, + input, + Arc::new(union_schema.clone()), + )?)) + } + other_plan => Ok(Arc::new(other_plan)), + } + }) + .collect::>>()?; + Ok(LogicalPlan::Union(Union { + inputs: new_inputs, + schema: Arc::new(union_schema), + })) +} + +/// See https://github.com/apache/datafusion/pull/2108 +fn project_with_column_index( + expr: Vec, + input: Arc, + schema: DFSchemaRef, +) -> Result { + let alias_expr = expr + .into_iter() + .enumerate() + .map(|(i, e)| match e { + Expr::Alias(Alias { ref name, .. }) if name != schema.field(i).name() => { + e.unalias().alias(schema.field(i).name()) + } + Expr::Column(Column { + relation: _, + ref name, + }) if name != schema.field(i).name() => e.alias(schema.field(i).name()), + Expr::Alias { .. } | Expr::Column { .. } => e, + _ => e.alias(schema.field(i).name()), + }) + .collect::>(); + + Projection::try_new_with_schema(alias_expr, input, schema) + .map(LogicalPlan::Projection) +} + #[cfg(test)] mod test { use std::any::Any; diff --git a/datafusion/optimizer/src/analyzer/union_coercion.rs b/datafusion/optimizer/src/analyzer/union_coercion.rs deleted file mode 100644 index 66a80ddb232c7..0000000000000 --- a/datafusion/optimizer/src/analyzer/union_coercion.rs +++ /dev/null @@ -1,198 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Helper functions of TypeCoercion that are applied to UNION. - -use itertools::izip; -use std::collections::HashMap; -use std::sync::Arc; - -use arrow::datatypes::Field; - -use datafusion_common::{ - plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, Result, -}; -use datafusion_expr::expr::Alias; -use datafusion_expr::type_coercion::binary::comparison_coercion; -use datafusion_expr::{Expr, ExprSchemable, LogicalPlan, Projection, Union}; - -/// Get a common schema that is compatible with all inputs of UNION. -fn coerce_union_schema(inputs: Vec>) -> Result { - let base_schema = inputs[0].schema(); - let mut union_datatypes = base_schema - .fields() - .iter() - .map(|f| f.data_type().clone()) - .collect::>(); - let mut union_nullabilities = base_schema - .fields() - .iter() - .map(|f| f.is_nullable()) - .collect::>(); - - for (i, plan) in inputs.iter().enumerate().skip(1) { - let plan_schema = plan.schema(); - if plan_schema.fields().len() != base_schema.fields().len() { - return plan_err!( - "Union schemas have different number of fields: \ - query 1 has {} fields whereas query {} has {} fields", - base_schema.fields().len(), - i + 1, - plan_schema.fields().len() - ); - } - // coerce data type and nullablity for each field - for (union_datatype, union_nullable, plan_field) in izip!( - union_datatypes.iter_mut(), - union_nullabilities.iter_mut(), - plan_schema.fields() - ) { - let coerced_type = - comparison_coercion(union_datatype, plan_field.data_type()).ok_or_else( - || { - plan_datafusion_err!( - "UNION Column {} (type: {}) is not compatible with other type: {}", - plan_field.name(), - plan_field.data_type(), - union_datatype - ) - }, - )?; - *union_datatype = coerced_type; - *union_nullable = *union_nullable || plan_field.is_nullable(); - } - } - let union_qualified_fields = izip!( - base_schema.iter(), - union_datatypes.into_iter(), - 
union_nullabilities - ) - .map(|((qualifier, field), datatype, nullable)| { - let field = Arc::new(Field::new(field.name().clone(), datatype, nullable)); - (qualifier.cloned(), field) - }) - .collect::>(); - DFSchema::new_with_metadata(union_qualified_fields, HashMap::new()) -} - -/// Coerce the union's inputs to a common schema -pub(crate) fn coerce_union(union_plan: Union) -> Result { - let union_schema = coerce_union_schema(union_plan.inputs.clone())?; - let new_inputs = union_plan - .inputs - .iter() - .map(|p| { - let plan = coerce_plan_expr_for_schema(p, &union_schema)?; - match plan { - LogicalPlan::Projection(Projection { expr, input, .. }) => { - Ok(Arc::new(project_with_column_index( - expr, - input, - Arc::new(union_schema.clone()), - )?)) - } - other_plan => Ok(Arc::new(other_plan)), - } - }) - .collect::>>()?; - Ok(LogicalPlan::Union(Union { - inputs: new_inputs, - schema: Arc::new(union_schema), - })) -} - -/// Returns plan with expressions coerced to types compatible with -/// schema types -pub(crate) fn coerce_plan_expr_for_schema( - plan: &LogicalPlan, - schema: &DFSchema, -) -> Result { - match plan { - // special case Projection to avoid adding multiple projections - LogicalPlan::Projection(Projection { expr, input, .. 
}) => { - let new_exprs = - coerce_exprs_for_schema(expr.clone(), input.schema(), schema)?; - let projection = Projection::try_new(new_exprs, Arc::clone(input))?; - Ok(LogicalPlan::Projection(projection)) - } - _ => { - let exprs: Vec = plan.schema().iter().map(Expr::from).collect(); - - let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?; - let add_project = new_exprs.iter().any(|expr| expr.try_as_col().is_none()); - if add_project { - let projection = Projection::try_new(new_exprs, Arc::new(plan.clone()))?; - Ok(LogicalPlan::Projection(projection)) - } else { - Ok(plan.clone()) - } - } - } -} - -fn coerce_exprs_for_schema( - exprs: Vec, - src_schema: &DFSchema, - dst_schema: &DFSchema, -) -> Result> { - exprs - .into_iter() - .enumerate() - .map(|(idx, expr)| { - let new_type = dst_schema.field(idx).data_type(); - if new_type != &expr.get_type(src_schema)? { - match expr { - Expr::Alias(Alias { expr, name, .. }) => { - Ok(expr.cast_to(new_type, src_schema)?.alias(name)) - } - _ => expr.cast_to(new_type, src_schema), - } - } else { - Ok(expr) - } - }) - .collect::>() -} - -/// See https://github.com/apache/datafusion/pull/2108 -fn project_with_column_index( - expr: Vec, - input: Arc, - schema: DFSchemaRef, -) -> Result { - let alias_expr = expr - .into_iter() - .enumerate() - .map(|(i, e)| match e { - Expr::Alias(Alias { ref name, .. }) if name != schema.field(i).name() => { - e.unalias().alias(schema.field(i).name()) - } - Expr::Column(Column { - relation: _, - ref name, - }) if name != schema.field(i).name() => e.alias(schema.field(i).name()), - Expr::Alias { .. } | Expr::Column { .. 
} => e, - _ => e.alias(schema.field(i).name()), - }) - .collect::>(); - - Projection::try_new_with_schema(alias_expr, input, schema) - .map(LogicalPlan::Projection) -} - -#[cfg(test)] -mod test {} From 21f93dcafe9c87671d333d36d6bf1bb4d26e8f6a Mon Sep 17 00:00:00 2001 From: jonahgao Date: Tue, 13 Aug 2024 16:56:46 +0800 Subject: [PATCH 08/13] fix tests --- .../optimizer/src/analyzer/type_coercion.rs | 2 +- .../sqllogictest/test_files/type_coercion.slt | 32 ++++--------------- 2 files changed, 7 insertions(+), 27 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 5b30e0d287197..0b95f0507cd53 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -817,7 +817,7 @@ fn coerce_union_schema(inputs: Vec>) -> Result { comparison_coercion(union_datatype, plan_field.data_type()).ok_or_else( || { plan_datafusion_err!( - "UNION Column {} (type: {}) is not compatible with other type: {}", + "UNION Column '{}' (type: {}) is not compatible with other type: {}", plan_field.name(), plan_field.data_type(), union_datatype diff --git a/datafusion/sqllogictest/test_files/type_coercion.slt b/datafusion/sqllogictest/test_files/type_coercion.slt index 316280b552c03..52be8c779004f 100644 --- a/datafusion/sqllogictest/test_files/type_coercion.slt +++ b/datafusion/sqllogictest/test_files/type_coercion.slt @@ -74,22 +74,12 @@ CREATE TABLE orders( ); # union_different_num_columns_error() / UNION -statement error +query error Error during planning: Union schemas have different number of fields: query 1 has 1 fields whereas query 2 has 2 fields SELECT order_id FROM orders UNION SELECT customer_id, o_item_id FROM orders ----- -DataFusion error: type_coercion -caused by -Error during planning: Union schemas have different number of fields: query 1 has 1 fields whereas query 2 has 2 fields - # union_different_num_columns_error() / UNION ALL 
-statement error +query error Error during planning: Union schemas have different number of fields: query 1 has 1 fields whereas query 2 has 2 fields SELECT order_id FROM orders UNION ALL SELECT customer_id, o_item_id FROM orders ----- -DataFusion error: type_coercion -caused by -Error during planning: Union schemas have different number of fields: query 1 has 1 fields whereas query 2 has 2 fields - # union_with_different_column_names() query TT @@ -114,13 +104,8 @@ logical_plan 05)----EmptyRelation # union_with_incompatible_data_type() -statement error +query error Error during planning: UNION Column 'Int64\(1\)' \(type: Int64\) is not compatible with other type: Interval\(MonthDayNano\) SELECT interval '1 year 1 day' UNION ALL SELECT 1 ----- -DataFusion error: type_coercion -caused by -Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with other type: Interval(MonthDayNano) - # union_with_different_decimal_data_types() query TT @@ -225,13 +210,8 @@ logical_plan 11)----------EmptyRelation # union_with_incompatible_data_types() -statement error +query error Error during planning: UNION Column 'a' \(type: Boolean\) is not compatible with other type: Utf8 SELECT 'a' a UNION ALL SELECT true a ----- -DataFusion error: type_coercion -caused by -Error during planning: UNION Column a (type: Boolean) is not compatible with other type: Utf8 - statement ok SET datafusion.optimizer.max_passes = 3; @@ -239,8 +219,8 @@ SET datafusion.optimizer.max_passes = 3; statement ok SET datafusion.explain.logical_plan_only = false; -statement error DataFusion error: Execution error: Table 'order' doesn't exist\. 
-DROP TABLE order; +statement ok +DROP TABLE orders; ######################################## ## Test type coercion with UNIONs end ## From c2235df778fd83efbeb3cd6bb3ac1107e20f041e Mon Sep 17 00:00:00 2001 From: jonahgao Date: Tue, 13 Aug 2024 17:08:58 +0800 Subject: [PATCH 09/13] fix cargo doc --- datafusion/optimizer/src/analyzer/type_coercion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 0b95f0507cd53..51b89d97230e5 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -867,7 +867,7 @@ fn coerce_union(union_plan: Union) -> Result { })) } -/// See https://github.com/apache/datafusion/pull/2108 +/// See `` fn project_with_column_index( expr: Vec, input: Arc, From e5c2dc9c1bbabab36c1a14369762699ae942e501 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Wed, 14 Aug 2024 10:18:14 +0800 Subject: [PATCH 10/13] Improve error msg --- datafusion/optimizer/src/analyzer/type_coercion.rs | 11 ++++++----- datafusion/sqllogictest/test_files/type_coercion.slt | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index add0f093cd734..1a5edde6e3038 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -816,11 +816,12 @@ fn coerce_union_schema(inputs: Vec>) -> Result { comparison_coercion(union_datatype, plan_field.data_type()).ok_or_else( || { plan_datafusion_err!( - "UNION Column '{}' (type: {}) is not compatible with other type: {}", - plan_field.name(), - plan_field.data_type(), - union_datatype - ) + "Incompatible inputs for Union: Previous inputs were \ + of type {}, but got incompatible type {} on column '{}'", + union_datatype, + plan_field.data_type(), + plan_field.name() + ) }, )?; 
*union_datatype = coerced_type; diff --git a/datafusion/sqllogictest/test_files/type_coercion.slt b/datafusion/sqllogictest/test_files/type_coercion.slt index 52be8c779004f..e420c0cc71554 100644 --- a/datafusion/sqllogictest/test_files/type_coercion.slt +++ b/datafusion/sqllogictest/test_files/type_coercion.slt @@ -104,7 +104,7 @@ logical_plan 05)----EmptyRelation # union_with_incompatible_data_type() -query error Error during planning: UNION Column 'Int64\(1\)' \(type: Int64\) is not compatible with other type: Interval\(MonthDayNano\) +query error Incompatible inputs for Union: Previous inputs were of type Interval\(MonthDayNano\), but got incompatible type Int64 on column 'Int64\(1\)' SELECT interval '1 year 1 day' UNION ALL SELECT 1 # union_with_different_decimal_data_types() @@ -210,7 +210,7 @@ logical_plan 11)----------EmptyRelation # union_with_incompatible_data_types() -query error Error during planning: UNION Column 'a' \(type: Boolean\) is not compatible with other type: Utf8 +query error Incompatible inputs for Union: Previous inputs were of type Utf8, but got incompatible type Boolean on column 'a' SELECT 'a' a UNION ALL SELECT true a statement ok From 723fd43d07a9313db5e2d582b26c2b1ff57fd9f9 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Wed, 14 Aug 2024 10:25:19 +0800 Subject: [PATCH 11/13] As static member --- .../optimizer/src/analyzer/type_coercion.rs | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 1a5edde6e3038..3c8c24963058b 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -141,7 +141,7 @@ impl<'a> TypeCoercionRewriter<'a> { fn coerce_plan(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> { match plan { LogicalPlan::Join(join) => self.coerce_join(join), - LogicalPlan::Union(union) => coerce_union(union), + LogicalPlan::Union(union) => 
Self::coerce_union(union), _ => Ok(plan), } } @@ -175,6 +175,33 @@ impl<'a> TypeCoercionRewriter<'a> { Ok(LogicalPlan::Join(join)) } + /// Coerce the union’s inputs to a common schema compatible with all inputs. + /// This occurs after wildcard expansion and the coercion of the input expressions. + fn coerce_union(union_plan: Union) -> Result<LogicalPlan> { + let union_schema = coerce_union_schema(union_plan.inputs.clone())?; + let new_inputs = union_plan + .inputs + .iter() + .map(|p| { + let plan = coerce_plan_expr_for_schema(p, &union_schema)?; + match plan { + LogicalPlan::Projection(Projection { expr, input, .. }) => { + Ok(Arc::new(project_with_column_index( + expr, + input, + Arc::new(union_schema.clone()), + )?)) + } + other_plan => Ok(Arc::new(other_plan)), + } + }) + .collect::<Result<Vec<_>>>()?; + Ok(LogicalPlan::Union(Union { + inputs: new_inputs, + schema: Arc::new(union_schema), + })) + } + fn coerce_join_filter(&self, expr: Expr) -> Result<Expr> { let expr_type = expr.get_type(self.schema)?; match expr_type { @@ -841,33 +868,6 @@ fn coerce_union_schema(inputs: Vec<Arc<LogicalPlan>>) -> Result<DFSchema> { DFSchema::new_with_metadata(union_qualified_fields, HashMap::new()) } -/// Coerce the union’s inputs to a common schema. This occurs after -/// wildcard expansion and the coercion of the input expressions. -fn coerce_union(union_plan: Union) -> Result<LogicalPlan> { - let union_schema = coerce_union_schema(union_plan.inputs.clone())?; - let new_inputs = union_plan - .inputs - .iter() - .map(|p| { - let plan = coerce_plan_expr_for_schema(p, &union_schema)?; - match plan { - LogicalPlan::Projection(Projection { expr, input, .. 
}) => { - Ok(Arc::new(project_with_column_index( - expr, - input, - Arc::new(union_schema.clone()), - )?)) - } - other_plan => Ok(Arc::new(other_plan)), - } - }) - .collect::<Result<Vec<_>>>()?; - Ok(LogicalPlan::Union(Union { - inputs: new_inputs, - schema: Arc::new(union_schema), - })) -} - /// See `<https://github.com/apache/datafusion/pull/2108>` fn project_with_column_index( expr: Vec<Expr>, From 7e8b3f407f43a9a18fc98b55d039717607884bad Mon Sep 17 00:00:00 2001 From: jonahgao Date: Wed, 14 Aug 2024 10:30:45 +0800 Subject: [PATCH 12/13] Avoid clone --- datafusion/optimizer/src/analyzer/type_coercion.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 3c8c24963058b..cfadc34c66316 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -178,7 +178,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// Coerce the union’s inputs to a common schema compatible with all inputs. /// This occurs after wildcard expansion and the coercion of the input expressions. fn coerce_union(union_plan: Union) -> Result<LogicalPlan> { - let union_schema = coerce_union_schema(union_plan.inputs.clone())?; + let union_schema = Arc::new(coerce_union_schema(&union_plan.inputs)?); let new_inputs = union_plan .inputs .iter() @@ -189,7 +189,7 @@ impl<'a> TypeCoercionRewriter<'a> { Ok(Arc::new(project_with_column_index( expr, input, - Arc::new(union_schema.clone()), + Arc::clone(&union_schema), )?)) } other_plan => Ok(Arc::new(other_plan)), @@ -198,7 +198,7 @@ impl<'a> TypeCoercionRewriter<'a> { .collect::<Result<Vec<_>>>()?; Ok(LogicalPlan::Union(Union { inputs: new_inputs, - schema: Arc::new(union_schema), + schema: union_schema, })) } @@ -809,7 +809,7 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result<Case> { } /// Get a common schema that is compatible with all inputs of UNION. 
-fn coerce_union_schema(inputs: Vec<Arc<LogicalPlan>>) -> Result<DFSchema> { +fn coerce_union_schema(inputs: &Vec<Arc<LogicalPlan>>) -> Result<DFSchema> { let base_schema = inputs[0].schema(); let mut union_datatypes = base_schema .fields() From d42486e7abb555a3f26e6e6b3929cf38cfa7a568 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Wed, 14 Aug 2024 10:46:45 +0800 Subject: [PATCH 13/13] Fix clippy --- datafusion/optimizer/src/analyzer/type_coercion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index cfadc34c66316..40efbba6de7a5 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -809,7 +809,7 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result<Case> { } /// Get a common schema that is compatible with all inputs of UNION. -fn coerce_union_schema(inputs: &Vec<Arc<LogicalPlan>>) -> Result<DFSchema> { +fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> Result<DFSchema> { let base_schema = inputs[0].schema(); let mut union_datatypes = base_schema .fields()