From 75b65ebb40b98ebe480fe507b30cb9a59fc0bf7b Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 6 Apr 2026 15:34:51 -0400 Subject: [PATCH 1/4] refactored perf commandline and outputs --- src/ezmsg/core/command.py | 4 +- src/ezmsg/util/perf/analysis.py | 1244 +++++++++++++++++++++++-------- src/ezmsg/util/perf/command.py | 9 +- src/ezmsg/util/perf/run.py | 62 +- tests/test_command.py | 47 +- tests/test_perf_analysis.py | 218 ++++++ 6 files changed, 1236 insertions(+), 348 deletions(-) create mode 100644 tests/test_perf_analysis.py diff --git a/src/ezmsg/core/command.py b/src/ezmsg/core/command.py index 61e76f97..f9c753d3 100644 --- a/src/ezmsg/core/command.py +++ b/src/ezmsg/core/command.py @@ -2,8 +2,6 @@ import asyncio import inspect -from ezmsg.util.perf.command import setup_perf_cmdline - from .commands import setup_core_cmdline from .commands.graphviz import handle_graphviz from .commands.mermaid import handle_mermaid, mermaid_url as mm @@ -38,6 +36,8 @@ def build_parser() -> argparse.ArgumentParser: subparsers = parser.add_subparsers(dest="command", required=True, help="command for ezmsg") setup_core_cmdline(subparsers) + from ezmsg.util.perf.command import setup_perf_cmdline + setup_perf_cmdline(subparsers) return parser diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index 590e681f..ea3db477 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -1,7 +1,7 @@ -import json -import dataclasses import argparse +import dataclasses import html +import json import math import webbrowser @@ -9,12 +9,7 @@ from ..messagecodec import MessageDecoder from .envinfo import TestEnvironmentInfo, format_env_diff -from .run import get_datestamp -from .impl import ( - TestParameters, - Metrics, - TestLogEntry, -) +from .impl import Metrics, TestLogEntry, TestParameters import ezmsg.core as ez @@ -46,7 +41,7 @@ - shm_spread / tcp_spread: each client in its own process; comms via SHM / TCP respectively Variables: -- n_clients: pubs (fanin), subs (fanout), or relays (relay) +- n_clients: pubs (fanin), subs (fanout), or relays (relay) - msg_size: nominal message size (bytes) Metrics: @@ -55,10 +50,69 @@ - latency_mean: average send -> receive latency in seconds (lower = better) """ +KEY_COLUMNS = ["config", "comms", "n_clients", "msg_size"] +DISPLAY_METRICS = [ + "sample_rate_mean", + "sample_rate_median", + "data_rate", + "latency_mean", + "latency_median", +] +METRIC_LABELS = { + "sample_rate_mean": "sample_rate_mean", + "sample_rate_median": "sample_rate_median", + "data_rate": "data_rate", + "latency_mean": "latency_mean", + "latency_median": "latency_median", +} +METRIC_GROUPS = { + "sample_rate_mean": "throughput", + "sample_rate_median": "throughput", + "data_rate": "throughput", + "latency_mean": "latency", + "latency_median": "latency", +} +ABSOLUTE_UNITS = { + "sample_rate_mean": "msgs/s", + "sample_rate_median": "msgs/s", + "data_rate": "MB/s", + "latency_mean": "us", + "latency_median": "us", +} +RELATIVE_UNITS = {metric: "%" for metric in DISPLAY_METRICS} +NOISE_BAND_PCT = 10.0 + + +@dataclasses.dataclass +class MetricDelta: + metric: str + config: str + comms: str + n_clients: int + msg_size: int + value: float + score: float + + +@dataclasses.dataclass +class ReportBundle: + perf_path: Path + baseline_path: Path | None + info: TestEnvironmentInfo + baseline_info: TestEnvironmentInfo | None + env_diff: str | None + relative: bool + terminal_df: pd.DataFrame + candidate_df: pd.DataFrame + baseline_df: pd.DataFrame | None + 
relative_df: pd.DataFrame | None + delta_counts: dict[str, int] + top_improvements: list[MetricDelta] + top_regressions: list[MetricDelta] + def load_perf(perf: Path) -> xr.Dataset: all_results: dict[TestParameters, dict[int, list[Metrics]]] = dict() - run_idx = 0 with open(perf, "r") as perf_f: @@ -74,10 +128,10 @@ def load_perf(perf: Path) -> xr.Dataset: runs[run_idx] = metrics all_results[obj.params] = runs - n_clients_axis = list(sorted(set([p.n_clients for p in all_results.keys()]))) - msg_size_axis = list(sorted(set([p.msg_size for p in all_results.keys()]))) - comms_axis = list(sorted(set([p.comms for p in all_results.keys()]))) - config_axis = list(sorted(set([p.config for p in all_results.keys()]))) + n_clients_axis = list(sorted({p.n_clients for p in all_results})) + msg_size_axis = list(sorted({p.msg_size for p in all_results})) + comms_axis = list(sorted({p.comms for p in all_results})) + config_axis = list(sorted({p.config for p in all_results})) dims = ["n_clients", "msg_size", "comms", "config"] coords = { @@ -89,7 +143,7 @@ def load_perf(perf: Path) -> xr.Dataset: data_vars = {} for field in dataclasses.fields(Metrics): - m = ( + metric_values = ( np.zeros( ( len(n_clients_axis), @@ -100,400 +154,944 @@ def load_perf(perf: Path) -> xr.Dataset: ) * np.nan ) - for p, a in all_results.items(): - # tests are run multiple times; get the median of means - m[ - n_clients_axis.index(p.n_clients), - msg_size_axis.index(p.msg_size), - comms_axis.index(p.comms), - config_axis.index(p.config), + for params, runs in all_results.items(): + metric_values[ + n_clients_axis.index(params.n_clients), + msg_size_axis.index(params.msg_size), + comms_axis.index(params.comms), + config_axis.index(params.config), ] = np.median( - [np.mean([getattr(v, field.name) for v in r]) for r in a.values()] + [np.mean([getattr(v, field.name) for v in run]) for run in runs.values()] ) - data_vars[field.name] = xr.DataArray(m, dims=dims, coords=coords) + data_vars[field.name] = xr.DataArray(metric_values, dims=dims, coords=coords) - dataset = xr.Dataset(data_vars, attrs=dict(info=info)) - return dataset + return xr.Dataset(data_vars, attrs=dict(info=info)) -def _escape(s: str) -> str: - return html.escape(str(s), quote=True) +def default_report_html_path(perf_path: Path) -> Path: + return perf_path.with_suffix(".html") -def _env_block(title: str, body: str) -> str: - return f""" -
-    <div class='card'>
-      <div class='card-title'>{_escape(title)}</div>
-      <pre>
-{_escape(body).strip()}
-      </pre>
-    </div>
-    """
 
 
+def default_compare_html_path(perf_path: Path, baseline_path: Path) -> Path:
+    return perf_path.with_name(f"{perf_path.stem}.vs_{baseline_path.stem}.html")
 
 
-def _legend_block() -> str:
-    return """
-    <div class='card'>
-      <div class='card-title'>Legend</div>
-      <ul>
-        <li>Green cells: improvement vs baseline, outside the ±10% noise band</li>
-        <li>Red cells: regression vs baseline, outside the ±10% noise band</li>
-        <li>Uncolored cells: within the noise band around 100%</li>
-      </ul>
-    </div>
- """ +def _escape(value: object) -> str: + return html.escape(str(value), quote=True) + + +def _frame_from_dataset(dataset: xr.Dataset) -> pd.DataFrame: + frame = dataset.to_dataframe().dropna(how="all") + frame = frame.reset_index() + frame = frame.dropna(subset=DISPLAY_METRICS, how="all") + return frame.sort_values(KEY_COLUMNS).reset_index(drop=True) + + +def _display_frame(frame: pd.DataFrame, relative: bool) -> pd.DataFrame: + out = frame.copy() + out = out[KEY_COLUMNS + DISPLAY_METRICS] + if not relative: + out["data_rate"] = out["data_rate"] / 2**20 + out["latency_mean"] = out["latency_mean"] * 1e6 + out["latency_median"] = out["latency_median"] * 1e6 + return out + + +def _metric_score(metric: str, value: float) -> float: + if not (isinstance(value, (int, float)) and math.isfinite(value)): + return 0.0 + if "latency" in metric: + return 100.0 - value + return value - 100.0 + + +def _classify_metric(metric: str, value: float, noise_band_pct: float = NOISE_BAND_PCT) -> str: + score = _metric_score(metric, value) + if abs(score) <= noise_band_pct: + return "neutral" + return "improvement" if score > 0 else "regression" + + +def _collect_metric_deltas( + relative_df: pd.DataFrame | None, noise_band_pct: float = NOISE_BAND_PCT +) -> tuple[dict[str, int], list[MetricDelta], list[MetricDelta]]: + if relative_df is None: + return {"improvement": 0, "neutral": 0, "regression": 0}, [], [] + + counts = {"improvement": 0, "neutral": 0, "regression": 0} + improvements: list[MetricDelta] = [] + regressions: list[MetricDelta] = [] + + for _, row in relative_df.iterrows(): + for metric in DISPLAY_METRICS: + value = float(row[metric]) + classification = _classify_metric(metric, value, noise_band_pct=noise_band_pct) + counts[classification] += 1 + score = _metric_score(metric, value) + if classification == "improvement": + improvements.append( + MetricDelta( + metric=metric, + config=str(row["config"]), + comms=str(row["comms"]), + n_clients=int(row["n_clients"]), + msg_size=int(row["msg_size"]), + value=value, + score=score, + ) + ) + elif classification == "regression": + regressions.append( + MetricDelta( + metric=metric, + config=str(row["config"]), + comms=str(row["comms"]), + n_clients=int(row["n_clients"]), + msg_size=int(row["msg_size"]), + value=value, + score=score, + ) + ) + + improvements.sort(key=lambda item: item.score, reverse=True) + regressions.sort(key=lambda item: item.score) + return counts, improvements, regressions + + +def _format_terminal_value(metric: str, value: float, relative: bool) -> str: + if pd.isna(value): + return "nan" + if relative: + return f"{float(value):.1f}%" + if metric in {"latency_mean", "latency_median"}: + return f"{float(value):,.3f}" + if metric == "data_rate": + return f"{float(value):,.3f}" + return f"{float(value):,.2f}" + + +def _terminal_table(frame: pd.DataFrame, relative: bool) -> str: + formatted = frame.copy() + for metric in DISPLAY_METRICS: + formatted[metric] = formatted[metric].map( + lambda value, metric=metric: _format_terminal_value(metric, value, relative) + ) + + sections: list[str] = [] + for (config, comms), group in formatted.groupby(["config", "comms"], sort=False): + sections.append(f"{config} / {comms}") + sections.append( + group[["n_clients", "msg_size", *DISPLAY_METRICS]].to_string(index=False) + ) + sections.append("") + return "\n".join(sections).strip() + + +def _terminal_delta_summary( + counts: dict[str, int], + improvements: list[MetricDelta], + regressions: list[MetricDelta], + limit: int = 5, +) -> str: + lines = [ + 
"COMPARISON OVERVIEW", + ( + f" improvements: {counts['improvement']}, neutral: {counts['neutral']}, " + f"regressions: {counts['regression']}" + ), + "", + ] + + if regressions: + lines.append("BIGGEST REGRESSIONS") + for delta in regressions[:limit]: + lines.append( + " " + f"{delta.metric}: {delta.value:.1f}% " + f"({delta.config}/{delta.comms}, n_clients={delta.n_clients}, msg_size={delta.msg_size})" + ) + lines.append("") + + if improvements: + lines.append("BIGGEST IMPROVEMENTS") + for delta in improvements[:limit]: + lines.append( + " " + f"{delta.metric}: {delta.value:.1f}% " + f"({delta.config}/{delta.comms}, n_clients={delta.n_clients}, msg_size={delta.msg_size})" + ) + lines.append("") + + return "\n".join(lines).strip() + + +def build_report_bundle(perf_path: Path, baseline_path: Path | None = None) -> ReportBundle: + candidate = load_perf(perf_path) + info: TestEnvironmentInfo = candidate.attrs["info"] + candidate_frame = _display_frame(_frame_from_dataset(candidate), relative=False) + + baseline_info: TestEnvironmentInfo | None = None + env_diff: str | None = None + relative = baseline_path is not None + baseline_frame: pd.DataFrame | None = None + relative_frame: pd.DataFrame | None = None + + if baseline_path is not None: + baseline = load_perf(baseline_path) + baseline_info = baseline.attrs["info"] + env_diff = format_env_diff(info.diff(baseline_info)) + baseline_frame = _display_frame(_frame_from_dataset(baseline), relative=False) + relative_dataset = (candidate / baseline) * 100.0 + relative_dataset = relative_dataset.drop_vars(["latency_total", "num_msgs"]) + relative_frame = _display_frame(_frame_from_dataset(relative_dataset), relative=True) + terminal_df = relative_frame + else: + terminal_df = candidate_frame + + delta_counts, top_improvements, top_regressions = _collect_metric_deltas(relative_frame) + + return ReportBundle( + perf_path=perf_path, + baseline_path=baseline_path, + info=info, + baseline_info=baseline_info, + env_diff=env_diff, + relative=relative, + terminal_df=terminal_df, + candidate_df=candidate_frame, + baseline_df=baseline_frame, + relative_df=relative_frame, + delta_counts=delta_counts, + top_improvements=top_improvements, + top_regressions=top_regressions, + ) + + +def _build_terminal_output(bundle: ReportBundle) -> str: + lines = [str(bundle.info), ""] + + if bundle.relative: + lines.extend( + [ + "PERFORMANCE COMPARISON", + "", + bundle.env_diff or "No differences.", + "", + _terminal_delta_summary( + bundle.delta_counts, bundle.top_improvements, bundle.top_regressions + ), + "", + ] + ) + else: + lines.extend(["PERFORMANCE REPORT", ""]) + + lines.extend([_terminal_table(bundle.terminal_df, relative=bundle.relative), ""]) + return "\n".join(line for line in lines if line is not None).strip() + + +def _format_html_number(metric: str, value: float, mode: str) -> str: + if not (isinstance(value, (int, float)) and math.isfinite(value)): + return "n/a" + if mode == "relative": + return f"{value:.1f}%" + if metric in {"latency_mean", "latency_median"}: + return f"{value:,.3f}" + if metric == "data_rate": + return f"{value:,.3f}" + return f"{value:,.2f}" + + +def _color_for_comparison( + value: float, metric: str, noise_band_pct: float = NOISE_BAND_PCT +) -> str: + if not (isinstance(value, (int, float)) and math.isfinite(value)): + return "" + score = _metric_score(metric, value) + magnitude = abs(score) + if magnitude <= noise_band_pct: + return "" + + scale = max(0.0, min(1.0, (magnitude - noise_band_pct) / 45.0)) + hue = "var(--green)" if score 
> 0 else "var(--red)"
+    alpha = 0.15 + 0.35 * scale
+    return f"background-color: hsla({hue}, 70%, 45%, {alpha});"
 
 
 def _base_css() -> str:
-    # Minimal, print-friendly CSS + color scales for cells.
     return """
 <style>
 :root { --green: 140; --red: 0; }
 body { font-family: system-ui, sans-serif; margin: 1.5rem; color: #1f2328; }
 h1 { margin: 0 0 0.25rem 0; }
 h2 { margin: 0 0 0.5rem 0; }
 .subtitle { color: #57606a; }
 .card { border: 1px solid #d0d7de; border-radius: 6px; padding: 1rem; margin: 1rem 0; }
 pre { background: #f6f8fa; padding: 0.75rem; overflow-x: auto; }
 table { border-collapse: collapse; width: 100%; }
 th, td { border: 1px solid #d0d7de; padding: 0.3rem 0.6rem; text-align: right; }
 th.key, td.key { text-align: left; }
 button { padding: 0.25rem 0.75rem; }
 button.active { font-weight: 600; }
 </style>
 """
 
 
-def _color_for_comparison(
-    value: float, metric: str, noise_band_pct: float = 10.0
-) -> str:
-    """
-    Returns inline CSS background for a comparison % value.
-    value: e.g., 97.3, 104.8, etc.
-    For sample_rate/data_rate: improvement > 100 (good).
-    For latency_mean: improvement < 100 (good).
-    Noise band ±10% around 100 is neutral.
-    """
-    if not (isinstance(value, (int, float)) and math.isfinite(value)):
-        return ""
-
-    delta = value - 100.0
-    # Determine direction: + is good for sample/data; - is good for latency
-    if "rate" in metric:
-        # positive delta good, negative bad
-        magnitude = abs(delta)
-        sign_good = delta > 0
-    elif "latency" in metric:
-        # negative delta good (lower latency)
-        magnitude = abs(delta)
-        sign_good = delta < 0
-    else:
-        return ""
-
-    # Noise band: keep neutral
-    if magnitude <= noise_band_pct:
-        return ""
-
-    # Scale 5%..50% across 0..1; clamp
-    scale = max(0.0, min(1.0, (magnitude - noise_band_pct) / 45.0))
-
-    # Choose hue and lightness; use HSL with gentle saturation
-    hue = "var(--green)" if sign_good else "var(--red)"
-    # opacity via alpha blend on lightness via HSLa
-    # Use saturation ~70%, lightness around 40-50% blended with table bg
-    alpha = 0.15 + 0.35 * scale  # 0.15..0.50
-    return f"background-color: hsla({hue}, 70%, 45%, {alpha});"
+def _render_delta_list(title: str, deltas: list[MetricDelta], empty: str) -> str:
+    if not deltas:
+        return (
+            f"<div class='card'><h2>{_escape(title)}</h2>"
+            f"<p>{_escape(empty)}</p></div>"
+        )
+
+    items = []
+    for delta in deltas[:5]:
+        items.append(
+            "<li>"
+            f"{_escape(delta.metric)}: {_escape(f'{delta.value:.1f}%')} "
+            f"({_escape(delta.config)}/{_escape(delta.comms)}, "
+            f"n_clients={delta.n_clients}, msg_size={delta.msg_size})"
+            "</li>"
+        )
+    return (
+        f"<div class='card'>"
+        f"<h2>{_escape(title)}</h2>"
+        f"<ul>{''.join(items)}</ul>"
+        f"</div>"
+    )
+
+
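+# Each results row pairs a candidate record with its baseline and relative
+# counterparts, keyed by (config, comms, n_clients, msg_size). "severity" is
+# the largest absolute _metric_score across the displayed metrics, i.e. how
+# far the row sits from parity in either direction.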
+def _build_rows(bundle: ReportBundle) -> list[dict[str, object]]:
+    rows: list[dict[str, object]] = []
+    baseline_lookup: dict[tuple[str, str, int, int], dict[str, object]] = {}
+    relative_lookup: dict[tuple[str, str, int, int], dict[str, object]] = {}
+
+    if bundle.baseline_df is not None:
+        for row in bundle.baseline_df.to_dict("records"):
+            key = (str(row["config"]), str(row["comms"]), int(row["n_clients"]), int(row["msg_size"]))
+            baseline_lookup[key] = row
+
+    if bundle.relative_df is not None:
+        for row in bundle.relative_df.to_dict("records"):
+            key = (str(row["config"]), str(row["comms"]), int(row["n_clients"]), int(row["msg_size"]))
+            relative_lookup[key] = row
+
+    for row in bundle.candidate_df.to_dict("records"):
+        key = (str(row["config"]), str(row["comms"]), int(row["n_clients"]), int(row["msg_size"]))
+        baseline_row = baseline_lookup.get(key)
+        relative_row = relative_lookup.get(key)
+        severity = 0.0
+        if relative_row is not None:
+            severity = max(
+                abs(_metric_score(metric, float(relative_row[metric])))
+                for metric in DISPLAY_METRICS
+            )
+        rows.append(
+            {
+                "config": key[0],
+                "comms": key[1],
+                "n_clients": key[2],
+                "msg_size": key[3],
+                "candidate": row,
+                "baseline": baseline_row,
+                "relative": relative_row,
+                "severity": severity,
+            }
+        )
+    return rows
 
 
-def _format_number(x) -> str:
-    if isinstance(x, (int,)) and not isinstance(x, bool):
-        return f"{x:d}"
-    try:
-        xf = float(x)
-    except Exception:
-        return _escape(str(x))
-    # Heuristic: for comparison percentages, 1 decimal is nice; for absolute, 3 decimals for latency.
-    return f"{xf:.3f}"
+def _options(values: list[object], label: str) -> str:
+    options = ["<option value='all'>All</option>"]
+    for value in values:
+        options.append(f"<option value='{_escape(value)}'>{_escape(value)}</option>")
+    return (
+        f"<label>{_escape(label)}: "
+        f"<select data-filter='{label.lower()}'>{''.join(options)}</select></label>"
+    )
 
 
-def summary(perf_path: Path, baseline_path: Path | None, html: bool = False) -> None:
-    """print perf test results and comparisons to the console"""
+def _report_title(bundle: ReportBundle) -> str:
+    return "ezmsg Performance Comparison" if bundle.relative else "ezmsg Performance Report"
 
-    output = ""
-    perf = load_perf(perf_path)
-    info: TestEnvironmentInfo = perf.attrs["info"]
-    output += str(info) + "\n\n"
-
-    relative = False
-    env_diff = None
-    if baseline_path is not None:
-        relative = True
-        output += "PERFORMANCE COMPARISON\n\n"
-        baseline = load_perf(baseline_path)
-        perf = (perf / baseline) * 100.0
-        baseline_info: TestEnvironmentInfo = baseline.attrs["info"]
-        env_diff = format_env_diff(info.diff(baseline_info))
-        output += env_diff + "\n\n"
-
-        # These raw stats are still valuable to have, but are confusing
-        # when making relative comparisons
-        perf = perf.drop_vars(["latency_total", "num_msgs"])
-
-    perf = perf.stack(params=["n_clients", "msg_size"]).dropna("params")
-    df = perf.squeeze().to_dataframe()
-    df = df.drop("n_clients", axis=1)
-    df = df.drop("msg_size", axis=1)
-
-    for _, config_ds in perf.groupby("config"):
-        for _, comms_ds in config_ds.groupby("comms"):
-            output += str(comms_ds.squeeze().to_dataframe()) + "\n\n"
-        output += "\n"
-
-    print(output)
-
-    if html:
-        # Ensure expected columns exist
-        expected_cols = {
-            "sample_rate_mean",
-            "sample_rate_median",
-            "data_rate",
-            "latency_mean",
-            "latency_median",
-        }
-        missing = expected_cols - set(df.columns)
-        if missing:
-            raise ValueError(f"Missing expected columns in dataset: {missing}")
-
-        # We'll render a table per (config, comms) group.
-        groups = (
-            df.reset_index()
-            .sort_values(by=["config", "comms", "n_clients", "msg_size"])
-            .groupby(["config", "comms"], sort=False)
-        )
-
-        # Build HTML
-        parts: list[str] = []
-        parts.append("<!DOCTYPE html>")
-        parts.append(
-            "<html><head><meta charset='utf-8'>"
-        )
-        parts.append("<title>ezmsg perf report</title>")
-        parts.append(_base_css())
-        parts.append("</head><body>")
-
-        parts.append("<header>")
-        parts.append("<h1>ezmsg Performance Report</h1>")
-        sub = str(perf_path)
-        if baseline_path is not None:
-            sub += f" relative to {str(baseline_path)}"
-        parts.append(f"<p class='subtitle'>{_escape(sub)}</p>")
-        parts.append("</header>")
-
-        if info is not None:
-            parts.append(_env_block("Test Environment", str(info)))
-
-        parts.append(_env_block("Test Details", TEST_DESCRIPTION))
-
-        if env_diff is not None:
-            # Show diffs using your helper
-            parts.append("<section class='card'>")
-            parts.append("<h2>Environment Differences vs Baseline</h2>")
-            parts.append(f"<pre>{_escape(env_diff)}</pre>")
-            parts.append("</section>")
-            parts.append(_legend_block())
-
-        # Render each group
-        for (config, comms), g in groups:
-            # Keep only expected columns in order
-            cols = [
-                "n_clients",
-                "msg_size",
-                "sample_rate_mean",
-                "sample_rate_median",
-                "data_rate",
-                "latency_mean",
-                "latency_median",
-            ]
-            g = g[cols].copy()
-
-            # String format some columns (msg_size with separators)
-            g["msg_size"] = g["msg_size"].map(
-                lambda x: f"{int(x):,}" if pd.notna(x) else x
-            )
-
-            # Build table manually so we can inject inline cell styles easily
-            # (pandas Styler is great but produces bulky HTML; manual keeps it clean)
-            header = f"""
-            <thead>
-                <tr>
-                    <th>n_clients</th>
-                    <th>msg_size {"" if relative else "(b)"}</th>
-                    <th>sample_rate_mean {"" if relative else "(msgs/s)"}</th>
-                    <th>sample_rate_median {"" if relative else "(msgs/s)"}</th>
-                    <th>data_rate {"" if relative else "(MB/s)"}</th>
-                    <th>latency_mean {"" if relative else "(us)"}</th>
-                    <th>latency_median {"" if relative else "(us)"}</th>
-                </tr>
-            </thead>
-            """
-            body_rows: list[str] = []
-            for _, row in g.iterrows():
-                sr, srm, dr, lt, lm = (
-                    row["sample_rate_mean"],
-                    row["sample_rate_median"],
-                    row["data_rate"],
-                    row["latency_mean"],
-                    row["latency_median"],
-                )
-                dr = dr if relative else dr / 2**20
-                lt = lt if relative else lt * 1e6
-                lm = lm if relative else lm * 1e6
-                sr_style = (
-                    _color_for_comparison(sr, "sample_rate_mean") if relative else ""
-                )
-                srm_style = (
-                    _color_for_comparison(srm, "sample_rate_median") if relative else ""
-                )
-                dr_style = _color_for_comparison(dr, "data_rate") if relative else ""
-                lt_style = _color_for_comparison(lt, "latency_mean") if relative else ""
-                lm_style = (
-                    _color_for_comparison(lm, "latency_median") if relative else ""
-                )
-
-                body_rows.append(
-                    "<tr>"
-                    f"<td>{_format_number(row['n_clients'])}</td>"
-                    f"<td>{_escape(row['msg_size'])}</td>"
-                    f"<td style='{sr_style}'>{_format_number(sr)}</td>"
-                    f"<td style='{srm_style}'>{_format_number(srm)}</td>"
-                    f"<td style='{dr_style}'>{_format_number(dr)}</td>"
-                    f"<td style='{lt_style}'>{_format_number(lt)}</td>"
-                    f"<td style='{lm_style}'>{_format_number(lm)}</td>"
-                    "</tr>"
-                )
-
-            table_html = f"<table>{header}<tbody>{''.join(body_rows)}</tbody></table>"
-            parts.append(
-                f"<section class='group'>"
-                f"<h2>{_escape(config)} / {_escape(comms)}</h2>"
-                f"{table_html}"
-                f"</section>"
-            )
-
-        parts.append("</body></html>")
-        html_text = "".join(parts)
-        report_path = perf_path.with_name(f"report_{get_datestamp()}.html")
-        report_path.write_text(html_text, encoding="utf-8")
-        webbrowser.open(report_path.resolve().as_uri())
 
 
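+# render_html_report emits a static document plus one inline script; the two
+# halves share a data-attribute contract: each table row carries data-config /
+# data-comms / data-n_clients / data-msg_size for filtering, and each metric
+# cell carries data-candidate, data-baseline, and data-relative values so the
+# display-mode buttons can swap what is shown without re-rendering.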
+def render_html_report(bundle: ReportBundle) -> str:
+    rows = _build_rows(bundle)
+    filters = {
+        "config": sorted(bundle.candidate_df["config"].unique().tolist()),
+        "comms": sorted(bundle.candidate_df["comms"].unique().tolist()),
+        "n_clients": sorted(bundle.candidate_df["n_clients"].unique().tolist()),
+        "msg_size": sorted(bundle.candidate_df["msg_size"].unique().tolist()),
+    }
+    mode = "relative" if bundle.relative else "candidate"
+
+    parts: list[str] = []
+    parts.append("<!DOCTYPE html>")
+    parts.append(
+        "<html lang='en'><head><meta charset='utf-8'>"
+    )
+    parts.append(f"<title>{_escape(_report_title(bundle))}</title>")
+    parts.append(_base_css())
+    parts.append("</head><body>")
+
+    parts.append("<header>")
+    parts.append(f"<h1>{_escape(_report_title(bundle))}</h1>")
+    sub = str(bundle.perf_path)
+    if bundle.baseline_path is not None:
+        sub += f" relative to {bundle.baseline_path}"
+    parts.append(f"<p class='subtitle'>{_escape(sub)}</p>")
+    parts.append("</header>")
+
+    if bundle.relative:
+        parts.append("<section class='card'><h2>Comparison Overview</h2>")
+        parts.append("<div class='stats'>")
+        for label, value in [
+            ("Improvements", bundle.delta_counts["improvement"]),
+            ("Neutral", bundle.delta_counts["neutral"]),
+            ("Regressions", bundle.delta_counts["regression"]),
+        ]:
+            parts.append(
+                "<div class='stat'>"
+                f"<span class='stat-label'>{_escape(label)}</span> "
+                f"<span class='stat-value'>{_escape(value)}</span>"
+                "</div>"
+            )
+        parts.append("</div></section>")
+
+        parts.append("<section class='cards'>")
+        parts.append(
+            _render_delta_list(
+                "Biggest Regressions",
+                bundle.top_regressions,
+                "No regressions outside the noise band.",
+            )
+        )
+        parts.append(
+            _render_delta_list(
+                "Biggest Improvements",
+                bundle.top_improvements,
+                "No improvements outside the noise band.",
+            )
+        )
+        parts.append("</section>")
+
+    parts.append("<section class='card'>")
+    parts.append("<h2>Environment</h2>")
+    parts.append(f"<pre>{_escape(str(bundle.info))}</pre>")
+    parts.append("</section>")
+
+    if bundle.env_diff is not None:
+        parts.append("<section class='card'>")
+        parts.append("<h2>Environment Differences vs Baseline</h2>")
+        parts.append(f"<pre>{_escape(bundle.env_diff)}</pre>")
+        parts.append("</section>")
+
+    parts.append("<section class='card'><h2>Test Details</h2>")
+    parts.append(f"<pre>{_escape(TEST_DESCRIPTION.strip())}</pre>")
+    parts.append("</section>")
+
+    parts.append("<section class='card'>")
+    parts.append("<h2>Explore</h2>")
+    parts.append("<div class='filters'>")
+    parts.append(_options(filters["config"], "Config"))
+    parts.append(_options(filters["comms"], "Comms"))
+    parts.append(_options(filters["n_clients"], "N_clients"))
+    parts.append(_options(filters["msg_size"], "Msg_size"))
+    parts.append(
+        "<button type='button' id='reset-filters'>Reset</button>"
+    )
+    parts.append("</div>")
+    parts.append("<div class='toolbar'>")
+    for view, label in [("all", "All Metrics"), ("throughput", "Throughput"), ("latency", "Latency")]:
+        active = " active" if view == "all" else ""
+        parts.append(
+            f"<button type='button' class='metric-view{active}' data-view='{view}'>{label}</button>"
+        )
+    if bundle.relative:
+        parts.append("<span class='spacer'></span>")
+        for display_mode, label in [
+            ("relative", "Relative"),
+            ("candidate", "Candidate"),
+            ("baseline", "Baseline"),
+        ]:
+            active = " active" if display_mode == mode else ""
+            parts.append(
+                f"<button type='button' class='display-mode{active}' data-mode='{display_mode}'>{label}</button>"
+            )
+    parts.append("</div>")
+    parts.append("</section>")
+
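+    # Metric header cells and data cells share data-group tags (throughput or
+    # latency) so the metric-view buttons can toggle whole column groups.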
+    parts.append("<section class='card'><h2>Results</h2>")
+    parts.append("<table id='results'><thead><tr>")
+    for column in ["config", "comms", "n_clients", "msg_size"]:
+        parts.append(f"<th class='key'>{_escape(column)}</th>")
+    for metric in DISPLAY_METRICS:
+        parts.append(
+            f"<th data-group='{METRIC_GROUPS[metric]}'>{_escape(METRIC_LABELS[metric])}</th>"
+        )
+    parts.append("</tr></thead><tbody>")
+
+    for row in rows:
+        parts.append(
+            "<tr "
+            f"data-config='{_escape(row['config'])}' "
+            f"data-comms='{_escape(row['comms'])}' "
+            f"data-n_clients='{_escape(row['n_clients'])}' "
+            f"data-msg_size='{_escape(row['msg_size'])}' "
+            f"data-severity='{row['severity']}'>"
+        )
+        for column in ["config", "comms", "n_clients", "msg_size"]:
+            parts.append(f"<td class='key'>{_escape(row[column])}</td>")
+        candidate = row["candidate"]
+        baseline = row["baseline"]
+        relative_row = row["relative"]
+        for metric in DISPLAY_METRICS:
+            candidate_value = float(candidate[metric])
+            baseline_value = (
+                float(baseline[metric]) if baseline is not None else float("nan")
+            )
+            relative_value = (
+                float(relative_row[metric]) if relative_row is not None else float("nan")
+            )
+            style = (
+                _color_for_comparison(relative_value, metric)
+                if bundle.relative and math.isfinite(relative_value)
+                else ""
+            )
+            initial_value = (
+                relative_value if bundle.relative else candidate_value
+            )
+            initial_mode = "relative" if bundle.relative else "candidate"
+            parts.append(
+                f"<td data-group='{METRIC_GROUPS[metric]}' data-metric='{metric}' "
+                f"data-candidate='{candidate_value}' data-baseline='{baseline_value}' "
+                f"data-relative='{relative_value}' style='{style}'>"
+                f"{_escape(_format_html_number(metric, initial_value, initial_mode))}"
+                "</td>"
+            )
+        parts.append("</tr>")
+
+    parts.append("</tbody></table></section>")
+    # %-formatting (rather than an f-string) keeps the literal braces in the
+    # script below from needing to be escaped.
+    parts.append(
+        """
+        <script>
+        const METRIC_LABELS = %s;
+        const ABSOLUTE_UNITS = %s;
+        const RELATIVE_UNITS = %s;
+        let displayMode = %s;
+
+        function applyFilters() {
+            const active = {};
+            document.querySelectorAll("select[data-filter]").forEach((sel) => {
+                active[sel.dataset.filter] = sel.value;
+            });
+            document.querySelectorAll("#results tbody tr").forEach((tr) => {
+                const visible = Object.entries(active).every(
+                    ([key, value]) => value === "all" || tr.dataset[key] === value
+                );
+                tr.style.display = visible ? "" : "none";
+            });
+        }
+
+        function applyMetricView(view) {
+            document.querySelectorAll("#results [data-group]").forEach((cell) => {
+                cell.style.display =
+                    view === "all" || cell.dataset.group === view ? "" : "none";
+            });
+        }
+
+        function applyDisplayMode() {
+            const units = displayMode === "relative" ? RELATIVE_UNITS : ABSOLUTE_UNITS;
+            document.querySelectorAll("#results td[data-metric]").forEach((td) => {
+                const value = parseFloat(td.dataset[displayMode]);
+                td.textContent = isFinite(value)
+                    ? value.toFixed(displayMode === "relative" ? 1 : 3) +
+                      " " + units[td.dataset.metric]
+                    : "n/a";
+            });
+        }
+
+        document.querySelectorAll("select[data-filter]").forEach((sel) => {
+            sel.addEventListener("change", applyFilters);
+        });
+        document.getElementById("reset-filters").addEventListener("click", () => {
+            document.querySelectorAll("select[data-filter]").forEach((sel) => {
+                sel.value = "all";
+            });
+            applyFilters();
+        });
+        document.querySelectorAll("button.metric-view").forEach((btn) => {
+            btn.addEventListener("click", () => {
+                document.querySelectorAll("button.metric-view").forEach((other) => {
+                    other.classList.remove("active");
+                });
+                btn.classList.add("active");
+                applyMetricView(btn.dataset.view);
+            });
+        });
+        document.querySelectorAll("button.display-mode").forEach((btn) => {
+            btn.addEventListener("click", () => {
+                document.querySelectorAll("button.display-mode").forEach((other) => {
+                    other.classList.remove("active");
+                });
+                btn.classList.add("active");
+                displayMode = btn.dataset.mode;
+                applyDisplayMode();
+            });
+        });
+        </script>
+        """
+        % (
+            json.dumps(METRIC_LABELS),
+            json.dumps(ABSOLUTE_UNITS),
+            json.dumps(RELATIVE_UNITS),
+            json.dumps(mode),
+        )
+    )
+    parts.append("</body></html>
    ") + return "".join(parts) + + +def write_html_report( + perf_path: Path, + baseline_path: Path | None = None, + output_path: Path | None = None, + open_browser: bool = False, +) -> Path: + bundle = build_report_bundle(perf_path, baseline_path=baseline_path) + if output_path is None: + output_path = ( + default_compare_html_path(perf_path, baseline_path) + if baseline_path is not None + else default_report_html_path(perf_path) + ) + html_text = render_html_report(bundle) + output_path.write_text(html_text, encoding="utf-8") + if open_browser: + webbrowser.open(output_path.resolve().as_uri()) + return output_path + + +def report( + perf_path: Path, + output_path: Path | None = None, + open_browser: bool = True, +) -> None: + bundle = build_report_bundle(perf_path) + print(_build_terminal_output(bundle)) + out_path = write_html_report( + perf_path=perf_path, + output_path=output_path, + open_browser=open_browser, + ) + print(f"\nHTML report: {out_path}") + + +def compare( + perf_path: Path, + baseline_path: Path, + output_path: Path | None = None, + open_browser: bool = True, +) -> None: + bundle = build_report_bundle(perf_path, baseline_path=baseline_path) + print(_build_terminal_output(bundle)) + out_path = write_html_report( + perf_path=perf_path, + baseline_path=baseline_path, + output_path=output_path, + open_browser=open_browser, + ) + print(f"\nComparison report: {out_path}") + + +def setup_report_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser( + "report", help="render an absolute report for a benchmark file" + ) + parser.add_argument("perf", type=Path, help="benchmark file") + parser.add_argument( + "--output", type=Path, - help="perf test", + default=None, + help="optional explicit HTML output path", + ) + parser.add_argument( + "--no-browser", + action="store_true", + help="write HTML without opening it in a browser", + ) + parser.set_defaults( + _handler=lambda ns: report( + perf_path=ns.perf, + output_path=ns.output, + open_browser=not ns.no_browser, + ) + ) + + +def setup_compare_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser( + "compare", help="compare one benchmark file against a baseline" ) - p_summary.add_argument( + parser.add_argument("perf", type=Path, help="candidate benchmark file") + parser.add_argument( "--baseline", "-b", type=Path, + required=True, + help="baseline benchmark file for comparison", + ) + parser.add_argument( + "--output", + type=Path, default=None, - help="baseline perf test for comparison", + help="optional explicit HTML output path", ) - p_summary.add_argument( - "--html", + parser.add_argument( + "--no-browser", action="store_true", - help="generate an html output file and render results in browser", + help="write HTML without opening it in a browser", ) - - p_summary.set_defaults( - _handler=lambda ns: summary( - perf_path=ns.perf, baseline_path=ns.baseline, html=ns.html + parser.set_defaults( + _handler=lambda ns: compare( + perf_path=ns.perf, + baseline_path=ns.baseline, + output_path=ns.output, + open_browser=not ns.no_browser, ) ) diff --git a/src/ezmsg/util/perf/command.py b/src/ezmsg/util/perf/command.py index 31f70451..b15200a4 100644 --- a/src/ezmsg/util/perf/command.py +++ b/src/ezmsg/util/perf/command.py @@ -2,19 +2,20 @@ import sys from .ab import setup_ab_cmdline -from .analysis import setup_summary_cmdline +from .analysis import setup_compare_cmdline, setup_report_cmdline from .hotpath import setup_hotpath_cmdline -from .run import 
setup_run_cmdline +from .run import setup_benchmark_cmdline def setup_perf_cmdline(subparsers: argparse._SubParsersAction) -> None: parser = subparsers.add_parser("perf", help="performance test utilities") perf_subparsers = parser.add_subparsers(dest="perf_command", required=True) - setup_run_cmdline(perf_subparsers) + setup_benchmark_cmdline(perf_subparsers) + setup_report_cmdline(perf_subparsers) + setup_compare_cmdline(perf_subparsers) setup_hotpath_cmdline(perf_subparsers) setup_ab_cmdline(perf_subparsers) - setup_summary_cmdline(perf_subparsers) def build_parser() -> argparse.ArgumentParser: diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 3f794f21..1bebc86d 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -9,6 +9,7 @@ from datetime import datetime, timedelta from contextlib import contextmanager, redirect_stdout, redirect_stderr +from pathlib import Path import ezmsg.core as ez from ezmsg.core.graphserver import GraphServer @@ -98,7 +99,11 @@ def get_datestamp() -> str: return datetime.now().strftime("%Y%m%d_%H%M%S") -def perf_run( +def output_paths_for_name(name: str) -> tuple[Path, Path]: + return Path(f"perf_{name}.txt"), Path(f"report_{name}.html") + + +def benchmark( max_duration: float, num_msgs: int, num_buffers: int, @@ -110,17 +115,20 @@ def perf_run( configs: typing.Iterable[str] | None, grid: bool, warmup_dur: float, -) -> None: + name: str | None = None, + open_browser: bool = True, +) -> tuple[Path, Path | None]: if n_clients is None: n_clients = DEFAULT_N_CLIENTS if any(c < 0 for c in n_clients): ez.logger.error("All tests must have >=0 clients") - return + raise ValueError("All tests must have >=0 clients") if msg_sizes is None: msg_sizes = DEFAULT_MSG_SIZES if any(s < 0 for s in msg_sizes): ez.logger.error("All msg_sizes must be >=0 bytes") + raise ValueError("All msg_sizes must be >=0 bytes") if not grid and len(list(n_clients)) != len(list(msg_sizes)): ez.logger.warning( @@ -136,7 +144,7 @@ def perf_run( ez.logger.error( f"Invalid test communications requested. Valid communications: {', '.join([c.value for c in Communication])}" ) - return + raise ValueError("Invalid test communications requested") try: configurators = ( @@ -146,7 +154,7 @@ def perf_run( ez.logger.error( f"Invalid test configuration requested. Valid configurations: {', '.join([c for c in CONFIGS])}" ) - return + raise ValueError("Invalid test configuration requested") subitr = itertools.product if grid else zip @@ -177,12 +185,17 @@ def perf_run( quitting = False start_time = time.time() + if name is not None: + output_path, html_out = output_paths_for_name(name) + else: + output_path = Path(f"perf_{get_datestamp()}.txt") + html_out = None try: ez.logger.info(f"Warming up for {warmup_dur} seconds...") warmup(warmup_dur) - with open(f"perf_{get_datestamp()}.txt", "w") as out_f: + with open(output_path, "w") as out_f: for _ in range(repeats): out_f.write( json.dumps(TestEnvironmentInfo(), cls=MessageEncoder) + "\n" @@ -235,9 +248,25 @@ def perf_run( ) ez.logger.info(f"Tests concluded. 
Wallclock Runtime: {dur_str}s") + html_path = None + try: + from .analysis import write_html_report + + html_path = write_html_report( + perf_path=output_path, + output_path=html_out, + open_browser=open_browser, + ) + ez.logger.info(f"Wrote benchmark log to {output_path}") + ez.logger.info(f"Wrote benchmark report to {html_path}") + except ImportError: + ez.logger.warning("Could not generate benchmark HTML report; analysis dependencies are unavailable.") + + return output_path, html_path + -def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: - p_run = subparsers.add_parser("run", help="run performance test") +def setup_benchmark_cmdline(subparsers: argparse._SubParsersAction) -> None: + p_run = subparsers.add_parser("benchmark", help="run the legacy benchmark matrix") p_run.add_argument( "--max-duration", @@ -328,8 +357,21 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: help="warmup CPU with busy task for some number of seconds (default = 60.0)", ) + p_run.add_argument( + "--name", + type=str, + default=None, + help="optional short name used for perf_.txt and report_.html", + ) + + p_run.add_argument( + "--no-browser", + action="store_true", + help="write the generated HTML report without opening it in a browser", + ) + p_run.set_defaults( - _handler=lambda ns: perf_run( + _handler=lambda ns: benchmark( max_duration=ns.max_duration, num_msgs=ns.num_msgs, num_buffers=ns.num_buffers, @@ -341,5 +383,7 @@ def setup_run_cmdline(subparsers: argparse._SubParsersAction) -> None: configs=ns.configs, grid=True, warmup_dur=ns.warmup, + name=ns.name, + open_browser=not ns.no_browser, ) ) diff --git a/tests/test_command.py b/tests/test_command.py index 6a737253..5cf9630d 100644 --- a/tests/test_command.py +++ b/tests/test_command.py @@ -31,20 +31,47 @@ def test_perf_subparser_accepts_nested_perf_args(): args = parser.parse_args( [ "perf", - "hotpath", - "--count", - "10", - "--samples", + "benchmark", + "--name", + "smoke", + "--num-msgs", "2", - "--quiet", + "--repeats", + "1", + "--no-browser", ] ) assert args.command == "perf" - assert args.perf_command == "hotpath" - assert args.count == 10 - assert args.samples == 2 - assert args.quiet is True + assert args.perf_command == "benchmark" + assert args.name == "smoke" + assert args.num_msgs == 2 + assert args.repeats == 1 + assert args.no_browser is True + + +def test_perf_compare_subparser_accepts_baseline_args(): + parser = build_parser() + + args = parser.parse_args( + [ + "perf", + "compare", + "candidate.txt", + "--baseline", + "baseline.txt", + "--output", + "diff.html", + "--no-browser", + ] + ) + + assert args.command == "perf" + assert args.perf_command == "compare" + assert str(args.perf) == "candidate.txt" + assert str(args.baseline) == "baseline.txt" + assert str(args.output) == "diff.html" + assert args.no_browser is True def test_graphviz_subparser_rejects_mermaid_only_args(): @@ -65,4 +92,4 @@ def test_perf_subparser_rejects_core_only_args(): parser = build_parser() with pytest.raises(SystemExit): - parser.parse_args(["perf", "hotpath", "--address", "127.0.0.1:4000"]) + parser.parse_args(["perf", "benchmark", "--address", "127.0.0.1:4000"]) diff --git a/tests/test_perf_analysis.py b/tests/test_perf_analysis.py new file mode 100644 index 00000000..12c00dc2 --- /dev/null +++ b/tests/test_perf_analysis.py @@ -0,0 +1,218 @@ +import json +from pathlib import Path + +from ezmsg.util.messagecodec import MessageEncoder +from ezmsg.util.perf.analysis import ( + build_report_bundle, + 
default_compare_html_path, + default_report_html_path, + write_html_report, +) +from ezmsg.util.perf.envinfo import TestEnvironmentInfo as PerfEnvironmentInfo +from ezmsg.util.perf.impl import ( + Metrics as PerfMetrics, + TestLogEntry as PerfLogEntry, + TestParameters as PerfParameters, +) +from ezmsg.util.perf.run import benchmark, output_paths_for_name + + +def _write_perf_log( + path: Path, info: PerfEnvironmentInfo, entries: list[PerfLogEntry] +) -> None: + with open(path, "w") as handle: + handle.write(json.dumps(info, cls=MessageEncoder) + "\n") + for entry in entries: + handle.write(json.dumps(entry, cls=MessageEncoder) + "\n") + + +def _entry( + *, + config: str, + comms: str, + n_clients: int, + msg_size: int, + sample_rate: float, + latency: float, + data_rate: float, +) -> PerfLogEntry: + return PerfLogEntry( + params=PerfParameters( + msg_size=msg_size, + num_msgs=128, + n_clients=n_clients, + config=config, + comms=comms, + max_duration=1.0, + num_buffers=1, + ), + results=PerfMetrics( + num_msgs=128, + sample_rate_mean=sample_rate, + sample_rate_median=sample_rate * 0.98, + latency_mean=latency, + latency_median=latency * 0.97, + latency_total=latency * 128, + data_rate=data_rate, + ), + ) + + +def test_report_and_compare_html_paths(tmp_path): + perf = tmp_path / "perf_20260406_120000.txt" + baseline = tmp_path / "perf_20260405_120000.txt" + + assert default_report_html_path(perf) == tmp_path / "perf_20260406_120000.html" + assert default_compare_html_path(perf, baseline) == ( + tmp_path / "perf_20260406_120000.vs_perf_20260405_120000.html" + ) + assert output_paths_for_name("smoke") == ( + Path("perf_smoke.txt"), + Path("report_smoke.html"), + ) + + +def test_build_report_bundle_and_html_report(tmp_path): + candidate = tmp_path / "candidate.txt" + baseline = tmp_path / "baseline.txt" + + candidate_info = PerfEnvironmentInfo(git_branch="candidate", git_commit="abc123") + baseline_info = PerfEnvironmentInfo(git_branch="baseline", git_commit="def456") + + _write_perf_log( + candidate, + candidate_info, + [ + _entry( + config="fanin", + comms="local", + n_clients=1, + msg_size=64, + sample_rate=1200.0, + latency=0.0009, + data_rate=2_400_000.0, + ), + _entry( + config="relay", + comms="tcp", + n_clients=2, + msg_size=256, + sample_rate=700.0, + latency=0.0024, + data_rate=1_200_000.0, + ), + ], + ) + _write_perf_log( + baseline, + baseline_info, + [ + _entry( + config="fanin", + comms="local", + n_clients=1, + msg_size=64, + sample_rate=1000.0, + latency=0.0012, + data_rate=2_000_000.0, + ), + _entry( + config="relay", + comms="tcp", + n_clients=2, + msg_size=256, + sample_rate=900.0, + latency=0.0018, + data_rate=1_700_000.0, + ), + ], + ) + + bundle = build_report_bundle(candidate, baseline_path=baseline) + assert bundle.relative is True + assert bundle.delta_counts["improvement"] > 0 + assert bundle.delta_counts["regression"] > 0 + assert bundle.top_improvements + assert bundle.top_regressions + + out_path = write_html_report(candidate, baseline_path=baseline, open_browser=False) + html = out_path.read_text(encoding="utf-8") + + assert "Comparison Overview" in html + assert "Biggest Regressions" in html + assert "data-filter='config'" in html + assert "display-mode" in html + assert "metric-view" in html + assert "candidate.vs_baseline.html" == out_path.name + + +def test_benchmark_writes_raw_output_and_html_report(tmp_path, monkeypatch): + output_path = tmp_path / "perf_smoke.txt" + html_path = tmp_path / "report_smoke.html" + monkeypatch.chdir(tmp_path) + + class 
FakeGraphServer: + def __init__(self): + self.address = ("127.0.0.1", 0) + + def start(self): + return None + + def stop(self): + return None + + html_calls: list[tuple[Path, Path | None, bool]] = [] + + monkeypatch.setattr("ezmsg.util.perf.run.GraphServer", FakeGraphServer) + monkeypatch.setattr("ezmsg.util.perf.run.warmup", lambda _: None) + monkeypatch.setattr( + "ezmsg.util.perf.run.perform_test", + lambda **_: PerfMetrics( + num_msgs=8, + sample_rate_mean=1000.0, + sample_rate_median=950.0, + latency_mean=0.001, + latency_median=0.0009, + latency_total=0.008, + data_rate=1_000_000.0, + ), + ) + + def _fake_write_html_report( + perf_path: Path, + baseline_path: Path | None = None, + output_path: Path | None = None, + open_browser: bool = False, + ) -> Path: + html_calls.append((perf_path, output_path, open_browser)) + target = output_path or perf_path.with_suffix(".html") + target.write_text("", encoding="utf-8") + return target + + monkeypatch.setattr( + "ezmsg.util.perf.analysis.write_html_report", + _fake_write_html_report, + ) + + raw_path, report_path = benchmark( + max_duration=0.01, + num_msgs=8, + num_buffers=1, + iters=1, + repeats=1, + msg_sizes=[64], + n_clients=[1], + comms=["local"], + configs=["fanin"], + grid=True, + warmup_dur=0.0, + name="smoke", + open_browser=False, + ) + + assert raw_path.name == output_path.name + assert report_path is not None + assert report_path.name == html_path.name + assert output_path.exists() + assert html_path.exists() + assert html_calls == [(raw_path, report_path, False)] From 9e93d0574e8dab4e4f356efdd60dcb247f93e6e9 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Mon, 6 Apr 2026 16:30:34 -0400 Subject: [PATCH 2/4] refactored ab testing to support shared env and remove --- src/ezmsg/util/perf/ab.py | 549 ++++++++++++++++++++++++++++++++------ tests/test_command.py | 32 +++ tests/test_perf_ab.py | 51 +++- 3 files changed, 552 insertions(+), 80 deletions(-) diff --git a/src/ezmsg/util/perf/ab.py b/src/ezmsg/util/perf/ab.py index 5a2da621..e4696f57 100644 --- a/src/ezmsg/util/perf/ab.py +++ b/src/ezmsg/util/perf/ab.py @@ -2,6 +2,7 @@ import argparse import contextlib +import hashlib import json import os import random @@ -15,6 +16,27 @@ DEFAULT_PAIR_SEED = 0 +DEFAULT_REF_A = "dev" +DEFAULT_REF_B = "CURRENT" +VENV_DIR_CANDIDATES = (".venv", "venv", ".env", "env") + + +@dataclass(frozen=True) +class ABEnvironmentInfo: + label: str + ref: str + tree: str + python: str + python_version: str + ezmsg_version: str + numpy_version: str + git_commit: str + git_branch: str + dirty: bool + env_mode: str + pyproject_hash: str | None + uv_lock_hash: str | None + env_overrides: dict[str, str] @dataclass(frozen=True) @@ -34,6 +56,9 @@ class ABRunSummary: ref_b: str rounds: int seed: int + env_a: ABEnvironmentInfo + env_b: ABEnvironmentInfo + warnings: list[str] cases: list[ABCaseSummary] @@ -49,6 +74,7 @@ def _hotpath_json_arg(path: Path) -> list[str]: def build_hotpath_command( + python: str | Path, output_path: Path, count: int, warmup: int, @@ -59,9 +85,7 @@ def build_hotpath_command( quiet: bool, ) -> list[str]: cmd = [ - "uv", - "run", - "python", + str(python), "-m", "ezmsg.util.perf.hotpath", "--count", @@ -85,6 +109,18 @@ def build_hotpath_command( return cmd +def parse_env_assignments(values: list[str]) -> dict[str, str]: + assignments: dict[str, str] = {} + for value in values: + if "=" not in value: + raise ValueError(f"Environment override must use KEY=VALUE format: {value}") + key, env_value = value.split("=", 1) + if not key: + raise 
ValueError(f"Environment override is missing a key: {value}") + assignments[key] = env_value + return assignments + + def load_hotpath_summary(path: Path) -> dict[str, float]: payload = json.loads(path.read_text()) return { @@ -99,6 +135,9 @@ def summarize_ab_results( rounds: int, seed: int, paired_runs: list[tuple[dict[str, float], dict[str, float]]], + env_a: ABEnvironmentInfo | None = None, + env_b: ABEnvironmentInfo | None = None, + warnings: list[str] | None = None, ) -> ABRunSummary: case_ids = sorted(paired_runs[0][0].keys()) cases: list[ABCaseSummary] = [] @@ -119,11 +158,30 @@ def summarize_ab_results( ) ) + placeholder = ABEnvironmentInfo( + label="?", + ref="unknown", + tree="unknown", + python="unknown", + python_version="unknown", + ezmsg_version="unknown", + numpy_version="unknown", + git_commit="unknown", + git_branch="unknown", + dirty=False, + env_mode="unknown", + pyproject_hash=None, + uv_lock_hash=None, + env_overrides={}, + ) return ABRunSummary( ref_a=ref_a, ref_b=ref_b, rounds=rounds, seed=seed, + env_a=env_a or placeholder, + env_b=env_b or placeholder, + warnings=warnings or [], cases=cases, ) @@ -136,9 +194,7 @@ def _median(values: list[float]) -> float: return (ordered[mid - 1] + ordered[mid]) / 2.0 -def _run_checked(cmd: list[str], cwd: Path) -> None: - env = os.environ.copy() - env.pop("VIRTUAL_ENV", None) +def _run_checked(cmd: list[str], cwd: Path, env: dict[str, str] | None = None) -> None: completed = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, env=env) if completed.returncode == 0: return @@ -151,6 +207,18 @@ def _run_checked(cmd: list[str], cwd: Path) -> None: ) +def _run_json(cmd: list[str], cwd: Path, env: dict[str, str]) -> dict[str, str]: + completed = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, env=env) + if completed.returncode != 0: + raise RuntimeError( + f"Command failed in {cwd}:\n" + f"$ {' '.join(cmd)}\n\n" + f"stdout:\n{completed.stdout}\n" + f"stderr:\n{completed.stderr}" + ) + return json.loads(completed.stdout) + + def _is_current_ref(ref: str) -> bool: return ref.upper() == "CURRENT" @@ -184,10 +252,6 @@ def _provision_tree( shutil.rmtree(parent, ignore_errors=True) -def _maybe_sync(tree: Path) -> None: - _run_checked(["uv", "sync", "--group", "dev"], cwd=tree) - - def _mirror_hotpath_module(source_root: Path, target_tree: Path) -> None: source = source_root / "src" / "ezmsg" / "util" / "perf" / "hotpath.py" target = target_tree / "src" / "ezmsg" / "util" / "perf" / "hotpath.py" @@ -209,11 +273,182 @@ def _ensure_json_files_match( ) +def _python_from_venv(venv_dir: Path) -> Path | None: + candidates = [venv_dir / "bin" / "python", venv_dir / "Scripts" / "python.exe"] + for candidate in candidates: + if candidate.exists(): + return candidate + return None + + +def discover_tree_python(tree: Path) -> Path | None: + for dirname in VENV_DIR_CANDIDATES: + python = _python_from_venv(tree / dirname) + if python is not None: + return python + return None + + +def _virtual_env_from_python(python: Path) -> str | None: + parent = python.parent + if parent.name in {"bin", "Scripts"}: + return str(parent.parent) + return None + + +def _tree_env( + base_env: dict[str, str], + tree: Path, + python: Path, + overrides: dict[str, str], +) -> dict[str, str]: + env = base_env.copy() + env.update(overrides) + path_parts = [str(tree / "src")] + if env.get("PYTHONPATH"): + path_parts.append(env["PYTHONPATH"]) + env["PYTHONPATH"] = os.pathsep.join(path_parts) + venv = _virtual_env_from_python(python) + if venv is not None: + 
env["VIRTUAL_ENV"] = venv + return env + + +def _hash_file(path: Path) -> str | None: + if not path.exists(): + return None + return hashlib.sha256(path.read_bytes()).hexdigest()[:12] + + +def _git_value(tree: Path, args: list[str], default: str = "unknown") -> str: + completed = subprocess.run( + ["git", *args], + cwd=tree, + capture_output=True, + text=True, + ) + if completed.returncode != 0: + return default + return completed.stdout.strip() + + +def _git_dirty(tree: Path) -> bool: + completed = subprocess.run( + ["git", "status", "--porcelain"], + cwd=tree, + capture_output=True, + text=True, + ) + if completed.returncode != 0: + return False + return bool(completed.stdout.strip()) + + +def _runtime_probe_env( + tree: Path, + python: Path, + overrides: dict[str, str], +) -> dict[str, str]: + return _tree_env(os.environ.copy(), tree, python, overrides) + + +def _probe_runtime(tree: Path, python: Path, overrides: dict[str, str]) -> dict[str, str]: + script = ( + "import json, sys\n" + "payload = {\n" + " 'python': sys.executable,\n" + " 'python_version': sys.version.replace('\\n', ' '),\n" + " 'ezmsg_version': 'unknown',\n" + " 'numpy_version': 'unknown',\n" + "}\n" + "try:\n" + " import ezmsg.core as ez\n" + " payload['ezmsg_version'] = ez.__version__\n" + "except Exception:\n" + " pass\n" + "try:\n" + " import numpy as np\n" + " payload['numpy_version'] = np.__version__\n" + "except Exception:\n" + " pass\n" + "print(json.dumps(payload))\n" + ) + return _run_json( + [str(python), "-c", script], + cwd=tree, + env=_runtime_probe_env(tree, python, overrides), + ) + + +def _build_env_info( + label: str, + ref: str, + tree: Path, + python: Path, + env_mode: str, + overrides: dict[str, str], +) -> ABEnvironmentInfo: + runtime = _probe_runtime(tree, python, overrides) + return ABEnvironmentInfo( + label=label, + ref=ref, + tree=str(tree), + python=runtime["python"], + python_version=runtime["python_version"], + ezmsg_version=runtime["ezmsg_version"], + numpy_version=runtime["numpy_version"], + git_commit=_git_value(tree, ["rev-parse", "HEAD"]), + git_branch=_git_value(tree, ["rev-parse", "--abbrev-ref", "HEAD"]), + dirty=_git_dirty(tree), + env_mode=env_mode, + pyproject_hash=_hash_file(tree / "pyproject.toml"), + uv_lock_hash=_hash_file(tree / "uv.lock"), + env_overrides=overrides, + ) + + +def _shared_env_warnings(info_a: ABEnvironmentInfo, info_b: ABEnvironmentInfo) -> list[str]: + warnings: list[str] = [] + if info_a.pyproject_hash != info_b.pyproject_hash: + warnings.append( + "pyproject.toml differs between A and B while using a shared environment." + ) + if info_a.uv_lock_hash != info_b.uv_lock_hash: + warnings.append( + "uv.lock differs between A and B while using a shared environment." 
+ ) + return warnings + + +def _print_env_info(env_info: ABEnvironmentInfo) -> None: + print( + f"{env_info.label}: ref={env_info.ref} tree={env_info.tree} " + f"python={env_info.python} env_mode={env_info.env_mode}" + ) + print( + f" python_version={env_info.python_version} " + f"ezmsg={env_info.ezmsg_version} numpy={env_info.numpy_version}" + ) + print( + f" branch={env_info.git_branch} commit={env_info.git_commit} dirty={env_info.dirty}" + ) + print( + f" pyproject_hash={env_info.pyproject_hash} uv_lock_hash={env_info.uv_lock_hash}" + ) + if env_info.env_overrides: + pairs = ", ".join(f"{key}={value}" for key, value in sorted(env_info.env_overrides.items())) + print(f" env_overrides={pairs}") + + def _print_summary(summary: ABRunSummary) -> None: print( f"Interleaved hot-path comparison: A={summary.ref_a}, " f"B={summary.ref_b}, rounds={summary.rounds}, seed={summary.seed}" ) + _print_env_info(summary.env_a) + _print_env_info(summary.env_b) + for warning in summary.warnings: + print(f"WARNING: {warning}") for case in summary.cases: sign = "regression" if case.delta_pct_median > 0 else "improvement" print( @@ -232,14 +467,68 @@ def dump_ab_json(summary: ABRunSummary, path: Path) -> None: "ref_b": summary.ref_b, "rounds": summary.rounds, "seed": summary.seed, + "env_a": asdict(summary.env_a), + "env_b": asdict(summary.env_b), + "warnings": summary.warnings, "cases": [asdict(case) for case in summary.cases], } path.write_text(json.dumps(payload, indent=2) + "\n") +def _default_ref(ref: str | None, fallback: str) -> str: + return fallback if ref is None else ref + + +def _resolve_python_for_shared( + python_a: str | None, + python_b: str | None, +) -> Path: + if python_a is not None and python_b is not None and python_a != python_b: + raise ValueError( + "shared env mode requires a single interpreter; --python-a and --python-b must match" + ) + if python_a is not None: + return Path(python_a) + if python_b is not None: + return Path(python_b) + return Path(sys.executable) + + +def _resolve_python_for_existing( + tree: Path, + label: str, + explicit_python: str | None, + repo_root: Path, +) -> Path: + if explicit_python is not None: + return Path(explicit_python) + + discovered = discover_tree_python(tree) + if discovered is not None: + return discovered + + if tree.resolve() == repo_root.resolve(): + return Path(sys.executable) + + raise ValueError( + f"Could not locate a Python interpreter for side {label} in {tree}. " + "Prepare the environment yourself and rerun with --env-mode existing " + "plus --python-a/--python-b or local .venv/venv directories." 
+ ) + + def perf_ab( - ref_a: str, - ref_b: str, + ref_a: str | None, + ref_b: str | None, + dir_a: Path | None, + dir_b: Path | None, + python_a: str | None, + python_b: str | None, + env_mode: str, + force_shared_env: bool, + env: list[str], + env_a: list[str], + env_b: list[str], rounds: int, count: int, warmup: int, @@ -251,11 +540,23 @@ def perf_ab( seed: int, json_out: Path | None, keep_worktrees: bool, - sync: bool, quiet: bool, ) -> None: if rounds <= 0: raise ValueError("rounds must be > 0") + if dir_a is not None and ref_a is not None: + raise ValueError("Use either --ref-a or --dir-a, not both") + if dir_b is not None and ref_b is not None: + raise ValueError("Use either --ref-b or --dir-b, not both") + if force_shared_env and env_mode != "shared": + raise ValueError("--force-shared-env only applies to --env-mode shared") + + shared_overrides = parse_env_assignments(env) + env_overrides_a = {**shared_overrides, **parse_env_assignments(env_a)} + env_overrides_b = {**shared_overrides, **parse_env_assignments(env_b)} + + resolved_ref_a = dir_a.name if dir_a is not None else _default_ref(ref_a, DEFAULT_REF_A) + resolved_ref_b = dir_b.name if dir_b is not None else _default_ref(ref_b, DEFAULT_REF_B) repo_root = Path( subprocess.run( @@ -267,63 +568,117 @@ def perf_ab( ) pair_order = build_pair_order(rounds, seed) - with _provision_tree(repo_root, ref_a, "A", keep_worktrees) as tree_a: - with _provision_tree(repo_root, ref_b, "B", keep_worktrees) as tree_b: - if tree_a != repo_root: - _mirror_hotpath_module(repo_root, tree_a) - if tree_b != repo_root: - _mirror_hotpath_module(repo_root, tree_b) - - if sync: - _maybe_sync(tree_a) - if tree_b != tree_a: - _maybe_sync(tree_b) - - with tempfile.TemporaryDirectory(prefix="ezmsg-perf-ab-runs-") as tmpdir_name: - tmpdir = Path(tmpdir_name) - cmd_by_label = { - "A": lambda path: build_hotpath_command( - path, - count=count, - warmup=warmup, - payload_sizes=payload_sizes, - transports=transports, - apis=apis, - num_buffers=num_buffers, - quiet=quiet, - ), - "B": lambda path: build_hotpath_command( - path, - count=count, - warmup=warmup, - payload_sizes=payload_sizes, - transports=transports, - apis=apis, - num_buffers=num_buffers, - quiet=quiet, - ), - } - tree_by_label = {"A": tree_a, "B": tree_b} - - for idx in range(prewarm): - for label in ("A", "B"): - if label == "B" and tree_b == tree_a: - continue - warm_path = tmpdir / f"warm-{label}-{idx}.json" - _run_checked(cmd_by_label[label](warm_path), cwd=tree_by_label[label]) - - paired_runs: list[tuple[dict[str, float], dict[str, float]]] = [] - for round_idx, (first, second) in enumerate(pair_order, start=1): - outputs: dict[str, dict[str, float]] = {} - for label in (first, second): - output_path = tmpdir / f"round-{round_idx:02d}-{label}.json" - _run_checked(cmd_by_label[label](output_path), cwd=tree_by_label[label]) - outputs[label] = load_hotpath_summary(output_path) - - _ensure_json_files_match(outputs["A"], outputs["B"], ref_a, ref_b) - paired_runs.append((outputs["A"], outputs["B"])) - - summary = summarize_ab_results(ref_a, ref_b, rounds, seed, paired_runs) + with contextlib.ExitStack() as stack: + tree_a = ( + dir_a.resolve() + if dir_a is not None + else stack.enter_context(_provision_tree(repo_root, resolved_ref_a, "A", keep_worktrees)) + ) + tree_b = ( + dir_b.resolve() + if dir_b is not None + else stack.enter_context(_provision_tree(repo_root, resolved_ref_b, "B", keep_worktrees)) + ) + + if dir_a is None and tree_a != repo_root: + _mirror_hotpath_module(repo_root, tree_a) + if 
dir_b is None and tree_b != repo_root and tree_b != tree_a: + _mirror_hotpath_module(repo_root, tree_b) + + if env_mode == "shared": + shared_python = _resolve_python_for_shared(python_a, python_b) + python_path_a = shared_python + python_path_b = shared_python + else: + python_path_a = _resolve_python_for_existing(tree_a, "A", python_a, repo_root) + python_path_b = _resolve_python_for_existing(tree_b, "B", python_b, repo_root) + + env_info_a = _build_env_info( + "A", resolved_ref_a, tree_a, python_path_a, env_mode, env_overrides_a + ) + env_info_b = _build_env_info( + "B", resolved_ref_b, tree_b, python_path_b, env_mode, env_overrides_b + ) + + warnings = [] + if env_mode == "shared": + warnings = _shared_env_warnings(env_info_a, env_info_b) + if warnings and not force_shared_env: + raise ValueError( + "Shared-environment comparison detected project metadata mismatches:\n" + + "\n".join(f"- {warning}" for warning in warnings) + + "\n\nRe-run with --force-shared-env to continue anyway, or prepare " + "side-specific environments and use --env-mode existing." + ) + + with tempfile.TemporaryDirectory(prefix="ezmsg-perf-ab-runs-") as tmpdir_name: + tmpdir = Path(tmpdir_name) + cmd_by_label = { + "A": lambda path: build_hotpath_command( + python_path_a, + path, + count=count, + warmup=warmup, + payload_sizes=payload_sizes, + transports=transports, + apis=apis, + num_buffers=num_buffers, + quiet=quiet, + ), + "B": lambda path: build_hotpath_command( + python_path_b, + path, + count=count, + warmup=warmup, + payload_sizes=payload_sizes, + transports=transports, + apis=apis, + num_buffers=num_buffers, + quiet=quiet, + ), + } + tree_by_label = {"A": tree_a, "B": tree_b} + env_by_label = { + "A": _tree_env(os.environ.copy(), tree_a, python_path_a, env_overrides_a), + "B": _tree_env(os.environ.copy(), tree_b, python_path_b, env_overrides_b), + } + + for idx in range(prewarm): + for label in ("A", "B"): + if label == "B" and tree_b == tree_a and env_by_label["B"] == env_by_label["A"]: + continue + warm_path = tmpdir / f"warm-{label}-{idx}.json" + _run_checked( + cmd_by_label[label](warm_path), + cwd=tree_by_label[label], + env=env_by_label[label], + ) + + paired_runs: list[tuple[dict[str, float], dict[str, float]]] = [] + for round_idx, (first, second) in enumerate(pair_order, start=1): + outputs: dict[str, dict[str, float]] = {} + for label in (first, second): + output_path = tmpdir / f"round-{round_idx:02d}-{label}.json" + _run_checked( + cmd_by_label[label](output_path), + cwd=tree_by_label[label], + env=env_by_label[label], + ) + outputs[label] = load_hotpath_summary(output_path) + + _ensure_json_files_match(outputs["A"], outputs["B"], resolved_ref_a, resolved_ref_b) + paired_runs.append((outputs["A"], outputs["B"])) + + summary = summarize_ab_results( + resolved_ref_a, + resolved_ref_b, + rounds, + seed, + paired_runs, + env_a=env_info_a, + env_b=env_info_b, + warnings=warnings, + ) _print_summary(summary) if json_out is not None: dump_ab_json(summary, json_out) @@ -333,10 +688,43 @@ def perf_ab( def setup_ab_cmdline(subparsers: argparse._SubParsersAction) -> None: p_ab = subparsers.add_parser( "ab", - help="run interleaved A/B hot-path comparisons using git worktrees", + help="run interleaved A/B hot-path comparisons using worktrees or prepared directories", + ) + p_ab.add_argument("--ref-a", default=None, help=f"baseline git ref (default = {DEFAULT_REF_A})") + p_ab.add_argument("--ref-b", default=None, help=f"candidate git ref (default = {DEFAULT_REF_B})") + p_ab.add_argument("--dir-a", 
diff --git a/tests/test_command.py b/tests/test_command.py
index 5cf9630d..b75e59d1 100644
--- a/tests/test_command.py
+++ b/tests/test_command.py
@@ -74,6 +74,38 @@ def test_perf_compare_subparser_accepts_baseline_args():
     assert args.no_browser is True
 
 
+def test_perf_ab_subparser_accepts_manual_env_args():
+    parser = build_parser()
+
+    args = parser.parse_args(
+        [
+            "perf",
+            "ab",
+            "--dir-a",
+            "/tmp/a",
+            "--dir-b",
+            "/tmp/b",
+            "--env-mode",
+            "existing",
+            "--env",
+            "FOO=bar",
+            "--env-a",
+            "ONLY_A=1",
+            "--python-b",
+            "/tmp/b/.venv/bin/python",
+        ]
+    )
+
+    assert args.command == "perf"
+    assert args.perf_command == "ab"
+    assert str(args.dir_a) == "/tmp/a"
+    assert str(args.dir_b) == "/tmp/b"
+    assert args.env_mode == "existing"
+    assert args.env == ["FOO=bar"]
+    assert args.env_a == ["ONLY_A=1"]
+    assert args.python_b == "/tmp/b/.venv/bin/python"
+
+
 def test_graphviz_subparser_rejects_mermaid_only_args():
     parser = build_parser()
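The tests/test_perf_ab.py changes below exercise two properties that build_pair_order is expected to provide: the A-first/B-first orderings are balanced across rounds, and the sequence is reproducible for a given seed. A sketch of that contract (an illustration under those assumptions, not the shipped implementation):

    import random

    def sketch_pair_order(rounds: int, seed: int) -> list[tuple[str, str]]:
        # Alternate AB/BA so neither side always runs first, then shuffle
        # deterministically so the interleaving is reproducible per seed.
        order = [("A", "B") if i % 2 == 0 else ("B", "A") for i in range(rounds)]
        random.Random(seed).shuffle(order)
        return order

Interleaving this way keeps slow drift (thermal throttling, background load) from systematically favoring one side.
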
diff --git a/tests/test_perf_ab.py b/tests/test_perf_ab.py
index 3fbb935c..b6b0131f 100644
--- a/tests/test_perf_ab.py
+++ b/tests/test_perf_ab.py
@@ -1,6 +1,8 @@
 from ezmsg.util.perf.ab import (
+    ABEnvironmentInfo,
     build_hotpath_command,
     build_pair_order,
+    parse_env_assignments,
     summarize_ab_results,
 )
 
@@ -17,6 +19,7 @@ def test_build_pair_order_is_balanced_and_reproducible():
 
 def test_build_hotpath_command_contains_expected_args(tmp_path):
     cmd = build_hotpath_command(
+        "/tmp/shared-python",
         tmp_path / "out.json",
         count=100,
         warmup=10,
@@ -27,12 +30,19 @@ def test_build_hotpath_command_contains_expected_args(tmp_path):
         quiet=True,
     )
 
-    assert cmd[:5] == ["uv", "run", "python", "-m", "ezmsg.util.perf.hotpath"]
+    assert cmd[:3] == ["/tmp/shared-python", "-m", "ezmsg.util.perf.hotpath"]
     assert "--count" in cmd
     assert "--payload-sizes" in cmd
     assert "--quiet" in cmd
 
 
+def test_parse_env_assignments_merges_repeatable_values():
+    assert parse_env_assignments(["FOO=bar", "BAZ=qux", "FOO=override"]) == {
+        "FOO": "override",
+        "BAZ": "qux",
+    }
+
+
 def test_summarize_ab_results_uses_b_vs_a_delta():
     paired_runs = [
         (
@@ -45,15 +55,54 @@ def test_summarize_ab_results_uses_b_vs_a_delta():
         ),
     ]
 
+    env_a = ABEnvironmentInfo(
+        label="A",
+        ref="dev",
+        tree="/tmp/a",
+        python="/tmp/a/.venv/bin/python",
+        python_version="3.11.0",
+        ezmsg_version="1.0.0",
+        numpy_version="2.0.0",
+        git_commit="abc",
+        git_branch="dev",
+        dirty=False,
+        env_mode="shared",
+        pyproject_hash="123",
+        uv_lock_hash="456",
+        env_overrides={"FOO": "bar"},
+    )
+    env_b = ABEnvironmentInfo(
+        label="B",
+        ref="CURRENT",
+        tree="/tmp/b",
+        python="/tmp/b/.venv/bin/python",
+        python_version="3.11.0",
+        ezmsg_version="1.0.0",
+        numpy_version="2.0.0",
+        git_commit="def",
+        git_branch="main",
+        dirty=True,
+        env_mode="shared",
+        pyproject_hash="123",
+        uv_lock_hash="789",
+        env_overrides={"FOO": "baz"},
+    )
+
     summary = summarize_ab_results(
         ref_a="dev",
         ref_b="CURRENT",
         rounds=2,
         seed=0,
         paired_runs=paired_runs,
+        env_a=env_a,
+        env_b=env_b,
+        warnings=["uv.lock differs"],
     )
 
     assert len(summary.cases) == 1
+    assert summary.env_a == env_a
+    assert summary.env_b == env_b
+    assert summary.warnings == ["uv.lock differs"]
     case = summary.cases[0]
     assert case.case_id == "async/shm/payload=64/buffers=1"
     assert case.a_us_per_message_median == 9.0

From c9fbcc3b0982d716aab428b55d90c2bb0d2be6b1 Mon Sep 17 00:00:00 2001
From: Griffin Milsap
Date: Mon, 6 Apr 2026 16:51:33 -0400
Subject: [PATCH 3/4] fixed tests for windows

---
 tests/test_command.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/test_command.py b/tests/test_command.py
index b75e59d1..b74df4bd 100644
--- a/tests/test_command.py
+++ b/tests/test_command.py
@@ -1,4 +1,5 @@
 import pytest
+from pathlib import Path
 
 from ezmsg.core.command import build_parser
 
@@ -98,8 +99,8 @@ def test_perf_ab_subparser_accepts_manual_env_args():
 
     assert args.command == "perf"
     assert args.perf_command == "ab"
-    assert str(args.dir_a) == "/tmp/a"
-    assert str(args.dir_b) == "/tmp/b"
+    assert args.dir_a == Path("/tmp/a")
+    assert args.dir_b == Path("/tmp/b")
     assert args.env_mode == "existing"
     assert args.env == ["FOO=bar"]
     assert args.env_a == ["ONLY_A=1"]

From 63b02b01146513292a9e410e7e9351e3c53ee957 Mon Sep 17 00:00:00 2001
From: Griffin Milsap
Date: Mon, 6 Apr 2026 17:34:53 -0400
Subject: [PATCH 4/4] fix optional dependencies

---
 pyproject.toml                  |  3 ++
 src/ezmsg/util/perf/analysis.py | 58 ++++++++++++++++++++++++---------
 src/ezmsg/util/perf/run.py      | 36 ++++++++++++--------
 3 files changed, 68 insertions(+), 29 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 6d796f65..37154bc7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -52,6 +52,9 @@ docs = [
 axisarray = [
     "numpy>=2.2.6",
 ]
+perf = [
+    "xarray",
+]
 
 [project.scripts]
 ezmsg = "ezmsg.core.command:cmdline"
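With xarray now behind an optional extra, analysis users opt in explicitly (e.g. pip install "ezmsg[perf]") while the core package stays dependency-light. Downstream code that wants to degrade gracefully can guard the import; a minimal sketch (HAS_PERF_DEPS is an illustrative name, not part of ezmsg):

    try:
        import xarray  # provided by the optional extra: pip install "ezmsg[perf]"
        HAS_PERF_DEPS = True
    except ImportError:
        HAS_PERF_DEPS = False

The analysis.py changes below implement the package's own version of this guard, deferring the imports until a report is actually built.
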
"ezmsg.core.command:cmdline" diff --git a/src/ezmsg/util/perf/analysis.py b/src/ezmsg/util/perf/analysis.py index ea3db477..743acbbe 100644 --- a/src/ezmsg/util/perf/analysis.py +++ b/src/ezmsg/util/perf/analysis.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import argparse import dataclasses import html @@ -6,25 +8,43 @@ import webbrowser from pathlib import Path - -from ..messagecodec import MessageDecoder -from .envinfo import TestEnvironmentInfo, format_env_diff -from .impl import Metrics, TestLogEntry, TestParameters +from typing import TYPE_CHECKING, Any import ezmsg.core as ez -try: +if TYPE_CHECKING: + import numpy as np + import pandas as pd import xarray as xr - import pandas as pd # xarray depends on pandas -except ImportError: - ez.logger.error("ezmsg perf analysis requires xarray") - raise + from .envinfo import TestEnvironmentInfo -try: - import numpy as np -except ImportError: - ez.logger.error("ezmsg perf analysis requires numpy") - raise +xr: Any | None = None +pd: Any | None = None +np: Any | None = None + + +def _load_analysis_dependencies() -> tuple[Any, Any, Any]: + global xr, pd, np + + if xr is None or pd is None: + try: + import xarray as _xr + import pandas as _pd # xarray depends on pandas + except ImportError: + ez.logger.error("ezmsg perf analysis requires xarray") + raise + xr = _xr + pd = _pd + + if np is None: + try: + import numpy as _np + except ImportError: + ez.logger.error("ezmsg perf analysis requires numpy") + raise + np = _np + + return xr, pd, np TEST_DESCRIPTION = """ Configurations (config): @@ -112,7 +132,12 @@ class ReportBundle: def load_perf(perf: Path) -> xr.Dataset: - all_results: dict[TestParameters, dict[int, list[Metrics]]] = dict() + xr, _, np = _load_analysis_dependencies() + from ..messagecodec import MessageDecoder + from .envinfo import TestEnvironmentInfo + from .impl import Metrics, TestLogEntry + + all_results: dict[Any, dict[int, list[Any]]] = dict() run_idx = 0 with open(perf, "r") as perf_f: @@ -326,6 +351,9 @@ def _terminal_delta_summary( def build_report_bundle(perf_path: Path, baseline_path: Path | None = None) -> ReportBundle: + _load_analysis_dependencies() + from .envinfo import format_env_diff + candidate = load_perf(perf_path) info: TestEnvironmentInfo = candidate.attrs["info"] candidate_frame = _display_frame(_frame_from_dataset(candidate), relative=False) diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 1bebc86d..d002ad74 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import sys import json @@ -14,20 +16,10 @@ import ezmsg.core as ez from ezmsg.core.graphserver import GraphServer -from ..messagecodec import MessageEncoder -from .envinfo import TestEnvironmentInfo -from .util import warmup -from .impl import ( - TestParameters, - TestLogEntry, - perform_test, - Communication, - CONFIGS, -) - DEFAULT_MSG_SIZES = [2**4, 2**20] DEFAULT_N_CLIENTS = [1, 16] -DEFAULT_COMMS = [c for c in Communication] +DEFAULT_COMMS = ["local", "shm", "tcp", "shm_spread", "tcp_spread"] +DEFAULT_CONFIGS = ["fanin", "fanout", "relay"] # --- Output Suppression Context Manager --- @@ -103,6 +95,18 @@ def output_paths_for_name(name: str) -> tuple[Path, Path]: return Path(f"perf_{name}.txt"), Path(f"report_{name}.html") +def warmup(*args, **kwargs): + from .util import warmup as _warmup + + return _warmup(*args, **kwargs) + + +def perform_test(**kwargs): + from .impl import perform_test as _perform_test + + return 
diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py
index 1bebc86d..d002ad74 100644
--- a/src/ezmsg/util/perf/run.py
+++ b/src/ezmsg/util/perf/run.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import os
 import sys
 import json
@@ -14,20 +16,10 @@
 import ezmsg.core as ez
 from ezmsg.core.graphserver import GraphServer
 
-from ..messagecodec import MessageEncoder
-from .envinfo import TestEnvironmentInfo
-from .util import warmup
-from .impl import (
-    TestParameters,
-    TestLogEntry,
-    perform_test,
-    Communication,
-    CONFIGS,
-)
-
 DEFAULT_MSG_SIZES = [2**4, 2**20]
 DEFAULT_N_CLIENTS = [1, 16]
-DEFAULT_COMMS = [c for c in Communication]
+DEFAULT_COMMS = ["local", "shm", "tcp", "shm_spread", "tcp_spread"]
+DEFAULT_CONFIGS = ["fanin", "fanout", "relay"]
 
 # --- Output Suppression Context Manager ---
@@ -103,6 +95,18 @@ def output_paths_for_name(name: str) -> tuple[Path, Path]:
     return Path(f"perf_{name}.txt"), Path(f"report_{name}.html")
 
 
+def warmup(*args, **kwargs):
+    from .util import warmup as _warmup
+
+    return _warmup(*args, **kwargs)
+
+
+def perform_test(**kwargs):
+    from .impl import perform_test as _perform_test
+
+    return _perform_test(**kwargs)
+
+
 def benchmark(
     max_duration: float,
     num_msgs: int,
@@ -118,6 +122,10 @@ def benchmark(
     name: str | None = None,
     open_browser: bool = True,
 ) -> tuple[Path, Path | None]:
+    from ..messagecodec import MessageEncoder
+    from .envinfo import TestEnvironmentInfo
+    from .impl import Communication, CONFIGS, TestLogEntry, TestParameters
+
     if n_clients is None:
         n_clients = DEFAULT_N_CLIENTS
     if any(c < 0 for c in n_clients):
@@ -339,7 +347,7 @@ def setup_benchmark_cmdline(subparsers: argparse._SubParsersAction) -> None:
         type=str,
         default=None,
         nargs="*",
-        help=f"communication strategies to test (default = {[c.value for c in DEFAULT_COMMS]})",
+        help=f"communication strategies to test (default = {DEFAULT_COMMS})",
    )
 
     p_run.add_argument(
@@ -347,7 +355,7 @@ def setup_benchmark_cmdline(subparsers: argparse._SubParsersAction) -> None:
         type=str,
         default=None,
         nargs="*",
-        help=f"configurations to test (default = {[c for c in CONFIGS]})",
+        help=f"configurations to test (default = {DEFAULT_CONFIGS})",
     )
 
     p_run.add_argument(