From e50b016464b301d4dbaea26c49c579fa8e091ce6 Mon Sep 17 00:00:00 2001 From: duripeng Date: Tue, 29 Mar 2022 03:05:41 +0800 Subject: [PATCH 1/2] fix union all bug --- datafusion/common/src/dfschema.rs | 13 +++++++ datafusion/core/src/logical_plan/builder.rs | 43 ++++++++++++++++++--- datafusion/core/src/sql/planner.rs | 24 +++++++++--- 3 files changed, 69 insertions(+), 11 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 89a8b8c3cecc0..ddbe6f7bf4536 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; use crate::Column; +use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use std::fmt::{Display, Formatter}; @@ -281,6 +282,18 @@ impl DFSchema { .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name()) } + /// Check to see if fields in 2 Arrow schemas are compatible + pub fn check_arrow_schema_type_compatible(&self, arrow_schema: &Schema) -> bool { + let self_arrow_schema: Schema = self.into(); + self_arrow_schema + .fields() + .iter() + .zip(arrow_schema.fields().iter()) + .all(|(l_field, r_field)| { + can_cast_types(r_field.data_type(), l_field.data_type()) + }) + } + /// Strip all field qualifier in schema pub fn strip_qualifiers(self) -> Self { DFSchema { diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 23a16fc472c15..8b7bcfbdfb612 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1021,6 +1021,29 @@ fn validate_unique_names<'a>( }) } +pub fn project_with_column_index_alias( + expr: Vec, + input: Arc, + schema: DFSchemaRef, + alias: Option, +) -> Result { + let alias_expr = expr + .into_iter() + .enumerate() + .map(|(i, e)| match e { + ignore_alias @ Expr::Alias { .. } => ignore_alias, + ignore_col @ Expr::Column { .. 
} => ignore_col, + x => x.alias(format!("column{}", i).as_str()), + }) + .collect::>(); + Ok(LogicalPlan::Projection(Projection { + expr: alias_expr, + input, + schema, + alias, + })) +} + /// Union two logical plans with an optional alias. pub fn union_with_alias( left_plan: LogicalPlan, @@ -1033,6 +1056,15 @@ pub fn union_with_alias( LogicalPlan::Union(Union { inputs, .. }) => inputs, x => vec![x], }) + .map(|p| match p { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + alias, + }) => project_with_column_index_alias(expr, input, schema, alias).unwrap(), + x => x, + }) .collect::>(); if inputs.is_empty() { return Err(DataFusionError::Plan("Empty UNION".to_string())); @@ -1043,19 +1075,20 @@ pub fn union_with_alias( Some(ref alias) => union_schema.replace_qualifier(alias.as_str()), None => union_schema.strip_qualifiers(), }); + if !inputs.iter().skip(1).all(|input_plan| { - // union changes all qualifers in resulting schema, so we only need to - // match against arrow schema here, which doesn't include qualifiers - union_schema.matches_arrow_schema(&((**input_plan.schema()).clone().into())) + // we need to check arrow field types cast compatible here + union_schema + .check_arrow_schema_type_compatible(&((**input_plan.schema()).clone().into())) }) { return Err(DataFusionError::Plan( - "UNION ALL schemas are expected to be the same".to_string(), + "UNION ALL schemas are expected to be arrow type compatible".to_string(), )); } Ok(LogicalPlan::Union(Union { - schema: union_schema, inputs, + schema: union_schema, alias, })) } diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 886052156305a..1ded440f2a7c5 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -3514,13 +3514,25 @@ mod tests { } #[test] - fn union_schemas_should_be_same() { + fn union_with_different_column_names() { let sql = "SELECT order_id from orders UNION ALL SELECT customer_id FROM orders"; - let err = 
logical_plan(sql).expect_err("query should have failed"); - assert_eq!( - "Plan(\"UNION ALL schemas are expected to be the same\")", - format!("{:?}", err) - ); + let expected = "Union\ + \n Projection: #orders.order_id\ + \n TableScan: orders projection=None\ + \n Projection: #orders.customer_id\ + \n TableScan: orders projection=None"; + quick_test(sql, expected); + } + + #[test] + fn union_values_with_no_alias() { + let sql = "SELECT 1, 2 UNION ALL SELECT 3, 4"; + let expected = "Union\ + \n Projection: Int64(1) AS column0, Int64(2) AS column1\ + \n EmptyRelation\ + \n Projection: Int64(3) AS column0, Int64(4) AS column1\ + \n EmptyRelation"; + quick_test(sql, expected); } #[test] From ff8d4dbb7d04e0ce19213d90bb98134916a7e81c Mon Sep 17 00:00:00 2001 From: duripeng Date: Sat, 2 Apr 2022 05:04:41 +0800 Subject: [PATCH 2/2] check type compatible return Result<()> --- datafusion/common/src/dfschema.rs | 18 +++++++++++++++--- datafusion/core/src/logical_plan/builder.rs | 17 ++++++++--------- datafusion/core/src/sql/planner.rs | 13 +++++++++++++ 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index ddbe6f7bf4536..69eed6ab71597 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -283,14 +283,26 @@ impl DFSchema { } /// Check to see if fields in 2 Arrow schemas are compatible - pub fn check_arrow_schema_type_compatible(&self, arrow_schema: &Schema) -> bool { + pub fn check_arrow_schema_type_compatible( + &self, + arrow_schema: &Schema, + ) -> Result<()> { let self_arrow_schema: Schema = self.into(); self_arrow_schema .fields() .iter() .zip(arrow_schema.fields().iter()) - .all(|(l_field, r_field)| { - can_cast_types(r_field.data_type(), l_field.data_type()) + .try_for_each(|(l_field, r_field)| { + if !can_cast_types(r_field.data_type(), l_field.data_type()) { + Err(DataFusionError::Plan( + format!("Column {} (type: {}) is not compatible with column 
{} (type: {})", + r_field.name(), + r_field.data_type(), + l_field.name(), + l_field.data_type()))) + } else { + Ok(()) + } }) } diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 8b7bcfbdfb612..5a1659d92a2cf 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1076,15 +1076,14 @@ pub fn union_with_alias( None => union_schema.strip_qualifiers(), }); - if !inputs.iter().skip(1).all(|input_plan| { - // we need to check arrow field types cast compatible here - union_schema - .check_arrow_schema_type_compatible(&((**input_plan.schema()).clone().into())) - }) { - return Err(DataFusionError::Plan( - "UNION ALL schemas are expected to be arrow type compatible".to_string(), - )); - } + inputs + .iter() + .skip(1) + .try_for_each(|input_plan| -> Result<()> { + union_schema.check_arrow_schema_type_compatible( + &((**input_plan.schema()).clone().into()), + ) + })?; Ok(LogicalPlan::Union(Union { inputs, diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 1ded440f2a7c5..440911000bfeb 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -3535,6 +3535,19 @@ mod tests { quick_test(sql, expected); } + #[test] + fn union_with_incompatible_data_type() { + let sql = "SELECT interval '1 year 1 day' UNION ALL SELECT 1"; + let err = logical_plan(sql).expect_err("query should have failed"); + assert_eq!( + "Plan(\"Column Int64(1) (type: Int64) is \ + not compatible with column IntervalMonthDayNano\ + (\\\"950737950189618795196236955648\\\") \ + (type: Interval(MonthDayNano))\")", + format!("{:?}", err) + ); + } + #[test] fn empty_over() { let sql = "SELECT order_id, MAX(order_id) OVER () from orders";