diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index f52573b2e78e6..f76885bfe0962 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -27,8 +27,8 @@ use crate::AggregateUDF;
 use crate::Operator;
 use crate::ScalarUDF;
 use arrow::datatypes::DataType;
+use datafusion_common::Result;
 use datafusion_common::{plan_err, Column};
-use datafusion_common::{DFSchema, Result};
 use datafusion_common::{DataFusionError, ScalarValue};
 use std::fmt;
 use std::fmt::Write;
@@ -321,11 +321,9 @@ impl PartialOrd for Expr {
 }

 impl Expr {
-    /// Returns the name of this expression based on [datafusion_common::DFSchema].
-    ///
-    /// This represents how a column with this expression is named when no alias is chosen
-    pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
-        create_name(self, input_schema)
+    /// Returns the name of this expression as it should appear in a schema.
+    pub fn name(&self) -> Result<String> {
+        create_name(self)
     }

     /// Return String representation of the variant represented by `self`
@@ -487,50 +485,16 @@ impl Not for Expr {
     }
 }

-impl std::fmt::Display for Expr {
+/// Format expressions for display as part of a logical plan. In many cases, this will produce
+/// similar output to `Expr.name()` except that column names will be prefixed with '#'.
+impl fmt::Display for Expr {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ref op,
-            } => write!(f, "{} {} {}", left, op, right),
-            Expr::AggregateFunction {
-                /// Name of the function
-                ref fun,
-                /// List of expressions to feed to the functions as arguments
-                ref args,
-                /// Whether this is a DISTINCT aggregation or not
-                ref distinct,
-            } => fmt_function(f, &fun.to_string(), *distinct, args, true),
-            Expr::ScalarFunction {
-                /// Name of the function
-                ref fun,
-                /// List of expressions to feed to the functions as arguments
-                ref args,
-            } => fmt_function(f, &fun.to_string(), false, args, true),
-            Expr::Exists { negated, .. } => {
-                if *negated {
-                    write!(f, "NOT EXISTS (<subquery>)")
-                } else {
-                    write!(f, "EXISTS (<subquery>)")
-                }
-            }
-            Expr::InSubquery { negated, .. } => {
-                if *negated {
-                    write!(f, "NOT IN (<subquery>)")
-                } else {
-                    write!(f, "IN (<subquery>)")
-                }
-            }
-            Expr::ScalarSubquery(_) => {
-                write!(f, "(<subquery>)")
-            }
-            _ => write!(f, "{:?}", self),
-        }
+        write!(f, "{:?}", self)
     }
 }

