diff --git a/.github/scripts/run-benchmarks.sh b/.github/scripts/run-benchmarks.sh index a467b2a1..cfd00e5a 100755 --- a/.github/scripts/run-benchmarks.sh +++ b/.github/scripts/run-benchmarks.sh @@ -17,7 +17,7 @@ BENCH_RAW_FILE="$(mktemp -t structured-zstd-bench-raw.XXXXXX)" trap 'rm -f "$BENCH_RAW_FILE"' EXIT export STRUCTURED_ZSTD_EMIT_REPORT=1 -cargo bench --bench compare_ffi -p structured-zstd -- --output-format bencher | tee "$BENCH_RAW_FILE" +cargo bench --bench compare_ffi -p structured-zstd --features dict_builder -- --output-format bencher | tee "$BENCH_RAW_FILE" echo "Parsing results..." >&2 @@ -38,6 +38,9 @@ MEM_RE = re.compile( DICT_RE = re.compile( r'^REPORT_DICT scenario=(\S+) label="((?:[^"\\]|\\.)+)" level=(\S+) dict_bytes=(\d+) train_ms=([0-9.]+) ffi_no_dict_bytes=(\d+) ffi_with_dict_bytes=(\d+) ffi_no_dict_ratio=([0-9.]+) ffi_with_dict_ratio=([0-9.]+)$' ) +DICT_TRAIN_RE = re.compile( + r'^REPORT_DICT_TRAIN scenario=(\S+) label="((?:[^"\\]|\\.)+)" training_bytes=(\d+) dict_bytes_requested=(\d+) rust_train_ms=([0-9.]+) ffi_train_ms=([0-9.]+) rust_dict_bytes=(\d+) ffi_dict_bytes=(\d+) rust_fastcover_score=(\d+)$' +) def unescape_report_label(value): output = [] @@ -71,8 +74,10 @@ timings = [] ratios = [] memory_rows = [] dictionary_rows = [] +dictionary_training_rows = [] timing_rows = [] scenario_input_bytes = {} +scenario_training_bytes = {} raw_path = os.environ["BENCH_RAW_FILE"] DELTA_LOW = 0.99 @@ -104,6 +109,14 @@ def parse_benchmark_name(name): "source": None, "implementation": parts[4], } + if len(parts) == 5 and parts[0] == "dict-train" and parts[3] == "matrix": + return { + "stage": "dict-train", + "level": parts[1], + "scenario": parts[2], + "source": None, + "implementation": parts[4], + } raise ValueError(f"Unsupported benchmark name format: {name} (parts={parts})") def canonical_key(stage, scenario, level, source): @@ -227,6 +240,41 @@ with open(raw_path) as f: "ffi_no_dict_ratio": float(ffi_no_dict_ratio), "ffi_with_dict_ratio": float(ffi_with_dict_ratio), }) + continue + + dict_train_match = DICT_TRAIN_RE.match(line) + if dict_train_match: + ( + scenario, + label, + training_bytes, + dict_bytes_requested, + rust_train_ms, + ffi_train_ms, + rust_dict_bytes, + ffi_dict_bytes, + rust_fastcover_score, + ) = dict_train_match.groups() + label = unescape_report_label(label) + delta = None + rust_train_ms_float = float(rust_train_ms) + ffi_train_ms_float = float(ffi_train_ms) + if rust_train_ms_float > 0.0: + delta = ffi_train_ms_float / rust_train_ms_float + dictionary_training_rows.append({ + "scenario": scenario, + "label": label, + "training_bytes": int(training_bytes), + "dict_bytes_requested": int(dict_bytes_requested), + "rust_train_ms": rust_train_ms_float, + "ffi_train_ms": ffi_train_ms_float, + "rust_dict_bytes": int(rust_dict_bytes), + "ffi_dict_bytes": int(ffi_dict_bytes), + "rust_fastcover_score": int(rust_fastcover_score), + "delta_ffi_over_rust": delta, + "status": classify_speed_delta(delta), + }) + scenario_training_bytes[scenario] = int(training_bytes) if not benchmark_results: print("ERROR: No benchmark results parsed!", file=sys.stderr) @@ -246,6 +294,13 @@ if not memory_rows: if not dictionary_rows: print("WARN: No REPORT_DICT lines parsed; dictionary section will be empty.", file=sys.stderr) +if not dictionary_training_rows: + print( + "ERROR: No REPORT_DICT_TRAIN lines parsed; dictionary training section would be empty.", + file=sys.stderr, + ) + sys.exit(1) + with open("benchmark-results.json", "w") as f: json.dump(benchmark_results, f, indent=2) @@ -302,7 +357,10 @@ for key in all_keys: scenario = meta["scenario"] if meta else key.split(" + ")[0] level = meta["level"] if meta else "unknown" source = meta["source"] if meta else None - input_bytes = scenario_input_bytes.get(scenario) + if stage == "dict-train": + input_bytes = scenario_training_bytes.get(scenario) + else: + input_bytes = scenario_input_bytes.get(scenario) speed_series = {} for impl_name, impl_row in speed_index.get(key, {}).items(): @@ -325,7 +383,11 @@ for key in all_keys: speed_delta = ( rust_bps / ffi_bps if (rust_bps is not None and ffi_bps is not None and ffi_bps > 0.0) - else None + else ( + ffi_ms / rust_ms + if (rust_ms is not None and ffi_ms is not None and rust_ms > 0.0) + else None + ) ) has_comparable_ratio = ( @@ -368,7 +430,7 @@ for key in all_keys: "delta_low": DELTA_LOW, "delta_high": DELTA_HIGH, }, - "interpretation": "delta>1 means Rust faster than FFI; delta<1 means slower", + "interpretation": "delta>1 means Rust faster than FFI; throughput ratio uses rust_bytes_per_sec/ffi_bytes_per_sec when available, otherwise fallback is ffi_ms_per_iter/rust_ms_per_iter", }, } ) @@ -421,6 +483,22 @@ for row in sorted(dictionary_rows, key=lambda item: (item["scenario"], item["lev f'| {row["scenario"]} | {label} | {row["level"]} | {row["dict_bytes"]} | {row["train_ms"]:.3f} | {row["ffi_no_dict_bytes"]} | {row["ffi_with_dict_bytes"]} | {row["ffi_no_dict_ratio"]:.4f} | {row["ffi_with_dict_ratio"]:.4f} |' ) +lines.extend([ + "", + "## Dictionary Training (Rust FastCOVER vs C FFI)", + "", + "| Scenario | Label | Dict bytes (requested) | Rust train ms | C train ms | Rust dict bytes | C dict bytes | Rust FastCOVER score | Delta (C/Rust) | Status |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |", +]) + +for row in sorted(dictionary_training_rows, key=lambda item: item["scenario"]): + label = markdown_table_escape(row["label"]) + delta = row["delta_ffi_over_rust"] + delta_cell = f"{delta:.4f}" if delta is not None else "n/a" + lines.append( + f'| {row["scenario"]} | {label} | {row["dict_bytes_requested"]} | {row["rust_train_ms"]:.3f} | {row["ffi_train_ms"]:.3f} | {row["rust_dict_bytes"]} | {row["ffi_dict_bytes"]} | {row["rust_fastcover_score"]} | {delta_cell} | {row["status"]} |' + ) + lines.extend([ "", "## Timing Metrics", @@ -502,7 +580,7 @@ delta_lines.extend( "", "## Speed pack", "", - "Interpretation: higher speed is better (`rust_bytes_per_sec / ffi_bytes_per_sec`).", + "Interpretation: higher speed is better; delta uses `rust_bytes_per_sec / ffi_bytes_per_sec` when throughput exists, otherwise fallback is `ffi_ms_per_iter / rust_ms_per_iter`.", "", "### Rust speed", "", @@ -564,6 +642,7 @@ print(f"Wrote {len(benchmark_results)} timing results to benchmark-results.json" print(f"Wrote {len(ratios)} ratio rows to benchmark-report.md", file=sys.stderr) print(f"Wrote {len(memory_rows)} memory rows to benchmark-report.md", file=sys.stderr) print(f"Wrote {len(dictionary_rows)} dictionary rows to benchmark-report.md", file=sys.stderr) +print(f"Wrote {len(dictionary_training_rows)} dictionary training rows to benchmark-report.md", file=sys.stderr) print(f"Wrote {len(delta_rows)} canonical rows to benchmark-delta.json", file=sys.stderr) print(f"Wrote {len(delta_rows)} canonical rows to benchmark-delta.md", file=sys.stderr) PYEOF diff --git a/BENCHMARKS.md b/BENCHMARKS.md index cd6a6093..23ef5fa5 100644 --- a/BENCHMARKS.md +++ b/BENCHMARKS.md @@ -38,9 +38,10 @@ encoder: - `structured-zstd::Better` vs `zstd` level `7` - `structured-zstd::Best` vs `zstd` level `11` -Dictionary benchmarks are tracked separately with C FFI `with_dict` vs `without_dict` runs, using a -dictionary trained from scenario samples. Pure Rust dictionary compression is still pending and is -therefore not part of the pure-Rust-vs-C timing matrix yet. +Dictionary benchmarks currently include: + +- C FFI `with_dict` vs `without_dict` compression runs +- dictionary training timing comparison (`dict-train`) between Rust FastCOVER and C FFI trainer ## Issue #24 Acceptance Mapping @@ -55,7 +56,7 @@ therefore not part of the pure-Rust-vs-C timing matrix yet. Run the full Criterion matrix: ```bash -cargo bench --bench compare_ffi -p structured-zstd -- --output-format bencher +cargo bench --bench compare_ffi -p structured-zstd --features dict_builder -- --output-format bencher ``` Generate the CI-style JSON and markdown report locally: @@ -85,6 +86,7 @@ bash scripts/bench-flamegraph.sh decompress/default/decodecorpus-z000033/rust_st - compression ratio tables (`REPORT`) - input+output buffer size estimate tables (`REPORT_MEM`) - dictionary compression tables (`REPORT_DICT`) + - dictionary training comparison tables (`REPORT_DICT_TRAIN`) - timing rows for all benchmark functions - `benchmark-delta.json` with canonical `(scenario + params)` rows including: - raw Rust/FFI ratio values and `rust/ffi` ratio delta @@ -96,7 +98,9 @@ bash scripts/bench-flamegraph.sh decompress/default/decodecorpus-z000033/rust_st Delta interpretation (direct same-run comparison on the same environment): - **Ratio delta** (`rust_ratio / ffi_ratio`): lower is better for Rust -- **Speed delta** (`rust_bytes_per_sec / ffi_bytes_per_sec`): higher is better for Rust +- **Speed delta**: higher is better for Rust + - throughput form: `rust_bytes_per_sec / ffi_bytes_per_sec` + - fallback form (when throughput is unavailable): `ffi_ms_per_iter / rust_ms_per_iter` Status labels in `benchmark-delta` are derived directly from the same-run deltas (no environment calibration/pre-test coefficients): diff --git a/README.md b/README.md index 606164d9..e47d6fa4 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,12 @@ Complete RFC 8878 implementation. Performance: ~1.4-3.5x slower than C zstd depe ### Dictionary Generation -When the `dict_builder` feature is enabled, the `dictionary` module can create raw content dictionaries. Within 0.2% of the official implementation on the `github-users` sample set. +When the `dict_builder` feature is enabled, the `dictionary` module can: +- build raw dictionaries with COVER (`create_raw_dict_from_source`) +- build raw dictionaries with FastCOVER (`create_fastcover_raw_dict_from_source`) +- finalize raw content into full zstd dictionary format (`finalize_raw_dict`) +- train+finalize in one pure-Rust flow (`create_fastcover_dict_from_source`) +- propagate I/O failures from dictionary-building APIs via `io::Result` return values ## Benchmarking diff --git a/zstd/Cargo.toml b/zstd/Cargo.toml index 786aa118..d52c7021 100644 --- a/zstd/Cargo.toml +++ b/zstd/Cargo.toml @@ -52,8 +52,14 @@ harness = false [[bench]] name = "compare_ffi" harness = false +required-features = ["dict_builder"] [[bench]] name = "bitstream" harness = false required-features = ["bench_internals"] + +[[bench]] +name = "dict_builder_fastcover" +harness = false +required-features = ["dict_builder"] diff --git a/zstd/benches/compare_ffi.rs b/zstd/benches/compare_ffi.rs index 2ed5caaa..064b7704 100644 --- a/zstd/benches/compare_ffi.rs +++ b/zstd/benches/compare_ffi.rs @@ -16,6 +16,9 @@ use std::hint::black_box; use std::sync::OnceLock; use std::time::{Duration, Instant}; use structured_zstd::decoding::FrameDecoder; +use structured_zstd::dictionary::{ + FastCoverOptions, FinalizeOptions, finalize_raw_dict, train_fastcover_raw_from_slice, +}; use support::{LevelConfig, Scenario, ScenarioClass, benchmark_scenarios, supported_levels}; static BENCHMARK_SCENARIOS: OnceLock> = OnceLock::new(); @@ -193,37 +196,126 @@ fn bench_dictionary(c: &mut Criterion) { continue; } - let training_samples = split_training_samples(&scenario.bytes); - let sample_refs: Vec<&[u8]> = training_samples.iter().map(Vec::as_slice).collect(); - let total_training_bytes = sample_refs.iter().map(|sample| sample.len()).sum::(); + let sample_count = training_sample_count(&scenario.bytes); + let total_training_bytes = scenario.bytes.len(); + let ffi_samples = [scenario.bytes.as_slice()]; + let max_dict_size = total_training_bytes.saturating_sub(64); let dict_size = dictionary_size_for(scenario.len()) - .min(total_training_bytes.saturating_sub(64)) - .max(256); - let train_started = Instant::now(); - let Ok(dictionary) = zstd::dict::from_samples(&sample_refs, dict_size) else { + .max(256) + .min(max_dict_size); + let Ok(rust_content_budget) = + finalized_training_content_budget(scenario.bytes.as_slice(), dict_size) + else { + eprintln!( + "BENCH_WARN skipping Rust FastCOVER dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={}) due to finalized content budget error", + scenario.id, sample_count, total_training_bytes, dict_size + ); + continue; + }; + let fastcover_options = fastcover_fixed_options(); + + let rust_train_started = Instant::now(); + let Ok((rust_raw_dictionary, rust_tuned)) = train_fastcover_raw_from_slice( + scenario.bytes.as_slice(), + rust_content_budget, + &fastcover_options, + ) else { + eprintln!( + "BENCH_WARN skipping Rust FastCOVER dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={})", + scenario.id, sample_count, total_training_bytes, dict_size + ); + continue; + }; + let Ok(rust_dictionary) = finalize_raw_dict( + rust_raw_dictionary.as_slice(), + scenario.bytes.as_slice(), + dict_size, + FinalizeOptions::default(), + ) else { + eprintln!( + "BENCH_WARN skipping Rust FastCOVER finalization benchmark for {} (samples={}, total_training_bytes={}, dict_size={})", + scenario.id, sample_count, total_training_bytes, dict_size + ); + continue; + }; + let rust_train_ms = rust_train_started.elapsed().as_secs_f64() * 1_000.0; + + let ffi_train_started = Instant::now(); + let Ok(ffi_dictionary) = zstd::dict::from_samples(&ffi_samples, dict_size) else { eprintln!( "BENCH_WARN skipping dictionary benchmark for {} (samples={}, total_training_bytes={}, dict_size={})", scenario.id, - sample_refs.len(), + ffi_samples.len(), total_training_bytes, dict_size ); continue; }; - let train_ms = train_started.elapsed().as_secs_f64() * 1_000.0; + let ffi_train_ms = ffi_train_started.elapsed().as_secs_f64() * 1_000.0; + + if emit_reports { + emit_dictionary_training_report( + scenario, + DictTrainingMetrics { + training_bytes: total_training_bytes, + dict_bytes_requested: dict_size, + rust_train_ms, + ffi_train_ms, + rust_dict_bytes: rust_dictionary.len(), + ffi_dict_bytes: ffi_dictionary.len(), + rust_fastcover_score: rust_tuned.score, + }, + ); + } + + let benchmark_name = format!("dict-train/na/{}/{}", scenario.id, "matrix"); + let mut group = c.benchmark_group(benchmark_name); + configure_group(&mut group, scenario); + group.throughput(Throughput::Bytes(total_training_bytes as u64)); + + group.bench_function("pure_rust", |b| { + b.iter(|| { + let (raw_dict, tuned) = train_fastcover_raw_from_slice( + scenario.bytes.as_slice(), + rust_content_budget, + &fastcover_options, + ) + .expect("fastcover training should succeed"); + let dict = finalize_raw_dict( + raw_dict.as_slice(), + scenario.bytes.as_slice(), + dict_size, + FinalizeOptions::default(), + ) + .expect("fastcover dictionary finalization should succeed"); + black_box((dict.len(), tuned.score)); + }) + }); + + group.bench_function("c_ffi", |b| { + b.iter(|| { + black_box( + zstd::dict::from_samples(&ffi_samples, dict_size) + .expect("ffi dictionary training should succeed") + .len(), + ) + }) + }); + + group.finish(); for level in supported_levels() { let mut no_dict = zstd::bulk::Compressor::new(level.ffi_level).unwrap(); let mut with_dict = - zstd::bulk::Compressor::with_dictionary(level.ffi_level, &dictionary).unwrap(); + zstd::bulk::Compressor::with_dictionary(level.ffi_level, &ffi_dictionary).unwrap(); let no_dict_bytes = no_dict.compress(&scenario.bytes).unwrap(); let with_dict_bytes = with_dict.compress(&scenario.bytes).unwrap(); if emit_reports { emit_dictionary_report( scenario, level, - dictionary.len(), - train_ms, + ffi_dictionary.len(), + ffi_train_ms, &no_dict_bytes, &with_dict_bytes, ); @@ -242,7 +334,8 @@ fn bench_dictionary(c: &mut Criterion) { group.bench_function("c_ffi_with_dict", |b| { let mut compressor = - zstd::bulk::Compressor::with_dictionary(level.ffi_level, &dictionary).unwrap(); + zstd::bulk::Compressor::with_dictionary(level.ffi_level, &ffi_dictionary) + .unwrap(); b.iter(|| black_box(compressor.compress(&scenario.bytes).unwrap())) }); @@ -355,35 +448,84 @@ fn emit_dictionary_report( ); } -fn split_training_samples(source: &[u8]) -> Vec> { +fn emit_dictionary_training_report(scenario: &Scenario, metrics: DictTrainingMetrics) { + let escaped_label = escape_report_label(&scenario.label); + println!( + "REPORT_DICT_TRAIN scenario={} label=\"{}\" training_bytes={} dict_bytes_requested={} rust_train_ms={:.3} ffi_train_ms={:.3} rust_dict_bytes={} ffi_dict_bytes={} rust_fastcover_score={}", + scenario.id, + escaped_label, + metrics.training_bytes, + metrics.dict_bytes_requested, + metrics.rust_train_ms, + metrics.ffi_train_ms, + metrics.rust_dict_bytes, + metrics.ffi_dict_bytes, + metrics.rust_fastcover_score + ); +} + +struct DictTrainingMetrics { + training_bytes: usize, + dict_bytes_requested: usize, + rust_train_ms: f64, + ffi_train_ms: f64, + rust_dict_bytes: usize, + ffi_dict_bytes: usize, + rust_fastcover_score: usize, +} + +fn finalized_training_content_budget(sample: &[u8], dict_size: usize) -> std::io::Result { + let probe = [0u8; 8]; + let finalized = finalize_raw_dict( + probe.as_slice(), + sample, + dict_size, + FinalizeOptions::default(), + )?; + let header_bytes = finalized.len().saturating_sub(probe.len()); + Ok(dict_size.saturating_sub(header_bytes)) +} + +fn training_sample_count(source: &[u8]) -> usize { let sample_size = source.len().div_ceil(16).clamp(256, 8192); - let mut samples: Vec> = source + let samples = source .chunks(sample_size) .take(64) .filter(|chunk| chunk.len() >= 64) - .map(|chunk| chunk.to_vec()) - .collect(); - if samples.len() < 2 { + .count(); + if samples < 2 { let midpoint = source.len() / 2; let left = &source[..midpoint]; let right = &source[midpoint..]; if left.len() >= 64 && right.len() >= 64 { - samples = vec![left.to_vec(), right.to_vec()]; + 2 } else { eprintln!( "BENCH_WARN tiny dictionary training input ({} bytes), using a single sample fallback", source.len() ); - samples = vec![source.to_vec()]; + 1 } + } else { + samples } - samples } fn dictionary_size_for(input_len: usize) -> usize { input_len.div_ceil(8).clamp(256, 16 * 1024) } +fn fastcover_fixed_options() -> FastCoverOptions { + FastCoverOptions { + optimize: false, + accel: 4, + k: 256, + d: 8, + f: 20, + ..FastCoverOptions::default() + } +} + fn escape_report_label(label: &str) -> String { label.replace('\\', "\\\\").replace('\"', "\\\"") } diff --git a/zstd/benches/dict_builder_fastcover.rs b/zstd/benches/dict_builder_fastcover.rs new file mode 100644 index 00000000..4d7c41c3 --- /dev/null +++ b/zstd/benches/dict_builder_fastcover.rs @@ -0,0 +1,78 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use std::hint::black_box; +use std::io::Cursor; +use structured_zstd::dictionary::{ + FastCoverOptions, create_fastcover_raw_dict_from_source, create_raw_dict_from_source, +}; + +fn corpus() -> Vec { + let mut data = Vec::new(); + for i in 0..2_000u32 { + data.extend_from_slice( + format!( + "tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbcccccdddddeeeeefffff\n" + ) + .as_bytes(), + ); + } + data +} + +fn bench_dict_builder(c: &mut Criterion) { + let data = corpus(); + let dict_size = 8 * 1024; + let fastcover_opt = FastCoverOptions::default(); + let fastcover_fixed = FastCoverOptions { + optimize: false, + accel: 4, + k: 256, + d: 8, + f: 20, + ..FastCoverOptions::default() + }; + + c.bench_function("dict_builder/cover_raw", |b| { + b.iter(|| { + let mut out = Vec::new(); + create_raw_dict_from_source( + Cursor::new(data.as_slice()), + data.len(), + &mut out, + black_box(dict_size), + ) + .expect("cover training should succeed"); + black_box(out.len()); + }) + }); + + c.bench_function("dict_builder/fastcover_raw_opt", |b| { + b.iter(|| { + let mut out = Vec::new(); + let tuned = create_fastcover_raw_dict_from_source( + Cursor::new(data.as_slice()), + &mut out, + black_box(dict_size), + &fastcover_opt, + ) + .expect("fastcover training should succeed"); + black_box((out.len(), tuned.score)); + }) + }); + + c.bench_function("dict_builder/fastcover_raw_fixed", |b| { + b.iter(|| { + let mut out = Vec::new(); + let tuned = create_fastcover_raw_dict_from_source( + Cursor::new(data.as_slice()), + &mut out, + black_box(dict_size), + &fastcover_fixed, + ) + .expect("fastcover training should succeed"); + black_box((out.len(), tuned.score)); + }) + }); +} + +criterion_group!(benches, bench_dict_builder); +criterion_main!(benches); diff --git a/zstd/src/dictionary/cover.rs b/zstd/src/dictionary/cover.rs index fa448a2d..e2794ffa 100644 --- a/zstd/src/dictionary/cover.rs +++ b/zstd/src/dictionary/cover.rs @@ -67,11 +67,10 @@ pub struct Context { pub fn pick_best_segment( params: &DictParams, ctx: &mut Context, + epoch: &'_ [u8], collection_sample: &'_ [u8], ) -> Segment { - let mut segments = collection_sample - .chunks(params.segment_size as usize) - .peekable(); + let mut segments = epoch.chunks(params.segment_size as usize).peekable(); let mut best_segment: &[u8] = segments.peek().expect("at least one segment"); let mut top_segment_score: usize = 0; // Iterate over segments and score each segment, keeping track of the best segment @@ -93,9 +92,12 @@ pub fn pick_best_segment( /// /// `score_segment` modifies `ctx.frequencies`. fn score_segment(ctx: &mut Context, collection_sample: &[u8], segment: &[u8]) -> usize { + if segment.len() < K { + return 0; + } let mut segment_score = 0; // Determine the score of each overlapping k-mer - for i in 0..(segment.len() - K - 1) { + for i in 0..=(segment.len() - K) { let kmer: &KMer = (&segment[i..i + K]) .try_into() .expect("Failed to make kmer"); diff --git a/zstd/src/dictionary/fastcover.rs b/zstd/src/dictionary/fastcover.rs new file mode 100644 index 00000000..940d6cdb --- /dev/null +++ b/zstd/src/dictionary/fastcover.rs @@ -0,0 +1,357 @@ +use alloc::vec; +use alloc::vec::Vec; + +#[derive(Debug, Clone, Copy)] +pub struct FastCoverParams { + pub k: usize, + pub d: usize, + pub f: u32, + pub accel: usize, +} + +#[derive(Debug, Clone, Copy)] +pub struct FastCoverTuned { + pub k: usize, + pub d: usize, + pub f: u32, + pub accel: usize, + pub score: usize, +} + +pub const DEFAULT_K_CANDIDATES: &[usize] = &[64, 128, 256, 512, 1024, 2048]; +pub const DEFAULT_D_CANDIDATES: &[usize] = &[6, 8, 12, 16]; +pub const DEFAULT_F_CANDIDATES: &[u32] = &[16, 18, 20]; + +fn hash_dmer(dmer: &[u8]) -> u64 { + // 64-bit FNV-1a, deterministic and cheap for d-mer hashing. + let mut h = 0xcbf29ce484222325u64; + for &b in dmer { + h ^= u64::from(b); + h = h.wrapping_mul(0x100000001b3); + } + h +} + +fn clamp_table_bits(f: u32) -> u32 { + f.clamp(8, 20) +} + +pub(crate) fn normalize_fastcover_params(mut params: FastCoverParams) -> FastCoverParams { + params.d = params.d.clamp(4, 32); + params.k = params.k.max(params.d).max(16); + params.f = clamp_table_bits(params.f); + params.accel = params.accel.clamp(1, 10); + params +} + +fn build_frequency_table(sample: &[u8], d: usize, f: u32, accel: usize) -> Vec { + let bits = clamp_table_bits(f); + let size = 1usize << bits; + let mask = size - 1; + let step = accel.max(1); + let mut table = vec![0u32; size]; + + if sample.len() < d || d == 0 { + return table; + } + + let mut i = 0usize; + while i + d <= sample.len() { + let slot = (hash_dmer(&sample[i..i + d]) as usize) & mask; + table[slot] = table[slot].saturating_add(1); + i += step; + } + table +} + +fn score_segment(segment: &[u8], d: usize, mask: usize, table: &[u32]) -> usize { + if segment.len() < d || d == 0 { + return 0; + } + let mut score = 0usize; + for i in 0..=(segment.len() - d) { + let slot = (hash_dmer(&segment[i..i + d]) as usize) & mask; + score += table[slot] as usize; + } + score +} + +fn build_raw_dict(sample: &[u8], dict_size: usize, params: FastCoverParams) -> Vec { + if sample.is_empty() || dict_size == 0 { + return Vec::new(); + } + + let params = normalize_fastcover_params(params); + let k = params.k; + let d = params.d; + let table = build_frequency_table(sample, d, params.f, params.accel); + let mask = table.len().saturating_sub(1); + + let mut segments: Vec<(usize, &[u8])> = sample + .chunks(k) + .filter(|seg| seg.len() >= d) + .map(|seg| (score_segment(seg, d, mask, &table), seg)) + .collect(); + segments.sort_by(|a, b| b.0.cmp(&a.0)); + + let mut out = Vec::with_capacity(dict_size); + for (_, seg) in segments { + if out.len() >= dict_size { + break; + } + let remaining = dict_size - out.len(); + if seg.len() <= remaining { + out.extend_from_slice(seg); + } else { + out.extend_from_slice(&seg[..remaining]); + } + } + out +} + +fn coverage_score(dict: &[u8], eval: &[u8], d: usize, accel: usize) -> usize { + if dict.len() < d || eval.len() < d || d == 0 { + return 0; + } + let mut seen = std::collections::HashSet::with_capacity(dict.len() / d + 1); + for i in 0..=(dict.len() - d) { + seen.insert(hash_dmer(&dict[i..i + d])); + } + + let mut hits = 0usize; + let step = accel.max(1); + let mut i = 0usize; + while i + d <= eval.len() { + if seen.contains(&hash_dmer(&eval[i..i + d])) { + hits += 1; + } + i += step; + } + hits +} + +pub fn train_fastcover_raw(sample: &[u8], dict_size: usize, params: FastCoverParams) -> Vec { + build_raw_dict(sample, dict_size, params) +} + +pub fn optimize_fastcover_raw( + sample: &[u8], + dict_size: usize, + split_point: f64, + accel: usize, + d_candidates: &[usize], + f_candidates: &[u32], + k_values: &[usize], +) -> (Vec, FastCoverTuned) { + let d_values = if d_candidates.is_empty() { + DEFAULT_D_CANDIDATES + } else { + d_candidates + }; + let f_values = if f_candidates.is_empty() { + DEFAULT_F_CANDIDATES + } else { + f_candidates + }; + let k_candidates = if k_values.is_empty() { + DEFAULT_K_CANDIDATES + } else { + k_values + }; + + if sample.len() < 2 { + let params = normalize_fastcover_params(FastCoverParams { + k: k_candidates[0], + d: d_values[0], + f: f_values[0], + accel, + }); + let mut dict = build_raw_dict(sample, dict_size, params); + if dict.is_empty() && dict_size > 0 { + let take = sample.len().min(dict_size); + dict.extend_from_slice(&sample[..take]); + } + return ( + dict, + FastCoverTuned { + k: params.k, + d: params.d, + f: params.f, + accel: params.accel, + score: 0, + }, + ); + } + + let split = split_point.clamp(0.1, 0.95); + let split_idx = ((sample.len() as f64) * split) as usize; + let split_idx = split_idx.clamp(1, sample.len().saturating_sub(1)); + let (train, eval) = sample.split_at(split_idx); + + let mut best_dict = Vec::new(); + let mut best = FastCoverTuned { + k: 0, + d: 0, + f: 0, + accel: accel.clamp(1, 10), + score: 0, + }; + + for &f in f_values { + for &d in d_values { + for &k in k_candidates { + let params = normalize_fastcover_params(FastCoverParams { k, d, f, accel }); + let dict = build_raw_dict(train, dict_size, params); + let score = coverage_score(dict.as_slice(), eval, params.d, params.accel); + if best_dict.is_empty() || score > best.score { + best.score = score; + best.k = params.k; + best.d = params.d; + best.f = params.f; + best.accel = params.accel; + best_dict = dict; + } + } + } + } + + (best_dict, best) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::format; + + fn corpus() -> Vec { + let mut data = Vec::new(); + for i in 0..500u32 { + data.extend_from_slice( + format!("tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbccccdddd\n") + .as_bytes(), + ); + } + data + } + + #[test] + fn fastcover_raw_produces_non_empty_dict() { + let sample = corpus(); + let dict = train_fastcover_raw( + sample.as_slice(), + 4096, + FastCoverParams { + k: 256, + d: 8, + f: 20, + accel: 1, + }, + ); + assert!(!dict.is_empty()); + assert!(dict.len() <= 4096); + } + + #[test] + fn fastcover_raw_returns_empty_for_empty_or_zero_budget() { + let sample = corpus(); + let params = FastCoverParams { + k: 256, + d: 8, + f: 20, + accel: 1, + }; + assert!(train_fastcover_raw(&[], 1024, params).is_empty()); + assert!(train_fastcover_raw(sample.as_slice(), 0, params).is_empty()); + } + + #[test] + fn fastcover_optimizer_selects_valid_params() { + let sample = corpus(); + let (dict, tuned) = optimize_fastcover_raw( + sample.as_slice(), + 4096, + 0.75, + 1, + &[6, 8], + &[18, 20], + &[128, 256], + ); + assert!(!dict.is_empty()); + assert!([6, 8].contains(&tuned.d)); + assert!([18, 20].contains(&tuned.f)); + assert!([128, 256].contains(&tuned.k)); + } + + #[test] + fn fastcover_optimizer_falls_back_when_k_candidates_empty() { + let sample = corpus(); + let (dict, tuned) = + optimize_fastcover_raw(sample.as_slice(), 4096, 0.75, 1, &[6, 8], &[18, 20], &[]); + assert!(!dict.is_empty()); + assert!(DEFAULT_K_CANDIDATES.contains(&tuned.k)); + } + + #[test] + fn fastcover_optimizer_handles_one_byte_sample_without_panic() { + let sample = [0xAB]; + let (dict, tuned) = optimize_fastcover_raw(&sample, 16, 0.75, 1, &[], &[], &[]); + assert!(!dict.is_empty()); + assert!(dict.len() <= 16); + assert!(DEFAULT_K_CANDIDATES.contains(&tuned.k)); + assert!(DEFAULT_D_CANDIDATES.contains(&tuned.d)); + assert!(DEFAULT_F_CANDIDATES.contains(&tuned.f)); + } + + #[test] + fn fastcover_optimizer_seeds_winner_when_all_scores_are_zero() { + let sample = b"abcdefghijklmnopqrst"; + let (dict, tuned) = optimize_fastcover_raw(sample, 16, 0.9, 1, &[6], &[16], &[8]); + assert!(!dict.is_empty()); + assert_eq!(tuned.k, 16); + assert_eq!(tuned.d, 6); + assert_eq!(tuned.f, 16); + assert_eq!(tuned.score, 0); + } + + #[test] + fn fastcover_optimizer_handles_zero_dict_budget() { + let sample = corpus(); + let (dict, tuned) = optimize_fastcover_raw( + sample.as_slice(), + 0, + 0.75, + 1, + &[6, 8], + &[18, 20], + &[128, 256], + ); + assert!(dict.is_empty()); + assert!([6, 8].contains(&tuned.d)); + assert!([18, 20].contains(&tuned.f)); + assert!([128, 256].contains(&tuned.k)); + } + + #[test] + fn fastcover_optimizer_clamps_extreme_split_points() { + let sample = corpus(); + let (dict_low, tuned_low) = + optimize_fastcover_raw(sample.as_slice(), 2048, 0.0, 1, &[6], &[18], &[128]); + let (dict_high, tuned_high) = + optimize_fastcover_raw(sample.as_slice(), 2048, 1.0, 1, &[6], &[18], &[128]); + assert!(!dict_low.is_empty()); + assert!(!dict_high.is_empty()); + assert_eq!(tuned_low.k, 128); + assert_eq!(tuned_high.k, 128); + } + + #[test] + fn fastcover_optimizer_reports_normalized_params() { + let sample = corpus(); + let (dict, tuned) = + optimize_fastcover_raw(sample.as_slice(), 1024, 0.75, 1, &[64], &[42], &[8]); + assert!(!dict.is_empty()); + assert_eq!(tuned.d, 32); + assert_eq!(tuned.f, 20); + assert_eq!(tuned.k, 32); + } +} diff --git a/zstd/src/dictionary/mod.rs b/zstd/src/dictionary/mod.rs index 117bfe7a..1a9cebb6 100644 --- a/zstd/src/dictionary/mod.rs +++ b/zstd/src/dictionary/mod.rs @@ -23,22 +23,74 @@ // the frequency of w in the reservoir using a rolling karp-rabin hash // - The score of a segment is the sum of `f(w)` called on every kmer within the segment mod cover; +mod fastcover; mod frequency; mod reservoir; +use crate::bit_io::BitWriter; +use crate::blocks::sequence_section::{ + MAX_LITERAL_LENGTH_CODE, MAX_MATCH_LENGTH_CODE, MAX_OFFSET_CODE, +}; +use crate::decoding::dictionary::MAGIC_NUM as DICT_MAGIC_NUM; +use crate::decoding::sequence_section_decoder::{LL_MAX_LOG, ML_MAX_LOG, OF_MAX_LOG}; use crate::dictionary::reservoir::create_sample; -use alloc::vec; +use crate::fse::fse_encoder::{self, build_table_from_data}; +use crate::huff0::HuffmanTable as HuffmanDecoderTable; +use crate::huff0::huff0_encoder::{HuffmanEncoder, HuffmanTable as HuffmanEncoderTable}; use core::cmp::Reverse; use cover::*; +pub use fastcover::{ + DEFAULT_D_CANDIDATES, DEFAULT_F_CANDIDATES, DEFAULT_K_CANDIDATES, FastCoverParams, + FastCoverTuned, +}; use std::{ boxed::Box, collections::{BinaryHeap, HashMap}, + format, fs::{self, File}, - io::{self, BufReader, Read}, + io::{self, Read}, path::{Path, PathBuf}, vec::Vec, }; +const MAX_TRAINING_PREALLOC_BYTES: usize = 8 * 1024 * 1024; +const MAX_HUFFMAN_STATS_BYTES: usize = 64 * 1024; + +/// Tuning knobs for pure-Rust FastCOVER training. +#[derive(Debug, Clone)] +pub struct FastCoverOptions { + pub optimize: bool, + pub split_point: f64, + pub accel: usize, + pub k: usize, + pub d: usize, + pub f: u32, + pub k_candidates: Vec, + pub d_candidates: Vec, + pub f_candidates: Vec, +} + +impl Default for FastCoverOptions { + fn default() -> Self { + Self { + optimize: true, + split_point: 0.75, + accel: 1, + k: 256, + d: 8, + f: 20, + k_candidates: DEFAULT_K_CANDIDATES.to_vec(), + d_candidates: DEFAULT_D_CANDIDATES.to_vec(), + f_candidates: DEFAULT_F_CANDIDATES.to_vec(), + } + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct FinalizeOptions { + pub dict_id: Option, +} + /// A set of values that are used during dictionary construction. /// /// Changing these values can improve the resulting dictionary size for certain datasets. @@ -59,11 +111,12 @@ pub(super) struct DictParams { /// Creates a "raw content" dictionary, training off of every file in this directory and all /// sub-directories. /// -/// The resulting dictionary will be approxamitely `dict_size` or less, and written to `output`. +/// The resulting dictionary will be approximately `dict_size` or less, and written to `output`. /// /// # Errors /// This function returns `Ok(())` if the dictionary was created successfully, and an -/// `Err(io::Error)` if an error was encountered reading the input directory. +/// `Err(io::Error)` if an error was encountered reading the input directory or +/// writing dictionary bytes to `output`. /// /// # Examples /// ```no_run @@ -71,7 +124,8 @@ pub(super) struct DictParams { /// // Create a roughly 1mb dictionary, training off of file in `sample_files` /// let input_folder = "sample_files/"; /// let mut output = File::create("output.dict").unwrap(); -/// structured_zstd::dictionary::create_raw_dict_from_dir(input_folder, &mut output, 1_000_000); +/// structured_zstd::dictionary::create_raw_dict_from_dir(input_folder, &mut output, 1_000_000) +/// .expect("dictionary training from sample_files should succeed"); /// ``` pub fn create_raw_dict_from_dir, W: io::Write>( path: P, @@ -108,7 +162,7 @@ pub fn create_raw_dict_from_dir, W: io::Write>( .fold(empty_reader, |acc, reader| Box::new(acc.chain(reader))); // Create a dict using the new reader - create_raw_dict_from_source(chained_files, total_file_len as usize, output, dict_size); + create_raw_dict_from_source(chained_files, total_file_len as usize, output, dict_size)?; Ok(()) } @@ -116,31 +170,55 @@ pub fn create_raw_dict_from_dir, W: io::Write>( /// The completed dictionary is written to `output`. /// /// - `source` will be used as training data for the entire dictionary. -/// - `source_size` influences how the data is divided and sampled and is measured -/// in bytes. While this does not need to be exact, estimates should attempt to be -/// larger than the actual collection size. +/// - `source_size` is used only as a preallocation hint before reading `source` and +/// does not affect sampling once all data has been buffered. /// - `output` is where the completed dictionary will be written. /// - `dict_size` determines how large the complete dictionary should be. The completed /// dictionary will be this size or smaller. /// -/// This function uses `BufRead` internally, the provided reader need not be buffered. +/// This function reads the entire `source` into an in-memory `Vec` before building +/// the dictionary. The provided reader need not be buffered, but callers should avoid +/// sources too large to fit comfortably in memory. +/// +/// # API note +/// This public API returns `io::Result<()>` and propagates source/output I/O failures. pub fn create_raw_dict_from_source( - source: R, + mut source: R, source_size: usize, output: &mut W, dict_size: usize, -) { +) -> io::Result<()> { + if dict_size == 0 { + return Ok(()); + } + let prealloc = source_size.min(MAX_TRAINING_PREALLOC_BYTES); + let mut all = Vec::with_capacity(prealloc); + source.read_to_end(&mut all)?; + if all.is_empty() { + return Ok(()); + } + + if all.len() < K { + let keep = usize::min(all.len(), dict_size); + output.write_all(&all[all.len() - keep..])?; + return Ok(()); + } + + let source_size = all.len(); vprintln!("create_dict: creating {dict_size} byte dict from {source_size} byte source"); - let mut buffered_source = BufReader::with_capacity(128_000, source); let params = DictParams { segment_size: 2048 }; - let num_segments = source_size / params.segment_size as usize; + let num_segments = usize::max(1, source_size / params.segment_size as usize); // According to 4. Experiments - Varying Reservoir Sampler Thresholds, // setting reservoir size to collection size / min{collection size / (2 * number of segments), // 256} was effective - let sample_size = source_size / usize::min(source_size / (2 * num_segments), 256); + let denom = usize::max(1, source_size / (2 * num_segments)); + let sample_scale = usize::max(1, usize::min(denom, 256)); + let mut sample_size = source_size / sample_scale; + sample_size = usize::max(sample_size, usize::min(source_size, 16)); vprintln!("create_dict: creating {sample_size} byte sample of collection"); - let collection_sample = create_sample(&mut buffered_source, sample_size); + let mut sample_reader = all.as_slice(); + let collection_sample = create_sample(&mut sample_reader, sample_size); // A collection of segments to be used in the final dictionary. // @@ -148,24 +226,29 @@ pub fn create_raw_dict_from_source( // Reverse is used because we want a min heap, where // the lowest scoring items come first let mut pool: BinaryHeap> = BinaryHeap::new(); - let (_, epoch_size) = compute_epoch_info(¶ms, dict_size, source_size / K); - let num_epochs = source_size / epoch_size; + let (num_epochs, epoch_size_kmers) = compute_epoch_info(¶ms, dict_size, source_size / K); + let epoch_size = usize::max(K, epoch_size_kmers.saturating_mul(K)); vprintln!("create_dict: computed epoch info, using {num_epochs} epochs of {epoch_size} bytes"); - //let mut current_epoch = vec![0; epoch_size]; - let mut current_epoch = vec![0; 100]; let mut epoch_counter = 0; let mut ctx = Context { frequencies: HashMap::with_capacity(epoch_size / K), }; - // Score each segment in the epoch and select the highest scoring segment - // for the pool - while buffered_source - .read(&mut current_epoch) - .expect("can read input") - != 0 - { + // Score each segment in each planned epoch and select the highest-scoring + // segment for the pool. Keep exactly `num_epochs` windows to avoid + // emitting more segments than the requested dictionary budget allows. + for epoch_idx in 0..num_epochs { + let start = epoch_idx.saturating_mul(epoch_size); + if start >= all.len() { + break; + } + let end = if epoch_idx + 1 == num_epochs { + all.len() + } else { + usize::min(start.saturating_add(epoch_size), all.len()) + }; + let epoch = &all[start..end]; epoch_counter += 1; - let best_segment = pick_best_segment(¶ms, &mut ctx, &collection_sample); + let best_segment = pick_best_segment(¶ms, &mut ctx, epoch, &collection_sample); vprintln!( "\tcreate_dict: epoch {epoch_counter}/{num_epochs} has best segment score {}", best_segment.score @@ -181,8 +264,612 @@ pub fn create_raw_dict_from_source( // Write the dictionary with the highest scoring segment last because // closer items can be represented with a smaller offset while let Some(segment) = pool.pop() { - output - .write_all(&segment.0.raw) - .expect("can write to output"); + output.write_all(&segment.0.raw)?; + } + Ok(()) +} + +fn serialize_huffman_table(sample_data: &[u8], raw_content: &[u8]) -> io::Result> { + fn bounded_huffman_stats(data: &[u8]) -> Vec { + if data.len() <= MAX_HUFFMAN_STATS_BYTES { + return data.to_vec(); + } + + let mut stats = Vec::with_capacity(MAX_HUFFMAN_STATS_BYTES); + for i in 0..MAX_HUFFMAN_STATS_BYTES { + let idx = i * data.len() / MAX_HUFFMAN_STATS_BYTES; + stats.push(data[idx]); + } + stats + } + + let source = if sample_data.len() >= 2 { + sample_data + } else { + raw_content + }; + let mut stats = bounded_huffman_stats(source); + if stats.len() < 2 || stats.iter().all(|b| *b == stats[0]) { + stats = (0u8..=255).collect(); + } + + let table = HuffmanEncoderTable::build_from_data(stats.as_slice()); + let mut writer = BitWriter::new(); + let mut encoder = HuffmanEncoder::new(&table, &mut writer); + encoder.encode(&[stats[0]], true); + let encoded = writer.dump(); + + let mut decoder = HuffmanDecoderTable::new(); + let table_size = decoder + .build_decoder(encoded.as_slice()) + .map_err(|e| io::Error::other(format!("failed to decode generated huffman table: {e}")))?; + Ok(encoded[..table_size as usize].to_vec()) +} + +fn serialize_fse_table(table: &fse_encoder::FSETable) -> Vec { + let mut writer = BitWriter::new(); + table.write_table(&mut writer); + writer.dump() +} + +fn bounded_fse_symbols(data: &[u8], max_symbol: u8) -> Vec { + let modulo = u16::from(max_symbol) + 1; + if data.is_empty() { + return Vec::from([0u8]); + } + if data.len() <= MAX_HUFFMAN_STATS_BYTES { + return data + .iter() + .map(|b| (u16::from(*b) % modulo) as u8) + .collect(); + } + + let mut out = Vec::with_capacity(MAX_HUFFMAN_STATS_BYTES); + for i in 0..MAX_HUFFMAN_STATS_BYTES { + let idx = i * data.len() / MAX_HUFFMAN_STATS_BYTES; + out.push((u16::from(data[idx]) % modulo) as u8); + } + out +} + +fn serialize_fse_table_from_corpus( + sample_data: &[u8], + raw_content: &[u8], + max_symbol: u8, + max_log: u8, +) -> Vec { + let source = if sample_data.is_empty() { + raw_content + } else { + sample_data + }; + let symbols = bounded_fse_symbols(source, max_symbol); + let table = build_table_from_data(symbols.into_iter(), max_log, false); + serialize_fse_table(&table) +} + +fn finalized_content_budget( + sample_data: &[u8], + raw_fallback: &[u8], + dict_size: usize, +) -> io::Result { + let min_content_size = 8usize; + let huf_len = serialize_huffman_table(sample_data, raw_fallback)?.len(); + let of_len = + serialize_fse_table_from_corpus(sample_data, raw_fallback, MAX_OFFSET_CODE, OF_MAX_LOG) + .len(); + let ml_len = serialize_fse_table_from_corpus( + sample_data, + raw_fallback, + MAX_MATCH_LENGTH_CODE, + ML_MAX_LOG, + ) + .len(); + let ll_len = serialize_fse_table_from_corpus( + sample_data, + raw_fallback, + MAX_LITERAL_LENGTH_CODE, + LL_MAX_LOG, + ) + .len(); + + let header_len = DICT_MAGIC_NUM.len() + 4 + huf_len + of_len + ml_len + ll_len + 12; + let max_content_budget = dict_size.saturating_sub(header_len); + if max_content_budget < min_content_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "dictionary size too small to fit header and offset history", + )); + } + Ok(max_content_budget) +} + +fn derive_dict_id(raw_content: &[u8]) -> u32 { + let mut h = 0xcbf29ce484222325u64; + for &b in raw_content { + h ^= u64::from(b); + h = h.wrapping_mul(0x100000001b3); + } + let compliant = (h % ((1u64 << 31) - 32768)) + 32768; + compliant as u32 +} + +/// Finalize raw dictionary content into a full zstd dictionary binary +/// (`magic + dict_id + entropy tables + offset history + content`). +pub fn finalize_raw_dict( + raw_content: &[u8], + sample_data: &[u8], + dict_size: usize, + options: FinalizeOptions, +) -> io::Result> { + if raw_content.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "raw dictionary content must not be empty", + )); + } + let mut out = Vec::with_capacity(dict_size.max(256)); + out.extend_from_slice(&DICT_MAGIC_NUM); + let dict_id = options + .dict_id + .unwrap_or_else(|| derive_dict_id(raw_content)); + if dict_id == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "dictionary id must be non-zero", + )); + } + out.extend_from_slice(&dict_id.to_le_bytes()); + out.extend_from_slice(serialize_huffman_table(sample_data, raw_content)?.as_slice()); + out.extend_from_slice( + serialize_fse_table_from_corpus(sample_data, raw_content, MAX_OFFSET_CODE, OF_MAX_LOG) + .as_slice(), + ); + out.extend_from_slice( + serialize_fse_table_from_corpus( + sample_data, + raw_content, + MAX_MATCH_LENGTH_CODE, + ML_MAX_LOG, + ) + .as_slice(), + ); + out.extend_from_slice( + serialize_fse_table_from_corpus( + sample_data, + raw_content, + MAX_LITERAL_LENGTH_CODE, + LL_MAX_LOG, + ) + .as_slice(), + ); + + // Repeat offsets: keep default bootstrap history. + out.extend_from_slice(&1u32.to_le_bytes()); + out.extend_from_slice(&4u32.to_le_bytes()); + out.extend_from_slice(&8u32.to_le_bytes()); + + let min_content_size = 8usize; + let max_content_budget = dict_size.saturating_sub(out.len()); + if max_content_budget < min_content_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "dictionary size too small to fit header and offset history", + )); + } + + let content = if raw_content.len() > max_content_budget { + &raw_content[raw_content.len() - max_content_budget..] + } else { + raw_content + }; + if content.len() < min_content_size { + out.resize(out.len() + (min_content_size - content.len()), 0); + } + out.extend_from_slice(content); + Ok(out) +} + +/// Train a raw FastCOVER dictionary from a source stream. +fn train_fastcover_internal( + sample: &[u8], + dict_size: usize, + options: &FastCoverOptions, +) -> (Vec, FastCoverTuned) { + if options.optimize { + fastcover::optimize_fastcover_raw( + sample, + dict_size, + options.split_point, + options.accel, + options.d_candidates.as_slice(), + options.f_candidates.as_slice(), + options.k_candidates.as_slice(), + ) + } else { + let params = fastcover::normalize_fastcover_params(FastCoverParams { + k: options.k, + d: options.d, + f: options.f, + accel: options.accel, + }); + ( + fastcover::train_fastcover_raw(sample, dict_size, params), + FastCoverTuned { + k: params.k, + d: params.d, + f: params.f, + accel: params.accel, + score: 0, + }, + ) + } +} + +/// Train a raw FastCOVER dictionary directly from an in-memory sample. +pub fn train_fastcover_raw_from_slice( + sample: &[u8], + dict_size: usize, + options: &FastCoverOptions, +) -> io::Result<(Vec, FastCoverTuned)> { + if sample.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "source stream is empty", + )); + } + let (dict, tuned) = train_fastcover_internal(sample, dict_size, options); + if dict.is_empty() && dict_size > 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "training sample is too small for FastCOVER", + )); + } + Ok((dict, tuned)) +} + +/// Train a raw FastCOVER dictionary from a source stream. +/// +/// This function fully buffers the entire training corpus into memory via +/// `read_to_end`, which can consume significant RAM for large inputs. +pub fn create_fastcover_raw_dict_from_source( + mut source: R, + output: &mut W, + dict_size: usize, + options: &FastCoverOptions, +) -> io::Result { + let mut sample = Vec::new(); + source.read_to_end(&mut sample)?; + let (dict, tuned) = train_fastcover_raw_from_slice(sample.as_slice(), dict_size, options)?; + output.write_all(dict.as_slice())?; + Ok(tuned) +} + +/// Train and finalize a FastCOVER dictionary in pure Rust. +/// +/// This function fully buffers the entire training corpus into memory via +/// `read_to_end`, which can consume significant RAM for large inputs. +pub fn create_fastcover_dict_from_source( + mut source: R, + output: &mut W, + dict_size: usize, + fastcover: &FastCoverOptions, + finalize: FinalizeOptions, +) -> io::Result { + let mut sample = Vec::new(); + source.read_to_end(&mut sample)?; + let content_budget = finalized_content_budget(sample.as_slice(), sample.as_slice(), dict_size)?; + let (raw_dict, tuned) = + train_fastcover_raw_from_slice(sample.as_slice(), content_budget, fastcover)?; + + let finalized = finalize_raw_dict(raw_dict.as_slice(), sample.as_slice(), dict_size, finalize)?; + output.write_all(finalized.as_slice())?; + Ok(tuned) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::decoding::Dictionary; + use crate::encoding::{CompressionLevel, FrameCompressor}; + use std::io::Cursor; + use std::string::ToString; + + fn training_data() -> Vec { + let mut data = Vec::new(); + for i in 0..512u32 { + data.extend_from_slice( + format!( + "tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbcccccdddddeeeee\n" + ) + .as_bytes(), + ); + } + data + } + + #[test] + fn finalize_raw_dict_roundtrips_with_ffi_decoder() { + let sample = training_data(); + let dict_size = 4096usize; + let content_budget = + finalized_content_budget(sample.as_slice(), sample.as_slice(), dict_size) + .expect("content budget should be computable"); + let raw = fastcover::train_fastcover_raw( + sample.as_slice(), + content_budget, + FastCoverParams { + k: 256, + d: 8, + f: 20, + accel: 1, + }, + ); + let finalized = finalize_raw_dict( + raw.as_slice(), + sample.as_slice(), + dict_size, + FinalizeOptions::default(), + ) + .expect("finalization should succeed"); + let parsed = Dictionary::decode_dict(finalized.as_slice()) + .expect("finalized dictionary should parse"); + assert!(!parsed.dict_content.is_empty()); + + let mut payload = Vec::new(); + for idx in 0..96u32 { + payload.extend_from_slice( + format!("tenant=demo op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n") + .as_bytes(), + ); + } + + let mut compressed = Vec::new(); + let mut compressor = FrameCompressor::new(CompressionLevel::Fastest); + compressor + .set_dictionary(parsed) + .expect("dictionary should attach"); + compressor.set_source(payload.as_slice()); + compressor.set_drain(&mut compressed); + compressor.compress(); + + let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(finalized.as_slice()) + .expect("ffi decoder should accept finalized dictionary"); + let mut decoded = Vec::with_capacity(payload.len()); + let written = ffi_decoder + .decompress_to_buffer(compressed.as_slice(), &mut decoded) + .expect("ffi decoder should decode payload"); + assert_eq!(written, payload.len()); + assert_eq!(decoded, payload); + } + + #[test] + fn create_fastcover_dict_from_source_writes_non_empty_output() { + let sample = training_data(); + let mut out = Vec::new(); + let tuned = create_fastcover_dict_from_source( + Cursor::new(sample.as_slice()), + &mut out, + 4096, + &FastCoverOptions::default(), + FinalizeOptions::default(), + ) + .expect("fastcover+finalize should succeed"); + assert!(!out.is_empty()); + assert!(tuned.k > 0); + assert!(tuned.d > 0); + } + + #[test] + fn create_fastcover_raw_dict_from_source_rejects_empty_source() { + let mut out = Vec::new(); + let err = create_fastcover_raw_dict_from_source( + Cursor::new(Vec::::new()), + &mut out, + 1024, + &FastCoverOptions::default(), + ) + .expect_err("empty source must be rejected"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + } + + #[test] + fn create_fastcover_dict_from_source_propagates_finalize_error() { + let sample = training_data(); + let mut out = Vec::new(); + let err = create_fastcover_dict_from_source( + Cursor::new(sample.as_slice()), + &mut out, + 32, + &FastCoverOptions::default(), + FinalizeOptions::default(), + ) + .expect_err("too-small dictionary budget must fail during finalize"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + assert!(err.to_string().contains("dictionary size too small")); + } + + #[test] + fn create_raw_dict_from_source_early_returns_on_zero_dict_size() { + let sample = training_data(); + let mut out = Vec::new(); + create_raw_dict_from_source(Cursor::new(sample.as_slice()), sample.len(), &mut out, 0) + .expect("zero dict size should no-op"); + assert!(out.is_empty()); + } + + #[test] + fn create_raw_dict_from_source_treats_source_size_as_hint() { + let sample = training_data(); + let mut out = Vec::new(); + create_raw_dict_from_source(Cursor::new(sample.as_slice()), 0, &mut out, 1024) + .expect("raw dictionary training should succeed"); + assert!(!out.is_empty()); + } + + #[test] + fn create_raw_dict_from_source_handles_tiny_source_without_epochs() { + let sample = b"short"; + let mut out = Vec::new(); + create_raw_dict_from_source(Cursor::new(sample.as_slice()), sample.len(), &mut out, 3) + .expect("tiny source path should succeed"); + assert_eq!(out, b"ort"); + } + + #[test] + fn create_raw_dict_from_source_propagates_read_error() { + struct FailingReader; + impl io::Read for FailingReader { + fn read(&mut self, _buf: &mut [u8]) -> io::Result { + Err(io::Error::other("read failed")) + } + } + + let mut out = Vec::new(); + let err = create_raw_dict_from_source(FailingReader, 1024, &mut out, 1024) + .expect_err("read failures must be returned"); + assert_eq!(err.kind(), io::ErrorKind::Other); + assert_eq!(err.to_string(), "read failed"); + } + + #[test] + fn create_raw_dict_from_source_propagates_write_error() { + struct FailingWriter; + impl io::Write for FailingWriter { + fn write(&mut self, _buf: &[u8]) -> io::Result { + Err(io::Error::other("write failed")) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + let sample = b"short"; + let mut out = FailingWriter; + let err = + create_raw_dict_from_source(Cursor::new(sample.as_slice()), sample.len(), &mut out, 3) + .expect_err("write failures must be returned"); + assert_eq!(err.kind(), io::ErrorKind::Other); + assert_eq!(err.to_string(), "write failed"); + } + + #[test] + fn create_raw_dict_from_source_never_exceeds_requested_size() { + let dict_size = 4096usize; + let source: Vec = core::iter::repeat_n(b'a', 320_001).collect(); + let mut out = Vec::new(); + create_raw_dict_from_source( + Cursor::new(source.as_slice()), + source.len(), + &mut out, + dict_size, + ) + .expect("raw dictionary training should succeed"); + assert!( + out.len() <= dict_size, + "raw dictionary exceeded requested size: {} > {}", + out.len(), + dict_size + ); + } + + #[test] + fn train_fastcover_raw_from_slice_rejects_empty_sample() { + let err = train_fastcover_raw_from_slice(&[], 1024, &FastCoverOptions::default()) + .expect_err("empty sample must be rejected"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + } + + #[test] + fn train_fastcover_raw_from_slice_supports_non_optimized_params() { + let sample = training_data(); + let options = FastCoverOptions { + optimize: false, + k: 128, + d: 6, + f: 18, + ..FastCoverOptions::default() + }; + let (dict, tuned) = + train_fastcover_raw_from_slice(sample.as_slice(), 2048, &options).expect("must train"); + assert!(!dict.is_empty()); + assert!(dict.len() <= 2048); + assert_eq!(tuned.k, 128); + assert_eq!(tuned.d, 6); + assert_eq!(tuned.f, 18); + assert_eq!(tuned.score, 0); + } + + #[test] + fn train_fastcover_raw_from_slice_rejects_tiny_sample_with_empty_dict() { + let sample = b"tiny"; + let err = train_fastcover_raw_from_slice(sample, 1024, &FastCoverOptions::default()) + .expect_err("tiny sample should not produce an empty dictionary successfully"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + assert_eq!( + err.to_string(), + "training sample is too small for FastCOVER" + ); + } + + #[test] + fn train_fastcover_raw_from_slice_normalizes_non_optimized_params() { + let sample = training_data(); + let options = FastCoverOptions { + optimize: false, + k: 8, + d: 64, + f: 42, + ..FastCoverOptions::default() + }; + let (_, tuned) = + train_fastcover_raw_from_slice(sample.as_slice(), 2048, &options).expect("must train"); + assert_eq!(tuned.k, 32); + assert_eq!(tuned.d, 32); + assert_eq!(tuned.f, 20); + } + + #[test] + fn finalize_raw_dict_rejects_empty_raw_content() { + let sample = training_data(); + let err = finalize_raw_dict(&[], sample.as_slice(), 4096, FinalizeOptions::default()) + .expect_err("empty raw dictionary must be rejected"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + } + + #[test] + fn finalize_raw_dict_rejects_too_small_budget() { + let sample = training_data(); + let raw = b"some-raw-bytes"; + let err = finalize_raw_dict(raw, sample.as_slice(), 32, FinalizeOptions::default()) + .expect_err("tiny dict_size must fail"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + assert!(err.to_string().contains("dictionary size too small")); + } + + #[test] + fn finalize_raw_dict_pads_to_minimum_content_size() { + let sample = training_data(); + let raw = b"x"; + let finalized = finalize_raw_dict(raw, sample.as_slice(), 4096, FinalizeOptions::default()) + .expect("finalize should pad small raw content"); + let parsed = Dictionary::decode_dict(finalized.as_slice()).expect("finalized dict parses"); + assert!(parsed.dict_content.len() >= 8); + assert_eq!(parsed.dict_content.last(), Some(&b'x')); + } + + #[test] + fn finalize_raw_dict_rejects_zero_dict_id() { + let sample = training_data(); + let raw = b"raw-fastcover-bytes"; + let err = finalize_raw_dict( + raw, + sample.as_slice(), + 4096, + FinalizeOptions { dict_id: Some(0) }, + ) + .expect_err("dict_id=0 must be rejected"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + assert_eq!(err.to_string(), "dictionary id must be non-zero"); } } diff --git a/zstd/src/encoding/frame_compressor.rs b/zstd/src/encoding/frame_compressor.rs index c6c11c5a..2e3304a9 100644 --- a/zstd/src/encoding/frame_compressor.rs +++ b/zstd/src/encoding/frame_compressor.rs @@ -730,7 +730,8 @@ mod tests { training.len(), &mut raw_dict, 4096, - ); + ) + .expect("dict_builder training should succeed"); assert!( !raw_dict.is_empty(), "dict_builder produced an empty dictionary"