diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index cad2167c69039..3c8ff962b64d5 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -2016,14 +2016,6 @@ mod tests { col("c1").eq(bool_expr.clone()), // u32 AND bool col("c2").and(bool_expr), - // utf8 LIKE u32 - col("c1").like(col("c2")), - // utf8 NOT LIKE u32 - col("c1").not_like(col("c2")), - // utf8 ILIKE u32 - col("c1").ilike(col("c2")), - // utf8 NOT ILIKE u32 - col("c1").not_ilike(col("c2")), ]; for case in cases { let logical_plan = test_csv_scan().await?.project(vec![case.clone()]); diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index fd045b1ac4479..7af6849ffec43 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -245,9 +245,6 @@ impl BinaryExpr { match self.op { Operator::Or => 5, Operator::And => 10, - Operator::Like | Operator::NotLike | Operator::ILike | Operator::NotILike => { - 19 - } Operator::NotEq | Operator::Eq | Operator::Lt @@ -685,22 +682,22 @@ impl Expr { /// Return `self LIKE other` pub fn like(self, other: Expr) -> Expr { - binary_expr(self, Operator::Like, other) + Expr::Like(Like::new(false, Box::new(self), Box::new(other), None)) } /// Return `self NOT LIKE other` pub fn not_like(self, other: Expr) -> Expr { - binary_expr(self, Operator::NotLike, other) + Expr::Like(Like::new(true, Box::new(self), Box::new(other), None)) } /// Return `self ILIKE other` pub fn ilike(self, other: Expr) -> Expr { - binary_expr(self, Operator::ILike, other) + Expr::ILike(Like::new(false, Box::new(self), Box::new(other), None)) } /// Return `self NOT ILIKE other` pub fn not_ilike(self, other: Expr) -> Expr { - binary_expr(self, Operator::NotILike, other) + Expr::ILike(Like::new(true, Box::new(self), Box::new(other), None)) } /// Return `self AS name` alias expression diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index 4c623bb2c29df..fac81654c4a82 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -51,14 +51,6 @@ pub enum Operator { And, /// Logical OR, like `||` Or, - /// Matches a wildcard pattern - Like, - /// Does not match a wildcard pattern - NotLike, - /// Matches a wildcard pattern, ignores case - ILike, - /// Does not match a wildcard pattern, ignores case - NotILike, /// IS DISTINCT FROM IsDistinctFrom, /// IS NOT DISTINCT FROM @@ -96,10 +88,6 @@ impl Operator { Operator::LtEq => Some(Operator::Gt), Operator::Gt => Some(Operator::LtEq), Operator::GtEq => Some(Operator::Lt), - Operator::Like => Some(Operator::NotLike), - Operator::NotLike => Some(Operator::Like), - Operator::ILike => Some(Operator::NotILike), - Operator::NotILike => Some(Operator::ILike), Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom), Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom), Operator::Plus @@ -133,11 +121,7 @@ impl Operator { Operator::LtEq => Some(Operator::GtEq), Operator::Gt => Some(Operator::Lt), Operator::GtEq => Some(Operator::LtEq), - Operator::Like - | Operator::NotLike - | Operator::ILike - | Operator::NotILike - | Operator::IsDistinctFrom + Operator::IsDistinctFrom | Operator::IsNotDistinctFrom | Operator::Plus | Operator::Minus @@ -176,10 +160,6 @@ impl fmt::Display for Operator { Operator::Modulo => "%", Operator::And => "AND", Operator::Or => "OR", - Operator::Like => "LIKE", - Operator::NotLike => "NOT LIKE", - Operator::ILike => "ILIKE", - Operator::NotILike => "NOT ILIKE", Operator::RegexMatch => "~", Operator::RegexIMatch => "~*", Operator::RegexNotMatch => "!~", diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 2de1c7a6612a5..d2923c8dbfae7 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -45,10 +45,6 @@ pub fn binary_operator_data_type( | Operator::NotEq | Operator::And | Operator::Or - | Operator::Like - | Operator::NotLike - | Operator::ILike - | Operator::NotILike | Operator::Lt | Operator::Gt | Operator::GtEq @@ -117,10 +113,6 @@ pub fn coerce_types( | Operator::Gt | Operator::GtEq | Operator::LtEq => comparison_coercion(lhs_type, rhs_type), - // "like" operators operate on strings and always return a boolean - Operator::Like | Operator::NotLike | Operator::ILike | Operator::NotILike => { - like_coercion(lhs_type, rhs_type) - } Operator::Plus | Operator::Minus if is_date(lhs_type) || is_timestamp(lhs_type) => { @@ -525,7 +517,7 @@ fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option /// coercion rules for like operations. /// This is a union of string coercion rules and dictionary coercion rules -fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { +pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { string_coercion(lhs_type, rhs_type) .or_else(|| dictionary_coercion(lhs_type, rhs_type, false)) .or_else(|| null_coercion(lhs_type, rhs_type)) @@ -879,30 +871,10 @@ mod tests { #[test] fn test_type_coercion() -> Result<()> { - test_coercion_binary_rule!( - DataType::Utf8, - DataType::Utf8, - Operator::Like, - DataType::Utf8 - ); - test_coercion_binary_rule!( - DataType::Utf8, - DataType::Utf8, - Operator::NotLike, - DataType::Utf8 - ); - test_coercion_binary_rule!( - DataType::Utf8, - DataType::Utf8, - Operator::ILike, - DataType::Utf8 - ); - test_coercion_binary_rule!( - DataType::Utf8, - DataType::Utf8, - Operator::NotILike, - DataType::Utf8 - ); + // test like coercion rule + let result = like_coercion(&DataType::Utf8, &DataType::Utf8); + assert_eq!(result, Some(DataType::Utf8)); + test_coercion_binary_rule!( DataType::Utf8, DataType::Date32, diff --git a/datafusion/optimizer/src/simplify_expressions/utils.rs b/datafusion/optimizer/src/simplify_expressions/utils.rs index 7bf7b4eccf790..10e5c0e874304 100644 --- a/datafusion/optimizer/src/simplify_expressions/utils.rs +++ b/datafusion/optimizer/src/simplify_expressions/utils.rs @@ -21,7 +21,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::{ expr::{Between, BinaryExpr}, expr_fn::{and, concat_ws, or}, - lit, BuiltinScalarFunction, Expr, Operator, + lit, BuiltinScalarFunction, Expr, Like, Operator, }; pub static POWS_OF_TEN: [i128; 38] = [ @@ -232,6 +232,20 @@ pub fn negate_clause(expr: Expr) -> Expr { between.low, between.high, )), + // not (A like B) ===> A not like B + Expr::Like(like) => Expr::Like(Like::new( + !like.negated, + like.expr, + like.pattern, + like.escape_char, + )), + // not (A ilike B) ===> A not ilike B + Expr::ILike(like) => Expr::ILike(Like::new( + !like.negated, + like.expr, + like.pattern, + like.escape_char, + )), // use not clause _ => Expr::Not(Box::new(expr)), } diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index 23d1d37065a08..e335c9ff5f851 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -27,7 +27,9 @@ use datafusion_common::{ use datafusion_expr::expr::{self, Between, BinaryExpr, Case, Like, WindowFunction}; use datafusion_expr::expr_rewriter::{ExprRewriter, RewriteRecursion}; use datafusion_expr::logical_plan::Subquery; -use datafusion_expr::type_coercion::binary::{coerce_types, comparison_coercion}; +use datafusion_expr::type_coercion::binary::{ + coerce_types, comparison_coercion, like_coercion, +}; use datafusion_expr::type_coercion::functions::data_types; use datafusion_expr::type_coercion::other::{ get_coerce_type_for_case_when, get_coerce_type_for_list, @@ -175,8 +177,11 @@ impl ExprRewriter for TypeCoercionRewriter { }) => { let left_type = expr.get_type(&self.schema)?; let right_type = pattern.get_type(&self.schema)?; - let coerced_type = - coerce_types(&left_type, &Operator::Like, &right_type)?; + let coerced_type = like_coercion(&left_type, &right_type).ok_or_else(|| { + DataFusionError::Plan(format!( + "There isn't a common type to coerce {left_type} and {right_type} in LIKE expression" + )) + })?; let expr = Box::new(expr.cast_to(&coerced_type, &self.schema)?); let pattern = Box::new(pattern.cast_to(&coerced_type, &self.schema)?); let expr = Expr::Like(Like::new(negated, expr, pattern, escape_char)); @@ -190,8 +195,11 @@ impl ExprRewriter for TypeCoercionRewriter { }) => { let left_type = expr.get_type(&self.schema)?; let right_type = pattern.get_type(&self.schema)?; - let coerced_type = - coerce_types(&left_type, &Operator::ILike, &right_type)?; + let coerced_type = like_coercion(&left_type, &right_type).ok_or_else(|| { + DataFusionError::Plan(format!( + "There isn't a common type to coerce {left_type} and {right_type} in ILIKE expression" + )) + })?; let expr = Box::new(expr.cast_to(&coerced_type, &self.schema)?); let pattern = Box::new(pattern.cast_to(&coerced_type, &self.schema)?); let expr = Expr::ILike(Like::new(negated, expr, pattern, escape_char)); @@ -932,7 +940,9 @@ mod test { let plan = LogicalPlan::Projection(Projection::try_new(vec![like_expr], empty)?); let err = assert_optimized_plan_eq(&plan, expected); assert!(err.is_err()); - assert!(err.unwrap_err().to_string().contains("'Int64 LIKE Utf8' can't be evaluated because there isn't a common type to coerce the types to")); + assert!(err.unwrap_err().to_string().contains( + "There isn't a common type to coerce Int64 and Utf8 in LIKE expression" + )); Ok(()) } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 82e675425024e..149eee593424c 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -28,6 +28,8 @@ use arrow::compute::kernels::arithmetic::{ multiply_scalar, subtract, subtract_scalar, }; use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; +use arrow::compute::kernels::comparison::regexp_is_match_utf8; +use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar; use arrow::compute::kernels::comparison::{ eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar, lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar, @@ -47,13 +49,6 @@ use arrow::compute::kernels::comparison::{ use arrow::compute::kernels::comparison::{ eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar, }; -use arrow::compute::kernels::comparison::{ - ilike_utf8, like_utf8, nilike_utf8, nlike_utf8, regexp_is_match_utf8, -}; -use arrow::compute::kernels::comparison::{ - ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar, - regexp_is_match_utf8_scalar, -}; use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn}; use arrow::compute::kernels::concat_elements::concat_elements_utf8; @@ -344,19 +339,6 @@ macro_rules! compute_op { }}; } -macro_rules! binary_string_array_op_scalar { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ - let result: Result> = match $LEFT.data_type() { - DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), - other => Err(DataFusionError::Internal(format!( - "Data type {:?} not supported for scalar operation '{}' on string array", - other, stringify!($OP) - ))), - }; - Some(result) - }}; -} - macro_rules! binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { @@ -941,18 +923,6 @@ impl BinaryExpr { Operator::NotEq => { binary_array_op_dyn_scalar!(array, scalar.clone(), neq, bool_type) } - Operator::Like => { - binary_string_array_op_scalar!(array, scalar.clone(), like, bool_type) - } - Operator::NotLike => { - binary_string_array_op_scalar!(array, scalar.clone(), nlike, bool_type) - } - Operator::ILike => { - binary_string_array_op_scalar!(array, scalar.clone(), ilike, bool_type) - } - Operator::NotILike => { - binary_string_array_op_scalar!(array, scalar.clone(), nilike, bool_type) - } Operator::Plus => { binary_primitive_array_op_scalar!(array, scalar.clone(), add) } @@ -1053,10 +1023,6 @@ impl BinaryExpr { right_data_type: &DataType, ) -> Result { match &self.op { - Operator::Like => binary_string_array_op!(left, right, like), - Operator::NotLike => binary_string_array_op!(left, right, nlike), - Operator::ILike => binary_string_array_op!(left, right, ilike), - Operator::NotILike => binary_string_array_op!(left, right, nilike), Operator::Lt => lt_dyn(&left, &right), Operator::LtEq => lt_eq_dyn(&left, &right), Operator::Gt => gt_dyn(&left, &right), @@ -1347,54 +1313,6 @@ mod tests { DataType::Float32, vec![2f32], ); - test_coercion!( - StringArray, - DataType::Utf8, - vec!["hello world", "world"], - StringArray, - DataType::Utf8, - vec!["%hello%", "%hello%"], - Operator::Like, - BooleanArray, - DataType::Boolean, - vec![true, false], - ); - test_coercion!( - StringArray, - DataType::Utf8, - vec!["hello world", "world"], - StringArray, - DataType::Utf8, - vec!["%hello%", "%hello%"], - Operator::NotLike, - BooleanArray, - DataType::Boolean, - vec![false, true], - ); - test_coercion!( - StringArray, - DataType::Utf8, - vec!["hEllo world", "world"], - StringArray, - DataType::Utf8, - vec!["%helLo%", "%helLo%"], - Operator::ILike, - BooleanArray, - DataType::Boolean, - vec![true, false], - ); - test_coercion!( - StringArray, - DataType::Utf8, - vec!["hEllo world", "world"], - StringArray, - DataType::Utf8, - vec!["%helLo%", "%helLo%"], - Operator::NotILike, - BooleanArray, - DataType::Boolean, - vec![false, true], - ); test_coercion!( StringArray, DataType::Utf8, diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs new file mode 100644 index 0000000000000..bd35b4eca6c18 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, sync::Arc}; + +use arrow::{ + array::{new_null_array, Array, ArrayRef, StringArray}, + record_batch::RecordBatch, +}; +use arrow_schema::{DataType, Schema}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::ColumnarValue; + +use crate::{physical_expr::down_cast_any_ref, AnalysisContext, PhysicalExpr}; + +use arrow::compute::kernels::comparison::{ + ilike_utf8, like_utf8, nilike_utf8, nlike_utf8, +}; +use arrow::compute::kernels::comparison::{ + ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar, +}; + +// Like expression +#[derive(Debug)] +pub struct LikeExpr { + negated: bool, + case_insensitive: bool, + expr: Arc, + pattern: Arc, +} + +impl LikeExpr { + pub fn new( + negated: bool, + case_insensitive: bool, + expr: Arc, + pattern: Arc, + ) -> Self { + Self { + negated, + case_insensitive, + expr, + pattern, + } + } + + /// Is negated + pub fn negated(&self) -> bool { + self.negated + } + + /// Is case insensitive + pub fn case_insensitive(&self) -> bool { + self.case_insensitive + } + + /// Input expression + pub fn expr(&self) -> &Arc { + &self.expr + } + + /// Pattern expression + pub fn pattern(&self) -> &Arc { + &self.pattern + } + + /// Operator name + fn op_name(&self) -> &str { + match (self.negated, self.case_insensitive) { + (false, false) => "LIKE", + (true, false) => "NOT LIKE", + (false, true) => "ILIKE", + (true, true) => "NOT ILIKE", + } + } +} + +impl std::fmt::Display for LikeExpr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} {} {}", self.expr, self.op_name(), self.pattern) + } +} + +impl PhysicalExpr for LikeExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Boolean) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + Ok(self.expr.nullable(input_schema)? || self.pattern.nullable(input_schema)?) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let expr_value = self.expr.evaluate(batch)?; + let pattern_value = self.pattern.evaluate(batch)?; + let expr_data_type = expr_value.data_type(); + let pattern_data_type = pattern_value.data_type(); + + match ( + &expr_value, + &expr_data_type, + &pattern_value, + &pattern_data_type, + ) { + // Types are equal => valid + (_, l, _, r) if l == r => {} + // Allow comparing a dictionary value with its corresponding scalar value + ( + ColumnarValue::Array(_), + DataType::Dictionary(_, dict_t), + ColumnarValue::Scalar(_), + scalar_t, + ) + | ( + ColumnarValue::Scalar(_), + scalar_t, + ColumnarValue::Array(_), + DataType::Dictionary(_, dict_t), + ) if dict_t.as_ref() == scalar_t => {} + _ => { + return Err(DataFusionError::Internal(format!( + "Cannot evaluate {} expression with types {:?} and {:?}", + self.op_name(), + expr_data_type, + pattern_data_type + ))); + } + } + + // Attempt to use special kernels if one input is scalar and the other is an array + let scalar_result = match (&expr_value, &pattern_value) { + (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => { + self.evaluate_array_scalar(array, scalar)? + } + (_, _) => None, // default to array implementation + }; + + if let Some(result) = scalar_result { + return result.map(|a| ColumnarValue::Array(a)); + } + + // if both arrays or both literals - extract arrays and continue execution + let (expr, pattern) = ( + expr_value.into_array(batch.num_rows()), + pattern_value.into_array(batch.num_rows()), + ); + self.evaluate_array_array(expr, pattern) + .map(|a| ColumnarValue::Array(a)) + } + + fn children(&self) -> Vec> { + vec![self.expr.clone(), self.pattern.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(LikeExpr::new( + self.negated, + self.case_insensitive, + children[0].clone(), + children[1].clone(), + ))) + } + + /// Return the boundaries of this binary expression's result. + fn analyze(&self, context: AnalysisContext) -> AnalysisContext { + context.with_boundaries(None) + } +} + +impl PartialEq for LikeExpr { + fn eq(&self, other: &dyn Any) -> bool { + down_cast_any_ref(other) + .downcast_ref::() + .map(|x| { + self.negated == x.negated + && self.case_insensitive == x.case_insensitive + && self.expr.eq(&x.expr) + && self.pattern.eq(&x.pattern) + }) + .unwrap_or(false) + } +} + +macro_rules! binary_string_array_op_scalar { + ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ + let result: Result> = match $LEFT.data_type() { + DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), + other => Err(DataFusionError::Internal(format!( + "Data type {:?} not supported for scalar operation '{}' on string array", + other, stringify!($OP) + ))), + }; + Some(result) + }}; +} + +impl LikeExpr { + /// Evaluate the expression if the input is an array and + /// pattern is literal - use scalar operations + fn evaluate_array_scalar( + &self, + array: &dyn Array, + scalar: &ScalarValue, + ) -> Result>> { + let scalar_result = match (self.negated, self.case_insensitive) { + (false, false) => binary_string_array_op_scalar!( + array, + scalar.clone(), + like, + &DataType::Boolean + ), + (true, false) => binary_string_array_op_scalar!( + array, + scalar.clone(), + nlike, + &DataType::Boolean + ), + (false, true) => binary_string_array_op_scalar!( + array, + scalar.clone(), + ilike, + &DataType::Boolean + ), + (true, true) => binary_string_array_op_scalar!( + array, + scalar.clone(), + nilike, + &DataType::Boolean + ), + }; + Ok(scalar_result) + } + + fn evaluate_array_array( + &self, + left: Arc, + right: Arc, + ) -> Result { + match (self.negated, self.case_insensitive) { + (false, false) => binary_string_array_op!(left, right, like), + (true, false) => binary_string_array_op!(left, right, nlike), + (false, true) => binary_string_array_op!(left, right, ilike), + (true, true) => binary_string_array_op!(left, right, nilike), + } + } +} + +/// Create a like expression, erroring if the argument types are not compatible. +pub fn like( + negated: bool, + case_insensitive: bool, + expr: Arc, + pattern: Arc, + input_schema: &Schema, +) -> Result> { + let expr_type = &expr.data_type(input_schema)?; + let pattern_type = &pattern.data_type(input_schema)?; + if !expr_type.eq(pattern_type) { + return Err(DataFusionError::Internal(format!( + "The type of {expr_type} AND {pattern_type} of like physical should be same" + ))); + } + Ok(Arc::new(LikeExpr::new( + negated, + case_insensitive, + expr, + pattern, + ))) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::expressions::col; + use arrow::array::BooleanArray; + use arrow_schema::Field; + use datafusion_common::cast::as_boolean_array; + + macro_rules! test_like { + ($A_VEC:expr, $B_VEC:expr, $VEC:expr, $NULLABLE: expr, $NEGATED:expr, $CASE_INSENSITIVE:expr,) => {{ + let schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, $NULLABLE), + Field::new("b", DataType::Utf8, $NULLABLE), + ]); + let a = StringArray::from($A_VEC); + let b = StringArray::from($B_VEC); + + let expression = like( + $NEGATED, + $CASE_INSENSITIVE, + col("a", &schema)?, + col("b", &schema)?, + &schema, + )?; + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b)], + )?; + + // compute + let result = expression.evaluate(&batch)?.into_array(batch.num_rows()); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); + let expected = &BooleanArray::from($VEC); + assert_eq!(expected, result); + }}; + } + + #[test] + fn like_op() -> Result<()> { + test_like!( + vec!["hello world", "world"], + vec!["%hello%", "%hello%"], + vec![true, false], + false, + false, + false, + ); // like + test_like!( + vec![Some("hello world"), None, Some("world")], + vec![Some("%hello%"), None, Some("%hello%")], + vec![Some(false), None, Some(true)], + true, + true, + false, + ); // not like + test_like!( + vec!["hello world", "world"], + vec!["%helLo%", "%helLo%"], + vec![true, false], + false, + false, + true, + ); // ilike + test_like!( + vec![Some("hello world"), None, Some("world")], + vec![Some("%helLo%"), None, Some("%helLo%")], + vec![Some(false), None, Some(true)], + true, + true, + true, + ); // not ilike + + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 8294c0ef7ace5..411482cc4006b 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -27,6 +27,7 @@ mod get_indexed_field; mod in_list; mod is_not_null; mod is_null; +mod like; mod literal; mod negative; mod no_op; @@ -82,6 +83,7 @@ pub use get_indexed_field::GetIndexedFieldExpr; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; +pub use like::{like, LikeExpr}; pub use literal::{lit, Literal}; pub use negative::{negative, NegativeExpr}; pub use no_op::NoOp; diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 2be6c1f33b99d..31d28f820555e 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -19,7 +19,7 @@ use crate::var_provider::is_system_variables; use crate::{ execution_props::ExecutionProps, expressions::{ - self, binary, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal, + self, binary, like, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal, }, functions, udf, var_provider::VarType, @@ -220,14 +220,25 @@ pub fn create_physical_expr( "LIKE does not support escape_char".to_string(), )); } - let op = if *negated { - Operator::NotLike - } else { - Operator::Like - }; - let bin_expr = - binary_expr(expr.as_ref().clone(), op, pattern.as_ref().clone()); - create_physical_expr(&bin_expr, input_dfschema, input_schema, execution_props) + let physical_expr = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let physical_pattern = create_physical_expr( + pattern, + input_dfschema, + input_schema, + execution_props, + )?; + like( + *negated, + false, + physical_expr, + physical_pattern, + input_schema, + ) } Expr::ILike(Like { negated, @@ -240,14 +251,25 @@ pub fn create_physical_expr( "ILIKE does not support escape_char".to_string(), )); } - let op = if *negated { - Operator::NotILike - } else { - Operator::ILike - }; - let bin_expr = - binary_expr(expr.as_ref().clone(), op, pattern.as_ref().clone()); - create_physical_expr(&bin_expr, input_dfschema, input_schema, execution_props) + let physical_expr = create_physical_expr( + expr, + input_dfschema, + input_schema, + execution_props, + )?; + let physical_pattern = create_physical_expr( + pattern, + input_dfschema, + input_schema, + execution_props, + )?; + like( + *negated, + true, + physical_expr, + physical_pattern, + input_schema, + ) } Expr::Case(case) => { let expr: Option> = if let Some(e) = &case.expr { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 7eee86465dfe8..8a9a8c0f6d748 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1021,6 +1021,8 @@ message PhysicalExprNode { PhysicalScalarUdfNode scalar_udf = 16; PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17; + + PhysicalLikeExprNode like_expr = 18; } } @@ -1074,6 +1076,13 @@ message PhysicalDateTimeIntervalExprNode { string op = 3; } +message PhysicalLikeExprNode { + bool negated = 1; + bool case_insensitive = 2; + PhysicalExprNode expr = 3; + PhysicalExprNode pattern = 4; +} + message PhysicalSortExprNode { PhysicalExprNode expr = 1; bool asc = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 4913761e1c263..65d7af706b75e 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -13206,6 +13206,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::DateTimeIntervalExpr(v) => { struct_ser.serialize_field("dateTimeIntervalExpr", v)?; } + physical_expr_node::ExprType::LikeExpr(v) => { + struct_ser.serialize_field("likeExpr", v)?; + } } } struct_ser.end() @@ -13247,6 +13250,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "scalarUdf", "date_time_interval_expr", "dateTimeIntervalExpr", + "like_expr", + "likeExpr", ]; #[allow(clippy::enum_variant_names)] @@ -13268,6 +13273,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { WindowExpr, ScalarUdf, DateTimeIntervalExpr, + LikeExpr, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -13306,6 +13312,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "windowExpr" | "window_expr" => Ok(GeneratedField::WindowExpr), "scalarUdf" | "scalar_udf" => Ok(GeneratedField::ScalarUdf), "dateTimeIntervalExpr" | "date_time_interval_expr" => Ok(GeneratedField::DateTimeIntervalExpr), + "likeExpr" | "like_expr" => Ok(GeneratedField::LikeExpr), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -13445,6 +13452,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("dateTimeIntervalExpr")); } expr_type__ = map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DateTimeIntervalExpr) +; + } + GeneratedField::LikeExpr => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("likeExpr")); + } + expr_type__ = map.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::LikeExpr) ; } } @@ -13986,6 +14000,149 @@ impl<'de> serde::Deserialize<'de> for PhysicalIsNull { deserializer.deserialize_struct("datafusion.PhysicalIsNull", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalLikeExprNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.negated { + len += 1; + } + if self.case_insensitive { + len += 1; + } + if self.expr.is_some() { + len += 1; + } + if self.pattern.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalLikeExprNode", len)?; + if self.negated { + struct_ser.serialize_field("negated", &self.negated)?; + } + if self.case_insensitive { + struct_ser.serialize_field("caseInsensitive", &self.case_insensitive)?; + } + if let Some(v) = self.expr.as_ref() { + struct_ser.serialize_field("expr", v)?; + } + if let Some(v) = self.pattern.as_ref() { + struct_ser.serialize_field("pattern", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalLikeExprNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "negated", + "case_insensitive", + "caseInsensitive", + "expr", + "pattern", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Negated, + CaseInsensitive, + Expr, + Pattern, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "negated" => Ok(GeneratedField::Negated), + "caseInsensitive" | "case_insensitive" => Ok(GeneratedField::CaseInsensitive), + "expr" => Ok(GeneratedField::Expr), + "pattern" => Ok(GeneratedField::Pattern), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalLikeExprNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalLikeExprNode") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut negated__ = None; + let mut case_insensitive__ = None; + let mut expr__ = None; + let mut pattern__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Negated => { + if negated__.is_some() { + return Err(serde::de::Error::duplicate_field("negated")); + } + negated__ = Some(map.next_value()?); + } + GeneratedField::CaseInsensitive => { + if case_insensitive__.is_some() { + return Err(serde::de::Error::duplicate_field("caseInsensitive")); + } + case_insensitive__ = Some(map.next_value()?); + } + GeneratedField::Expr => { + if expr__.is_some() { + return Err(serde::de::Error::duplicate_field("expr")); + } + expr__ = map.next_value()?; + } + GeneratedField::Pattern => { + if pattern__.is_some() { + return Err(serde::de::Error::duplicate_field("pattern")); + } + pattern__ = map.next_value()?; + } + } + } + Ok(PhysicalLikeExprNode { + negated: negated__.unwrap_or_default(), + case_insensitive: case_insensitive__.unwrap_or_default(), + expr: expr__, + pattern: pattern__, + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalLikeExprNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalNegativeNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index a6778755987ca..a20b15fcc1980 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1274,7 +1274,7 @@ pub struct PhysicalExtensionNode { pub struct PhysicalExprNode { #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18" )] pub expr_type: ::core::option::Option, } @@ -1323,6 +1323,8 @@ pub mod physical_expr_node { DateTimeIntervalExpr( ::prost::alloc::boxed::Box, ), + #[prost(message, tag = "18")] + LikeExpr(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1404,6 +1406,17 @@ pub struct PhysicalDateTimeIntervalExprNode { pub op: ::prost::alloc::string::String, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalLikeExprNode { + #[prost(bool, tag = "1")] + pub negated: bool, + #[prost(bool, tag = "2")] + pub case_insensitive: bool, + #[prost(message, optional, boxed, tag = "3")] + pub expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(message, optional, boxed, tag = "4")] + pub pattern: ::core::option::Option<::prost::alloc::boxed::Box>, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalSortExprNode { #[prost(message, optional, boxed, tag = "1")] pub expr: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index b17a9c4c12324..f9ded9237ac40 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -1370,10 +1370,6 @@ pub fn from_proto_binary_op(op: &str) -> Result { "Multiply" => Ok(Operator::Multiply), "Divide" => Ok(Operator::Divide), "Modulo" => Ok(Operator::Modulo), - "Like" => Ok(Operator::Like), - "NotLike" => Ok(Operator::NotLike), - "ILike" => Ok(Operator::ILike), - "NotILike" => Ok(Operator::NotILike), "IsDistinctFrom" => Ok(Operator::IsDistinctFrom), "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom), "BitwiseAnd" => Ok(Operator::BitwiseAnd), diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 2a5626d510bae..eee597c5d1561 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -2299,10 +2299,6 @@ mod roundtrip_tests { test(Operator::RegexNotMatch); test(Operator::RegexIMatch); test(Operator::RegexMatch); - test(Operator::Like); - test(Operator::NotLike); - test(Operator::ILike); - test(Operator::NotILike); test(Operator::BitwiseShiftRight); test(Operator::BitwiseShiftLeft); test(Operator::BitwiseAnd); diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index ffc010465946a..f1f7f8844df20 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -29,6 +29,7 @@ use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::window_function::WindowFunction; use datafusion::physical_expr::expressions::DateTimeIntervalExpr; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; +use datafusion::physical_plan::expressions::LikeExpr; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{ expressions::{ @@ -217,6 +218,22 @@ pub(crate) fn parse_physical_expr( &convert_required!(e.return_type)?, )) } + ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new( + like_expr.negated, + like_expr.case_insensitive, + parse_required_physical_box_expr( + &like_expr.expr, + registry, + "expr", + input_schema, + )?, + parse_required_physical_box_expr( + &like_expr.pattern, + registry, + "pattern", + input_schema, + )?, + )), }; Ok(pexpr) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index cff23974e8c77..4c0f701497f56 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1212,6 +1212,7 @@ mod roundtrip_tests { use datafusion::physical_expr::expressions::DateTimeIntervalExpr; use datafusion::physical_expr::ScalarFunctionExpr; use datafusion::physical_plan::aggregates::PhysicalGroupBy; + use datafusion::physical_plan::expressions::like; use datafusion::physical_plan::functions; use datafusion::physical_plan::functions::make_scalar_function; use datafusion::physical_plan::projection::ProjectionExec; @@ -1563,4 +1564,25 @@ mod roundtrip_tests { schema, )?)) } + + #[test] + fn roundtrip_like() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ]); + let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone()))); + let like_expr = like( + false, + false, + col("a", &schema)?, + col("b", &schema)?, + &schema, + )?; + let plan = Arc::new(ProjectionExec::try_new( + vec![(like_expr, "result".to_string())], + input, + )?); + roundtrip_test(plan) + } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index c5defbf7764a7..653baec9923d2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,7 +36,9 @@ use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal}; -use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum}; +use datafusion::physical_plan::expressions::{ + Avg, BinaryExpr, Column, LikeExpr, Max, Min, Sum, +}; use datafusion::physical_plan::{AggregateExpr, PhysicalExpr}; use crate::protobuf; @@ -330,6 +332,17 @@ impl TryFrom> for protobuf::PhysicalExprNode { ), ), }) + } else if let Some(expr) = expr.downcast_ref::() { + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr( + Box::new(protobuf::PhysicalLikeExprNode { + negated: expr.negated(), + case_insensitive: expr.case_insensitive(), + expr: Some(Box::new(expr.expr().to_owned().try_into()?)), + pattern: Some(Box::new(expr.pattern().to_owned().try_into()?)), + }), + )), + }) } else { Err(DataFusionError::Internal(format!( "physical_plan::to_proto() unsupported expression {value:?}"