50 changes: 49 additions & 1 deletion datafusion/core/tests/sql/joins.rs
@@ -2358,7 +2358,6 @@ async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {
    for repartition_joins in test_repartition_joins {
        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;

        // reduce to inner join, t2.t2_id will insert cast.
        let sql =
            "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)";
        let msg = format!("Creating logical plan for '{}'", sql);
@@ -2400,3 +2399,52 @@ async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> {
    let test_repartition_joins = vec![true, false];
    for repartition_joins in test_repartition_joins {
        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;

        let sql = "select *,t1.t1_id+11 from t1,t2 where t1.t1_id+11=t2.t2_id";
Contributor Author

select *,t1.t1_id+10 from t1,t2 where t1.t1_id+10=t2.t2_id;
produces an empty result set, so I changed the expression to t1.t1_id+11.
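
The arithmetic behind that choice can be checked with a small standalone sketch. It assumes the fixture ids are 11, 22, 33, 44 for t1 and 11, 22, 44, 55 for t2; only the three matching pairs are confirmed by the expected output in this test, so treat the remaining ids as an assumption. `join_matches` is a hypothetical helper, not part of the test suite.

```rust
/// Returns every (t1_id, t2_id) pair satisfying t1_id + offset = t2_id.
/// Hypothetical helper for illustration only.
fn join_matches(t1_ids: &[i64], t2_ids: &[i64], offset: i64) -> Vec<(i64, i64)> {
    let mut pairs = Vec::new();
    for &a in t1_ids {
        for &b in t2_ids {
            // Same predicate shape as the SQL filter: t1.t1_id + offset = t2.t2_id.
            if a + offset == b {
                pairs.push((a, b));
            }
        }
    }
    pairs
}
```

Under the assumed ids, an offset of 10 yields 21, 32, 43, 54 on the left side, none of which appear in t2, while an offset of 11 yields the pairs (11, 22), (33, 44), and (44, 55).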

        let msg = format!("Creating logical plan for '{}'", sql);
        let plan = ctx
            .create_logical_plan(&("explain ".to_owned() + sql))
            .expect(&msg);
        let state = ctx.state();
        let plan = state.optimize(&plan)?;

        let expected = vec![
            "Explain [plan_type:Utf8, plan:Utf8]",
            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int, CAST(t1.t1_id AS Int64) + Int64(11) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t1.t1_id + Int64(11):Int64;N]",
            "    Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
            "      Inner Join: t1.t1_id + Int64(11) = CAST(t2.t2_id AS Int64) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(11):Int64;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, CAST(t2.t2_id AS Int64):Int64;N]",
Contributor

The left projection is ...CAST(t1.t1_id AS Int64) + Int64(11), and the right projection is ...CAST(t2.t2_id AS Int64) AS CAST(t2.t2_id AS Int64).
But the condition of join is t1.t1_id + Int64(11) = CAST(t2.t2_id AS Int64).

Do we need to do type coercion for the exprs in the Join Plan after this pr #4353?

cc @ygf11

Contributor Author

@ygf11 ygf11 Dec 13, 2022

But the condition of join is t1.t1_id + Int64(11) = CAST(t2.t2_id AS Int64).

The reason is that the projection:

  • uses expr.display_name(), here t1.t1_id + Int64(11), as the field name when generating its schema; the same name is used to look up the field.
  • uses the full expression name, here CAST(t1.t1_id AS Int64) + Int64(11), to display the projection expression.

To close this gap, we can add an alias or remove these projections (#4389). I am working on #4389 and plan to submit a PR today or tomorrow.
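
The mismatch between the schema field name and the displayed expression can be modelled with a toy sketch. This is not DataFusion's implementation: `ProjectedExpr`, `display_plain`, `display_aliased`, and `coerced_example` are hypothetical names, and the sketch only assumes that the field name is kept while the expression text is rewritten with a cast.

```rust
/// Toy stand-in for one projection column (NOT a DataFusion type).
struct ProjectedExpr {
    /// Name stored in the schema and used to look the field up.
    field_name: String,
    /// Expression text after a rewrite such as inserting a cast.
    expr_text: String,
}

impl ProjectedExpr {
    /// Shows only the rewritten expression, so the field name stays hidden.
    fn display_plain(&self) -> String {
        self.expr_text.clone()
    }

    /// Shows the expression with an explicit alias when the names differ,
    /// which makes the schema field name visible in the plan text.
    fn display_aliased(&self) -> String {
        if self.expr_text == self.field_name {
            self.expr_text.clone()
        } else {
            format!("{} AS {}", self.expr_text, self.field_name)
        }
    }
}

/// Builds the left-side column from the plan discussed in this thread.
fn coerced_example() -> ProjectedExpr {
    ProjectedExpr {
        field_name: "t1.t1_id + Int64(11)".to_string(),
        expr_text: "CAST(t1.t1_id AS Int64) + Int64(11)".to_string(),
    }
}
```

In this model, `display_plain` reproduces the confusing plan text, while `display_aliased` prints the cast expression followed by `AS t1.t1_id + Int64(11)`, so the join condition and the projection visibly refer to the same field.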

Do we need to do type coercion for the exprs in the Join Plan after this pr #4353?

No, I think type coercion has already been done in this test case, because the join keys come from the join filter, which has already been optimized.

https://github.com/apache/arrow-datafusion/blob/b822b0e5e582676eb58faf7fb89adc312dc95174/datafusion/expr/src/utils.rs#L481-L485

            "        Projection: t1.t1_id, t1.t1_name, t1.t1_int, CAST(t1.t1_id AS Int64) + Int64(11) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(11):Int64;N]",
            "          TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
            "        Projection: t2.t2_id, t2.t2_name, t2.t2_int, CAST(t2.t2_id AS Int64) AS CAST(t2.t2_id AS Int64) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, CAST(t2.t2_id AS Int64):Int64;N]",
            "          TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
        ];

        let formatted = plan.display_indent_schema().to_string();
        let actual: Vec<&str> = formatted.trim().lines().collect();
        assert_eq!(
            expected, actual,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected, actual
        );
        let expected = vec![
            "+-------+---------+--------+-------+---------+--------+----------------------+",
            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int | t1.t1_id + Int64(11) |",
            "+-------+---------+--------+-------+---------+--------+----------------------+",
            "| 11    | a       | 1      | 22    | y       | 1      | 22                   |",
            "| 33    | c       | 3      | 44    | x       | 3      | 44                   |",
            "| 44    | d       | 4      | 55    | w       | 3      | 55                   |",
            "+-------+---------+--------+-------+---------+--------+----------------------+",
        ];

        let results = execute_to_batches(&ctx, sql).await;
        assert_batches_sorted_eq!(expected, &results);
    }

    Ok(())
}
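
The rewrite this test exercises, a cross join plus an equality filter becoming an inner join on computed keys, can be sketched outside DataFusion. The functions below are hypothetical and operate on bare id columns; they only illustrate that both strategies return the same pairs, not how the optimizer is implemented.

```rust
use std::collections::HashMap;

/// Unoptimized form: enumerate every pair, then apply the filter
/// t1_id + 11 = CAST(t2_id AS Int64).
fn cross_join_then_filter(left: &[u32], right: &[u32]) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    for &l in left {
        for &r in right {
            if i64::from(l) + 11 == i64::from(r) {
                out.push((l, r));
            }
        }
    }
    out
}

/// Reduced form: treat both sides of the equality as join keys and
/// probe a hash index instead of enumerating all pairs.
fn inner_join_on_keys(left: &[u32], right: &[u32]) -> Vec<(u32, u32)> {
    // Build side: index right rows by the key CAST(t2_id AS Int64).
    let mut index: HashMap<i64, Vec<u32>> = HashMap::new();
    for &r in right {
        index.entry(i64::from(r)).or_default().push(r);
    }
    // Probe side: compute the key t1_id + 11 for each left row.
    let mut out = Vec::new();
    for &l in left {
        if let Some(matches) = index.get(&(i64::from(l) + 11)) {
            for &r in matches {
                out.push((l, r));
            }
        }
    }
    out
}
```

Both functions widen the UInt32 ids to 64-bit integers before comparing, mirroring the casts visible in the plan. For the rows shown in the expected output, each returns the pairs (11, 22), (33, 44), and (44, 55).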