diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 55ce76c4b939f..26e03a3b9893e 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -1459,7 +1459,10 @@ impl ScalarValue { ScalarValue::DurationMillisecond(v) => v.is_none(), ScalarValue::DurationMicrosecond(v) => v.is_none(), ScalarValue::DurationNanosecond(v) => v.is_none(), - ScalarValue::Union(v, _, _) => v.is_none(), + ScalarValue::Union(v, _, _) => match v { + Some((_, s)) => s.is_null(), + None => true, + }, ScalarValue::Dictionary(_, v) => v.is_null(), } } @@ -6514,4 +6517,33 @@ mod tests { } intervals } + + fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + ] + .into_iter() + .collect() + } + + #[test] + fn sparse_scalar_union_is_null() { + let sparse_scalar = ScalarValue::Union( + Some((0_i8, Box::new(ScalarValue::Int32(None)))), + union_fields(), + UnionMode::Sparse, + ); + assert!(sparse_scalar.is_null()); + } + + #[test] + fn dense_scalar_union_is_null() { + let dense_scalar = ScalarValue::Union( + Some((0_i8, Box::new(ScalarValue::Int32(None)))), + union_fields(), + UnionMode::Dense, + ); + assert!(dense_scalar.is_null()); + } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index e46a92e92818f..2d1904d9e1667 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -29,8 +29,9 @@ use arrow::{ }, record_batch::RecordBatch, }; -use arrow_array::Float32Array; -use arrow_schema::ArrowError; +use arrow_array::{Array, Float32Array, Float64Array, UnionArray}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::{ArrowError, UnionFields, UnionMode}; use datafusion_functions_aggregate::count::count_udaf; use object_store::local::LocalFileSystem; use std::fs; @@ -2195,3 +2196,163 @@ async fn write_parquet_results() -> Result<()> { Ok(()) } + +fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + (2, Arc::new(Field::new("C", DataType::Utf8, true))), + ] + .into_iter() + .collect() +} + +#[tokio::test] +async fn sparse_union_is_null() { + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None, None, None, None, None]); + let float_array = Float64Array::from(vec![None, None, Some(3.2), None, None, None]); + let str_array = StringArray::from(vec![None, None, None, None, Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let field = Field::new( + "my_union", + DataType::Union(union_fields(), UnionMode::Sparse), + true, + ); + let schema = Arc::new(Schema::new(vec![field])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); + + let ctx = SessionContext::new(); + + ctx.register_batch("union_batch", batch).unwrap(); + + let df = ctx.table("union_batch").await.unwrap(); + + // view_all + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {A=} |", + "| {B=3.2} |", + "| {B=} |", + "| {C=a} |", + "| {C=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap()); + + // filter where is null + let result_df = df.clone().filter(col("my_union").is_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=} |", + "| {B=} |", + "| {C=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); + + // filter where is not null + let result_df = df.filter(col("my_union").is_not_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {B=3.2} |", + "| {C=a} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); +} + +#[tokio::test] +async fn dense_union_is_null() { + // union of [{A=1}, null, {B=3.2}, {A=34}] + let int_array = Int32Array::from(vec![Some(1), None]); + let float_array = Float64Array::from(vec![Some(3.2), None]); + let str_array = StringArray::from(vec![Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + let offsets = [0, 1, 0, 1, 0, 1] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = + UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap(); + + let field = Field::new( + "my_union", + DataType::Union(union_fields(), UnionMode::Dense), + true, + ); + let schema = Arc::new(Schema::new(vec![field])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); + + let ctx = SessionContext::new(); + + ctx.register_batch("union_batch", batch).unwrap(); + + let df = ctx.table("union_batch").await.unwrap(); + + // view_all + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {A=} |", + "| {B=3.2} |", + "| {B=} |", + "| {C=a} |", + "| {C=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap()); + + // filter where is null + let result_df = df.clone().filter(col("my_union").is_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=} |", + "| {B=} |", + "| {C=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); + + // filter where is not null + let result_df = df.filter(col("my_union").is_not_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {B=3.2} |", + "| {C=a} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); +} diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index d8fa77585b5d4..9f7438d13e051 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -73,9 +73,11 @@ impl PhysicalExpr for IsNotNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( - compute::is_not_null(array.as_ref())?, - ))), + ColumnarValue::Array(array) => { + let is_null = super::is_null::compute_is_null(array)?; + let is_not_null = compute::not(&is_null)?; + Ok(ColumnarValue::Array(Arc::new(is_not_null))) + } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(!scalar.is_null())), )), @@ -120,6 +122,8 @@ mod tests { array::{BooleanArray, StringArray}, datatypes::*, }; + use arrow_array::{Array, Float64Array, Int32Array, UnionArray}; + use arrow_buffer::ScalarBuffer; use datafusion_common::cast::as_boolean_array; #[test] @@ -143,4 +147,48 @@ mod tests { Ok(()) } + + #[test] + fn union_is_not_null_op() { + // union of [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}] + let int_array = Int32Array::from(vec![Some(1), None, None, None, None]); + let float_array = + Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None]); + let type_ids = [0, 0, 1, 1, 1].into_iter().collect::>(); + + let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + + let union_fields: UnionFields = [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + ] + .into_iter() + .collect(); + + let array = + UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap(); + + let field = Field::new( + "my_union", + DataType::Union(union_fields, UnionMode::Sparse), + true, + ); + + let schema = Schema::new(vec![field]); + let expr = is_not_null(col("my_union", &schema).unwrap()).unwrap(); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap(); + + // expression: "a is not null" + let actual = expr + .evaluate(&batch) + .unwrap() + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); + let actual = as_boolean_array(&actual).unwrap(); + + let expected = &BooleanArray::from(vec![true, false, true, true, false]); + + assert_eq!(expected, actual); + } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 41becafde6ded..e2dc941e26bce 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -25,6 +25,9 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; +use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray}; +use arrow_buffer::{BooleanBuffer, ScalarBuffer}; +use arrow_ord::cmp; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; @@ -74,9 +77,9 @@ impl PhysicalExpr for IsNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( - compute::is_null(array.as_ref())?, - ))), + ColumnarValue::Array(array) => { + Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) + } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(scalar.is_null())), )), @@ -100,6 +103,55 @@ impl PhysicalExpr for IsNullExpr { } } +/// workaround , +/// this can be replaced with a direct call to `arrow::compute::is_null` once it's fixed. +pub(crate) fn compute_is_null(array: ArrayRef) -> Result { + if let Some(union_array) = array.as_any().downcast_ref::() { + if let Some(offsets) = union_array.offsets() { + dense_union_is_null(union_array, offsets) + } else { + sparse_union_is_null(union_array) + } + } else { + compute::is_null(array.as_ref()).map_err(Into::into) + } +} + +fn dense_union_is_null( + union_array: &UnionArray, + offsets: &ScalarBuffer, +) -> Result { + let child_arrays = (0..union_array.type_names().len()) + .map(|type_id| { + compute::is_null(&union_array.child(type_id as i8)).map_err(Into::into) + }) + .collect::>>()?; + + let buffer: BooleanBuffer = offsets + .iter() + .zip(union_array.type_ids()) + .map(|(offset, type_id)| child_arrays[*type_id as usize].value(*offset as usize)) + .collect(); + + Ok(BooleanArray::new(buffer, None)) +} + +fn sparse_union_is_null(union_array: &UnionArray) -> Result { + let type_ids = Int8Array::new(union_array.type_ids().clone(), None); + + let mut union_is_null = + BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None); + for type_id in 0..union_array.type_names().len() { + let type_id = type_id as i8; + let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?; + let child = union_array.child(type_id); + let child_array_is_null = compute::is_null(&child)?; + let child_is_null = compute::and(&union_is_child, &child_array_is_null)?; + union_is_null = compute::or(&union_is_null, &child_is_null)?; + } + Ok(union_is_null) +} + impl PartialEq for IsNullExpr { fn eq(&self, other: &dyn Any) -> bool { down_cast_any_ref(other) @@ -108,6 +160,7 @@ impl PartialEq for IsNullExpr { .unwrap_or(false) } } + /// Create an IS NULL expression pub fn is_null(arg: Arc) -> Result> { Ok(Arc::new(IsNullExpr::new(arg))) @@ -121,6 +174,8 @@ mod tests { array::{BooleanArray, StringArray}, datatypes::*, }; + use arrow_array::{Float64Array, Int32Array}; + use arrow_buffer::ScalarBuffer; use datafusion_common::cast::as_boolean_array; #[test] @@ -145,4 +200,72 @@ mod tests { Ok(()) } + + fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + (2, Arc::new(Field::new("C", DataType::Utf8, true))), + ] + .into_iter() + .collect() + } + + #[test] + fn sparse_union_is_null() { + // union of [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}, {C=}, {C="a"}] + let int_array = + Int32Array::from(vec![Some(1), None, None, None, None, None, None]); + let float_array = + Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None, None, None]); + let str_array = + StringArray::from(vec![None, None, None, None, None, None, Some("a")]); + let type_ids = [0, 0, 1, 1, 1, 2, 2] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = + UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let array_ref = Arc::new(array) as ArrayRef; + let result = compute_is_null(array_ref).unwrap(); + + let expected = + &BooleanArray::from(vec![false, true, false, false, true, true, false]); + assert_eq!(expected, &result); + } + + #[test] + fn dense_union_is_null() { + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None]); + let float_array = Float64Array::from(vec![Some(3.2), None]); + let str_array = StringArray::from(vec![Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + let offsets = [0, 1, 0, 1, 0, 1] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = + UnionArray::try_new(union_fields(), type_ids, Some(offsets), children) + .unwrap(); + + let array_ref = Arc::new(array) as ArrayRef; + let result = compute_is_null(array_ref).unwrap(); + + let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); + assert_eq!(expected, &result); + } }