From 0eb3f00c4b06375fb1bd54d29a3169ccbf3a806e Mon Sep 17 00:00:00 2001 From: remzi <13716567376yh@gmail.com> Date: Thu, 1 Sep 2022 15:57:02 +0800 Subject: [PATCH 1/4] add binary support Signed-off-by: remzi <13716567376yh@gmail.com> --- .../physical-expr/src/expressions/in_list.rs | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index a391bf51dda18..3b0b32ffdaca5 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -503,6 +503,39 @@ impl InListExpr { )?))) } } + + fn compare_binary( + &self, + array: ArrayRef, + list_values: Vec, + negated: bool, + ) -> Result { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + let contains_null = list_values + .iter() + .any(|v| matches!(v, ColumnarValue::Scalar(s) if s.is_null())); + let values = list_values + .iter() + .flat_map(|expr| match expr { + ColumnarValue::Scalar(s) => match s { + ScalarValue::Binary(Some(v)) | ScalarValue::LargeBinary(Some(v)) => { + Some(v.as_slice()) + } + ScalarValue::Binary(None) | ScalarValue::LargeBinary(None) => None, + datatype => unimplemented!("Unexpected type {} for InList", datatype), + }, + ColumnarValue::Array(_) => { + unimplemented!("InList does not yet support nested columns.") + } + }) + .collect::>(); + + Ok(collection_contains_check!(array, values, negated, contains_null)) + } } impl std::fmt::Display for InListExpr { @@ -795,6 +828,12 @@ impl PhysicalExpr for InListExpr { DataType::LargeUtf8 => { self.compare_utf8::(array, list_values, self.negated) } + DataType::Binary => { + self.compare_binary::(array, list_values, self.negated) + } + DataType::LargeBinary => { + self.compare_binary::(array, list_values, self.negated) + } DataType::Null => { let null_array = new_null_array(&DataType::Boolean, array.len()); Ok(ColumnarValue::Array(Arc::new(null_array))) From 
af568648491840f79df22c48a6c64af20182cf26 Mon Sep 17 00:00:00 2001 From: remzi <13716567376yh@gmail.com> Date: Thu, 1 Sep 2022 16:49:15 +0800 Subject: [PATCH 2/4] support inset and add tests Signed-off-by: remzi <13716567376yh@gmail.com> --- .../physical-expr/src/expressions/in_list.rs | 125 +++++++++++++++++- 1 file changed, 122 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 3b0b32ffdaca5..cf83df42bc91a 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -37,7 +37,7 @@ use arrow::array::*; use arrow::buffer::{Buffer, MutableBuffer}; use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::{ - Boolean, Decimal128, Int16, Int32, Int64, Int8, LargeUtf8, UInt16, UInt32, UInt64, + Binary, Boolean, Decimal128, Int16, Int32, Int64, Int8, LargeBinary, LargeUtf8, UInt16, UInt32, UInt64, UInt8, Utf8, }; use datafusion_common::{DataFusionError, Result}; @@ -386,8 +386,7 @@ fn set_contains_utf8( let native_array = set .iter() .flat_map(|v| match v { - Utf8(v) => v.as_deref(), - LargeUtf8(v) => v.as_deref(), + Utf8(v) | LargeUtf8(v) => v.as_deref(), datatype => { unreachable!("InList can't reach other data type {} for {}.", datatype, v) } @@ -398,6 +397,26 @@ fn set_contains_utf8( collection_contains_check!(array, native_set, negated, contains_null) } +fn set_contains_binary( + array: &GenericBinaryArray, + set: &HashSet, + negated: bool, +) -> ColumnarValue { + let contains_null = set.iter().any(|v| v.is_null()); + let native_array = set + .iter() + .flat_map(|v| match v { + Binary(v) | LargeBinary(v) => v.as_deref(), + datatype => { + unreachable!("InList can't reach other data type {} for {}.", datatype, v) + } + }) + .collect::>(); + let native_set: HashSet<&[u8]> = HashSet::from_iter(native_array); + + collection_contains_check!(array, native_set, negated, contains_null) +} + 
impl InListExpr { /// Create a new InList expression pub fn new( @@ -703,6 +722,20 @@ impl PhysicalExpr for InListExpr { .unwrap(); Ok(set_contains_utf8(array, set, self.negated)) } + DataType::Binary => { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(set_contains_binary(array, set, self.negated)) + } + DataType::LargeBinary => { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + Ok(set_contains_binary(array, set, self.negated)) + } DataType::Decimal128(_, _) => { let array = array.as_any().downcast_ref::().unwrap(); Ok(make_set_contains_decimal(array, set, self.negated)) @@ -945,6 +978,58 @@ mod tests { Ok(()) } + #[test] + fn in_list_binary() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Binary, true)]); + let a = BinaryArray::from(vec![Some([1, 2, 3].as_slice()), Some([1, 2, 2].as_slice()), None]); + let col_a = col("a", &schema)?; + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + + // expression: "a in ([1, 2, 3], [4, 5, 6])" + let list = vec![lit([1, 2, 3].as_slice()), lit([4, 5, 6].as_slice())]; + in_list!( + batch, + list.clone(), + &false, + vec![Some(true), Some(false), None], + col_a.clone(), + &schema + ); + + // expression: "a not in ([1, 2, 3], [4, 5, 6])" + in_list!( + batch, + list, + &true, + vec![Some(false), Some(true), None], + col_a.clone(), + &schema + ); + + // expression: "a in ([1, 2, 3], [4, 5, 6], null)" + let list = vec![lit([1, 2, 3].as_slice()), lit([4, 5, 6].as_slice()), lit(ScalarValue::Binary(None))]; + in_list!( + batch, + list.clone(), + &false, + vec![Some(true), None, None], + col_a.clone(), + &schema + ); + + // expression: "a not in ([1, 2, 3], [4, 5, 6], null)" + in_list!( + batch, + list, + &true, + vec![Some(false), None, None], + col_a.clone(), + &schema + ); + + Ok(()) + } + #[test] fn in_list_int64() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]); @@ -1355,6 +1440,40 @@ mod tests { 
Ok(()) } + + #[test] + fn in_list_set_binary() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Binary, true)]); + let a = BinaryArray::from(vec![Some([1, 2, 3].as_slice()), Some([3, 2, 1].as_slice()), None]); + let col_a = col("a", &schema)?; + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + + let mut list = vec![lit([1, 2, 3].as_slice()), lit(ScalarValue::Binary(None))]; + for v in 0..OPTIMIZER_INSET_THRESHOLD { + list.push(lit([v as u8].as_slice())); + } + + in_list!( + batch, + list.clone(), + &false, + vec![Some(true), None, None], + col_a.clone(), + &schema + ); + + in_list!( + batch, + list.clone(), + &true, + vec![Some(false), None, None], + col_a.clone(), + &schema + ); + + Ok(()) + } + #[test] fn in_list_set_decimal() -> Result<()> { let schema = From 3ae99f5a5e22d2867f7bae78da3f51f56e7d647f Mon Sep 17 00:00:00 2001 From: remzi <13716567376yh@gmail.com> Date: Thu, 1 Sep 2022 16:57:50 +0800 Subject: [PATCH 3/4] clean Signed-off-by: remzi <13716567376yh@gmail.com> --- .../physical-expr/src/expressions/in_list.rs | 72 +------------------ 1 file changed, 1 insertion(+), 71 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index cf83df42bc91a..5f4e5109a7a83 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -34,7 +34,6 @@ use arrow::{ use crate::PhysicalExpr; use arrow::array::*; -use arrow::buffer::{Buffer, MutableBuffer}; use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::{ Binary, Boolean, Decimal128, Int16, Int32, Int64, Int8, LargeBinary, LargeUtf8, UInt16, UInt32, UInt64, @@ -49,30 +48,6 @@ use datafusion_expr::ColumnarValue; /// TODO: add switch codeGen in In_List static OPTIMIZER_INSET_THRESHOLD: usize = 30; -macro_rules! 
compare_op_scalar { - ($left: expr, $right:expr, $op:expr) => {{ - let null_bit_buffer = $left.data().null_buffer().cloned(); - - let comparison = - (0..$left.len()).map(|i| unsafe { $op($left.value_unchecked(i), $right) }); - // same as $left.len() - let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) }; - - let data = unsafe { - ArrayData::new_unchecked( - DataType::Boolean, - $left.len(), - None, - null_bit_buffer, - 0, - vec![Buffer::from(buffer)], - vec![], - ) - }; - Ok(BooleanArray::from(data)) - }}; -} - /// InList #[derive(Debug)] pub struct InListExpr { @@ -293,21 +268,6 @@ macro_rules! collection_contains_check_decimal { }}; } -// whether each value on the left (can be null) is contained in the non-null list -fn in_list_utf8( - array: &GenericStringArray, - values: &[&str], -) -> Result { - compare_op_scalar!(array, values, |x, v: &[&str]| v.contains(&x)) -} - -fn not_in_list_utf8( - array: &GenericStringArray, - values: &[&str], -) -> Result { - compare_op_scalar!(array, values, |x, v: &[&str]| !v.contains(&x)) -} - // try evaluate all list exprs and check if the exprs are constants or not fn try_cast_static_filter_to_set( list: &[Arc], @@ -490,37 +450,7 @@ impl InListExpr { }) .collect::>(); - if negated { - if contains_null { - Ok(ColumnarValue::Array(Arc::new( - array - .iter() - .map(|x| match x.map(|v| !values.contains(&v)) { - Some(true) => None, - x => x, - }) - .collect::(), - ))) - } else { - Ok(ColumnarValue::Array(Arc::new(not_in_list_utf8( - array, &values, - )?))) - } - } else if contains_null { - Ok(ColumnarValue::Array(Arc::new( - array - .iter() - .map(|x| match x.map(|v| values.contains(&v)) { - Some(false) => None, - x => x, - }) - .collect::(), - ))) - } else { - Ok(ColumnarValue::Array(Arc::new(in_list_utf8( - array, &values, - )?))) - } + Ok(collection_contains_check!(array, values, negated, contains_null)) } fn compare_binary( From 2b3c4ac57ca10a7b9544a07f66fd754148c0fdb2 Mon Sep 17 00:00:00 2001 From: remzi 
<13716567376yh@gmail.com> Date: Thu, 1 Sep 2022 16:58:58 +0800 Subject: [PATCH 4/4] fmt Signed-off-by: remzi <13716567376yh@gmail.com> --- .../physical-expr/src/expressions/in_list.rs | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 5f4e5109a7a83..aed25e5802cac 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -36,8 +36,8 @@ use crate::PhysicalExpr; use arrow::array::*; use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::{ - Binary, Boolean, Decimal128, Int16, Int32, Int64, Int8, LargeBinary, LargeUtf8, UInt16, UInt32, UInt64, - UInt8, Utf8, + Binary, Boolean, Decimal128, Int16, Int32, Int64, Int8, LargeBinary, LargeUtf8, + UInt16, UInt32, UInt64, UInt8, Utf8, }; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; @@ -366,7 +366,7 @@ fn set_contains_binary( let native_array = set .iter() .flat_map(|v| match v { - Binary(v) | LargeBinary(v) => v.as_deref(), + Binary(v) | LargeBinary(v) => v.as_deref(), datatype => { unreachable!("InList can't reach other data type {} for {}.", datatype, v) } @@ -450,7 +450,12 @@ impl InListExpr { }) .collect::>(); - Ok(collection_contains_check!(array, values, negated, contains_null)) + Ok(collection_contains_check!( + array, + values, + negated, + contains_null + )) } fn compare_binary( @@ -483,7 +488,12 @@ impl InListExpr { }) .collect::>(); - Ok(collection_contains_check!(array, values, negated, contains_null)) + Ok(collection_contains_check!( + array, + values, + negated, + contains_null + )) } } @@ -911,7 +921,11 @@ mod tests { #[test] fn in_list_binary() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Binary, true)]); - let a = BinaryArray::from(vec![Some([1, 2, 3].as_slice()), Some([1, 2, 2].as_slice()), None]); + let a = 
BinaryArray::from(vec![ + Some([1, 2, 3].as_slice()), + Some([1, 2, 2].as_slice()), + None, + ]); let col_a = col("a", &schema)?; let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; @@ -937,7 +951,11 @@ mod tests { ); // expression: "a in ([1, 2, 3], [4, 5, 6], null)" - let list = vec![lit([1, 2, 3].as_slice()), lit([4, 5, 6].as_slice()), lit(ScalarValue::Binary(None))]; + let list = vec![ + lit([1, 2, 3].as_slice()), + lit([4, 5, 6].as_slice()), + lit(ScalarValue::Binary(None)), + ]; in_list!( batch, list.clone(), @@ -1370,11 +1388,14 @@ mod tests { Ok(()) } - #[test] fn in_list_set_binary() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Binary, true)]); - let a = BinaryArray::from(vec![Some([1, 2, 3].as_slice()), Some([3, 2, 1].as_slice()), None]); + let a = BinaryArray::from(vec![ + Some([1, 2, 3].as_slice()), + Some([3, 2, 1].as_slice()), + None, + ]); let col_a = col("a", &schema)?; let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;