From 56f115bb52bfb9e15c12f47acc736484be3f1d54 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 7 Oct 2022 15:13:08 -0400 Subject: [PATCH 1/2] Public coerce and simplify API --- datafusion/core/src/logical_plan/mod.rs | 2 +- datafusion/core/tests/simplification.rs | 8 +- datafusion/expr/src/logical_plan/plan.rs | 2 +- .../expr/src/type_coercion/functions.rs | 2 +- datafusion/optimizer/src/expr_simplifier.rs | 213 ++++++++++++++++-- .../optimizer/src/simplify_expressions.rs | 115 ++-------- datafusion/optimizer/src/type_coercion.rs | 2 +- 7 files changed, 222 insertions(+), 122 deletions(-) diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index 038571de30570..8001f602d9f09 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -56,4 +56,4 @@ pub use datafusion_expr::{ to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator, }; -pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo}; +pub use datafusion_optimizer::expr_simplifier::SimplifyInfo; diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index fe5f5e254b523..9cccd1aabf9cb 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -19,13 +19,13 @@ use arrow::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::ExprSchemable; -use datafusion::logical_plan::ExprSimplifiable; use datafusion::{ error::Result, execution::context::ExecutionProps, logical_plan::{DFSchema, Expr, SimplifyInfo}, prelude::*, }; +use datafusion_optimizer::expr_simplifier::ExprSimplifier; /// In order to simplify expressions, DataFusion must have information /// about the expressions. @@ -90,7 +90,8 @@ fn basic() { // optimize form `a < 5` automatically let expr = col("a").lt(lit(2i32) + lit(3i32)); - let simplified = expr.simplify(&info).unwrap(); + let simplifier = ExprSimplifier::new(info); + let simplified = simplifier.simplify(expr).unwrap(); assert_eq!(simplified, col("a").lt(lit(5i32))); } @@ -103,6 +104,7 @@ fn fold_and_simplify() { // Since datafusion applies both simplification *and* rewriting // some expressions can be entirely simplified - let simplified = expr.simplify(&info).unwrap(); + let simplifier = ExprSimplifier::new(info); + let simplified = simplifier.simplify(expr).unwrap(); assert_eq!(simplified, lit(true)) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 7f5c95ab14364..7b74b8deb147f 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1357,7 +1357,7 @@ impl Aggregate { /// building the schema again when the schema is already known. /// /// This method should only be called when you are absolutely sure that the schema being - /// provided is correct for the aggregate. If in doubt, call [try_new] instead. + /// provided is correct for the aggregate. If in doubt, call [try_new](Self::try_new) instead. pub fn try_new_with_schema( input: Arc, group_expr: Vec, diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 667451a5fd22f..c20a93b99b6f0 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -26,7 +26,7 @@ use datafusion_common::{DataFusionError, Result}; /// each argument must be coerced to match `signature`. /// /// For more details on coercion in general, please see the -/// [`type_coercion`](datafusion::expr::type_coercion) module. +/// [`type_coercion`](datafusion_expr::type_coercion) module. pub fn data_types( current_types: &[DataType], signature: &Signature, diff --git a/datafusion/optimizer/src/expr_simplifier.rs b/datafusion/optimizer/src/expr_simplifier.rs index 7cf5f02c6966f..72bffb8ff230a 100644 --- a/datafusion/optimizer/src/expr_simplifier.rs +++ b/datafusion/optimizer/src/expr_simplifier.rs @@ -15,17 +15,24 @@ // specific language governing permissions and limitations // under the License. -//! Expression simplifier +//! Expression simplification API -use crate::simplify_expressions::{ConstEvaluator, Simplifier}; -use datafusion_common::Result; -use datafusion_expr::{expr_rewriter::ExprRewritable, Expr}; +use crate::{ + simplify_expressions::{ConstEvaluator, Simplifier}, + type_coercion::TypeCoercionRewriter, +}; +use arrow::datatypes::DataType; +use datafusion_common::{DFSchemaRef, DataFusionError, Result}; +use datafusion_expr::{expr_rewriter::ExprRewritable, Expr, ExprSchemable}; use datafusion_physical_expr::execution_props::ExecutionProps; #[allow(rustdoc::private_intra_doc_links)] /// The information necessary to apply algebraic simplification to an -/// [Expr]. See [SimplifyContext](crate::optimizer::simplify_expressions::SimplifyContext) -/// for one implementation +/// [Expr]. See [SimplifyContext] for one concrete implementation. +/// +/// This trait exists so that other systems can plug schema +/// information in without having to create `DFSchema` objects. If you +/// have a [`DFSchemaRef`] you can use [`SimplifyContext`] pub trait SimplifyInfo { /// returns true if this Expr has boolean type fn is_boolean_type(&self, expr: &Expr) -> Result; @@ -37,28 +44,41 @@ pub trait SimplifyInfo { fn execution_props(&self) -> &ExecutionProps; } -/// trait for types that can be simplified -pub trait ExprSimplifiable: Sized { - /// simplify this trait object using the given SimplifyInfo - fn simplify(self, info: &S) -> Result; +/// This structure handles API for expression simplification +pub struct ExprSimplifier { + info: S, } -impl ExprSimplifiable for Expr { +impl ExprSimplifier { + /// Create a new `ExprSimplifier` with the given `info`. See + /// [`simplify`](Self::simplify) for an example. + pub fn new(info: S) -> Self { + Self { info } + } + /// Simplifies this [`Expr`]`s as much as possible, evaluating - /// constants and applying algebraic simplifications + /// constants and applying algebraic simplifications. + /// + /// The types of the expression must match what operators expect, + /// or else an error may occur trying to evaluate. See + /// [`coerce`](Self::coerce) for a function to help. /// /// # Example: + /// /// `b > 2 AND b > 2` + /// /// can be written to + /// /// `b > 2` /// /// ``` /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_common::Result; /// use datafusion_physical_expr::execution_props::ExecutionProps; - /// use datafusion_optimizer::expr_simplifier::{SimplifyInfo, ExprSimplifiable}; + /// use datafusion_optimizer::expr_simplifier::{ExprSimplifier, SimplifyInfo}; /// /// /// Simple implementation that provides `Simplifier` the information it needs + /// /// See SimplifyContext for a structure that does this. /// #[derive(Default)] /// struct Info { /// execution_props: ExecutionProps, @@ -76,6 +96,9 @@ impl ExprSimplifiable for Expr { /// } /// } /// + /// // Create the simplifier + /// let simplifier = ExprSimplifier::new(Info::default()); + /// /// // b < 2 /// let b_lt_2 = col("b").gt(lit(2)); /// @@ -83,17 +106,171 @@ impl ExprSimplifiable for Expr { /// let expr = b_lt_2.clone().or(b_lt_2.clone()); /// /// // (b < 2) OR (b < 2) --> (b < 2) - /// let expr = expr.simplify(&Info::default()).unwrap(); + /// let expr = simplifier.simplify(expr).unwrap(); /// assert_eq!(expr, b_lt_2); /// ``` - fn simplify(self, info: &S) -> Result { - let mut rewriter = Simplifier::new(info); - let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?; + pub fn simplify(&self, expr: Expr) -> Result { + let mut rewriter = Simplifier::new(&self.info); + let mut const_evaluator = ConstEvaluator::try_new(self.info.execution_props())?; // TODO iterate until no changes are made during rewrite // (evaluating constants can enable new simplifications and // simplifications can enable new constant evaluation) // https://github.com/apache/arrow-datafusion/issues/1160 - self.rewrite(&mut const_evaluator)?.rewrite(&mut rewriter) + expr.rewrite(&mut const_evaluator)?.rewrite(&mut rewriter) + } + + /// Apply type coercion to an [`Expr`] so that it can be + /// evaluated as a [`PhysicalExpr`](datafusion_physical_expr::PhysicalExpr). + /// + /// See the [type coercion module](datafusion_expr::type_coercion) + /// documentation for more details on type coercion + /// + // Would be nice if this API could use the SimplifyInfo + // rather than creating an DFSchemaRef coerces rather than doing + // it manually. + // TODO ticekt + pub fn coerce(&self, expr: Expr, schema: DFSchemaRef) -> Result { + let mut expr_rewrite = TypeCoercionRewriter { schema }; + + expr.rewrite(&mut expr_rewrite) + } +} + +/// Provides simplification information based on DFSchema and +/// [`ExecutionProps`]. This is the default implementation used by DataFusion +/// +/// For example: +/// ``` +/// use arrow::datatypes::{Schema, Field, DataType}; +/// use datafusion_expr::{col, lit}; +/// use datafusion_common::{DataFusionError, ToDFSchema}; +/// use datafusion_physical_expr::execution_props::ExecutionProps; +/// use datafusion_optimizer::expr_simplifier::{SimplifyContext, ExprSimplifier}; +/// +/// // Create the schema +/// let schema = Schema::new(vec![ +/// Field::new("i", DataType::Int64, false), +/// ]) +/// .to_dfschema_ref().unwrap(); +/// +/// // Create the simplifier +/// let props = ExecutionProps::new(); +/// let context = SimplifyContext::new(&props) +/// .with_schema(schema); +/// let simplifier = ExprSimplifier::new(context); +/// +/// // Use the simplifier +/// +/// // b < 2 or (1 > 3) +/// let expr = col("b").lt(lit(2)).or(lit(1).gt(lit(3))); +/// +/// // b < 2 +/// let simplified = simplifier.simplify(expr).unwrap(); +/// assert_eq!(simplified, col("b").lt(lit(2))); +/// ``` +pub struct SimplifyContext<'a> { + schemas: Vec, + props: &'a ExecutionProps, +} + +impl<'a> SimplifyContext<'a> { + /// Create a new SimplifyContext + pub fn new(props: &'a ExecutionProps) -> Self { + Self { + schemas: vec![], + props, + } + } + + /// Register a [`DFSchemaRef`] with this context + pub fn with_schema(mut self, schema: DFSchemaRef) -> Self { + self.schemas.push(schema); + self + } +} + +impl<'a> SimplifyInfo for SimplifyContext<'a> { + /// returns true if this Expr has boolean type + fn is_boolean_type(&self, expr: &Expr) -> Result { + for schema in &self.schemas { + if let Ok(DataType::Boolean) = expr.get_type(schema) { + return Ok(true); + } + } + + Ok(false) + } + /// Returns true if expr is nullable + fn nullable(&self, expr: &Expr) -> Result { + self.schemas + .iter() + .find_map(|schema| { + // expr may be from another input, so ignore errors + // by converting to None to keep trying + expr.nullable(schema.as_ref()).ok() + }) + .ok_or_else(|| { + // This means we weren't able to compute `Expr::nullable` with + // *any* input schemas, signalling a problem + DataFusionError::Internal(format!( + "Could not find columns in '{}' during simplify", + expr + )) + }) + } + + fn execution_props(&self) -> &ExecutionProps { + self.props + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{Field, Schema}; + use datafusion_common::ToDFSchema; + use datafusion_expr::{col, lit}; + + #[test] + fn api_basic() { + let props = ExecutionProps::new(); + let simplifier = + ExprSimplifier::new(SimplifyContext::new(&props).with_schema(test_schema())); + + let expr = lit(1) + lit(2); + let expected = lit(3); + assert_eq!(expected, simplifier.simplify(expr).unwrap()); + } + + #[test] + fn basic_coercion() { + let schema = test_schema(); + let props = ExecutionProps::new(); + let simplifier = + ExprSimplifier::new(SimplifyContext::new(&props).with_schema(schema.clone())); + + // Note expr type is int32 (not int64) + // (1i64 + 2i32) < i + let expr = (lit(1i64) + lit(2i32)).lt(col("i")); + // should fully simplify to 3 < i (though i has been coerced to i64) + let expected = lit(3i64).lt(col("i")); + + // Would be nice if this API could use the SimplifyInfo + // rather than creating an DFSchemaRef coerces rather than doing + // it manually. + // TODO ticekt + let expr = simplifier.coerce(expr, schema).unwrap(); + + assert_eq!(expected, simplifier.simplify(expr).unwrap()); + } + + fn test_schema() -> DFSchemaRef { + Schema::new(vec![ + Field::new("i", DataType::Int64, false), + Field::new("b", DataType::Boolean, true), + ]) + .to_dfschema_ref() + .unwrap() } } diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index bb17a9925cd28..302a90fe9e203 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -15,73 +15,25 @@ // specific language governing permissions and limitations // under the License. -//! Simplify expressions optimizer rule +//! Simplify expressions optimizer rule and implementation -use crate::expr_simplifier::ExprSimplifiable; +use crate::expr_simplifier::{ExprSimplifier, SimplifyContext}; use crate::{expr_simplifier::SimplifyInfo, OptimizerConfig, OptimizerRule}; use arrow::array::new_null_array; use arrow::datatypes::{DataType, Field, Schema}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue}; +use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue}; use datafusion_expr::{ expr_fn::{and, or}, expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}, lit, logical_plan::LogicalPlan, utils::from_plan, - BuiltinScalarFunction, ColumnarValue, Expr, ExprSchemable, Operator, Volatility, + BuiltinScalarFunction, ColumnarValue, Expr, Operator, Volatility, }; use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps}; -/// Provides simplification information based on schema and properties -pub(crate) struct SimplifyContext<'a, 'b> { - schemas: Vec<&'a DFSchemaRef>, - props: &'b ExecutionProps, -} - -impl<'a, 'b> SimplifyContext<'a, 'b> { - /// Create a new SimplifyContext - pub fn new(schemas: Vec<&'a DFSchemaRef>, props: &'b ExecutionProps) -> Self { - Self { schemas, props } - } -} - -impl<'a, 'b> SimplifyInfo for SimplifyContext<'a, 'b> { - /// returns true if this Expr has boolean type - fn is_boolean_type(&self, expr: &Expr) -> Result { - for schema in &self.schemas { - if let Ok(DataType::Boolean) = expr.get_type(schema) { - return Ok(true); - } - } - - Ok(false) - } - /// Returns true if expr is nullable - fn nullable(&self, expr: &Expr) -> Result { - self.schemas - .iter() - .find_map(|schema| { - // expr may be from another input, so ignore errors - // by converting to None to keep trying - expr.nullable(schema.as_ref()).ok() - }) - .ok_or_else(|| { - // This means we weren't able to compute `Expr::nullable` with - // *any* input schemas, signalling a problem - DataFusionError::Internal(format!( - "Could not find columns in '{}' during simplify", - expr - )) - }) - } - - fn execution_props(&self) -> &ExecutionProps { - self.props - } -} - /// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting /// [`Expr`]`s evaluating constants and applying algebraic /// simplifications @@ -292,7 +244,14 @@ impl SimplifyExpressions { // projected columns. With just the projected schema, it's not possible to infer types for // expressions that references non-projected columns within the same project plan or its // children plans. - let info = SimplifyContext::new(plan.all_schemas(), execution_props); + let info = plan + .all_schemas() + .into_iter() + .fold(SimplifyContext::new(execution_props), |context, schema| { + context.with_schema(schema.clone()) + }); + + let simplifier = ExprSimplifier::new(info); let new_inputs = plan .inputs() @@ -309,7 +268,7 @@ impl SimplifyExpressions { let name = &e.name(); // Apply the actual simplification logic - let new_e = e.simplify(&info)?; + let new_e = simplifier.simplify(e)?; let new_name = &new_e.name(); @@ -950,30 +909,12 @@ macro_rules! assert_contains { }; } -/// Apply simplification and constant propagation to ([Expr]). -/// -/// # Arguments -/// -/// * `expr` - The logical expression -/// * `schema` - The DataFusion schema for the expr, used to resolve `Column` references -/// to qualified or unqualified fields by name. -/// * `props` - The Arrow schema for the input, used for determining expression data types -/// when performing type coercion. -pub fn simplify_expr( - expr: Expr, - schema: &DFSchemaRef, - props: &ExecutionProps, -) -> Result { - let info = SimplifyContext::new(vec![schema], props); - expr.simplify(&info) -} - #[cfg(test)] mod tests { use super::*; use arrow::array::{ArrayRef, Int32Array}; use chrono::{DateTime, TimeZone, Utc}; - use datafusion_common::{DFField, ToDFSchema}; + use datafusion_common::{DFField, DFSchemaRef}; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ and, binary_expr, call_fn, col, create_udf, lit, lit_timestamp_nano, @@ -1573,8 +1514,10 @@ mod tests { fn simplify(expr: Expr) -> Expr { let schema = expr_test_schema(); let execution_props = ExecutionProps::new(); - let info = SimplifyContext::new(vec![&schema], &execution_props); - expr.simplify(&info).unwrap() + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&execution_props).with_schema(schema), + ); + simplifier.simplify(expr).unwrap() } fn expr_test_schema() -> DFSchemaRef { @@ -2571,26 +2514,4 @@ mod tests { assert_optimized_plan_eq(&plan, expected); } - - #[test] - fn simplify_expr_api_test() { - let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]) - .to_dfschema_ref() - .unwrap(); - let props = ExecutionProps::new(); - - // x + (1 + 3) -> x + 4 - { - let expr = col("x") + (lit(1) + lit(3)); - let simplifed_expr = simplify_expr(expr, &schema, &props).unwrap(); - assert_eq!(simplifed_expr, col("x") + lit(4)); - } - - // x * 1 -> x - { - let expr = col("x") * lit(1); - let simplifed_expr = simplify_expr(expr, &schema, &props).unwrap(); - assert_eq!(simplifed_expr, col("x")); - } - } } diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index f0470da8710c5..db8ae8fb525ca 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -120,7 +120,7 @@ fn optimize_internal( from_plan(plan, &new_expr, &new_inputs) } -struct TypeCoercionRewriter { +pub(crate) struct TypeCoercionRewriter { pub(crate) schema: DFSchemaRef, } From 5e1fd478e3a618673d6e04428b4be4ff1e7313ee Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 11 Oct 2022 06:35:09 -0400 Subject: [PATCH 2/2] Add ticket reference --- datafusion/optimizer/src/expr_simplifier.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/expr_simplifier.rs b/datafusion/optimizer/src/expr_simplifier.rs index 72bffb8ff230a..b75e8608181b3 100644 --- a/datafusion/optimizer/src/expr_simplifier.rs +++ b/datafusion/optimizer/src/expr_simplifier.rs @@ -129,7 +129,7 @@ impl ExprSimplifier { // Would be nice if this API could use the SimplifyInfo // rather than creating an DFSchemaRef coerces rather than doing // it manually. - // TODO ticekt + // https://github.com/apache/arrow-datafusion/issues/3793 pub fn coerce(&self, expr: Expr, schema: DFSchemaRef) -> Result { let mut expr_rewrite = TypeCoercionRewriter { schema }; @@ -259,7 +259,7 @@ mod tests { // Would be nice if this API could use the SimplifyInfo // rather than creating an DFSchemaRef coerces rather than doing // it manually. - // TODO ticekt + // https://github.com/apache/arrow-datafusion/issues/3793 let expr = simplifier.coerce(expr, schema).unwrap(); assert_eq!(expected, simplifier.simplify(expr).unwrap());