From 466051658e086a0e921a5a3d885c453eb7a3dc39 Mon Sep 17 00:00:00 2001
From: "Francis (GrandChaman) Le Roy"
Date: Mon, 14 Nov 2022 14:39:27 +0100
Subject: [PATCH 1/3] Fixed `ScalarValue::iter_to_array` in some aggregations.

The size of the iterator passed to `iter_to_array` wasn't always
checked, which could cause an error because the function couldn't
figure out the `DataType` of the array it's supposed to generate.
---
 datafusion/common/src/scalar.rs                    |  8 ++++++--
 .../core/src/physical_plan/aggregates/row_hash.rs  | 11 +++++++++--
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index d95ba2199ec8d..8acfc455ca198 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1600,10 +1600,14 @@ impl ScalarValue {
         let mut valid = BooleanBufferBuilder::new(0);
         let mut flat_len = 0i32;
         for scalar in scalars {
-            if let ScalarValue::List(values, _) = scalar {
+            if let ScalarValue::List(values, field) = scalar {
                 match values {
                     Some(values) => {
-                        let element_array = ScalarValue::iter_to_array(values)?;
+                        let element_array = if !values.is_empty() {
+                            ScalarValue::iter_to_array(values)?
+                        } else {
+                            arrow::array::new_empty_array(field.data_type())
+                        };
 
                         // Add new offset index
                         flat_len += element_array.len() as i32;
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index aefc6571b068a..f62a0539a4ee4 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -479,8 +479,15 @@ fn create_batch_from_map(
                     results[i].push(acc.evaluate(&state_accessor).unwrap());
                 }
             }
-            for scalars in results {
-                columns.push(ScalarValue::iter_to_array(scalars)?);
+            for (scalars, field) in results
+                .into_iter()
+                .zip(output_schema.fields()[columns.len()..].iter())
+            {
+                if !scalars.is_empty() {
+                    columns.push(ScalarValue::iter_to_array(scalars)?);
+                } else {
+                    columns.push(arrow::array::new_empty_array(field.data_type()))
+                }
             }
         }
     }

From 82b94277239e92c55f5e80e3f34e6b174a028001 Mon Sep 17 00:00:00 2001
From: "Francis (GrandChaman) Le Roy"
Date: Mon, 14 Nov 2022 15:33:10 +0100
Subject: [PATCH 2/3] Added tests for issue #4080

---
 datafusion/core/tests/sql/aggregates.rs | 34 +++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 4b8a158fb4ff7..836eed16358e2 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -2314,3 +2314,37 @@ async fn aggregate_with_alias() -> Result<()> {
     );
     Ok(())
 }
+
+#[tokio::test]
+async fn array_agg_zero() -> Result<()> {
+    let ctx = SessionContext::new();
+    // ARRAY_AGG of an empty list
+    let sql = "SELECT ARRAY_AGG([]);";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+------------------------+",
+        "| ARRAYAGG(List([NULL])) |",
+        "+------------------------+",
+        "| []                     |",
+        "+------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn array_agg_one() -> Result<()> {
+    let ctx = SessionContext::new();
+    // ARRAY_AGG of a single-element list
+    let sql = "SELECT ARRAY_AGG([1]);";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------+",
+        "| ARRAYAGG(List([1])) |",
+        "+---------------------+",
"| [[1]] |", + "+---------------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} From 40ea46b5be2ef79666bc7edf56f8557eb67ce417 Mon Sep 17 00:00:00 2001 From: "Francis (GrandChaman) Le Roy" Date: Mon, 14 Nov 2022 15:35:53 +0100 Subject: [PATCH 3/3] Cleaned up code to explicit the field iterator creation --- .../core/src/physical_plan/aggregates/row_hash.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index f62a0539a4ee4..b185ec1ec5ee2 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -479,10 +479,12 @@ fn create_batch_from_map( results[i].push(acc.evaluate(&state_accessor).unwrap()); } } - for (scalars, field) in results - .into_iter() - .zip(output_schema.fields()[columns.len()..].iter()) - { + // We skip over the first `columns.len()` elements. + // + // This shouldn't panic if the `output_schema` has enough fields. + let remaining_field_iterator = output_schema.fields()[columns.len()..].iter(); + + for (scalars, field) in results.into_iter().zip(remaining_field_iterator) { if !scalars.is_empty() { columns.push(ScalarValue::iter_to_array(scalars)?); } else {