From 64e8559c81463d64eb76e7d9544b8908062f810d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 08:15:12 -0600 Subject: [PATCH 01/10] Make Like a top-level Expr --- datafusion/core/src/execution/context.rs | 3 + datafusion/core/src/physical_plan/planner.rs | 16 +- datafusion/core/tests/sql/mod.rs | 27 +-- datafusion/expr/src/binary_rule.rs | 6 +- datafusion/expr/src/expr.rs | 15 +- datafusion/expr/src/expr_fn.rs | 24 ++- datafusion/expr/src/operator.rs | 8 - .../optimizer/src/simplify_expressions.rs | 16 +- .../physical-expr/src/expressions/binary.rs | 41 +---- .../physical-expr/src/expressions/like.rs | 167 ++++++++++++++++++ .../physical-expr/src/expressions/mod.rs | 1 + datafusion/physical-expr/src/planner.rs | 33 +++- datafusion/proto/proto/datafusion.proto | 1 - datafusion/proto/src/from_proto.rs | 2 - datafusion/proto/src/lib.rs | 54 ++++++ datafusion/sql/src/planner.rs | 41 ++--- 16 files changed, 360 insertions(+), 95 deletions(-) create mode 100644 datafusion/physical-expr/src/expressions/like.rs diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index a6a508d368437..82ff8ec720ea1 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1439,6 +1439,8 @@ impl SessionState { rules.push(Arc::new(TypeCoercion::new())); rules.push(Arc::new(LimitPushDown::new())); rules.push(Arc::new(SingleDistinctToGroupBy::new())); + //TODO add a config so we can turn this off since it is new + rules.push(Arc::new(TypeCoercion::default())); let mut physical_optimizers: Vec> = vec![ Arc::new(AggregateStatistics::new()), @@ -1589,6 +1591,7 @@ impl SessionState { ) -> Result> { let planner = self.query_planner.clone(); let logical_plan = self.optimize(logical_plan)?; + println!("optimized plan [2]: {:?}", logical_plan); planner.create_physical_plan(&logical_plan, self).await } } diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index a2d8683618aad..df1fe001b79aa 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1682,6 +1682,8 @@ mod tests { use datafusion_expr::expr::GroupingSet; use datafusion_expr::sum; use datafusion_expr::{col, lit}; + use datafusion_optimizer::type_coercion::TypeCoercion; + use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; use fmt::Debug; use std::collections::HashMap; use std::convert::TryFrom; @@ -1859,7 +1861,19 @@ mod tests { col("c1").like(col("c2")), ]; for case in cases { - let logical_plan = test_csv_scan().await?.project(vec![case.clone()]); + let logical_plan = test_csv_scan() + .await? + .project(vec![case.clone()]) + .and_then(|b| b.build()) + .and_then(|plan| { + // this test was expecting type coercion/validation errors before the optimizer + // had run due to the legacy approach of type coercion but now we need to run the + // optimizer here + let type_coercion = TypeCoercion::default(); + let mut config = OptimizerConfig::new(); + type_coercion.optimize(&plan, &mut config) + }); + let message = format!( "Expression {:?} expected to error due to impossible coercion", case diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index ef4b4386e0a81..e1c0f66d17419 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -751,29 +751,32 @@ async fn try_execute_to_batches( /// Execute query and return results as a Vec of RecordBatches async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec { let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx + let logical_plan = ctx .create_logical_plan(sql) .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - let logical_schema = plan.schema(); + let logical_schema = logical_plan.schema(); - let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); - let plan = ctx - .optimize(&plan) + let msg = format!("Optimizing logical plan for '{}': {:?}", sql, logical_plan); + let optimized_logical_plan = ctx + .optimize(&logical_plan) .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - let optimized_logical_schema = plan.schema(); - - let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); - let plan = ctx - .create_physical_plan(&plan) + println!("optimized plan [1]: {:?}", optimized_logical_plan); + let optimized_logical_schema = optimized_logical_plan.schema(); + + // creating a physical plan will call `optimize` again so we pass in the + // unoptimized logical plan here + let msg = format!("Creating physical plan for '{}': {:?}", sql, logical_plan); + let physical_plan = ctx + .create_physical_plan(&logical_plan) .await .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); + let msg = format!("Executing physical plan for '{}': {:?}", sql, physical_plan); let task_ctx = ctx.task_ctx(); - let results = collect(plan, task_ctx) + let results = collect(physical_plan, task_ctx) .await .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs index d0057a1c94bba..b719a7c9572aa 100644 --- a/datafusion/expr/src/binary_rule.rs +++ b/datafusion/expr/src/binary_rule.rs @@ -42,8 +42,6 @@ pub fn binary_operator_data_type( | Operator::NotEq | Operator::And | Operator::Or - | Operator::Like - | Operator::NotLike | Operator::Lt | Operator::Gt | Operator::GtEq @@ -95,8 +93,6 @@ pub fn coerce_types( | Operator::Gt | Operator::GtEq | Operator::LtEq => comparison_coercion(lhs_type, rhs_type), - // "like" operators operate on strings and always return a boolean - Operator::Like | Operator::NotLike => like_coercion(lhs_type, rhs_type), // date +/- interval returns date Operator::Plus | Operator::Minus if (*lhs_type == DataType::Date32 @@ -504,7 +500,7 @@ fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option /// coercion rules for like operations. /// This is a union of string coercion rules and dictionary coercion rules -fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { +pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { string_coercion(lhs_type, rhs_type) .or_else(|| dictionary_coercion(lhs_type, rhs_type, false)) .or_else(|| null_coercion(lhs_type, rhs_type)) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 6226887e80d0e..c398fe515b8a8 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -444,12 +444,22 @@ impl Expr { /// Return `self LIKE other` pub fn like(self, other: Expr) -> Expr { - binary_expr(self, Operator::Like, other) + Expr::Like { + negated: false, + expr: Box::new(self), + pattern: Box::new(other), + escape_char: None, + } } /// Return `self NOT LIKE other` pub fn not_like(self, other: Expr) -> Expr { - binary_expr(self, Operator::NotLike, other) + Expr::Like { + negated: true, + expr: Box::new(self), + pattern: Box::new(other), + escape_char: None, + } } /// Return `self AS name` alias expression @@ -505,6 +515,7 @@ impl Not for Expr { type Output = Self; fn not(self) -> Self::Output { + // TODO file issue for extending this to other similar expressions match self { Expr::Like { negated, diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 8b0f1646608dc..768f8d798e621 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -21,11 +21,12 @@ use crate::expr::GroupingSet; use crate::{ aggregate_function, built_in_function, conditional_expressions::CaseBuilder, lit, logical_plan::Subquery, AccumulatorFunctionImplementation, AggregateUDF, - BuiltinScalarFunction, Expr, LogicalPlan, Operator, ReturnTypeFunction, - ScalarFunctionImplementation, ScalarUDF, Signature, StateTypeFunction, Volatility, + BuiltinScalarFunction, Expr, ExprSchemable, LogicalPlan, Operator, + ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, + StateTypeFunction, Volatility, }; use arrow::datatypes::DataType; -use datafusion_common::Result; +use datafusion_common::{DFSchema, Result}; use std::sync::Arc; /// Create a column expression based on a qualified or unqualified column name @@ -259,6 +260,23 @@ pub fn cast(expr: Expr, data_type: DataType) -> Expr { } } +/// Create a cast expression +pub fn cast_if_needed( + expr: Expr, + data_type: &DataType, + input_schema: &DFSchema, +) -> Result { + let t = expr.get_type(input_schema)?; + if &t == data_type { + Ok(expr) + } else { + Ok(Expr::Cast { + expr: Box::new(expr), + data_type: data_type.clone(), + }) + } +} + /// Create an convenience function representing a unary scalar function macro_rules! unary_scalar_expr { ($ENUM:ident, $FUNC:ident, $DOC:expr) => { diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index a5b752c643779..558a27919a763 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -51,10 +51,6 @@ pub enum Operator { And, /// Logical OR, like `||` Or, - /// Matches a wildcard pattern - Like, - /// Does not match a wildcard pattern - NotLike, /// IS DISTINCT FROM IsDistinctFrom, /// IS NOT DISTINCT FROM @@ -90,8 +86,6 @@ impl Operator { Operator::LtEq => Some(Operator::Gt), Operator::Gt => Some(Operator::LtEq), Operator::GtEq => Some(Operator::Lt), - Operator::Like => Some(Operator::NotLike), - Operator::NotLike => Some(Operator::Like), Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom), Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom), Operator::Plus @@ -130,8 +124,6 @@ impl fmt::Display for Operator { Operator::Modulo => "%", Operator::And => "AND", Operator::Or => "OR", - Operator::Like => "LIKE", - Operator::NotLike => "NOT LIKE", Operator::RegexMatch => "~", Operator::RegexIMatch => "~*", Operator::RegexNotMatch => "!~", diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index 1c826d7c39d58..dd84b963850d3 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -201,6 +201,18 @@ fn negate_clause(expr: Expr) -> Expr { }; } match op { + // not (A is distinct from B) ===> (A is not distinct from B) + Operator::IsDistinctFrom => Expr::BinaryExpr { + left, + op: Operator::IsNotDistinctFrom, + right, + }, + // not (A is not distinct from B) ===> (A is distinct from B) + Operator::IsNotDistinctFrom => Expr::BinaryExpr { + left, + op: Operator::IsDistinctFrom, + right, + }, // not (A and B) ===> (not A) or (not B) Operator::And => { let left = negate_clause(*left); @@ -2210,7 +2222,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: #test.a NOT LIKE #test.b AS NOT test.a LIKE test.b\ + let expected = "Filter: #test.a NOT LIKE #test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2232,7 +2244,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: #test.a LIKE #test.b AS NOT test.a NOT LIKE test.b\ + let expected = "Filter: #test.a LIKE #test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 8323aa5abbe69..506c632046807 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -28,6 +28,8 @@ use arrow::compute::kernels::arithmetic::{ multiply_scalar, subtract, subtract_scalar, }; use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; +use arrow::compute::kernels::comparison::regexp_is_match_utf8; +use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar; use arrow::compute::kernels::comparison::{ eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar, lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar, @@ -47,10 +49,6 @@ use arrow::compute::kernels::comparison::{ use arrow::compute::kernels::comparison::{ eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar, }; -use arrow::compute::kernels::comparison::{like_utf8, nlike_utf8, regexp_is_match_utf8}; -use arrow::compute::kernels::comparison::{ - like_utf8_scalar, nlike_utf8_scalar, regexp_is_match_utf8_scalar, -}; use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn}; use arrow::compute::kernels::concat_elements::concat_elements_utf8; @@ -323,19 +321,6 @@ macro_rules! compute_op { }}; } -macro_rules! binary_string_array_op_scalar { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ - let result: Result> = match $LEFT.data_type() { - DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), - other => Err(DataFusionError::Internal(format!( - "Data type {:?} not supported for scalar operation '{}' on string array", - other, stringify!($OP) - ))), - }; - Some(result) - }}; -} - macro_rules! binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { @@ -623,7 +608,7 @@ impl PhysicalExpr for BinaryExpr { } /// unwrap underlying (non dictionary) value, if any, to pass to a scalar kernel -fn unwrap_dict_value(v: ScalarValue) -> ScalarValue { +pub fn unwrap_dict_value(v: ScalarValue) -> ScalarValue { if let ScalarValue::Dictionary(_key_type, v) = v { unwrap_dict_value(*v) } else { @@ -713,12 +698,6 @@ impl BinaryExpr { Operator::NotEq => { binary_array_op_dyn_scalar!(array, scalar.clone(), neq, bool_type) } - Operator::Like => { - binary_string_array_op_scalar!(array, scalar.clone(), like, bool_type) - } - Operator::NotLike => { - binary_string_array_op_scalar!(array, scalar.clone(), nlike, bool_type) - } Operator::Plus => { binary_primitive_array_op_scalar!(array, scalar.clone(), add) } @@ -818,8 +797,6 @@ impl BinaryExpr { right_data_type: &DataType, ) -> Result { match &self.op { - Operator::Like => binary_string_array_op!(left, right, like), - Operator::NotLike => binary_string_array_op!(left, right, nlike), Operator::Lt => lt_dyn(&left, &right), Operator::LtEq => lt_eq_dyn(&left, &right), Operator::Gt => gt_dyn(&left, &right), @@ -1115,18 +1092,6 @@ mod tests { DataType::Float32, vec![2f32] ); - test_coercion!( - StringArray, - DataType::Utf8, - vec!["hello world", "world"], - StringArray, - DataType::Utf8, - vec!["%hello%", "%hello%"], - Operator::Like, - BooleanArray, - DataType::Boolean, - vec![true, false] - ); test_coercion!( StringArray, DataType::Utf8, diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs new file mode 100644 index 0000000000000..d2815b4bf34bd --- /dev/null +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! LIKE and NOT LIKE expression + +use arrow::array::*; +use arrow::compute::kernels::comparison::*; // TODO import specific items +use arrow::{ + datatypes::{DataType, Schema}, + record_batch::RecordBatch, +}; +use std::{any::Any, sync::Arc}; + +use crate::PhysicalExpr; +use datafusion_common::DataFusionError; +use datafusion_common::Result; +use datafusion_common::ScalarValue; +use datafusion_expr::ColumnarValue; + +/// IS NULL expression +#[derive(Debug)] +pub struct LikeExpr { + /// negated + negated: bool, + /// Input expression + expr: Arc, + /// Regex pattern + pattern: Arc, +} + +impl LikeExpr { + /// Create new not expression + pub fn new( + negated: bool, + expr: Arc, + pattern: Arc, + ) -> Self { + Self { + negated, + expr, + pattern, + } + } + + /// Get the input expression + pub fn expr(&self) -> &Arc { + &self.expr + } + + /// Get the pattern expression + pub fn pattern(&self) -> &Arc { + &self.pattern + } +} + +impl std::fmt::Display for LikeExpr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + if self.negated { + write!(f, "{} NOT LIKE", self.expr) + } else { + write!(f, "{} LIKE", self.expr) + } + } +} + +macro_rules! binary_string_array_op_scalar { + ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ + let result: Result> = match $LEFT.data_type() { + DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), + other => Err(DataFusionError::Internal(format!( + "Data type {:?} not supported for scalar operation '{}' on string array", + other, stringify!($OP) + ))), + }; + Some(result) + }}; +} + +impl PhysicalExpr for LikeExpr { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Boolean) + } + + fn nullable(&self, _input_schema: &Schema) -> Result { + self.expr.nullable(_input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + println!("LIKE evaluate"); + + let inputs = self.expr.evaluate(batch)?; + let pattern = self.pattern.evaluate(batch)?; + match inputs { + ColumnarValue::Array(array) => match pattern { + ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented( + "LIKE: non-scalar pattern not supported".to_string(), + )), + ColumnarValue::Scalar(scalar) => foo(array, scalar, self.negated), + }, + ColumnarValue::Scalar(data) => match pattern { + ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented( + "LIKE: non-scalar pattern not supported".to_string(), + )), + ColumnarValue::Scalar(pattern) => { + foo(data.to_array(), pattern, self.negated) + } + }, + } + } +} + +//TODO escape_char +fn foo(data: ArrayRef, pattern: ScalarValue, negated: bool) -> Result { + let arc: Arc = Arc::new(if negated { + binary_string_array_op_scalar!(data, pattern, nlike, &DataType::Boolean) + //TODO unwraps + .unwrap() + .unwrap() + } else { + binary_string_array_op_scalar!( + data, + pattern, + like, //TODO what aboue like_utf8_scalar, nlike_utf8_scalar, ? + &DataType::Boolean + ) + //TODO unwraps + .unwrap() + .unwrap() + }); + Ok(ColumnarValue::Array(arc)) +} +/// Create a LIKE expression +pub fn like( + expr: Arc, + pattern: Arc, +) -> Result> { + Ok(Arc::new(LikeExpr::new(false, expr, pattern))) +} + +/// Create a NOT LIKE expression +pub fn not_like( + expr: Arc, + pattern: Arc, +) -> Result> { + Ok(Arc::new(LikeExpr::new(true, expr, pattern))) +} + +// TODO unit tests diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 00bf6aafade46..fed6082819429 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -28,6 +28,7 @@ mod get_indexed_field; mod in_list; mod is_not_null; mod is_null; +pub mod like; //TODO re-export what we need mod literal; mod negative; mod not; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index c344982b33796..305541ec8ffe9 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -20,7 +20,9 @@ use crate::var_provider::is_system_variables; use crate::{ execution_props::ExecutionProps, expressions::{ - self, binary, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal, + self, binary, + like::{like, not_like}, + Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal, }, functions, udf, var_provider::VarType, @@ -202,6 +204,35 @@ pub fn create_physical_expr( } } } + Expr::Like { + negated, + expr, + pattern, + escape_char, + } => { + if escape_char.is_some() { + return Err(DataFusionError::Execution( + "LIKE does not support escape_char".to_string(), + )); + } + let lhs = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let rhs = create_physical_expr( + pattern, + input_dfschema, + input_schema, + execution_props, + )?; + if *negated { + not_like(lhs, rhs) + } else { + like(lhs, rhs) + } + } Expr::Case { expr, when_then_expr, diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 69e1617f39f93..46cd5836d4f5b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -315,7 +315,6 @@ message LogicalExprNode { LikeNode like = 31; ILikeNode ilike = 32; SimilarToNode similar_to = 33; - } } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 0a4e0e68bbe1a..0b57aa9dd9098 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -1524,8 +1524,6 @@ fn from_proto_binary_op(op: &str) -> Result { "Multiply" => Ok(Operator::Multiply), "Divide" => Ok(Operator::Divide), "Modulo" => Ok(Operator::Modulo), - "Like" => Ok(Operator::Like), - "NotLike" => Ok(Operator::NotLike), other => Err(proto_error(format!( "Unsupported binary operator '{:?}'", other diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 7e009a846e698..7c8a760744a64 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -929,6 +929,60 @@ mod roundtrip_tests { roundtrip_expr_test(test_expr, ctx); } + #[test] + fn roundtrip_like() { + fn like(negated: bool, escape_char: Option) { + let test_expr = Expr::Like { + negated, + expr: Box::new(col("col")), + pattern: Box::new(lit("[0-9]+")), + escape_char, + }; + let ctx = SessionContext::new(); + roundtrip_expr_test(test_expr, ctx); + } + like(true, Some('X')); + like(false, Some('\\')); + like(true, None); + like(false, None); + } + + #[test] + fn roundtrip_ilike() { + fn ilike(negated: bool, escape_char: Option) { + let test_expr = Expr::ILike { + negated, + expr: Box::new(col("col")), + pattern: Box::new(lit("[0-9]+")), + escape_char, + }; + let ctx = SessionContext::new(); + roundtrip_expr_test(test_expr, ctx); + } + ilike(true, Some('X')); + ilike(false, Some('\\')); + ilike(true, None); + ilike(false, None); + } + + #[test] + fn roundtrip_similar_to() { + fn similar_to(negated: bool, escape_char: Option) { + let test_expr = Expr::SimilarTo { + negated, + expr: Box::new(col("col")), + pattern: Box::new(lit("[0-9]+")), + escape_char, + }; + let ctx = SessionContext::new(); + roundtrip_expr_test(test_expr, ctx); + } + similar_to(true, Some('X')); + similar_to(false, Some('\\')); + similar_to(true, None); + similar_to(false, None); + } + #[test] fn roundtrip_count() { let test_expr = Expr::AggregateFunction { diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 5cc388ba01a43..6817639dcda91 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -1940,30 +1940,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Like { negated, expr, pattern, escape_char } => { - match escape_char { - Some(_) => { - // to support this we will need to introduce `Expr::Like` instead - // of treating it like a binary expression - Err(DataFusionError::NotImplemented("LIKE with ESCAPE is not yet supported".to_string())) - }, - _ => { - Ok(Expr::BinaryExpr { - left: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), - op: if negated { Operator::NotLike } else { Operator::Like }, - right: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), - }) - } - } + Ok(Expr::Like { + negated, + expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), + pattern: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), + escape_char + + }) } - SQLExpr::ILike { .. } => { - // https://github.com/apache/arrow-datafusion/issues/3099 - Err(DataFusionError::NotImplemented("ILIKE is not yet supported".to_string())) + SQLExpr::ILike { negated, expr, pattern, escape_char } => { + Ok(Expr::ILike { + negated, + expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), + pattern: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), + escape_char + }) } - SQLExpr::SimilarTo { .. } => { - // https://github.com/apache/arrow-datafusion/issues/3099 - Err(DataFusionError::NotImplemented("SIMILAR TO is not yet supported".to_string())) + SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => { + Ok(Expr::SimilarTo { + negated, + expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), + pattern: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), + escape_char + }) } SQLExpr::BinaryOp { From d1d6655de65318401aae3927d6d9f16b31c208cf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 08:18:30 -0600 Subject: [PATCH 02/10] revert some changes --- datafusion/core/src/execution/context.rs | 3 --- datafusion/core/tests/sql/mod.rs | 27 +++++++++++------------- datafusion/expr/src/expr_fn.rs | 24 +++------------------ 3 files changed, 15 insertions(+), 39 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 82ff8ec720ea1..a6a508d368437 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1439,8 +1439,6 @@ impl SessionState { rules.push(Arc::new(TypeCoercion::new())); rules.push(Arc::new(LimitPushDown::new())); rules.push(Arc::new(SingleDistinctToGroupBy::new())); - //TODO add a config so we can turn this off since it is new - rules.push(Arc::new(TypeCoercion::default())); let mut physical_optimizers: Vec> = vec![ Arc::new(AggregateStatistics::new()), @@ -1591,7 +1589,6 @@ impl SessionState { ) -> Result> { let planner = self.query_planner.clone(); let logical_plan = self.optimize(logical_plan)?; - println!("optimized plan [2]: {:?}", logical_plan); planner.create_physical_plan(&logical_plan, self).await } } diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index e1c0f66d17419..ef4b4386e0a81 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -751,32 +751,29 @@ async fn try_execute_to_batches( /// Execute query and return results as a Vec of RecordBatches async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec { let msg = format!("Creating logical plan for '{}'", sql); - let logical_plan = ctx + let plan = ctx .create_logical_plan(sql) .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - let logical_schema = logical_plan.schema(); + let logical_schema = plan.schema(); - let msg = format!("Optimizing logical plan for '{}': {:?}", sql, logical_plan); - let optimized_logical_plan = ctx - .optimize(&logical_plan) + let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); + let plan = ctx + .optimize(&plan) .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - println!("optimized plan [1]: {:?}", optimized_logical_plan); - let optimized_logical_schema = optimized_logical_plan.schema(); - - // creating a physical plan will call `optimize` again so we pass in the - // unoptimized logical plan here - let msg = format!("Creating physical plan for '{}': {:?}", sql, logical_plan); - let physical_plan = ctx - .create_physical_plan(&logical_plan) + let optimized_logical_schema = plan.schema(); + + let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); + let plan = ctx + .create_physical_plan(&plan) .await .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); - let msg = format!("Executing physical plan for '{}': {:?}", sql, physical_plan); + let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); let task_ctx = ctx.task_ctx(); - let results = collect(physical_plan, task_ctx) + let results = collect(plan, task_ctx) .await .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 768f8d798e621..8b0f1646608dc 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -21,12 +21,11 @@ use crate::expr::GroupingSet; use crate::{ aggregate_function, built_in_function, conditional_expressions::CaseBuilder, lit, logical_plan::Subquery, AccumulatorFunctionImplementation, AggregateUDF, - BuiltinScalarFunction, Expr, ExprSchemable, LogicalPlan, Operator, - ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, - StateTypeFunction, Volatility, + BuiltinScalarFunction, Expr, LogicalPlan, Operator, ReturnTypeFunction, + ScalarFunctionImplementation, ScalarUDF, Signature, StateTypeFunction, Volatility, }; use arrow::datatypes::DataType; -use datafusion_common::{DFSchema, Result}; +use datafusion_common::Result; use std::sync::Arc; /// Create a column expression based on a qualified or unqualified column name @@ -260,23 +259,6 @@ pub fn cast(expr: Expr, data_type: DataType) -> Expr { } } -/// Create a cast expression -pub fn cast_if_needed( - expr: Expr, - data_type: &DataType, - input_schema: &DFSchema, -) -> Result { - let t = expr.get_type(input_schema)?; - if &t == data_type { - Ok(expr) - } else { - Ok(Expr::Cast { - expr: Box::new(expr), - data_type: data_type.clone(), - }) - } -} - /// Create an convenience function representing a unary scalar function macro_rules! unary_scalar_expr { ($ENUM:ident, $FUNC:ident, $DOC:expr) => { From bd14a93ef046e69d02a32683109cead5451f58b7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 08:41:30 -0600 Subject: [PATCH 03/10] add type validation --- datafusion/optimizer/src/type_coercion.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index d9f16159926f5..1d9b6d3157813 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -19,7 +19,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use arrow::datatypes::DataType; -use datafusion_common::{DFSchema, DFSchemaRef, Result}; +use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::binary_rule::coerce_types; use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; use datafusion_expr::logical_plan::builder::build_join_schema; @@ -116,6 +116,15 @@ impl ExprRewriter for TypeCoercionRewriter { } } } + Expr::Like { pattern, .. } + | Expr::ILike { pattern, .. } + | Expr::SimilarTo { pattern, .. } => match pattern.get_type(&self.schema)? { + DataType::Utf8 => Ok(expr), + other => Err(DataFusionError::Plan(format!( + "Expected pattern in Like, ILike, or SimilarTo to be Utf8 but was {}", + other + ))), + }, _ => Ok(expr), } } From 41f7db10587302974583f3e41ae684b11de8f841 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 11:35:10 -0600 Subject: [PATCH 04/10] Revert physical plan changes and reduce scope of the PR --- datafusion/expr/src/binary_rule.rs | 4 + datafusion/expr/src/operator.rs | 8 + .../physical-expr/src/expressions/binary.rs | 41 ++++- .../physical-expr/src/expressions/like.rs | 167 ------------------ .../physical-expr/src/expressions/mod.rs | 1 - datafusion/physical-expr/src/planner.rs | 27 +-- 6 files changed, 58 insertions(+), 190 deletions(-) delete mode 100644 datafusion/physical-expr/src/expressions/like.rs diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs index b719a7c9572aa..f043979c15382 100644 --- a/datafusion/expr/src/binary_rule.rs +++ b/datafusion/expr/src/binary_rule.rs @@ -42,6 +42,8 @@ pub fn binary_operator_data_type( | Operator::NotEq | Operator::And | Operator::Or + | Operator::Like + | Operator::NotLike | Operator::Lt | Operator::Gt | Operator::GtEq @@ -93,6 +95,8 @@ pub fn coerce_types( | Operator::Gt | Operator::GtEq | Operator::LtEq => comparison_coercion(lhs_type, rhs_type), + // "like" operators operate on strings and always return a boolean + Operator::Like | Operator::NotLike => like_coercion(lhs_type, rhs_type), // date +/- interval returns date Operator::Plus | Operator::Minus if (*lhs_type == DataType::Date32 diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index 558a27919a763..a5b752c643779 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -51,6 +51,10 @@ pub enum Operator { And, /// Logical OR, like `||` Or, + /// Matches a wildcard pattern + Like, + /// Does not match a wildcard pattern + NotLike, /// IS DISTINCT FROM IsDistinctFrom, /// IS NOT DISTINCT FROM @@ -86,6 +90,8 @@ impl Operator { Operator::LtEq => Some(Operator::Gt), Operator::Gt => Some(Operator::LtEq), Operator::GtEq => Some(Operator::Lt), + Operator::Like => Some(Operator::NotLike), + Operator::NotLike => Some(Operator::Like), Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom), Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom), Operator::Plus @@ -124,6 +130,8 @@ impl fmt::Display for Operator { Operator::Modulo => "%", Operator::And => "AND", Operator::Or => "OR", + Operator::Like => "LIKE", + Operator::NotLike => "NOT LIKE", Operator::RegexMatch => "~", Operator::RegexIMatch => "~*", Operator::RegexNotMatch => "!~", diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 506c632046807..8323aa5abbe69 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -28,8 +28,6 @@ use arrow::compute::kernels::arithmetic::{ multiply_scalar, subtract, subtract_scalar, }; use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; -use arrow::compute::kernels::comparison::regexp_is_match_utf8; -use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar; use arrow::compute::kernels::comparison::{ eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar, lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar, @@ -49,6 +47,10 @@ use arrow::compute::kernels::comparison::{ use arrow::compute::kernels::comparison::{ eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar, }; +use arrow::compute::kernels::comparison::{like_utf8, nlike_utf8, regexp_is_match_utf8}; +use arrow::compute::kernels::comparison::{ + like_utf8_scalar, nlike_utf8_scalar, regexp_is_match_utf8_scalar, +}; use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn}; use arrow::compute::kernels::concat_elements::concat_elements_utf8; @@ -321,6 +323,19 @@ macro_rules! compute_op { }}; } +macro_rules! binary_string_array_op_scalar { + ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ + let result: Result> = match $LEFT.data_type() { + DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), + other => Err(DataFusionError::Internal(format!( + "Data type {:?} not supported for scalar operation '{}' on string array", + other, stringify!($OP) + ))), + }; + Some(result) + }}; +} + macro_rules! binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { @@ -608,7 +623,7 @@ impl PhysicalExpr for BinaryExpr { } /// unwrap underlying (non dictionary) value, if any, to pass to a scalar kernel -pub fn unwrap_dict_value(v: ScalarValue) -> ScalarValue { +fn unwrap_dict_value(v: ScalarValue) -> ScalarValue { if let ScalarValue::Dictionary(_key_type, v) = v { unwrap_dict_value(*v) } else { @@ -698,6 +713,12 @@ impl BinaryExpr { Operator::NotEq => { binary_array_op_dyn_scalar!(array, scalar.clone(), neq, bool_type) } + Operator::Like => { + binary_string_array_op_scalar!(array, scalar.clone(), like, bool_type) + } + Operator::NotLike => { + binary_string_array_op_scalar!(array, scalar.clone(), nlike, bool_type) + } Operator::Plus => { binary_primitive_array_op_scalar!(array, scalar.clone(), add) } @@ -797,6 +818,8 @@ impl BinaryExpr { right_data_type: &DataType, ) -> Result { match &self.op { + Operator::Like => binary_string_array_op!(left, right, like), + Operator::NotLike => binary_string_array_op!(left, right, nlike), Operator::Lt => lt_dyn(&left, &right), Operator::LtEq => lt_eq_dyn(&left, &right), Operator::Gt => gt_dyn(&left, &right), @@ -1092,6 +1115,18 @@ mod tests { DataType::Float32, vec![2f32] ); + test_coercion!( + StringArray, + DataType::Utf8, + vec!["hello world", "world"], + StringArray, + DataType::Utf8, + vec!["%hello%", "%hello%"], + Operator::Like, + BooleanArray, + DataType::Boolean, + vec![true, false] + ); test_coercion!( StringArray, DataType::Utf8, diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs deleted file mode 100644 index d2815b4bf34bd..0000000000000 --- a/datafusion/physical-expr/src/expressions/like.rs +++ /dev/null @@ -1,167 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! LIKE and NOT LIKE expression - -use arrow::array::*; -use arrow::compute::kernels::comparison::*; // TODO import specific items -use arrow::{ - datatypes::{DataType, Schema}, - record_batch::RecordBatch, -}; -use std::{any::Any, sync::Arc}; - -use crate::PhysicalExpr; -use datafusion_common::DataFusionError; -use datafusion_common::Result; -use datafusion_common::ScalarValue; -use datafusion_expr::ColumnarValue; - -/// IS NULL expression -#[derive(Debug)] -pub struct LikeExpr { - /// negated - negated: bool, - /// Input expression - expr: Arc, - /// Regex pattern - pattern: Arc, -} - -impl LikeExpr { - /// Create new not expression - pub fn new( - negated: bool, - expr: Arc, - pattern: Arc, - ) -> Self { - Self { - negated, - expr, - pattern, - } - } - - /// Get the input expression - pub fn expr(&self) -> &Arc { - &self.expr - } - - /// Get the pattern expression - pub fn pattern(&self) -> &Arc { - &self.pattern - } -} - -impl std::fmt::Display for LikeExpr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - if self.negated { - write!(f, "{} NOT LIKE", self.expr) - } else { - write!(f, "{} LIKE", self.expr) - } - } -} - -macro_rules! binary_string_array_op_scalar { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ - let result: Result> = match $LEFT.data_type() { - DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), - other => Err(DataFusionError::Internal(format!( - "Data type {:?} not supported for scalar operation '{}' on string array", - other, stringify!($OP) - ))), - }; - Some(result) - }}; -} - -impl PhysicalExpr for LikeExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn data_type(&self, _input_schema: &Schema) -> Result { - Ok(DataType::Boolean) - } - - fn nullable(&self, _input_schema: &Schema) -> Result { - self.expr.nullable(_input_schema) - } - - fn evaluate(&self, batch: &RecordBatch) -> Result { - println!("LIKE evaluate"); - - let inputs = self.expr.evaluate(batch)?; - let pattern = self.pattern.evaluate(batch)?; - match inputs { - ColumnarValue::Array(array) => match pattern { - ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented( - "LIKE: non-scalar pattern not supported".to_string(), - )), - ColumnarValue::Scalar(scalar) => foo(array, scalar, self.negated), - }, - ColumnarValue::Scalar(data) => match pattern { - ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented( - "LIKE: non-scalar pattern not supported".to_string(), - )), - ColumnarValue::Scalar(pattern) => { - foo(data.to_array(), pattern, self.negated) - } - }, - } - } -} - -//TODO escape_char -fn foo(data: ArrayRef, pattern: ScalarValue, negated: bool) -> Result { - let arc: Arc = Arc::new(if negated { - binary_string_array_op_scalar!(data, pattern, nlike, &DataType::Boolean) - //TODO unwraps - .unwrap() - .unwrap() - } else { - binary_string_array_op_scalar!( - data, - pattern, - like, //TODO what aboue like_utf8_scalar, nlike_utf8_scalar, ? - &DataType::Boolean - ) - //TODO unwraps - .unwrap() - .unwrap() - }); - Ok(ColumnarValue::Array(arc)) -} -/// Create a LIKE expression -pub fn like( - expr: Arc, - pattern: Arc, -) -> Result> { - Ok(Arc::new(LikeExpr::new(false, expr, pattern))) -} - -/// Create a NOT LIKE expression -pub fn not_like( - expr: Arc, - pattern: Arc, -) -> Result> { - Ok(Arc::new(LikeExpr::new(true, expr, pattern))) -} - -// TODO unit tests diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index fed6082819429..00bf6aafade46 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -28,7 +28,6 @@ mod get_indexed_field; mod in_list; mod is_not_null; mod is_null; -pub mod like; //TODO re-export what we need mod literal; mod negative; mod not; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 305541ec8ffe9..9c6670c630bfd 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -20,9 +20,7 @@ use crate::var_provider::is_system_variables; use crate::{ execution_props::ExecutionProps, expressions::{ - self, binary, - like::{like, not_like}, - Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal, + self, binary, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal, }, functions, udf, var_provider::VarType, @@ -215,23 +213,14 @@ pub fn create_physical_expr( "LIKE does not support escape_char".to_string(), )); } - let lhs = create_physical_expr( - expr, - input_dfschema, - input_schema, - execution_props, - )?; - let rhs = create_physical_expr( - pattern, - input_dfschema, - input_schema, - execution_props, - )?; - if *negated { - not_like(lhs, rhs) + let op = if *negated { + Operator::NotLike } else { - like(lhs, rhs) - } + Operator::Like + }; + let bin_expr = + binary_expr(expr.as_ref().clone(), op, pattern.as_ref().clone()); + create_physical_expr(&bin_expr, input_dfschema, input_schema, execution_props) } Expr::Case { expr, From 718c304ea985ddf1965c1fd335753f31efc11825 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 11:43:17 -0600 Subject: [PATCH 05/10] Revert more changes --- datafusion/expr/src/expr.rs | 14 ++------------ datafusion/optimizer/src/simplify_expressions.rs | 16 ++-------------- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/from_proto.rs | 2 ++ 4 files changed, 7 insertions(+), 26 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c398fe515b8a8..c88e7221df2f6 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -444,22 +444,12 @@ impl Expr { /// Return `self LIKE other` pub fn like(self, other: Expr) -> Expr { - Expr::Like { - negated: false, - expr: Box::new(self), - pattern: Box::new(other), - escape_char: None, - } + binary_expr(self, Operator::Like, other) } /// Return `self NOT LIKE other` pub fn not_like(self, other: Expr) -> Expr { - Expr::Like { - negated: true, - expr: Box::new(self), - pattern: Box::new(other), - escape_char: None, - } + binary_expr(self, Operator::NotLike, other) } /// Return `self AS name` alias expression diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index dd84b963850d3..1c826d7c39d58 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -201,18 +201,6 @@ fn negate_clause(expr: Expr) -> Expr { }; } match op { - // not (A is distinct from B) ===> (A is not distinct from B) - Operator::IsDistinctFrom => Expr::BinaryExpr { - left, - op: Operator::IsNotDistinctFrom, - right, - }, - // not (A is not distinct from B) ===> (A is distinct from B) - Operator::IsNotDistinctFrom => Expr::BinaryExpr { - left, - op: Operator::IsDistinctFrom, - right, - }, // not (A and B) ===> (not A) or (not B) Operator::And => { let left = negate_clause(*left); @@ -2222,7 +2210,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: #test.a NOT LIKE #test.b\ + let expected = "Filter: #test.a NOT LIKE #test.b AS NOT test.a LIKE test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2244,7 +2232,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: #test.a LIKE #test.b\ + let expected = "Filter: #test.a LIKE #test.b AS NOT test.a NOT LIKE test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 46cd5836d4f5b..69e1617f39f93 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -315,6 +315,7 @@ message LogicalExprNode { LikeNode like = 31; ILikeNode ilike = 32; SimilarToNode similar_to = 33; + } } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 0b57aa9dd9098..0a4e0e68bbe1a 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -1524,6 +1524,8 @@ fn from_proto_binary_op(op: &str) -> Result { "Multiply" => Ok(Operator::Multiply), "Divide" => Ok(Operator::Divide), "Modulo" => Ok(Operator::Modulo), + "Like" => Ok(Operator::Like), + "NotLike" => Ok(Operator::NotLike), other => Err(proto_error(format!( "Unsupported binary operator '{:?}'", other From 64dbd313c2f0047d4a65e42e66b290d5fddbac86 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 11:46:55 -0600 Subject: [PATCH 06/10] Revert more changes --- datafusion/expr/src/binary_rule.rs | 2 +- datafusion/expr/src/expr.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs index f043979c15382..d0057a1c94bba 100644 --- a/datafusion/expr/src/binary_rule.rs +++ b/datafusion/expr/src/binary_rule.rs @@ -504,7 +504,7 @@ fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option /// coercion rules for like operations. /// This is a union of string coercion rules and dictionary coercion rules -pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { +fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { string_coercion(lhs_type, rhs_type) .or_else(|| dictionary_coercion(lhs_type, rhs_type, false)) .or_else(|| null_coercion(lhs_type, rhs_type)) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c88e7221df2f6..6226887e80d0e 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -505,7 +505,6 @@ impl Not for Expr { type Output = Self; fn not(self) -> Self::Output { - // TODO file issue for extending this to other similar expressions match self { Expr::Like { negated, From df73d161eacc6b0a212759f9dfe56fc8ccf895cc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Sep 2022 16:08:38 -0600 Subject: [PATCH 07/10] clippy --- datafusion/optimizer/src/type_coercion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index ed22978471cb0..9c08160997221 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -90,7 +90,7 @@ impl ExprRewriter for TypeCoercionRewriter { Expr::BinaryExpr { left, op, right } => { let left_type = left.get_type(&self.schema)?; let right_type = right.get_type(&self.schema)?; - let coerced_type = coerce_types(&left_type, &op, &right_type)?; + let coerced_type = coerce_types(&left_type, op, &right_type)?; Ok(Expr::BinaryExpr { left: Box::new( left.as_ref().clone().cast_to(&coerced_type, &self.schema)?, From 3a86db8ec51bc3504bfe42384dff798aafb39c9e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 8 Sep 2022 19:18:38 -0600 Subject: [PATCH 08/10] address feedback --- datafusion/optimizer/src/type_coercion.rs | 12 +-------- datafusion/sql/src/planner.rs | 31 +++++++++++++++++++---- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index f88ca58736a2a..6bbc97f04256d 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -18,8 +18,7 @@ //! Optimizer rule for type validation and coercion use crate::{OptimizerConfig, OptimizerRule}; -use arrow::datatypes::DataType; -use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result}; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; use datafusion_expr::binary_rule::coerce_types; use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; use datafusion_expr::logical_plan::builder::build_join_schema; @@ -105,15 +104,6 @@ impl ExprRewriter for TypeCoercionRewriter { ), }) } - Expr::Like { pattern, .. } - | Expr::ILike { pattern, .. } - | Expr::SimilarTo { pattern, .. } => match pattern.get_type(&self.schema)? { - DataType::Utf8 => Ok(expr.clone()), - other => Err(DataFusionError::Plan(format!( - "Expected pattern in Like, ILike, or SimilarTo to be Utf8 but was {}", - other - ))), - }, Expr::ScalarUDF { fun, args } => { let new_expr = coerce_arguments_for_signature( args.as_slice(), diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index cf6c71ab78b5b..dd44459f5081e 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -35,8 +35,8 @@ use datafusion_expr::utils::{ COUNT_STAR_EXPANSION, }; use datafusion_expr::{ - and, col, lit, AggregateFunction, AggregateUDF, Expr, Operator, ScalarUDF, - WindowFrame, WindowFrameUnits, + and, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, Operator, + ScalarUDF, WindowFrame, WindowFrameUnits, }; use datafusion_expr::{ window_function::WindowFunction, BuiltinScalarFunction, TableSource, @@ -1939,29 +1939,50 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } SQLExpr::Like { negated, expr, pattern, escape_char } => { + let pattern = self.sql_expr_to_logical_expr(*pattern, schema, ctes)?; + let pattern_type = pattern.get_type(schema)?; + if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { + return Err(DataFusionError::Plan( + "Invalid pattern in LIKE expression".to_string(), + )); + } Ok(Expr::Like { negated, expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), - pattern: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), + pattern: Box::new(pattern), escape_char }) } SQLExpr::ILike { negated, expr, pattern, escape_char } => { + let pattern = self.sql_expr_to_logical_expr(*pattern, schema, ctes)?; + let pattern_type = pattern.get_type(schema)?; + if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { + return Err(DataFusionError::Plan( + "Invalid pattern in ILIKE expression".to_string(), + )); + } Ok(Expr::ILike { negated, expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), - pattern: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), + pattern: Box::new(pattern), escape_char }) } SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => { + let pattern = self.sql_expr_to_logical_expr(*pattern, schema, ctes)?; + let pattern_type = pattern.get_type(schema)?; + if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { + return Err(DataFusionError::Plan( + "Invalid pattern in SIMILAR TO expression".to_string(), + )); + } Ok(Expr::SimilarTo { negated, expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, ctes)?), - pattern: Box::new(self.sql_expr_to_logical_expr(*pattern, schema, ctes)?), + pattern: Box::new(pattern), escape_char }) } From 8ea73b13c2dd290e17d4f1cad8aa150867767d94 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 8 Sep 2022 19:20:17 -0600 Subject: [PATCH 09/10] revert change to test --- datafusion/core/src/physical_plan/planner.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 20f708b61a8ae..6ec35a59648af 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1682,8 +1682,6 @@ mod tests { use datafusion_expr::expr::GroupingSet; use datafusion_expr::sum; use datafusion_expr::{col, lit}; - use datafusion_optimizer::type_coercion::TypeCoercion; - use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; use fmt::Debug; use std::collections::HashMap; use std::convert::TryFrom; @@ -1861,19 +1859,7 @@ mod tests { col("c1").like(col("c2")), ]; for case in cases { - let logical_plan = test_csv_scan() - .await? - .project(vec![case.clone()]) - .and_then(|b| b.build()) - .and_then(|plan| { - // this test was expecting type coercion/validation errors before the optimizer - // had run due to the legacy approach of type coercion but now we need to run the - // optimizer here - let type_coercion = TypeCoercion::default(); - let mut config = OptimizerConfig::new(); - type_coercion.optimize(&plan, &mut config) - }); - + let logical_plan = test_csv_scan().await?.project(vec![case.clone()]); let message = format!( "Expression {:?} expected to error due to impossible coercion", case From a18dec8c2c9e140dca6e55f34dde9d6dc5c898e7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 8 Sep 2022 19:22:42 -0600 Subject: [PATCH 10/10] revert more changes --- datafusion/optimizer/src/type_coercion.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index 6bbc97f04256d..42c081af3dfe1 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -86,22 +86,15 @@ impl ExprRewriter for TypeCoercionRewriter { } fn mutate(&mut self, expr: Expr) -> Result { - match &expr { + match expr { Expr::BinaryExpr { left, op, right } => { let left_type = left.get_type(&self.schema)?; let right_type = right.get_type(&self.schema)?; - let coerced_type = coerce_types(&left_type, op, &right_type)?; + let coerced_type = coerce_types(&left_type, &op, &right_type)?; Ok(Expr::BinaryExpr { - left: Box::new( - left.as_ref().clone().cast_to(&coerced_type, &self.schema)?, - ), - op: *op, - right: Box::new( - right - .as_ref() - .clone() - .cast_to(&coerced_type, &self.schema)?, - ), + left: Box::new(left.cast_to(&coerced_type, &self.schema)?), + op, + right: Box::new(right.cast_to(&coerced_type, &self.schema)?), }) } Expr::ScalarUDF { fun, args } => { @@ -111,11 +104,11 @@ impl ExprRewriter for TypeCoercionRewriter { &fun.signature, )?; Ok(Expr::ScalarUDF { - fun: fun.clone(), + fun, args: new_expr, }) } - expr => Ok(expr.clone()), + expr => Ok(expr), } } }