From 0b6090423c996d410bc0d4ff981e529780208674 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 18 Apr 2026 13:05:02 +0800 Subject: [PATCH 1/3] fix: generate multi-file benchmark data with scrambled RG order Both the inexact and overlap benchmark data generation had problems: 1. The original single-file ORDER BY approach caused the parquet writer to merge rows from adjacent chunks at RG boundaries, widening RG ranges to ~6M and making reorder_by_statistics a no-op. 2. The per-file split fix (one RG per file) meant reorder_by_statistics had nothing to reorder within each file, since each had only 1 RG. RG reorder is an intra-file optimization. Fix by generating multiple files where each file has MULTIPLE row groups with scrambled order: - inexact: 3 files x ~20 RGs each (scrambled within each file) - overlap: 5 files x ~12 RGs each (different permutation) Each RG has a narrow l_orderkey range (~100K) but appears in scrambled order within its file. This properly tests: - Row-group-level reorder (reorder_by_statistics within each file) - TopK threshold initialization from RG statistics - File-level ordering effects Uses pyarrow to read RGs from a sorted temp file and redistribute them into multiple output files with scrambled RG order. 
--- benchmarks/bench.sh | 147 +++++++++++++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 44 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index aa1ec477345c6..94ba8b4a093c7 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files -sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder -sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder -sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) +sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder +sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder +sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -1154,10 +1154,23 @@ run_sort_pushdown_sorted() { # Generates data for sort pushdown Inexact benchmark. # -# Produces a single large lineitem parquet file where row groups have -# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally -# sorted, RGs shuffled). This simulates append-heavy workloads where data -# is written in batches at different times. +# Produces multiple parquet files where each file has MULTIPLE row groups +# with scrambled RG order. 
This tests both: +# - Row-group-level reorder within each file (reorder_by_statistics) +# - TopK threshold initialization from RG statistics +# +# Strategy: +# 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total). +# 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the +# RG order within each file using a deterministic permutation. +# Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges +# but in scrambled order. +# +# Writing a single file with ORDER BY scramble does NOT work: the parquet +# writer merges rows from adjacent chunks at RG boundaries, widening +# ranges and defeating reorder_by_statistics. +# +# Requires pyarrow (pip install pyarrow). data_sort_pushdown_inexact() { INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then @@ -1165,7 +1178,7 @@ data_sort_pushdown_inexact() { return fi - echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." + echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..." # Re-use the sort_pushdown data as the source (generate if missing) data_sort_pushdown @@ -1173,65 +1186,111 @@ data_sort_pushdown_inexact() { mkdir -p "${INEXACT_DIR}" SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" - # Use datafusion-cli to bucket rows into 64 groups by a deterministic - # scrambler, then sort within each bucket by orderkey. This produces - # ~64 RG-sized segments where each has a tight orderkey range but the - # segments appear in scrambled (non-sorted) order in the file. + # Step 1: Write a single sorted file with small (100K-row) RGs + TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet" (cd "${SCRIPT_DIR}/.." 
&& cargo run --release -p datafusion-cli -- -c " CREATE EXTERNAL TABLE src STORED AS PARQUET LOCATION '${SRC_DIR}'; - COPY ( - SELECT * FROM src - ORDER BY - (l_orderkey * 1664525 + 1013904223) % 64, - l_orderkey - ) - TO '${INEXACT_DIR}/shuffled.parquet' + COPY (SELECT * FROM src ORDER BY l_orderkey) + TO '${TMPFILE}' STORED AS PARQUET OPTIONS ('format.max_row_group_size' '100000'); ") - echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" + # Step 2: Redistribute RGs into 3 files with scrambled RG order. + # Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file, + # permutation (rg_idx * 41 + 7) % n scrambles the order within file. + python3 -c " +import pyarrow.parquet as pq + +pf = pq.ParquetFile('${TMPFILE}') +n = pf.metadata.num_row_groups +n_files = 3 + +# Assign each RG to a file round-robin, then scramble order within each file +file_rgs = [[] for _ in range(n_files)] +for rg_idx in range(n): + file_rgs[rg_idx % n_files].append(rg_idx) +for file_id in range(n_files): + file_rgs[file_id].sort(key=lambda r: (r * 41 + 7) % n) + +# Write each file with its assigned RGs (in scrambled order) +for file_id in range(n_files): + rgs = file_rgs[file_id] + if not rgs: + continue + tables = [pf.read_row_group(rg) for rg in rgs] + writer = pq.ParquetWriter( + '${INEXACT_DIR}/part_%03d.parquet' % file_id, + pf.schema_arrow) + for t in tables: + writer.write_table(t) + writer.close() + print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs') +" + + rm -f "${TMPFILE}" + echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}" ls -la "${INEXACT_DIR}" - # Also generate a file with partially overlapping row groups. - # Simulates streaming data with network delays: each chunk is mostly - # in order but has a small overlap with the next chunk (±5% of the - # chunk range). This is the pattern described by @adriangb — data - # arriving with timestamps that are generally increasing but with - # network-induced jitter causing small overlaps between row groups.
+ # Also generate overlap data: same strategy but with different file count + # and permutation. Simulates streaming data with network delays where + # chunks arrive out of sequence. + # + # Requires pyarrow (pip install pyarrow). OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" return fi - echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." + echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..." mkdir -p "${OVERLAP_DIR}" + # Step 1: Write a single sorted file with small (100K-row) RGs + TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet" (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " CREATE EXTERNAL TABLE src STORED AS PARQUET LOCATION '${SRC_DIR}'; - -- Add jitter to l_orderkey: shift each row by a random-ish offset - -- proportional to its position. This creates overlap between adjacent - -- row groups while preserving the general ascending trend. - -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 - -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. - COPY ( - SELECT * FROM src - ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 - ) - TO '${OVERLAP_DIR}/overlapping.parquet' + COPY (SELECT * FROM src ORDER BY l_orderkey) + TO '${TMPFILE}' STORED AS PARQUET OPTIONS ('format.max_row_group_size' '100000'); ") - echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" - ls -la "${OVERLAP_DIR}" + # Step 2: Redistribute into 5 files with scrambled RG order. 
+ python3 -c " +import pyarrow.parquet as pq + +pf = pq.ParquetFile('${TMPFILE}') +n = pf.metadata.num_row_groups +n_files = 5 + +file_rgs = [[] for _ in range(n_files)] +for rg_idx in range(n): + file_rgs[rg_idx % n_files].append(rg_idx) +for file_id in range(n_files): + file_rgs[file_id].sort(key=lambda r: (r * 37 + 13) % n) + +for file_id in range(n_files): + rgs = file_rgs[file_id] + if not rgs: + continue + tables = [pf.read_row_group(rg) for rg in rgs] + writer = pq.ParquetWriter( + '${OVERLAP_DIR}/part_%03d.parquet' % file_id, + pf.schema_arrow) + for t in tables: + writer.write_table(t) + writer.close() + print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs') +" + + rm -f "${TMPFILE}" } # Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). @@ -1240,7 +1299,7 @@ data_sort_pushdown_inexact() { run_sort_pushdown_inexact() { INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" - echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." + echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..." DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -1256,13 +1315,13 @@ run_sort_pushdown_inexact_unsorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } -# Runs the sort pushdown benchmark with partially overlapping RGs. -# Simulates streaming data with network jitter — RGs are mostly in order -# but have small overlaps (±2500 orderkey jitter between adjacent RGs). +# Runs the sort pushdown benchmark with multi-file scrambled RG order.
+# Simulates streaming data with network delays — multiple files, each with +# scrambled RGs. Tests both RG-level reorder and TopK stats initialization. run_sort_pushdown_inexact_overlap() { OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" - echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." + echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..." DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } From a2dd162a26152120a513594204661fd86de85db5 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 21 Apr 2026 16:12:51 +0800 Subject: [PATCH 2/3] feat: add DESC LIMIT queries to sort_pushdown benchmark Add q5-q8: ORDER BY l_orderkey DESC LIMIT queries on sorted data. These test the reverse scan + TopK optimization path (Inexact sort pushdown) which benefits from RG reorder, stats init, and cumulative pruning. 
q5: DESC LIMIT 100 (narrow projection) q6: DESC LIMIT 1000 (narrow projection) q7: DESC LIMIT 100 (wide projection) q8: DESC LIMIT 1000 (wide projection) --- benchmarks/queries/sort_pushdown/q5.sql | 7 +++++++ benchmarks/queries/sort_pushdown/q6.sql | 5 +++++ benchmarks/queries/sort_pushdown/q7.sql | 5 +++++ benchmarks/queries/sort_pushdown/q8.sql | 5 +++++ 4 files changed, 22 insertions(+) create mode 100644 benchmarks/queries/sort_pushdown/q5.sql create mode 100644 benchmarks/queries/sort_pushdown/q6.sql create mode 100644 benchmarks/queries/sort_pushdown/q7.sql create mode 100644 benchmarks/queries/sort_pushdown/q8.sql diff --git a/benchmarks/queries/sort_pushdown/q5.sql b/benchmarks/queries/sort_pushdown/q5.sql new file mode 100644 index 0000000000000..60ad636ad3c9c --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q5.sql @@ -0,0 +1,7 @@ +-- Reverse scan: ORDER BY DESC LIMIT (narrow projection) +-- With --sorted: reverse_row_groups=true + TopK + stats init + cumulative prune +-- Without --sorted: full TopK sort over all data +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown/q6.sql b/benchmarks/queries/sort_pushdown/q6.sql new file mode 100644 index 0000000000000..d36a35a1e5a0d --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q6.sql @@ -0,0 +1,5 @@ +-- Reverse scan: ORDER BY DESC LIMIT larger fetch (narrow projection) +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown/q7.sql b/benchmarks/queries/sort_pushdown/q7.sql new file mode 100644 index 0000000000000..3e8856822d83d --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q7.sql @@ -0,0 +1,5 @@ +-- Reverse scan: wide projection + DESC LIMIT +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown/q8.sql b/benchmarks/queries/sort_pushdown/q8.sql new file mode 100644 index 
0000000000000..95ba89fdd5089 --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q8.sql @@ -0,0 +1,5 @@ +-- Reverse scan: wide projection + DESC LIMIT larger fetch +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 From e7c302be58531b8515a6fb56801fcaa6d5324d69 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 22 Apr 2026 12:07:19 +0800 Subject: [PATCH 3/3] address review: add pyarrow check + document sort pushdown benchmarks - Add pyarrow dependency check with helpful error message before python3 blocks in data_sort_pushdown_inexact() - Add Sort Pushdown section to benchmarks/README.md documenting all variants, queries (q1-q8), data generation requirements, and run commands --- benchmarks/README.md | 41 +++++++++++++++++++++++++++++++++++++++++ benchmarks/bench.sh | 7 +++++++ 2 files changed, 48 insertions(+) diff --git a/benchmarks/README.md b/benchmarks/README.md index df602ea538102..a4ddb09e0771c 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -900,3 +900,44 @@ This command will: ``` This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations. + +## Sort Pushdown + +Benchmarks for sort pushdown optimizations on TPC-H lineitem data (SF=1). 
+ +### Variants + +| Benchmark | Description | +|-----------|-------------| +| `sort_pushdown` | Baseline — no `WITH ORDER`, tests standard sort behavior | +| `sort_pushdown_sorted` | With `WITH ORDER` — tests sort elimination on sorted files | +| `sort_pushdown_inexact` | Inexact path (`--sorted` DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder | +| `sort_pushdown_inexact_unsorted` | No `WITH ORDER` — same data, tests Unsupported path + RG reorder | +| `sort_pushdown_inexact_overlap` | Multi-file scrambled RGs — streaming data scenario | + +### Queries + +**sort_pushdown / sort_pushdown_sorted** (q1-q8): +- q1-q4: ASC queries (sort elimination with `--sorted`) +- q5-q8: DESC LIMIT queries (reverse scan + TopK optimization with `--sorted`) + +**sort_pushdown_inexact** (q1-q4): DESC LIMIT queries on scrambled data + +### Data Generation + +The inexact/overlap data requires pyarrow (`pip install pyarrow`) to generate +multi-file parquet with scrambled row group order. DataFusion's COPY cannot produce +narrow-range RGs in scrambled order because the parquet writer merges rows from +adjacent chunks at RG boundaries. + +### Running + +```bash +# Generate data and run all sort pushdown benchmarks +./bench.sh data sort_pushdown +./bench.sh data sort_pushdown_inexact +./bench.sh run sort_pushdown +./bench.sh run sort_pushdown_sorted +./bench.sh run sort_pushdown_inexact +./bench.sh run sort_pushdown_inexact_overlap +``` diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 94ba8b4a093c7..6c5e0e483e15c 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -1178,6 +1178,13 @@ data_sort_pushdown_inexact() { return fi + # Check pyarrow dependency (needed to split/scramble RGs) + if ! python3 -c "import pyarrow" 2>/dev/null; then + echo "Error: pyarrow is required for sort pushdown Inexact data generation." 
+ echo "Install with: pip install pyarrow" + return 1 + fi + echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..." # Re-use the sort_pushdown data as the source (generate if missing)