diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 635a7eb5a9611..91a871d52e9ad 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1074,7 +1074,7 @@ impl LogicalPlanBuilder { let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?; let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?; - let on = left_keys + let on: Vec<_> = left_keys .into_iter() .zip(right_keys) .map(|(l, r)| (Expr::Column(l), Expr::Column(r))) @@ -1082,6 +1082,11 @@ impl LogicalPlanBuilder { let join_schema = build_join_schema(self.plan.schema(), right.schema(), &join_type)?; + // An inner join without a join condition is a cross join; every other join type requires one + if join_type != JoinType::Inner && on.is_empty() && filter.is_none() { + return plan_err!("join condition should not be empty"); + } + Ok(Self::new(LogicalPlan::Join(Join { left: self.plan, right: Arc::new(right), diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 04ff94347247c..1e9ef16bde675 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -242,16 +242,13 @@ fn transformed_limit( fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> { use JoinType::*; - fn is_no_join_condition(join: &Join) -> bool { - join.on.is_empty() && join.filter.is_none() + // Cross join is the special case of inner join where there is no join condition. 
see [LogicalPlanBuilder::cross_join] + fn is_cross_join(join: &Join) -> bool { + join.join_type == Inner && join.on.is_empty() && join.filter.is_none() } - let (left_limit, right_limit) = if is_no_join_condition(&join) { - match join.join_type { - Left | Right | Full | Inner => (Some(limit), Some(limit)), - LeftAnti | LeftSemi | LeftMark => (Some(limit), None), - RightAnti | RightSemi => (None, Some(limit)), - } + let (left_limit, right_limit) = if is_cross_join(&join) { + (Some(limit), Some(limit)) } else { match join.join_type { Left => (Some(limit), None), @@ -861,167 +858,6 @@ mod test { assert_optimized_plan_equal(outer_query, expected) } - #[test] - fn limit_should_push_down_join_without_condition() -> Result<()> { - let table_scan_1 = test_table_scan()?; - let table_scan_2 = test_table_scan_with_name("test2")?; - let left_keys: Vec<&str> = Vec::new(); - let right_keys: Vec<&str> = Vec::new(); - let plan = LogicalPlanBuilder::from(table_scan_1.clone()) - .join( - LogicalPlanBuilder::from(table_scan_2.clone()).build()?, - JoinType::Left, - (left_keys.clone(), right_keys.clone()), - None, - )? - .limit(0, Some(1000))? - .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n Left Join: \ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test, fetch=1000\ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test2, fetch=1000"; - - assert_optimized_plan_equal(plan, expected)?; - - let plan = LogicalPlanBuilder::from(table_scan_1.clone()) - .join( - LogicalPlanBuilder::from(table_scan_2.clone()).build()?, - JoinType::Right, - (left_keys.clone(), right_keys.clone()), - None, - )? - .limit(0, Some(1000))? 
- .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n Right Join: \ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test, fetch=1000\ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test2, fetch=1000"; - - assert_optimized_plan_equal(plan, expected)?; - - let plan = LogicalPlanBuilder::from(table_scan_1.clone()) - .join( - LogicalPlanBuilder::from(table_scan_2.clone()).build()?, - JoinType::Full, - (left_keys.clone(), right_keys.clone()), - None, - )? - .limit(0, Some(1000))? - .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n Full Join: \ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test, fetch=1000\ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test2, fetch=1000"; - - assert_optimized_plan_equal(plan, expected)?; - - let plan = LogicalPlanBuilder::from(table_scan_1.clone()) - .join( - LogicalPlanBuilder::from(table_scan_2.clone()).build()?, - JoinType::LeftSemi, - (left_keys.clone(), right_keys.clone()), - None, - )? - .limit(0, Some(1000))? - .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n LeftSemi Join: \ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test, fetch=1000\ - \n TableScan: test2"; - - assert_optimized_plan_equal(plan, expected)?; - - let plan = LogicalPlanBuilder::from(table_scan_1.clone()) - .join( - LogicalPlanBuilder::from(table_scan_2.clone()).build()?, - JoinType::LeftAnti, - (left_keys.clone(), right_keys.clone()), - None, - )? - .limit(0, Some(1000))? - .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n LeftAnti Join: \ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test, fetch=1000\ - \n TableScan: test2"; - - assert_optimized_plan_equal(plan, expected)?; - - let plan = LogicalPlanBuilder::from(table_scan_1.clone()) - .join( - LogicalPlanBuilder::from(table_scan_2.clone()).build()?, - JoinType::RightSemi, - (left_keys.clone(), right_keys.clone()), - None, - )? - .limit(0, Some(1000))? 
- .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n RightSemi Join: \ - \n TableScan: test\ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test2, fetch=1000"; - - assert_optimized_plan_equal(plan, expected)?; - - let plan = LogicalPlanBuilder::from(table_scan_1) - .join( - LogicalPlanBuilder::from(table_scan_2).build()?, - JoinType::RightAnti, - (left_keys, right_keys), - None, - )? - .limit(0, Some(1000))? - .build()?; - - let expected = "Limit: skip=0, fetch=1000\ - \n RightAnti Join: \ - \n TableScan: test\ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test2, fetch=1000"; - - assert_optimized_plan_equal(plan, expected) - } - - #[test] - fn limit_should_push_down_left_outer_join() -> Result<()> { - let table_scan_1 = test_table_scan()?; - let table_scan_2 = test_table_scan_with_name("test2")?; - - let plan = LogicalPlanBuilder::from(table_scan_1) - .join( - LogicalPlanBuilder::from(table_scan_2).build()?, - JoinType::Left, - (vec!["a"], vec!["a"]), - None, - )? - .limit(0, Some(1000))? - .build()?; - - // Limit pushdown Not supported in Join - let expected = "Limit: skip=0, fetch=1000\ - \n Left Join: test.a = test2.a\ - \n Limit: skip=0, fetch=1000\ - \n TableScan: test, fetch=1000\ - \n TableScan: test2"; - - assert_optimized_plan_equal(plan, expected) - } - #[test] fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> { let table_scan_1 = test_table_scan()?; diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 33f10400d3417..499447861a58b 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -331,7 +331,11 @@ fn build_join( _ => { // if not correlated, group down to 1 row and left join on that (preserving row count) LogicalPlanBuilder::from(filter_input.clone()) - .join_on(sub_query_alias, JoinType::Left, None)? 
+ .join_on( + sub_query_alias, + JoinType::Left, + vec![Expr::Literal(ScalarValue::Boolean(Some(true)))], + )? .build()? } } @@ -557,7 +561,7 @@ mod tests { // it will optimize, but fail for the same reason the unoptimized query would let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ - \n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ + \n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ \n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\ \n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\ @@ -589,7 +593,7 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ - \n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ + \n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ \n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\ \n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\ @@ -965,7 +969,7 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey < __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ - \n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ + \n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ \n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\ \n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\ 
@@ -996,7 +1000,7 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ - \n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ + \n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ \n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\ \n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\ @@ -1097,8 +1101,8 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey BETWEEN __scalar_sq_1.min(orders.o_custkey) AND __scalar_sq_2.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N, max(orders.o_custkey):Int64;N]\ - \n Left Join: [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N, max(orders.o_custkey):Int64;N]\ - \n Left Join: [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N]\ + \n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N, max(orders.o_custkey):Int64;N]\ + \n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ \n SubqueryAlias: __scalar_sq_1 [min(orders.o_custkey):Int64;N]\ \n Projection: min(orders.o_custkey) [min(orders.o_custkey):Int64;N]\ diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index c3a28f050f5b1..ed49cfaca2197 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -1452,11 +1452,6 @@ fn test_unnest_to_sql() { #[test] fn test_join_with_no_conditions() { - sql_round_trip( - GenericDialect {}, - "SELECT j1.j1_id, j1.j1_string FROM j1 JOIN j2", - "SELECT j1.j1_id, j1.j1_string FROM j1 CROSS JOIN j2", - 
); sql_round_trip( GenericDialect {}, "SELECT j1.j1_id, j1.j1_string FROM j1 CROSS JOIN j2", diff --git a/datafusion/sqllogictest/test_files/join.slt.part b/datafusion/sqllogictest/test_files/join.slt.part index 21126a7479673..2bbf31dad93aa 100644 --- a/datafusion/sqllogictest/test_files/join.slt.part +++ b/datafusion/sqllogictest/test_files/join.slt.part @@ -625,6 +625,28 @@ FROM t1 ---- 11 11 11 +# join condition is required +# TODO: query error join condition should not be empty +# related to: https://github.com/apache/datafusion/issues/13486 +statement ok +SELECT * FROM t1 JOIN t2 + +# join condition is required +query error join condition should not be empty +SELECT * FROM t1 LEFT JOIN t2 + +# join condition is required +query error join condition should not be empty +SELECT * FROM t1 RIGHT JOIN t2 + +# join condition is required +query error join condition should not be empty +SELECT * FROM t1 FULL JOIN t2 + +# cross join does not need a join condition +statement ok +SELECT * FROM t1 CROSS JOIN t2 + # multiple inner joins with mixed ON clause and filter query III rowsort SELECT t1.t1_id, t2.t2_id, t3.t3_id diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part index b5c4e61584783..1dba8c0537209 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part @@ -55,7 +55,7 @@ logical_plan 06)----------Inner Join: supplier.s_nationkey = nation.n_nationkey 07)------------Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey 08)--------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey -09)----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost] +09)----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost], partial_filters=[Boolean(true)] 10)----------------TableScan: supplier 
projection=[s_suppkey, s_nationkey] 11)------------Projection: nation.n_nationkey 12)--------------Filter: nation.n_name = Utf8("GERMANY") diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part index e831b84d82dc2..e2bd651c4aa13 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part @@ -55,7 +55,7 @@ logical_plan 03)----Inner Join: revenue0.total_revenue = __scalar_sq_1.max(revenue0.total_revenue) 04)------Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue 05)--------Inner Join: supplier.s_suppkey = revenue0.supplier_no -06)----------TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] +06)----------TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone], partial_filters=[Boolean(true)] 07)----------SubqueryAlias: revenue0 08)------------Projection: lineitem.l_suppkey AS supplier_no, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue 09)--------------Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[sum(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part index 16cb644d4e5b0..828bf967d8f4a 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part @@ -65,7 +65,7 @@ logical_plan 07)------------Projection: customer.c_phone, customer.c_acctbal 08)--------------LeftAnti Join: customer.c_custkey = __correlated_sq_1.o_custkey 09)----------------Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), 
Utf8View("17")]) -10)------------------TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")])] +10)------------------TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")]), Boolean(true)] 11)----------------SubqueryAlias: __correlated_sq_1 12)------------------TableScan: orders projection=[o_custkey] 13)------------SubqueryAlias: __scalar_sq_2