diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 3e1e76d7fdfd2..95ddc47d95deb 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -46,6 +46,7 @@ use crate::{ }; use arrow::datatypes::*; use hashbrown::HashMap; +use log::Log; use sqlparser::ast::{ BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, HiveDistributionStyle, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, @@ -55,6 +56,7 @@ use sqlparser::ast::{ use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; use sqlparser::ast::{ObjectType, OrderByExpr, Statement}; use sqlparser::parser::ParserError::ParserError; +use sqlparser::test_utils::join; use super::{ parser::DFParser, @@ -692,6 +694,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // build join schema let mut fields = vec![]; for plan in &plans { + dbg!(plan); fields.extend_from_slice(plan.schema().fields()); } let join_schema = DFSchema::new(fields)?; @@ -701,47 +704,53 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // look for expressions of the form ` = ` let mut possible_join_keys = vec![]; extract_possible_join_keys(&filter_expr, &mut possible_join_keys)?; - + dbg!(possible_join_keys.clone()); let mut all_join_keys = HashSet::new(); - let mut left = plans[0].clone(); - for right in plans.iter().skip(1) { - let left_schema = left.schema(); - let right_schema = right.schema(); - let mut join_keys = vec![]; - for (l, r) in &possible_join_keys { - if left_schema.field_from_column(l).is_ok() - && right_schema.field_from_column(r).is_ok() - { - join_keys.push((l.clone(), r.clone())); - } else if left_schema.field_from_column(r).is_ok() - && right_schema.field_from_column(l).is_ok() - { - join_keys.push((r.clone(), l.clone())); + let mut final_plan: LogicalPlan = plans[0].clone(); + for (idx, left) in plans.iter().enumerate() { + let mut left = left.clone(); + for right in plans.iter().skip(idx) { + let left_schema = left.schema(); + let right_schema = right.schema(); + let mut join_keys = vec![]; + for (l, r) in &possible_join_keys { + if left_schema.field_from_column(l).is_ok() + && right_schema.field_from_column(r).is_ok() + { + join_keys.push((l.clone(), r.clone())); + } else if left_schema.field_from_column(r).is_ok() + && right_schema.field_from_column(l).is_ok() + { + join_keys.push((r.clone(), l.clone())); + } + } + if join_keys.is_empty() { + continue; } - } - if join_keys.is_empty() { - left = - LogicalPlanBuilder::from(left).cross_join(right)?.build()?; - } else { let left_keys: Vec = join_keys.iter().map(|(l, _)| l.clone()).collect(); let right_keys: Vec = join_keys.iter().map(|(_, r)| r.clone()).collect(); - let builder = LogicalPlanBuilder::from(left); + let builder = LogicalPlanBuilder::from(left.clone()); left = builder - .join(right, JoinType::Inner, (left_keys, right_keys))? + .join(&right, JoinType::Inner, (left_keys, right_keys))? .build()?; + all_join_keys.extend(join_keys); + } + if idx == 0 { + final_plan = left.clone(); + } else { + // todo! } - - all_join_keys.extend(join_keys); } + dbg!(final_plan.clone()); // remove join expressions from filter match remove_join_expressions(&filter_expr, &all_join_keys)? { - Some(filter_expr) => { - LogicalPlanBuilder::from(left).filter(filter_expr)?.build() - } - _ => Ok(left), + Some(filter_expr) => LogicalPlanBuilder::from(final_plan) + .filter(filter_expr)? + .build(), + _ => Ok(final_plan), } } None => {