Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 1 addition & 98 deletions datafusion/core/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@

pub use super::Operator;
use crate::error::Result;
use crate::logical_plan::ExprSchemable;
use crate::logical_plan::{DFField, DFSchema};
use crate::sql::utils::find_columns_referenced_by_expr;
use arrow::datatypes::DataType;
pub use datafusion_common::{Column, ExprSchema};
pub use datafusion_expr::expr_fn::*;
use datafusion_expr::logical_plan::Aggregate;
use datafusion_expr::AccumulatorFunctionImplementation;
use datafusion_expr::BuiltinScalarFunction;
pub use datafusion_expr::Expr;
use datafusion_expr::StateTypeFunction;
pub use datafusion_expr::{lit, lit_timestamp_nano, Literal};
use datafusion_expr::{AccumulatorFunctionImplementation, LogicalPlan};
use datafusion_expr::{AggregateUDF, ScalarUDF};
use datafusion_expr::{
ReturnTypeFunction, ScalarFunctionImplementation, Signature, Volatility,
Expand All @@ -52,39 +48,6 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
Some(combined_filter)
}

/// Convert an expression into Column expression if it's already provided as input plan.
///
/// For example, it rewrites:
///
/// ```text
/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
/// .project(vec![col("c1"), sum(col("c2"))?
/// ```
///
/// Into:
///
/// ```text
/// .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
/// .project(vec![col("c1"), col("SUM(#c2)")?
/// ```
pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
match e {
Expr::Column(_) => e,
Expr::Alias(inner_expr, name) => {
Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
}
Expr::ScalarSubquery(_) => e.clone(),
_ => match e.name(input_schema) {
Ok(name) => match input_schema.field_with_unqualified_name(&name) {
Ok(field) => Expr::Column(field.qualified_column()),
// expression not provided as input, do not convert to a column reference
Err(_) => e,
},
Err(_) => e,
},
}
}

/// Recursively un-alias an expressions
#[inline]
pub fn unalias(expr: Expr) -> Expr {
Expand Down Expand Up @@ -137,66 +100,6 @@ pub fn create_udaf(
)
}

/// Find all columns referenced from an aggregate query
fn agg_cols(agg: &Aggregate) -> Result<Vec<Column>> {
Ok(agg
.aggr_expr
.iter()
.chain(&agg.group_expr)
.flat_map(find_columns_referenced_by_expr)
.collect())
}

fn exprlist_to_fields_aggregate(
exprs: &[Expr],
plan: &LogicalPlan,
agg: &Aggregate,
) -> Result<Vec<DFField>> {
let agg_cols = agg_cols(agg)?;
let mut fields = vec![];
for expr in exprs {
match expr {
Expr::Column(c) if agg_cols.iter().any(|x| x == c) => {
// resolve against schema of input to aggregate
fields.push(expr.to_field(agg.input.schema())?);
}
_ => fields.push(expr.to_field(plan.schema())?),
}
}
Ok(fields)
}

/// Create field meta-data from an expression, for use in a result set schema
pub fn exprlist_to_fields<'a>(
expr: impl IntoIterator<Item = &'a Expr>,
plan: &LogicalPlan,
) -> Result<Vec<DFField>> {
let exprs: Vec<Expr> = expr.into_iter().cloned().collect();
// when dealing with aggregate plans we cannot simply look in the aggregate output schema
// because it will contain columns representing complex expressions (such a column named
// `#GROUPING(person.state)` so in order to resolve `person.state` in this case we need to
// look at the input to the aggregate instead.
let fields = match plan {
LogicalPlan::Aggregate(agg) => {
Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
}
LogicalPlan::Window(window) => match window.input.as_ref() {
LogicalPlan::Aggregate(agg) => {
Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
}
_ => None,
},
_ => None,
};
if let Some(fields) = fields {
fields
} else {
// look for exact match in plan's output schema
let input_schema = &plan.schema();
exprs.iter().map(|e| e.to_field(input_schema)).collect()
}
}

/// Calls a named built in function
/// ```
/// use datafusion::logical_plan::*;
Expand Down
26 changes: 12 additions & 14 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,33 @@
//! Logical query plans can then be optimized and executed directly, or translated into
//! physical query plans and executed.

