feat: split Parquet files into row-group-sized morsels #10
adriangb wants to merge 2 commits into adriangb:main from row-group-morsel-split
Conversation
|
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false |
|
run benchmarks env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
run benchmarks env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
cancel benchmark #10 (comment) |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (311a854) to afc0784 (merge-base), diff using: clickbench_partitioned | File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (311a854) to main, diff using: tpcds |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (311a854) to afc0784 (merge-base), diff using: tpcds |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (311a854) to afc0784 (merge-base), diff using: tpch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (311a854) to main, diff using: tpch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (311a854) to main, diff using: clickbench_partitioned |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: clickbench_partitioned — base (merge-base); clickbench_partitioned — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: clickbench_partitioned — base (merge-base); clickbench_partitioned — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: tpcds — base (merge-base); tpcds — branch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (311a854) to afc0784 (merge-base), diff using: tpcds |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (311a854) to afc0784 (merge-base), diff using: clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (311a854) to afc0784 (merge-base), diff using: tpch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: tpcds — base (merge-base); tpcds — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: tpcds — base (merge-base); tpcds — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: clickbench_partitioned — base (merge-base); clickbench_partitioned — branch |
Root cause: stale base, not a morsel-split regression
The apparent regression is a comparison artifact — this branch is based on an older main.
Local reproduction (5 iterations, M-series)
The branch's parent is already ~82ms on Q24; apache#21351 alone takes it to 39ms. This PR neither helps nor hurts relative to its (pre-apache#21351) base — it is simply missing the speedup.
Investigation path
Fix
Rebase onto current main.
🤖 Generated with Claude Code |
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
Force-pushed 311a854 to 5b0a69a (Compare)
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (5b0a69a) to 7acbe03 (merge-base), diff using: tpcds |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: clickbench_partitioned — base (merge-base); clickbench_partitioned — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: clickbench_partitioned — base (merge-base); clickbench_partitioned — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: tpcds — base (merge-base); tpcds — branch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details | Resource Usage: tpcds — base (merge-base); tpcds — branch |
Each Parquet file previously produced a single morsel containing one `ParquetPushDecoder` over the full pruned `ParquetAccessPlan`. Morselize at row-group granularity instead: after all pruning work is done, pack surviving row groups into chunks bounded by a per-morsel row budget and compressed-byte budget (defaults: 100k rows, 64 MiB). Each chunk becomes its own stream so the executor can interleave row-group decode work with other operators and — in a follow-up — let sibling `FileStream`s steal row-group-sized units of work across partitions.

A single oversized row group still becomes its own morsel; no sub-row-group splitting is introduced.

`EarlyStoppingStream` (which is driven by the non-Clone `FilePruner`) is attached only to the first morsel's stream so the whole file can still short-circuit on dynamic-filter narrowing. Row-group reversal is applied per-chunk on the `PreparedAccessPlan` and the chunk list is reversed so reverse output order is preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
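The chunk-packing step described above can be sketched as a greedy pass over the surviving row groups. This is a minimal illustration, not the PR's actual code: `RowGroup` and `pack_into_morsels` are invented names, and the real implementation works on a `ParquetAccessPlan` rather than a plain slice.

```rust
/// Hypothetical stand-in for a surviving row group's stats.
#[derive(Debug, Clone, Copy)]
struct RowGroup {
    index: usize,
    num_rows: u64,
    compressed_bytes: u64,
}

/// Greedily pack row groups into chunks bounded by a row budget and a
/// compressed-byte budget. A chunk is closed as soon as adding the next
/// row group would exceed either budget; a single oversized row group
/// still forms its own one-element chunk (no sub-row-group splitting).
fn pack_into_morsels(
    row_groups: &[RowGroup],
    row_budget: u64,
    byte_budget: u64,
) -> Vec<Vec<usize>> {
    let mut chunks: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let (mut rows, mut bytes) = (0u64, 0u64);
    for rg in row_groups {
        // Only close a non-empty chunk; an oversized group must still land somewhere.
        let would_overflow = !current.is_empty()
            && (rows + rg.num_rows > row_budget
                || bytes + rg.compressed_bytes > byte_budget);
        if would_overflow {
            chunks.push(std::mem::take(&mut current));
            rows = 0;
            bytes = 0;
        }
        current.push(rg.index);
        rows += rg.num_rows;
        bytes += rg.compressed_bytes;
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```

With the PR's defaults this would be called with `row_budget = 100_000` and `byte_budget = 64 * 1024 * 1024`; reverse output order would then be handled by reversing both each chunk's row groups and the chunk list itself, as the commit message notes.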
Force-pushed 5b0a69a to ff805cf (Compare)
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
The previous `build_stream` built every morsel's `RowFilter`, `ParquetPushDecoder`, `AsyncFileReader`, and `Projector` eagerly in a single loop inside the file planner — before any morsel was scheduled. That loop ran on the scheduler thread and was visible as a 10–15% regression vs. main on ClickBench-partitioned queries that have many row-group morsels per file (e.g. Q15, Q16 at pushdown=off).

Replace `ParquetStreamMorsel` (which held a pre-built `BoxStream`) with `ParquetLazyMorsel`, which holds only the per-chunk `ParquetAccessPlan` plus an `Arc<LazyMorselShared>` of the file-level state. The decoder and reader are constructed inside `Morsel::into_stream`, so each morsel pays its setup cost only when the scheduler actually picks it up, and the work is distributed across worker threads instead of serialised on the planner. `FilePruner` is `!Clone` and drives whole-file early-stop via `EarlyStoppingStream`, so it still lives on chunk 0's morsel only.

The warm `async_file_reader` from metadata / page-index / bloom-filter load is dropped at the end of `build_stream` — every morsel mints a fresh reader via the factory at `into_stream` time. For both built-in factories (`DefaultParquetFileReaderFactory`, `CachedParquetFileReaderFactory`) the "warm cache" benefit of reusing a reader is negligible because the underlying `Arc<dyn ObjectStore>` / `Arc<dyn FileMetadataCache>` is already shared across readers, so the simplification is free.

Local ClickBench-partitioned, 10 iterations, pushdown=off (M-series), times in ms:

| Query | main | eager (before) | lazy (this commit) |
|-------|-----:|---------------:|-------------------:|
| Q14   |  325 |            335 |                313 |
| Q15   |  309 |            358 |                302 |
| Q16   |  911 |           1049 |                786 |
| Q24   |   48 |             55 |                 56 |
| Q26   |   41 |             45 |                 45 |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
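The eager-vs-lazy distinction in this commit can be illustrated with a small sketch. All names here (`SharedFileState`, `LazyMorsel`, `Decoder`, `plan_file`) are invented stand-ins for `LazyMorselShared`, `ParquetLazyMorsel`, and the decoder/reader machinery; the point is only the shape: planning clones an `Arc` of file-level state, and the expensive per-morsel construction happens inside `into_stream`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// File-level state shared by all morsels of one file (stand-in for
/// metadata, projections, the reader factory, etc.).
struct SharedFileState {
    // Counts decoder constructions so the deferral is observable.
    decoders_built: AtomicUsize,
}

/// One morsel: a chunk of row-group indexes plus a handle to the shared
/// state. Crucially, no decoder or reader is held here.
struct LazyMorsel {
    row_groups: Vec<usize>,
    shared: Arc<SharedFileState>,
}

/// Stand-in for the expensive per-morsel machinery (decoder + reader).
struct Decoder {
    row_groups: Vec<usize>,
}

impl LazyMorsel {
    /// All expensive setup happens here, on the worker thread that the
    /// scheduler hands the morsel to — not on the planner.
    fn into_stream(self) -> Decoder {
        self.shared.decoders_built.fetch_add(1, Ordering::Relaxed);
        Decoder { row_groups: self.row_groups }
    }
}

/// Planning only packs chunks and clones the Arc; it builds zero decoders.
fn plan_file(chunks: Vec<Vec<usize>>) -> (Arc<SharedFileState>, Vec<LazyMorsel>) {
    let shared = Arc::new(SharedFileState {
        decoders_built: AtomicUsize::new(0),
    });
    let morsels = chunks
        .into_iter()
        .map(|row_groups| LazyMorsel {
            row_groups,
            shared: Arc::clone(&shared),
        })
        .collect();
    (shared, morsels)
}
```

In the real code the `!Clone` `FilePruner` would additionally be moved into chunk 0's morsel only, so the first stream can still wrap itself in `EarlyStoppingStream` and short-circuit the whole file.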
|
Updated to the lazy-morsel variant. The new commit replaces the eager per-morsel setup with construction deferred to `into_stream`. Local 10-iter ClickBench-partitioned, pushdown=off (M-series):
Full analysis + pushdown=on numbers in the isolation comment on PR #9. 🤖 Generated with Claude Code |
|
run benchmarks env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
run benchmarks env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (8ef53b7) to 9a1ed57 (merge-base), diff using: clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (8ef53b7) to 9a1ed57 (merge-base), diff using: tpcds |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (8ef53b7) to 9a1ed57 (merge-base), diff using: tpch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (8ef53b7) to 9a1ed57 (merge-base), diff using: tpch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (8ef53b7) to 9a1ed57 (merge-base), diff using: tpcds |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing row-group-morsel-split (8ef53b7) to 9a1ed57 (merge-base), diff using: clickbench_partitioned |
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
|
Benchmark for this request hit the 7200s job deadline before finishing. Benchmarks requested: | Kubernetes message |
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?