Use NDV estimate to pre-allocate hash tables during aggregation#21654
Dandandan wants to merge 3 commits into apache:main from ndv-preallocate-hashtable
Conversation
Use column distinct_count statistics (from Parquet metadata or other sources) to pre-size the hash table in GroupValues implementations, avoiding expensive rehashing during aggregation. The capacity hint is bounded by 128K entries to prevent over-allocation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
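The cap described above can be sketched as follows. This is an illustrative standalone version, not the PR's actual code; `MAX_PREALLOC` and `capacity_hint` are hypothetical names chosen for the example.

```rust
// Bound a distinct_count estimate from column statistics so an
// overestimate cannot cause a huge up-front hash-table allocation.
// 128K matches the cap described in the PR summary.
const MAX_PREALLOC: usize = 128 * 1024;

fn capacity_hint(distinct_count: Option<usize>) -> Option<usize> {
    distinct_count.map(|ndv| ndv.min(MAX_PREALLOC))
}

fn main() {
    // Small NDV: use the estimate directly.
    assert_eq!(capacity_hint(Some(10_000)), Some(10_000));
    // Huge NDV estimate: bounded by the 128K cap.
    assert_eq!(capacity_hint(Some(10_000_000)), Some(MAX_PREALLOC));
    // No statistics available: no hint, keep default growth behavior.
    assert_eq!(capacity_hint(None), None);
}
```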
run benchmark
Hi @Dandandan, supported benchmarks:

Usage: per-side configuration (env):

```yaml
SHARED_SETTING: enabled
baseline:
  ref: v45.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
  ref: v46.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 2G
```
run benchmarks
🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (104e098) to 5c653be (merge-base) using clickbench_partitioned, tpch, and tpcds
Add `preallocate(total_num_groups)` method to the `GroupsAccumulator` trait (default no-op) and implement it for:
- PrimitiveGroupsAccumulator (SUM, MIN, MAX, etc.)
- CountGroupsAccumulator
- VarianceGroupsAccumulator
- CorrelationGroupsAccumulator

Call preallocate on all accumulators in GroupedHashAggregateStream when the NDV capacity hint is available. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
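The shape of the trait method described in this commit can be sketched as follows. The trait and method names match the commit text, but the struct body and implementation are simplified stand-ins, not DataFusion's actual accumulator code.

```rust
// A default no-op method on the trait means existing accumulator
// implementations keep compiling and working unchanged; only the
// ones that benefit override it.
trait GroupsAccumulator {
    /// Pre-allocate internal state for `total_num_groups` groups.
    fn preallocate(&mut self, _total_num_groups: usize) {}
}

// Simplified stand-in for a COUNT accumulator: one counter per group.
struct CountGroupsAccumulator {
    counts: Vec<i64>,
}

impl GroupsAccumulator for CountGroupsAccumulator {
    fn preallocate(&mut self, total_num_groups: usize) {
        // Reserve once up front instead of growing incrementally
        // as new group indices appear.
        self.counts.reserve(total_num_groups);
    }
}

fn main() {
    let mut acc = CountGroupsAccumulator { counts: Vec::new() };
    acc.preallocate(1024);
    assert!(acc.counts.capacity() >= 1024);
}
```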
run benchmarks
Thanks for tackling this @Dandandan! It was on my to-do list. We have a workload in Comet that spends the vast majority of its time in the hash agg just resizing and rehashing (almost 1 billion unique values), so I wanted to take a look at using Statistics to preallocate. The description seems perfect to me!
🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (2ebcfa8) to 5c653be (merge-base) using clickbench_partitioned, tpcds, and tpch
Yeah, the hashtable resizing is a pretty costly one (probably in terms of cache misses / branch mispredicts), especially as the table grows larger.
As there is also a risk of estimating NDV too high, I added a cap of 128K rows (I think it should be configurable).
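The growth cost being discussed can be illustrated with a tiny standalone experiment. A `Vec` stands in for the hash table here; for a real hash table each reallocation additionally rehashes every existing entry, which is where the cache and branch-prediction cost comes from.

```rust
// Count how many times the backing allocation changes while
// inserting `n` elements.
fn count_reallocations(mut v: Vec<u64>, n: u64) -> usize {
    let mut resizes = 0;
    let mut cap = v.capacity();
    for i in 0..n {
        v.push(i);
        if v.capacity() != cap {
            resizes += 1;
            cap = v.capacity();
        }
    }
    resizes
}

fn main() {
    // Growing from empty reallocates many times; each reallocation
    // copies (for a hash table: rehashes) everything inserted so far.
    let grown = count_reallocations(Vec::new(), 1_000_000);
    // Pre-sizing pays for the allocation exactly once, up front.
    let presized = count_reallocations(Vec::with_capacity(1_000_000), 1_000_000);
    assert!(grown > 10);
    assert_eq!(presized, 0);
}
```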
```rust
if let Some(capacity) = capacity_hint {
    for acc in &mut accumulators {
        acc.preallocate(capacity);
```

Should probably do these calls after MemoryConsumer
So for Comet we have the final aggregation after a shuffle stage, and Spark will tell us the number of rows in the shuffle stage. That would act as an upper bound for NDV for Comet, while the max NDV in any single partition would act as our lower bound. Comet would have to store these statistics values at plan generation for that stage.
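The bounding idea above can be sketched as a simple clamp. The function and parameter names are hypothetical, chosen for the example; they are not Comet or DataFusion APIs.

```rust
// Bound an NDV estimate for the final aggregation after a shuffle:
// - lower bound: the max NDV observed in any single input partition
//   (the merged result can't have fewer distinct groups than that);
// - upper bound: the shuffle stage's total row count
//   (there can't be more groups than rows).
fn bounded_ndv_hint(ndv_estimate: u64, max_partition_ndv: u64, shuffle_rows: u64) -> u64 {
    ndv_estimate.clamp(max_partition_ndv, shuffle_rows)
}

fn main() {
    // Estimate below the lower bound: raised to the partition max.
    assert_eq!(bounded_ndv_hint(50, 100, 1_000), 100);
    // Estimate above the upper bound: capped at the row count.
    assert_eq!(bounded_ndv_hint(5_000, 100, 1_000), 1_000);
    // Estimate within bounds: used as-is.
    assert_eq!(bounded_ndv_hint(500, 100, 1_000), 500);
}
```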
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
When distinct_count is absent, estimate NDV from min/max range using Interval::cardinality(), matching the approach already used by join cardinality estimation. This enables NDV-based pre-allocation for Parquet files that have min/max stats but lack explicit distinct_count metadata (e.g. ClickBench). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
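For integral types, the fallback this commit describes reduces to the cardinality of the closed interval [min, max]. A standalone sketch of that calculation (illustrative only; DataFusion's `Interval::cardinality()` also handles other types and open bounds):

```rust
// Upper bound on distinct values of an integer column whose
// statistics give only min and max: at most max - min + 1 values
// can exist in the closed interval [min, max].
fn integer_interval_cardinality(min: i64, max: i64) -> Option<u64> {
    if min > max {
        return None; // empty/invalid interval, no estimate
    }
    // abs_diff avoids overflow when min is negative and max positive;
    // checked_add guards the +1 at the extreme of the u64 range.
    max.abs_diff(min).checked_add(1)
}

fn main() {
    // A column with min=0, max=99 has at most 100 distinct values.
    assert_eq!(integer_interval_cardinality(0, 99), Some(100));
    assert_eq!(integer_interval_cardinality(-5, 5), Some(11));
    assert_eq!(integer_interval_cardinality(7, 3), None);
}
```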
run benchmarks
🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (3c61ea1) to 5c653be (merge-base) using clickbench_partitioned, tpcds, and tpch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
Can't seem to get good performance, so closing for now. Though I think the idea is sound and should help in certain cases.
@Dandandan I agree that this is sound; it's most probably the current level of statistics propagation accuracy that is lacking (~5% according to #20292). FYI: there is active work on getting better accuracy (#21120 and #21443 from myself, #8227 in general, and #20766 for NDV specifically). I think we can revisit when we get better accuracy.

Re. #20292, we spoke offline with @gabotechs: statistics aggregation/propagation results seem numerically unstable (e.g., different results on different environments, probably due to approximations in the computation and different orders of execution). We are currently trying to figure out a better way to frame this so that we can measure accuracy and help improve it.
Could we lock it behind an option that defaults off? In simple plans with a scan (with good stats) and an aggregation, do you see good performance? That more closely resembles what a Comet stage plan would look like.
I'll look at the code some more, but is it only applying the optimization if the statistics are exact, or also if estimated? If it's only exact NDV, I'm confused by the regressions.

It's both (as exact NDV is rare).


Which issue does this PR close?
Rationale for this change
During hash aggregation, the hash table in `GroupValues` starts with a small or zero initial capacity and grows dynamically as new groups are discovered. Each resize requires rehashing all existing entries, which is expensive for high-cardinality group-by queries.

When column statistics include `distinct_count` (e.g. from Parquet metadata), we can estimate the number of groups upfront and pre-allocate the hash table to avoid repeated rehashing.

What changes are included in this PR?

- In `GroupedHashAggregateStream::new()`, compute the NDV (number of distinct values) estimate from child statistics using `AggregateExec::compute_group_ndv()`, bounded by 128K entries
- Pass the capacity hint via `new_group_values()` to all `GroupValues` implementations:
  - `GroupValuesPrimitive`: pre-sizes `HashTable` and values `Vec`
  - `GroupValuesColumn`: pre-sizes `HashTable`
  - `GroupValuesRows`: pre-sizes `HashTable` and row buffer
  - `GroupValuesBytes`/`GroupValuesBytesView`: pre-sizes underlying `ArrowBytesMap`/`ArrowBytesViewMap`
- Add `with_capacity()` constructors to `ArrowBytesMap` and `ArrowBytesViewMap`

Are these changes tested?
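The general shape of a `with_capacity()` constructor like those mentioned for `ArrowBytesMap`/`ArrowBytesViewMap` can be sketched as below. This is a simplified stand-in using a plain `HashMap`, not the actual Arrow byte-map implementation.

```rust
use std::collections::HashMap;

// Simplified stand-in for a bytes-to-group-index map: the real
// structures also track payload buffers, but the capacity idea
// is the same.
struct BytesMapSketch {
    map: HashMap<Vec<u8>, usize>,
}

impl BytesMapSketch {
    // Size the map for an expected number of distinct keys so that
    // inserts up to that count never trigger a rehash.
    fn with_capacity(capacity: usize) -> Self {
        Self {
            map: HashMap::with_capacity(capacity),
        }
    }
}

fn main() {
    let m = BytesMapSketch::with_capacity(1_000);
    // HashMap::with_capacity guarantees at least the requested room.
    assert!(m.map.capacity() >= 1_000);
}
```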
Covered by existing aggregation tests. The change is transparent - it only affects initial allocation sizes, not correctness.
Are there any user-facing changes?
No user-facing API changes. Aggregation queries may use slightly more initial memory but avoid rehashing overhead, improving performance for queries where NDV statistics are available.