From e50da3cfe6ee7030e77f791946644b9c9cffc362 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 May 2022 08:47:20 -0600 Subject: [PATCH 1/3] Move LogicalPlanBuilder to datafusion-expr crate --- datafusion/core/src/logical_plan/expr.rs | 99 +---- datafusion/core/src/logical_plan/mod.rs | 26 +- .../src/optimizer/common_subexpr_eliminate.rs | 4 +- .../core/src/optimizer/eliminate_filter.rs | 4 +- .../core/src/optimizer/eliminate_limit.rs | 4 +- .../core/src/optimizer/filter_push_down.rs | 8 +- .../core/src/optimizer/limit_push_down.rs | 4 +- .../src/optimizer/projection_push_down.rs | 14 +- .../src/optimizer/simplify_expressions.rs | 4 +- .../optimizer/single_distinct_to_groupby.rs | 6 +- datafusion/core/src/optimizer/utils.rs | 204 +--------- datafusion/core/src/sql/planner.rs | 19 +- datafusion/core/src/sql/utils.rs | 57 +-- datafusion/expr/src/lib.rs | 2 +- .../src/logical_plan/builder.rs | 89 +++-- datafusion/expr/src/logical_plan/mod.rs | 2 + datafusion/expr/src/utils.rs | 347 +++++++++++++++++- 17 files changed, 458 insertions(+), 435 deletions(-) rename datafusion/{core => expr}/src/logical_plan/builder.rs (95%) diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs index 6d90c78f171af..f8d7f46c67290 100644 --- a/datafusion/core/src/logical_plan/expr.rs +++ b/datafusion/core/src/logical_plan/expr.rs @@ -20,18 +20,14 @@ pub use super::Operator; use crate::error::Result; -use crate::logical_plan::ExprSchemable; -use crate::logical_plan::{DFField, DFSchema}; -use crate::sql::utils::find_columns_referenced_by_expr; use arrow::datatypes::DataType; pub use datafusion_common::{Column, ExprSchema}; pub use datafusion_expr::expr_fn::*; -use datafusion_expr::logical_plan::Aggregate; +use datafusion_expr::AccumulatorFunctionImplementation; use datafusion_expr::BuiltinScalarFunction; pub use datafusion_expr::Expr; use datafusion_expr::StateTypeFunction; pub use datafusion_expr::{lit, lit_timestamp_nano, Literal}; -use datafusion_expr::{AccumulatorFunctionImplementation, LogicalPlan}; use datafusion_expr::{AggregateUDF, ScalarUDF}; use datafusion_expr::{ ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility, @@ -52,39 +48,6 @@ pub fn combine_filters(filters: &[Expr]) -> Option { Some(combined_filter) } -/// Convert an expression into Column expression if it's already provided as input plan. -/// -/// For example, it rewrites: -/// -/// ```text -/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])? -/// .project(vec![col("c1"), sum(col("c2"))? -/// ``` -/// -/// Into: -/// -/// ```text -/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])? -/// .project(vec![col("c1"), col("SUM(#c2)")? -/// ``` -pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr { - match e { - Expr::Column(_) => e, - Expr::Alias(inner_expr, name) => { - Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name) - } - Expr::ScalarSubquery(_) => e.clone(), - _ => match e.name(input_schema) { - Ok(name) => match input_schema.field_with_unqualified_name(&name) { - Ok(field) => Expr::Column(field.qualified_column()), - // expression not provided as input, do not convert to a column reference - Err(_) => e, - }, - Err(_) => e, - }, - } -} - /// Recursively un-alias an expressions #[inline] pub fn unalias(expr: Expr) -> Expr { @@ -137,66 +100,6 @@ pub fn create_udaf( ) } -/// Find all columns referenced from an aggregate query -fn agg_cols(agg: &Aggregate) -> Result> { - Ok(agg - .aggr_expr - .iter() - .chain(&agg.group_expr) - .flat_map(find_columns_referenced_by_expr) - .collect()) -} - -fn exprlist_to_fields_aggregate( - exprs: &[Expr], - plan: &LogicalPlan, - agg: &Aggregate, -) -> Result> { - let agg_cols = agg_cols(agg)?; - let mut fields = vec![]; - for expr in exprs { - match expr { - Expr::Column(c) if agg_cols.iter().any(|x| x == c) => { - // resolve against schema of input to aggregate - fields.push(expr.to_field(agg.input.schema())?); - } - _ => fields.push(expr.to_field(plan.schema())?), - } - } - Ok(fields) -} - -/// Create field meta-data from an expression, for use in a result set schema -pub fn exprlist_to_fields<'a>( - expr: impl IntoIterator, - plan: &LogicalPlan, -) -> Result> { - let exprs: Vec = expr.into_iter().cloned().collect(); - // when dealing with aggregate plans we cannot simply look in the aggregate output schema - // because it will contain columns representing complex expressions (such a column named - // `#GROUPING(person.state)` so in order to resolve `person.state` in this case we need to - // look at the input to the aggregate instead. - let fields = match plan { - LogicalPlan::Aggregate(agg) => { - Some(exprlist_to_fields_aggregate(&exprs, plan, agg)) - } - LogicalPlan::Window(window) => match window.input.as_ref() { - LogicalPlan::Aggregate(agg) => { - Some(exprlist_to_fields_aggregate(&exprs, plan, agg)) - } - _ => None, - }, - _ => None, - }; - if let Some(fields) = fields { - fields - } else { - // look for exact match in plan's output schema - let input_schema = &plan.schema(); - exprs.iter().map(|e| e.to_field(input_schema)).collect() - } -} - /// Calls a named built in function /// ``` /// use datafusion::logical_plan::*; diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index e9496c8f254a8..0a97314412deb 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -21,35 +21,33 @@ //! Logical query plans can then be optimized and executed directly, or translated into //! physical query plans and executed. -pub(crate) mod builder; mod expr; mod expr_simplier; pub mod plan; mod registry; pub mod window_frames; -pub use builder::{ - build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE, -}; pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema}; pub use datafusion_expr::{ expr_fn::binary_expr, expr_rewriter, expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, + logical_plan::builder::{ + build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE, + }, ExprSchemable, Operator, }; pub use expr::{ abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan, avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col, - columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, - count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, - exists, exp, exprlist_to_fields, floor, in_list, in_subquery, initcap, left, length, - lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, - not_exists, not_in_subquery, now, now_expr, nullif, octet_length, or, power, random, - regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, - scalar_subquery, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, - starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, - to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, - when, Column, Expr, ExprSchema, Literal, + combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count, + count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp, + floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln, + log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, + now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace, + repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, + sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, + to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, + trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal, }; pub use expr_rewriter::{ normalize_col, normalize_col_with_schemas, normalize_cols, replace_col, diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index af0eea66321c8..81183f56d97c0 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -27,9 +27,9 @@ use crate::logical_plan::{ ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, }; use crate::optimizer::optimizer::OptimizerRule; -use crate::optimizer::utils; use arrow::datatypes::DataType; use datafusion_expr::expr::GroupingSet; +use datafusion_expr::utils::from_plan; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -238,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result>>()?; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } } } diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs index 800963ef550f3..fb99a97988d5a 100644 --- a/datafusion/core/src/optimizer/eliminate_filter.rs +++ b/datafusion/core/src/optimizer/eliminate_filter.rs @@ -19,6 +19,7 @@ //! This saves time in planning and executing the query. //! Note that this rule should be applied after simplify expressions optimizer rule. use datafusion_common::ScalarValue; +use datafusion_expr::utils::from_plan; use datafusion_expr::Expr; use crate::error::Result; @@ -26,7 +27,6 @@ use crate::logical_plan::plan::Filter; use crate::logical_plan::{EmptyRelation, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; -use super::utils; use crate::execution::context::ExecutionProps; /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation] @@ -68,7 +68,7 @@ impl OptimizerRule for EliminateFilter { .map(|plan| self.optimize(plan, execution_props)) .collect::>>()?; - utils::from_plan(plan, &plan.expressions(), &new_inputs) + from_plan(plan, &plan.expressions(), &new_inputs) } } } diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs index c1fc2068d3250..a7acf7ca6b561 100644 --- a/datafusion/core/src/optimizer/eliminate_limit.rs +++ b/datafusion/core/src/optimizer/eliminate_limit.rs @@ -20,8 +20,8 @@ use crate::error::Result; use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; +use datafusion_expr::utils::from_plan; -use super::utils; use crate::execution::context::ExecutionProps; /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] @@ -59,7 +59,7 @@ impl OptimizerRule for EliminateLimit { .map(|plan| self.optimize(plan, execution_props)) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } } } diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 7a0383463a10b..45b1fde367c36 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -24,7 +24,7 @@ use crate::logical_plan::{ use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; -use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns}; +use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, from_plan}; use std::collections::{HashMap, HashSet}; /// Filter Push Down optimizer rule pushes filter clauses down the plan @@ -90,7 +90,7 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result { .collect::>>()?; let expr = plan.expressions(); - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } // remove all filters from `filters` that are in `predicate_columns` @@ -246,7 +246,7 @@ fn optimize_join( // create a new Join with the new `left` and `right` let expr = plan.expressions(); - let plan = utils::from_plan(plan, &expr, &[left, right])?; + let plan = from_plan(plan, &expr, &[left, right])?; if to_keep.0.is_empty() { Ok(plan) @@ -334,7 +334,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // optimize inner let new_input = optimize(input, state)?; - utils::from_plan(plan, expr, &[new_input]) + from_plan(plan, expr, &[new_input]) } LogicalPlan::Aggregate(Aggregate { aggr_expr, input, .. diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index a52fd40df8019..087ac53c8c1a5 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -17,7 +17,6 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) -use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::Projection; @@ -25,6 +24,7 @@ use crate::logical_plan::{Limit, TableScan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; use datafusion_expr::logical_plan::Offset; +use datafusion_expr::utils::from_plan; use std::sync::Arc; /// Optimization rule that tries pushes down LIMIT n @@ -171,7 +171,7 @@ fn limit_push_down( }) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } } } diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index cf14adcd18035..5a127df6da971 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -28,10 +28,11 @@ use crate::logical_plan::{ LogicalPlanBuilder, ToDFSchema, Union, }; use crate::optimizer::optimizer::OptimizerRule; -use crate::optimizer::utils; use arrow::datatypes::{Field, Schema}; use arrow::error::Result as ArrowResult; -use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs}; +use datafusion_expr::utils::{ + expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan, +}; use datafusion_expr::Expr; use std::{ collections::{BTreeSet, HashSet}, @@ -458,7 +459,7 @@ fn optimize_plan( _execution_props, )?]; let expr = vec![]; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } _ => Err(DataFusionError::Plan( "SubqueryAlias should only wrap TableScan".to_string(), @@ -502,7 +503,7 @@ fn optimize_plan( }) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } } } @@ -513,12 +514,11 @@ mod tests { use std::collections::HashMap; use super::*; - use crate::logical_plan::{ - col, exprlist_to_fields, lit, max, min, Expr, JoinType, LogicalPlanBuilder, - }; + use crate::logical_plan::{col, lit, max, min, Expr, JoinType, LogicalPlanBuilder}; use crate::test::*; use crate::test_util::scan_empty; use arrow::datatypes::DataType; + use datafusion_expr::utils::exprlist_to_fields; #[test] fn aggregate_no_group_by() -> Result<()> { diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index 216af223a07c5..8c628906ac99c 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -25,13 +25,13 @@ use crate::logical_plan::{ LogicalPlan, RewriteRecursion, SimplifyInfo, }; use crate::optimizer::optimizer::OptimizerRule; -use crate::optimizer::utils; use crate::physical_plan::planner::create_physical_expr; use crate::scalar::ScalarValue; use crate::{error::Result, logical_plan::Operator}; use arrow::array::new_null_array; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; +use datafusion_expr::utils::from_plan; use datafusion_expr::Volatility; /// Provides simplification information based on schema and properties @@ -234,7 +234,7 @@ impl OptimizerRule for SimplifyExpressions { }) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } } diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index dfbefa63acd8f..65ff565562341 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -21,9 +21,9 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Aggregate, Projection}; use crate::logical_plan::ExprSchemable; -use crate::logical_plan::{col, columnize_expr, DFSchema, Expr, LogicalPlan}; +use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; -use crate::optimizer::utils; +use datafusion_expr::utils::{columnize_expr, from_plan}; use hashbrown::HashSet; use std::sync::Arc; @@ -155,7 +155,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result { .iter() .map(|plan| optimize(plan)) .collect::>>()?; - utils::from_plan(plan, &expr, &new_inputs) + from_plan(plan, &expr, &new_inputs) } fn is_single_distinct_agg(plan: &LogicalPlan) -> bool { diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 96ba969c4f050..81d4b76a26eba 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -19,21 +19,14 @@ use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; -use datafusion_expr::logical_plan::{ - Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery, - SubqueryAlias, Window, -}; +use datafusion_expr::logical_plan::Filter; use crate::error::{DataFusionError, Result}; -use crate::logical_plan::{ - and, build_join_schema, CreateMemoryTable, CreateView, DFSchemaRef, Expr, Limit, - LogicalPlan, LogicalPlanBuilder, Offset, Operator, Partitioning, Repartition, Union, - Values, -}; +use crate::logical_plan::{and, Expr, LogicalPlan, Operator}; use crate::prelude::lit; use crate::scalar::ScalarValue; -use datafusion_common::DFSchema; use datafusion_expr::expr::GroupingSet; +use datafusion_expr::utils::from_plan; use std::sync::Arc; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; @@ -61,197 +54,6 @@ pub fn optimize_children( from_plan(plan, &new_exprs, &new_inputs) } -/// Returns a new logical plan based on the original one with inputs -/// and expressions replaced. -/// -/// The exprs correspond to the same order of expressions returned by -/// `LogicalPlan::expressions`. This function is used in optimizers in -/// the following way: -/// -/// ```text -/// let new_inputs = optimize_children(..., plan, props); -/// -/// // get the plans expressions to optimize -/// let exprs = plan.expressions(); -/// -/// // potentially rewrite plan expressions -/// let rewritten_exprs = rewrite_exprs(exprs); -/// -/// // create new plan using rewritten_exprs in same position -/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs); -/// ``` -pub fn from_plan( - plan: &LogicalPlan, - expr: &[Expr], - inputs: &[LogicalPlan], -) -> Result { - match plan { - LogicalPlan::Projection(Projection { schema, alias, .. }) => { - Ok(LogicalPlan::Projection(Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - })) - } - LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { - schema: schema.clone(), - values: expr - .chunks_exact(schema.fields().len()) - .map(|s| s.to_vec()) - .collect::>(), - })), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter { - predicate: expr[0].clone(), - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::Repartition(Repartition { - partitioning_scheme, - .. - }) => match partitioning_scheme { - Partitioning::RoundRobinBatch(n) => { - Ok(LogicalPlan::Repartition(Repartition { - partitioning_scheme: Partitioning::RoundRobinBatch(*n), - input: Arc::new(inputs[0].clone()), - })) - } - Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition { - partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n), - input: Arc::new(inputs[0].clone()), - })), - }, - LogicalPlan::Window(Window { - window_expr, - schema, - .. - }) => Ok(LogicalPlan::Window(Window { - input: Arc::new(inputs[0].clone()), - window_expr: expr[0..window_expr.len()].to_vec(), - schema: schema.clone(), - })), - LogicalPlan::Aggregate(Aggregate { - group_expr, schema, .. - }) => Ok(LogicalPlan::Aggregate(Aggregate { - group_expr: expr[0..group_expr.len()].to_vec(), - aggr_expr: expr[group_expr.len()..].to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - })), - LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::Join(Join { - join_type, - join_constraint, - on, - null_equals_null, - .. - }) => { - let schema = - build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; - Ok(LogicalPlan::Join(Join { - left: Arc::new(inputs[0].clone()), - right: Arc::new(inputs[1].clone()), - join_type: *join_type, - join_constraint: *join_constraint, - on: on.clone(), - schema: DFSchemaRef::new(schema), - null_equals_null: *null_equals_null, - })) - } - LogicalPlan::CrossJoin(_) => { - let left = inputs[0].clone(); - let right = &inputs[1]; - LogicalPlanBuilder::from(left).cross_join(right)?.build() - } - LogicalPlan::Subquery(_) => { - let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?; - Ok(LogicalPlan::Subquery(Subquery { - subquery: Arc::new(subquery), - })) - } - LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { - let schema = inputs[0].schema().as_ref().clone().into(); - let schema = - DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); - Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { - alias: alias.clone(), - input: Arc::new(inputs[0].clone()), - schema, - })) - } - LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { - n: *n, - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset { - offset: *offset, - input: Arc::new(inputs[0].clone()), - })), - LogicalPlan::CreateMemoryTable(CreateMemoryTable { - name, - if_not_exists, - .. - }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { - input: Arc::new(inputs[0].clone()), - name: name.clone(), - if_not_exists: *if_not_exists, - })), - LogicalPlan::CreateView(CreateView { - name, or_replace, .. - }) => Ok(LogicalPlan::CreateView(CreateView { - input: Arc::new(inputs[0].clone()), - name: name.clone(), - or_replace: *or_replace, - })), - LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { - node: e.node.from_template(expr, inputs), - })), - LogicalPlan::Union(Union { schema, alias, .. }) => { - Ok(LogicalPlan::Union(Union { - inputs: inputs.to_vec(), - schema: schema.clone(), - alias: alias.clone(), - })) - } - LogicalPlan::Analyze(a) => { - assert!(expr.is_empty()); - assert_eq!(inputs.len(), 1); - Ok(LogicalPlan::Analyze(Analyze { - verbose: a.verbose, - schema: a.schema.clone(), - input: Arc::new(inputs[0].clone()), - })) - } - LogicalPlan::Explain(_) => { - // Explain should be handled specially in the optimizers; - // If this assert fails it means some optimizer pass is - // trying to optimize Explain directly - assert!( - expr.is_empty(), - "Explain can not be created from utils::from_expr" - ); - assert!( - inputs.is_empty(), - "Explain can not be created from utils::from_expr" - ); - Ok(plan.clone()) - } - LogicalPlan::EmptyRelation(_) - | LogicalPlan::TableScan { .. } - | LogicalPlan::CreateExternalTable(_) - | LogicalPlan::DropTable(_) - | LogicalPlan::CreateCatalogSchema(_) - | LogicalPlan::CreateCatalog(_) => { - // All of these plan types have no inputs / exprs so should not be called - assert!(expr.is_empty(), "{:?} should have no exprs", plan); - assert!(inputs.is_empty(), "{:?} should have no inputs", plan); - Ok(plan.clone()) - } - } -} - /// Returns all direct children `Expression`s of `expr`. /// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as Expr objects) pub fn expr_sub_expressions(expr: &Expr) -> Result> { diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 5113c9f5bdbe6..b2dad55f26dbe 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -28,11 +28,10 @@ use crate::datasource::TableProvider; use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ - and, col, lit, normalize_col, normalize_col_with_schemas, provider_as_source, - union_with_alias, Column, CreateCatalog, CreateCatalogSchema, - CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, - DFSchema, DFSchemaRef, DropTable, Expr, FileType, LogicalPlan, LogicalPlanBuilder, - Operator, PlanType, ToDFSchema, ToStringifiedPlan, + and, col, lit, normalize_col, normalize_col_with_schemas, provider_as_source, Column, + CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, + CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, FileType, + LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, }; use crate::prelude::JoinType; use crate::scalar::ScalarValue; @@ -47,13 +46,15 @@ use crate::{ }; use arrow::datatypes::*; use datafusion_expr::utils::{ - exprlist_to_columns, find_aggregate_exprs, find_window_exprs, + expr_as_column_expr, exprlist_to_columns, find_aggregate_exprs, find_column_exprs, + find_window_exprs, }; use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction}; use hashbrown::HashMap; use datafusion_common::field_not_found; use datafusion_expr::expr::GroupingSet; +use datafusion_expr::logical_plan::builder::project_with_alias; use datafusion_expr::logical_plan::{Filter, Subquery}; use sqlparser::ast::{ BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, @@ -69,12 +70,10 @@ use sqlparser::parser::ParserError::ParserError; use super::{ parser::DFParser, utils::{ - check_columns_satisfy_exprs, expr_as_column_expr, extract_aliases, - find_column_exprs, rebase_expr, resolve_aliases_to_exprs, - resolve_positions_to_exprs, + check_columns_satisfy_exprs, extract_aliases, rebase_expr, + resolve_aliases_to_exprs, resolve_positions_to_exprs, }, }; -use crate::logical_plan::builder::project_with_alias; use crate::logical_plan::plan::{Analyze, Explain}; /// The ContextProvider trait allows the query planner to obtain meta-data about tables and diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index 0b8e8d3a6683c..7a2523e030952 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -20,66 +20,13 @@ use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION}; use sqlparser::ast::Ident; -use crate::logical_plan::ExprVisitable; +use crate::error::{DataFusionError, Result}; use crate::logical_plan::{Expr, LogicalPlan}; use crate::scalar::ScalarValue; -use crate::{ - error::{DataFusionError, Result}, - logical_plan::{Column, ExpressionVisitor, Recursion}, -}; use datafusion_expr::expr::GroupingSet; +use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; use std::collections::HashMap; -/// Collect all deeply nested `Expr::Column`'s. They are returned in order of -/// appearance (depth first), and may contain duplicates. -pub(crate) fn find_column_exprs(exprs: &[Expr]) -> Vec { - exprs - .iter() - .flat_map(find_columns_referenced_by_expr) - .map(Expr::Column) - .collect() -} - -/// Recursively find all columns referenced by an expression -#[derive(Debug, Default)] -struct ColumnCollector { - exprs: Vec, -} - -impl ExpressionVisitor for ColumnCollector { - fn pre_visit(mut self, expr: &Expr) -> Result> { - if let Expr::Column(c) = expr { - self.exprs.push(c.clone()) - } - Ok(Recursion::Continue(self)) - } -} - -pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec { - // As the `ExpressionVisitor` impl above always returns Ok, this - // "can't" error - let ColumnCollector { exprs } = e - .accept(ColumnCollector::default()) - .expect("Unexpected error"); - exprs -} - -/// Convert any `Expr` to an `Expr::Column`. -pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { - match expr { - Expr::Column(col) => { - let field = plan.schema().field_from_column(col)?; - Ok(Expr::Column(field.qualified_column())) - } - _ => { - // we should not be trying to create a name for the expression - // based on the input schema but this is the current behavior - // see https://github.com/apache/arrow-datafusion/issues/2456 - Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?))) - } - } -} - /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { clone_with_replacement(expr, &|nested_expr| { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 87bb77067f1bc..f71243610ddfe 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -65,7 +65,7 @@ pub use function::{ StateTypeFunction, }; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; -pub use logical_plan::{LogicalPlan, PlanVisitor}; +pub use logical_plan::{LogicalPlan, LogicalPlanBuilder, PlanVisitor}; pub use nullif::SUPPORTED_NULLIF_TYPES; pub use operator::Operator; pub use signature::{Signature, TypeSignature, Volatility}; diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs similarity index 95% rename from datafusion/core/src/logical_plan/builder.rs rename to datafusion/expr/src/logical_plan/builder.rs index cad1b0a6a7095..9778e2fe99e14 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -17,18 +17,25 @@ //! This module provides a builder for creating LogicalPlans -use crate::error::{DataFusionError, Result}; -use crate::logical_expr::ExprSchemable; -use crate::logical_plan::plan::{ - Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort, - SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window, +use crate::expr_rewriter::{normalize_col, normalize_cols, rewrite_sort_cols_by_aggs}; +use crate::utils::{columnize_expr, exprlist_to_fields, from_plan}; +use crate::{ + logical_plan::{ + Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join, + JoinConstraint, JoinType, Limit, LogicalPlan, Offset, Partitioning, PlanType, + Projection, Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, + Union, Values, Window, + }, + utils::{ + expand_qualified_wildcard, expand_wildcard, expr_to_columns, + group_window_expr_by_sort_keys, + }, + Expr, ExprSchemable, TableSource, }; -use crate::optimizer::utils; -use crate::scalar::ScalarValue; use arrow::datatypes::{DataType, Schema}; -use datafusion_expr::utils::{ - expand_qualified_wildcard, expand_wildcard, expr_to_columns, - group_window_expr_by_sort_keys, +use datafusion_common::{ + Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, + ToDFSchema, }; use std::convert::TryFrom; use std::iter; @@ -37,16 +44,6 @@ use std::{ sync::Arc, }; -use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; -use crate::logical_plan::{ - columnize_expr, exprlist_to_fields, normalize_col, normalize_cols, - rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, - Offset, Partitioning, Repartition, Values, -}; - -use datafusion_common::ToDFSchema; -use datafusion_expr::TableSource; - /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -240,8 +237,9 @@ impl LogicalPlanBuilder { }); Ok(Self::from(table_scan)) } + /// Wrap a plan in a window - pub(crate) fn window_plan( + pub fn window_plan( input: LogicalPlan, window_exprs: Vec, ) -> Result { @@ -368,7 +366,7 @@ impl LogicalPlanBuilder { .collect::>>()?; let expr = curr_plan.expressions(); - utils::from_plan(&curr_plan, &expr, &new_inputs) + from_plan(&curr_plan, &expr, &new_inputs) } } } @@ -704,7 +702,7 @@ impl LogicalPlanBuilder { } /// Process intersect set operator - pub(crate) fn intersect( + pub fn intersect( left_plan: LogicalPlan, right_plan: LogicalPlan, is_all: bool, @@ -718,7 +716,7 @@ impl LogicalPlanBuilder { } /// Process except set operator - pub(crate) fn except( + pub fn except( left_plan: LogicalPlan, right_plan: LogicalPlan, is_all: bool, @@ -938,17 +936,15 @@ pub fn project_with_alias( #[cfg(test)] mod tests { - use arrow::datatypes::{DataType, Field}; + use crate::expr_fn::exists; + use arrow::datatypes::{DataType, Field, SchemaRef}; use datafusion_common::SchemaError; - use datafusion_expr::expr_fn::exists; + use std::any::Any; use crate::logical_plan::StringifiedPlan; - use crate::prelude::*; - use crate::test::test_table_scan_with_name; - use crate::test_util::scan_empty; - use super::super::{col, lit, sum}; use super::*; + use crate::{col, in_subquery, lit, scalar_subquery, sum}; #[test] fn plan_builder_simple() -> Result<()> { @@ -1239,4 +1235,37 @@ mod tests { assert!(stringified_plan.should_display(true)); assert!(!stringified_plan.should_display(false)); } + + fn test_table_scan_with_name(name: &str) -> Result { + let schema = Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::UInt32, false), + Field::new("c", DataType::UInt32, false), + ]); + scan_empty(Some(name), &schema, None)?.build() + } + + fn scan_empty( + name: Option<&str>, + table_schema: &Schema, + projection: Option>, + ) -> Result { + let table_schema = Arc::new(table_schema.clone()); + let table_source = Arc::new(EmptyTable { table_schema }); + LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection) + } + + struct EmptyTable { + table_schema: SchemaRef, + } + + impl TableSource for EmptyTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table_schema.clone() + } + } } diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index e9a90cd7f9a2c..f96e4320fe2a9 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +pub mod builder; pub mod display; mod extension; mod plan; +pub use builder::LogicalPlanBuilder; pub use plan::{ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain, diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 709a3eee24b69..a7aed1799dd5c 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -18,9 +18,18 @@ //! Expression utilities use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; -use crate::{Expr, LogicalPlan}; -use datafusion_common::{Column, DFField, DFSchema, DataFusionError, Result}; +use crate::logical_plan::builder::build_join_schema; +use crate::logical_plan::{ + Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter, Join, Limit, + Offset, Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union, + Values, Window, +}; +use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; +use datafusion_common::{ + Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, +}; use std::collections::HashSet; +use std::sync::Arc; /// Recursively walk a list of expression trees, collecting the unique set of columns /// referenced in the expression @@ -285,6 +294,340 @@ where } } +/// Returns a new logical plan based on the original one with inputs +/// and expressions replaced. +/// +/// The exprs correspond to the same order of expressions returned by +/// `LogicalPlan::expressions`. This function is used in optimizers in +/// the following way: +/// +/// ```text +/// let new_inputs = optimize_children(..., plan, props); +/// +/// // get the plans expressions to optimize +/// let exprs = plan.expressions(); +/// +/// // potentially rewrite plan expressions +/// let rewritten_exprs = rewrite_exprs(exprs); +/// +/// // create new plan using rewritten_exprs in same position +/// let new_plan = from_plan(&plan, rewritten_exprs, new_inputs); +/// ``` +pub fn from_plan( + plan: &LogicalPlan, + expr: &[Expr], + inputs: &[LogicalPlan], +) -> Result { + match plan { + LogicalPlan::Projection(Projection { schema, alias, .. }) => { + Ok(LogicalPlan::Projection(Projection { + expr: expr.to_vec(), + input: Arc::new(inputs[0].clone()), + schema: schema.clone(), + alias: alias.clone(), + })) + } + LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { + schema: schema.clone(), + values: expr + .chunks_exact(schema.fields().len()) + .map(|s| s.to_vec()) + .collect::>(), + })), + LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter { + predicate: expr[0].clone(), + input: Arc::new(inputs[0].clone()), + })), + LogicalPlan::Repartition(Repartition { + partitioning_scheme, + .. + }) => match partitioning_scheme { + Partitioning::RoundRobinBatch(n) => { + Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::RoundRobinBatch(*n), + input: Arc::new(inputs[0].clone()), + })) + } + Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition(Repartition { + partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n), + input: Arc::new(inputs[0].clone()), + })), + }, + LogicalPlan::Window(Window { + window_expr, + schema, + .. + }) => Ok(LogicalPlan::Window(Window { + input: Arc::new(inputs[0].clone()), + window_expr: expr[0..window_expr.len()].to_vec(), + schema: schema.clone(), + })), + LogicalPlan::Aggregate(Aggregate { + group_expr, schema, .. + }) => Ok(LogicalPlan::Aggregate(Aggregate { + group_expr: expr[0..group_expr.len()].to_vec(), + aggr_expr: expr[group_expr.len()..].to_vec(), + input: Arc::new(inputs[0].clone()), + schema: schema.clone(), + })), + LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort { + expr: expr.to_vec(), + input: Arc::new(inputs[0].clone()), + })), + LogicalPlan::Join(Join { + join_type, + join_constraint, + on, + null_equals_null, + .. + }) => { + let schema = + build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; + Ok(LogicalPlan::Join(Join { + left: Arc::new(inputs[0].clone()), + right: Arc::new(inputs[1].clone()), + join_type: *join_type, + join_constraint: *join_constraint, + on: on.clone(), + schema: DFSchemaRef::new(schema), + null_equals_null: *null_equals_null, + })) + } + LogicalPlan::CrossJoin(_) => { + let left = inputs[0].clone(); + let right = &inputs[1]; + LogicalPlanBuilder::from(left).cross_join(right)?.build() + } + LogicalPlan::Subquery(_) => { + let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?; + Ok(LogicalPlan::Subquery(Subquery { + subquery: Arc::new(subquery), + })) + } + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { + let schema = inputs[0].schema().as_ref().clone().into(); + let schema = + DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { + alias: alias.clone(), + input: Arc::new(inputs[0].clone()), + schema, + })) + } + LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit { + n: *n, + input: Arc::new(inputs[0].clone()), + })), + LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset { + offset: *offset, + input: Arc::new(inputs[0].clone()), + })), + LogicalPlan::CreateMemoryTable(CreateMemoryTable { + name, + if_not_exists, + .. + }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { + input: Arc::new(inputs[0].clone()), + name: name.clone(), + if_not_exists: *if_not_exists, + })), + LogicalPlan::CreateView(CreateView { + name, or_replace, .. + }) => Ok(LogicalPlan::CreateView(CreateView { + input: Arc::new(inputs[0].clone()), + name: name.clone(), + or_replace: *or_replace, + })), + LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { + node: e.node.from_template(expr, inputs), + })), + LogicalPlan::Union(Union { schema, alias, .. }) => { + Ok(LogicalPlan::Union(Union { + inputs: inputs.to_vec(), + schema: schema.clone(), + alias: alias.clone(), + })) + } + LogicalPlan::Analyze(a) => { + assert!(expr.is_empty()); + assert_eq!(inputs.len(), 1); + Ok(LogicalPlan::Analyze(Analyze { + verbose: a.verbose, + schema: a.schema.clone(), + input: Arc::new(inputs[0].clone()), + })) + } + LogicalPlan::Explain(_) => { + // Explain should be handled specially in the optimizers; + // If this assert fails it means some optimizer pass is + // trying to optimize Explain directly + assert!( + expr.is_empty(), + "Explain can not be created from utils::from_expr" + ); + assert!( + inputs.is_empty(), + "Explain can not be created from utils::from_expr" + ); + Ok(plan.clone()) + } + LogicalPlan::EmptyRelation(_) + | LogicalPlan::TableScan { .. } + | LogicalPlan::CreateExternalTable(_) + | LogicalPlan::DropTable(_) + | LogicalPlan::CreateCatalogSchema(_) + | LogicalPlan::CreateCatalog(_) => { + // All of these plan types have no inputs / exprs so should not be called + assert!(expr.is_empty(), "{:?} should have no exprs", plan); + assert!(inputs.is_empty(), "{:?} should have no inputs", plan); + Ok(plan.clone()) + } + } +} + +/// Find all columns referenced from an aggregate query +fn agg_cols(agg: &Aggregate) -> Result> { + Ok(agg + .aggr_expr + .iter() + .chain(&agg.group_expr) + .flat_map(find_columns_referenced_by_expr) + .collect()) +} + +fn exprlist_to_fields_aggregate( + exprs: &[Expr], + plan: &LogicalPlan, + agg: &Aggregate, +) -> Result> { + let agg_cols = agg_cols(agg)?; + let mut fields = vec![]; + for expr in exprs { + match expr { + Expr::Column(c) if agg_cols.iter().any(|x| x == c) => { + // resolve against schema of input to aggregate + fields.push(expr.to_field(agg.input.schema())?); + } + _ => fields.push(expr.to_field(plan.schema())?), + } + } + Ok(fields) +} + +/// Create field meta-data from an expression, for use in a result set schema +pub fn exprlist_to_fields<'a>( + expr: impl IntoIterator, + plan: &LogicalPlan, +) -> Result> { + let exprs: Vec = expr.into_iter().cloned().collect(); + // when dealing with aggregate plans we cannot simply look in the aggregate output schema + // because it will contain columns representing complex expressions (such a column named + // `#GROUPING(person.state)` so in order to resolve `person.state` in this case we need to + // look at the input to the aggregate instead. + let fields = match plan { + LogicalPlan::Aggregate(agg) => { + Some(exprlist_to_fields_aggregate(&exprs, plan, agg)) + } + LogicalPlan::Window(window) => match window.input.as_ref() { + LogicalPlan::Aggregate(agg) => { + Some(exprlist_to_fields_aggregate(&exprs, plan, agg)) + } + _ => None, + }, + _ => None, + }; + if let Some(fields) = fields { + fields + } else { + // look for exact match in plan's output schema + let input_schema = &plan.schema(); + exprs.iter().map(|e| e.to_field(input_schema)).collect() + } +} + +/// Convert an expression into Column expression if it's already provided as input plan. +/// +/// For example, it rewrites: +/// +/// ```text +/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])? +/// .project(vec![col("c1"), sum(col("c2"))? +/// ``` +/// +/// Into: +/// +/// ```text +/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])? +/// .project(vec![col("c1"), col("SUM(#c2)")? +/// ``` +pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr { + match e { + Expr::Column(_) => e, + Expr::Alias(inner_expr, name) => { + Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name) + } + Expr::ScalarSubquery(_) => e.clone(), + _ => match e.name(input_schema) { + Ok(name) => match input_schema.field_with_unqualified_name(&name) { + Ok(field) => Expr::Column(field.qualified_column()), + // expression not provided as input, do not convert to a column reference + Err(_) => e, + }, + Err(_) => e, + }, + } +} + +/// Collect all deeply nested `Expr::Column`'s. They are returned in order of +/// appearance (depth first), and may contain duplicates. +pub fn find_column_exprs(exprs: &[Expr]) -> Vec { + exprs + .iter() + .flat_map(find_columns_referenced_by_expr) + .map(Expr::Column) + .collect() +} + +/// Recursively find all columns referenced by an expression +#[derive(Debug, Default)] +struct ColumnCollector { + exprs: Vec, +} + +impl ExpressionVisitor for ColumnCollector { + fn pre_visit(mut self, expr: &Expr) -> Result> { + if let Expr::Column(c) = expr { + self.exprs.push(c.clone()) + } + Ok(Recursion::Continue(self)) + } +} + +pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec { + // As the `ExpressionVisitor` impl above always returns Ok, this + // "can't" error + let ColumnCollector { exprs } = e + .accept(ColumnCollector::default()) + .expect("Unexpected error"); + exprs +} + +/// Convert any `Expr` to an `Expr::Column`. +pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { + match expr { + Expr::Column(col) => { + let field = plan.schema().field_from_column(col)?; + Ok(Expr::Column(field.qualified_column())) + } + _ => { + // we should not be trying to create a name for the expression + // based on the input schema but this is the current behavior + // see https://github.com/apache/arrow-datafusion/issues/2456 + Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?))) + } + } +} + #[cfg(test)] mod tests { use super::*; From dbdee6b890d6e5ac429c86acab3041fcca7bf145 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 May 2022 08:55:49 -0600 Subject: [PATCH 2/3] install python --- .github/workflows/rust.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ce1bc32f8feb9..370270264f530 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -146,6 +146,9 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - uses: actions/setup-python@v3 + with: + python-version: '3.x' - name: Cache Cargo uses: actions/cache@v2 with: From 573ee3bd5f732bf21a4147df7e195e5ce6a04dd9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 May 2022 08:56:51 -0600 Subject: [PATCH 3/3] Revert "install python" This reverts commit dbdee6b890d6e5ac429c86acab3041fcca7bf145. --- .github/workflows/rust.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 370270264f530..ce1bc32f8feb9 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -146,9 +146,6 @@ jobs: - uses: actions/checkout@v2 with: submodules: true - - uses: actions/setup-python@v3 - with: - python-version: '3.x' - name: Cache Cargo uses: actions/cache@v2 with: