diff --git a/.github/configs/multiturn-agentic-trace-isb1-offload-sweep.yaml b/.github/configs/multiturn-agentic-trace-isb1-offload-sweep.yaml new file mode 100644 index 000000000..8e132b9d3 --- /dev/null +++ b/.github/configs/multiturn-agentic-trace-isb1-offload-sweep.yaml @@ -0,0 +1,59 @@ +# Opt-in KV-offload sweep for Cam's trace-replay lanes. +# This file mirrors .github/configs/multiturn-agentic-trace-isb1.yaml but swaps +# the coarse on/off axis for explicit cpu-offload budgets. +# +# offload values: +# 0 — omit --cpu-offload-gb entirely (HBM-only baseline) +# 20 / 40 / 80 +# — pass --cpu-offload-gb N to the existing replay/aiperf wrappers +# noprefix — omit --cpu-offload-gb and add --no-enable-prefix-caching +# +# This is additive and operator-only: pass this sweep file to Cam's existing +# trace-replay or LMCache aiperf scripts. No harness edits required. + +h200-fp8-qwen3-isb1-code-8k: + tp2: {users: [2, 4, 8, 16, 32, 64, 128], offload: [0, 20, 40, 80, "noprefix"]} + tp4: {users: [2, 4, 8, 16, 32, 64, 128], offload: [0, 20, 40, 80, "noprefix"]} + +h200-fp8-qwen3-isb1-chat-32k: + tp2: {users: [1, 2, 4, 8, 16, 32], offload: [0, 20, 40, 80, "noprefix"]} + tp4: {users: [1, 2, 4, 8, 16, 32, 64], offload: [0, 20, 40, 80, "noprefix"]} + +h200-fp8-qwen3-isb1-code-131k: + tp4: {users: [1, 2, 4, 8], offload: [0, 20, 40, 80, "noprefix"]} + tp8: {users: [1, 2, 4, 8, 16], offload: [0, 20, 40, 80, "noprefix"]} + +b200-fp4-dsr1-isb1-code-8k: + tp4: {ep: 4, users: [4, 8, 16, 32, 64, 128, 256], offload: [0, 20, 40, 80, "noprefix"]} + tp8: {ep: 8, users: [8, 16, 32, 64, 128, 256, 512], offload: [0, 20, 40, 80, "noprefix"]} + +b200-fp4-dsr1-isb1-chat-32k: + tp4: {ep: 4, users: [1, 2, 4, 8, 16, 32, 64], offload: [0, 20, 40, 80, "noprefix"]} + tp8: {ep: 8, users: [1, 2, 4, 8, 16, 32, 64, 128], offload: [0, 20, 40, 80, "noprefix"]} + +b200-fp4-dsr1-isb1-code-131k: + tp8: {ep: 8, users: [1, 2, 4, 8, 16], offload: [0, 20, 40, 80, "noprefix"]} + +b200-fp4-dsr1-isb1-code-131k-hf: + tp8: {ep: 8, users: [1, 2, 4, 8, 16], offload: [0, 20, 40, 80, "noprefix"]} + +b200-fp4-qwen3-isb1-chat-500k-preview: + tp4: {users: [1, 2, 4], offload: [0, 20, 40, 80, "noprefix"]} + tp8: {users: [1, 2, 4, 8], offload: [0, 20, 40, 80, "noprefix"]} + +b200-fp4-qwen3-isb1-chat-1m-preview: + tp8: {users: [1, 2], offload: [0, 20, 40, 80, "noprefix"]} + +mi355x-fp8-qwen3-isb1-code-8k: + tp2: {users: [2, 4, 8, 16, 32, 64], offload: [0, 20, 40, 80, "noprefix"]} + tp4: {users: [2, 4, 8, 16, 32, 64, 128], offload: [0, 20, 40, 80, "noprefix"]} + +mi355x-fp8-qwen3-isb1-chat-32k: + tp4: {users: [1, 2, 4, 8, 16, 32], offload: [0, 20, 40, 80, "noprefix"]} + +h100-fp8-qwen3-isb1-code-8k-lmcache: + tp2: {users: [1, 2, 4, 8, 16, 32], offload: [0, 20, 40, 80, "noprefix"]} + tp4: {users: [1, 2, 4, 8, 16, 32, 64], offload: [0, 20, 40, 80, "noprefix"]} + +h200-fp8-qwen3-isb1-debug: + tp2: {users: [2], offload: [0, 20, 40, 80, "noprefix"]} diff --git a/datasets/isb1/kv_pressure/manifest.json b/datasets/isb1/kv_pressure/manifest.json new file mode 100644 index 000000000..1576590b8 --- /dev/null +++ b/datasets/isb1/kv_pressure/manifest.json @@ -0,0 +1,25 @@ +{ + "schema_version": "0.1.0", + "source_root": "datasets/isb1/converted", + "description": "Reference-only high-pressure subset for explicit KV offload sweeps. 
No new bundles are added; entries point at existing converted traces.",
+  "entries": [
+    {
+      "file": "preview/long_context_1m/inferencex_trace_replay__coding_qwen3.5_ulc2_1m_preview_v1/isb1_hb_depth_cache_ulc2_offload_cliff_01.json",
+      "cumulative_isl_tokens": 9754210,
+      "max_concurrent_session_isl_tokens": 1626197,
+      "rationale": "1M coding replay that stays far beyond Hopper/Blackwell single-session KV budgets; use it to force preemption and swap activity quickly."
+    },
+    {
+      "file": "preview/long_context_500k/inferencex_trace_replay__coding_qwen3.5_xlc2_500k_preview_v1/isb1_hb_depth_cache_xlc2_hot_cold_session_mix_01.json",
+      "cumulative_isl_tokens": 2688641,
+      "max_concurrent_session_isl_tokens": 672449,
+      "rationale": "500K hot/cold coding mix that surfaces partial-residency churn before the full 1M cliff, useful for intermediate cpu-offload settings."
+    },
+    {
+      "file": "extension_131k/code_131k1k_qwen3.5/isb1_hb_depth_cache_xlc1_text_shared_prefix_swarm_01.json",
+      "cumulative_isl_tokens": 1214768,
+      "max_concurrent_session_isl_tokens": 304114,
+      "rationale": "131K shared-prefix coding swarm that isolates reuse-vs-offload tradeoffs without needing the preview lanes."
+    }
+  ]
+}
diff --git a/docs/kv_offload_playbook.md b/docs/kv_offload_playbook.md
new file mode 100644
index 000000000..12daf072f
--- /dev/null
+++ b/docs/kv_offload_playbook.md
@@ -0,0 +1,215 @@
+# KV Offload Playbook
+This playbook is the operator companion to
+`.github/configs/multiturn-agentic-trace-isb1-offload-sweep.yaml`.
+It adds three opt-in knobs on top of Cam's existing replay + aiperf flow:
+1. explicit `--cpu-offload-gb` sweep values (`0`, `20`, `40`, `80`)
+2. a side-car `/metrics` probe (`tools/kv_offload_probe.py`)
+3. an LMCache NVMe cold-tier recipe (`docs/lmcache_nvme_recipe.md`)
+Nothing here requires a harness edit.
+You keep using the same replay wrappers and only swap:
+- the sweep YAML you hand the generator, and/or
+- the extra config you point LMCache at, and/or
+- the probe you launch in parallel.
+---
+## 1. What problem this solves
+The original ISB1 replay lane exposed a coarse `offload ∈ {on, off, noprefix}`.
+That is enough to show whether offload helps at all, but not enough to answer:
+- how much CPU spill is needed before a cliff softens
+- when prefix caching is better than spill
+- when LMCache cold-tiering is cleaner than pushing more host RAM
+- how preemption/swapping evolves during a real replay run
+The new sweep file turns that three-position switch into an operator gradient.
+---
+## 2. The three knobs, in one table
+| Knob | What it changes | Best first use | Main failure mode |
+|---|---|---|---|
+| `offload=0/20/40/80` | omits `--cpu-offload-gb` at `0`, passes `--cpu-offload-gb N` to vLLM at `20/40/80` | HBM is close but not enough | host-memory bandwidth / latency tax |
+| `offload=noprefix` | disables prefix caching with no CPU spill | establish the clean-cache floor | looks worse than production, by design |
+| LMCache NVMe recipe | adds a colder disk tier via `LMCACHE_EXTRA_CONFIG_FILE` | working set is bigger than host RAM comfort zone | reclaim/readback jitter if disk is weak |
+A practical order:
+1. baseline with `0`
+2. measure `noprefix`
+3. sweep `20 → 40 → 80`
+4. only then try LMCache NVMe if the cliff still arrives too early
+---
+## 3. Interpreting the sweep values
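+All five values ride through the same wrapper path; a minimal sketch of the
+mapping the sweep YAML's header describes (the `OFFLOAD` and `EXTRA_ARGS`
+variable names are illustrative, not taken from Cam's wrappers):
+```bash
+# hypothetical translation of one sweep "offload" value into vLLM serve flags
+case "${OFFLOAD}" in
+  0)        EXTRA_ARGS="" ;;                            # HBM-only baseline
+  noprefix) EXTRA_ARGS="--no-enable-prefix-caching" ;;  # no spill, no reuse
+  *)        EXTRA_ARGS="--cpu-offload-gb ${OFFLOAD}" ;; # 20 / 40 / 80
+esac
+```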
+### `offload=0`
+This is the cleanest HBM-only comparison point.
+The wrapper should omit `--cpu-offload-gb` entirely.
+Use it to answer: “what happens if I rely only on HBM + prefix reuse?”
+### `offload=20`
+Small host spill budget.
+Good first stop on H100-like rigs where you want to test whether the cliff is
+caused by a modest overflow rather than a fundamentally oversized trace.
+### `offload=40`
+Middle ground.
+Usually the best first operator setting on H200 if the workload just spills past
+HBM during later turns or synchronized fan-out bursts.
+### `offload=80`
+Large spill budget.
+Use this when the trace is intentionally pressure-heavy and you want to see if
+more host residency flattens the cliff or just moves the pain into swap churn.
+### `offload=noprefix`
+No CPU spill, no prefix caching.
+This is not meant to win.
+It exists to expose the lower bound: what the lane looks like when every replay
+turn pays the full prompt rebuild cost.
+If `noprefix` and `0` are nearly identical, prefix caching is not buying much.
+If `0` is much better than `noprefix`, preserve prefix reuse before reaching for
+larger offload budgets.
+---
+## 4. When to use CPU offload vs LMCache NVMe vs noprefix
+### Use CPU offload when
+- the cliff arrives late in the replay, not immediately
+- preemption exists but does not explode
+- you need a minimal, no-script-change test
+- the workload fits in host RAM with reasonable margin
+CPU offload is the lowest-friction intervention because it stays inside the
+existing vLLM process model.
+### Use LMCache NVMe when
+- `80` GiB of CPU spill still leaves the run unstable
+- host RAM is precious or shared with many worker processes
+- you want a colder overflow tier without touching the existing shell wrappers
+- the replay contains very large cold segments that are revisited less often
+LMCache adds more moving parts, but it gives you a larger cold tier than pure
+CPU spill.
+### Use `noprefix` when
+- you want the “no cache help at all” floor
+- you suspect cache sharing, not spill size, is dominating results
+- you want to quantify how much reuse the trace actually provides
+Do not treat `noprefix` as a production recommendation.
+Treat it as a measurement control.
+---
+## 5. The curated KV-pressure subset
+`datasets/isb1/kv_pressure/manifest.json` is intentionally small.
+It points at existing converted traces with large replay inputs so you can force
+pressure quickly without materializing new bundles.
+Current entries:
+| File | Cumulative ISL tokens (`cumulative_isl_tokens`) | Peak per-turn ISL tokens (`max_concurrent_session_isl_tokens`) | Why it matters |
+|---|---:|---:|---|
+| `preview/long_context_1m/.../isb1_hb_depth_cache_ulc2_offload_cliff_01.json` | 9,754,210 | 1,626,197 | the full long-context offload cliff |
+| `preview/long_context_500k/.../isb1_hb_depth_cache_xlc2_hot_cold_session_mix_01.json` | 2,688,641 | 672,449 | intermediate hot/cold residency churn |
+| `extension_131k/code_131k1k_qwen3.5/isb1_hb_depth_cache_xlc1_text_shared_prefix_swarm_01.json` | 1,214,768 | 304,114 | shared-prefix pressure without preview-only setup |
+Use the validator flag to guarantee these numbers stay honest:
+```bash
+python3 tools/validate_kvcache_tester_trace.py \
+  datasets/isb1/converted/ \
+  --pressure-manifest datasets/isb1/kv_pressure/manifest.json
+```
+---
+## 6. Running the side-car probe
+The probe polls the vLLM Prometheus endpoint once per interval and appends JSONL.
+Default interval is `1s`.
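+Each sample is one JSON object per line; an illustrative row (values invented,
+key order matches the probe's `sort_keys=True` output):
+```json
+{"cpu_offload_queue_depth": null, "gpu_cache_usage_perc": 0.91, "kv_cache_usage_perc": 0.91, "num_preempted": 12.0, "num_swapped": null, "ts": 1718000000.123}
+```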
+### Minimal launch +```bash +python3 tools/kv_offload_probe.py \ + --url http://127.0.0.1:8000 \ + --output /tmp/aiperf_run/probe.jsonl +``` +### Launch beside a replay/aiperf run +```bash +python3 tools/kv_offload_probe.py \ + --url http://127.0.0.1:8000 \ + --output /tmp/aiperf_run/probe.jsonl \ + --interval 1 \ + > /tmp/aiperf_run/probe.stdout.log \ + 2> /tmp/aiperf_run/probe.stderr.log & +PROBE_PID=$! +# run the existing replay / aiperf wrapper here +wait "$PROBE_PID" +``` +If you want the probe to stop automatically in a scripted smoke test, pass the +hidden `--max-samples N` flag. The production path does not need it. +--- +## 7. Probe fields and how to read them +Each JSONL row contains: +| Field | Meaning | +|---|---| +| `ts` | sample timestamp (`time.time()`) | +| `kv_cache_usage_perc` | best available KV occupancy gauge | +| `gpu_cache_usage_perc` | GPU-specific occupancy gauge when exposed | +| `cpu_offload_queue_depth` | optional queue depth metric if the endpoint exposes one | +| `num_preempted` | cumulative preemption counter | +| `num_swapped` | swap-related counter/gauge when exposed | +The probe accepts both older and newer vLLM metric names. +That matters because current stable docs expose `vllm:kv_cache_usage_perc`, while +older vLLM series exposed `vllm:gpu_cache_usage_perc`, +`vllm:num_requests_swapped`, and `vllm:num_preemptions_total`. +If a metric is missing, the field is written as `null`. +That is expected for version-skewed metrics like swap-related counters. +--- +## 8. Reading `preempted` and `swapped` +### `num_preempted` +This tells you the scheduler had to evict or reshuffle work to make progress. +A slow increase can be acceptable in a pressure test. +A rapid increase while throughput collapses is the classic cliff signal. +### `num_swapped` +This is the stronger warning sign. +If swap-related counters begin moving while TTFT and end-to-end latency spike, +you are no longer in a “small spill tax” regime — you are in an offload-bound +regime. +### Rule of thumb +| Pattern | Likely interpretation | +|---|---| +| KV usage high, no preemption, no swap | close to limit but still healthy | +| preemption rises, swap flat | soft pressure; try a modest spill budget | +| preemption and swap both rise | cliff onset; compare `40` vs `80` vs LMCache | +| `noprefix` much worse than `0` | prefix reuse is doing real work | +| `80` barely better than `40` | host spill is no longer the right lever | +--- +## 9. What the cliff looks like in the new YAML +The offload sweep is useful because it makes the cliff legible. +Typical pattern: +1. `0`: sharp collapse once the replay exceeds practical HBM residency +2. `20`: cliff shifts right slightly +3. `40`: cliff softens for medium pressure, still fails on the heaviest traces +4. `80`: either meaningfully extends stability or proves the run is now host-bound +5. `noprefix`: worst-case floor with cache reuse removed +You are looking for the smallest spill value that materially improves the curve. +If `80` is required just to survive, the lane may be a better fit for LMCache or +for a smaller working-set slice. +--- +## 10. Suggested operating sequence +### Fast triage +1. run `0` +2. run `noprefix` +3. if `0` already fails, jump to `40` +4. attach the probe once a failure mode reproduces +### Detailed comparison +1. select one pressure trace from `datasets/isb1/kv_pressure/manifest.json` +2. run `0`, `20`, `40`, `80`, `noprefix` +3. store `probe.jsonl` beside each artifact directory +4. 
compare TTFT / throughput alongside `num_preempted` and `num_swapped` +### Escalation path +- if `20` helps: keep the lane simple and stay on CPU spill +- if `40` or `80` helps but swap churn remains: consider LMCache NVMe +- if none help and `noprefix` is similar to `0`: the real issue may be poor reuse, + not insufficient spill +--- +## 11. Paste-ready command checklist +### Validate the subset + traces +```bash +python3 tools/validate_kvcache_tester_trace.py \ + datasets/isb1/converted/ \ + --pressure-manifest datasets/isb1/kv_pressure/manifest.json +``` +### Validate the sweep YAML parses +```bash +/opt/homebrew/opt/python@3.13/bin/python3.13 -c \ + "import yaml; yaml.safe_load(open('.github/configs/multiturn-agentic-trace-isb1-offload-sweep.yaml'))" +``` +### Probe smoke test +```bash +/opt/homebrew/opt/python@3.13/bin/python3.13 -m pytest tools/test_kv_offload_probe.py -q +``` +--- +## 12. Guardrails +- this playbook is additive and opt-in +- nothing here edits `experimental/**` +- nothing here requires changing Cam's `*_lmcache_aiperf.sh` wrappers +- `datasets/isb1/kv_pressure/manifest.json` is reference-only and points at + existing converted traces +- if you only need one answer, prefer the smallest pressure trace that still + reproduces the failure +That is the entire point of this PR: give operators more precise offload +controls without forcing a harness fork. diff --git a/docs/kv_offload_readme.md b/docs/kv_offload_readme.md new file mode 100644 index 000000000..b2489b02d --- /dev/null +++ b/docs/kv_offload_readme.md @@ -0,0 +1,30 @@ +# KV Offload Readme + +This PR adds three operator-facing KV-pressure controls without touching the +existing replay harness scripts: + +1. **Granular CPU offload sweep** + - file: `.github/configs/multiturn-agentic-trace-isb1-offload-sweep.yaml` + - values: `0`, `20`, `40`, `80`, `noprefix` + +2. **Live vLLM metrics probe** + - file: `tools/kv_offload_probe.py` + - output: JSONL side-car from `/metrics` + +3. **LMCache NVMe cold-tier recipe** + - file: `docs/lmcache_nvme_recipe.md` + - consumed through `LMCACHE_EXTRA_CONFIG_FILE` + +Supporting pieces: + +- curated reference subset: `datasets/isb1/kv_pressure/manifest.json` +- validator extension: `tools/validate_kvcache_tester_trace.py --pressure-manifest` +- operator walkthrough: `docs/kv_offload_playbook.md` + +Suggested first pass: + +1. validate the trace tree + pressure manifest +2. compare `0` vs `noprefix` +3. sweep `20/40/80` +4. add the probe once the cliff reproduces +5. only then try LMCache NVMe diff --git a/docs/lmcache_nvme_recipe.md b/docs/lmcache_nvme_recipe.md new file mode 100644 index 000000000..0f5a859c8 --- /dev/null +++ b/docs/lmcache_nvme_recipe.md @@ -0,0 +1,113 @@ +# LMCache NVMe Recipe + +This is a paste-able NVMe cold-tier recipe for operators already using Cam's +existing LMCache replay lane. + +Do not edit the wrapper script. +Instead, point the existing flow at an extra config file via +`LMCACHE_EXTRA_CONFIG_FILE`. + +--- + +## 1. Intended use + +Use this when explicit `--cpu-offload-gb` values are not enough, or when you do +not want to keep pushing more working-set pressure into host RAM. + +Best fit: + +- very large cold turns +- replay lanes with bursty revisits +- operators who already trust the LMCache path + +--- + +## 2. Invocation pattern + +Cam's existing H100 LMCache lane is the consumer: + +- `experimental/multiturn/benchmarks/single_node/multiturn_fp8_h100_lmcache_aiperf.sh` + +You do **not** modify that script. 
+You only export one extra environment variable before invoking it. + +```bash +cat > /tmp/lmcache_nvme.yaml <<'YAML' +chunk_size: 256 +local_cpu: + enabled: true + max_size: 40GB +local_disk: + enabled: true + path: /local_nvme/lmcache + max_local_disk_size: 50GB + file_rotation: + enabled: true + max_file_size: 4GB + max_files: 16 + policy: lru +prefetch: + enabled: false +metrics: + enabled: true +YAML + +export LMCACHE_EXTRA_CONFIG_FILE=/tmp/lmcache_nvme.yaml +``` + +Then run the existing wrapper exactly as usual. + +--- + +## 3. Recommended file contents + +```yaml +chunk_size: 256 +local_cpu: + enabled: true + max_size: 40GB +local_disk: + enabled: true + path: /local_nvme/lmcache + max_local_disk_size: 50GB + file_rotation: + enabled: true + max_file_size: 4GB + max_files: 16 + policy: lru +prefetch: + enabled: false +metrics: + enabled: true +``` + +Why these defaults: + +- `chunk_size: 256` keeps chunking coarse enough for large replay assets +- `max_local_disk_size: 50GB` is large enough to matter but small enough to be a + deliberate cold tier, not an unbounded scratch dump +- rotation prevents silent NVMe growth during long aiperf loops +- disabling prefetch keeps the first experiment easier to reason about + +--- + +## 4. Operator notes + +- prefer a local NVMe mount, not network storage +- keep the LMCache directory isolated per run when debugging corruption or stale + cold blocks +- if disk latency is unstable, compare against the explicit CPU offload sweep + before drawing conclusions +- if `80` GiB CPU spill already works cleanly, LMCache may be unnecessary + +--- + +## 5. Minimal runbook snippet + +```bash +export LMCACHE_EXTRA_CONFIG_FILE=/tmp/lmcache_nvme.yaml +bash experimental/multiturn/benchmarks/single_node/multiturn_fp8_h100_lmcache_aiperf.sh +``` + +This recipe is intentionally config-only so the entire PR stays additive and +avoids any `experimental/**` code changes. 
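+
+One last sanity check before launching: confirm the config file parses, using
+the same pattern the playbook already applies to the sweep YAML:
+
+```bash
+python3 -c "import yaml; yaml.safe_load(open('/tmp/lmcache_nvme.yaml'))"
+```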
diff --git a/tools/kv_offload_probe.py b/tools/kv_offload_probe.py new file mode 100644 index 000000000..d2702028e --- /dev/null +++ b/tools/kv_offload_probe.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Poll a vLLM ``/metrics`` endpoint and emit KV-offload JSONL samples.""" + +from __future__ import annotations + +import argparse +import json +import signal +import sys +import threading +import time +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path +from typing import Iterable + +METRIC_ALIASES = { + 'kv_cache_usage_perc': ( + 'vllm:kv_cache_usage_perc', + 'vllm:gpu_cache_usage_perc', + ), + 'gpu_cache_usage_perc': ( + 'vllm:gpu_cache_usage_perc', + 'vllm:kv_cache_usage_perc', + ), + 'cpu_offload_queue_depth': ( + 'vllm:cpu_offload_queue_depth', + 'vllm:offload_queue_depth', + 'cpu_offload_queue_depth', + 'offload_queue_depth', + ), + 'num_preempted': ( + 'vllm:num_preemptions', + 'vllm:num_preemptions_total', + ), + 'num_swapped': ( + 'vllm:num_requests_swapped', + 'vllm:num_swapped', + 'vllm:num_swapped_total', + ), +} + + +def parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog='kv_offload_probe.py', + description='Poll vLLM /metrics and emit JSONL samples for KV-offload analysis.', + ) + parser.add_argument('--url', required=True, help='Base vLLM URL or full /metrics URL.') + parser.add_argument('--output', required=True, help='JSONL output path.') + parser.add_argument('--interval', type=float, default=1.0, help='Polling interval in seconds (default: 1.0).') + parser.add_argument('--timeout', type=float, default=5.0, help='HTTP timeout in seconds (default: 5.0).') + parser.add_argument('--max-samples', type=int, default=None, help=argparse.SUPPRESS) + return parser.parse_args(argv) + + +def _normalize_metrics_url(raw_url: str) -> str: + parsed = urllib.parse.urlparse(raw_url) + if not parsed.scheme: + raise ValueError(f'URL must include a scheme: {raw_url!r}') + if parsed.path in ('', '/'): + parsed = parsed._replace(path='/metrics') + return urllib.parse.urlunparse(parsed) + + +def _parse_prometheus_text(payload: str) -> dict[str, float]: + metrics: dict[str, float] = {} + for raw_line in payload.splitlines(): + line = raw_line.strip() + if not line or line.startswith('#'): + continue + parts = line.split() + if len(parts) < 2: + continue + metric_name = parts[0].split('{', 1)[0] + try: + value = float(parts[1]) + except ValueError: + continue + current = metrics.get(metric_name) + if current is None or value > current: + metrics[metric_name] = value + return metrics + + +def _select_metric(metrics: dict[str, float], aliases: Iterable[str]) -> float | None: + for name in aliases: + if name in metrics: + return metrics[name] + return None + + +def fetch_probe_sample(url: str, timeout: float = 5.0) -> dict[str, float | None]: + with urllib.request.urlopen(url, timeout=timeout) as response: + body = response.read().decode('utf-8', errors='replace') + parsed = _parse_prometheus_text(body) + return { + field: _select_metric(parsed, aliases) + for field, aliases in METRIC_ALIASES.items() + } + + +def poll_metrics( + *, + url: str, + output: Path, + interval: float, + timeout: float, + stop_event: threading.Event, + max_samples: int | None = None, +) -> int: + output.parent.mkdir(parents=True, exist_ok=True) + samples_written = 0 + with output.open('a', encoding='utf-8') as handle: + while not stop_event.is_set(): + ts = time.time() + try: + sample = 
fetch_probe_sample(url, timeout=timeout) + except (OSError, urllib.error.URLError, urllib.error.HTTPError) as exc: + sample = {field: None for field in METRIC_ALIASES} + sample['error'] = str(exc) + print(f'[kv_offload_probe] {exc}', file=sys.stderr) + sample['ts'] = ts + handle.write(json.dumps(sample, sort_keys=True) + '\n') + handle.flush() + samples_written += 1 + if max_samples is not None and samples_written >= max_samples: + break + if stop_event.wait(interval): + break + return samples_written + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv or sys.argv[1:]) + if args.interval <= 0: + print('--interval must be > 0', file=sys.stderr) + return 2 + if args.timeout <= 0: + print('--timeout must be > 0', file=sys.stderr) + return 2 + if args.max_samples is not None and args.max_samples <= 0: + print('--max-samples must be > 0 when provided', file=sys.stderr) + return 2 + + stop_event = threading.Event() + + def _stop(_: int, __) -> None: + stop_event.set() + + for sig in (signal.SIGINT, signal.SIGTERM): + try: + signal.signal(sig, _stop) + except ValueError: + pass + + try: + url = _normalize_metrics_url(args.url) + except ValueError as exc: + print(str(exc), file=sys.stderr) + return 2 + + samples = poll_metrics( + url=url, + output=Path(args.output), + interval=args.interval, + timeout=args.timeout, + stop_event=stop_event, + max_samples=args.max_samples, + ) + print(f'wrote {samples} samples to {args.output}') + return 0 + + +if __name__ == '__main__': + raise SystemExit(main()) diff --git a/tools/test_kv_offload_probe.py b/tools/test_kv_offload_probe.py new file mode 100644 index 000000000..516183595 --- /dev/null +++ b/tools/test_kv_offload_probe.py @@ -0,0 +1,150 @@ +# SPDX-License-Identifier: Apache-2.0 +"""unittest coverage for ``tools/kv_offload_probe.py``.""" + +from __future__ import annotations + +import http.server +import importlib.util +import json +import subprocess +import sys +import tempfile +import threading +import unittest +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +PROBE_PATH = REPO_ROOT / 'tools' / 'kv_offload_probe.py' + +_spec = importlib.util.spec_from_file_location('kv_offload_probe', PROBE_PATH) +assert _spec and _spec.loader +probe = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(probe) + + +class _MetricsHandler(http.server.BaseHTTPRequestHandler): + responses: list[str] = [] + request_count = 0 + + def do_GET(self) -> None: # noqa: N802 + if self.path != '/metrics': + self.send_response(404) + self.end_headers() + return + idx = min(self.__class__.request_count, len(self.__class__.responses) - 1) + payload = self.__class__.responses[idx] + self.__class__.request_count += 1 + body = payload.encode('utf-8') + self.send_response(200) + self.send_header('Content-Type', 'text/plain; version=0.0.4') + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args) -> None: # noqa: A003 + return + + +class FakeMetricsServer: + def __init__(self, responses: list[str]) -> None: + _MetricsHandler.responses = responses + _MetricsHandler.request_count = 0 + self.server = http.server.ThreadingHTTPServer(('127.0.0.1', 0), _MetricsHandler) + self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) + + @property + def url(self) -> str: + host, port = self.server.server_address + return f'http://{host}:{port}' + + def __enter__(self) -> 'FakeMetricsServer': + self.thread.start() + return self + 
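+    # Note: server.shutdown() blocks until serve_forever() returns, so it is
+    # called from the test thread via __exit__ below, never from the server's
+    # own thread, which would deadlock.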
+ def __exit__(self, exc_type, exc, tb) -> None: + self.server.shutdown() + self.server.server_close() + self.thread.join(timeout=5) + + +class ParsePrometheusTextTests(unittest.TestCase): + def test_supports_v0_and_v1_aliases(self) -> None: + payload = ''' +# HELP vllm:gpu_cache_usage_perc GPU KV-cache usage. +vllm:gpu_cache_usage_perc{model_name="foo"} 0.5 +vllm:num_preemptions_total{model_name="foo"} 7 +vllm:num_requests_swapped{model_name="foo"} 2 +vllm:cpu_offload_queue_depth 3 +vllm:kv_cache_usage_perc{model_name="foo"} 0.75 +''' + parsed = probe._parse_prometheus_text(payload) + self.assertEqual(parsed['vllm:gpu_cache_usage_perc'], 0.5) + self.assertEqual(parsed['vllm:kv_cache_usage_perc'], 0.75) + sample = { + field: probe._select_metric(parsed, aliases) + for field, aliases in probe.METRIC_ALIASES.items() + } + self.assertEqual( + sample, + { + 'kv_cache_usage_perc': 0.75, + 'gpu_cache_usage_perc': 0.5, + 'cpu_offload_queue_depth': 3.0, + 'num_preempted': 7.0, + 'num_swapped': 2.0, + }, + ) + + +class ProbeCliTests(unittest.TestCase): + def test_cli_writes_jsonl_samples(self) -> None: + responses = [ + 'vllm:kv_cache_usage_perc 0.25\nvllm:num_preemptions_total 1\n', + 'vllm:gpu_cache_usage_perc 0.5\nvllm:num_requests_swapped 4\nvllm:cpu_offload_queue_depth 2\n', + ] + with tempfile.TemporaryDirectory() as tmpdir, FakeMetricsServer(responses) as server: + output = Path(tmpdir) / 'probe.jsonl' + result = subprocess.run( + [ + sys.executable, + str(PROBE_PATH), + '--url', + server.url, + '--output', + str(output), + '--interval', + '0.01', + '--timeout', + '1', + '--max-samples', + '2', + ], + capture_output=True, + text=True, + check=False, + ) + self.assertEqual(result.returncode, 0, msg=result.stderr) + rows = [json.loads(line) for line in output.read_text().splitlines()] + self.assertEqual(len(rows), 2) + self.assertEqual(rows[0]['kv_cache_usage_perc'], 0.25) + self.assertEqual(rows[0]['gpu_cache_usage_perc'], 0.25) + self.assertEqual(rows[0]['num_preempted'], 1.0) + self.assertIsNone(rows[0]['num_swapped']) + self.assertEqual(rows[1]['kv_cache_usage_perc'], 0.5) + self.assertEqual(rows[1]['gpu_cache_usage_perc'], 0.5) + self.assertEqual(rows[1]['cpu_offload_queue_depth'], 2.0) + self.assertEqual(rows[1]['num_swapped'], 4.0) + self.assertIsInstance(rows[0]['ts'], float) + + def test_fetch_probe_sample_handles_missing_metrics(self) -> None: + with FakeMetricsServer(['vllm:kv_cache_usage_perc 0.6\n']) as server: + sample = probe.fetch_probe_sample(server.url + '/metrics', timeout=1) + self.assertEqual(sample['kv_cache_usage_perc'], 0.6) + self.assertEqual(sample['gpu_cache_usage_perc'], 0.6) + self.assertIsNone(sample['cpu_offload_queue_depth']) + self.assertIsNone(sample['num_preempted']) + self.assertIsNone(sample['num_swapped']) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tools/validate_kvcache_tester_trace.py b/tools/validate_kvcache_tester_trace.py index 850da4991..11bf3a85b 100644 --- a/tools/validate_kvcache_tester_trace.py +++ b/tools/validate_kvcache_tester_trace.py @@ -4,7 +4,9 @@ Stdlib-only validator for the compact trace schema consumed by `trace_replay_tester.py` / `normalize_trace()` in Cam's kv-cache-tester. Supports validating a single JSON file or recursively walking a directory of -trace files. +trace files. When ``--pressure-manifest`` is provided, the validator also +cross-checks that the curated KV-pressure subset points at real trace files and +that its claimed ISL numbers match the on-disk traces exactly. 
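+
+Example:
+    python3 tools/validate_kvcache_tester_trace.py datasets/isb1/converted/ \
+        --pressure-manifest datasets/isb1/kv_pressure/manifest.json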
""" from __future__ import annotations @@ -193,13 +195,11 @@ def _validate_request( else: _add_issue(errors, f"{prefix}.hash_ids must not mix flat and nested entries", max_issues) - optional_string_fields = ("model", "stop") - for field_name in optional_string_fields: + for field_name in ("model", "stop"): if field_name in req and not isinstance(req[field_name], str): _add_issue(errors, f"{prefix}.{field_name} must be str", max_issues) - optional_list_fields = ("input_types", "output_types") - for field_name in optional_list_fields: + for field_name in ("input_types", "output_types"): if field_name in req: value = req[field_name] if not isinstance(value, list): @@ -209,8 +209,7 @@ def _validate_request( if not isinstance(item, str): _add_issue(errors, f"{prefix}.{field_name}[{idx}] must be str", max_issues) - optional_number_fields = ("api_time", "think_time") - for field_name in optional_number_fields: + for field_name in ("api_time", "think_time"): if field_name in req: value = req[field_name] if not _is_number(value): @@ -275,6 +274,99 @@ def validate_trace(trace: Any, *, max_issues: int) -> tuple[list[str], list[str] return errors, warnings +def compute_trace_isl_stats(trace: dict[str, Any]) -> dict[str, int]: + requests = [ + req + for req in trace.get("requests", []) + if isinstance(req, dict) and req.get("type") != "subagent" + ] + in_tokens = [int(req.get("in", 0)) for req in requests if _is_int(req.get("in"))] + return { + "cumulative_isl_tokens": sum(in_tokens), + "max_concurrent_session_isl_tokens": max(in_tokens) if in_tokens else 0, + } + + +def validate_pressure_manifest( + manifest: Any, + *, + trace_root: Path, + max_issues: int, +) -> tuple[list[str], list[str]]: + errors: list[str] = [] + warnings: list[str] = [] + + if not isinstance(manifest, dict): + return [f"pressure manifest must be a JSON object, got {type(manifest).__name__}"], warnings + + entries = manifest.get("entries") + if not isinstance(entries, list) or not entries: + _add_issue(errors, "entries must be a non-empty list", max_issues) + return errors, warnings + + resolved_root = trace_root.resolve() + seen_files: set[str] = set() + + for idx, entry in enumerate(entries): + prefix = f"entries[{idx}]" + if not isinstance(entry, dict): + _add_issue(errors, f"{prefix} must be object, got {type(entry).__name__}", max_issues) + continue + + file_value = entry.get("file") + rationale = entry.get("rationale") + cumulative = entry.get("cumulative_isl_tokens") + maximum = entry.get("max_concurrent_session_isl_tokens") + + if not isinstance(file_value, str) or not file_value: + _add_issue(errors, f"{prefix}.file must be non-empty str", max_issues) + continue + if file_value in seen_files: + _add_issue(warnings, f"{prefix}.file duplicates an earlier entry: {file_value}", max_issues) + seen_files.add(file_value) + + if not isinstance(rationale, str) or not rationale.strip(): + _add_issue(errors, f"{prefix}.rationale must be non-empty str", max_issues) + if not _is_int(cumulative) or cumulative < 0: + _add_issue(errors, f"{prefix}.cumulative_isl_tokens must be int >= 0", max_issues) + if not _is_int(maximum) or maximum < 0: + _add_issue(errors, f"{prefix}.max_concurrent_session_isl_tokens must be int >= 0", max_issues) + if len(errors) >= max_issues: + continue + + candidate = (trace_root / file_value).resolve() + try: + candidate.relative_to(resolved_root) + except ValueError: + _add_issue(errors, f"{prefix}.file escapes trace root: {file_value}", max_issues) + continue + if not candidate.is_file(): + 
_add_issue(errors, f"{prefix}.file does not exist under {trace_root}: {file_value}", max_issues) + continue + + try: + trace = json.loads(candidate.read_text()) + except Exception as exc: # pragma: no cover - defensive path + _add_issue(errors, f"{prefix}.file is not valid JSON: {exc}", max_issues) + continue + + actual = compute_trace_isl_stats(trace) + if cumulative != actual["cumulative_isl_tokens"]: + _add_issue( + errors, + f"{prefix}.cumulative_isl_tokens = {cumulative}, expected {actual['cumulative_isl_tokens']} from {file_value}", + max_issues, + ) + if maximum != actual["max_concurrent_session_isl_tokens"]: + _add_issue( + errors, + f"{prefix}.max_concurrent_session_isl_tokens = {maximum}, expected {actual['max_concurrent_session_isl_tokens']} from {file_value}", + max_issues, + ) + + return errors, warnings + + def iter_trace_files(path: Path) -> list[Path]: if path.is_file(): return [path] @@ -307,6 +399,12 @@ def parse_args(argv: list[str]) -> argparse.Namespace: default=5, help="maximum errors reported per file (default: 5)", ) + parser.add_argument( + "--pressure-manifest", + type=Path, + default=None, + help="optional manifest.json describing curated high-pressure trace entries to cross-check", + ) return parser.parse_args(argv) @@ -329,6 +427,7 @@ def main(argv: list[str] | None = None) -> int: valid_count = 0 failed_count = 0 + manifest_clean = False for file_path in files: try: @@ -356,8 +455,43 @@ def main(argv: list[str] | None = None) -> int: for warning in warnings[: args.max_errors_per_file]: print(f" {warning}") + if args.pressure_manifest is not None: + try: + manifest_payload = json.loads(args.pressure_manifest.read_text()) + except FileNotFoundError: + print(f"Path not found: {args.pressure_manifest}", file=sys.stderr) + return 2 + except Exception as exc: + print(f"invalid pressure manifest JSON: {exc}", file=sys.stderr) + return 2 + + trace_root = path if path.is_dir() else path.parent + manifest_errors, manifest_warnings = validate_pressure_manifest( + manifest_payload, + trace_root=trace_root, + max_issues=args.max_errors_per_file, + ) + effective_manifest_errors = list(manifest_errors) + if args.strict: + effective_manifest_errors.extend(manifest_warnings) + if effective_manifest_errors: + failed_count += 1 + if not args.quiet: + print(f"{CROSS} {args.pressure_manifest}") + for issue in effective_manifest_errors[: args.max_errors_per_file]: + print(f" {issue}") + else: + manifest_clean = True + if manifest_warnings and not args.quiet: + print(f"{WARN} {args.pressure_manifest}") + for warning in manifest_warnings[: args.max_errors_per_file]: + print(f" {warning}") + if failed_count == 0: - print(f"{CHECK} {valid_count} files valid | 0 failed") + if manifest_clean: + print(f"{CHECK} {valid_count} files valid | pressure manifest clean | 0 failed") + else: + print(f"{CHECK} {valid_count} files valid | 0 failed") return 0 plural = "files" if failed_count != 1 else "file"