+/// Format expressions for display as part of a logical plan. In many cases, this will produce
+/// similar output to `Expr.name()` except that column names will be prefixed with '#'.
 impl fmt::Debug for Expr {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
@@ -747,16 +711,8 @@ fn fmt_function(
     write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
 }

-fn create_function_name(
-    fun: &str,
-    distinct: bool,
-    args: &[Expr],
-    input_schema: &DFSchema,
-) -> Result<String> {
-    let names: Vec<String> = args
-        .iter()
-        .map(|e| create_name(e, input_schema))
-        .collect::<Result<_>>()?;
+fn create_function_name(fun: &str, distinct: bool, args: &[Expr]) -> Result<String> {
+    let names: Vec<String> = args.iter().map(create_name).collect::<Result<_>>()?;
     let distinct_str = match distinct {
         true => "DISTINCT ",
         false => "",
@@ -766,15 +722,15 @@

 /// Returns a readable name of an expression based on the input schema.
 /// This function recursively transverses the expression for names such as "CAST(a > 2)".
-fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
+fn create_name(e: &Expr) -> Result<String> {
     match e {
         Expr::Alias(_, name) => Ok(name.clone()),
         Expr::Column(c) => Ok(c.flat_name()),
         Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
         Expr::Literal(value) => Ok(format!("{:?}", value)),
         Expr::BinaryExpr { left, op, right } => {
-            let left = create_name(left, input_schema)?;
-            let right = create_name(right, input_schema)?;
+            let left = create_name(left)?;
+            let right = create_name(right)?;
             Ok(format!("{} {} {}", left, op, right))
         }
         Expr::Case {
@@ -784,67 +740,67 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
         } => {
             let mut name = "CASE ".to_string();
             if let Some(e) = expr {
-                let e = create_name(e, input_schema)?;
+                let e = create_name(e)?;
                 let _ = write!(name, "{} ", e);
             }
             for (w, t) in when_then_expr {
-                let when = create_name(w, input_schema)?;
-                let then = create_name(t, input_schema)?;
+                let when = create_name(w)?;
+                let then = create_name(t)?;
                 let _ = write!(name, "WHEN {} THEN {} ", when, then);
             }
             if let Some(e) = else_expr {
-                let e = create_name(e, input_schema)?;
+                let e = create_name(e)?;
                 let _ = write!(name, "ELSE {} ", e);
             }
             name += "END";
             Ok(name)
         }
         Expr::Cast { expr, data_type } => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("CAST({} AS {:?})", expr, data_type))
         }
         Expr::TryCast { expr, data_type } => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
         }
         Expr::Not(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("NOT {}", expr))
         }
         Expr::Negative(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("(- {})", expr))
         }
         Expr::IsNull(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS NULL", expr))
         }
         Expr::IsNotNull(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS NOT NULL", expr))
         }
         Expr::IsTrue(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS TRUE", expr))
         }
         Expr::IsFalse(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS FALSE", expr))
         }
         Expr::IsUnknown(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS UNKNOWN", expr))
         }
         Expr::IsNotTrue(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS NOT TRUE", expr))
         }
         Expr::IsNotFalse(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS NOT FALSE", expr))
         }
         Expr::IsNotUnknown(expr) => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{} IS NOT UNKNOWN", expr))
         }
         Expr::Exists { negated: true, .. } => Ok("NOT EXISTS".to_string()),
@@ -855,15 +811,13 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
             Ok(subquery.subquery.schema().field(0).name().clone())
         }
         Expr::GetIndexedField { expr, key } => {
-            let expr = create_name(expr, input_schema)?;
+            let expr = create_name(expr)?;
             Ok(format!("{}[{}]", expr, key))
         }
         Expr::ScalarFunction { fun, args, .. } => {
-            create_function_name(&fun.to_string(), false, args, input_schema)
-        }
-        Expr::ScalarUDF { fun, args, .. } => {
-            create_function_name(&fun.name, false, args, input_schema)
+            create_function_name(&fun.to_string(), false, args)
         }
