Merged
41 changes: 41 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -900,3 +900,44 @@ This command will:
```

This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.

## Sort Pushdown

Benchmarks for sort pushdown optimizations on TPC-H lineitem data (SF=1).

### Variants

| Benchmark | Description |
|-----------|-------------|
| `sort_pushdown` | Baseline — no `WITH ORDER`, tests standard sort behavior |
| `sort_pushdown_sorted` | With `WITH ORDER` — tests sort elimination on sorted files |
| `sort_pushdown_inexact` | Inexact path (`--sorted` DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder |
| `sort_pushdown_inexact_unsorted` | No `WITH ORDER` — same data, tests Unsupported path + RG reorder |
| `sort_pushdown_inexact_overlap` | Multi-file scrambled RGs — streaming data scenario |

### Queries

**sort_pushdown / sort_pushdown_sorted** (q1-q8):
- q1-q4: ASC queries (sort elimination with `--sorted`)
- q5-q8: DESC LIMIT queries (reverse scan + TopK optimization with `--sorted`)

**sort_pushdown_inexact** (q1-q4): DESC LIMIT queries on scrambled data

### Data Generation

The inexact/overlap data requires pyarrow (`pip install pyarrow`) to generate
multi-file Parquet data with scrambled row-group order. DataFusion's `COPY` cannot
produce narrow-range RGs in scrambled order because its Parquet writer merges rows
from adjacent chunks at RG boundaries, widening each RG's statistics range.

### Running

```bash
# Generate data and run all sort pushdown benchmarks
./bench.sh data sort_pushdown
./bench.sh data sort_pushdown_inexact
./bench.sh run sort_pushdown
./bench.sh run sort_pushdown_sorted
./bench.sh run sort_pushdown_inexact
./bench.sh run sort_pushdown_inexact_overlap
```
154 changes: 110 additions & 44 deletions benchmarks/bench.sh
@@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet
# Sort Pushdown Benchmarks
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario)

# Sorted Data Benchmarks (ORDER BY Optimization)
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -1154,84 +1154,150 @@ run_sort_pushdown_sorted() {

# Generates data for sort pushdown Inexact benchmark.
#
# Produces a single large lineitem parquet file where row groups have
# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
# sorted, RGs shuffled). This simulates append-heavy workloads where data
# is written in batches at different times.
# Produces multiple parquet files where each file has MULTIPLE row groups
# with scrambled RG order. This tests both:
# - Row-group-level reorder within each file (reorder_by_statistics)
# - TopK threshold initialization from RG statistics
#
# Strategy:
# 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total).
# 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the
# RG order within each file using a deterministic permutation.
# Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges
# but in scrambled order.
#
# Writing a single file with ORDER BY scramble does NOT work: the parquet
# writer merges rows from adjacent chunks at RG boundaries, widening
# ranges and defeating reorder_by_statistics.
#
# Requires pyarrow (pip install pyarrow).
data_sort_pushdown_inexact() {
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then
echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
return
fi

echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
# Check pyarrow dependency (needed to split/scramble RGs)
if ! python3 -c "import pyarrow" 2>/dev/null; then
echo "Error: pyarrow is required for sort pushdown Inexact data generation."
echo "Install with: pip install pyarrow"
return 1
fi

echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..."

# Re-use the sort_pushdown data as the source (generate if missing)
data_sort_pushdown

mkdir -p "${INEXACT_DIR}"
SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"

# Use datafusion-cli to bucket rows into 64 groups by a deterministic
# scrambler, then sort within each bucket by orderkey. This produces
# ~64 RG-sized segments where each has a tight orderkey range but the
# segments appear in scrambled (non-sorted) order in the file.
# Step 1: Write a single sorted file with small (100K-row) RGs
TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet"
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${SRC_DIR}';

COPY (
SELECT * FROM src
ORDER BY
(l_orderkey * 1664525 + 1013904223) % 64,
l_orderkey
)
TO '${INEXACT_DIR}/shuffled.parquet'
COPY (SELECT * FROM src ORDER BY l_orderkey)
TO '${TMPFILE}'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")

echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
# Step 2: Redistribute RGs into 3 files with scrambled RG order.
# Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file,
# permutation (rg_idx * 41 + 7) % n scrambles the order within file.
python3 -c "
import pyarrow.parquet as pq

pf = pq.ParquetFile('${TMPFILE}')
n = pf.metadata.num_row_groups
n_files = 3

# Assign each RG to a file, scramble order within each file
file_rgs = [[] for _ in range(n_files)]
for rg_idx in range(n):
slot = (rg_idx * 41 + 7) % n # scrambled index
file_id = slot % n_files
file_rgs[file_id].append(rg_idx)

# Write each file with its assigned RGs (in scrambled order)
for file_id in range(n_files):
rgs = file_rgs[file_id]
if not rgs:
continue
tables = [pf.read_row_group(rg) for rg in rgs]
writer = pq.ParquetWriter(
'${INEXACT_DIR}/part_%03d.parquet' % file_id,
pf.schema_arrow)
for t in tables:
writer.write_table(t)
writer.close()
print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
"

rm -f "${TMPFILE}"
echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}"
ls -la "${INEXACT_DIR}"

# Also generate a file with partially overlapping row groups.
# Simulates streaming data with network delays: each chunk is mostly
# in order but has a small overlap with the next chunk (±5% of the
# chunk range). This is the pattern described by @adriangb — data
# arriving with timestamps that are generally increasing but with
# network-induced jitter causing small overlaps between row groups.
# Also generate overlap data: same strategy but with different file count
# and permutation. Simulates streaming data with network delays where
# chunks arrive out of sequence.
#
# Requires pyarrow (pip install pyarrow).
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then
echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
return
fi

echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..."
mkdir -p "${OVERLAP_DIR}"

# Step 1: Write a single sorted file with small (100K-row) RGs
TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet"
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${SRC_DIR}';

-- Add jitter to l_orderkey: shift each row by a random-ish offset
-- proportional to its position. This creates overlap between adjacent
-- row groups while preserving the general ascending trend.
-- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
-- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs.
COPY (
SELECT * FROM src
ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
)
TO '${OVERLAP_DIR}/overlapping.parquet'
COPY (SELECT * FROM src ORDER BY l_orderkey)
TO '${TMPFILE}'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")

echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
ls -la "${OVERLAP_DIR}"
# Step 2: Redistribute into 5 files with scrambled RG order.
python3 -c "
import pyarrow.parquet as pq

pf = pq.ParquetFile('${TMPFILE}')
n = pf.metadata.num_row_groups
n_files = 5

file_rgs = [[] for _ in range(n_files)]
for rg_idx in range(n):
slot = (rg_idx * 37 + 13) % n
file_id = slot % n_files
file_rgs[file_id].append(rg_idx)

for file_id in range(n_files):
rgs = file_rgs[file_id]
if not rgs:
continue
tables = [pf.read_row_group(rg) for rg in rgs]
writer = pq.ParquetWriter(
'${OVERLAP_DIR}/part_%03d.parquet' % file_id,
pf.schema_arrow)
for t in tables:
writer.write_table(t)
writer.close()
print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
"

rm -f "${TMPFILE}"
}

# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
@@ -1240,7 +1306,7 @@ data_sort_pushdown_inexact() {
run_sort_pushdown_inexact() {
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..."
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..."
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}
Expand All @@ -1256,13 +1322,13 @@ run_sort_pushdown_inexact_unsorted() {
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort pushdown benchmark with partially overlapping RGs.
# Simulates streaming data with network jitter: RGs are mostly in order
# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
# Runs the sort pushdown benchmark with multi-file scrambled RG order.
# Simulates streaming data with network delays: multiple files, each with
# scrambled RGs. Tests both RG-level reorder and TopK stats initialization.
run_sort_pushdown_inexact_overlap() {
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..."
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}
7 changes: 7 additions & 0 deletions benchmarks/queries/sort_pushdown/q5.sql
@@ -0,0 +1,7 @@
-- Reverse scan: ORDER BY DESC LIMIT (narrow projection)
-- With --sorted: reverse_row_groups=true + TopK + stats init + cumulative prune
-- Without --sorted: full TopK sort over all data
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown/q6.sql
@@ -0,0 +1,5 @@
-- Reverse scan: ORDER BY DESC LIMIT larger fetch (narrow projection)
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown/q7.sql
@@ -0,0 +1,5 @@
-- Reverse scan: wide projection + DESC LIMIT
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown/q8.sql
@@ -0,0 +1,5 @@
-- Reverse scan: wide projection + DESC LIMIT larger fetch
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000