From 02c24d58074c254445da5e1ccf6365917ba520ec Mon Sep 17 00:00:00 2001 From: yuuch Date: Sun, 23 Oct 2022 16:46:26 +0800 Subject: [PATCH 1/5] remove unused code for select plan --- datafusion/sql/src/planner.rs | 89 +++++++++-------------------------- 1 file changed, 21 insertions(+), 68 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index b51ce83d5d01e..756b9735d0899 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -854,6 +854,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { outer_query_schema: Option<&DFSchema>, ctes: &mut HashMap, ) -> Result { + + let cross_join_plan = if plans.len() == 1 { + plans[0].clone() + } else { + let mut left = plans[0].clone(); + for right in plans.iter().skip(1) { + left = + LogicalPlanBuilder::from(left).cross_join(right)?.build()?; + } + left + }; match selection { Some(predicate_expr) => { // build join schema @@ -869,10 +880,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?; - + /* // look for expressions of the form ` = ` let mut possible_join_keys = vec![]; extract_possible_join_keys(&filter_expr, &mut possible_join_keys)?; + */ + + Ok(LogicalPlan::Filter(Filter::try_new( + filter_expr, + Arc::new(cross_join_plan), + )?)) + /* let mut all_join_keys = HashSet::new(); @@ -1001,18 +1019,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } _ => Ok(left), } + */ } None => { - if plans.len() == 1 { - Ok(plans[0].clone()) - } else { - let mut left = plans[0].clone(); - for right in plans.iter().skip(1) { - left = - LogicalPlanBuilder::from(left).cross_join(right)?.build()?; - } - Ok(left) - } + Ok(cross_join_plan) } } } @@ -2683,40 +2693,6 @@ fn normalize_sql_object_name(sql_object_name: &ObjectName) -> String { .join(".") } -/// Remove join expressions from a filter expression -fn remove_join_expressions( - expr: &Expr, - join_columns: &HashSet<(Column, Column)>, -) -> Result> { - match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { - Operator::Eq => match (left.as_ref(), right.as_ref()) { - (Expr::Column(l), Expr::Column(r)) => { - if join_columns.contains(&(l.clone(), r.clone())) - || join_columns.contains(&(r.clone(), l.clone())) - { - Ok(None) - } else { - Ok(Some(expr.clone())) - } - } - _ => Ok(Some(expr.clone())), - }, - Operator::And => { - let l = remove_join_expressions(left, join_columns)?; - let r = remove_join_expressions(right, join_columns)?; - match (l, r) { - (Some(ll), Some(rr)) => Ok(Some(and(ll, rr))), - (Some(ll), _) => Ok(Some(ll)), - (_, Some(rr)) => Ok(Some(rr)), - _ => Ok(None), - } - } - _ => Ok(Some(expr.clone())), - }, - _ => Ok(Some(expr.clone())), - } -} /// Extracts equijoin ON condition be a single Eq or multiple conjunctive Eqs /// Filters matching this pattern are added to `accum` @@ -2784,29 +2760,6 @@ fn extract_join_keys( } } -/// Extract join keys from a WHERE clause -fn extract_possible_join_keys( - expr: &Expr, - accum: &mut Vec<(Column, Column)>, -) -> Result<()> { - match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { - Operator::Eq => match (left.as_ref(), right.as_ref()) { - (Expr::Column(l), Expr::Column(r)) => { - accum.push((l.clone(), r.clone())); - Ok(()) - } - _ => Ok(()), - }, - Operator::And => { - extract_possible_join_keys(left, accum)?; - extract_possible_join_keys(right, accum) - } - _ => Ok(()), - }, - _ => Ok(()), - } -} /// Convert SQL simple data type to relational representation of data type pub fn convert_simple_data_type(sql_type: &SQLDataType) -> Result { From 4f04ebb3e96a0661b155d30cb8aae0dac2bdd763 Mon Sep 17 00:00:00 2001 From: yuuch Date: Fri, 28 Oct 2022 11:49:59 +0800 Subject: [PATCH 2/5] fmt and clippy --- datafusion/sql/src/planner.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 756b9735d0899..1f25ebe914d1e 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -35,7 +35,7 @@ use datafusion_expr::utils::{ COUNT_STAR_EXPANSION, }; use datafusion_expr::{ - and, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, GetIndexedField, + col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, GetIndexedField, Operator, ScalarUDF, WindowFrame, WindowFrameUnits, }; use datafusion_expr::{ @@ -854,14 +854,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { outer_query_schema: Option<&DFSchema>, ctes: &mut HashMap, ) -> Result { - let cross_join_plan = if plans.len() == 1 { plans[0].clone() } else { let mut left = plans[0].clone(); for right in plans.iter().skip(1) { - left = - LogicalPlanBuilder::from(left).cross_join(right)?.build()?; + left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?; } left }; @@ -1021,9 +1019,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } */ } - None => { - Ok(cross_join_plan) - } + None => Ok(cross_join_plan), } } @@ -2693,7 +2689,6 @@ fn normalize_sql_object_name(sql_object_name: &ObjectName) -> String { .join(".") } - /// Extracts equijoin ON condition be a single Eq or multiple conjunctive Eqs /// Filters matching this pattern are added to `accum` /// Filters that don't match this pattern are added to `accum_filter` @@ -2760,7 +2755,6 @@ fn extract_join_keys( } } - /// Convert SQL simple data type to relational representation of data type pub fn convert_simple_data_type(sql_type: &SQLDataType) -> Result { match sql_type { From 33146b89c380e20a0151de9605757c44aecb7661 Mon Sep 17 00:00:00 2001 From: yuuch Date: Fri, 28 Oct 2022 12:18:25 +0800 Subject: [PATCH 3/5] remove some useless comments --- datafusion/sql/src/planner.rs | 135 ---------------------------------- 1 file changed, 135 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 1f25ebe914d1e..8e22e647141b3 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -878,146 +878,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?; - /* - // look for expressions of the form ` = ` - let mut possible_join_keys = vec![]; - extract_possible_join_keys(&filter_expr, &mut possible_join_keys)?; - */ Ok(LogicalPlan::Filter(Filter::try_new( filter_expr, Arc::new(cross_join_plan), )?)) - /* - - let mut all_join_keys = HashSet::new(); - - let orig_plans = plans.clone(); - let mut plans = plans.into_iter(); - let mut left = plans.next().unwrap(); // have at least one plan - - // List of the plans that have not yet been joined - let mut remaining_plans: Vec> = - plans.into_iter().map(Some).collect(); - - // Take from the list of remaining plans, - loop { - let mut join_keys = vec![]; - - // Search all remaining plans for the next to - // join. Prefer the first one that has a join - // predicate in the predicate lists - let plan_with_idx = - remaining_plans.iter().enumerate().find(|(_idx, plan)| { - // skip plans that have been joined already - let plan = if let Some(plan) = plan { - plan - } else { - return false; - }; - - // can we find a match? - let left_schema = left.schema(); - let right_schema = plan.schema(); - for (l, r) in &possible_join_keys { - if left_schema.field_from_column(l).is_ok() - && right_schema.field_from_column(r).is_ok() - && can_hash( - left_schema - .field_from_column(l) - .unwrap() // the result must be OK - .data_type(), - ) - { - join_keys.push((l.clone(), r.clone())); - } else if left_schema.field_from_column(r).is_ok() - && right_schema.field_from_column(l).is_ok() - && can_hash( - left_schema - .field_from_column(r) - .unwrap() // the result must be OK - .data_type(), - ) - { - join_keys.push((r.clone(), l.clone())); - } - } - // stop if we found join keys - !join_keys.is_empty() - }); - - // If we did not find join keys, either there are - // no more plans, or we can't find any plans that - // can be joined with predicates - if join_keys.is_empty() { - assert!(plan_with_idx.is_none()); - - // pick the first non null plan to join - let plan_with_idx = remaining_plans - .iter() - .enumerate() - .find(|(_idx, plan)| plan.is_some()); - if let Some((idx, _)) = plan_with_idx { - let plan = std::mem::take(&mut remaining_plans[idx]).unwrap(); - left = LogicalPlanBuilder::from(left) - .cross_join(&plan)? - .build()?; - } else { - // no more plans to join - break; - } - } else { - // have a plan - let (idx, _) = plan_with_idx.expect("found plan node"); - let plan = std::mem::take(&mut remaining_plans[idx]).unwrap(); - - let left_keys: Vec = - join_keys.iter().map(|(l, _)| l.clone()).collect(); - let right_keys: Vec = - join_keys.iter().map(|(_, r)| r.clone()).collect(); - let builder = LogicalPlanBuilder::from(left); - left = builder - .join(&plan, JoinType::Inner, (left_keys, right_keys), None)? - .build()?; - } - - all_join_keys.extend(join_keys); - } - - // remove join expressions from filter - match remove_join_expressions(&filter_expr, &all_join_keys)? { - Some(filter_expr) => { - // this logic is adapted from [`LogicalPlanBuilder::filter`] to take - // the query outer schema into account so that joins in subqueries - // can reference outer query fields. - let mut all_schemas: Vec = vec![]; - for plan in orig_plans { - for schema in plan.all_schemas() { - all_schemas.push(schema.clone()); - } - } - if let Some(outer_query_schema) = outer_query_schema { - all_schemas.push(Arc::new(outer_query_schema.clone())); - } - let mut join_columns = HashSet::new(); - for (l, r) in &all_join_keys { - join_columns.insert(l.clone()); - join_columns.insert(r.clone()); - } - let x: Vec<&DFSchemaRef> = all_schemas.iter().collect(); - let filter_expr = normalize_col_with_schemas( - filter_expr, - x.as_slice(), - &[join_columns], - )?; - Ok(LogicalPlan::Filter(Filter::try_new( - filter_expr, - Arc::new(left), - )?)) - } - _ => Ok(left), - } - */ } None => Ok(cross_join_plan), } From 4e3d95819708149874c56376a1adb6d0fdb0a44a Mon Sep 17 00:00:00 2001 From: yuuch Date: Sat, 29 Oct 2022 23:51:34 +0800 Subject: [PATCH 4/5] fix ut --- datafusion/sql/src/planner.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 8e22e647141b3..9740ec0d2e812 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -867,17 +867,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Some(predicate_expr) => { // build join schema let mut fields = vec![]; - let mut metadata = std::collections::HashMap::new(); + let mut metadata = HashMap::new(); for plan in &plans { fields.extend_from_slice(plan.schema().fields()); metadata.extend(plan.schema().metadata().clone()); } let mut join_schema = DFSchema::new_with_metadata(fields, metadata)?; + let mut all_schemas: Vec = vec![]; + for plan in plans { + for schema in plan.all_schemas() { + all_schemas.push(schema.clone()); + } + } if let Some(outer) = outer_query_schema { + all_schemas.push(Arc::new(outer.clone())); join_schema.merge(outer); } + let x: Vec<&DFSchemaRef> = all_schemas.iter().collect(); let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?; + let mut using_columns = HashSet::new(); + expr_to_columns(&filter_expr, &mut using_columns)?; + let filter_expr = normalize_col_with_schemas( + filter_expr, + x.as_slice(), + &[using_columns], + )?; Ok(LogicalPlan::Filter(Filter::try_new( filter_expr, From c46ede18a02cc8ddf698667392ae9097714c615e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 5 Nov 2022 13:23:23 +0100 Subject: [PATCH 5/5] Test --- datafusion/core/tests/sql/subqueries.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 4fb97d5eb3bbf..eca64014ff5d2 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -144,12 +144,12 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#; Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value Inner Join: nation.n_regionkey = region.r_regionkey Inner Join: supplier.s_nationkey = nation.n_nationkey - Inner Join: partsupp.ps_suppkey = supplier.s_suppkey - Inner Join: part.p_partkey = partsupp.ps_partkey + Inner Join: part.p_partkey = partsupp.ps_partkey, supplier.s_suppkey = partsupp.ps_suppkey + CrossJoin: Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS") TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")] - TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] + TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] TableScan: nation projection=[n_nationkey, n_name, n_regionkey] Filter: region.r_name = Utf8("EUROPE") TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]