From 45b23e83d60f1fe2d7f13746167deb44c1e9f170 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 17:19:12 +0100 Subject: [PATCH 1/2] Fix performance issue with hash aggregation --- .../src/physical_plan/hash_aggregate.rs | 86 ++++++++++--------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 26e4ef0efc6..421db8938f6 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -288,6 +288,9 @@ fn group_aggregate_batch( // Make sure we can create the accumulators or otherwise return an error create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?; + // Keys received in this batch + let mut batch_keys = vec![]; + for row in 0..batch.num_rows() { // 1.1 create_key(&group_values, row, &mut key) @@ -297,11 +300,17 @@ fn group_aggregate_batch( .raw_entry_mut() .from_key(&key) // 1.3 - .and_modify(|_, (_, _, v)| v.push(row as u32)) + .and_modify(|_, (_, _, v)| { + if v.is_empty() { + batch_keys.push(key.clone()) + }; + v.push(row as u32) + }) // 1.2 .or_insert_with(|| { // We can safely unwrap here as we checked we can create an accumulator before let accumulator_set = create_accumulators(aggr_expr).unwrap(); + batch_keys.push(key.clone()); let _ = create_group_by_values(&group_values, row, &mut group_by_values); ( key.clone(), @@ -315,43 +324,42 @@ fn group_aggregate_batch( // 2.3 `take` from each of its arrays the keys' values // 2.4 update / merge the accumulator with the values // 2.5 clear indices - accumulators - .iter_mut() - .try_for_each(|(_, (_, accumulator_set, indices))| { - // 2.2 - accumulator_set - .iter_mut() - .zip(&aggr_input_values) - .map(|(accumulator, aggr_array)| { - ( - accumulator, - aggr_array - .iter() - .map(|array| { - // 2.3 - compute::take( - array.as_ref(), - &UInt32Array::from(indices.clone()), - None, // None: no index check - ) - .unwrap() - }) - .collect::>(), - ) - }) - .try_for_each(|(accumulator, values)| match mode { - AggregateMode::Partial => accumulator.update_batch(&values), - AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }) - // 2.5 - .and({ - indices.clear(); - Ok(()) - }) - })?; + batch_keys.iter_mut().try_for_each(|key| { + let (_, accumulator_set, indices) = accumulators.get_mut(key).unwrap(); + let primitive_indices = UInt32Array::from(indices.clone()); + // 2.2 + accumulator_set + .iter_mut() + .zip(&aggr_input_values) + .map(|(accumulator, aggr_array)| { + ( + accumulator, + aggr_array + .iter() + .map(|array| { + // 2.3 + compute::take( + array.as_ref(), + &primitive_indices, + None, // None: no index check + ) + .unwrap() + }) + .collect::>(), + ) + }) + .try_for_each(|(accumulator, values)| match mode { + AggregateMode::Partial => accumulator.update_batch(&values), + AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch(&values) + } + }) + .and({ + indices.clear(); + Ok(()) + }) + })?; Ok(accumulators) } @@ -1062,4 +1070,4 @@ mod tests { check_aggregates(input).await } -} +} \ No newline at end of file From eaf918e9ee8bb44bf5404ac220837c0d3f1c3eb1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 18:44:28 +0100 Subject: [PATCH 2/2] Fmt, doc --- rust/datafusion/src/physical_plan/hash_aggregate.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 421db8938f6..2a46bb5eaf3 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -319,7 +319,7 @@ fn group_aggregate_batch( }); } - // 2.1 for each key + // 2.1 for each key in this batch // 2.2 for each aggregation // 2.3 `take` from each of its arrays the keys' values // 2.4 update / merge the accumulator with the values @@ -355,6 +355,7 @@ fn group_aggregate_batch( accumulator.merge_batch(&values) } }) + // 2.5 .and({ indices.clear(); Ok(()) @@ -1070,4 +1071,4 @@ mod tests { check_aggregates(input).await } -} \ No newline at end of file +}