diff --git a/scripts/lib_trend.py b/scripts/lib_trend.py
new file mode 100644
index 0000000..499f215
--- /dev/null
+++ b/scripts/lib_trend.py
@@ -0,0 +1,173 @@
+"""
+RunTrendAnalyzer — detect score regression across sequential benchmark runs.
+
+Analyzes results JSON files written by benchmark.py to detect whether a model's
+performance is improving, stable, or degrading over time via OLS slope fitting.
+"""
+import json
+import logging
+import statistics
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Dict, List, Optional
+
+logger = logging.getLogger("benchmark")
+
+
+@dataclass
+class RunPoint:
+    """A single data point from a benchmark run."""
+    run_id: str
+    timestamp: float
+    model: str
+    score_pct: float
+    task_count: int
+
+
+@dataclass
+class RunTrendReport:
+    """Trend analysis report for a single model."""
+    model: str
+    run_count: int
+    window: int
+    slope: float
+    points: List[RunPoint]
+    regression_detected: bool
+    regression_threshold: float
+    task_count_varies: bool = False
+
+    def summary(self) -> str:
+        """Return a CLI-friendly summary string."""
+        direction = (
+            "▼ REGRESSION"
+            if self.regression_detected
+            else "▲ improving"
+            if self.slope > 0
+            else "→ stable"
+        )
+        note = (
+            " ⚠ task count varied — slope may reflect suite changes"
+            if self.task_count_varies
+            else ""
+        )
+        return (
+            f"{direction}: {self.model} slope={self.slope:+.2f}%/run "
+            f"over last {self.run_count} runs "
+            f"(threshold={self.regression_threshold:+.2f}){note}"
+        )
+
+
+class RunTrendAnalyzer:
+    """Detect performance regression across sequential benchmark runs."""
+
+    def __init__(
+        self,
+        results_dir: Path,
+        window: int = 10,
+        regression_threshold: float = -0.5,
+    ):
+        """
+        Args:
+            results_dir: Directory containing benchmark result JSON files.
+            window: Number of most recent runs to analyze.
+            regression_threshold: Slope (pct/run) below which regression is flagged.
+        """
+        self.results_dir = results_dir
+        self.window = window
+        self.regression_threshold = regression_threshold
+
+    def load_points(self, model: Optional[str] = None) -> Dict[str, List[RunPoint]]:
+        """
+        Load and group RunPoint data from result JSON files, keyed by model slug.
+        Skips files that fail to parse (JSONDecodeError, OSError).
+        """
+        grouped: Dict[str, List[RunPoint]] = {}
+        for path in sorted(self.results_dir.glob("*.json")):
+            try:
+                data = json.loads(path.read_text())
+            except (json.JSONDecodeError, OSError):
+                continue
+
+            m = data.get("model", "")
+            ts = data.get("timestamp", 0.0)
+            run_id = data.get("run_id", path.stem)
+            tasks = data.get("tasks", [])
+            if not tasks:
+                continue
+
+            total = sum(
+                t["grading"]["mean"]
+                for t in tasks
+                if "grading" in t
+            )
+            score_pct = (total / len(tasks)) * 100
+
+            if model and m != model:
+                continue
+
+            grouped.setdefault(m, []).append(
+                RunPoint(run_id, ts, m, score_pct, len(tasks))
+            )
+
+        for pts in grouped.values():
+            pts.sort(key=lambda p: p.timestamp)
+
+        return grouped
+
+    def analyze(
+        self, model: Optional[str] = None
+    ) -> List[RunTrendReport]:
+        """
+        Run OLS slope analysis per model over the configured window.
+        Returns a list of RunTrendReport, sorted by slope ascending.
+        """
+        grouped = self.load_points(model)
+        reports: List[RunTrendReport] = []
+
+        for m, pts in grouped.items():
+            window_pts = pts[-self.window:]
+            if len(window_pts) < 2:
+                continue
+
+            xs = list(range(len(window_pts)))
+            ys = [p.score_pct for p in window_pts]
+            slope, intercept = statistics.linear_regression(xs, ys)
+
+            task_counts = {p.task_count for p in window_pts}
+            task_count_varies = len(task_counts) > 1
+
+            reports.append(
+                RunTrendReport(
+                    model=m,
+                    run_count=len(window_pts),
+                    window=self.window,
+                    slope=slope,
+                    points=window_pts,
+                    regression_detected=slope < self.regression_threshold,
+                    regression_threshold=self.regression_threshold,
+                    task_count_varies=task_count_varies,
+                )
+            )
+
+        reports.sort(key=lambda r: r.slope)
+        return reports
+
+    def run(self, model: Optional[str] = None) -> None:
+        """CLI entry: analyze and print results."""
+        reports = self.analyze(model)
+        if not reports:
+            logger.info("No trend data available (need ≥2 runs per model).")
+            return
+
+        logger.info("\n" + "=" * 80)
+        logger.info("📈 RUN TREND ANALYSIS")
+        logger.info("=" * 80)
+
+        for report in reports:
+            logger.info("  %s", report.summary())
+
+            # Show recent scores
+            for p in report.points:
+                logger.info("    %s: %.1f%% (%d tasks)", p.run_id, p.score_pct, p.task_count)
+
+        logger.info("%s", "=" * 80)
diff --git a/tests/test_lib_trend.py b/tests/test_lib_trend.py
new file mode 100644
index 0000000..1fb7c6b
--- /dev/null
+++ b/tests/test_lib_trend.py
@@ -0,0 +1,160 @@
+"""Tests for lib_trend — RunTrendAnalyzer."""
+import json
+import tempfile
+import time
+from pathlib import Path
+from unittest import TestCase
+
+from scripts.lib_trend import RunTrendAnalyzer, RunPoint, RunTrendReport
+
+
+class TestRunTrendAnalyzer(TestCase):
+    def _write_run(self, run_dir: Path, run_id: str, model: str, scores: list):
+        """Helper: each score = one task's grading.mean in the result JSON."""
+        tasks = [
+            {"task_id": f"task_{i}", "grading": {"mean": s}}
+            for i, s in enumerate(scores)
+        ]
+        data = {
+            "model": model,
+            "run_id": run_id,
+            "timestamp": time.time(),
+            "suite": "all",
+            "tasks": tasks,
+        }
+        (run_dir / f"{run_id}_{model.replace('/', '_')}.json").write_text(json.dumps(data))
+
+    def test_no_data_returns_empty(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            analyzer = RunTrendAnalyzer(Path(tmp))
+            self.assertEqual(analyzer.analyze(), [])
+
+    def test_single_run_returns_empty(self):
+        """Need >= 2 runs for trend analysis."""
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            self._write_run(run_dir, "0001", "claude", [0.8])
+            analyzer = RunTrendAnalyzer(run_dir)
+            self.assertEqual(analyzer.analyze(), [])
+
+    def test_regression_detected(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            # Declining overall scores: 4 runs, single-task each
+            for i, score in enumerate([0.9, 0.85, 0.80, 0.75]):
+                self._write_run(run_dir, f"{i:04d}", "claude-sonnet", [score])
+            analyzer = RunTrendAnalyzer(run_dir, window=10, regression_threshold=-0.5)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 1)
+            self.assertTrue(reports[0].regression_detected)
+            self.assertLess(reports[0].slope, -0.5)
+
+    def test_improving_not_regression(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            for i, score in enumerate([0.75, 0.80, 0.85, 0.90]):
+                self._write_run(run_dir, f"{i:04d}", "gpt", [score])
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 1)
+            self.assertFalse(reports[0].regression_detected)
+            self.assertGreater(reports[0].slope, 0)
+
+    def test_malformed_file_skipped(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            (run_dir / "bad.json").write_text("{INVALID JSON!")
+            self._write_run(run_dir, "0001", "model-a", [0.8])
+            self._write_run(run_dir, "0002", "model-a", [0.9])
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 1)
+
+    def test_task_count_varies_flag(self):
+        """Suite expansion across runs should set task_count_varies."""
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            self._write_run(run_dir, "0001", "claude", [0.9])  # 1 task
+            self._write_run(run_dir, "0002", "claude", [0.85, 0.88, 0.90])  # 3 tasks
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 1)
+            self.assertTrue(reports[0].task_count_varies)
+
+    def test_task_count_varies_false_when_equal(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            self._write_run(run_dir, "0001", "claude", [0.9, 0.8])
+            self._write_run(run_dir, "0002", "claude", [0.85, 0.88])
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 1)
+            self.assertFalse(reports[0].task_count_varies)
+
+    def test_summary_string_regression(self):
+        report = RunTrendReport(
+            model="claude-sonnet",
+            run_count=5,
+            window=10,
+            slope=-1.2,
+            points=[],
+            regression_detected=True,
+            regression_threshold=-0.5,
+            task_count_varies=False,
+        )
+        summary = report.summary()
+        self.assertIn("REGRESSION", summary)
+        self.assertIn("-1.20", summary)
+
+    def test_summary_string_task_count_warning(self):
+        report = RunTrendReport(
+            model="gpt-4",
+            run_count=4,
+            window=10,
+            slope=-0.8,
+            points=[],
+            regression_detected=True,
+            regression_threshold=-0.5,
+            task_count_varies=True,
+        )
+        summary = report.summary()
+        self.assertIn("task count varied", summary)
+
+    def test_stable_scores(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            for i, score in enumerate([0.80, 0.80, 0.80, 0.80]):
+                self._write_run(run_dir, f"{i:04d}", "stable-model", [score])
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 1)
+            self.assertEqual(reports[0].slope, 0.0)
+            self.assertFalse(reports[0].regression_detected)
+
+    def test_multiple_models(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            self._write_run(run_dir, "0001", "claude", [0.9])
+            self._write_run(run_dir, "0002", "claude", [0.8])
+            self._write_run(run_dir, "0003", "gpt", [0.7])
+            self._write_run(run_dir, "0004", "gpt", [0.75])
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze()
+            self.assertEqual(len(reports), 2)
+            # Sorted by slope ascending
+            self.assertEqual(reports[0].model, "claude")
+            self.assertLess(reports[0].slope, 0)
+            self.assertEqual(reports[1].model, "gpt")
+            self.assertGreater(reports[1].slope, 0)
+
+    def test_filter_by_model(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            run_dir = Path(tmp)
+            self._write_run(run_dir, "0001", "claude", [0.9])
+            self._write_run(run_dir, "0002", "claude", [0.8])
+            self._write_run(run_dir, "0003", "gpt", [0.7])
+            self._write_run(run_dir, "0004", "gpt", [0.75])
+            analyzer = RunTrendAnalyzer(run_dir)
+            reports = analyzer.analyze(model="claude")
+            self.assertEqual(len(reports), 1)
+            self.assertEqual(reports[0].model, "claude")
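
A minimal usage sketch (not part of the patch): wiring RunTrendAnalyzer into a small CLI. The "results" directory default, the flag names, and the logging setup below are assumptions made for illustration; only the constructor arguments and the analyze()/run() calls come from the module added above.

    import argparse
    import logging
    from pathlib import Path

    from scripts.lib_trend import RunTrendAnalyzer

    # The module logs through the "benchmark" logger, so give it a handler.
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser(description="Analyze benchmark score trends")
    parser.add_argument("--results-dir", type=Path, default=Path("results"))  # assumed output dir of benchmark.py
    parser.add_argument("--model", default=None)  # optional model slug filter
    parser.add_argument("--window", type=int, default=10)
    parser.add_argument("--threshold", type=float, default=-0.5)
    args = parser.parse_args()

    analyzer = RunTrendAnalyzer(
        results_dir=args.results_dir,
        window=args.window,
        regression_threshold=args.threshold,
    )
    analyzer.run(model=args.model)  # prints the trend report; returns None

If this were hooked into CI, one could also call analyze() and exit non-zero when any report has regression_detected set, since run() only logs.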