pub(crate) mod builder;
mod expr;
mod expr_simplier;
pub mod plan;
mod registry;
pub mod window_frames;
pub use builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
};
pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use datafusion_expr::{
expr_fn::binary_expr,
expr_rewriter,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
logical_plan::builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
ExprSchemable, Operator,
};
pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exists, exp, exprlist_to_fields, floor, in_list, in_subquery, initcap, left, length,
lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min,
not_exists, not_in_subquery, now, now_expr, nullif, octet_length, or, power, random,
regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim,
scalar_subquery, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt,
starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper,
when, Column, Expr, ExprSchema, Literal,
combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln,
log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now,
now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace,
repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::logical_plan::{
ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use arrow::datatypes::DataType;
use datafusion_expr::expr::GroupingSet;
use datafusion_expr::utils::from_plan;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -238,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
.map(|input_plan| optimize(input_plan, execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/optimizer/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
//! This saves time in planning and executing the query.
//! Note that this rule should be applied after simplify expressions optimizer rule.
use datafusion_common::ScalarValue;
use datafusion_expr::utils::from_plan;
use datafusion_expr::Expr;

use crate::error::Result;
use crate::logical_plan::plan::Filter;
use crate::logical_plan::{EmptyRelation, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;

use super::utils;
use crate::execution::context::ExecutionProps;

/// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
Expand Down Expand Up @@ -68,7 +68,7 @@ impl OptimizerRule for EliminateFilter {
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &plan.expressions(), &new_inputs)
from_plan(plan, &plan.expressions(), &new_inputs)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/optimizer/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
use crate::error::Result;
use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_expr::utils::from_plan;

use super::utils;
use crate::execution::context::ExecutionProps;

/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
Expand Down Expand Up @@ -59,7 +59,7 @@ impl OptimizerRule for EliminateLimit {
.map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::logical_plan::{
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns};
use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, from_plan};
use std::collections::{HashMap, HashSet};

/// Filter Push Down optimizer rule pushes filter clauses down the plan
Expand Down Expand Up @@ -90,7 +90,7 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
.collect::<Result<Vec<_>>>()?;

let expr = plan.expressions();
utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}

// remove all filters from `filters` that are in `predicate_columns`
Expand Down Expand Up @@ -246,7 +246,7 @@ fn optimize_join(

// create a new Join with the new `left` and `right`
let expr = plan.expressions();
let plan = utils::from_plan(plan, &expr, &[left, right])?;
let plan = from_plan(plan, &expr, &[left, right])?;

if to_keep.0.is_empty() {
Ok(plan)
Expand Down Expand Up @@ -334,7 +334,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// optimize inner
let new_input = optimize(input, state)?;

utils::from_plan(plan, expr, &[new_input])
from_plan(plan, expr, &[new_input])
}
LogicalPlan::Aggregate(Aggregate {
aggr_expr, input, ..
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/optimizer/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::{Limit, TableScan};
use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_expr::logical_plan::Offset;
use datafusion_expr::utils::from_plan;
use std::sync::Arc;

/// Optimization rule that tries pushes down LIMIT n
Expand Down Expand Up @@ -171,7 +171,7 @@ fn limit_push_down(
})
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use crate::logical_plan::{
LogicalPlanBuilder, ToDFSchema, Union,
};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
use datafusion_expr::utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs};
use datafusion_expr::utils::{
expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan,
};
use datafusion_expr::Expr;
use std::{
collections::{BTreeSet, HashSet},
Expand Down Expand Up @@ -458,7 +459,7 @@ fn optimize_plan(
_execution_props,
)?];
let expr = vec![];
utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
Expand Down Expand Up @@ -502,7 +503,7 @@ fn optimize_plan(
})
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}
}
}
Expand All @@ -513,12 +514,11 @@ mod tests {
use std::collections::HashMap;

use super::*;
use crate::logical_plan::{
col, exprlist_to_fields, lit, max, min, Expr, JoinType, LogicalPlanBuilder,
};
use crate::logical_plan::{col, lit, max, min, Expr, JoinType, LogicalPlanBuilder};
use crate::test::*;
use crate::test_util::scan_empty;
use arrow::datatypes::DataType;
use datafusion_expr::utils::exprlist_to_fields;

#[test]
fn aggregate_no_group_by() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use crate::logical_plan::{
LogicalPlan, RewriteRecursion, SimplifyInfo,
};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::planner::create_physical_expr;
use crate::scalar::ScalarValue;
use crate::{error::Result, logical_plan::Operator};
use arrow::array::new_null_array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_expr::utils::from_plan;
use datafusion_expr::Volatility;

/// Provides simplification information based on schema and properties
Expand Down Expand Up @@ -234,7 +234,7 @@ impl OptimizerRule for SimplifyExpressions {
})
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/optimizer/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Aggregate, Projection};
use crate::logical_plan::ExprSchemable;
use crate::logical_plan::{col, columnize_expr, DFSchema, Expr, LogicalPlan};
use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use datafusion_expr::utils::{columnize_expr, from_plan};
use hashbrown::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -155,7 +155,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result<LogicalPlan> {
.iter()
.map(|plan| optimize(plan))
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
from_plan(plan, &expr, &new_inputs)
}

fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
Expand Down
Loading