Add filter pushdown scatter threshold #9414
Conversation
Previously, every predicate in the RowFilter received the same ProjectionMask containing ALL filter columns. This caused unnecessary decoding of expensive string columns when evaluating cheap integer predicates. Now each predicate receives a mask with only the single column it needs.

Key sync improvements (vs baseline):
- Q37: 63.7ms -> 7.3ms (-88.6%, Title LIKE with CounterID=62 filter)
- Q36: 117ms -> 24ms (-79.5%, URL <> '' with CounterID=62 filter)
- Q40: 17.9ms -> 5.1ms (-71.5%, multi-pred with RefererHash eq)
- Q41: 17.3ms -> 5.5ms (-68.1%, multi-pred with URLHash eq)
- Q22: 303ms -> 127ms (-58.2%, 3 string predicates)
- Q42: 7.6ms -> 3.9ms (-48.5%, int-only multi-predicate)
- Q38: 19.1ms -> 12.4ms (-34.9%, 5 int predicates)
- Q21: 159ms -> 98ms (-38.5%, URL LIKE + SearchPhrase)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
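A minimal sketch of the idea, using hypothetical simplified types rather than the actual parquet-rs API: instead of handing every predicate the union of all filter columns, build one mask per predicate.

```rust
// Hypothetical sketch -- `Mask` stands in for parquet's ProjectionMask,
// and each predicate is represented only by the column indices it reads.
#[derive(Debug, PartialEq, Clone)]
struct Mask(Vec<usize>);

// Old behaviour: every predicate gets the union of ALL filter columns,
// so cheap integer predicates force expensive string columns to decode.
fn shared_mask(predicate_cols: &[Vec<usize>]) -> Mask {
    let mut all: Vec<usize> = predicate_cols.iter().flatten().copied().collect();
    all.sort_unstable();
    all.dedup();
    Mask(all)
}

// New behaviour: each predicate decodes only the columns it needs.
fn per_predicate_masks(predicate_cols: &[Vec<usize>]) -> Vec<Mask> {
    predicate_cols.iter().map(|cols| Mask(cols.clone())).collect()
}

fn main() {
    // e.g. predicate 0 reads an integer column (0), predicate 1 a string column (3)
    let preds = vec![vec![0], vec![3]];
    assert_eq!(shared_mask(&preds), Mask(vec![0, 3]));
    assert_eq!(per_predicate_masks(&preds), vec![Mask(vec![0]), Mask(vec![3])]);
}
```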
Use page-level min/max statistics (via StatisticsConverter) to compute a RowSelection that skips pages where equality predicates cannot match. For each equality predicate with an integer literal, we check whether the literal falls within each page's [min, max] range and skip the pages where it doesn't.

Impact is data-dependent: it is most effective when the data is sorted/clustered by the filter column. For this particular 100K-row sample file the data isn't sorted by the filter columns, so improvements are modest (~5% for some CounterID=62 queries). It would show larger gains on sorted datasets.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
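The page-skipping check can be sketched as follows; `PageStats` is a hypothetical stand-in for the per-page min/max that StatisticsConverter exposes (the real code builds a RowSelection rather than a list of page indices):

```rust
// Hypothetical simplified page statistics: min/max of an integer column.
struct PageStats {
    min: i64,
    max: i64,
}

// Keep only pages whose [min, max] range can contain the equality literal;
// all other pages are skipped without decoding.
fn pages_to_read(pages: &[PageStats], literal: i64) -> Vec<usize> {
    pages
        .iter()
        .enumerate()
        .filter(|(_, p)| p.min <= literal && literal <= p.max)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let pages = [
        PageStats { min: 0, max: 10 },  // cannot contain 62 -> skipped
        PageStats { min: 50, max: 70 }, // may contain 62 -> read
        PageStats { min: 60, max: 99 }, // may contain 62 -> read
    ];
    assert_eq!(pages_to_read(&pages, 62), vec![1, 2]);
}
```

This also shows why the impact is data-dependent: on unsorted data, most pages' ranges will cover the literal and few pages can be skipped.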
Put the cheapest/most selective predicate first: SearchPhrase <> '' filters out ~87% of rows before the expensive LIKE predicates run. This significantly reduces string column decoding for Title and URL.

Q22 sync: ~6% improvement, Q22 async: ~13% improvement.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
run benchmark arrow_reader_clickbench

🤖: Benchmark completed

run benchmark arrow_reader_clickbench

run benchmark arrow_reader_clickbench
@alamb can you get the runner "unstuck" again? :D

Done. What I think is happening is that the runner is being OOM-killed:

I won't schedule any more benchmarks from that branch. Interesting about the mem usage.
run benchmark arrow_reader_clickbench

🤖: Benchmark completed

Nice
Actually I think we should just have / try this.

Yes, I think it's pretty effective and simple, and it does something DataFusion cannot handle yet (skipping the mask application entirely).
🤖 Arrow criterion benchmark running (GKE)

🤖 Arrow criterion benchmark completed (GKE)
Measuring selectivity against the absolute result makes the threshold less intuitive, since the selection becomes more scattered after and_then. Revert to measuring against the raw predicate result (relative to the current selection).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
I have some ideas to improve this further
Instead of measuring selectivity (the fraction of rows passing), measure scattering: how much applying a predicate would fragment the selection. A predicate is deferred if it would increase the selector count beyond `current_selectors * scatter_threshold`. This directly targets what makes fragmented selections expensive: many small skip/read transitions during decoding.

- Rename selectivity_threshold -> scatter_threshold
- Add RowSelection::selector_count() (O(1) via Vec::len)
- Use selector count ratio instead of row selectivity ratio

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
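The deferral rule at this point in the PR can be sketched in a few lines (hypothetical helper, not the actual parquet-rs code):

```rust
// Defer a predicate if applying it would grow the selector count beyond
// current_selectors * scatter_threshold, i.e. fragment the selection too much.
fn should_defer(current_selectors: usize, selectors_after: usize, scatter_threshold: f64) -> bool {
    selectors_after as f64 > current_selectors as f64 * scatter_threshold
}

fn main() {
    // Growing 10 -> 50 selectors with a 3x threshold: too much fragmentation, defer.
    assert!(should_defer(10, 50, 3.0));
    // Growing 10 -> 20 stays under 3x: apply the predicate eagerly.
    assert!(!should_defer(10, 20, 3.0));
}
```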
run benchmark arrow_reader_clickbench

🤖 Arrow criterion benchmark running (GKE)
Instead of comparing selector count ratios, measure selector density: selectors / total_rows. A density of 0.25 means at most 25 selectors per 100 rows; anything more fragmented gets deferred. This is more intuitive and directly proportional to the per-row cost of skip/read transitions during decoding.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
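The density formulation is a small change to the rule above; this sketch (hypothetical helpers) shows the 0.25 example from the commit message:

```rust
// Selector density: how many skip/read runs per row of the file.
fn selector_density(selector_count: usize, total_rows: usize) -> f64 {
    selector_count as f64 / total_rows as f64
}

// Defer the predicate when the resulting selection is denser than the threshold.
fn should_defer(selector_count: usize, total_rows: usize, threshold: f64) -> bool {
    selector_density(selector_count, total_rows) > threshold
}

fn main() {
    // 25 selectors per 100 rows is exactly at a 0.25 threshold: still applied.
    assert!(!should_defer(25, 100, 0.25));
    // 40 selectors per 100 rows is more fragmented: deferred.
    assert!(should_defer(40, 100, 0.25));
}
```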
run benchmark arrow_reader_clickbench

🤖 Arrow criterion benchmark running (GKE)
Store the total row count in RowSelection at construction time, enabling O(1) total_row_count() instead of iterating all selectors. Also add selector_count() for O(1) fragmentation measurement. Update split_off() and limit() to maintain the cached row_count.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
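A sketch of the caching idea, with a hypothetical simplified RowSelection (the real type uses RowSelector values, and split_off()/limit() would also have to keep the cache consistent):

```rust
// Simplified RowSelection: (skip, row_count) runs plus a cached total.
struct RowSelection {
    selectors: Vec<(bool, usize)>,
    row_count: usize,
}

impl RowSelection {
    fn new(selectors: Vec<(bool, usize)>) -> Self {
        // Sum once at construction time...
        let row_count = selectors.iter().map(|(_, n)| n).sum();
        Self { selectors, row_count }
    }

    // ...so both queries are O(1) instead of iterating all selectors.
    fn total_row_count(&self) -> usize {
        self.row_count
    }

    fn selector_count(&self) -> usize {
        self.selectors.len()
    }
}

fn main() {
    let sel = RowSelection::new(vec![(false, 10), (true, 5), (false, 3)]);
    assert_eq!(sel.total_row_count(), 18);
    assert_eq!(sel.selector_count(), 3);
}
```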
🤖 Arrow criterion benchmark completed (GKE)
The total row count needed for the scatter density calculation is already available at both call sites (the sync reader sums row group sizes; the async reader has row_count in scope). Pass it as a parameter instead of storing it in RowSelection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🤖 Arrow criterion benchmark completed (GKE)
run benchmark arrow_reader_clickbench |
Based on ClickBench profiling, scattering predicates have densities of 0.008-0.054 while clean predicates are <0.001. A threshold of 0.01 defers the scattering ones while applying the clean ones. Also removes the eprintln debug instrumentation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run benchmark arrow_reader_clickbench

🤖 Arrow criterion benchmark running (GKE)

🤖 Arrow criterion benchmark running (GKE)

🤖 Arrow criterion benchmark completed (GKE)

🤖 Arrow criterion benchmark completed (GKE)
Don't defer a predicate if applying it would reduce the selector count (make the selection less fragmented). Only defer when the predicate both increases the selector count AND exceeds the density threshold.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
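The combined rule from this commit, sketched with hypothetical helpers (not the actual parquet-rs code):

```rust
// Only defer when the predicate BOTH increases the selector count
// AND pushes selector density over the threshold.
fn should_defer(
    selectors_before: usize,
    selectors_after: usize,
    total_rows: usize,
    density_threshold: f64,
) -> bool {
    if selectors_after <= selectors_before {
        // The predicate de-fragments the selection: always apply it.
        return false;
    }
    selectors_after as f64 / total_rows as f64 > density_threshold
}

fn main() {
    // Fewer selectors after applying: never deferred, regardless of density.
    assert!(!should_defer(200, 50, 10_000, 0.01));
    // More selectors and density 500/10_000 = 0.05 > 0.01: deferred.
    assert!(should_defer(100, 500, 10_000, 0.01));
    // More selectors but density 80/10_000 = 0.008 <= 0.01: still applied.
    assert!(!should_defer(50, 80, 10_000, 0.01));
}
```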
/// Set a scatter threshold for filter deferral.
I like this even more now! This is now dealing with selector density, something that would be hard to do on the DataFusion side. It is correlated with, but not equivalent to, overall filter selectivity, which I think probably ends up mattering more for coarser IO patterns, given object store range coalescing, pages, etc.
Yeah - I also think this is quite a bit better. It's OK if a filter "only" selects 50% of the rows; if those rows are nicely packed, it will probably still skip almost 50% of the pages and lead to efficient IO and decoding.
But (especially when combining multiple filters) chances are the filter gets too fragmented: a 50%-selective but packed filter is better than a 25%-selective one that just alternates select 1 / skip 1, does the same amount of IO, and has horrible decoding performance.
I think you could even go further and check which combination of filters gives the best of both worlds (selectivity and fragmentation).
I think it's great, but this heuristic doesn't know anything about the distribution of the data. Low selector density doesn't guarantee that skip/read transitions are large.
For example:

total_rows = 10_000
selector_count = 100
density = 100 / 10_000 = 0.01 (at threshold)

RowSelection (100 selectors total):
1) select 4951 rows
2) skip 1
3) select 1
4) skip 1
...
99) skip 1
100) select 4951 rows

Which wouldn't be good:
[ SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS ][XSXSXSXSXS...XS][ SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS ]
~4951 selected 98 rows ~4951 selected

One way to catch this is by checking the selectivity: 49 / 10_000 = 0.0049 (very low).
I propose keeping both the selector_density and the selectivity_threshold you had before.
This matrix shows the possible cases:
Density (selectors/rows) |
Selectivity (skipped/rows) |
Typical Pattern | Pushdown / Deferral Hint |
|---|---|---|---|
| Low | Low | Small contiguous skipped island | Would think is good without checking the selectivity |
| Low | High | Large contiguous skips | Great! |
| High | Low | Many tiny transitions, little skipped overall | Worst case |
| High | High | Many tiny transitions but lots skipped | Check if it increases the selector_count |
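The adversarial example can also be checked numerically; this sketch (hypothetical `metrics` helper over simplified (skip, row_count) runs) reproduces a density exactly at the 0.01 threshold while the selectivity stays very low:

```rust
// Compute both metrics from (skip, row_count) runs:
//   density     = selectors / total_rows
//   selectivity = skipped_rows / total_rows
fn metrics(selectors: &[(bool, usize)]) -> (f64, f64) {
    let total: usize = selectors.iter().map(|(_, n)| n).sum();
    let skipped: usize = selectors.iter().filter(|(skip, _)| *skip).map(|(_, n)| n).sum();
    (
        selectors.len() as f64 / total as f64,
        skipped as f64 / total as f64,
    )
}

fn main() {
    // The example above: select 4951, then 49x (skip 1, select 1), select 4951.
    let mut sel = vec![(false, 4951)];
    for _ in 0..49 {
        sel.push((true, 1));
        sel.push((false, 1));
    }
    sel.push((false, 4951));
    assert_eq!(sel.len(), 100); // 100 selectors over 10_000 rows

    let (density, selectivity) = metrics(&sel);
    assert!((density - 0.01).abs() < 1e-12); // exactly at the threshold
    assert!((selectivity - 0.0049).abs() < 1e-12); // yet almost nothing is skipped
}
```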
/// produce a density above this value, its result is deferred.
/// For example, `0.25` allows at most 25 selectors per 100 rows.
///
/// A high selector density means many small skip/read transitions,
Yes, but low selector density doesn't guarantee that skip/read transitions are large either.
I also now have a very different idea. Basically, why deal with averages around densities and selections when we have the real numbers? For selection materialization we already loop through the selectors and count the non-zero ones (arrow-rs/parquet/src/arrow/arrow_reader/read_plan.rs, lines 113 to 123 in aa9432c). Why don't we count the rows covered by long/short selectors instead?

```rust
// total_rows:      total rows covered by the selection
// effective_count: non-empty selector runs
// short_*_rows:    rows in runs <= short_threshold
// long_*_rows:     rows in runs >= long_threshold
let (
    total_rows,
    effective_count,
    short_select_rows,
    short_skip_rows,
    long_select_rows,
    long_skip_rows,
) = selection.iter().fold(
    (0usize, 0usize, 0usize, 0usize, 0usize, 0usize),
    |(rows, cnt, ss, sk, ls, lk), s| {
        if s.row_count == 0 {
            return (rows, cnt, ss, sk, ls, lk);
        }
        let rows = rows + s.row_count;
        let cnt = cnt + 1;
        let is_short = s.row_count <= short_threshold;
        let is_long = s.row_count >= long_threshold;
        match (s.skip, is_short, is_long) {
            (true, true, _) => (rows, cnt, ss, sk + s.row_count, ls, lk),
            (true, _, true) => (rows, cnt, ss, sk, ls, lk + s.row_count),
            (false, true, _) => (rows, cnt, ss + s.row_count, sk, ls, lk),
            (false, _, true) => (rows, cnt, ss, sk, ls + s.row_count, lk),
            _ => (rows, cnt, ss, sk, ls, lk), // middle bin
        }
    },
);
```

With these statistics (effectively a histogram of run lengths) we'd be able to make a more data-driven decision on keeping or deferring the selection.
Which issue does this PR close?
Rationale for this change
It can be better to skip (combined) filters with low effectiveness altogether, as there will still be the overhead of many individual small skip/read transitions in the Parquet decoder.
What changes are included in this PR?
This adds a simple threshold to skip pushing down a filter if the current selection is not "effective", i.e. if it filters out less than a given fraction of rows.
Are these changes tested?
Are there any user-facing changes?