From 5cd03d7e3771233ee1221d67351fbfe45f70b5cf Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Thu, 2 Oct 2025 15:54:43 +0200 Subject: [PATCH 1/4] feat: convert_array_to_scalar_vec returns optional arrays --- datafusion/common/src/scalar/mod.rs | 164 ++++++++++++++---- datafusion/core/tests/sql/aggregates/basic.rs | 2 +- .../src/merge_arrays.rs | 26 ++- .../functions-aggregate/src/array_agg.rs | 11 +- .../functions-aggregate/src/nth_value.rs | 6 +- datafusion/functions-nested/src/array_has.rs | 51 ++++-- 6 files changed, 199 insertions(+), 61 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 8182e4fd47d4d..60ff1f4b2ed44 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3246,6 +3246,8 @@ impl ScalarValue { /// Retrieve ScalarValue for each row in `array` /// + /// Elements in `array` may be NULL, in which case the corresponding element in the returned vector is None. + /// /// Example 1: Array (ScalarValue::Int32) /// ``` /// use datafusion_common::ScalarValue; @@ -3262,15 +3264,15 @@ impl ScalarValue { /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); /// /// let expected = vec![ - /// vec![ - /// ScalarValue::Int32(Some(1)), - /// ScalarValue::Int32(Some(2)), - /// ScalarValue::Int32(Some(3)), - /// ], - /// vec![ - /// ScalarValue::Int32(Some(4)), - /// ScalarValue::Int32(Some(5)), - /// ], + /// Some(vec![ + /// ScalarValue::Int32(Some(1)), + /// ScalarValue::Int32(Some(2)), + /// ScalarValue::Int32(Some(3)), + /// ]), + /// Some(vec![ + /// ScalarValue::Int32(Some(4)), + /// ScalarValue::Int32(Some(5)), + /// ]), /// ]; /// /// assert_eq!(scalar_vec, expected); @@ -3303,28 +3305,62 @@ impl ScalarValue { /// ]); /// /// let expected = vec![ - /// vec![ + /// Some(vec![ /// ScalarValue::List(Arc::new(l1)), /// ScalarValue::List(Arc::new(l2)), - /// ], + /// ]), + /// ]; + /// + /// assert_eq!(scalar_vec, expected); + /// ``` + /// + /// Example 3: Nullable array + /// ``` + /// use datafusion_common::ScalarValue; + /// use arrow::array::ListArray; + /// use arrow::datatypes::{DataType, Int32Type}; + /// + /// let list_arr = ListArray::from_iter_primitive::(vec![ + /// Some(vec![Some(1), Some(2), Some(3)]), + /// None, + /// Some(vec![Some(4), Some(5)]) + /// ]); + /// + /// // Convert the array into Scalar Values for each row + /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); + /// + /// let expected = vec![ + /// Some(vec![ + /// ScalarValue::Int32(Some(1)), + /// ScalarValue::Int32(Some(2)), + /// ScalarValue::Int32(Some(3)), + /// ]), + /// None, + /// Some(vec![ + /// ScalarValue::Int32(Some(4)), + /// ScalarValue::Int32(Some(5)), + /// ]), /// ]; /// /// assert_eq!(scalar_vec, expected); /// ``` - pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result>> { + pub fn convert_array_to_scalar_vec( + array: &dyn Array, + ) -> Result>>> { fn generic_collect( array: &dyn Array, - ) -> Result>> { + ) -> Result>>> { array .as_list::() .iter() - .map(|nested_array| match nested_array { - Some(nested_array) => (0..nested_array.len()) - .map(|i| ScalarValue::try_from_array(&nested_array, i)) - .collect::>>(), - // TODO: what can we put for null? - // https://github.com/apache/datafusion/issues/17749 - None => Ok(vec![]), + .map(|nested_array| { + nested_array + .map(|array| { + (0..array.len()) + .map(|i| ScalarValue::try_from_array(&array, i)) + .collect::>>() + }) + .transpose() }) .collect() } @@ -9021,7 +9057,7 @@ mod tests { #[test] fn test_convert_array_to_scalar_vec() { - // Regular ListArray + // 1: Regular ListArray let list = ListArray::from_iter_primitive::(vec![ Some(vec![Some(1), Some(2)]), None, @@ -9031,17 +9067,20 @@ mod tests { assert_eq!( converted, vec![ - vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))], - vec![], - vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ ScalarValue::Int64(Some(3)), ScalarValue::Int64(None), ScalarValue::Int64(Some(4)) - ], + ]), ] ); - // Regular LargeListArray + // 2: Regular LargeListArray let large_list = LargeListArray::from_iter_primitive::(vec![ Some(vec![Some(1), Some(2)]), None, @@ -9051,17 +9090,20 @@ mod tests { assert_eq!( converted, vec![ - vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))], - vec![], - vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ ScalarValue::Int64(Some(3)), ScalarValue::Int64(None), ScalarValue::Int64(Some(4)) - ], + ]), ] ); - // Funky (null slot has non-zero list offsets) + // 3: Funky (null slot has non-zero list offsets) // Offsets + Values looks like this: [[1, 2], [3, 4], [5]] // But with NullBuffer it's like this: [[1, 2], NULL, [5]] let funky = ListArray::new( @@ -9074,9 +9116,63 @@ mod tests { assert_eq!( converted, vec![ - vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))], - vec![], - vec![ScalarValue::Int64(Some(5))], + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ScalarValue::Int64(Some(5))]), + ] + ); + + // 4: Offsets + Values looks like this: [[1, 2], [], [5]] + // But with NullBuffer it's like this: [[1, 2], NULL, [5]] + // The converted result is: [[1, 2], None, [5]] + let array4 = ListArray::new( + Field::new_list_field(DataType::Int64, true).into(), + OffsetBuffer::new(vec![0, 2, 2, 5].into()), + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])), + Some(NullBuffer::from(vec![true, false, true])), + ); + let converted = ScalarValue::convert_array_to_scalar_vec(&array4).unwrap(); + assert_eq!( + converted, + vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ + ScalarValue::Int64(Some(3)), + ScalarValue::Int64(Some(4)), + ScalarValue::Int64(Some(5)), + ]), + ] + ); + + // 5: Offsets + Values looks like this: [[1, 2], [], [5]] + // Same as 4, but the middle array is not null, so after conversion it's empty. + let array5 = ListArray::new( + Field::new_list_field(DataType::Int64, true).into(), + OffsetBuffer::new(vec![0, 2, 2, 5].into()), + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])), + Some(NullBuffer::from(vec![true, true, true])), + ); + let converted = ScalarValue::convert_array_to_scalar_vec(&array5).unwrap(); + assert_eq!( + converted, + vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + Some(vec![]), + Some(vec![ + ScalarValue::Int64(Some(3)), + ScalarValue::Int64(Some(4)), + ScalarValue::Int64(Some(5)), + ]), ] ); } diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index c6ed260e714e8..4b421b5294e01 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -48,7 +48,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> { let column = actual[0].column(0); assert_eq!(column.len(), 1); let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?; - let mut scalars = scalar_vec[0].clone(); + let mut scalars = scalar_vec[0].as_ref().unwrap().clone(); // workaround lack of Ord of ScalarValue let cmp = |a: &ScalarValue, b: &ScalarValue| { diff --git a/datafusion/functions-aggregate-common/src/merge_arrays.rs b/datafusion/functions-aggregate-common/src/merge_arrays.rs index c6989bc010bdb..bdf1490417beb 100644 --- a/datafusion/functions-aggregate-common/src/merge_arrays.rs +++ b/datafusion/functions-aggregate-common/src/merge_arrays.rs @@ -87,7 +87,7 @@ impl PartialOrd for CustomElement<'_> { /// This functions merges `values` array (`&[Vec]`) into single array `Vec` /// Merging done according to ordering values stored inside `ordering_values` (`&[Vec>]`) -/// Inner `Vec` in the `ordering_values` can be thought as ordering information for the +/// Inner `Vec` in the `ordering_values` can be thought as ordering information for /// each `ScalarValue` in the `values` array. /// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec` /// of the `ordering_values` array). @@ -119,17 +119,27 @@ pub fn merge_ordered_arrays( // Defines according to which ordering comparisons should be done. sort_options: &[SortOptions], ) -> datafusion_common::Result<(Vec, Vec>)> { - // Keep track the most recent data of each branch, in binary heap data structure. + // Keep track of the most recent data of each branch, in a binary heap data structure. let mut heap = BinaryHeap::::new(); - if values.len() != ordering_values.len() - || values - .iter() - .zip(ordering_values.iter()) - .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len()) + if values.len() != ordering_values.len() { + return exec_err!( + "Expects values and ordering_values to have same size but got {} and {}", + values.len(), + ordering_values.len() + ); + } + if let Some((idx, (values, ordering_values))) = values + .iter() + .zip(ordering_values.iter()) + .enumerate() + .find(|(_, (vals, ordering_vals))| vals.len() != ordering_vals.len()) { return exec_err!( - "Expects values arguments and/or ordering_values arguments to have same size" + "Expects values elements and ordering_values elements to have same size but got {} and {} at index {}", + values.len(), + ordering_values.len(), + idx ); } let n_branch = values.len(); diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 268349ecf1b6f..4d8676f24a289 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -687,13 +687,16 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { // Convert array to Scalars to sort them easily. Convert back to array at evaluation. let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; - for v in array_agg_res.into_iter() { - partition_values.push(v.into()); + for maybe_v in array_agg_res.into_iter() { + if let Some(v) = maybe_v { + partition_values.push(v.into()); + } else { + partition_values.push(vec![].into()); + } } let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; - - for partition_ordering_rows in orderings.into_iter() { + for partition_ordering_rows in orderings.into_iter().flatten() { // Extract value from struct to ordering_rows for each group/partition let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| { if let ScalarValue::Struct(s) = ordering_row { diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index c3f63134bea63..b9dc498ee7469 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -267,7 +267,7 @@ impl Accumulator for TrivialNthValueAccumulator { // First entry in the state is the aggregation result. let n_required = self.n.unsigned_abs() as usize; let array_agg_res = ScalarValue::convert_array_to_scalar_vec(&states[0])?; - for v in array_agg_res.into_iter() { + for v in array_agg_res.into_iter().flatten() { self.values.extend(v); if self.values.len() > n_required { // There is enough data collected, can stop merging: @@ -457,14 +457,14 @@ impl Accumulator for NthValueAccumulator { let mut partition_values = vec![self.values.clone()]; // First entry in the state is the aggregation result. let array_agg_res = ScalarValue::convert_array_to_scalar_vec(&states[0])?; - for v in array_agg_res.into_iter() { + for v in array_agg_res.into_iter().flatten() { partition_values.push(v.into()); } // Stores ordering requirement expression results coming from each partition: let mut partition_ordering_values = vec![self.ordering_values.clone()]; let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; // Extract value from struct to ordering_rows for each group/partition: - for partition_ordering_rows in orderings.into_iter() { + for partition_ordering_rows in orderings.into_iter().flatten() { let ordering_values = partition_ordering_rows.into_iter().map(|ordering_row| { let ScalarValue::Struct(s_array) = ordering_row else { return exec_err!( diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 43aa5f4ae60d2..8c36a89fde676 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -142,17 +142,25 @@ impl ScalarUDFImpl for ArrayHas { ScalarValue::convert_array_to_scalar_vec(&array) { assert_eq!(scalar_values.len(), 1); - let list = scalar_values - .into_iter() - .flatten() - .map(|v| Expr::Literal(v, None)) - .collect(); - - return Ok(ExprSimplifyResult::Simplified(in_list( - std::mem::take(needle), - list, - false, - ))); + match &scalar_values[0] { + // Haystack was a single list element as expected + Some(list) => { + let list = list + .iter() + .map(|v| Expr::Literal(v.clone(), None)) + .collect(); + + return Ok(ExprSimplifyResult::Simplified(in_list( + std::mem::take(needle), + list, + false, + ))); + } + // Haystack was a singular null, should be handled elsewhere + None => { + return Ok(ExprSimplifyResult::Original(args)); + } + }; } } Expr::ScalarFunction(ScalarFunction { func, args }) @@ -786,4 +794,25 @@ mod tests { Ok(()) } + + #[test] + fn test_simplify_array_has_with_null_haystack() { + let haystack = ListArray::new_null( + Arc::new(Field::new_list_field(DataType::Int32, true)), + 1, + ); + let haystack = lit(ScalarValue::List(Arc::new(haystack))); + let needle = col("c"); + + let props = ExecutionProps::new(); + let context = datafusion_expr::simplify::SimplifyContext::new(&props); + + let Ok(ExprSimplifyResult::Original(args)) = + ArrayHas::new().simplify(vec![haystack.clone(), needle.clone()], &context) + else { + panic!("Expected non-simplified expression"); + }; + + assert_eq!(args, vec![haystack, col("c")]); + } } From 545ce4e4bffabd589a04eaf37536727cf15d37d2 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Wed, 8 Oct 2025 23:23:19 +0200 Subject: [PATCH 2/4] Revert back to using flatten for performance --- datafusion/functions-nested/src/array_has.rs | 34 +++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 8c36a89fde676..fa852cb0395a8 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -142,25 +142,21 @@ impl ScalarUDFImpl for ArrayHas { ScalarValue::convert_array_to_scalar_vec(&array) { assert_eq!(scalar_values.len(), 1); - match &scalar_values[0] { - // Haystack was a single list element as expected - Some(list) => { - let list = list - .iter() - .map(|v| Expr::Literal(v.clone(), None)) - .collect(); - - return Ok(ExprSimplifyResult::Simplified(in_list( - std::mem::take(needle), - list, - false, - ))); - } - // Haystack was a singular null, should be handled elsewhere - None => { - return Ok(ExprSimplifyResult::Original(args)); - } - }; + let list = scalar_values + .into_iter() + // If the vec is a singular null, `list` will be empty due to this flatten(). + // It would be more clear if we handled the None separately, but this is more performant, + // and still handles it correctly (see `test_simplify_array_has_with_null_haystack`). + .flatten() + .flatten() + .map(|v| Expr::Literal(v.clone(), None)) + .collect(); + + return Ok(ExprSimplifyResult::Simplified(in_list( + std::mem::take(needle), + list, + false, + ))); } } Expr::ScalarFunction(ScalarFunction { func, args }) From 3d3af9baf8a09c55b66cc5f9797685a727c1055d Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Thu, 9 Oct 2025 07:15:53 +0200 Subject: [PATCH 3/4] Update test case to match new behavior --- datafusion/functions-nested/src/array_has.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index fa852cb0395a8..53c0047ce483a 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -674,7 +674,7 @@ mod tests { ScalarValue, }; use datafusion_expr::{ - col, execution_props::ExecutionProps, lit, simplify::ExprSimplifyResult, + col, execution_props::ExecutionProps, in_list, lit, simplify::ExprSimplifyResult, ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, }; @@ -803,12 +803,16 @@ mod tests { let props = ExecutionProps::new(); let context = datafusion_expr::simplify::SimplifyContext::new(&props); - let Ok(ExprSimplifyResult::Original(args)) = - ArrayHas::new().simplify(vec![haystack.clone(), needle.clone()], &context) - else { - panic!("Expected non-simplified expression"); + let expr = match ArrayHas::new() + .simplify(vec![haystack.clone(), needle.clone()], &context) + .unwrap() + { + ExprSimplifyResult::Simplified(expr) => expr, + ExprSimplifyResult::Original(_) => { + panic!("expected simplified result, not original") + } }; - assert_eq!(args, vec![haystack, col("c")]); + assert_eq!(expr, in_list(col("c"), vec![], false)); } } From 633ab644a9e9e597b03b162e0899fd71403956c0 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Thu, 9 Oct 2025 21:00:33 +0200 Subject: [PATCH 4/4] Remove test --- datafusion/functions-nested/src/array_has.rs | 30 ++------------------ 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 53c0047ce483a..f34fea0c4ba07 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -145,8 +145,7 @@ impl ScalarUDFImpl for ArrayHas { let list = scalar_values .into_iter() // If the vec is a singular null, `list` will be empty due to this flatten(). - // It would be more clear if we handled the None separately, but this is more performant, - // and still handles it correctly (see `test_simplify_array_has_with_null_haystack`). + // It would be more clear if we handled the None separately, but this is more performant. .flatten() .flatten() .map(|v| Expr::Literal(v.clone(), None)) @@ -674,7 +673,7 @@ mod tests { ScalarValue, }; use datafusion_expr::{ - col, execution_props::ExecutionProps, in_list, lit, simplify::ExprSimplifyResult, + col, execution_props::ExecutionProps, lit, simplify::ExprSimplifyResult, ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, }; @@ -790,29 +789,4 @@ mod tests { Ok(()) } - - #[test] - fn test_simplify_array_has_with_null_haystack() { - let haystack = ListArray::new_null( - Arc::new(Field::new_list_field(DataType::Int32, true)), - 1, - ); - let haystack = lit(ScalarValue::List(Arc::new(haystack))); - let needle = col("c"); - - let props = ExecutionProps::new(); - let context = datafusion_expr::simplify::SimplifyContext::new(&props); - - let expr = match ArrayHas::new() - .simplify(vec![haystack.clone(), needle.clone()], &context) - .unwrap() - { - ExprSimplifyResult::Simplified(expr) => expr, - ExprSimplifyResult::Original(_) => { - panic!("expected simplified result, not original") - } - }; - - assert_eq!(expr, in_list(col("c"), vec![], false)); - } }