diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index b6c78b0cf3814..5c4171484af59 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2452,29 +2452,29 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(12)\", index: 2 }, Column { name: \"t2.t2_id + Int64(1)\", index: 1 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(12)\", index: 2 }], 2)",
-                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(12 AS UInt32) as t1.t1_id + Int64(12)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2)",
+                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + Int64(1)\", index: 1 }], 2)",
-                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + CAST(1 AS UInt32) as t2.t2_id + Int64(1)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2)",
+                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
+            ]
         } else {
             vec![
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(12)\", index: 2 }, Column { name: \"t2.t2_id + Int64(1)\", index: 1 })]",
+                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
                 "        CoalescePartitionsExec",
-                "          ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(12 AS UInt32) as t1.t1_id + Int64(12)]",
+                "          ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
                 "            RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
-                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + CAST(1 AS UInt32) as t2.t2_id + Int64(1)]",
+                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
                 "          RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2524,10 +2524,10 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 2 }], 2)",
-                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2)",
+                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
@@ -2541,9 +2541,9 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "      CoalesceBatchesExec: target_batch_size=4096",
-                "        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
+                "        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "          CoalescePartitionsExec",
-                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
+                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "          MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2594,14 +2594,14 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 1 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
                 "            RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - Int64(11)\", index: 1 }], 2)",
-                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2)",
+                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2610,9 +2610,9 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 1 })]",
+                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
                 "        MemoryExec: partitions=1, partition_sizes=[1]",
-                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "          RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2662,14 +2662,14 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 3 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
                 "            RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - Int64(11)\", index: 3 }], 2)",
-                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2)",
+                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2678,9 +2678,9 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 3 })]",
+                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
                 "        MemoryExec: partitions=1, partition_sizes=[1]",
-                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "          RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index a3b240e65386a..87c7b2597fb10 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -586,12 +586,13 @@ pub fn from_plan(
             // The preceding part of expr is equi-exprs,
             // and the struct of each equi-expr is like `left-expr = right-expr`.
             let new_on:Vec<(Expr,Expr)> = expr.iter().take(equi_expr_count).map(|equi_expr| {
-                if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = equi_expr {
-                    assert!(op == &Operator::Eq);
-                    Ok(((**left).clone(), (**right).clone()))
+                // The SimplifyExpression rule may add an alias to the equi_expr.
+                let unalias_expr = equi_expr.clone().unalias();
+                if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = unalias_expr {
+                    Ok((*left, *right))
                 } else {
                     Err(DataFusionError::Internal(format!(
-                        "The front part expressions should be an binary expression, actual:{}",
+                        "The front part expressions should be a binary equality expression, actual:{}",
                         equi_expr
                     )))
                 }
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 6645284e39768..e4fe2e13751ab 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -120,6 +120,7 @@ mod tests {
     use crate::simplify_expressions::utils::for_test::{
         cast_to_int64_expr, now_expr, to_timestamp_expr,
     };
+    use crate::test::test_table_scan_with_name;
 
     use super::*;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -131,7 +132,7 @@ mod tests {
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{
         and, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr,
-        ExprSchemable,
+        ExprSchemable, JoinType,
     };
 
     /// A macro to assert that one string is contained within another with
@@ -789,4 +790,30 @@ mod tests {
 
         assert_optimized_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn simplify_equijoin_predicate() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        let left_key = col("t1.a") + lit(1i64).cast_to(&DataType::UInt32, t1.schema())?;
+        let right_key =
+            col("t2.a") + lit(2i64).cast_to(&DataType::UInt32, t2.schema())?;
+        let plan = LogicalPlanBuilder::from(t1)
+            .join_with_expr_keys(
+                t2,
+                JoinType::Inner,
+                (vec![left_key], vec![right_key]),
+                None,
+            )?
+            .build()?;
+
+        // before simplify: t1.a + CAST(Int64(1), UInt32) = t2.a + CAST(Int64(2), UInt32)
+        // after simplify: t1.a + UInt32(1) = t2.a + UInt32(2) AS t1.a + Int64(1) = t2.a + Int64(2)
+        let expected = "Inner Join: t1.a + UInt32(1) = t2.a + UInt32(2)\
+        \n  TableScan: t1\
+        \n  TableScan: t2";
+
+        assert_optimized_plan_eq(&plan, expected)
+    }
 }