Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use crate::error::{DataFusionError, Result};
use crate::Column;

use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use std::fmt::{Display, Formatter};

Expand Down Expand Up @@ -281,6 +282,30 @@ impl DFSchema {
.all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
}

/// Check to see if fields in 2 Arrow schemas are compatible
pub fn check_arrow_schema_type_compatible(
&self,
arrow_schema: &Schema,
) -> Result<()> {
let self_arrow_schema: Schema = self.into();
self_arrow_schema
.fields()
.iter()
.zip(arrow_schema.fields().iter())
.try_for_each(|(l_field, r_field)| {
if !can_cast_types(r_field.data_type(), l_field.data_type()) {
Err(DataFusionError::Plan(
format!("Column {} (type: {}) is not compatible wiht column {} (type: {})",
r_field.name(),
r_field.data_type(),
l_field.name(),
l_field.data_type())))
} else {
Ok(())
}
})
}

/// Strip all field qualifier in schema
pub fn strip_qualifiers(self) -> Self {
DFSchema {
Expand Down
52 changes: 42 additions & 10 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,29 @@ fn validate_unique_names<'a>(
})
}

pub fn project_with_column_index_alias(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
alias: Option<String>,
) -> Result<LogicalPlan> {
let alias_expr = expr
.into_iter()
.enumerate()
.map(|(i, e)| match e {
ignore_alias @ Expr::Alias { .. } => ignore_alias,
ignore_col @ Expr::Column { .. } => ignore_col,
x => x.alias(format!("column{}", i).as_str()),
})
.collect::<Vec<_>>();
Ok(LogicalPlan::Projection(Projection {
expr: alias_expr,
input,
schema,
alias,
}))
}

/// Union two logical plans with an optional alias.
pub fn union_with_alias(
left_plan: LogicalPlan,
Expand All @@ -1033,6 +1056,15 @@ pub fn union_with_alias(
LogicalPlan::Union(Union { inputs, .. }) => inputs,
x => vec![x],
})
.map(|p| match p {
LogicalPlan::Projection(Projection {
expr,
input,
schema,
alias,
}) => project_with_column_index_alias(expr, input, schema, alias).unwrap(),
x => x,
})
.collect::<Vec<_>>();
if inputs.is_empty() {
return Err(DataFusionError::Plan("Empty UNION".to_string()));
Expand All @@ -1043,19 +1075,19 @@ pub fn union_with_alias(
Some(ref alias) => union_schema.replace_qualifier(alias.as_str()),
None => union_schema.strip_qualifiers(),
});
if !inputs.iter().skip(1).all(|input_plan| {
// union changes all qualifers in resulting schema, so we only need to
// match against arrow schema here, which doesn't include qualifiers
union_schema.matches_arrow_schema(&((**input_plan.schema()).clone().into()))
}) {
return Err(DataFusionError::Plan(
"UNION ALL schemas are expected to be the same".to_string(),
));
}

inputs
.iter()
.skip(1)
.try_for_each(|input_plan| -> Result<()> {
union_schema.check_arrow_schema_type_compatible(
&((**input_plan.schema()).clone().into()),
)
})?;

Ok(LogicalPlan::Union(Union {
schema: union_schema,
inputs,
schema: union_schema,
alias,
}))
}
Expand Down
29 changes: 27 additions & 2 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3514,11 +3514,36 @@ mod tests {
}

#[test]
fn union_schemas_should_be_same() {
fn union_with_different_column_names() {
let sql = "SELECT order_id from orders UNION ALL SELECT customer_id FROM orders";
let expected = "Union\
\n Projection: #orders.order_id\
\n TableScan: orders projection=None\
\n Projection: #orders.customer_id\
\n TableScan: orders projection=None";
quick_test(sql, expected);
}

#[test]
fn union_values_with_no_alias() {
let sql = "SELECT 1, 2 UNION ALL SELECT 3, 4";
let expected = "Union\
\n Projection: Int64(1) AS column0, Int64(2) AS column1\
\n EmptyRelation\
\n Projection: Int64(3) AS column0, Int64(4) AS column1\
\n EmptyRelation";
quick_test(sql, expected);
}

#[test]
fn union_with_incompatible_data_type() {
let sql = "SELECT interval '1 year 1 day' UNION ALL SELECT 1";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"Plan(\"UNION ALL schemas are expected to be the same\")",
"Plan(\"Column Int64(1) (type: Int64) is \
not compatible wiht column IntervalMonthDayNano\
(\\\"950737950189618795196236955648\\\") \
(type: Interval(MonthDayNano))\")",
format!("{:?}", err)
);
}
Expand Down