diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh index 1ad2086d..0d235420 100755 --- a/.github/scripts/run-benchmarks.sh +++ b/.github/scripts/run-benchmarks.sh @@ -1,43 +1,239 @@ #!/bin/bash -# Run compare_ffi benchmarks and produce github-action-benchmark JSON. -# Output: benchmark-results.json (customSmallerIsBetter format — lower time = better) +# Run the Criterion benchmark matrix and produce: +# - benchmark-results.json for github-action-benchmark +# - benchmark-report.md for human review +# +# Output format note: +# - benchmark JSON uses customSmallerIsBetter (lower ms/iter is better) +# - report markdown also includes per-scenario compression size + ratio summaries set -eo pipefail -echo "Running benchmarks..." >&2 +echo "Running benchmark matrix..." >&2 -# Run criterion benchmarks, capture output -cargo bench --bench compare_ffi -p structured-zstd -- --output-format bencher | tee /tmp/bench-raw.txt +if [ -n "${GITHUB_ACTIONS:-}" ] && [ -z "${STRUCTURED_ZSTD_BENCH_LARGE_BYTES:-}" ]; then + export STRUCTURED_ZSTD_BENCH_LARGE_BYTES=16777216 +fi +BENCH_RAW_FILE="$(mktemp -t structured-zstd-bench-raw.XXXXXX)" +trap 'rm -f "$BENCH_RAW_FILE"' EXIT + +export STRUCTURED_ZSTD_EMIT_REPORT=1 +cargo bench --bench compare_ffi -p structured-zstd -- --output-format bencher | tee "$BENCH_RAW_FILE" echo "Parsing results..." >&2 -# Parse criterion bencher output into github-action-benchmark JSON -# Format: "test ... bench: ns/iter (+/- )" -python3 - <<'PYEOF' -import json, re, sys - -results = [] -with open("/tmp/bench-raw.txt") as f: - for line in f: - m = re.match(r"test (\S+)\s+\.\.\. bench:\s+([\d,]+) ns/iter", line) - if m: - name = m.group(1) - ns = int(m.group(2).replace(",", "")) - # Convert ns to ms for readability +BENCH_RAW_FILE="$BENCH_RAW_FILE" python3 - <<'PYEOF' +import json +import os +import re +import sys + +BENCH_RE = re.compile(r"test (\S+)\s+\.\.\. bench:\s+([\d,]+) ns/iter") +REPORT_RE = re.compile( + r'^REPORT scenario=(\S+) label="((?:[^"\\]|\\.)+)" level=(\S+) input_bytes=(\d+) rust_bytes=(\d+) ffi_bytes=(\d+) rust_ratio=([0-9.]+) ffi_ratio=([0-9.]+)$' +) +MEM_RE = re.compile( + r'^REPORT_MEM scenario=(\S+) label="((?:[^"\\]|\\.)+)" level=(\S+) stage=(\S+) rust_buffer_bytes_estimate=(\d+) ffi_buffer_bytes_estimate=(\d+)$' +) +DICT_RE = re.compile( + r'^REPORT_DICT scenario=(\S+) label="((?:[^"\\]|\\.)+)" level=(\S+) dict_bytes=(\d+) train_ms=([0-9.]+) ffi_no_dict_bytes=(\d+) ffi_with_dict_bytes=(\d+) ffi_no_dict_ratio=([0-9.]+) ffi_with_dict_ratio=([0-9.]+)$' +) + +def unescape_report_label(value): + output = [] + i = 0 + while i < len(value): + ch = value[i] + if ch == "\\" and i + 1 < len(value): + i += 1 + output.append(value[i]) + else: + output.append(ch) + i += 1 + return "".join(output) + +def markdown_table_escape(value): + escaped = value.strip() + escaped = escaped.replace("\\", "\\\\") + escaped = escaped.replace("|", "\\|") + escaped = escaped.replace("`", "\\`") + escaped = escaped.replace("[", "\\[") + escaped = escaped.replace("]", "\\]") + escaped = escaped.replace("*", "\\*") + escaped = escaped.replace("_", "\\_") + escaped = escaped.replace("<", "<") + escaped = escaped.replace(">", ">") + escaped = escaped.replace("%", "%") + return escaped.replace("\n", "
") + +benchmark_results = [] +timings = [] +ratios = [] +memory_rows = [] +dictionary_rows = [] +raw_path = os.environ["BENCH_RAW_FILE"] + +with open(raw_path) as f: + for raw_line in f: + line = raw_line.strip() + + bench_match = BENCH_RE.match(line) + if bench_match: + name = bench_match.group(1) + ns = int(bench_match.group(2).replace(",", "")) ms = ns / 1_000_000 - results.append({ + benchmark_results.append({ "name": name, "unit": "ms", "value": round(ms, 3), }) + timings.append((name, ms)) + continue -if not results: + report_match = REPORT_RE.match(line) + if report_match: + scenario, label, level, input_bytes, rust_bytes, ffi_bytes, rust_ratio, ffi_ratio = report_match.groups() + label = unescape_report_label(label) + ratios.append({ + "scenario": scenario, + "label": label, + "level": level, + "input_bytes": int(input_bytes), + "rust_bytes": int(rust_bytes), + "ffi_bytes": int(ffi_bytes), + "rust_ratio": float(rust_ratio), + "ffi_ratio": float(ffi_ratio), + }) + continue + + mem_match = MEM_RE.match(line) + if mem_match: + ( + scenario, + label, + level, + stage, + rust_buffer_bytes_estimate, + ffi_buffer_bytes_estimate, + ) = mem_match.groups() + label = unescape_report_label(label) + memory_rows.append({ + "scenario": scenario, + "label": label, + "level": level, + "stage": stage, + "rust_buffer_bytes_estimate": int(rust_buffer_bytes_estimate), + "ffi_buffer_bytes_estimate": int(ffi_buffer_bytes_estimate), + }) + continue + + dict_match = DICT_RE.match(line) + if dict_match: + ( + scenario, + label, + level, + dict_bytes, + train_ms, + ffi_no_dict_bytes, + ffi_with_dict_bytes, + ffi_no_dict_ratio, + ffi_with_dict_ratio, + ) = dict_match.groups() + label = unescape_report_label(label) + dictionary_rows.append({ + "scenario": scenario, + "label": label, + "level": level, + "dict_bytes": int(dict_bytes), + "train_ms": float(train_ms), + "ffi_no_dict_bytes": int(ffi_no_dict_bytes), + "ffi_with_dict_bytes": int(ffi_with_dict_bytes), + "ffi_no_dict_ratio": float(ffi_no_dict_ratio), + "ffi_with_dict_ratio": float(ffi_with_dict_ratio), + }) + +if not benchmark_results: print("ERROR: No benchmark results parsed!", file=sys.stderr) sys.exit(1) +if not ratios: + print( + "ERROR: No REPORT ratio lines parsed; benchmark-report.md would have an empty ratio section.", + file=sys.stderr, + ) + sys.exit(1) + +if not memory_rows: + print("ERROR: No REPORT_MEM lines parsed; memory section would be empty.", file=sys.stderr) + sys.exit(1) + +if not dictionary_rows: + print("WARN: No REPORT_DICT lines parsed; dictionary section will be empty.", file=sys.stderr) + with open("benchmark-results.json", "w") as f: - json.dump(results, f, indent=2) + json.dump(benchmark_results, f, indent=2) + +lines = [ + "# Benchmark Report", + "", + "Generated by `.github/scripts/run-benchmarks.sh` from `cargo bench --bench compare_ffi`.", + "", + "## Compression Ratios", + "", + "| Scenario | Label | Level | Input bytes | Rust bytes | C bytes | Rust ratio | C ratio |", + "| --- | --- | --- | ---: | ---: | ---: | ---: | ---: |", +] + +for row in sorted(ratios, key=lambda item: (item["scenario"], item["level"])): + label = markdown_table_escape(row["label"]) + lines.append( + f'| {row["scenario"]} | {label} | {row["level"]} | {row["input_bytes"]} | {row["rust_bytes"]} | {row["ffi_bytes"]} | {row["rust_ratio"]:.4f} | {row["ffi_ratio"]:.4f} |' + ) + +lines.extend([ + "", + "## Buffer Size Estimates (Input + Output)", + "", + "| Scenario | Label | Level | Stage | Rust buffer bytes (estimate) | C buffer bytes (estimate) |", + "| --- | --- | --- | --- | ---: | ---: |", +]) + +for row in sorted(memory_rows, key=lambda item: (item["scenario"], item["level"], item["stage"])): + label = markdown_table_escape(row["label"]) + lines.append( + f'| {row["scenario"]} | {label} | {row["level"]} | {row["stage"]} | {row["rust_buffer_bytes_estimate"]} | {row["ffi_buffer_bytes_estimate"]} |' + ) + +lines.extend([ + "", + "## Dictionary Compression (C FFI)", + "", + "| Scenario | Label | Level | Dict bytes | Train ms | C bytes (no dict) | C bytes (with dict) | C ratio (no dict) | C ratio (with dict) |", + "| --- | --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", +]) + +for row in sorted(dictionary_rows, key=lambda item: (item["scenario"], item["level"])): + label = markdown_table_escape(row["label"]) + lines.append( + f'| {row["scenario"]} | {label} | {row["level"]} | {row["dict_bytes"]} | {row["train_ms"]:.3f} | {row["ffi_no_dict_bytes"]} | {row["ffi_with_dict_bytes"]} | {row["ffi_no_dict_ratio"]:.4f} | {row["ffi_with_dict_ratio"]:.4f} |' + ) + +lines.extend([ + "", + "## Timing Metrics", + "", + "| Benchmark | ms/iter |", + "| --- | ---: |", +]) + +for name, ms in sorted(timings): + lines.append(f"| `{name}` | {ms:.3f} |") + +with open("benchmark-report.md", "w") as f: + f.write("\n".join(lines) + "\n") -print(f"Wrote {len(results)} benchmark results to benchmark-results.json", file=sys.stderr) -for r in results: - print(f" {r['name']}: {r['value']} {r['unit']}", file=sys.stderr) +print(f"Wrote {len(benchmark_results)} timing results to benchmark-results.json", file=sys.stderr) +print(f"Wrote {len(ratios)} ratio rows to benchmark-report.md", file=sys.stderr) +print(f"Wrote {len(memory_rows)} memory rows to benchmark-report.md", file=sys.stderr) +print(f"Wrote {len(dictionary_rows)} dictionary rows to benchmark-report.md", file=sys.stderr) PYEOF diff --git a/.gitignore b/.gitignore index c9fa3b69..d2a3666c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,7 @@ Cargo.lock /orig-zstd fuzz_decodecorpus perf.data* +benchmark-results.json +benchmark-report.md fuzz/corpus .idea diff --git a/BENCHMARKS.md b/BENCHMARKS.md new file mode 100644 index 00000000..c603cc1c --- /dev/null +++ b/BENCHMARKS.md @@ -0,0 +1,75 @@ +# Benchmark Suite + +`structured-zstd` keeps its compression/decompression performance tracking in the Criterion bench +matrix at `zstd/benches/compare_ffi.rs`. + +## Scenarios + +The current matrix covers: + +- small random payloads (`1 KiB`, `10 KiB`) +- a small structured log payload (`4 KiB`) +- a repository corpus fixture (`decodecorpus_files/z000033`) +- high entropy random payloads (`1 MiB`) +- low entropy repeated payloads (`1 MiB`) +- a large structured stream (`100 MiB`) +- optional Silesia corpus files when `STRUCTURED_ZSTD_SILESIA_DIR=/path/to/silesia` is set + - load is bounded by `STRUCTURED_ZSTD_SILESIA_MAX_FILES` (default `12`) and + `STRUCTURED_ZSTD_SILESIA_MAX_FILE_BYTES` (default `67108864`) + +The local default for the large scenario is `100 MiB`. In GitHub Actions, when +`STRUCTURED_ZSTD_BENCH_LARGE_BYTES` is unset, `.github/scripts/run-benchmarks.sh` defaults it to +`16 MiB` to keep CI regression runs bounded while still exercising the same code path. + +## Level Mapping + +The benchmark suite only compares levels that are currently implemented end-to-end in the pure Rust +encoder: + +- `structured-zstd::Fastest` vs `zstd` level `1` +- `structured-zstd::Default` vs `zstd` level `3` + +`Better` and `Best` are intentionally excluded until the encoder implements them. + +Dictionary benchmarks are tracked separately with C FFI `with_dict` vs `without_dict` runs, using a +dictionary trained from scenario samples. Pure Rust dictionary compression is still pending and is +therefore not part of the pure-Rust-vs-C timing matrix yet. + +## Commands + +Run the full Criterion matrix: + +```bash +cargo bench --bench compare_ffi -p structured-zstd -- --output-format bencher +``` + +Generate the CI-style JSON and markdown report locally: + +```bash +bash .github/scripts/run-benchmarks.sh +``` + +Generate a flamegraph for a hot path: + +```bash +bash scripts/bench-flamegraph.sh +``` + +Override the benchmark targeted by the flamegraph script: + +```bash +bash scripts/bench-flamegraph.sh decompress/default/decodecorpus-z000033/matrix/pure_rust +``` + +## Outputs + +`run-benchmarks.sh` writes: + +- `benchmark-results.json` for GitHub regression tracking +- `benchmark-report.md` with: + - compression ratio tables (`REPORT`) + - input+output buffer size estimate tables (`REPORT_MEM`) + - dictionary compression tables (`REPORT_DICT`) + - timing rows for all benchmark functions + +Criterion also writes its usual detailed estimates under `target/criterion/`. diff --git a/README.md b/README.md index 076ad3a0..63beea4c 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,14 @@ Pure Rust zstd implementation — managed fork of [ruzstd](https://github.com/Ki [![docs.rs](https://docs.rs/structured-zstd/badge.svg)](https://docs.rs/structured-zstd) [![License: Apache-2.0](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](LICENSE) +## Benchmarks Dashboard + +Historical benchmark charts are published to GitHub Pages: + +- [Performance dashboard](https://structured-world.github.io/structured-zstd/dev/bench/) + +Note: the root Pages URL can be empty; benchmark charts live under `/dev/bench/`. + ## Managed Fork This is a **maintained fork** of [KillingSpark/zstd-rs](https://github.com/KillingSpark/zstd-rs) (ruzstd) by [Structured World Foundation](https://sw.foundation). We maintain additional features and hardening for the [CoordiNode](https://github.com/structured-world/coordinode) database engine. @@ -45,6 +53,10 @@ Complete RFC 8878 implementation. Performance: ~1.4-3.5x slower than C zstd depe When the `dict_builder` feature is enabled, the `dictionary` module can create raw content dictionaries. Within 0.2% of the official implementation on the `github-users` sample set. +## Benchmarking + +Performance tracking lives in [BENCHMARKS.md](BENCHMARKS.md). The suite compares `structured-zstd` against the C reference across small payloads, entropy extremes, a `100 MiB` large-stream scenario, repository corpus fixtures, and optional local Silesia corpora. Reports now include compression ratios, input+output buffer size estimates, and C FFI dictionary compression (with/without dictionary) for small and corpus scenarios. + ## Usage ### Compression diff --git a/scripts/bench-flamegraph.sh b/scripts/bench-flamegraph.sh new file mode 100755 index 00000000..fc48e475 --- /dev/null +++ b/scripts/bench-flamegraph.sh @@ -0,0 +1,40 @@ +#!/bin/bash +set -euo pipefail + +BENCH_FILTER="${1:-compress/default/large-log-stream/matrix/pure_rust}" +OUTPUT_DIR="${2:-target/flamegraph}" + +mkdir -p "$OUTPUT_DIR" + +echo "Generating flamegraph for benchmark filter: $BENCH_FILTER" >&2 +echo "Output directory: $OUTPUT_DIR" >&2 + +# Use BENCH_FLAMEGRAPH_USE_ROOT=1 to opt into running cargo flamegraph with --root. +EXTRA_FLAMEGRAPH_ARGS=() +if [[ "${BENCH_FLAMEGRAPH_USE_ROOT:-}" == "1" ]]; then + EXTRA_FLAMEGRAPH_ARGS+=(--root) +fi + +if cargo flamegraph \ + --bench compare_ffi \ + -p structured-zstd \ + ${EXTRA_FLAMEGRAPH_ARGS[@]+"${EXTRA_FLAMEGRAPH_ARGS[@]}"} \ + --output "$OUTPUT_DIR/${BENCH_FILTER//\//_}.svg" \ + -- \ + "$BENCH_FILTER"; then + : +else + status=$? + if [[ "${BENCH_FLAMEGRAPH_USE_ROOT:-}" != "1" ]]; then + cat >&2 <<'EOF' +cargo flamegraph failed. This may be due to insufficient permissions for perf. + +If you see a "Permission denied" or "not allowed to access CPU" error, try re-running with: + + BENCH_FLAMEGRAPH_USE_ROOT=1 sudo -E scripts/bench-flamegraph.sh "" "" + +or otherwise ensure perf has sufficient permissions. +EOF + fi + exit "$status" +fi diff --git a/zstd/Cargo.toml b/zstd/Cargo.toml index 51a5140c..0d435fc7 100644 --- a/zstd/Cargo.toml +++ b/zstd/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" homepage = "https://github.com/structured-world/structured-zstd" repository = "https://github.com/structured-world/structured-zstd" description = "Pure Rust zstd implementation — managed fork of ruzstd. Dictionary decompression, no FFI." -exclude = ["decodecorpus_files/*", "dict_tests/*", "fuzz_decodecorpus/*"] +exclude = ["dict_tests/*", "fuzz_decodecorpus/*", "decodecorpus_files/*"] # Package metadata points at a crate-local symlink so the packaged crate and repo root README stay in sync. readme = "README.md" keywords = ["zstd", "zstandard", "decompression", "compression", "pure-rust"] @@ -31,7 +31,7 @@ alloc = { version = "1.0.0", optional = true, package = "rustc-std-workspace-all [dev-dependencies] criterion = "0.5" rand = { version = "0.8.5", features = ["small_rng"] } -zstd = "0.13.2" +zstd = { version = "0.13.2", features = ["zdict_builder"] } [features] default = ["hash", "std"] diff --git a/zstd/benches/compare_ffi.rs b/zstd/benches/compare_ffi.rs index 2660702f..36aa06b0 100644 --- a/zstd/benches/compare_ffi.rs +++ b/zstd/benches/compare_ffi.rs @@ -1,72 +1,328 @@ -//! Comparison benchmark: structured-zstd (pure Rust) vs zstd (C FFI). +//! Comparison benchmark matrix: structured-zstd (pure Rust) vs zstd (C FFI). //! -//! Five variations: decompress (pure Rust/C FFI), compress (pure Rust/C FFI L1/L3). -//! Both decompress benchmarks allocate per-iteration for symmetric comparison. +//! The suite covers: +//! - small payloads (1-10 KiB) +//! - high entropy and low entropy payloads +//! - a large 100 MiB structured stream +//! - the repository decode corpus fixture +//! - optional Silesia corpus files via `STRUCTURED_ZSTD_SILESIA_DIR` +//! +//! Each run prints `REPORT ...` metadata lines that CI scripts can turn into a markdown report. -use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main}; +mod support; -/// Compressed corpus for decompression benchmarks. -const COMPRESSED_CORPUS: &[u8] = include_bytes!("../decodecorpus_files/z000033.zst"); +use criterion::{Criterion, SamplingMode, Throughput, black_box, criterion_group, criterion_main}; +use std::sync::OnceLock; +use std::time::{Duration, Instant}; +use structured_zstd::decoding::FrameDecoder; +use support::{LevelConfig, Scenario, ScenarioClass, benchmark_scenarios, supported_levels}; -fn bench_decompress(c: &mut Criterion) { - let mut group = c.benchmark_group("decompress"); - - // Pre-compute expected output length for assertions. - let expected_len = zstd::decode_all(COMPRESSED_CORPUS).unwrap().len(); - - // Pure Rust decompression — allocate per-iteration (symmetric with C FFI). - group.bench_function("pure_rust", |b| { - b.iter(|| { - let mut fr = structured_zstd::decoding::FrameDecoder::new(); - let mut target = vec![0u8; expected_len]; - let written = fr.decode_all(COMPRESSED_CORPUS, &mut target).unwrap(); - assert_eq!(written, expected_len); - }) - }); - - // C FFI decompression — allocates per-iteration via decode_all. - group.bench_function("c_ffi", |b| { - b.iter(|| { - let out = zstd::decode_all(COMPRESSED_CORPUS).unwrap(); - assert_eq!(out.len(), expected_len); - }) - }); - - group.finish(); +static BENCHMARK_SCENARIOS: OnceLock> = OnceLock::new(); + +fn benchmark_scenarios_cached() -> &'static [Scenario] { + BENCHMARK_SCENARIOS.get_or_init(benchmark_scenarios) +} + +fn emit_reports_enabled() -> bool { + std::env::var("STRUCTURED_ZSTD_EMIT_REPORT") + .map(|value| matches!(value.as_str(), "1" | "true" | "TRUE")) + .unwrap_or(false) } fn bench_compress(c: &mut Criterion) { - // Get raw data by decompressing the corpus. - let raw_data = zstd::decode_all(COMPRESSED_CORPUS).unwrap(); - - let mut group = c.benchmark_group("compress"); - - // Pure Rust compression (Fastest level) - group.bench_with_input( - BenchmarkId::new("pure_rust", "fastest"), - &raw_data, - |b, data| { - b.iter(|| { - black_box(structured_zstd::encoding::compress_to_vec( - &data[..], - structured_zstd::encoding::CompressionLevel::Fastest, - )) - }) - }, + let emit_reports = emit_reports_enabled(); + for scenario in benchmark_scenarios_cached().iter() { + for level in supported_levels() { + if emit_reports { + let rust_compressed = structured_zstd::encoding::compress_to_vec( + &scenario.bytes[..], + level.rust_level, + ); + let ffi_compressed = + zstd::encode_all(&scenario.bytes[..], level.ffi_level).unwrap(); + emit_report_line(scenario, level, &rust_compressed, &ffi_compressed); + emit_memory_report( + scenario, + level, + "compress", + scenario.len() + rust_compressed.len(), + scenario.len() + ffi_compressed.len(), + ); + } + + let benchmark_name = format!("compress/{}/{}/{}", level.name, scenario.id, "matrix"); + let mut group = c.benchmark_group(benchmark_name); + configure_group(&mut group, scenario); + group.throughput(Throughput::Bytes(scenario.throughput_bytes())); + + group.bench_function("pure_rust", |b| { + b.iter(|| { + black_box(structured_zstd::encoding::compress_to_vec( + &scenario.bytes[..], + level.rust_level, + )) + }) + }); + + group.bench_function("c_ffi", |b| { + b.iter(|| { + black_box(zstd::encode_all(&scenario.bytes[..], level.ffi_level).unwrap()) + }) + }); + + group.finish(); + } + } +} + +fn bench_decompress(c: &mut Criterion) { + let emit_reports = emit_reports_enabled(); + for scenario in benchmark_scenarios_cached().iter() { + for level in supported_levels() { + let ffi_compressed = zstd::encode_all(&scenario.bytes[..], level.ffi_level).unwrap(); + let expected_len = scenario.len(); + if emit_reports { + emit_memory_report( + scenario, + level, + "decompress", + ffi_compressed.len() + expected_len, + ffi_compressed.len() + expected_len, + ); + } + let benchmark_name = format!("decompress/{}/{}/{}", level.name, scenario.id, "matrix"); + let mut group = c.benchmark_group(benchmark_name); + configure_group(&mut group, scenario); + group.throughput(Throughput::Bytes(scenario.throughput_bytes())); + + group.bench_function("pure_rust", |b| { + let mut target = vec![0u8; expected_len]; + let mut decoder = FrameDecoder::new(); + b.iter(|| { + let written = decoder.decode_all(&ffi_compressed, &mut target).unwrap(); + assert_eq!(written, expected_len); + }) + }); + + group.bench_function("c_ffi", |b| { + let mut decoder = zstd::bulk::Decompressor::new().unwrap(); + let mut output = Vec::with_capacity(expected_len); + b.iter(|| { + output.clear(); + let written = decoder + .decompress_to_buffer(&ffi_compressed[..], &mut output) + .unwrap(); + assert_eq!(written, expected_len); + assert_eq!(output.len(), expected_len); + }) + }); + + group.finish(); + } + } +} + +fn bench_dictionary(c: &mut Criterion) { + let emit_reports = emit_reports_enabled(); + for scenario in benchmark_scenarios_cached().iter() { + if !matches!(scenario.class, ScenarioClass::Small | ScenarioClass::Corpus) { + continue; + } + + let training_samples = split_training_samples(&scenario.bytes); + let sample_refs: Vec<&[u8]> = training_samples.iter().map(Vec::as_slice).collect(); + let total_training_bytes = sample_refs.iter().map(|sample| sample.len()).sum::(); + let dict_size = dictionary_size_for(scenario.len()) + .min(total_training_bytes.saturating_sub(64)) + .max(256); + let train_started = Instant::now(); + let Ok(dictionary) = zstd::dict::from_samples(&sample_refs, dict_size) else { + eprintln!( + "BENCH_WARN skipping dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={})", + scenario.id, + sample_refs.len(), + total_training_bytes, + dict_size + ); + continue; + }; + let train_ms = train_started.elapsed().as_secs_f64() * 1_000.0; + + for level in supported_levels() { + let mut no_dict = zstd::bulk::Compressor::new(level.ffi_level).unwrap(); + let mut with_dict = + zstd::bulk::Compressor::with_dictionary(level.ffi_level, &dictionary).unwrap(); + let no_dict_bytes = no_dict.compress(&scenario.bytes).unwrap(); + let with_dict_bytes = with_dict.compress(&scenario.bytes).unwrap(); + if emit_reports { + emit_dictionary_report( + scenario, + level, + dictionary.len(), + train_ms, + &no_dict_bytes, + &with_dict_bytes, + ); + } + + let benchmark_name = + format!("compress-dict/{}/{}/{}", level.name, scenario.id, "matrix"); + let mut group = c.benchmark_group(benchmark_name); + configure_group(&mut group, scenario); + group.throughput(Throughput::Bytes(scenario.throughput_bytes())); + + group.bench_function("c_ffi_without_dict", |b| { + let mut compressor = zstd::bulk::Compressor::new(level.ffi_level).unwrap(); + b.iter(|| black_box(compressor.compress(&scenario.bytes).unwrap())) + }); + + group.bench_function("c_ffi_with_dict", |b| { + let mut compressor = + zstd::bulk::Compressor::with_dictionary(level.ffi_level, &dictionary).unwrap(); + b.iter(|| black_box(compressor.compress(&scenario.bytes).unwrap())) + }); + + group.finish(); + } + } +} + +fn configure_group( + group: &mut criterion::BenchmarkGroup<'_, M>, + scenario: &Scenario, +) { + match scenario.class { + ScenarioClass::Small => { + group.sample_size(30); + group.measurement_time(Duration::from_secs(3)); + group.sampling_mode(SamplingMode::Flat); + } + ScenarioClass::Corpus | ScenarioClass::Entropy => { + group.sample_size(10); + group.measurement_time(Duration::from_secs(4)); + group.sampling_mode(SamplingMode::Flat); + } + ScenarioClass::Large | ScenarioClass::Silesia => { + group.sample_size(10); + group.measurement_time(Duration::from_secs(2)); + group.warm_up_time(Duration::from_millis(500)); + group.sampling_mode(SamplingMode::Flat); + } + } +} + +fn emit_memory_report( + scenario: &Scenario, + level: LevelConfig, + stage: &'static str, + rust_buffer_bytes_estimate: usize, + ffi_buffer_bytes_estimate: usize, +) { + let escaped_label = escape_report_label(&scenario.label); + println!( + "REPORT_MEM scenario={} label=\"{}\" level={} stage={} rust_buffer_bytes_estimate={} ffi_buffer_bytes_estimate={}", + scenario.id, + escaped_label, + level.name, + stage, + rust_buffer_bytes_estimate, + ffi_buffer_bytes_estimate ); +} - // C FFI compression (level 1 ≈ fastest) - group.bench_with_input(BenchmarkId::new("c_ffi", "level1"), &raw_data, |b, data| { - b.iter(|| black_box(zstd::encode_all(&data[..], 1).unwrap())) - }); +fn emit_report_line( + scenario: &Scenario, + level: LevelConfig, + rust_compressed: &[u8], + ffi_compressed: &[u8], +) { + let input_len = scenario.len() as f64; + let escaped_label = escape_report_label(&scenario.label); + let (rust_ratio, ffi_ratio) = if input_len > 0.0 { + ( + rust_compressed.len() as f64 / input_len, + ffi_compressed.len() as f64 / input_len, + ) + } else { + (0.0, 0.0) + }; + println!( + "REPORT scenario={} label=\"{}\" level={} input_bytes={} rust_bytes={} ffi_bytes={} rust_ratio={:.6} ffi_ratio={:.6}", + scenario.id, + escaped_label, + level.name, + scenario.len(), + rust_compressed.len(), + ffi_compressed.len(), + rust_ratio, + ffi_ratio + ); +} - // C FFI compression (level 3 ≈ default) - group.bench_with_input(BenchmarkId::new("c_ffi", "level3"), &raw_data, |b, data| { - b.iter(|| black_box(zstd::encode_all(&data[..], 3).unwrap())) - }); +fn emit_dictionary_report( + scenario: &Scenario, + level: LevelConfig, + dict_bytes: usize, + train_ms: f64, + no_dict_bytes: &[u8], + with_dict_bytes: &[u8], +) { + let input_len = scenario.len() as f64; + let escaped_label = escape_report_label(&scenario.label); + let (no_dict_ratio, with_dict_ratio) = if input_len > 0.0 { + ( + no_dict_bytes.len() as f64 / input_len, + with_dict_bytes.len() as f64 / input_len, + ) + } else { + (0.0, 0.0) + }; + println!( + "REPORT_DICT scenario={} label=\"{}\" level={} dict_bytes={} train_ms={:.3} ffi_no_dict_bytes={} ffi_with_dict_bytes={} ffi_no_dict_ratio={:.6} ffi_with_dict_ratio={:.6}", + scenario.id, + escaped_label, + level.name, + dict_bytes, + train_ms, + no_dict_bytes.len(), + with_dict_bytes.len(), + no_dict_ratio, + with_dict_ratio + ); +} + +fn split_training_samples(source: &[u8]) -> Vec> { + let sample_size = source.len().div_ceil(16).clamp(256, 8192); + let mut samples: Vec> = source + .chunks(sample_size) + .take(64) + .filter(|chunk| chunk.len() >= 64) + .map(|chunk| chunk.to_vec()) + .collect(); + if samples.len() < 2 { + let midpoint = source.len() / 2; + let left = &source[..midpoint]; + let right = &source[midpoint..]; + if left.len() >= 64 && right.len() >= 64 { + samples = vec![left.to_vec(), right.to_vec()]; + } else { + eprintln!( + "BENCH_WARN tiny dictionary training input ({} bytes), using a single sample fallback", + source.len() + ); + samples = vec![source.to_vec()]; + } + } + samples +} + +fn dictionary_size_for(input_len: usize) -> usize { + input_len.div_ceil(8).clamp(256, 16 * 1024) +} - group.finish(); +fn escape_report_label(label: &str) -> String { + label.replace('\\', "\\\\").replace('\"', "\\\"") } -criterion_group!(benches, bench_decompress, bench_compress); +criterion_group!(benches, bench_compress, bench_decompress, bench_dictionary); criterion_main!(benches); diff --git a/zstd/benches/support/mod.rs b/zstd/benches/support/mod.rs new file mode 100644 index 00000000..c8906822 --- /dev/null +++ b/zstd/benches/support/mod.rs @@ -0,0 +1,336 @@ +use rand::{RngCore, SeedableRng, rngs::SmallRng}; +use std::{collections::HashSet, env, fs, path::Path}; +use structured_zstd::encoding::CompressionLevel; + +pub(crate) struct Scenario { + pub(crate) id: String, + pub(crate) label: String, + pub(crate) bytes: Vec, + pub(crate) class: ScenarioClass, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum ScenarioClass { + Small, + Corpus, + Entropy, + Large, + Silesia, +} + +#[derive(Clone, Copy)] +pub(crate) struct LevelConfig { + pub(crate) name: &'static str, + pub(crate) rust_level: CompressionLevel, + pub(crate) ffi_level: i32, +} + +pub(crate) fn benchmark_scenarios() -> Vec { + let mut scenarios = vec![ + Scenario::new( + "small-1k-random", + "Small random payload (1 KiB)", + random_bytes(1024, 0x5EED_1000), + ScenarioClass::Small, + ), + Scenario::new( + "small-10k-random", + "Small random payload (10 KiB)", + random_bytes(10 * 1024, 0x0005_EED1_0000), + ScenarioClass::Small, + ), + Scenario::new( + "small-4k-log-lines", + "Small structured log lines (4 KiB)", + repeated_log_lines(4 * 1024), + ScenarioClass::Small, + ), + load_decode_corpus_scenario(), + Scenario::new( + "high-entropy-1m", + "High entropy random payload (1 MiB)", + random_bytes(1024 * 1024, 0xC0FF_EE11), + ScenarioClass::Entropy, + ), + Scenario::new( + "low-entropy-1m", + "Low entropy patterned payload (1 MiB)", + repeated_pattern_bytes(1024 * 1024), + ScenarioClass::Entropy, + ), + Scenario::new( + "large-log-stream", + "Large structured stream", + repeated_log_lines(large_stream_len()), + ScenarioClass::Large, + ), + ]; + + scenarios.extend(load_silesia_from_env()); + scenarios +} + +pub(crate) fn supported_levels() -> [LevelConfig; 2] { + [ + LevelConfig { + name: "fastest", + rust_level: CompressionLevel::Fastest, + ffi_level: 1, + }, + LevelConfig { + name: "default", + rust_level: CompressionLevel::Default, + ffi_level: 3, + }, + ] +} + +impl Scenario { + fn new( + id: impl Into, + label: impl Into, + bytes: Vec, + class: ScenarioClass, + ) -> Self { + Self { + id: id.into(), + label: label.into(), + bytes, + class, + } + } + + pub(crate) fn len(&self) -> usize { + self.bytes.len() + } + + #[allow(dead_code)] + pub(crate) fn is_empty(&self) -> bool { + self.bytes.is_empty() + } + + pub(crate) fn throughput_bytes(&self) -> u64 { + self.bytes.len() as u64 + } +} + +fn random_bytes(len: usize, seed: u64) -> Vec { + let mut rng = SmallRng::seed_from_u64(seed); + let mut bytes = vec![0u8; len]; + rng.fill_bytes(&mut bytes); + bytes +} + +fn repeated_pattern_bytes(len: usize) -> Vec { + let pattern = b"coordinode:segment:0001|tenant=demo|label=orders|"; + let mut bytes = Vec::with_capacity(len); + while bytes.len() < len { + let remaining = len - bytes.len(); + bytes.extend_from_slice(&pattern[..pattern.len().min(remaining)]); + } + bytes +} + +fn repeated_log_lines(len: usize) -> Vec { + const LINES: &[&str] = &[ + "ts=2026-03-26T21:39:28Z level=INFO msg=\"flush memtable\" tenant=demo table=orders region=eu-west\n", + "ts=2026-03-26T21:39:29Z level=INFO msg=\"rotate segment\" tenant=demo table=orders region=eu-west\n", + "ts=2026-03-26T21:39:30Z level=INFO msg=\"compact level\" tenant=demo table=orders region=eu-west\n", + "ts=2026-03-26T21:39:31Z level=INFO msg=\"write block\" tenant=demo table=orders region=eu-west\n", + ]; + + let mut bytes = Vec::with_capacity(len); + while bytes.len() < len { + for line in LINES { + if bytes.len() == len { + break; + } + let remaining = len - bytes.len(); + bytes.extend_from_slice(&line.as_bytes()[..line.len().min(remaining)]); + } + } + bytes +} + +fn load_silesia_from_env() -> Vec { + const DEFAULT_MAX_FILES: usize = 12; + const DEFAULT_MAX_FILE_BYTES: usize = 64 * 1024 * 1024; + let Some(dir) = env::var_os("STRUCTURED_ZSTD_SILESIA_DIR") else { + return Vec::new(); + }; + let max_files = env::var("STRUCTURED_ZSTD_SILESIA_MAX_FILES") + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(DEFAULT_MAX_FILES); + let max_file_bytes = env::var("STRUCTURED_ZSTD_SILESIA_MAX_FILE_BYTES") + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(DEFAULT_MAX_FILE_BYTES); + + let Ok(entries) = fs::read_dir(Path::new(&dir)) else { + eprintln!("BENCH_WARN failed to read STRUCTURED_ZSTD_SILESIA_DIR={dir:?}"); + return Vec::new(); + }; + + let mut paths = Vec::new(); + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + paths.push(path); + } + paths.sort(); + if paths.len() > max_files { + eprintln!( + "BENCH_WARN limiting Silesia fixtures to first {} sorted files in {}", + max_files, + Path::new(&dir).display() + ); + paths.truncate(max_files); + } + + let mut scenarios = Vec::new(); + let mut seen_silesia_ids = HashSet::new(); + for path in paths { + let Ok(metadata) = fs::metadata(&path) else { + eprintln!( + "BENCH_WARN failed to stat Silesia fixture {}", + path.display() + ); + continue; + }; + let file_len = metadata.len(); + if file_len > max_file_bytes as u64 { + eprintln!( + "BENCH_WARN skipping Silesia fixture {} ({} bytes > max {} bytes)", + path.display(), + file_len, + max_file_bytes + ); + continue; + } + + let Ok(bytes) = fs::read(&path) else { + eprintln!( + "BENCH_WARN failed to read Silesia fixture {}", + path.display() + ); + continue; + }; + if bytes.is_empty() { + eprintln!( + "BENCH_WARN skipping empty Silesia fixture {}", + path.display() + ); + continue; + } + let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + let scenario_stem = sanitize_scenario_stem(file_name); + let scenario_id = + dedupe_scenario_id(format!("silesia-{scenario_stem}"), &mut seen_silesia_ids); + scenarios.push(Scenario::new( + scenario_id, + format!("Silesia corpus: {file_name}"), + bytes, + ScenarioClass::Silesia, + )); + } + + scenarios.sort_by(|left, right| left.id.cmp(&right.id)); + scenarios +} + +fn large_stream_len() -> usize { + env::var("STRUCTURED_ZSTD_BENCH_LARGE_BYTES") + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(100 * 1024 * 1024) +} + +fn load_decode_corpus_scenario() -> Scenario { + const REAL_ID: &str = "decodecorpus-z000033"; + const REAL_LABEL: &str = "Repo decode corpus sample"; + const FALLBACK_ID: &str = "decodecorpus-synthetic-1m"; + const FALLBACK_LABEL: &str = "Synthetic decode corpus fallback (1 MiB)"; + + let manifest_dir = env::var("CARGO_MANIFEST_DIR").ok(); + let fixture_path = manifest_dir + .as_deref() + .map(Path::new) + .map(|dir| dir.join("decodecorpus_files/z000033")); + + if let Some(path) = fixture_path { + match fs::read(&path) { + Ok(bytes) if !bytes.is_empty() => { + return Scenario::new(REAL_ID, REAL_LABEL, bytes, ScenarioClass::Corpus); + } + Ok(_) => { + eprintln!( + "BENCH_WARN decode corpus fixture is empty at {}, using synthetic fallback", + path.display() + ); + } + Err(err) => { + eprintln!( + "BENCH_WARN failed to read decode corpus fixture at {}: {}. Using synthetic fallback", + path.display(), + err + ); + } + } + } else { + eprintln!( + "BENCH_WARN CARGO_MANIFEST_DIR is not set, using synthetic decode corpus fallback" + ); + } + + // Keep the benchmark matrix runnable from packaged sources where fixture files may be omitted. + Scenario::new( + FALLBACK_ID, + FALLBACK_LABEL, + repeated_log_lines(1024 * 1024), + ScenarioClass::Corpus, + ) +} + +fn sanitize_scenario_stem(stem: &str) -> String { + let mut sanitized = String::with_capacity(stem.len()); + for ch in stem.chars() { + if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '_' | '-') { + sanitized.push(ch); + } else { + sanitized.push('_'); + } + } + if sanitized.is_empty() { + "unnamed".to_string() + } else { + sanitized + } +} + +fn dedupe_scenario_id(base_id: String, seen_ids: &mut HashSet) -> String { + const MAX_SUFFIX: usize = 1_000_000; + + if seen_ids.insert(base_id.clone()) { + return base_id; + } + + for suffix in 2..=MAX_SUFFIX { + let candidate = format!("{base_id}-{suffix}"); + if seen_ids.insert(candidate.clone()) { + return candidate; + } + } + + panic!( + "failed to allocate unique scenario id for base '{}' after {} attempts", + base_id, MAX_SUFFIX + ); +}