Adaptive filter scheduling + row-group morsel split #9
run benchmarks

    baseline:
      ref: main
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
    changed:
      ref: HEAD
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing HEAD (3c51143) to main using clickbench_partitioned
🤖 Benchmark running (GKE): comparing HEAD (3c51143) to main using tpch
🤖 Benchmark running (GKE): comparing HEAD (3c51143) to main using tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
The remaining benchmark for this request hit the 7200s job deadline before finishing.
Each Parquet file previously produced a single morsel containing one `ParquetPushDecoder` over the full pruned `ParquetAccessPlan`. Morselize at row-group granularity instead: after all pruning work is done, pack surviving row groups into chunks bounded by a per-morsel row budget and a compressed-byte budget (defaults: 100k rows, 64 MiB). Each chunk becomes its own stream, so the executor can interleave row-group decode work with other operators and — in a follow-up — let sibling `FileStream`s steal row-group-sized units of work across partitions.

A single oversized row group still becomes its own morsel; no sub-row-group splitting is introduced. `EarlyStoppingStream` (which is driven by the non-`Clone` `FilePruner`) is attached only to the first morsel's stream, so the whole file can still short-circuit on dynamic-filter narrowing. Row-group reversal is applied per-chunk on the `PreparedAccessPlan`, and the chunk list is reversed so reverse output order is preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
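The packing rule described above can be sketched as follows — a minimal, hypothetical model, not the actual DataFusion code (`RowGroupMeta` and `split_into_chunks` are illustrative names; the real planner works on `ParquetAccessPlan`):

```rust
/// Illustrative row-group metadata; the real planner reads these from the
/// Parquet file metadata after pruning.
#[derive(Debug)]
struct RowGroupMeta {
    index: usize, // row-group ordinal within the file
    rows: u64,
    compressed_bytes: u64,
}

/// Pack surviving row groups, in order, into chunks bounded by a row budget
/// (default 100k) and a compressed-byte budget (default 64 MiB). An
/// oversized row group becomes its own chunk; no sub-row-group splitting.
fn split_into_chunks(
    row_groups: &[RowGroupMeta],
    row_budget: u64,
    byte_budget: u64,
) -> Vec<Vec<usize>> {
    let mut chunks = Vec::new();
    let (mut cur, mut rows, mut bytes) = (Vec::new(), 0u64, 0u64);
    for rg in row_groups {
        // Close the current chunk if adding this row group would overshoot
        // either budget (but never emit an empty chunk).
        if !cur.is_empty()
            && (rows + rg.rows > row_budget || bytes + rg.compressed_bytes > byte_budget)
        {
            chunks.push(std::mem::take(&mut cur));
            rows = 0;
            bytes = 0;
        }
        cur.push(rg.index);
        rows += rg.rows;
        bytes += rg.compressed_bytes;
    }
    if !cur.is_empty() {
        chunks.push(cur);
    }
    chunks
}
```

Under this sketch, the reverse-scan rule maps onto two reversals: reverse the row groups inside each chunk, then reverse the chunk list, so the concatenated output order is the exact reverse of the forward order.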
run benchmarks

    baseline:
      ref: main
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
    changed:
      ref: HEAD
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing HEAD (b64f2d9) to main using tpch
🤖 Benchmark running (GKE): comparing HEAD (b64f2d9) to main using clickbench_partitioned
🤖 Benchmark running (GKE): comparing HEAD (b64f2d9) to main using tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
run benchmark clickbench_partitioned

    baseline:
      ref: main
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
    changed:
      ref: HEAD
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing HEAD (28ebd52) to main using clickbench_partitioned
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
**Regression diagnosis: it's the adaptive tracker itself, not the placement**

TL;DR: On the main regressed queries, forcing every filter to a static placement (either all-PostScan or all-RowFilter) is 20–50 ms faster than the adaptive path. The tracker's per-morsel

**Local A/B with four placement strategies (5 iterations, M-series)**

Knobs used on the branch binary +

For Q10, Q14, Q40 — all-postscan is faster than apache/main with pushdown=off. The branch's "worst case" placement is already better than the no-pushdown baseline; the adaptive path is just wasting CPU on top of that.

**Where the overhead is**

Two sources, both on the hot path:

**What does this tell us**

Adaptive currently beats pure static in the one spot it was designed for — hash-join dynamic filters like Q23 (3.3 s → 298 ms, still 11× faster than main+off). But on ClickBench user-written filters, static PostScan is a very hard baseline to beat, and our per-morsel/per-batch bookkeeping consistently loses to it.

**Possible fixes, smallest → largest**

Happy to prototype one of these — (2) looks cheapest and most impactful, since it removes the per-batch locks without changing any placement behavior. (4) would structurally recover the old code path for the common case, at the cost of "no adaptation unless we know adaptation is needed".

**On the morsel split itself**

No-filter queries (Q15/Q16/Q17) also regress ~5–10% at pushdown=off, which is independent of everything above — it's the morsel-split fan-out cost (multiple decoders + readers + projectors per file). Setting

🤖 Generated with Claude Code
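One hypothetical shape for fix (2) — removing the per-batch locks — is replacing a mutex-guarded stats struct with relaxed atomic counters, so the per-batch hot path is a few uncontended `fetch_add`s. This is a sketch of the idea, not the tracker's actual layout (field and type names are invented here):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lock-free per-filter accumulator: worker threads record
/// per-batch stats with relaxed atomics on the hot path; the placement
/// decision reads a snapshot once per morsel.
#[derive(Default)]
struct FilterStats {
    rows_in: AtomicU64,
    rows_out: AtomicU64,
    eval_nanos: AtomicU64,
}

impl FilterStats {
    #[inline]
    fn record_batch(&self, rows_in: u64, rows_out: u64, eval_nanos: u64) {
        // Relaxed suffices: these are monotone counters read only for heuristics.
        self.rows_in.fetch_add(rows_in, Ordering::Relaxed);
        self.rows_out.fetch_add(rows_out, Ordering::Relaxed);
        self.eval_nanos.fetch_add(eval_nanos, Ordering::Relaxed);
    }

    /// Fraction of rows the filter kept so far (1.0 when nothing recorded).
    fn selectivity(&self) -> f64 {
        let rows_in = self.rows_in.load(Ordering::Relaxed);
        if rows_in == 0 {
            return 1.0;
        }
        self.rows_out.load(Ordering::Relaxed) as f64 / rows_in as f64
    }
}
```

The placement behavior is untouched by a change like this; only the synchronization cost of the bookkeeping moves.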
Benchmark for this request hit the 7200s job deadline before finishing.
**Isolation experiment: where does the overhead actually live?**

Built the three strata separately, then ran a 10-iteration, 4-way comparison on ClickBench-partitioned (local M-series):

pushdown=off
pushdown=on

**Findings**

**What changed in**
run benchmark clickbench_partitioned

    baseline:
      ref: main
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
    changed:
      ref: HEAD
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
run benchmark clickbench_partitioned

    baseline:
      ref: main
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
    changed:
      ref: HEAD
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
The previous `build_stream` built every morsel's `RowFilter`, `ParquetPushDecoder`, `AsyncFileReader`, and `Projector` eagerly in a single loop inside the file planner — before any morsel was scheduled. That loop ran on the scheduler thread and was visible as a 10–15% regression vs. main on ClickBench-partitioned queries that have many row-group morsels per file (e.g. Q15, Q16 at pushdown=off).

Replace `ParquetStreamMorsel` (which held a pre-built `BoxStream`) with `ParquetLazyMorsel`, which holds only the per-chunk `ParquetAccessPlan` plus an `Arc<LazyMorselShared>` of the file-level state. The decoder and reader are constructed inside `Morsel::into_stream`, so each morsel pays its setup cost only when the scheduler actually picks it up, and the work is distributed across worker threads instead of serialised on the planner.

`FilePruner` is `!Clone` and drives whole-file early-stop via `EarlyStoppingStream`, so it still lives on chunk 0's morsel only. The warm `async_file_reader` from metadata / page-index / bloom-filter load is dropped at the end of `build_stream` — every morsel mints a fresh reader via the factory at `into_stream` time. For both built-in factories (`DefaultParquetFileReaderFactory`, `CachedParquetFileReaderFactory`) the "warm cache" benefit of reusing a reader is negligible because the underlying `Arc<dyn ObjectStore>` / `Arc<dyn FileMetadataCache>` is already shared across readers, so the simplification is free.

Local ClickBench-partitioned, 10 iterations, pushdown=off (M-series):

| Query | main (ms) | eager, before (ms) | lazy, this commit (ms) |
|-------|----------:|-------------------:|-----------------------:|
| Q14   | 325       | 335                | 313                    |
| Q15   | 309       | 358                | 302                    |
| Q16   | 911       | 1049               | 786                    |
| Q24   | 48        | 55                 | 56                     |
| Q26   | 41        | 45                 | 45                     |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
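The eager → lazy change can be modelled in miniature: the planner used to pay the construction cost once per morsel in its loop, and now each morsel carries only the shared file state and builds its decoder when picked up. All names below are stand-ins (counting decoder constructions with an atomic), not the real DataFusion types:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Stand-in for `LazyMorselShared`: file-level state every morsel shares.
struct FileShared {
    decoders_built: AtomicUsize, // proxy for metadata, schema, reader factory, ...
}

/// Stand-in for `ParquetLazyMorsel`: per-chunk plan + shared file state,
/// with no decoder, reader, or stream constructed yet.
struct LazyMorsel {
    chunk_row_groups: Vec<usize>,
    shared: Arc<FileShared>,
}

impl LazyMorsel {
    /// Pays the setup cost here, on the worker that picks the morsel up,
    /// instead of on the scheduler thread at plan time.
    fn into_stream(self) -> Vec<usize> {
        self.shared.decoders_built.fetch_add(1, Ordering::Relaxed);
        self.chunk_row_groups // stand-in for the actual record-batch stream
    }
}
```

The observable property is that planning N morsels performs zero decoder setups; each setup happens only when (and if) that morsel's stream is actually requested.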
🤖 Benchmark running (GKE): comparing HEAD (af2a26f) to main using clickbench_partitioned
🤖 Benchmark running (GKE): comparing HEAD (af2a26f) to main using clickbench_partitioned
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
**Local vs GKE are disagreeing — asking for an isolation run to nail it down**

The headline from this run:

My 10-iter local run on M-series showed the lazy-morsel branch (without any adaptive code) matching or beating apache/main on every query I tested, including Q15/Q16/Q17/Q24 at pushdown=off. GKE's 16-core Neoverse-V2 is disagreeing with that. Looking at which queries regress here:

The no-filter regressions implicate either the morsel-split fan-out or the lazy-morsel wrapper itself, not the adaptive tracker — there's no predicate to adapt on those. That contradicts my local result, which means either (a) GKE's cache/NUMA/allocator behaviour exposes an overhead my M-series hides, or (b) the local run had less variance than I gave it credit for and GKE is closer to the truth.

To isolate, could you kick off a clickbench run on PR #10 with the same config? PR #10 is now exactly

Trigger:

    run benchmark clickbench_partitioned
    baseline:
      ref: main
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
    changed:
      ref: HEAD
      env:
        DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
        DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false

(posted on PR #10 directly, no adaptive env to set). Once I know which layer owns the no-filter regression on GKE, the follow-up is clear:

🤖 Generated with Claude Code
**Local full 43-query ClickBench, 5 iterations, 4 configs (M-series, default morsel budgets)**

Aggregate: branch is 12 % slower (pushdown=off) / 16 % slower (pushdown=on) than apache/main.

**Surprise: Q23 is not the win the GKE report suggested**

The recent GKE "11× faster on Q23" was comparing baseline=main+off (3480 ms) to branch+on (316 ms). Apples-to-apples (br+on vs main+on), Q23 is 2.28× slower than static pushdown on my laptop:

Static RowFilter does 119 ms; our adaptive does 272 ms. So yes, the adaptive system gives most of the arrow-rs speedup — but it leaves ~150 ms on the table that static pushdown doesn't. The GKE baseline config (

**Biggest regressions at pushdown=on (br+on / main+on)**

**Biggest wins at pushdown=on**

**What this tells us**

**Next steps**

I think we should separate the two concerns:

Want me to proceed with (1) first? It's the larger absolute regression, and its cause is narrower (no adaptive involved).

🤖 Generated with Claude Code
**PR #10 isolation — my morsel/lazy attribution was wrong**

Ran full ClickBench-partitioned plus a 10-iter drill-down comparing main, PR #10 (morsel-split + lazy, no adaptive), and this PR (adaptive). Headline:

**Aggregate (full 43 queries, 5 iter)**

PR #10 is essentially at parity with main — 2 % faster at pushdown=off, 3 % slower at pushdown=on. So the lazy-morsel refactor is clean on this hardware.

**10-iter drill-down on the queries I flagged yesterday**

Three things this tells us:

**Where to look next**

Given that PR #10 is at parity with main, the clean recommendation is: merge PR #10 as-is, and pursue the adaptive layer on this PR as a separate effort that has to prove itself against main+on, not against main+off. On the adaptive layer, the specific investigation is: on a query with

Happy to profile

🤖 Generated with Claude Code
**Investigation summary + a rebase question**

**What the profile showed**

Instrumented

The multi-thread numbers were the big hint: at

**Where the contention is coming from**

Not verified conclusively, but the strong suspicion is this combination:

Ruled out:

**What helped so far**

Commit 5856cca99b gates the file-level

Q3/Q26 closed fully. Q33/Q34 went from ~33 %/10 % slower to ~22 %/18 %. Q23 is still 2× slower on filter queries.

**Full 43-query aggregate (5 iter, pushdown=on)**

So this branch is now at 1.12× main aggregate (was 1.16× before). PR #10 is at parity.

**What would help next**

I want to keep going, but the next step is structural and probably needs its own PR:

**On the rebase**

I attempted to rebase this branch onto PR #10 cleanly, and the diff between PR #10 (

🤖 Generated with Claude Code
Adds a runtime filter-placement layer on top of the row-group-morsel split introduced by the parent commit. Each Parquet predicate is assigned a `FilterId` and flows through a state machine (`SelectivityTracker`) that moves it between three placements:

- `RowFilter` — evaluated inside the arrow-rs decoder as an `ArrowPredicate`, enabling late-materialization savings when the filter columns are a small fraction of the projection.
- `PostScan` — evaluated against the decoded wide batch before the projector strips it; used when late materialization has little to save or when the filter is CPU-heavy.
- `Dropped` — optional filters (hash-join dynamic filters wrapped in `OptionalFilterPhysicalExpr`) are skipped mid-stream when the CI upper bound on their bytes-saved-per-second falls below a minimum.

Initial placement uses a cheap byte-ratio heuristic (`filter_compressed_bytes / projection_compressed_bytes`); subsequent placements refine it using Welford online stats reported from both the row-filter path (`DatafusionArrowPredicate::evaluate`) and the post-scan path (`apply_post_scan_filters_with_stats`). Placement is re-evaluated per morsel, so stats from the prior morsel's scan feed into the next morsel's decision.

Config knobs on `TableParquetOptions.execution.parquet`:

- `filter_pushdown_min_bytes_per_sec` (default 100 MB/s)
- `filter_collecting_byte_ratio_threshold` (default 0.20)
- `filter_confidence_z` (default 2.0 ≈ 97.5 % one-sided CI)

The `reorder_filters` option is removed; the adaptive tracker subsumes its role.

Notable trade-offs documented in the PR discussion:

- The adaptive layer adds ~10 % aggregate ClickBench overhead vs the pure morsel-split base (PR #10). Most of it lives in `ParquetLazyMorsel::build_stream_now` under parallel load; single-thread shows no regression. A candidate fix is splitting adaptive state out of `LazyMorselShared` so non-adaptive queries get the same `Arc` allocation shape as PR #10.
- The `OptionalFilterPhysicalExpr` wrapper changes plan display output (`DynamicFilter [...]` → `Optional(DynamicFilter [...])`); several sqllogictest expected outputs and snapshot tests were updated accordingly.
- A selectivity-tracker microbench was added under `benches/selectivity_tracker.rs` so future iterations on the tracker can be measured independently of full ClickBench.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
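The Welford/CI mechanics behind the `Dropped` placement can be sketched as follows — a hypothetical miniature of the drop decision using the commit's defaults (z = 2.0, 100 MB/s floor). The real `SelectivityTracker` has more placements and inputs; `OnlineStats` and `should_drop` are illustrative names:

```rust
/// Welford online mean/variance over per-batch bytes-saved-per-second
/// samples for one optional filter.
#[derive(Default)]
struct OnlineStats {
    n: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations
}

impl OnlineStats {
    fn update(&mut self, x: f64) {
        self.n += 1;
        let d = x - self.mean;
        self.mean += d / self.n as f64;
        self.m2 += d * (x - self.mean);
    }

    /// One-sided upper confidence bound: mean + z * stddev / sqrt(n).
    fn upper_bound(&self, z: f64) -> f64 {
        if self.n < 2 {
            return f64::INFINITY; // not enough evidence to drop anything
        }
        let var = self.m2 / (self.n - 1) as f64;
        self.mean + z * (var / self.n as f64).sqrt()
    }
}

/// Skip the optional filter mid-stream once even the optimistic estimate of
/// its bytes-saved-per-second falls below the floor (default 100 MB/s).
fn should_drop(stats: &OnlineStats, min_bytes_per_sec: f64, z: f64) -> bool {
    stats.upper_bound(z) < min_bytes_per_sec
}
```

Using the upper bound rather than the mean makes the drop conservative: a filter is only abandoned when, with ~97.5 % one-sided confidence at z = 2.0, it is saving less than the configured floor.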
**Rebase done — and it uncovered the root cause**

Squashed the adaptive delta as a single commit on top of PR #10's

**But the interesting thing is what rebasing did to the numbers**

Full 43-query ClickBench, pushdown=on, 5 iter:

The rebased branch is 4 % faster than apache/main in aggregate — and faster than PR #10 too. Contrast that with the un-rebased branch, which was 1.12× slower than main on the last run.

Q33 verification across 3 trials (the query that was the stickiest regression on the un-rebased PR):

Rebased Q33 is at parity with PR #10 (1443 ms), ~24 % faster than the un-rebased branch.

**Per-query vs main on the rebased branch (top 10 each, pushdown=on)**

Wins:

Regressions:

Q23 still regresses ~1.8× — this is the "filter ⊆ projection with a selective LIKE" query I diagnosed earlier. Adaptive's byte-ratio heuristic starts it as PostScan (byte_ratio ≈ 1), and with

**Where this leaves us**

🤖 Generated with Claude Code
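For reference, the byte-ratio heuristic that puts Q23-shaped queries in PostScan can be modelled like this — a hedged sketch using the `filter_collecting_byte_ratio_threshold` default of 0.20; the enum and function names are illustrative, not the tracker's API:

```rust
#[derive(Debug, PartialEq)]
enum Placement {
    RowFilter, // evaluate inside the decoder; late materialization pays off
    PostScan,  // evaluate on the decoded wide batch
}

/// Initial placement from compressed sizes only: push the filter into the
/// decoder when its columns are a small fraction of the projection bytes.
fn initial_placement(
    filter_compressed_bytes: u64,
    projection_compressed_bytes: u64,
    threshold: f64,
) -> Placement {
    let ratio = filter_compressed_bytes as f64 / projection_compressed_bytes.max(1) as f64;
    if ratio <= threshold {
        Placement::RowFilter
    } else {
        Placement::PostScan
    }
}
```

On a query where the filter column is essentially the whole projection (ratio ≈ 1), this starts the filter as PostScan even when a highly selective predicate would make RowFilter cheaper — which is the Q23 behaviour described above.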
Force-pushed from 5856cca to dbcf5ac
**Summary**

Mashup of two in-flight PRs, branched off `adriangb/filter-pushdown-dynamic-bytes-morsels` with `pydantic#59` cherry-picked on top:

- (`SelectivityTracker` that moves filters between `RowFilter` and post-scan based on measured effectiveness, plus optional-filter mid-stream skip).
- (`ParquetAccessPlan::split_into_chunks`, per-chunk `ParquetPushDecoder` + `AsyncFileReader`, `EarlyStoppingStream` attached to chunk 0 only).
The two PRs both touched
datafusion/datasource-parquet/src/opener.rs::build_stream. The merge keeps:partition_filtersbucket split → row-filter vs post-scan,build_row_filterinvoked once to drainunbuildableback into post-scan (wrong-result guard),post_scan_other_bytes_per_rowprecompute, read-plan projection mask over the union of projection + post-scan columns, rebased projection/post-scan exprs againststream_schema.RowFilterfrom the (stable)row_filter_conjunctslist —RowFilteris notClone; build the decoder withprojection_mask.clone(); mint a freshAsyncFileReader(chunk 0 reuses the warm one); clonepost_scan_filtersandpost_scan_other_bytes_per_rowinto eachPushDecoderStreamState; decoder-level.with_limit()still only applied whenpost_scan_filters.is_empty();EarlyStoppingStreamwraps chunk 0 only.datafusion/datasource-parquet/src/access_plan.rsanddatafusion/datasource-parquet/src/source.rsapplied clean.Test plan
cargo check -p datafusion-datasource-parquetcargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warningscargo test -p datafusion-datasource-parquet --lib— 156 passed, includingtest_row_group_split_*and fulltest_reverse_scan_*suitecargo fmt --allclippy::mutable_key_typeerror indatafusion-exprthat fails workspace clippy.🤖 Generated with Claude Code