diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 89a8b8c3cecc0..69eed6ab71597 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; use crate::Column; +use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use std::fmt::{Display, Formatter}; @@ -281,6 +282,30 @@ impl DFSchema { .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name()) } + /// Check that every field of `arrow_schema` can be cast (per arrow's `can_cast_types`) to the type of the field at the same position in this schema; the first pair that cannot yields a `DataFusionError::Plan` naming both columns and their types + pub fn check_arrow_schema_type_compatible( + &self, + arrow_schema: &Schema, + ) -> Result<()> { + let self_arrow_schema: Schema = self.into(); + self_arrow_schema + .fields() + .iter() + .zip(arrow_schema.fields().iter()) // pairs fields by position; zip ends at the shorter side, so surplus fields on either schema go unchecked + .try_for_each(|(l_field, r_field)| { + if !can_cast_types(r_field.data_type(), l_field.data_type()) { + Err(DataFusionError::Plan( + format!("Column {} (type: {}) is not compatible wiht column {} (type: {})", // NOTE(review): 'wiht' is a typo for 'with'; the planner test union_with_incompatible_data_type asserts this exact text + r_field.name(), + r_field.data_type(), + l_field.name(), + l_field.data_type()))) + } else { + Ok(()) + } + }) + } + /// Strip all field qualifier in schema pub fn strip_qualifiers(self) -> Self { DFSchema { diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 23a16fc472c15..5a1659d92a2cf 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1021,6 +1021,29 @@ fn validate_unique_names<'a>( }) } +pub fn project_with_column_index_alias( + expr: Vec, + input: Arc, + schema: DFSchemaRef, // NOTE(review): forwarded unchanged into the Projection, so its field names are presumably not updated to the generated columnN aliases; confirm + alias: Option, +) -> Result { + let alias_expr = expr + .into_iter() + .enumerate() + .map(|(i, e)| match e { // expressions that are already an alias or a plain column keep their name; anything else is aliased by its position (last match arm) + ignore_alias @ Expr::Alias { .. } => ignore_alias, + ignore_col @ Expr::Column { .. 
} => ignore_col, + x => x.alias(format!("column{}", i).as_str()), + }) + .collect::>(); + Ok(LogicalPlan::Projection(Projection { + expr: alias_expr, + input, + schema, + alias, + })) +} + /// Union two logical plans with an optional alias. pub fn union_with_alias( left_plan: LogicalPlan, @@ -1033,6 +1056,15 @@ pub fn union_with_alias( LogicalPlan::Union(Union { inputs, .. }) => inputs, x => vec![x], }) + .map(|p| match p { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + alias, + }) => project_with_column_index_alias(expr, input, schema, alias).unwrap(), // unwrap does not panic as written: project_with_column_index_alias has no failing path in this patch and only ever returns Ok + x => x, + }) .collect::>(); if inputs.is_empty() { return Err(DataFusionError::Plan("Empty UNION".to_string())); @@ -1043,19 +1075,19 @@ pub fn union_with_alias( Some(ref alias) => union_schema.replace_qualifier(alias.as_str()), None => union_schema.strip_qualifiers(), }); - if !inputs.iter().skip(1).all(|input_plan| { - // union changes all qualifers in resulting schema, so we only need to - // match against arrow schema here, which doesn't include qualifiers - union_schema.matches_arrow_schema(&((**input_plan.schema()).clone().into())) - }) { - return Err(DataFusionError::Plan( - "UNION ALL schemas are expected to be the same".to_string(), - )); - } + + inputs + .iter() + .skip(1) + .try_for_each(|input_plan| -> Result<()> { + union_schema.check_arrow_schema_type_compatible( + &((**input_plan.schema()).clone().into()), + ) + })?; Ok(LogicalPlan::Union(Union { - schema: union_schema, inputs, + schema: union_schema, alias, })) } diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 886052156305a..440911000bfeb 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -3514,11 +3514,36 @@ mod tests { } #[test] - fn union_schemas_should_be_same() { + fn union_with_different_column_names() { let sql = "SELECT order_id from orders UNION ALL SELECT customer_id FROM orders"; + let expected = "Union\ + \n Projection: 
#orders.order_id\ + \n TableScan: orders projection=None\ + \n Projection: #orders.customer_id\ + \n TableScan: orders projection=None"; + quick_test(sql, expected); + } + + #[test] + fn union_values_with_no_alias() { + let sql = "SELECT 1, 2 UNION ALL SELECT 3, 4"; + let expected = "Union\ + \n Projection: Int64(1) AS column0, Int64(2) AS column1\ + \n EmptyRelation\ + \n Projection: Int64(3) AS column0, Int64(4) AS column1\ + \n EmptyRelation"; + quick_test(sql, expected); + } + + #[test] + fn union_with_incompatible_data_type() { + let sql = "SELECT interval '1 year 1 day' UNION ALL SELECT 1"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "Plan(\"UNION ALL schemas are expected to be the same\")", + "Plan(\"Column Int64(1) (type: Int64) is \ + not compatible wiht column IntervalMonthDayNano\ + (\\\"950737950189618795196236955648\\\") \ + (type: Interval(MonthDayNano))\")", format!("{:?}", err) ); }