diff --git a/pyproject.toml b/pyproject.toml
index 6d796f65..37154bc7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -52,6 +52,9 @@ docs = [
axisarray = [
"numpy>=2.2.6",
]
+perf = [
+ "xarray",
+]
[project.scripts]
ezmsg = "ezmsg.core.command:cmdline"
diff --git a/src/ezmsg/core/command.py b/src/ezmsg/core/command.py
index 61e76f97..f9c753d3 100644
--- a/src/ezmsg/core/command.py
+++ b/src/ezmsg/core/command.py
@@ -2,8 +2,6 @@
import asyncio
import inspect
-from ezmsg.util.perf.command import setup_perf_cmdline
-
from .commands import setup_core_cmdline
from .commands.graphviz import handle_graphviz
from .commands.mermaid import handle_mermaid, mermaid_url as mm
@@ -38,6 +36,8 @@ def build_parser() -> argparse.ArgumentParser:
subparsers = parser.add_subparsers(dest="command", required=True, help="command for ezmsg")
setup_core_cmdline(subparsers)
+ from ezmsg.util.perf.command import setup_perf_cmdline
+
setup_perf_cmdline(subparsers)
return parser
diff --git a/src/ezmsg/util/perf/ab.py b/src/ezmsg/util/perf/ab.py
index 5a2da621..e4696f57 100644
--- a/src/ezmsg/util/perf/ab.py
+++ b/src/ezmsg/util/perf/ab.py
@@ -2,6 +2,7 @@
import argparse
import contextlib
+import hashlib
import json
import os
import random
@@ -15,6 +16,27 @@
DEFAULT_PAIR_SEED = 0
+DEFAULT_REF_A = "dev"
+DEFAULT_REF_B = "CURRENT"
+VENV_DIR_CANDIDATES = (".venv", "venv", ".env", "env")
+
+
+@dataclass(frozen=True)
+class ABEnvironmentInfo:
+ label: str
+ ref: str
+ tree: str
+ python: str
+ python_version: str
+ ezmsg_version: str
+ numpy_version: str
+ git_commit: str
+ git_branch: str
+ dirty: bool
+ env_mode: str
+ pyproject_hash: str | None
+ uv_lock_hash: str | None
+ env_overrides: dict[str, str]
@dataclass(frozen=True)
@@ -34,6 +56,9 @@ class ABRunSummary:
ref_b: str
rounds: int
seed: int
+ env_a: ABEnvironmentInfo
+ env_b: ABEnvironmentInfo
+ warnings: list[str]
cases: list[ABCaseSummary]
@@ -49,6 +74,7 @@ def _hotpath_json_arg(path: Path) -> list[str]:
def build_hotpath_command(
+ python: str | Path,
output_path: Path,
count: int,
warmup: int,
@@ -59,9 +85,7 @@ def build_hotpath_command(
quiet: bool,
) -> list[str]:
cmd = [
- "uv",
- "run",
- "python",
+ str(python),
"-m",
"ezmsg.util.perf.hotpath",
"--count",
@@ -85,6 +109,18 @@ def build_hotpath_command(
return cmd
+def parse_env_assignments(values: list[str]) -> dict[str, str]:
+ assignments: dict[str, str] = {}
+ for value in values:
+ if "=" not in value:
+ raise ValueError(f"Environment override must use KEY=VALUE format: {value}")
+ key, env_value = value.split("=", 1)
+ if not key:
+ raise ValueError(f"Environment override is missing a key: {value}")
+ assignments[key] = env_value
+ return assignments
+
+
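For readers tracing the override plumbing: repeated --env/--env-a/--env-b values parse into dicts that merge with side-specific assignments winning, matching the {**shared, **side} order used later in perf_ab. A quick sketch, with placeholder variable names:

    from ezmsg.util.perf.ab import parse_env_assignments

    shared = parse_env_assignments(["EZMSG_LOGLEVEL=WARN", "FOO=1"])  # placeholder assignments
    side_a = parse_env_assignments(["FOO=2"])
    merged_a = {**shared, **side_a}
    # merged_a == {"EZMSG_LOGLEVEL": "WARN", "FOO": "2"}  (side-specific value wins)
    # parse_env_assignments(["BROKEN"]) raises ValueError (no '=' in the assignment)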
def load_hotpath_summary(path: Path) -> dict[str, float]:
payload = json.loads(path.read_text())
return {
@@ -99,6 +135,9 @@ def summarize_ab_results(
rounds: int,
seed: int,
paired_runs: list[tuple[dict[str, float], dict[str, float]]],
+ env_a: ABEnvironmentInfo | None = None,
+ env_b: ABEnvironmentInfo | None = None,
+ warnings: list[str] | None = None,
) -> ABRunSummary:
case_ids = sorted(paired_runs[0][0].keys())
cases: list[ABCaseSummary] = []
@@ -119,11 +158,30 @@ def summarize_ab_results(
)
)
+ placeholder = ABEnvironmentInfo(
+ label="?",
+ ref="unknown",
+ tree="unknown",
+ python="unknown",
+ python_version="unknown",
+ ezmsg_version="unknown",
+ numpy_version="unknown",
+ git_commit="unknown",
+ git_branch="unknown",
+ dirty=False,
+ env_mode="unknown",
+ pyproject_hash=None,
+ uv_lock_hash=None,
+ env_overrides={},
+ )
return ABRunSummary(
ref_a=ref_a,
ref_b=ref_b,
rounds=rounds,
seed=seed,
+ env_a=env_a or placeholder,
+ env_b=env_b or placeholder,
+ warnings=warnings or [],
cases=cases,
)
@@ -136,9 +194,7 @@ def _median(values: list[float]) -> float:
return (ordered[mid - 1] + ordered[mid]) / 2.0
-def _run_checked(cmd: list[str], cwd: Path) -> None:
- env = os.environ.copy()
- env.pop("VIRTUAL_ENV", None)
+def _run_checked(cmd: list[str], cwd: Path, env: dict[str, str] | None = None) -> None:
completed = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, env=env)
if completed.returncode == 0:
return
@@ -151,6 +207,18 @@ def _run_checked(cmd: list[str], cwd: Path) -> None:
)
+def _run_json(cmd: list[str], cwd: Path, env: dict[str, str]) -> dict[str, str]:
+ completed = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, env=env)
+ if completed.returncode != 0:
+ raise RuntimeError(
+ f"Command failed in {cwd}:\n"
+ f"$ {' '.join(cmd)}\n\n"
+ f"stdout:\n{completed.stdout}\n"
+ f"stderr:\n{completed.stderr}"
+ )
+ return json.loads(completed.stdout)
+
+
def _is_current_ref(ref: str) -> bool:
return ref.upper() == "CURRENT"
@@ -184,10 +252,6 @@ def _provision_tree(
shutil.rmtree(parent, ignore_errors=True)
-def _maybe_sync(tree: Path) -> None:
- _run_checked(["uv", "sync", "--group", "dev"], cwd=tree)
-
-
def _mirror_hotpath_module(source_root: Path, target_tree: Path) -> None:
source = source_root / "src" / "ezmsg" / "util" / "perf" / "hotpath.py"
target = target_tree / "src" / "ezmsg" / "util" / "perf" / "hotpath.py"
@@ -209,11 +273,182 @@ def _ensure_json_files_match(
)
+def _python_from_venv(venv_dir: Path) -> Path | None:
+ candidates = [venv_dir / "bin" / "python", venv_dir / "Scripts" / "python.exe"]
+ for candidate in candidates:
+ if candidate.exists():
+ return candidate
+ return None
+
+
+def discover_tree_python(tree: Path) -> Path | None:
+ for dirname in VENV_DIR_CANDIDATES:
+ python = _python_from_venv(tree / dirname)
+ if python is not None:
+ return python
+ return None
+
+
+def _virtual_env_from_python(python: Path) -> str | None:
+ parent = python.parent
+ if parent.name in {"bin", "Scripts"}:
+ return str(parent.parent)
+ return None
+
+
+def _tree_env(
+ base_env: dict[str, str],
+ tree: Path,
+ python: Path,
+ overrides: dict[str, str],
+) -> dict[str, str]:
+ env = base_env.copy()
+ env.update(overrides)
+ path_parts = [str(tree / "src")]
+ if env.get("PYTHONPATH"):
+ path_parts.append(env["PYTHONPATH"])
+ env["PYTHONPATH"] = os.pathsep.join(path_parts)
+ venv = _virtual_env_from_python(python)
+ if venv is not None:
+ env["VIRTUAL_ENV"] = venv
+ return env
+
+
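A small sketch of the environment each side's subprocess inherits from _tree_env, using hypothetical paths (POSIX path separators assumed):

    from pathlib import Path
    from ezmsg.util.perf.ab import _tree_env

    env = _tree_env(
        {"PYTHONPATH": "/existing"},             # hypothetical base environment
        Path("/work/tree-a"),                    # hypothetical worktree
        Path("/work/tree-a/.venv/bin/python"),
        {"EZMSG_EXAMPLE": "1"},                  # hypothetical override
    )
    # env["PYTHONPATH"]   == "/work/tree-a/src:/existing"  (the tree's src/ is prepended)
    # env["VIRTUAL_ENV"]  == "/work/tree-a/.venv"          (derived from the interpreter path)
    # env["EZMSG_EXAMPLE"] == "1"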
+def _hash_file(path: Path) -> str | None:
+ if not path.exists():
+ return None
+ return hashlib.sha256(path.read_bytes()).hexdigest()[:12]
+
+
+def _git_value(tree: Path, args: list[str], default: str = "unknown") -> str:
+ completed = subprocess.run(
+ ["git", *args],
+ cwd=tree,
+ capture_output=True,
+ text=True,
+ )
+ if completed.returncode != 0:
+ return default
+ return completed.stdout.strip()
+
+
+def _git_dirty(tree: Path) -> bool:
+ completed = subprocess.run(
+ ["git", "status", "--porcelain"],
+ cwd=tree,
+ capture_output=True,
+ text=True,
+ )
+ if completed.returncode != 0:
+ return False
+ return bool(completed.stdout.strip())
+
+
+def _runtime_probe_env(
+ tree: Path,
+ python: Path,
+ overrides: dict[str, str],
+) -> dict[str, str]:
+ return _tree_env(os.environ.copy(), tree, python, overrides)
+
+
+def _probe_runtime(tree: Path, python: Path, overrides: dict[str, str]) -> dict[str, str]:
+ script = (
+ "import json, sys\n"
+ "payload = {\n"
+ " 'python': sys.executable,\n"
+ " 'python_version': sys.version.replace('\\n', ' '),\n"
+ " 'ezmsg_version': 'unknown',\n"
+ " 'numpy_version': 'unknown',\n"
+ "}\n"
+ "try:\n"
+ " import ezmsg.core as ez\n"
+ " payload['ezmsg_version'] = ez.__version__\n"
+ "except Exception:\n"
+ " pass\n"
+ "try:\n"
+ " import numpy as np\n"
+ " payload['numpy_version'] = np.__version__\n"
+ "except Exception:\n"
+ " pass\n"
+ "print(json.dumps(payload))\n"
+ )
+ return _run_json(
+ [str(python), "-c", script],
+ cwd=tree,
+ env=_runtime_probe_env(tree, python, overrides),
+ )
+
+
+def _build_env_info(
+ label: str,
+ ref: str,
+ tree: Path,
+ python: Path,
+ env_mode: str,
+ overrides: dict[str, str],
+) -> ABEnvironmentInfo:
+ runtime = _probe_runtime(tree, python, overrides)
+ return ABEnvironmentInfo(
+ label=label,
+ ref=ref,
+ tree=str(tree),
+ python=runtime["python"],
+ python_version=runtime["python_version"],
+ ezmsg_version=runtime["ezmsg_version"],
+ numpy_version=runtime["numpy_version"],
+ git_commit=_git_value(tree, ["rev-parse", "HEAD"]),
+ git_branch=_git_value(tree, ["rev-parse", "--abbrev-ref", "HEAD"]),
+ dirty=_git_dirty(tree),
+ env_mode=env_mode,
+ pyproject_hash=_hash_file(tree / "pyproject.toml"),
+ uv_lock_hash=_hash_file(tree / "uv.lock"),
+ env_overrides=overrides,
+ )
+
+
+def _shared_env_warnings(info_a: ABEnvironmentInfo, info_b: ABEnvironmentInfo) -> list[str]:
+ warnings: list[str] = []
+ if info_a.pyproject_hash != info_b.pyproject_hash:
+ warnings.append(
+ "pyproject.toml differs between A and B while using a shared environment."
+ )
+ if info_a.uv_lock_hash != info_b.uv_lock_hash:
+ warnings.append(
+ "uv.lock differs between A and B while using a shared environment."
+ )
+ return warnings
+
+
+def _print_env_info(env_info: ABEnvironmentInfo) -> None:
+ print(
+ f"{env_info.label}: ref={env_info.ref} tree={env_info.tree} "
+ f"python={env_info.python} env_mode={env_info.env_mode}"
+ )
+ print(
+ f" python_version={env_info.python_version} "
+ f"ezmsg={env_info.ezmsg_version} numpy={env_info.numpy_version}"
+ )
+ print(
+ f" branch={env_info.git_branch} commit={env_info.git_commit} dirty={env_info.dirty}"
+ )
+ print(
+ f" pyproject_hash={env_info.pyproject_hash} uv_lock_hash={env_info.uv_lock_hash}"
+ )
+ if env_info.env_overrides:
+ pairs = ", ".join(f"{key}={value}" for key, value in sorted(env_info.env_overrides.items()))
+ print(f" env_overrides={pairs}")
+
+
def _print_summary(summary: ABRunSummary) -> None:
print(
f"Interleaved hot-path comparison: A={summary.ref_a}, "
f"B={summary.ref_b}, rounds={summary.rounds}, seed={summary.seed}"
)
+ _print_env_info(summary.env_a)
+ _print_env_info(summary.env_b)
+ for warning in summary.warnings:
+ print(f"WARNING: {warning}")
for case in summary.cases:
sign = "regression" if case.delta_pct_median > 0 else "improvement"
print(
@@ -232,14 +467,68 @@ def dump_ab_json(summary: ABRunSummary, path: Path) -> None:
"ref_b": summary.ref_b,
"rounds": summary.rounds,
"seed": summary.seed,
+ "env_a": asdict(summary.env_a),
+ "env_b": asdict(summary.env_b),
+ "warnings": summary.warnings,
"cases": [asdict(case) for case in summary.cases],
}
path.write_text(json.dumps(payload, indent=2) + "\n")
+def _default_ref(ref: str | None, fallback: str) -> str:
+ return fallback if ref is None else ref
+
+
+def _resolve_python_for_shared(
+ python_a: str | None,
+ python_b: str | None,
+) -> Path:
+ if python_a is not None and python_b is not None and python_a != python_b:
+ raise ValueError(
+ "shared env mode requires a single interpreter; --python-a and --python-b must match"
+ )
+ if python_a is not None:
+ return Path(python_a)
+ if python_b is not None:
+ return Path(python_b)
+ return Path(sys.executable)
+
+
+def _resolve_python_for_existing(
+ tree: Path,
+ label: str,
+ explicit_python: str | None,
+ repo_root: Path,
+) -> Path:
+ if explicit_python is not None:
+ return Path(explicit_python)
+
+ discovered = discover_tree_python(tree)
+ if discovered is not None:
+ return discovered
+
+ if tree.resolve() == repo_root.resolve():
+ return Path(sys.executable)
+
+ raise ValueError(
+ f"Could not locate a Python interpreter for side {label} in {tree}. "
+ "Prepare the environment yourself and rerun with --env-mode existing "
+ "plus --python-a/--python-b or local .venv/venv directories."
+ )
+
+
def perf_ab(
- ref_a: str,
- ref_b: str,
+ ref_a: str | None,
+ ref_b: str | None,
+ dir_a: Path | None,
+ dir_b: Path | None,
+ python_a: str | None,
+ python_b: str | None,
+ env_mode: str,
+ force_shared_env: bool,
+ env: list[str],
+ env_a: list[str],
+ env_b: list[str],
rounds: int,
count: int,
warmup: int,
@@ -251,11 +540,23 @@ def perf_ab(
seed: int,
json_out: Path | None,
keep_worktrees: bool,
- sync: bool,
quiet: bool,
) -> None:
if rounds <= 0:
raise ValueError("rounds must be > 0")
+ if dir_a is not None and ref_a is not None:
+ raise ValueError("Use either --ref-a or --dir-a, not both")
+ if dir_b is not None and ref_b is not None:
+ raise ValueError("Use either --ref-b or --dir-b, not both")
+ if force_shared_env and env_mode != "shared":
+ raise ValueError("--force-shared-env only applies to --env-mode shared")
+
+ shared_overrides = parse_env_assignments(env)
+ env_overrides_a = {**shared_overrides, **parse_env_assignments(env_a)}
+ env_overrides_b = {**shared_overrides, **parse_env_assignments(env_b)}
+
+ resolved_ref_a = dir_a.name if dir_a is not None else _default_ref(ref_a, DEFAULT_REF_A)
+ resolved_ref_b = dir_b.name if dir_b is not None else _default_ref(ref_b, DEFAULT_REF_B)
repo_root = Path(
subprocess.run(
@@ -267,63 +568,117 @@ def perf_ab(
)
pair_order = build_pair_order(rounds, seed)
- with _provision_tree(repo_root, ref_a, "A", keep_worktrees) as tree_a:
- with _provision_tree(repo_root, ref_b, "B", keep_worktrees) as tree_b:
- if tree_a != repo_root:
- _mirror_hotpath_module(repo_root, tree_a)
- if tree_b != repo_root:
- _mirror_hotpath_module(repo_root, tree_b)
-
- if sync:
- _maybe_sync(tree_a)
- if tree_b != tree_a:
- _maybe_sync(tree_b)
-
- with tempfile.TemporaryDirectory(prefix="ezmsg-perf-ab-runs-") as tmpdir_name:
- tmpdir = Path(tmpdir_name)
- cmd_by_label = {
- "A": lambda path: build_hotpath_command(
- path,
- count=count,
- warmup=warmup,
- payload_sizes=payload_sizes,
- transports=transports,
- apis=apis,
- num_buffers=num_buffers,
- quiet=quiet,
- ),
- "B": lambda path: build_hotpath_command(
- path,
- count=count,
- warmup=warmup,
- payload_sizes=payload_sizes,
- transports=transports,
- apis=apis,
- num_buffers=num_buffers,
- quiet=quiet,
- ),
- }
- tree_by_label = {"A": tree_a, "B": tree_b}
-
- for idx in range(prewarm):
- for label in ("A", "B"):
- if label == "B" and tree_b == tree_a:
- continue
- warm_path = tmpdir / f"warm-{label}-{idx}.json"
- _run_checked(cmd_by_label[label](warm_path), cwd=tree_by_label[label])
-
- paired_runs: list[tuple[dict[str, float], dict[str, float]]] = []
- for round_idx, (first, second) in enumerate(pair_order, start=1):
- outputs: dict[str, dict[str, float]] = {}
- for label in (first, second):
- output_path = tmpdir / f"round-{round_idx:02d}-{label}.json"
- _run_checked(cmd_by_label[label](output_path), cwd=tree_by_label[label])
- outputs[label] = load_hotpath_summary(output_path)
-
- _ensure_json_files_match(outputs["A"], outputs["B"], ref_a, ref_b)
- paired_runs.append((outputs["A"], outputs["B"]))
-
- summary = summarize_ab_results(ref_a, ref_b, rounds, seed, paired_runs)
+ with contextlib.ExitStack() as stack:
+ tree_a = (
+ dir_a.resolve()
+ if dir_a is not None
+ else stack.enter_context(_provision_tree(repo_root, resolved_ref_a, "A", keep_worktrees))
+ )
+ tree_b = (
+ dir_b.resolve()
+ if dir_b is not None
+ else stack.enter_context(_provision_tree(repo_root, resolved_ref_b, "B", keep_worktrees))
+ )
+
+ if dir_a is None and tree_a != repo_root:
+ _mirror_hotpath_module(repo_root, tree_a)
+ if dir_b is None and tree_b != repo_root and tree_b != tree_a:
+ _mirror_hotpath_module(repo_root, tree_b)
+
+ if env_mode == "shared":
+ shared_python = _resolve_python_for_shared(python_a, python_b)
+ python_path_a = shared_python
+ python_path_b = shared_python
+ else:
+ python_path_a = _resolve_python_for_existing(tree_a, "A", python_a, repo_root)
+ python_path_b = _resolve_python_for_existing(tree_b, "B", python_b, repo_root)
+
+ env_info_a = _build_env_info(
+ "A", resolved_ref_a, tree_a, python_path_a, env_mode, env_overrides_a
+ )
+ env_info_b = _build_env_info(
+ "B", resolved_ref_b, tree_b, python_path_b, env_mode, env_overrides_b
+ )
+
+ warnings = []
+ if env_mode == "shared":
+ warnings = _shared_env_warnings(env_info_a, env_info_b)
+ if warnings and not force_shared_env:
+ raise ValueError(
+ "Shared-environment comparison detected project metadata mismatches:\n"
+ + "\n".join(f"- {warning}" for warning in warnings)
+ + "\n\nRe-run with --force-shared-env to continue anyway, or prepare "
+ "side-specific environments and use --env-mode existing."
+ )
+
+ with tempfile.TemporaryDirectory(prefix="ezmsg-perf-ab-runs-") as tmpdir_name:
+ tmpdir = Path(tmpdir_name)
+ cmd_by_label = {
+ "A": lambda path: build_hotpath_command(
+ python_path_a,
+ path,
+ count=count,
+ warmup=warmup,
+ payload_sizes=payload_sizes,
+ transports=transports,
+ apis=apis,
+ num_buffers=num_buffers,
+ quiet=quiet,
+ ),
+ "B": lambda path: build_hotpath_command(
+ python_path_b,
+ path,
+ count=count,
+ warmup=warmup,
+ payload_sizes=payload_sizes,
+ transports=transports,
+ apis=apis,
+ num_buffers=num_buffers,
+ quiet=quiet,
+ ),
+ }
+ tree_by_label = {"A": tree_a, "B": tree_b}
+ env_by_label = {
+ "A": _tree_env(os.environ.copy(), tree_a, python_path_a, env_overrides_a),
+ "B": _tree_env(os.environ.copy(), tree_b, python_path_b, env_overrides_b),
+ }
+
+ for idx in range(prewarm):
+ for label in ("A", "B"):
+ if label == "B" and tree_b == tree_a and env_by_label["B"] == env_by_label["A"]:
+ continue
+ warm_path = tmpdir / f"warm-{label}-{idx}.json"
+ _run_checked(
+ cmd_by_label[label](warm_path),
+ cwd=tree_by_label[label],
+ env=env_by_label[label],
+ )
+
+ paired_runs: list[tuple[dict[str, float], dict[str, float]]] = []
+ for round_idx, (first, second) in enumerate(pair_order, start=1):
+ outputs: dict[str, dict[str, float]] = {}
+ for label in (first, second):
+ output_path = tmpdir / f"round-{round_idx:02d}-{label}.json"
+ _run_checked(
+ cmd_by_label[label](output_path),
+ cwd=tree_by_label[label],
+ env=env_by_label[label],
+ )
+ outputs[label] = load_hotpath_summary(output_path)
+
+ _ensure_json_files_match(outputs["A"], outputs["B"], resolved_ref_a, resolved_ref_b)
+ paired_runs.append((outputs["A"], outputs["B"]))
+
+ summary = summarize_ab_results(
+ resolved_ref_a,
+ resolved_ref_b,
+ rounds,
+ seed,
+ paired_runs,
+ env_a=env_info_a,
+ env_b=env_info_b,
+ warnings=warnings,
+ )
_print_summary(summary)
if json_out is not None:
dump_ab_json(summary, json_out)
@@ -333,10 +688,43 @@ def perf_ab(
def setup_ab_cmdline(subparsers: argparse._SubParsersAction) -> None:
p_ab = subparsers.add_parser(
"ab",
- help="run interleaved A/B hot-path comparisons using git worktrees",
+ help="run interleaved A/B hot-path comparisons using worktrees or prepared directories",
+ )
+ p_ab.add_argument("--ref-a", default=None, help=f"baseline git ref (default = {DEFAULT_REF_A})")
+ p_ab.add_argument("--ref-b", default=None, help=f"candidate git ref (default = {DEFAULT_REF_B})")
+ p_ab.add_argument("--dir-a", type=Path, default=None, help="use an existing directory for side A")
+ p_ab.add_argument("--dir-b", type=Path, default=None, help="use an existing directory for side B")
+ p_ab.add_argument("--python-a", default=None, help="explicit Python interpreter for side A")
+ p_ab.add_argument("--python-b", default=None, help="explicit Python interpreter for side B")
+ p_ab.add_argument(
+ "--env-mode",
+ choices=["shared", "existing"],
+ default="shared",
+ help="shared = reuse one interpreter for both sides; existing = use prepared per-tree environments",
+ )
+ p_ab.add_argument(
+ "--force-shared-env",
+ action="store_true",
+ help="continue shared-env comparisons even when pyproject.toml or uv.lock differ",
+ )
+ p_ab.add_argument(
+ "--env",
+ action="append",
+ default=[],
+ help="environment override for both sides (KEY=VALUE). Repeatable.",
+ )
+ p_ab.add_argument(
+ "--env-a",
+ action="append",
+ default=[],
+ help="environment override for side A only (KEY=VALUE). Repeatable.",
+ )
+ p_ab.add_argument(
+ "--env-b",
+ action="append",
+ default=[],
+ help="environment override for side B only (KEY=VALUE). Repeatable.",
)
- p_ab.add_argument("--ref-a", default="dev", help="baseline git ref or CURRENT")
- p_ab.add_argument("--ref-b", default="CURRENT", help="candidate git ref or CURRENT")
p_ab.add_argument(
"--rounds",
type=int,
@@ -405,11 +793,6 @@ def setup_ab_cmdline(subparsers: argparse._SubParsersAction) -> None:
action="store_true",
help="leave auto-provisioned worktrees on disk for inspection",
)
- p_ab.add_argument(
- "--sync",
- action="store_true",
- help="run 'uv sync --group dev' in each provisioned worktree first",
- )
p_ab.add_argument(
"--quiet",
action="store_true",
@@ -419,6 +802,15 @@ def setup_ab_cmdline(subparsers: argparse._SubParsersAction) -> None:
_handler=lambda ns: perf_ab(
ref_a=ns.ref_a,
ref_b=ns.ref_b,
+ dir_a=ns.dir_a,
+ dir_b=ns.dir_b,
+ python_a=ns.python_a,
+ python_b=ns.python_b,
+ env_mode=ns.env_mode,
+ force_shared_env=ns.force_shared_env,
+ env=ns.env,
+ env_a=ns.env_a,
+ env_b=ns.env_b,
rounds=ns.rounds,
count=ns.count,
warmup=ns.warmup,
@@ -430,7 +822,6 @@ def setup_ab_cmdline(subparsers: argparse._SubParsersAction) -> None:
seed=ns.seed,
json_out=ns.json_out,
keep_worktrees=ns.keep_worktrees,
- sync=ns.sync,
quiet=ns.quiet,
)
)
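To make the interpreter plumbing above concrete, here is a sketch of the per-side hot-path command construction; the paths and option values are illustrative placeholders, not defaults:

    from pathlib import Path
    from ezmsg.util.perf.ab import build_hotpath_command

    cmd = build_hotpath_command(
        Path("/work/tree-a/.venv/bin/python"),  # side A's interpreter (placeholder)
        Path("/tmp/round-01-A.json"),
        count=1000,
        warmup=100,
        payload_sizes=[1024],
        transports=["shm"],
        apis=["pubsub"],
        num_buffers=4,
        quiet=True,
    )
    # The command now begins with the explicit interpreter instead of "uv run python":
    # cmd[:3] == ["/work/tree-a/.venv/bin/python", "-m", "ezmsg.util.perf.hotpath"]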
diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py
index 590e681f..743acbbe 100644
--- a/src/ezmsg/util/perf/analysis.py
+++ b/src/ezmsg/util/perf/analysis.py
@@ -1,35 +1,50 @@
-import json
-import dataclasses
+from __future__ import annotations
+
import argparse
+import dataclasses
import html
+import json
import math
import webbrowser
from pathlib import Path
-
-from ..messagecodec import MessageDecoder
-from .envinfo import TestEnvironmentInfo, format_env_diff
-from .run import get_datestamp
-from .impl import (
- TestParameters,
- Metrics,
- TestLogEntry,
-)
+from typing import TYPE_CHECKING, Any
import ezmsg.core as ez
-try:
+if TYPE_CHECKING:
+ import numpy as np
+ import pandas as pd
import xarray as xr
- import pandas as pd # xarray depends on pandas
-except ImportError:
- ez.logger.error("ezmsg perf analysis requires xarray")
- raise
+ from .envinfo import TestEnvironmentInfo
-try:
- import numpy as np
-except ImportError:
- ez.logger.error("ezmsg perf analysis requires numpy")
- raise
+xr: Any | None = None
+pd: Any | None = None
+np: Any | None = None
+
+
+def _load_analysis_dependencies() -> tuple[Any, Any, Any]:
+ global xr, pd, np
+
+ if xr is None or pd is None:
+ try:
+ import xarray as _xr
+ import pandas as _pd # xarray depends on pandas
+ except ImportError:
+ ez.logger.error("ezmsg perf analysis requires xarray")
+ raise
+ xr = _xr
+ pd = _pd
+
+ if np is None:
+ try:
+ import numpy as _np
+ except ImportError:
+ ez.logger.error("ezmsg perf analysis requires numpy")
+ raise
+ np = _np
+
+ return xr, pd, np
TEST_DESCRIPTION = """
Configurations (config):
@@ -46,7 +61,7 @@
- shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively
Variables:
-- n_clients: pubs (fanin), subs (fanout), or relays (relay)
+- n_clients: pubs (fanin), subs (fanout), or relays (relay)
- msg_size: nominal message size (bytes)
Metrics:
@@ -55,10 +70,74 @@
- latency_mean: average send -> receive latency in seconds (lower = better)
"""
+KEY_COLUMNS = ["config", "comms", "n_clients", "msg_size"]
+DISPLAY_METRICS = [
+ "sample_rate_mean",
+ "sample_rate_median",
+ "data_rate",
+ "latency_mean",
+ "latency_median",
+]
+METRIC_LABELS = {
+ "sample_rate_mean": "sample_rate_mean",
+ "sample_rate_median": "sample_rate_median",
+ "data_rate": "data_rate",
+ "latency_mean": "latency_mean",
+ "latency_median": "latency_median",
+}
+METRIC_GROUPS = {
+ "sample_rate_mean": "throughput",
+ "sample_rate_median": "throughput",
+ "data_rate": "throughput",
+ "latency_mean": "latency",
+ "latency_median": "latency",
+}
+ABSOLUTE_UNITS = {
+ "sample_rate_mean": "msgs/s",
+ "sample_rate_median": "msgs/s",
+ "data_rate": "MB/s",
+ "latency_mean": "us",
+ "latency_median": "us",
+}
+RELATIVE_UNITS = {metric: "%" for metric in DISPLAY_METRICS}
+NOISE_BAND_PCT = 10.0
+
+
+@dataclasses.dataclass
+class MetricDelta:
+ metric: str
+ config: str
+ comms: str
+ n_clients: int
+ msg_size: int
+ value: float
+ score: float
+
+
+@dataclasses.dataclass
+class ReportBundle:
+ perf_path: Path
+ baseline_path: Path | None
+ info: TestEnvironmentInfo
+ baseline_info: TestEnvironmentInfo | None
+ env_diff: str | None
+ relative: bool
+ terminal_df: pd.DataFrame
+ candidate_df: pd.DataFrame
+ baseline_df: pd.DataFrame | None
+ relative_df: pd.DataFrame | None
+ delta_counts: dict[str, int]
+ top_improvements: list[MetricDelta]
+ top_regressions: list[MetricDelta]
+
def load_perf(perf: Path) -> xr.Dataset:
- all_results: dict[TestParameters, dict[int, list[Metrics]]] = dict()
+ xr, _, np = _load_analysis_dependencies()
+ from ..messagecodec import MessageDecoder
+ from .envinfo import TestEnvironmentInfo
+ from .impl import Metrics, TestLogEntry
+ all_results: dict[Any, dict[int, list[Any]]] = dict()
run_idx = 0
with open(perf, "r") as perf_f:
@@ -74,10 +153,10 @@ def load_perf(perf: Path) -> xr.Dataset:
runs[run_idx] = metrics
all_results[obj.params] = runs
- n_clients_axis = list(sorted(set([p.n_clients for p in all_results.keys()])))
- msg_size_axis = list(sorted(set([p.msg_size for p in all_results.keys()])))
- comms_axis = list(sorted(set([p.comms for p in all_results.keys()])))
- config_axis = list(sorted(set([p.config for p in all_results.keys()])))
+ n_clients_axis = list(sorted({p.n_clients for p in all_results}))
+ msg_size_axis = list(sorted({p.msg_size for p in all_results}))
+ comms_axis = list(sorted({p.comms for p in all_results}))
+ config_axis = list(sorted({p.config for p in all_results}))
dims = ["n_clients", "msg_size", "comms", "config"]
coords = {
@@ -89,7 +168,7 @@ def load_perf(perf: Path) -> xr.Dataset:
data_vars = {}
for field in dataclasses.fields(Metrics):
- m = (
+ metric_values = (
np.zeros(
(
len(n_clients_axis),
@@ -100,400 +179,947 @@ def load_perf(perf: Path) -> xr.Dataset:
)
* np.nan
)
- for p, a in all_results.items():
- # tests are run multiple times; get the median of means
- m[
- n_clients_axis.index(p.n_clients),
- msg_size_axis.index(p.msg_size),
- comms_axis.index(p.comms),
- config_axis.index(p.config),
+ for params, runs in all_results.items():
+ metric_values[
+ n_clients_axis.index(params.n_clients),
+ msg_size_axis.index(params.msg_size),
+ comms_axis.index(params.comms),
+ config_axis.index(params.config),
] = np.median(
- [np.mean([getattr(v, field.name) for v in r]) for r in a.values()]
+ [np.mean([getattr(v, field.name) for v in run]) for run in runs.values()]
)
- data_vars[field.name] = xr.DataArray(m, dims=dims, coords=coords)
+ data_vars[field.name] = xr.DataArray(metric_values, dims=dims, coords=coords)
- dataset = xr.Dataset(data_vars, attrs=dict(info=info))
- return dataset
+ return xr.Dataset(data_vars, attrs=dict(info=info))
-def _escape(s: str) -> str:
- return html.escape(str(s), quote=True)
+def default_report_html_path(perf_path: Path) -> Path:
+ return perf_path.with_suffix(".html")
-def _env_block(title: str, body: str) -> str:
- return f"""
-
- {_escape(title)}
- {_escape(body).strip()}
-
- """
+def default_compare_html_path(perf_path: Path, baseline_path: Path) -> Path:
+ return perf_path.with_name(f"{perf_path.stem}.vs_{baseline_path.stem}.html")
-def _legend_block() -> str:
- return """
-
- Legend
-
- Comparison mode: values are percentages (100 = no change).
- Green: improvement (↑ sample/data rate, ↓ latency).
- Red: regression (↓ sample/data rate, ↑ latency).
-
-
- """
+def _escape(value: object) -> str:
+ return html.escape(str(value), quote=True)
+
+
+def _frame_from_dataset(dataset: xr.Dataset) -> pd.DataFrame:
+ frame = dataset.to_dataframe().dropna(how="all")
+ frame = frame.reset_index()
+ frame = frame.dropna(subset=DISPLAY_METRICS, how="all")
+ return frame.sort_values(KEY_COLUMNS).reset_index(drop=True)
+
+
+def _display_frame(frame: pd.DataFrame, relative: bool) -> pd.DataFrame:
+ out = frame.copy()
+ out = out[KEY_COLUMNS + DISPLAY_METRICS]
+ if not relative:
+ out["data_rate"] = out["data_rate"] / 2**20
+ out["latency_mean"] = out["latency_mean"] * 1e6
+ out["latency_median"] = out["latency_median"] * 1e6
+ return out
+
+
+def _metric_score(metric: str, value: float) -> float:
+ if not (isinstance(value, (int, float)) and math.isfinite(value)):
+ return 0.0
+ if "latency" in metric:
+ return 100.0 - value
+ return value - 100.0
+
+
+def _classify_metric(metric: str, value: float, noise_band_pct: float = NOISE_BAND_PCT) -> str:
+ score = _metric_score(metric, value)
+ if abs(score) <= noise_band_pct:
+ return "neutral"
+ return "improvement" if score > 0 else "regression"
+
+
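For orientation, the scoring convention these helpers apply to relative values (percent of baseline, so 100.0 means no change), using the 10-point NOISE_BAND_PCT defined above:

    from ezmsg.util.perf.analysis import _classify_metric, _metric_score

    _metric_score("sample_rate_mean", 112.0)    # 12.0: throughput above baseline is good
    _metric_score("latency_mean", 85.0)         # 15.0: latency below baseline is good
    _classify_metric("sample_rate_mean", 95.0)  # "neutral"    (|score| <= NOISE_BAND_PCT)
    _classify_metric("latency_mean", 130.0)     # "regression" (score = -30.0)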
+def _collect_metric_deltas(
+ relative_df: pd.DataFrame | None, noise_band_pct: float = NOISE_BAND_PCT
+) -> tuple[dict[str, int], list[MetricDelta], list[MetricDelta]]:
+ if relative_df is None:
+ return {"improvement": 0, "neutral": 0, "regression": 0}, [], []
+
+ counts = {"improvement": 0, "neutral": 0, "regression": 0}
+ improvements: list[MetricDelta] = []
+ regressions: list[MetricDelta] = []
+
+ for _, row in relative_df.iterrows():
+ for metric in DISPLAY_METRICS:
+ value = float(row[metric])
+ classification = _classify_metric(metric, value, noise_band_pct=noise_band_pct)
+ counts[classification] += 1
+ score = _metric_score(metric, value)
+ if classification == "improvement":
+ improvements.append(
+ MetricDelta(
+ metric=metric,
+ config=str(row["config"]),
+ comms=str(row["comms"]),
+ n_clients=int(row["n_clients"]),
+ msg_size=int(row["msg_size"]),
+ value=value,
+ score=score,
+ )
+ )
+ elif classification == "regression":
+ regressions.append(
+ MetricDelta(
+ metric=metric,
+ config=str(row["config"]),
+ comms=str(row["comms"]),
+ n_clients=int(row["n_clients"]),
+ msg_size=int(row["msg_size"]),
+ value=value,
+ score=score,
+ )
+ )
+
+ improvements.sort(key=lambda item: item.score, reverse=True)
+ regressions.sort(key=lambda item: item.score)
+ return counts, improvements, regressions
+
+
+def _format_terminal_value(metric: str, value: float, relative: bool) -> str:
+ if pd.isna(value):
+ return "nan"
+ if relative:
+ return f"{float(value):.1f}%"
+ if metric in {"latency_mean", "latency_median"}:
+ return f"{float(value):,.3f}"
+ if metric == "data_rate":
+ return f"{float(value):,.3f}"
+ return f"{float(value):,.2f}"
+
+
+def _terminal_table(frame: pd.DataFrame, relative: bool) -> str:
+ formatted = frame.copy()
+ for metric in DISPLAY_METRICS:
+ formatted[metric] = formatted[metric].map(
+ lambda value, metric=metric: _format_terminal_value(metric, value, relative)
+ )
+
+ sections: list[str] = []
+ for (config, comms), group in formatted.groupby(["config", "comms"], sort=False):
+ sections.append(f"{config} / {comms}")
+ sections.append(
+ group[["n_clients", "msg_size", *DISPLAY_METRICS]].to_string(index=False)
+ )
+ sections.append("")
+ return "\n".join(sections).strip()
+
+
+def _terminal_delta_summary(
+ counts: dict[str, int],
+ improvements: list[MetricDelta],
+ regressions: list[MetricDelta],
+ limit: int = 5,
+) -> str:
+ lines = [
+ "COMPARISON OVERVIEW",
+ (
+ f" improvements: {counts['improvement']}, neutral: {counts['neutral']}, "
+ f"regressions: {counts['regression']}"
+ ),
+ "",
+ ]
+
+ if regressions:
+ lines.append("BIGGEST REGRESSIONS")
+ for delta in regressions[:limit]:
+ lines.append(
+ " "
+ f"{delta.metric}: {delta.value:.1f}% "
+ f"({delta.config}/{delta.comms}, n_clients={delta.n_clients}, msg_size={delta.msg_size})"
+ )
+ lines.append("")
+
+ if improvements:
+ lines.append("BIGGEST IMPROVEMENTS")
+ for delta in improvements[:limit]:
+ lines.append(
+ " "
+ f"{delta.metric}: {delta.value:.1f}% "
+ f"({delta.config}/{delta.comms}, n_clients={delta.n_clients}, msg_size={delta.msg_size})"
+ )
+ lines.append("")
+
+ return "\n".join(lines).strip()
+
+
+def build_report_bundle(perf_path: Path, baseline_path: Path | None = None) -> ReportBundle:
+ _load_analysis_dependencies()
+ from .envinfo import format_env_diff
+
+ candidate = load_perf(perf_path)
+ info: TestEnvironmentInfo = candidate.attrs["info"]
+ candidate_frame = _display_frame(_frame_from_dataset(candidate), relative=False)
+
+ baseline_info: TestEnvironmentInfo | None = None
+ env_diff: str | None = None
+ relative = baseline_path is not None
+ baseline_frame: pd.DataFrame | None = None
+ relative_frame: pd.DataFrame | None = None
+
+ if baseline_path is not None:
+ baseline = load_perf(baseline_path)
+ baseline_info = baseline.attrs["info"]
+ env_diff = format_env_diff(info.diff(baseline_info))
+ baseline_frame = _display_frame(_frame_from_dataset(baseline), relative=False)
+ relative_dataset = (candidate / baseline) * 100.0
+ relative_dataset = relative_dataset.drop_vars(["latency_total", "num_msgs"])
+ relative_frame = _display_frame(_frame_from_dataset(relative_dataset), relative=True)
+ terminal_df = relative_frame
+ else:
+ terminal_df = candidate_frame
+
+ delta_counts, top_improvements, top_regressions = _collect_metric_deltas(relative_frame)
+
+ return ReportBundle(
+ perf_path=perf_path,
+ baseline_path=baseline_path,
+ info=info,
+ baseline_info=baseline_info,
+ env_diff=env_diff,
+ relative=relative,
+ terminal_df=terminal_df,
+ candidate_df=candidate_frame,
+ baseline_df=baseline_frame,
+ relative_df=relative_frame,
+ delta_counts=delta_counts,
+ top_improvements=top_improvements,
+ top_regressions=top_regressions,
+ )
+
+
+def _build_terminal_output(bundle: ReportBundle) -> str:
+ lines = [str(bundle.info), ""]
+
+ if bundle.relative:
+ lines.extend(
+ [
+ "PERFORMANCE COMPARISON",
+ "",
+ bundle.env_diff or "No differences.",
+ "",
+ _terminal_delta_summary(
+ bundle.delta_counts, bundle.top_improvements, bundle.top_regressions
+ ),
+ "",
+ ]
+ )
+ else:
+ lines.extend(["PERFORMANCE REPORT", ""])
+
+ lines.extend([_terminal_table(bundle.terminal_df, relative=bundle.relative), ""])
+ return "\n".join(line for line in lines if line is not None).strip()
+
+
+def _format_html_number(metric: str, value: float, mode: str) -> str:
+ if not (isinstance(value, (int, float)) and math.isfinite(value)):
+ return "n/a"
+ if mode == "relative":
+ return f"{value:.1f}%"
+ if metric in {"latency_mean", "latency_median"}:
+ return f"{value:,.3f}"
+ if metric == "data_rate":
+ return f"{value:,.3f}"
+ return f"{value:,.2f}"
+
+
+def _color_for_comparison(
+ value: float, metric: str, noise_band_pct: float = NOISE_BAND_PCT
+) -> str:
+ if not (isinstance(value, (int, float)) and math.isfinite(value)):
+ return ""
+ score = _metric_score(metric, value)
+ magnitude = abs(score)
+ if magnitude <= noise_band_pct:
+ return ""
+
+ scale = max(0.0, min(1.0, (magnitude - noise_band_pct) / 45.0))
+ hue = "var(--green)" if score > 0 else "var(--red)"
+ alpha = 0.15 + 0.35 * scale
+ return f"background-color: hsla({hue}, 70%, 45%, {alpha});"
def _base_css() -> str:
- # Minimal, print-friendly CSS + color scales for cells.
return """
"""
-def _color_for_comparison(
- value: float, metric: str, noise_band_pct: float = 10.0
-) -> str:
- """
- Returns inline CSS background for a comparison % value.
- value: e.g., 97.3, 104.8, etc.
- For sample_rate/data_rate: improvement > 100 (good).
- For latency_mean: improvement < 100 (good).
- Noise band ±10% around 100 is neutral.
- """
- if not (isinstance(value, (int, float)) and math.isfinite(value)):
- return ""
+def _render_delta_list(title: str, deltas: list[MetricDelta], empty: str) -> str:
+ if not deltas:
+ return f"{_escape(title)} {_escape(empty)}
"
- delta = value - 100.0
- # Determine direction: + is good for sample/data; - is good for latency
- if "rate" in metric:
- # positive delta good, negative bad
- magnitude = abs(delta)
- sign_good = delta > 0
- elif "latency" in metric:
- # negative delta good (lower latency)
- magnitude = abs(delta)
- sign_good = delta < 0
- else:
- return ""
+    items = []
+    for delta in deltas[:5]:
+        items.append(
+            "<li>"
+            f"{_escape(delta.metric)}: {_escape(f'{delta.value:.1f}%')} "
+            f"({_escape(delta.config)}/{_escape(delta.comms)}, "
+            f"n_clients={delta.n_clients}, msg_size={delta.msg_size})"
+            "</li>"
+        )
+    return (
+        f"<h3>{_escape(title)}</h3>"
+        f"<ul>{''.join(items)}</ul>"
+    )
- # Noise band: keep neutral
- if magnitude <= noise_band_pct:
- return ""
- # Scale 5%..50% across 0..1; clamp
- scale = max(0.0, min(1.0, (magnitude - noise_band_pct) / 45.0))
+def _build_rows(bundle: ReportBundle) -> list[dict[str, object]]:
+ rows: list[dict[str, object]] = []
+ baseline_lookup: dict[tuple[str, str, int, int], dict[str, object]] = {}
+ relative_lookup: dict[tuple[str, str, int, int], dict[str, object]] = {}
- # Choose hue and lightness; use HSL with gentle saturation
- hue = "var(--green)" if sign_good else "var(--red)"
- # opacity via alpha blend on lightness via HSLa
- # Use saturation ~70%, lightness around 40–50% blended with table bg
- alpha = 0.15 + 0.35 * scale # 0.15..0.50
- return f"background-color: hsla({hue}, 70%, 45%, {alpha});"
+ if bundle.baseline_df is not None:
+ for row in bundle.baseline_df.to_dict("records"):
+ key = (str(row["config"]), str(row["comms"]), int(row["n_clients"]), int(row["msg_size"]))
+ baseline_lookup[key] = row
+ if bundle.relative_df is not None:
+ for row in bundle.relative_df.to_dict("records"):
+ key = (str(row["config"]), str(row["comms"]), int(row["n_clients"]), int(row["msg_size"]))
+ relative_lookup[key] = row
-def _format_number(x) -> str:
- if isinstance(x, (int,)) and not isinstance(x, bool):
- return f"{x:d}"
- try:
- xf = float(x)
- except Exception:
- return _escape(str(x))
- # Heuristic: for comparison percentages, 1 decimal is nice; for absolute, 3 decimals for latency.
- return f"{xf:.3f}"
+ for row in bundle.candidate_df.to_dict("records"):
+ key = (str(row["config"]), str(row["comms"]), int(row["n_clients"]), int(row["msg_size"]))
+ baseline_row = baseline_lookup.get(key)
+ relative_row = relative_lookup.get(key)
+ severity = 0.0
+ if relative_row is not None:
+ severity = max(abs(_metric_score(metric, float(relative_row[metric]))) for metric in DISPLAY_METRICS)
+ rows.append(
+ {
+ "config": key[0],
+ "comms": key[1],
+ "n_clients": key[2],
+ "msg_size": key[3],
+ "candidate": row,
+ "baseline": baseline_row,
+ "relative": relative_row,
+ "severity": severity,
+ }
+ )
+ return rows
-def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> None:
- """print perf test results and comparisons to the console"""
+def _options(values: list[object], label: str) -> str:
+ options = ["All "]
+ for value in values:
+ options.append(f"{_escape(value)} ")
+ return (
+ f"{_escape(label)}"
+ f"{''.join(options)} "
+ )
- output = ""
- perf = load_perf(perf_path)
- info: TestEnvironmentInfo = perf.attrs["info"]
- output += str(info) + "\n\n"
+def _report_title(bundle: ReportBundle) -> str:
+ return "ezmsg Performance Comparison" if bundle.relative else "ezmsg Performance Report"
- relative = False
- env_diff = None
- if baseline_path is not None:
- relative = True
- output += "PERFORMANCE COMPARISON\n\n"
- baseline = load_perf(baseline_path)
- perf = (perf / baseline) * 100.0
- baseline_info: TestEnvironmentInfo = baseline.attrs["info"]
- env_diff = format_env_diff(info.diff(baseline_info))
- output += env_diff + "\n\n"
-
- # These raw stats are still valuable to have, but are confusing
- # when making relative comparisons
- perf = perf.drop_vars(["latency_total", "num_msgs"])
-
- perf = perf.stack(params=["n_clients", "msg_size"]).dropna("params")
- df = perf.squeeze().to_dataframe()
- df = df.drop("n_clients", axis=1)
- df = df.drop("msg_size", axis=1)
-
- for _, config_ds in perf.groupby("config"):
- for _, comms_ds in config_ds.groupby("comms"):
- output += str(comms_ds.squeeze().to_dataframe()) + "\n\n"
- output += "\n"
-
- print(output)
-
- if html:
- # Ensure expected columns exist
- expected_cols = {
- "sample_rate_mean",
- "sample_rate_median",
- "data_rate",
- "latency_mean",
- "latency_median",
- }
- missing = expected_cols - set(df.columns)
- if missing:
- raise ValueError(f"Missing expected columns in dataset: {missing}")
-
- # We'll render a table per (config, comms) group.
- groups = (
- df.reset_index()
- .sort_values(by=["config", "comms", "n_clients", "msg_size"])
- .groupby(["config", "comms"], sort=False)
- )
- # Build HTML
- parts: list[str] = []
- parts.append(" ")
+def render_html_report(bundle: ReportBundle) -> str:
+ rows = _build_rows(bundle)
+ filters = {
+ "config": sorted(bundle.candidate_df["config"].unique().tolist()),
+ "comms": sorted(bundle.candidate_df["comms"].unique().tolist()),
+ "n_clients": sorted(bundle.candidate_df["n_clients"].unique().tolist()),
+ "msg_size": sorted(bundle.candidate_df["msg_size"].unique().tolist()),
+ }
+ mode = "relative" if bundle.relative else "candidate"
+
+ parts: list[str] = []
+ parts.append(" ")
+ parts.append(
+ " "
+ )
+ parts.append(f"{_escape(_report_title(bundle))} ")
+ parts.append(_base_css())
+ parts.append("")
+
+ parts.append("
")
+ parts.append(f"{_escape(_report_title(bundle))} ")
+ sub = str(bundle.perf_path)
+ if bundle.baseline_path is not None:
+ sub += f" relative to {bundle.baseline_path}"
+ parts.append(f"{_escape(sub)}
")
+ parts.append(" ")
+
+ if bundle.relative:
+ parts.append("
Comparison Overview ")
+ for label, value in [
+ ("Improvements", bundle.delta_counts["improvement"]),
+ ("Neutral", bundle.delta_counts["neutral"]),
+ ("Regressions", bundle.delta_counts["regression"]),
+ ]:
+ parts.append(
+ "
"
+ f"{_escape(label)} "
+ f"{_escape(value)} "
+ "
"
+ )
+ parts.append("
")
+
+ parts.append("
")
parts.append(
- "
"
+ _render_delta_list(
+ "Biggest Regressions",
+ bundle.top_regressions,
+ "No regressions outside the noise band.",
+ )
)
- parts.append("
ezmsg perf report ")
- parts.append(_base_css())
- parts.append("
")
-
- parts.append("
")
- parts.append("ezmsg Performance Report ")
- sub = str(perf_path)
- if baseline_path is not None:
- sub += f" relative to {str(baseline_path)}"
- parts.append(f"{_escape(sub)}
")
- parts.append(" ")
-
- if info is not None:
- parts.append(_env_block("Test Environment", str(info)))
-
- parts.append(_env_block("Test Details", TEST_DESCRIPTION))
-
- if env_diff is not None:
- # Show diffs using your helper
- parts.append("
")
- parts.append("Environment Differences vs Baseline ")
- parts.append(f"{_escape(env_diff)} ")
- parts.append(" ")
- parts.append(_legend_block())
-
- # Render each group
- for (config, comms), g in groups:
- # Keep only expected columns in order
- cols = [
- "n_clients",
- "msg_size",
- "sample_rate_mean",
- "sample_rate_median",
- "data_rate",
- "latency_mean",
- "latency_median",
- ]
- g = g[cols].copy()
+ parts.append(
+ _render_delta_list(
+ "Biggest Improvements",
+ bundle.top_improvements,
+ "No improvements outside the noise band.",
+ )
+ )
+ parts.append("
")
+
+ parts.append("
")
+ parts.append("Environment ")
+ parts.append(f"{_escape(str(bundle.info))} ")
+ parts.append(" ")
+
+ if bundle.env_diff is not None:
+ parts.append("
")
+ parts.append("Environment Differences vs Baseline ")
+ parts.append(f"{_escape(bundle.env_diff)} ")
+ parts.append(" ")
+
+ parts.append("
Test Details ")
+ parts.append(f"{_escape(TEST_DESCRIPTION.strip())} ")
+ parts.append("")
- # String format some columns (msg_size with separators)
- g["msg_size"] = g["msg_size"].map(
- lambda x: f"{int(x):,}" if pd.notna(x) else x
+ parts.append("
")
+ parts.append("Explore ")
+ parts.append("")
+ parts.append(_options(filters["config"], "Config"))
+ parts.append(_options(filters["comms"], "Comms"))
+ parts.append(_options(filters["n_clients"], "N_clients"))
+ parts.append(_options(filters["msg_size"], "Msg_size"))
+ parts.append(
+ "Sort"
+ ""
+ "Config "
+ "Comms "
+ "n_clients "
+ "msg_size "
+ "Largest change "
+ " "
+ )
+ parts.append("
")
+ parts.append("")
+ for view, label in [("all", "All Metrics"), ("throughput", "Throughput"), ("latency", "Latency")]:
+ active = " active" if view == "all" else ""
+ parts.append(
+ f"{_escape(label)} "
+ )
+ if bundle.relative:
+ parts.append(" ")
+ for display_mode, label in [
+ ("relative", "Relative"),
+ ("candidate", "Candidate"),
+ ("baseline", "Baseline"),
+ ]:
+ active = " active" if display_mode == mode else ""
+ parts.append(
+ f"{_escape(label)} "
)
+ parts.append("
")
+ parts.append(" ")
- # Build table manually so we can inject inline cell styles easily
- # (pandas Styler is great but produces bulky HTML; manual keeps it clean)
- header = f"""
-
-
- n_clients
- msg_size {"" if relative else "(b)"}
- sample_rate_mean {"" if relative else "(msgs/s)"}
- sample_rate_median {"" if relative else "(msgs/s)"}
- data_rate {"" if relative else "(MB/s)"}
- latency_mean {"" if relative else "(us)"}
- latency_median {"" if relative else "(us)"}
-
-
- """
- body_rows: list[str] = []
- for _, row in g.iterrows():
- sr, srm, dr, lt, lm = (
- row["sample_rate_mean"],
- row["sample_rate_median"],
- row["data_rate"],
- row["latency_mean"],
- row["latency_median"],
- )
- dr = dr if relative else dr / 2**20
- lt = lt if relative else lt * 1e6
- lm = lm if relative else lm * 1e6
- sr_style = (
- _color_for_comparison(sr, "sample_rate_mean") if relative else ""
- )
- srm_style = (
- _color_for_comparison(srm, "sample_rate_median") if relative else ""
- )
- dr_style = _color_for_comparison(dr, "data_rate") if relative else ""
- lt_style = _color_for_comparison(lt, "latency_mean") if relative else ""
- lm_style = (
- _color_for_comparison(lm, "latency_median") if relative else ""
- )
+ parts.append("
Results ")
+ for column in ["config", "comms", "n_clients", "msg_size"]:
+ parts.append(f"{_escape(column)} ")
+ for metric in DISPLAY_METRICS:
+ parts.append(
+ f""
+ )
+ parts.append(" ")
- body_rows.append(
- ""
- f"{_format_number(row['n_clients'])} "
- f"{_escape(row['msg_size'])} "
- f"{_format_number(sr)} "
- f"{_format_number(srm)} "
- f"{_format_number(dr)} "
- f"{_format_number(lt)} "
- f"{_format_number(lm)} "
- " "
- )
- table_html = f"{header}{''.join(body_rows)}
"
+ for row in rows:
+ parts.append(
+ ""
+ )
+ for column in ["config", "comms", "n_clients", "msg_size"]:
+ parts.append(f"{_escape(row[column])} ")
+ candidate = row["candidate"]
+ baseline = row["baseline"]
+ relative_row = row["relative"]
+ for metric in DISPLAY_METRICS:
+ candidate_value = float(candidate[metric])
+ baseline_value = (
+ float(baseline[metric]) if baseline is not None else float("nan")
+ )
+ relative_value = (
+ float(relative_row[metric]) if relative_row is not None else float("nan")
+ )
+ style = (
+ _color_for_comparison(relative_value, metric)
+ if bundle.relative and math.isfinite(relative_value)
+ else ""
+ )
+ initial_value = (
+ relative_value if bundle.relative else candidate_value
+ )
+ initial_mode = "relative" if bundle.relative else "candidate"
parts.append(
- f""
- f"{_escape(config)} "
- f"{_escape(comms)} "
- f" {table_html} "
+ ""
+ f"{_escape(_format_html_number(metric, initial_value, initial_mode))}"
+ " "
)
+ parts.append(" ")
+
+ parts.append("
")
+ parts.append(
+ """
+
+ """
+ % (
+ json.dumps(METRIC_LABELS),
+ json.dumps(ABSOLUTE_UNITS),
+ json.dumps(RELATIVE_UNITS),
+ json.dumps(mode),
+ )
+ )
+ parts.append("
")
+ return "".join(parts)
+
+
+def write_html_report(
+ perf_path: Path,
+ baseline_path: Path | None = None,
+ output_path: Path | None = None,
+ open_browser: bool = False,
+) -> Path:
+ bundle = build_report_bundle(perf_path, baseline_path=baseline_path)
+ if output_path is None:
+ output_path = (
+ default_compare_html_path(perf_path, baseline_path)
+ if baseline_path is not None
+ else default_report_html_path(perf_path)
+ )
+ html_text = render_html_report(bundle)
+ output_path.write_text(html_text, encoding="utf-8")
+ if open_browser:
+ webbrowser.open(output_path.resolve().as_uri())
+ return output_path
+
+
+def report(
+ perf_path: Path,
+ output_path: Path | None = None,
+ open_browser: bool = True,
+) -> None:
+ bundle = build_report_bundle(perf_path)
+ print(_build_terminal_output(bundle))
+ out_path = write_html_report(
+ perf_path=perf_path,
+ output_path=output_path,
+ open_browser=open_browser,
+ )
+ print(f"\nHTML report: {out_path}")
+
+
+def compare(
+ perf_path: Path,
+ baseline_path: Path,
+ output_path: Path | None = None,
+ open_browser: bool = True,
+) -> None:
+ bundle = build_report_bundle(perf_path, baseline_path=baseline_path)
+ print(_build_terminal_output(bundle))
+ out_path = write_html_report(
+ perf_path=perf_path,
+ baseline_path=baseline_path,
+ output_path=output_path,
+ open_browser=open_browser,
+ )
+ print(f"\nComparison report: {out_path}")
+
+
+def setup_report_cmdline(subparsers: argparse._SubParsersAction) -> None:
+ parser = subparsers.add_parser(
+ "report", help="render an absolute report for a benchmark file"
+ )
+ parser.add_argument("perf", type=Path, help="benchmark file")
+ parser.add_argument(
+ "--output",
type=Path,
- help="perf test",
+ default=None,
+ help="optional explicit HTML output path",
+ )
+ parser.add_argument(
+ "--no-browser",
+ action="store_true",
+ help="write HTML without opening it in a browser",
+ )
+ parser.set_defaults(
+ _handler=lambda ns: report(
+ perf_path=ns.perf,
+ output_path=ns.output,
+ open_browser=not ns.no_browser,
+ )
+ )
+
+
+def setup_compare_cmdline(subparsers: argparse._SubParsersAction) -> None:
+ parser = subparsers.add_parser(
+ "compare", help="compare one benchmark file against a baseline"
)
- p_summary.add_argument(
+ parser.add_argument("perf", type=Path, help="candidate benchmark file")
+ parser.add_argument(
"--baseline",
"-b",
type=Path,
+ required=True,
+ help="baseline benchmark file for comparison",
+ )
+ parser.add_argument(
+ "--output",
+ type=Path,
default=None,
- help="baseline perf test for comparison",
+ help="optional explicit HTML output path",
)
- p_summary.add_argument(
- "--html",
+ parser.add_argument(
+ "--no-browser",
action="store_true",
- help="generate an html output file and render results in browser",
+ help="write HTML without opening it in a browser",
)
-
- p_summary.set_defaults(
- _handler=lambda ns: summary(
- perf_path=ns.perf, baseline_path=ns.baseline, html=ns.html
+ parser.set_defaults(
+ _handler=lambda ns: compare(
+ perf_path=ns.perf,
+ baseline_path=ns.baseline,
+ output_path=ns.output,
+ open_browser=not ns.no_browser,
)
)
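As a usage sketch of the new analysis entry points (file names below are placeholders; omit baseline_path for an absolute report):

    from pathlib import Path
    from ezmsg.util.perf.analysis import write_html_report

    out = write_html_report(
        perf_path=Path("perf_candidate.txt"),
        baseline_path=Path("perf_baseline.txt"),
        open_browser=False,
    )
    # out == Path("perf_candidate.vs_perf_baseline.html"), per default_compare_html_path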
diff --git a/src/ezmsg/util/perf/command.py b/src/ezmsg/util/perf/command.py
index 31f70451..b15200a4 100644
--- a/src/ezmsg/util/perf/command.py
+++ b/src/ezmsg/util/perf/command.py
@@ -2,19 +2,20 @@
import sys
from .ab import setup_ab_cmdline
-from .analysis import setup_summary_cmdline
+from .analysis import setup_compare_cmdline, setup_report_cmdline
from .hotpath import setup_hotpath_cmdline
-from .run import setup_run_cmdline
+from .run import setup_benchmark_cmdline
def setup_perf_cmdline(subparsers: argparse._SubParsersAction) -> None:
parser = subparsers.add_parser("perf", help="performance test utilities")
perf_subparsers = parser.add_subparsers(dest="perf_command", required=True)
- setup_run_cmdline(perf_subparsers)
+ setup_benchmark_cmdline(perf_subparsers)
+ setup_report_cmdline(perf_subparsers)
+ setup_compare_cmdline(perf_subparsers)
setup_hotpath_cmdline(perf_subparsers)
setup_ab_cmdline(perf_subparsers)
- setup_summary_cmdline(perf_subparsers)
def build_parser() -> argparse.ArgumentParser:
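A quick sketch of the resulting subcommand layout, exercised through the same parser the tests use (the log file name is a placeholder and must already exist for the handler to do anything useful):

    from ezmsg.core.command import build_parser

    ns = build_parser().parse_args(["perf", "report", "perf_smoke.txt", "--no-browser"])
    # ns.command == "perf", ns.perf_command == "report"
    # ns._handler(ns) prints the terminal summary and writes perf_smoke.html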
diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py
index 3f794f21..d002ad74 100644
--- a/src/ezmsg/util/perf/run.py
+++ b/src/ezmsg/util/perf/run.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import os
import sys
import json
@@ -9,24 +11,15 @@
from datetime import datetime, timedelta
from contextlib import contextmanager, redirect_stdout, redirect_stderr
+from pathlib import Path
import ezmsg.core as ez
from ezmsg.core.graphserver import GraphServer
-from ..messagecodec import MessageEncoder
-from .envinfo import TestEnvironmentInfo
-from .util import warmup
-from .impl import (
- TestParameters,
- TestLogEntry,
- perform_test,
- Communication,
- CONFIGS,
-)
-
DEFAULT_MSG_SIZES = [2**4, 2**20]
DEFAULT_N_CLIENTS = [1, 16]
-DEFAULT_COMMS = [c for c in Communication]
+DEFAULT_COMMS = ["local", "shm", "tcp", "shm_spread", "tcp_spread"]
+DEFAULT_CONFIGS = ["fanin", "fanout", "relay"]
# --- Output Suppression Context Manager ---
@@ -98,7 +91,23 @@ def get_datestamp() -> str:
return datetime.now().strftime("%Y%m%d_%H%M%S")
-def perf_run(
+def output_paths_for_name(name: str) -> tuple[Path, Path]:
+ return Path(f"perf_{name}.txt"), Path(f"report_{name}.html")
+
+
+def warmup(*args, **kwargs):
+ from .util import warmup as _warmup
+
+ return _warmup(*args, **kwargs)
+
+
+def perform_test(**kwargs):
+ from .impl import perform_test as _perform_test
+
+ return _perform_test(**kwargs)
+
+
+def benchmark(
max_duration: float,
num_msgs: int,
num_buffers: int,
@@ -110,17 +119,24 @@ def perf_run(
configs: typing.Iterable[str] | None,
grid: bool,
warmup_dur: float,
-) -> None:
+ name: str | None = None,
+ open_browser: bool = True,
+) -> tuple[Path, Path | None]:
+ from ..messagecodec import MessageEncoder
+ from .envinfo import TestEnvironmentInfo
+ from .impl import Communication, CONFIGS, TestLogEntry, TestParameters
+
if n_clients is None:
n_clients = DEFAULT_N_CLIENTS
if any(c < 0 for c in n_clients):
ez.logger.error("All tests must have >=0 clients")
- return
+ raise ValueError("All tests must have >=0 clients")
if msg_sizes is None:
msg_sizes = DEFAULT_MSG_SIZES
if any(s < 0 for s in msg_sizes):
ez.logger.error("All msg_sizes must be >=0 bytes")
+ raise ValueError("All msg_sizes must be >=0 bytes")
if not grid and len(list(n_clients)) != len(list(msg_sizes)):
ez.logger.warning(
@@ -136,7 +152,7 @@ def perf_run(
ez.logger.error(
f"Invalid test communications requested. Valid communications: {', '.join([c.value for c in Communication])}"
)
- return
+ raise ValueError("Invalid test communications requested")
try:
configurators = (
@@ -146,7 +162,7 @@ def perf_run(
ez.logger.error(
f"Invalid test configuration requested. Valid configurations: {', '.join([c for c in CONFIGS])}"
)
- return
+ raise ValueError("Invalid test configuration requested")
subitr = itertools.product if grid else zip
@@ -177,12 +193,17 @@ def perf_run(
quitting = False
start_time = time.time()
+ if name is not None:
+ output_path, html_out = output_paths_for_name(name)
+ else:
+ output_path = Path(f"perf_{get_datestamp()}.txt")
+ html_out = None
try:
ez.logger.info(f"Warming up for {warmup_dur} seconds...")
warmup(warmup_dur)
- with open(f"perf_{get_datestamp()}.txt", "w") as out_f:
+ with open(output_path, "w") as out_f:
for _ in range(repeats):
out_f.write(
json.dumps(TestEnvironmentInfo(), cls=MessageEncoder) + "\n"
@@ -235,9 +256,25 @@ def perf_run(
)
ez.logger.info(f"Tests concluded. Wallclock Runtime: {dur_str}s")
+ html_path = None
+ try:
+ from .analysis import write_html_report
+
+ html_path = write_html_report(
+ perf_path=output_path,
+ output_path=html_out,
+ open_browser=open_browser,
+ )
+ ez.logger.info(f"Wrote benchmark log to {output_path}")
+ ez.logger.info(f"Wrote benchmark report to {html_path}")
+ except ImportError:
+ ez.logger.warning("Could not generate benchmark HTML report; analysis dependencies are unavailable.")
+
+ return output_path, html_path
-def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None:
- p_run = subparsers.add_parser("run", help="run performance test")
+
+def setup_benchmark_cmdline(subparsers: argparse._SubParsersAction) -> None:
+ p_run = subparsers.add_parser("benchmark", help="run the legacy benchmark matrix")
p_run.add_argument(
"--max-duration",
@@ -310,7 +347,7 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None:
type=str,
default=None,
nargs="*",
- help=f"communication strategies to test (default = {[c.value for c in DEFAULT_COMMS]})",
+ help=f"communication strategies to test (default = {DEFAULT_COMMS})",
)
p_run.add_argument(
@@ -318,7 +355,7 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None:
type=str,
default=None,
nargs="*",
- help=f"configurations to test (default = {[c for c in CONFIGS]})",
+ help=f"configurations to test (default = {DEFAULT_CONFIGS})",
)
p_run.add_argument(
@@ -328,8 +365,21 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None:
help="warmup CPU with busy task for some number of seconds (default = 60.0)",
)
+ p_run.add_argument(
+ "--name",
+ type=str,
+ default=None,
+ help="optional short name used for perf_
.txt and report_.html",
+ )
+
+ p_run.add_argument(
+ "--no-browser",
+ action="store_true",
+ help="write the generated HTML report without opening it in a browser",
+ )
+
p_run.set_defaults(
- _handler=lambda ns: perf_run(
+ _handler=lambda ns: benchmark(
max_duration=ns.max_duration,
num_msgs=ns.num_msgs,
num_buffers=ns.num_buffers,
@@ -341,5 +391,7 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None:
configs=ns.configs,
grid=True,
warmup_dur=ns.warmup,
+ name=ns.name,
+ open_browser=not ns.no_browser,
)
)
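Relatedly, the file-naming convention the new --name flag drives (a sketch; "smoke" is an arbitrary example name):

    from ezmsg.util.perf.run import output_paths_for_name

    output_paths_for_name("smoke")
    # -> (Path("perf_smoke.txt"), Path("report_smoke.html"))
    # Without --name, benchmark() falls back to perf_<datestamp>.txt and lets
    # write_html_report() pick the default HTML path next to it.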
diff --git a/tests/test_command.py b/tests/test_command.py
index 6a737253..b74df4bd 100644
--- a/tests/test_command.py
+++ b/tests/test_command.py
@@ -1,4 +1,5 @@
import pytest
+from pathlib import Path
from ezmsg.core.command import build_parser
@@ -31,20 +32,79 @@ def test_perf_subparser_accepts_nested_perf_args():
args = parser.parse_args(
[
"perf",
- "hotpath",
- "--count",
- "10",
- "--samples",
+ "benchmark",
+ "--name",
+ "smoke",
+ "--num-msgs",
"2",
- "--quiet",
+ "--repeats",
+ "1",
+ "--no-browser",
]
)
assert args.command == "perf"
- assert args.perf_command == "hotpath"
- assert args.count == 10
- assert args.samples == 2
- assert args.quiet is True
+ assert args.perf_command == "benchmark"
+ assert args.name == "smoke"
+ assert args.num_msgs == 2
+ assert args.repeats == 1
+ assert args.no_browser is True
+
+
+def test_perf_compare_subparser_accepts_baseline_args():
+ parser = build_parser()
+
+ args = parser.parse_args(
+ [
+ "perf",
+ "compare",
+ "candidate.txt",
+ "--baseline",
+ "baseline.txt",
+ "--output",
+ "diff.html",
+ "--no-browser",
+ ]
+ )
+
+ assert args.command == "perf"
+ assert args.perf_command == "compare"
+ assert str(args.perf) == "candidate.txt"
+ assert str(args.baseline) == "baseline.txt"
+ assert str(args.output) == "diff.html"
+ assert args.no_browser is True
+
+
+def test_perf_ab_subparser_accepts_manual_env_args():
+ parser = build_parser()
+
+ args = parser.parse_args(
+ [
+ "perf",
+ "ab",
+ "--dir-a",
+ "/tmp/a",
+ "--dir-b",
+ "/tmp/b",
+ "--env-mode",
+ "existing",
+ "--env",
+ "FOO=bar",
+ "--env-a",
+ "ONLY_A=1",
+ "--python-b",
+ "/tmp/b/.venv/bin/python",
+ ]
+ )
+
+ assert args.command == "perf"
+ assert args.perf_command == "ab"
+ assert args.dir_a == Path("/tmp/a")
+ assert args.dir_b == Path("/tmp/b")
+ assert args.env_mode == "existing"
+ assert args.env == ["FOO=bar"]
+ assert args.env_a == ["ONLY_A=1"]
+ assert args.python_b == "/tmp/b/.venv/bin/python"
def test_graphviz_subparser_rejects_mermaid_only_args():
@@ -65,4 +125,4 @@ def test_perf_subparser_rejects_core_only_args():
parser = build_parser()
with pytest.raises(SystemExit):
- parser.parse_args(["perf", "hotpath", "--address", "127.0.0.1:4000"])
+ parser.parse_args(["perf", "benchmark", "--address", "127.0.0.1:4000"])
diff --git a/tests/test_perf_ab.py b/tests/test_perf_ab.py
index 3fbb935c..b6b0131f 100644
--- a/tests/test_perf_ab.py
+++ b/tests/test_perf_ab.py
@@ -1,6 +1,8 @@
from ezmsg.util.perf.ab import (
+ ABEnvironmentInfo,
build_hotpath_command,
build_pair_order,
+ parse_env_assignments,
summarize_ab_results,
)
@@ -17,6 +19,7 @@ def test_build_pair_order_is_balanced_and_reproducible():
def test_build_hotpath_command_contains_expected_args(tmp_path):
cmd = build_hotpath_command(
+ "/tmp/shared-python",
tmp_path / "out.json",
count=100,
warmup=10,
@@ -27,12 +30,19 @@ def test_build_hotpath_command_contains_expected_args(tmp_path):
quiet=True,
)
- assert cmd[:5] == ["uv", "run", "python", "-m", "ezmsg.util.perf.hotpath"]
+ assert cmd[:3] == ["/tmp/shared-python", "-m", "ezmsg.util.perf.hotpath"]
assert "--count" in cmd
assert "--payload-sizes" in cmd
assert "--quiet" in cmd
+def test_parse_env_assignments_merges_repeatable_values():
+ assert parse_env_assignments(["FOO=bar", "BAZ=qux", "FOO=override"]) == {
+ "FOO": "override",
+ "BAZ": "qux",
+ }
+
+
def test_summarize_ab_results_uses_b_vs_a_delta():
paired_runs = [
(
@@ -45,15 +55,54 @@ def test_summarize_ab_results_uses_b_vs_a_delta():
),
]
+ env_a = ABEnvironmentInfo(
+ label="A",
+ ref="dev",
+ tree="/tmp/a",
+ python="/tmp/a/.venv/bin/python",
+ python_version="3.11.0",
+ ezmsg_version="1.0.0",
+ numpy_version="2.0.0",
+ git_commit="abc",
+ git_branch="dev",
+ dirty=False,
+ env_mode="shared",
+ pyproject_hash="123",
+ uv_lock_hash="456",
+ env_overrides={"FOO": "bar"},
+ )
+ env_b = ABEnvironmentInfo(
+ label="B",
+ ref="CURRENT",
+ tree="/tmp/b",
+ python="/tmp/b/.venv/bin/python",
+ python_version="3.11.0",
+ ezmsg_version="1.0.0",
+ numpy_version="2.0.0",
+ git_commit="def",
+ git_branch="main",
+ dirty=True,
+ env_mode="shared",
+ pyproject_hash="123",
+ uv_lock_hash="789",
+ env_overrides={"FOO": "baz"},
+ )
+
summary = summarize_ab_results(
ref_a="dev",
ref_b="CURRENT",
rounds=2,
seed=0,
paired_runs=paired_runs,
+ env_a=env_a,
+ env_b=env_b,
+ warnings=["uv.lock differs"],
)
assert len(summary.cases) == 1
+ assert summary.env_a == env_a
+ assert summary.env_b == env_b
+ assert summary.warnings == ["uv.lock differs"]
case = summary.cases[0]
assert case.case_id == "async/shm/payload=64/buffers=1"
assert case.a_us_per_message_median == 9.0
diff --git a/tests/test_perf_analysis.py b/tests/test_perf_analysis.py
new file mode 100644
index 00000000..12c00dc2
--- /dev/null
+++ b/tests/test_perf_analysis.py
@@ -0,0 +1,218 @@
+import json
+from pathlib import Path
+
+from ezmsg.util.messagecodec import MessageEncoder
+from ezmsg.util.perf.analysis import (
+ build_report_bundle,
+ default_compare_html_path,
+ default_report_html_path,
+ write_html_report,
+)
+from ezmsg.util.perf.envinfo import TestEnvironmentInfo as PerfEnvironmentInfo
+from ezmsg.util.perf.impl import (
+ Metrics as PerfMetrics,
+ TestLogEntry as PerfLogEntry,
+ TestParameters as PerfParameters,
+)
+from ezmsg.util.perf.run import benchmark, output_paths_for_name
+
+
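+# The helpers below synthesize perf logs in the JSON-lines layout the analysis module reads:
+# one environment record followed by per-case entries.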
+def _write_perf_log(
+ path: Path, info: PerfEnvironmentInfo, entries: list[PerfLogEntry]
+) -> None:
+ with open(path, "w") as handle:
+ handle.write(json.dumps(info, cls=MessageEncoder) + "\n")
+ for entry in entries:
+ handle.write(json.dumps(entry, cls=MessageEncoder) + "\n")
+
+
+def _entry(
+ *,
+ config: str,
+ comms: str,
+ n_clients: int,
+ msg_size: int,
+ sample_rate: float,
+ latency: float,
+ data_rate: float,
+) -> PerfLogEntry:
+ return PerfLogEntry(
+ params=PerfParameters(
+ msg_size=msg_size,
+ num_msgs=128,
+ n_clients=n_clients,
+ config=config,
+ comms=comms,
+ max_duration=1.0,
+ num_buffers=1,
+ ),
+ results=PerfMetrics(
+ num_msgs=128,
+ sample_rate_mean=sample_rate,
+ sample_rate_median=sample_rate * 0.98,
+ latency_mean=latency,
+ latency_median=latency * 0.97,
+ latency_total=latency * 128,
+ data_rate=data_rate,
+ ),
+ )
+
+
+def test_report_and_compare_html_paths(tmp_path):
+ perf = tmp_path / "perf_20260406_120000.txt"
+ baseline = tmp_path / "perf_20260405_120000.txt"
+
+ assert default_report_html_path(perf) == tmp_path / "perf_20260406_120000.html"
+ assert default_compare_html_path(perf, baseline) == (
+ tmp_path / "perf_20260406_120000.vs_perf_20260405_120000.html"
+ )
+ assert output_paths_for_name("smoke") == (
+ Path("perf_smoke.txt"),
+ Path("report_smoke.html"),
+ )
+
+
+def test_build_report_bundle_and_html_report(tmp_path):
+ candidate = tmp_path / "candidate.txt"
+ baseline = tmp_path / "baseline.txt"
+
+ candidate_info = PerfEnvironmentInfo(git_branch="candidate", git_commit="abc123")
+ baseline_info = PerfEnvironmentInfo(git_branch="baseline", git_commit="def456")
+
+ _write_perf_log(
+ candidate,
+ candidate_info,
+ [
+ _entry(
+ config="fanin",
+ comms="local",
+ n_clients=1,
+ msg_size=64,
+ sample_rate=1200.0,
+ latency=0.0009,
+ data_rate=2_400_000.0,
+ ),
+ _entry(
+ config="relay",
+ comms="tcp",
+ n_clients=2,
+ msg_size=256,
+ sample_rate=700.0,
+ latency=0.0024,
+ data_rate=1_200_000.0,
+ ),
+ ],
+ )
+ _write_perf_log(
+ baseline,
+ baseline_info,
+ [
+ _entry(
+ config="fanin",
+ comms="local",
+ n_clients=1,
+ msg_size=64,
+ sample_rate=1000.0,
+ latency=0.0012,
+ data_rate=2_000_000.0,
+ ),
+ _entry(
+ config="relay",
+ comms="tcp",
+ n_clients=2,
+ msg_size=256,
+ sample_rate=900.0,
+ latency=0.0018,
+ data_rate=1_700_000.0,
+ ),
+ ],
+ )
+
+ bundle = build_report_bundle(candidate, baseline_path=baseline)
+ assert bundle.relative is True
+ assert bundle.delta_counts["improvement"] > 0
+ assert bundle.delta_counts["regression"] > 0
+ assert bundle.top_improvements
+ assert bundle.top_regressions
+
+ out_path = write_html_report(candidate, baseline_path=baseline, open_browser=False)
+ html = out_path.read_text(encoding="utf-8")
+
+ assert "Comparison Overview" in html
+ assert "Biggest Regressions" in html
+ assert "data-filter='config'" in html
+ assert "display-mode" in html
+ assert "metric-view" in html
+ assert "candidate.vs_baseline.html" == out_path.name
+
+
+def test_benchmark_writes_raw_output_and_html_report(tmp_path, monkeypatch):
+ output_path = tmp_path / "perf_smoke.txt"
+ html_path = tmp_path / "report_smoke.html"
+ monkeypatch.chdir(tmp_path)
+
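+    # Stand-in for GraphServer with an address attribute and no-op start/stop, so no real server is launched.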
+ class FakeGraphServer:
+ def __init__(self):
+ self.address = ("127.0.0.1", 0)
+
+ def start(self):
+ return None
+
+ def stop(self):
+ return None
+
+ html_calls: list[tuple[Path, Path | None, bool]] = []
+
+ monkeypatch.setattr("ezmsg.util.perf.run.GraphServer", FakeGraphServer)
+ monkeypatch.setattr("ezmsg.util.perf.run.warmup", lambda _: None)
+ monkeypatch.setattr(
+ "ezmsg.util.perf.run.perform_test",
+ lambda **_: PerfMetrics(
+ num_msgs=8,
+ sample_rate_mean=1000.0,
+ sample_rate_median=950.0,
+ latency_mean=0.001,
+ latency_median=0.0009,
+ latency_total=0.008,
+ data_rate=1_000_000.0,
+ ),
+ )
+
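+    # Fake report writer: record the call arguments and touch the output file so the path assertions below hold.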
+ def _fake_write_html_report(
+ perf_path: Path,
+ baseline_path: Path | None = None,
+ output_path: Path | None = None,
+ open_browser: bool = False,
+ ) -> Path:
+ html_calls.append((perf_path, output_path, open_browser))
+ target = output_path or perf_path.with_suffix(".html")
+ target.write_text("", encoding="utf-8")
+ return target
+
+ monkeypatch.setattr(
+ "ezmsg.util.perf.analysis.write_html_report",
+ _fake_write_html_report,
+ )
+
+ raw_path, report_path = benchmark(
+ max_duration=0.01,
+ num_msgs=8,
+ num_buffers=1,
+ iters=1,
+ repeats=1,
+ msg_sizes=[64],
+ n_clients=[1],
+ comms=["local"],
+ configs=["fanin"],
+ grid=True,
+ warmup_dur=0.0,
+ name="smoke",
+ open_browser=False,
+ )
+
+ assert raw_path.name == output_path.name
+ assert report_path is not None
+ assert report_path.name == html_path.name
+ assert output_path.exists()
+ assert html_path.exists()
+ assert html_calls == [(raw_path, report_path, False)]