diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index bf1846ae983d4..809826c7df768 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -25,6 +25,7 @@ use std::vec;
 
 use ahash::RandomState;
 use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow_array::NullArray;
 use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
@@ -836,24 +837,24 @@ fn slice_and_maybe_filter(
     filter_opt: Option<&Arc<dyn Array>>,
     offsets: &[usize],
 ) -> Result<Vec<ArrayRef>> {
-    let sliced_arrays: Vec<ArrayRef> = aggr_array
-        .iter()
-        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
-        .collect();
+    let null_array = Arc::new(NullArray::new(0)) as ArrayRef;
+    let mut sliced_arrays: Vec<ArrayRef> = vec![null_array; aggr_array.len()];
 
-    let filtered_arrays = match filter_opt.as_ref() {
-        Some(f) => {
-            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
-            let filter_array = as_boolean_array(&sliced)?;
+    if let Some(f) = filter_opt {
+        let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
+        let filter_array = as_boolean_array(&sliced)?;
 
-            sliced_arrays
-                .iter()
-                .map(|array| filter(array, filter_array).unwrap())
-                .collect::<Vec<ArrayRef>>()
+        for (i, arr) in aggr_array.iter().enumerate() {
+            let sliced = &arr.slice(offsets[0], offsets[1] - offsets[0]);
+            sliced_arrays[i] = filter(sliced, filter_array).unwrap();
+        }
+    } else {
+        for (i, arr) in aggr_array.iter().enumerate() {
+            sliced_arrays[i] = arr.slice(offsets[0], offsets[1] - offsets[0]);
         }
-        None => sliced_arrays,
-    };
-    Ok(filtered_arrays)
+    }
+
+    Ok(sliced_arrays)
 }
 
 /// This method is similar to Scalar::try_from_array except for the Null handling.