diff --git a/benchmarks/README.md b/benchmarks/README.md index df602ea538102..a4ddb09e0771c 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -900,3 +900,44 @@ This command will: ``` This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations. + +## Sort Pushdown + +Benchmarks for sort pushdown optimizations on TPC-H lineitem data (SF=1). + +### Variants + +| Benchmark | Description | +|-----------|-------------| +| `sort_pushdown` | Baseline — no `WITH ORDER`, tests standard sort behavior | +| `sort_pushdown_sorted` | With `WITH ORDER` — tests sort elimination on sorted files | +| `sort_pushdown_inexact` | Inexact path (`--sorted` DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder | +| `sort_pushdown_inexact_unsorted` | No `WITH ORDER` — same data, tests Unsupported path + RG reorder | +| `sort_pushdown_inexact_overlap` | Multi-file scrambled RGs — streaming data scenario | + +### Queries + +**sort_pushdown / sort_pushdown_sorted** (q1-q8): +- q1-q4: ASC queries (sort elimination with `--sorted`) +- q5-q8: DESC LIMIT queries (reverse scan + TopK optimization with `--sorted`) + +**sort_pushdown_inexact** (q1-q4): DESC LIMIT queries on scrambled data + +### Data Generation + +The inexact/overlap data requires pyarrow (`pip install pyarrow`) to generate +multi-file parquet with scrambled row group order. DataFusion's COPY cannot produce +narrow-range RGs in scrambled order because the parquet writer merges rows from +adjacent chunks at RG boundaries. + +### Running + +```bash +# Generate data and run all sort pushdown benchmarks +./bench.sh data sort_pushdown +./bench.sh data sort_pushdown_inexact +./bench.sh run sort_pushdown +./bench.sh run sort_pushdown_sorted +./bench.sh run sort_pushdown_inexact +./bench.sh run sort_pushdown_inexact_overlap +``` diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index aa1ec477345c6..6c5e0e483e15c 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files -sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder -sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder -sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) +sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder +sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder +sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -1154,10 +1154,23 @@ run_sort_pushdown_sorted() { # Generates data for sort pushdown Inexact benchmark. # -# Produces a single large lineitem parquet file where row groups have -# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally -# sorted, RGs shuffled). This simulates append-heavy workloads where data -# is written in batches at different times. +# Produces multiple parquet files where each file has MULTIPLE row groups +# with scrambled RG order. This tests both: +# - Row-group-level reorder within each file (reorder_by_statistics) +# - TopK threshold initialization from RG statistics +# +# Strategy: +# 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total). +# 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the +# RG order within each file using a deterministic permutation. +# Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges +# but in scrambled order. +# +# Writing a single file with ORDER BY scramble does NOT work: the parquet +# writer merges rows from adjacent chunks at RG boundaries, widening +# ranges and defeating reorder_by_statistics. +# +# Requires pyarrow (pip install pyarrow). data_sort_pushdown_inexact() { INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then @@ -1165,7 +1178,14 @@ data_sort_pushdown_inexact() { return fi - echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." + # Check pyarrow dependency (needed to split/scramble RGs) + if ! python3 -c "import pyarrow" 2>/dev/null; then + echo "Error: pyarrow is required for sort pushdown Inexact data generation." + echo "Install with: pip install pyarrow" + return 1 + fi + + echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..." # Re-use the sort_pushdown data as the source (generate if missing) data_sort_pushdown @@ -1173,65 +1193,111 @@ data_sort_pushdown_inexact() { mkdir -p "${INEXACT_DIR}" SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" - # Use datafusion-cli to bucket rows into 64 groups by a deterministic - # scrambler, then sort within each bucket by orderkey. This produces - # ~64 RG-sized segments where each has a tight orderkey range but the - # segments appear in scrambled (non-sorted) order in the file. + # Step 1: Write a single sorted file with small (100K-row) RGs + TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet" (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " CREATE EXTERNAL TABLE src STORED AS PARQUET LOCATION '${SRC_DIR}'; - COPY ( - SELECT * FROM src - ORDER BY - (l_orderkey * 1664525 + 1013904223) % 64, - l_orderkey - ) - TO '${INEXACT_DIR}/shuffled.parquet' + COPY (SELECT * FROM src ORDER BY l_orderkey) + TO '${TMPFILE}' STORED AS PARQUET OPTIONS ('format.max_row_group_size' '100000'); ") - echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" + # Step 2: Redistribute RGs into 3 files with scrambled RG order. + # Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file, + # permutation (rg_idx * 41 + 7) % n scrambles the order within file. + python3 -c " +import pyarrow.parquet as pq + +pf = pq.ParquetFile('${TMPFILE}') +n = pf.metadata.num_row_groups +n_files = 3 + +# Assign each RG to a file, scramble order within each file +file_rgs = [[] for _ in range(n_files)] +for rg_idx in range(n): + slot = (rg_idx * 41 + 7) % n # scrambled index + file_id = slot % n_files + file_rgs[file_id].append(rg_idx) + +# Write each file with its assigned RGs (in scrambled order) +for file_id in range(n_files): + rgs = file_rgs[file_id] + if not rgs: + continue + tables = [pf.read_row_group(rg) for rg in rgs] + writer = pq.ParquetWriter( + '${INEXACT_DIR}/part_%03d.parquet' % file_id, + pf.schema_arrow) + for t in tables: + writer.write_table(t) + writer.close() + print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs') +" + + rm -f "${TMPFILE}" + echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}" ls -la "${INEXACT_DIR}" - # Also generate a file with partially overlapping row groups. - # Simulates streaming data with network delays: each chunk is mostly - # in order but has a small overlap with the next chunk (±5% of the - # chunk range). This is the pattern described by @adriangb — data - # arriving with timestamps that are generally increasing but with - # network-induced jitter causing small overlaps between row groups. + # Also generate overlap data: same strategy but with different file count + # and permutation. Simulates streaming data with network delays where + # chunks arrive out of sequence. + # + # Requires pyarrow (pip install pyarrow). OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" return fi - echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." + echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..." mkdir -p "${OVERLAP_DIR}" + # Step 1: Write a single sorted file with small (100K-row) RGs + TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet" (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " CREATE EXTERNAL TABLE src STORED AS PARQUET LOCATION '${SRC_DIR}'; - -- Add jitter to l_orderkey: shift each row by a random-ish offset - -- proportional to its position. This creates overlap between adjacent - -- row groups while preserving the general ascending trend. - -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 - -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. - COPY ( - SELECT * FROM src - ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 - ) - TO '${OVERLAP_DIR}/overlapping.parquet' + COPY (SELECT * FROM src ORDER BY l_orderkey) + TO '${TMPFILE}' STORED AS PARQUET OPTIONS ('format.max_row_group_size' '100000'); ") - echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" - ls -la "${OVERLAP_DIR}" + # Step 2: Redistribute into 5 files with scrambled RG order. + python3 -c " +import pyarrow.parquet as pq + +pf = pq.ParquetFile('${TMPFILE}') +n = pf.metadata.num_row_groups +n_files = 5 + +file_rgs = [[] for _ in range(n_files)] +for rg_idx in range(n): + slot = (rg_idx * 37 + 13) % n + file_id = slot % n_files + file_rgs[file_id].append(rg_idx) + +for file_id in range(n_files): + rgs = file_rgs[file_id] + if not rgs: + continue + tables = [pf.read_row_group(rg) for rg in rgs] + writer = pq.ParquetWriter( + '${OVERLAP_DIR}/part_%03d.parquet' % file_id, + pf.schema_arrow) + for t in tables: + writer.write_table(t) + writer.close() + print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs') +" + + rm -f "${TMPFILE}" } # Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). @@ -1240,7 +1306,7 @@ data_sort_pushdown_inexact() { run_sort_pushdown_inexact() { INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" - echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." + echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..." DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -1256,13 +1322,13 @@ run_sort_pushdown_inexact_unsorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } -# Runs the sort pushdown benchmark with partially overlapping RGs. -# Simulates streaming data with network jitter — RGs are mostly in order -# but have small overlaps (±2500 orderkey jitter between adjacent RGs). +# Runs the sort pushdown benchmark with multi-file scrambled RG order. +# Simulates streaming data with network delays — multiple files, each with +# scrambled RGs. Tests both RG-level reorder and TopK stats initialization. run_sort_pushdown_inexact_overlap() { OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" - echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." + echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..." DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } diff --git a/benchmarks/queries/sort_pushdown/q5.sql b/benchmarks/queries/sort_pushdown/q5.sql new file mode 100644 index 0000000000000..60ad636ad3c9c --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q5.sql @@ -0,0 +1,7 @@ +-- Reverse scan: ORDER BY DESC LIMIT (narrow projection) +-- With --sorted: reverse_row_groups=true + TopK + stats init + cumulative prune +-- Without --sorted: full TopK sort over all data +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown/q6.sql b/benchmarks/queries/sort_pushdown/q6.sql new file mode 100644 index 0000000000000..d36a35a1e5a0d --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q6.sql @@ -0,0 +1,5 @@ +-- Reverse scan: ORDER BY DESC LIMIT larger fetch (narrow projection) +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown/q7.sql b/benchmarks/queries/sort_pushdown/q7.sql new file mode 100644 index 0000000000000..3e8856822d83d --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q7.sql @@ -0,0 +1,5 @@ +-- Reverse scan: wide projection + DESC LIMIT +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown/q8.sql b/benchmarks/queries/sort_pushdown/q8.sql new file mode 100644 index 0000000000000..95ba89fdd5089 --- /dev/null +++ b/benchmarks/queries/sort_pushdown/q8.sql @@ -0,0 +1,5 @@ +-- Reverse scan: wide projection + DESC LIMIT larger fetch +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000