diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index d95ba2199ec8d..8acfc455ca198 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1600,10 +1600,14 @@ impl ScalarValue {
         let mut valid = BooleanBufferBuilder::new(0);
         let mut flat_len = 0i32;
         for scalar in scalars {
-            if let ScalarValue::List(values, _) = scalar {
+            if let ScalarValue::List(values, field) = scalar {
                 match values {
                     Some(values) => {
-                        let element_array = ScalarValue::iter_to_array(values)?;
+                        let element_array = if !values.is_empty() {
+                            ScalarValue::iter_to_array(values)?
+                        } else {
+                            arrow::array::new_empty_array(field.data_type())
+                        };
 
                         // Add new offset index
                         flat_len += element_array.len() as i32;
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index aefc6571b068a..b185ec1ec5ee2 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -479,8 +479,17 @@ fn create_batch_from_map(
                     results[i].push(acc.evaluate(&state_accessor).unwrap());
                 }
             }
-            for scalars in results {
-                columns.push(ScalarValue::iter_to_array(scalars)?);
+            // We skip over the first `columns.len()` elements.
+            //
+            // This shouldn't panic if the `output_schema` has enough fields.
+            let remaining_field_iterator = output_schema.fields()[columns.len()..].iter();
+
+            for (scalars, field) in results.into_iter().zip(remaining_field_iterator) {
+                if !scalars.is_empty() {
+                    columns.push(ScalarValue::iter_to_array(scalars)?);
+                } else {
+                    columns.push(arrow::array::new_empty_array(field.data_type()))
+                }
             }
         }
     }
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 4b8a158fb4ff7..836eed16358e2 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -2314,3 +2314,37 @@ async fn aggregate_with_alias() -> Result<()> {
     );
     Ok(())
 }
+
+#[tokio::test]
+async fn array_agg_zero() -> Result<()> {
+    let ctx = SessionContext::new();
+    // ARRAY_AGG over an empty list should produce an empty array, not panic
+    let sql = "SELECT ARRAY_AGG([]);";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+------------------------+",
+        "| ARRAYAGG(List([NULL])) |",
+        "+------------------------+",
+        "| []                     |",
+        "+------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn array_agg_one() -> Result<()> {
+    let ctx = SessionContext::new();
+    // ARRAY_AGG over a single-element list still works as before
+    let sql = "SELECT ARRAY_AGG([1]);";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---------------------+",
+        "| ARRAYAGG(List([1])) |",
+        "+---------------------+",
+        "| [[1]]               |",
+        "+---------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}