From 68d8efccd4c0d8baaf15e0833909b496ee71dc9f Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Mon, 5 Jul 2021 00:11:03 -0700 Subject: [PATCH 1/2] dedup using join column in wildcard expansion --- datafusion/src/logical_plan/builder.rs | 57 +++++++++++++++++++++++--- datafusion/src/logical_plan/expr.rs | 2 +- datafusion/src/logical_plan/plan.rs | 2 +- datafusion/src/sql/planner.rs | 16 +++++++- datafusion/src/sql/utils.rs | 48 ++++++++++++++++++---- 5 files changed, 110 insertions(+), 15 deletions(-) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 41f29c4b99052..29d71a4636773 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -17,7 +17,10 @@ //! This module provides a builder for creating LogicalPlans -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use arrow::{ datatypes::{Schema, SchemaRef}, @@ -220,10 +223,33 @@ impl LogicalPlanBuilder { for e in expr { match e { Expr::Wildcard => { - (0..input_schema.fields().len()).for_each(|i| { - projected_expr - .push(Expr::Column(input_schema.field(i).qualified_column())) - }); + let columns_to_skip = self + .plan + .using_columns()? 
+                        .into_iter()
+                        // For each USING JOIN condition, only expand to one column in projection
+                        .map(|cols| {
+                            let mut cols = cols.into_iter().collect::<Vec<_>>();
+                            // sort join columns to make sure we consistently keep the same
+                            // qualified column
+                            cols.sort();
+                            cols.into_iter().skip(1)
+                        })
+                        .flatten()
+                        .collect::<HashSet<Column>>();
+
+                    if columns_to_skip.is_empty() {
+                        input_schema.fields().iter().for_each(|f| {
+                            projected_expr.push(Expr::Column(f.qualified_column()))
+                        })
+                    } else {
+                        input_schema.fields().iter().for_each(|f| {
+                            let col = f.qualified_column();
+                            if !columns_to_skip.contains(&col) {
+                                projected_expr.push(Expr::Column(col))
+                            }
+                        })
+                    }
                 }
                 _ => projected_expr
                     .push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -587,6 +613,27 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn plan_using_join_wildcard_projection() -> Result<()> {
+        let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?
+            .join_using(&t2, JoinType::Inner, vec!["id"])?
+            .project(vec![Expr::Wildcard])?
+            .build()?;
+
+        // id column should only show up once in projection
+        let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
+        \n  Join: Using #t1.id = #t2.id\
+        \n    TableScan: t1 projection=None\
+        \n    TableScan: t2 projection=None";
+
+        assert_eq!(expected, format!("{:?}", plan));
+
+        Ok(())
+    }
+
     #[test]
     fn plan_builder_union_combined_single_union() -> Result<()> {
         let plan = LogicalPlanBuilder::scan_empty(
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 59c99797e0cd8..2eee140f47fe5 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,7 +34,7 @@ use std::fmt;
 use std::sync::Arc;
 
 /// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct Column {
     /// relation/table name.
     pub relation: Option<String>,
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index b954b6a97950c..2504dfaa6f236 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ use crate::error::DataFusionError;
 use crate::logical_plan::dfschema::DFSchemaRef;
 use crate::sql::parser::FileType;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::collections::HashSet;
 use std::{
+    collections::HashSet,
     fmt::{self, Display},
     sync::Arc,
 };
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index f89ba3f659c88..75d90b47328c4 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -682,13 +682,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         projection: &[SelectItem],
     ) -> Result<Vec<Expr>> {
         let input_schema = plan.schema();
+        let using_columns = plan.using_columns()?;
 
         projection
             .iter()
             .map(|expr| self.sql_select_to_rex(expr, input_schema))
             .collect::<Result<Vec<Expr>>>()?
             .iter()
-            .flat_map(|expr| expand_wildcard(expr, input_schema))
+            .flat_map(|expr| expand_wildcard(expr, input_schema, &using_columns))
             .map(|expr| normalize_col(expr, plan))
             .collect::<Result<Vec<Expr>>>()
     }
@@ -2773,6 +2774,19 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn project_wildcard_on_join_with_using() {
+        let sql = "SELECT * \
+            FROM lineitem \
+            JOIN lineitem as lineitem2 \
+            USING (l_item_id)";
+        let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
+        \n  Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
+        \n    TableScan: lineitem projection=None\
+        \n    TableScan: lineitem2 projection=None";
+        quick_test(sql, expected);
+    }
+
     #[test]
     fn equijoin_explicit_syntax_3_tables() {
         let sql = "SELECT id, order_id, l_description \
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 28243360c412f..d63ed82e342d9 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -23,16 +23,50 @@ use crate::{
     error::{DataFusionError, Result},
     logical_plan::{Column, ExpressionVisitor, Recursion},
 };
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
+pub(crate) fn expand_wildcard(
+    expr: &Expr,
+    schema: &DFSchema,
+    using_columns: &[HashSet<Column>],
+) -> Vec<Expr> {
     match expr {
-        Expr::Wildcard => schema
-            .fields()
-            .iter()
-            .map(|f| Expr::Column(f.qualified_column()))
-            .collect::<Vec<Expr>>(),
+        Expr::Wildcard => {
+            let columns_to_skip = using_columns
+                .iter()
+                // For each USING JOIN condition, only expand to one column in projection
+                .map(|cols| {
+                    let mut cols = cols.iter().collect::<Vec<_>>();
+                    // sort join columns to make sure we consistently keep the same
+                    // qualified column
+                    cols.sort();
+                    cols.into_iter().skip(1)
+                })
+                .flatten()
+                .collect::<HashSet<_>>();
+
+            if columns_to_skip.is_empty() {
+                schema
+                    .fields()
+                    .iter()
+                    .map(|f| Expr::Column(f.qualified_column()))
+                    .collect::<Vec<Expr>>()
+            } else {
+                schema
+                    .fields()
+                    .iter()
+                    .filter_map(|f| {
+                        let col = f.qualified_column();
+                        if !columns_to_skip.contains(&col) {
+                            Some(Expr::Column(col))
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<Expr>>()
+            }
+        }
         _ => vec![expr.clone()],
     }
 }

From 72a747967c9e1d51b87f1156c4caa28a614c168e Mon Sep 17 00:00:00 2001
From: Qingping Hou
Date: Mon, 12 Jul 2021 23:30:21 -0700
Subject: [PATCH 2/2] reuse expand_wildcard in logical plan builder

---
 datafusion/src/logical_plan/builder.rs | 69 ++++++++++++++++----------
 datafusion/src/logical_plan/mod.rs     |  2 +-
 datafusion/src/sql/planner.rs          | 22 +++++---
 datafusion/src/sql/utils.rs            | 50 +------------------
 4 files changed, 60 insertions(+), 83 deletions(-)

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 29d71a4636773..85c4aea99ff5f 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -223,33 +223,7 @@ impl LogicalPlanBuilder {
         for e in expr {
             match e {
                 Expr::Wildcard => {
-                    let columns_to_skip = self
-                        .plan
-                        .using_columns()?
-                        .into_iter()
-                        // For each USING JOIN condition, only expand to one column in projection
-                        .map(|cols| {
-                            let mut cols = cols.into_iter().collect::<Vec<_>>();
-                            // sort join columns to make sure we consistently keep the same
-                            // qualified column
-                            cols.sort();
-                            cols.into_iter().skip(1)
-                        })
-                        .flatten()
-                        .collect::<HashSet<Column>>();
-
-                    if columns_to_skip.is_empty() {
-                        input_schema.fields().iter().for_each(|f| {
-                            projected_expr.push(Expr::Column(f.qualified_column()))
-                        })
-                    } else {
-                        input_schema.fields().iter().for_each(|f| {
-                            let col = f.qualified_column();
-                            if !columns_to_skip.contains(&col) {
-                                projected_expr.push(Expr::Column(col))
-                            }
-                        })
-                    }
+                    projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
                 }
                 _ => projected_expr
                     .push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -534,6 +508,47 @@ pub fn union_with_alias(
     })
 }
 
+/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
+pub(crate) fn expand_wildcard(
+    schema: &DFSchema,
+    plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+    let using_columns = plan.using_columns()?;
+    let columns_to_skip = using_columns
+        .into_iter()
+        // For each USING JOIN condition, only expand to one column in projection
+        .map(|cols| {
+            let mut cols = cols.into_iter().collect::<Vec<_>>();
+            // sort join columns to make sure we consistently keep the same
+            // qualified column
+            cols.sort();
+            cols.into_iter().skip(1)
+        })
+        .flatten()
+        .collect::<HashSet<Column>>();
+
+    if columns_to_skip.is_empty() {
+        Ok(schema
+            .fields()
+            .iter()
+            .map(|f| Expr::Column(f.qualified_column()))
+            .collect::<Vec<Expr>>())
+    } else {
+        Ok(schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                let col = f.qualified_column();
+                if !columns_to_skip.contains(&col) {
+                    Some(Expr::Column(col))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<Expr>>())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field};
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 2c751abdad349..f381e316669e4 100644
---
 a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -21,7 +21,7 @@
 //! Logical query plans can then be optimized and executed directly, or translated into
 //! physical query plans and executed.
 
-mod builder;
+pub(crate) mod builder;
 mod dfschema;
 mod display;
 mod expr;
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 75d90b47328c4..41b4e20f15f3d 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -27,8 +27,9 @@ use crate::datasource::TableProvider;
 use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
-    and, col, lit, normalize_col, union_with_alias, Column, DFSchema, Expr, LogicalPlan,
-    LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, ToDFSchema,
+    and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
+    DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
+    ToDFSchema,
 };
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
@@ -56,7 +57,7 @@ use sqlparser::parser::ParserError::ParserError;
 use super::{
     parser::DFParser,
     utils::{
-        can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
+        can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
         find_aggregate_exprs, find_column_exprs, find_window_exprs,
         group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
         resolve_positions_to_exprs,
@@ -682,15 +683,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         projection: &[SelectItem],
     ) -> Result<Vec<Expr>> {
         let input_schema = plan.schema();
-        let using_columns = plan.using_columns()?;
 
         projection
             .iter()
             .map(|expr| self.sql_select_to_rex(expr, input_schema))
             .collect::<Result<Vec<Expr>>>()?
-            .iter()
-            .flat_map(|expr| expand_wildcard(expr, input_schema, &using_columns))
-            .map(|expr| normalize_col(expr, plan))
+            .into_iter()
+            .map(|expr| {
+                Ok(match expr {
+                    Expr::Wildcard => expand_wildcard(input_schema, plan)?,
+                    _ => vec![normalize_col(expr, plan)?],
+                })
+            })
+            .flat_map(|res| match res {
+                Ok(v) => v.into_iter().map(Ok).collect(),
+                Err(e) => vec![Err(e)],
+            })
             .collect::<Result<Vec<Expr>>>()
     }
 
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index d63ed82e342d9..41bcd205800df 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -17,59 +17,13 @@
 
 //! SQL Utility Functions
 
-use crate::logical_plan::{DFSchema, Expr, LogicalPlan};
+use crate::logical_plan::{Expr, LogicalPlan};
 use crate::scalar::ScalarValue;
 use crate::{
     error::{DataFusionError, Result},
     logical_plan::{Column, ExpressionVisitor, Recursion},
 };
-use std::collections::{HashMap, HashSet};
-
-/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub(crate) fn expand_wildcard(
-    expr: &Expr,
-    schema: &DFSchema,
-    using_columns: &[HashSet<Column>],
-) -> Vec<Expr> {
-    match expr {
-        Expr::Wildcard => {
-            let columns_to_skip = using_columns
-                .iter()
-                // For each USING JOIN condition, only expand to one column in projection
-                .map(|cols| {
-                    let mut cols = cols.iter().collect::<Vec<_>>();
-                    // sort join columns to make sure we consistently keep the same
-                    // qualified column
-                    cols.sort();
-                    cols.into_iter().skip(1)
-                })
-                .flatten()
-                .collect::<HashSet<_>>();
-
-            if columns_to_skip.is_empty() {
-                schema
-                    .fields()
-                    .iter()
-                    .map(|f| Expr::Column(f.qualified_column()))
-                    .collect::<Vec<Expr>>()
-            } else {
-                schema
-                    .fields()
-                    .iter()
-                    .filter_map(|f| {
-                        let col = f.qualified_column();
-                        if !columns_to_skip.contains(&col) {
-                            Some(Expr::Column(col))
-                        } else {
-                            None
-                        }
-                    })
-                    .collect::<Vec<Expr>>()
-            }
-        }
-        _ => vec![expr.clone()],
-    }
-}
+use std::collections::HashMap;
 
 /// Collect all deeply nested `Expr::AggregateFunction` and
 /// `Expr::AggregateUDF`.
They are returned in order of occurrence (depth