From ff2515dcb31b8f9ec24eb4a10bd8241dfbbc0971 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Apr 2024 13:05:22 -0400 Subject: [PATCH 1/4] Minor: Add `Column::from(Tableref, &FieldRef)` --- benchmarks/src/tpch/convert.rs | 5 ++--- datafusion/common/src/column.rs | 11 ++++++++++- datafusion/core/src/dataframe/mod.rs | 6 +++--- datafusion/core/src/physical_planner.rs | 8 ++------ datafusion/expr/src/expr_rewriter/mod.rs | 5 ++--- datafusion/expr/src/logical_plan/builder.rs | 2 +- datafusion/expr/src/utils.rs | 5 ++--- datafusion/optimizer/src/common_subexpr_eliminate.rs | 7 ++----- .../optimizer/src/replace_distinct_aggregate.rs | 2 +- datafusion/sql/src/expr/mod.rs | 2 +- datafusion/sql/src/statement.rs | 3 +-- 11 files changed, 27 insertions(+), 29 deletions(-) diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs index a841fe5322948..c75e4d0e017f6 100644 --- a/benchmarks/src/tpch/convert.rs +++ b/benchmarks/src/tpch/convert.rs @@ -88,9 +88,8 @@ impl ConvertOpt { .schema() .iter() .take(schema.fields.len() - 1) - .map(|(qualifier, field)| { - Expr::Column(Column::from((qualifier, field.as_ref()))) - }) + .map(Column::from) + .map(Expr::Column) .collect(); csv = csv.select(selection)?; diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index dec87d9d071e8..ae3146516349f 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -17,7 +17,7 @@ //! Column -use arrow_schema::Field; +use arrow_schema::{Field, FieldRef}; use crate::error::_schema_err; use crate::utils::{parse_identifiers_normalized, quote_identifier}; @@ -63,6 +63,8 @@ impl Column { } /// Create Column from unqualified name. + /// + /// Alias for `Column::new_unqualified` pub fn from_name(name: impl Into) -> Self { Self { relation: None, @@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column { } } +/// Create a column, use qualifier and field name +impl From<(Option<&TableReference>, &FieldRef)> for Column { + fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self { + Self::new(relation.cloned(), field.name()) + } +} + impl FromStr for Column { type Err = Infallible; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 90ae05c6805fe..6f9a5ce17ef78 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1332,7 +1332,7 @@ impl DataFrame { col_exists = true; new_column.clone() } else { - col(Column::from((qualifier, field.as_ref()))) + col(Column::from((qualifier, field))) } }) .collect(); @@ -1402,9 +1402,9 @@ impl DataFrame { .iter() .map(|(qualifier, field)| { if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename { - col(Column::from((qualifier, field.as_ref()))).alias(new_name) + col(Column::from((qualifier, field))).alias(new_name) } else { - col(Column::from((qualifier, field.as_ref()))) + col(Column::from((qualifier, field))) } }) .collect::>(); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 301f68c0f24bf..4aceeb2e1a223 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1248,12 +1248,8 @@ impl DefaultPhysicalPlanner { if left_projected || right_projected { let final_join_result = join_schema .iter() - .map(|(qualifier, field)| { - Expr::Column(datafusion_common::Column::from(( - qualifier, - field.as_ref(), - ))) - }) + .map(datafusion_common::Column::from) + .map(Expr::Column) .collect::>(); let projection = LogicalPlan::Projection(Projection::try_new( final_join_result, diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index c11619fc0ea2e..be58c496c2549 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -221,9 +221,8 @@ pub fn coerce_plan_expr_for_schema( let exprs: Vec = plan .schema() .iter() - .map(|(qualifier, field)| { - Expr::Column(Column::from((qualifier, field.as_ref()))) - }) + .map(Column::from) + .map(Expr::Column) .collect(); let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index f7c0fbac537b8..f75d5ad844f18 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1577,7 +1577,7 @@ pub fn unnest_with_options( return Ok(input); } }; - qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref()))); + qualified_columns.push(Column::from((unnest_qualifier, &unnested_field))); unnested_fields.insert(index, unnested_field); } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 8c6b98f17933d..94a7731128e85 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -358,9 +358,8 @@ fn get_exprs_except_skipped( if columns_to_skip.is_empty() { schema .iter() - .map(|(qualifier, field)| { - Expr::Column(Column::from((qualifier, field.as_ref()))) - }) + .map(Column::from) + .map(Expr::Column) .collect::>() } else { schema diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 2fabd5de9282a..9128e37d9b4bc 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -506,7 +506,7 @@ fn build_common_expr_project_plan( for (qualifier, field) in input.schema().iter() { if fields_set.insert(qualified_name(qualifier, field.name())) { - project_exprs.push(Expr::Column(Column::from((qualifier, field.as_ref())))); + project_exprs.push(Expr::Column(Column::from((qualifier, field)))); } } @@ -525,10 +525,7 @@ fn build_recover_project_plan( schema: &DFSchema, input: LogicalPlan, ) -> Result { - let col_exprs = schema - .iter() - .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref())))) - .collect(); + let col_exprs = schema.iter().map(Column::from).map(Expr::Column).collect(); Ok(LogicalPlan::Projection(Projection::try_new( col_exprs, Arc::new(input), diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index f464506057ff1..4f68e2623f403 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -127,7 +127,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { .skip(on_expr.len()) .zip(schema.iter()) .map(|((new_qualifier, new_field), (old_qualifier, old_field))| { - Ok(col(Column::from((new_qualifier, new_field.as_ref()))) + Ok(col(Column::from((new_qualifier, new_field))) .alias_qualified(old_qualifier.cloned(), old_field.name())) }) .collect::>>()?; diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 7d4e745eb21e9..567b876c9725d 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -143,7 +143,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { _ => false, }) { Some((qualifier, df_field)) => { - Expr::Column(Column::from((qualifier, df_field.as_ref()))) + Expr::Column(Column::from((qualifier, df_field))) } None => Expr::Column(col), } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 53fbfb0552bb4..dd6d24e2f51be 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1307,8 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )) } else { datafusion_expr::Expr::Column(Column::from(( - qualifier, - field.as_ref(), + qualifier, field, ))) } } From c4bc7e36d92899e5609f7ab2a78145b2e2faed6d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Apr 2024 15:14:18 -0400 Subject: [PATCH 2/4] Add Expr::from() --- benchmarks/src/tpch/convert.rs | 3 +- datafusion/core/src/physical_planner.rs | 7 +-- datafusion/expr/src/expr.rs | 45 ++++++++++++++++++- datafusion/expr/src/expr_rewriter/mod.rs | 7 +-- datafusion/expr/src/utils.rs | 8 +--- .../optimizer/src/common_subexpr_eliminate.rs | 4 +- datafusion/sql/src/expr/mod.rs | 8 ++-- 7 files changed, 55 insertions(+), 27 deletions(-) diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs index c75e4d0e017f6..30178d17aa544 100644 --- a/benchmarks/src/tpch/convert.rs +++ b/benchmarks/src/tpch/convert.rs @@ -88,8 +88,7 @@ impl ConvertOpt { .schema() .iter() .take(schema.fields.len() - 1) - .map(Column::from) - .map(Expr::Column) + .map(Expr::from) .collect(); csv = csv.select(selection)?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 4aceeb2e1a223..5b59aeea98f87 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1246,11 +1246,8 @@ impl DefaultPhysicalPlanner { // Remove temporary projected columns if left_projected || right_projected { - let final_join_result = join_schema - .iter() - .map(datafusion_common::Column::from) - .map(Expr::Column) - .collect::>(); + let final_join_result = + join_schema.iter().map(Expr::from).collect::>(); let projection = LogicalPlan::Projection(Projection::try_new( final_join_result, Arc::new(new_join), diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 08d495c3be350..63b41b7add44b 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -32,7 +32,7 @@ use crate::{ Signature, }; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, FieldRef}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference, @@ -84,6 +84,32 @@ use sqlparser::ast::NullTreatment; /// assert_eq!(binary_expr.op, Operator::Eq); /// } /// ``` +/// +/// Return a list of [`Expr::Column`] from a schema's columns +/// +/// Example: +/// ``` +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::{DFSchema, Column}; +/// # use datafusion_expr::Expr; +/// # use datafusion_expr::expr::exprs_from_schema; +/// +/// let arrow_schema = Schema::new(vec![ +/// Field::new("c1", DataType::Int32, false), +/// Field::new("c2", DataType::Float64, false), +/// ]); +/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap(); +/// +/// // Form a list of expressions for each item in the schema +/// let exprs: Vec<_> = df_schema.iter() +/// .map(Expr::from) +/// .collect(); +/// +/// assert_eq!(exprs, vec![ +/// Expr::from(Column::from_qualified_name("t1.c1")), +/// Expr::from(Column::from_qualified_name("t1.c2")), +/// ]); +/// ``` #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub enum Expr { /// An expression with a specific name. @@ -190,6 +216,23 @@ impl Default for Expr { } } +/// Create an [`Expr`] from a [`Column`] +impl From for Expr { + fn from(value: Column) -> Self { + Expr::Column(value) + } +} + +/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is +/// useful for creating [`Expr`] from a [`DFSchema`]. +/// +/// See example on [`Expr`] +impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr { + fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self { + Expr::from(Column::from(value)) + } +} + #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct Unnest { pub expr: Box, diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index be58c496c2549..f5a532f1653ae 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -218,12 +218,7 @@ pub fn coerce_plan_expr_for_schema( Ok(LogicalPlan::Projection(projection)) } _ => { - let exprs: Vec = plan - .schema() - .iter() - .map(Column::from) - .map(Expr::Column) - .collect(); + let exprs: Vec = plan.schema().iter().map(Expr::from).collect(); let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?; let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err()); diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 94a7731128e85..9dd9e65a392de 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -356,11 +356,7 @@ fn get_exprs_except_skipped( columns_to_skip: HashSet, ) -> Vec { if columns_to_skip.is_empty() { - schema - .iter() - .map(Column::from) - .map(Expr::Column) - .collect::>() + schema.iter().map(Expr::from).collect::>() } else { schema .columns() @@ -854,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { match expr { Expr::Column(col) => { let (qualifier, field) = plan.schema().qualified_field_from_column(col)?; - Ok(Expr::Column(Column::from((qualifier, field)))) + Ok(Expr::from(Column::from((qualifier, field)))) } _ => Ok(Expr::Column(Column::from_name(expr.display_name()?))), } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 9128e37d9b4bc..941526f0b8238 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -506,7 +506,7 @@ fn build_common_expr_project_plan( for (qualifier, field) in input.schema().iter() { if fields_set.insert(qualified_name(qualifier, field.name())) { - project_exprs.push(Expr::Column(Column::from((qualifier, field)))); + project_exprs.push(Expr::from((qualifier, field))); } } @@ -525,7 +525,7 @@ fn build_recover_project_plan( schema: &DFSchema, input: LogicalPlan, ) -> Result { - let col_exprs = schema.iter().map(Column::from).map(Expr::Column).collect(); + let col_exprs = schema.iter().map(Expr::from).collect(); Ok(LogicalPlan::Projection(Projection::try_new( col_exprs, Arc::new(input), diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 567b876c9725d..feb256a169b6e 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -21,8 +21,8 @@ use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, Va use sqlparser::parser::ParserError::ParserError; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema, - Result, ScalarValue, + internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result, + ScalarValue, }; use datafusion_expr::expr::AggregateFunctionDefinition; use datafusion_expr::expr::InList; @@ -142,9 +142,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } _ => false, }) { - Some((qualifier, df_field)) => { - Expr::Column(Column::from((qualifier, df_field))) - } + Some((qualifier, df_field)) => Expr::from((qualifier, df_field)), None => Expr::Column(col), } } From 3f792b3c64d95fcd7090422e44650ffdc2867334 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Apr 2024 15:15:39 -0400 Subject: [PATCH 3/4] fix docs --- datafusion/expr/src/expr.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 63b41b7add44b..5ad2ef3d0c380 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -85,9 +85,7 @@ use sqlparser::ast::NullTreatment; /// } /// ``` /// -/// Return a list of [`Expr::Column`] from a schema's columns -/// -/// Example: +/// ## Return a list of [`Expr::Column`] from a schema's columns /// ``` /// # use arrow::datatypes::{DataType, Field, Schema}; /// # use datafusion_common::{DFSchema, Column}; From 4db07ee31f4f59ea3194e08a1bf3aa3deb02507b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Apr 2024 15:42:37 -0400 Subject: [PATCH 4/4] Fix doc test --- datafusion/expr/src/expr.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index ec37cbf889e35..8fc2e102c1071 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -90,7 +90,6 @@ use sqlparser::ast::NullTreatment; /// # use arrow::datatypes::{DataType, Field, Schema}; /// # use datafusion_common::{DFSchema, Column}; /// # use datafusion_expr::Expr; -/// # use datafusion_expr::expr::exprs_from_schema; /// /// let arrow_schema = Schema::new(vec![ /// Field::new("c1", DataType::Int32, false),