+        Expr::ScalarUDF { fun, args, .. } => create_function_name(&fun.name, false, args),
         Expr::WindowFunction {
             fun,
             args,
@@ -871,12 +825,8 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
             partition_by,
             order_by,
         } => {
-            let mut parts: Vec<String> = vec![create_function_name(
-                &fun.to_string(),
-                false,
-                args,
-                input_schema,
-            )?];
+            let mut parts: Vec<String> =
+                vec![create_function_name(&fun.to_string(), false, args)?];
             if !partition_by.is_empty() {
                 parts.push(format!("PARTITION BY {:?}", partition_by));
             }
@@ -893,30 +843,25 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
             distinct,
             args,
             ..
-        } => create_function_name(&fun.to_string(), *distinct, args, input_schema),
+        } => create_function_name(&fun.to_string(), *distinct, args),
         Expr::AggregateUDF { fun, args } => {
             let mut names = Vec::with_capacity(args.len());
             for e in args {
-                names.push(create_name(e, input_schema)?);
+                names.push(create_name(e)?);
             }
             Ok(format!("{}({})", fun.name, names.join(",")))
         }
         Expr::GroupingSet(grouping_set) => match grouping_set {
-            GroupingSet::Rollup(exprs) => Ok(format!(
-                "ROLLUP ({})",
-                create_names(exprs.as_slice(), input_schema)?
-            )),
-            GroupingSet::Cube(exprs) => Ok(format!(
-                "CUBE ({})",
-                create_names(exprs.as_slice(), input_schema)?
-            )),
+            GroupingSet::Rollup(exprs) => {
+                Ok(format!("ROLLUP ({})", create_names(exprs.as_slice())?))
+            }
+            GroupingSet::Cube(exprs) => {
+                Ok(format!("CUBE ({})", create_names(exprs.as_slice())?))
+            }
             GroupingSet::GroupingSets(lists_of_exprs) => {
                 let mut list_of_names = vec![];
                 for exprs in lists_of_exprs {
-                    list_of_names.push(format!(
-                        "({})",
-                        create_names(exprs.as_slice(), input_schema)?
-                    ));
+                    list_of_names.push(format!("({})", create_names(exprs.as_slice())?));
                 }
                 Ok(format!("GROUPING SETS ({})", list_of_names.join(", ")))
             }
@@ -926,8 +871,8 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
             list,
             negated,
         } => {
-            let expr = create_name(expr, input_schema)?;
-            let list = list.iter().map(|expr| create_name(expr, input_schema));
+            let expr = create_name(expr)?;
+            let list = list.iter().map(create_name);
             if *negated {
                 Ok(format!("{} NOT IN ({:?})", expr, list))
             } else {
@@ -940,9 +885,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
             low,
             high,
         } => {
-            let expr = create_name(expr, input_schema)?;
-            let low = create_name(low, input_schema)?;
-            let high = create_name(high, input_schema)?;
+            let expr = create_name(expr)?;
+            let low = create_name(low)?;
+            let high = create_name(high)?;
             if *negated {
                 Ok(format!("{} NOT BETWEEN {} AND {}", expr, low, high))
             } else {
@@ -962,10 +907,10 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
 }

 /// Create a comma separated list of names from a list of expressions
-fn create_names(exprs: &[Expr], input_schema: &DFSchema) -> Result<String> {
+fn create_names(exprs: &[Expr]) -> Result<String> {
     Ok(exprs
         .iter()
-        .map(|e| create_name(e, input_schema))
+        .map(create_name)
         .collect::<Result<Vec<String>>>()?
         .join(", "))
 }
@@ -973,7 +918,20 @@ fn create_names(exprs: &[Expr], input_schema: &DFSchema) -> Result<String> {
 #[cfg(test)]
 mod test {
     use crate::expr_fn::col;
-    use crate::lit;
+    use crate::{case, lit};
+    use datafusion_common::{Result, ScalarValue};
+
+    #[test]
+    fn format_case_when() -> Result<()> {
+        let expr = case(col("a"))
+            .when(lit(1), lit(true))
+            .when(lit(0), lit(false))
+            .otherwise(lit(ScalarValue::Null))?;
+        assert_eq!("CASE #a WHEN Int32(1) THEN Boolean(true) WHEN Int32(0) THEN Boolean(false) ELSE NULL END", format!("{}", expr));
+        assert_eq!("CASE #a WHEN Int32(1) THEN Boolean(true) WHEN Int32(0) THEN Boolean(false) ELSE NULL END", format!("{:?}", expr));
+        assert_eq!("CASE a WHEN Int32(1) THEN Boolean(true) WHEN Int32(0) THEN Boolean(false) ELSE NULL END", expr.name()?);
+        Ok(())
+    }

     #[test]
     fn test_not() {
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 0900a29993f6d..46c5ecf279329 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -237,7 +237,7 @@ impl ExprSchemable for Expr {
             )),
             _ => Ok(DFField::new(
                 None,
-                &self.name(input_schema)?,
+                &self.name()?,
                 self.get_type(input_schema)?,
                 self.nullable(input_schema)?,
             )),
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 41ba951400602..ea923e57e85f5 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -669,7 +669,7 @@ impl LogicalPlanBuilder {
     ) -> Result<Self> {
         let window_expr = normalize_cols(window_expr, &self.plan)?;
         let all_expr = window_expr.iter();
-        validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;
+        validate_unique_names("Windows", all_expr.clone())?;
         let mut window_fields: Vec<DFField> = exprlist_to_fields(all_expr, &self.plan)?;
         window_fields.extend_from_slice(self.plan.schema().fields());
         Ok(Self::from(LogicalPlan::Window(Window {
@@ -696,7 +696,7 @@ impl LogicalPlanBuilder {
         let grouping_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

         let all_expr = grouping_expr.iter().chain(aggr_expr.iter());
-        validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?;
+        validate_unique_names("Aggregations", all_expr.clone())?;
         let aggr_schema = DFSchema::new_with_metadata(
             exprlist_to_fields(all_expr, &self.plan)?,
             self.plan.schema().metadata().clone(),
@@ -832,11 +831,10 @@ pub fn build_join_schema(
 fn validate_unique_names<'a>(
     node_name: &str,
     expressions: impl IntoIterator<Item = &'a Expr>,
-    input_schema: &DFSchema,
 ) -> Result<()> {
     let mut unique_names = HashMap::new();
     expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
-        let name = expr.name(input_schema)?;
+        let name = expr.name()?;
         match unique_names.get(&name) {
             None => {
                 unique_names.insert(name, (position, expr));
@@ -956,7 +955,7 @@ pub fn project_with_alias(
                 .push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
         }
     }
-    validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
+    validate_unique_names("Projections", projected_expr.iter())?;
     let input_schema = DFSchema::new_with_metadata(
         exprlist_to_fields(&projected_expr, &plan)?,
         plan.schema().metadata().clone(),
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index e748536d735df..c039ca15ee185 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -629,7 +629,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
             Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
         }
         Expr::ScalarSubquery(_) => e.clone(),
-        _ => match e.name(input_schema) {
+        _ => match e.name() {
            Ok(name) => match input_schema.field_with_unqualified_name(&name) {
                Ok(field) => Expr::Column(field.qualified_column()),
                // expression not provided as input, do not convert to a column reference
@@ -681,12 +676,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
             let field = plan.schema().field_from_column(col)?;
             Ok(Expr::Column(field.qualified_column()))
         }
-        _ => {
-            // we should not be trying to create a name for the expression
-            // based on the input schema but this is the current behavior
-            // see https://github.com/apache/arrow-datafusion/issues/2456
-            Ok(Expr::Column(Column::from_name(expr.name(plan.schema())?)))
-        }
+        _ => Ok(Expr::Column(Column::from_name(expr.name()?))),
     }
 }

diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index f015aeaa0bca0..4eeda1f4c7d0b 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -103,7 +103,6 @@ fn optimize(
                 &[&arrays],
                 input,
                 &mut expr_set,
-                schema,
                 optimizer_config,
             )?;

@@ -137,7 +136,6 @@ fn optimize(
                 &[&[id_array]],
                 input,
                 &mut expr_set,
-                input.schema(),
                 optimizer_config,
             )?;

@@ -158,7 +156,6 @@ fn optimize(
                 &[&arrays],
                 input,
                 &mut expr_set,
-                schema,
                 optimizer_config,
             )?;

@@ -182,7 +179,6 @@ fn optimize(
                 &[&group_arrays, &aggr_arrays],
                 input,
                 &mut expr_set,
-                schema,
                 optimizer_config,
             )?;
             // note the reversed pop order.
@@ -204,7 +200,6 @@ fn optimize(
                 &[&arrays],
                 input,
                 &mut expr_set,
-                input.schema(),
                 optimizer_config,
             )?;

@@ -304,7 +299,6 @@ fn rewrite_expr(
     arrays_list: &[&[Vec<(usize, String)>]],
     input: &LogicalPlan,
     expr_set: &mut ExprSet,
-    schema: &DFSchema,
     optimizer_config: &OptimizerConfig,
 ) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
     let mut affected_id = HashSet::<Identifier>::new();
@@ -318,13 +312,7 @@ fn rewrite_expr(
                 .cloned()
                 .zip(arrays.iter())
                 .map(|(expr, id_array)| {
-                    replace_common_expr(
-                        expr,
-                        id_array,
-                        expr_set,
-                        &mut affected_id,
-                        schema,
-                    )
+                    replace_common_expr(expr, id_array, expr_set, &mut affected_id)
                 })
                 .collect::<Result<Vec<_>>>()
         })
@@ -628,7 +616,6 @@ struct CommonSubexprRewriter<'a> {
     id_array: &'a [(usize, Identifier)],
     /// Which identifier is replaced.
     affected_id: &'a mut HashSet<Identifier>,
-    schema: &'a DFSchema,

     /// the max series number we have rewritten. Other expression nodes
     /// with smaller series number is already replaced and shouldn't
@@ -686,7 +673,7 @@ impl ExprRewriter for CommonSubexprRewriter<'_> {
             self.curr_index += 1;
         }

-        let expr_name = expr.name(self.schema)?;
+        let expr_name = expr.name()?;
         // Alias this `Column` expr to it original "expr name",
         // `projection_push_down` optimizer use "expr name" to eliminate useless
         // projections.
@@ -699,13 +686,11 @@ fn replace_common_expr(
     id_array: &[(usize, Identifier)],
     expr_set: &mut ExprSet,
     affected_id: &mut HashSet<Identifier>,
-    schema: &DFSchema,
 ) -> Result<Expr> {
     expr.rewrite(&mut CommonSubexprRewriter {
         expr_set,
         id_array,
         affected_id,
-        schema,
         max_series_number: 0,
         curr_index: 0,
     })
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 3d0415232dbf6..d5dafad4ad4ea 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -387,9 +387,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             let new_input = optimize(input, state)?;
             Ok(from_plan(plan, expr, &[new_input])?)
         }
-        LogicalPlan::Aggregate(Aggregate {
-            aggr_expr, input, ..
-        }) => {
+        LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => {
             // An aggregate's aggreagate columns are _not_ filter-commutable => collect these:
             // * columns whose aggregation expression depends on
             // * the aggregation columns themselves
@@ -400,7 +398,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {

             let agg_columns = aggr_expr
                 .iter()
-                .map(|x| Ok(Column::from_name(x.name(input.schema())?)))
+                .map(|x| Ok(Column::from_name(x.name()?)))
                 .collect::<Result<HashSet<_>>>()?;
             used_columns.extend(agg_columns);

diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 80cc1044df25a..c458923e9d9aa 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -253,16 +253,13 @@ fn optimize_plan(
             }))
         }
         LogicalPlan::Window(Window {
-            schema,
-            window_expr,
-            input,
-            ..
+            window_expr, input, ..
         }) => {
             // Gather all columns needed for expressions in this Window
             let mut new_window_expr = Vec::new();
             {
                 window_expr.iter().try_for_each(|expr| {
-                    let name = &expr.name(schema)?;
+                    let name = &expr.name()?;
                     let column = Column::from_name(name);
                     if required_columns.contains(&column) {
                         new_window_expr.push(expr.clone());
@@ -321,7 +318,7 @@ fn optimize_plan(
             // Gather all columns needed for expressions in this Aggregate
             let mut new_aggr_expr = Vec::new();
             aggr_expr.iter().try_for_each(|expr| {
-                let name = &expr.name(schema)?;
+                let name = &expr.name()?;
                 let column = Column::from_name(name);

                 if required_columns.contains(&column) {
diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs
index 1735e80ef7809..9b7d93655fc24 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -289,12 +289,12 @@ impl SimplifyExpressions {
             .map(|e| {
                 // We need to keep original expression name, if any.
                 // Constant folding should not change expression name.
-                let name = &e.name(plan.schema());
+                let name = &e.name();

                 // Apply the actual simplification logic
                 let new_e = e.simplify(&info)?;

-                let new_name = &new_e.name(plan.schema());
+                let new_name = &new_e.name();

                 if let (Ok(expr_name), Ok(new_expr_name)) = (name, new_name) {
                     if expr_name != new_expr_name {
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index e36706caa226e..45d3e6ff13ca7 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -74,7 +74,7 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
             let x = match agg_expr {
                 Expr::AggregateFunction { fun, args, .. } => {
                     // is_single_distinct_agg ensure args.len=1
-                    if group_fields_set.insert(args[0].name(input.schema())?) {
+                    if group_fields_set.insert(args[0].name()?) {
                         all_group_args
                             .push(args[0].clone().alias(SINGLE_DISTINCT_ALIAS));
                     }
@@ -161,9 +161,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result<LogicalPlan> {

 fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
     match plan {
-        LogicalPlan::Aggregate(Aggregate {
-            input, aggr_expr, ..
-        }) => {
+        LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => {
             let mut fields_set = HashSet::new();
             aggr_expr
                 .iter()
@@ -172,7 +170,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
                     if let Expr::AggregateFunction { distinct, args, .. } = expr {
                         is_distinct = *distinct;
                         args.iter().for_each(|expr| {
-                            fields_set.insert(expr.name(input.schema()).unwrap());
+                            fields_set.insert(expr.name().unwrap());
                         })
                     }
                     is_distinct