From 7372107ab06adc045b3dc1d67c8d86bebab15ce3 Mon Sep 17 00:00:00 2001
From: OpenClaw Agent
Date: Sat, 7 Mar 2026 17:39:09 -0800
Subject: [PATCH] Add deterministic benchmark routing + judge-only regrading

---
 README.md              |  28 ++-
 SKILL.md               |  26 ++-
 scripts/benchmark.py   | 462 ++++++++++++++++++++++++++++++++++-------
 scripts/lib_agent.py   | 265 +++++++++++++++++++++--
 scripts/lib_grading.py |  13 ++
 5 files changed, 697 insertions(+), 97 deletions(-)

diff --git a/README.md b/README.md
index 4009eb5..205ba97 100644
--- a/README.md
+++ b/README.md
@@ -74,15 +74,39 @@ Skip uploading with `--no-upload` if you just want local results.
 | Flag | Description |
 |------|-------------|
-| `--model MODEL` | Model to test (e.g., `anthropic/claude-sonnet-4`) |
+| `--model MODEL` | **Required provider-qualified model** to test (e.g., `anthropic/claude-sonnet-4`, `openai-codex/gpt-5.3-codex`) |
+| `--judge-model MODEL` | Provider-qualified model for LLM-judge tasks (default: `anthropic/claude-opus-4.5`) |
 | `--suite SUITE` | `all`, `automated-only`, or comma-separated task IDs |
+| `--thinking LEVEL` | Thinking level for benchmark turns (`off\|minimal\|low\|medium\|high\|xhigh\|adaptive`) |
 | `--runs N` | Number of runs per task for averaging |
 | `--timeout-multiplier N` | Scale timeouts for slower models |
-| `--output-dir DIR` | Where to save results (default: `results/`) |
+| `--output-dir DIR` | Where to save results/checkpoints (default: `results/`) |
+| `--judge-only FILE` | Re-run grading only (no task execution) from an existing results/checkpoint JSON |
+| `--clear-sessions` | Clear stored OpenClaw session transcripts before each turn (default: sessions persist) |
 | `--no-upload` | Skip uploading to leaderboard |
 | `--register` | Request an API token for submissions |
 | `--upload FILE` | Upload a previous results JSON |
+## Deterministic Model Routing (No Silent Fallbacks)
+
+PinchBench now enforces strict model determinism for benchmark integrity:
+
+- You must provide provider-qualified model refs (no bare model IDs)
+- Requested models must exist and be available in the local OpenClaw model catalog
+- Runtime provider/model is verified on every task via transcript model snapshots
+- Any provider/model mismatch fails the run immediately
+- Judge execution is skipped when task execution fails
+
+If a requested model is unavailable, PinchBench errors early with a suggestion when possible (for example `openrouter/<provider>/<model>`).
+
+## Persistence, Checkpoints, and Re-Judging
+
+- Sessions are now **persistent by default** (no auto cleanup between tasks)
+- A rolling checkpoint is written during benchmark runs: `results/<run_id>_<model_slug>.checkpoint.json`
+- Final output includes full transcripts per task for audit + regrading workflows
+- Use `--judge-only <results.json> --judge-model <model>` to re-run grading with a different judge model without re-executing tasks
+- Use `--clear-sessions` if you explicitly want old session transcripts deleted before each turn
+
 ## Contributing Tasks

 We welcome new tasks! Check out [`tasks/TASK_TEMPLATE.md`](tasks/TASK_TEMPLATE.md) for the format.
 Good tasks are:
diff --git a/SKILL.md b/SKILL.md
index d6f0284..5fbf3a5 100644
--- a/SKILL.md
+++ b/SKILL.md
@@ -68,15 +68,37 @@ uv run benchmark.py --model anthropic/claude-sonnet-4 --no-upload
 | Option | Description |
 |--------|-------------|
-| `--model` | Model identifier (e.g., `anthropic/claude-sonnet-4`) |
+| `--model` | **Required provider-qualified model** (e.g., `anthropic/claude-sonnet-4`, `openai-codex/gpt-5.3-codex`) |
+| `--judge-model` | Provider-qualified judge model (default: `anthropic/claude-opus-4.5`) |
 | `--suite` | `all`, `automated-only`, or comma-separated task IDs |
-| `--output-dir` | Results directory (default: `results/`) |
+| `--thinking` | Thinking level (`off\|minimal\|low\|medium\|high\|xhigh\|adaptive`) for benchmark turns |
+| `--output-dir` | Results/checkpoint directory (default: `results/`) |
 | `--timeout-multiplier` | Scale task timeouts for slower models |
 | `--runs` | Number of runs per task for averaging |
+| `--judge-only FILE` | Re-run grading only from existing results/checkpoint JSON (no task execution) |
+| `--clear-sessions` | Clear agent/judge transcript sessions before each turn (default is persistent) |
 | `--no-upload` | Skip uploading to leaderboard |
 | `--register` | Request new API token for submissions |
 | `--upload FILE` | Upload previous results JSON |
+## Determinism Guarantees
+
+PinchBench enforces strict model routing for benchmark validity:
+
+- No silent provider/model fallbacks
+- Requested benchmark model must match the runtime model on every task
+- Requested judge model must match the runtime judge model
+- Model mismatches fail the run immediately
+- If execution fails, PinchBench does not continue to judge that task
+
+## Persistence & Re-Judging
+
+- Session transcripts are preserved by default (no auto cleanup between tasks)
+- Benchmarks write rolling checkpoint files during execution: `results/<run_id>_<model_slug>.checkpoint.json`
+- Final results include per-task transcripts to support replay/regrading workflows
+- You can rerun grading with another judge model using `--judge-only <results.json> --judge-model <model>`
+- Use `--clear-sessions` when you explicitly want old transcript state removed before each turn
+
 ## Token Registration

 To submit results to the leaderboard:
diff --git a/scripts/benchmark.py b/scripts/benchmark.py
index 10fe48a..1b44513 100644
--- a/scripts/benchmark.py
+++ b/scripts/benchmark.py
@@ -26,10 +26,12 @@ from lib_agent import (
     cleanup_agent_sessions,
     ensure_agent_exists,
+    ensure_model_available,
     execute_openclaw_task,
+    normalize_model_id,
     slugify_model,
 )
-from lib_grading import GradeResult, grade_task
+from lib_grading import DEFAULT_JUDGE_MODEL, grade_task
 from lib_tasks import Task, TaskLoader
@@ -212,6 +214,39 @@ def _parse_args() -> argparse.Namespace:
         default=1,
         help="Number of runs per task for averaging",
     )
+    parser.add_argument(
+        "--judge-model",
+        default=DEFAULT_JUDGE_MODEL,
+        help=(
+            "Judge model identifier (provider/model, e.g. anthropic/claude-opus-4.5). "
+            "Must be available in the local OpenClaw model catalog."
+        ),
+    )
+    parser.add_argument(
+        "--thinking",
+        default=None,
+        help=(
+            "Thinking level for benchmark model turns. "
+            "Allowed: off|minimal|low|medium|high|xhigh|adaptive"
+        ),
+    )
+    parser.add_argument(
+        "--judge-only",
+        type=str,
+        metavar="EXECUTION_JSON",
+        help=(
+            "Skip benchmark execution and re-run grading from an existing results/checkpoint JSON. "
+            "Useful for rejudging with a different --judge-model."
+ ), + ) + parser.add_argument( + "--clear-sessions", + action="store_true", + help=( + "Clear stored agent/judge session transcripts before each turn. " + "Default is to preserve sessions for audit/resume workflows." + ), + ) return parser.parse_args() @@ -265,6 +300,79 @@ def _get_git_version(script_dir: Path) -> str: return result.stdout.strip() +def _build_task_payload( + *, + result: Dict[str, Any], + grading: Optional[Dict[str, Any]], + frontmatter: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + payload = { + "agent_id": result.get("agent_id"), + "task_id": result.get("task_id"), + "status": result.get("status"), + "requested_model": result.get("requested_model"), + "runtime_model": result.get("runtime_model"), + "timed_out": result.get("timed_out", False), + "execution_time": result.get("execution_time", 0.0), + "exit_code": result.get("exit_code"), + "stdout": result.get("stdout", ""), + "stderr": result.get("stderr", ""), + "transcript_length": len(result.get("transcript", [])), + "transcript": result.get("transcript", []), + "usage": result.get("usage", {}), + "workspace": result.get("workspace", ""), + "grading": grading, + "frontmatter": frontmatter or result.get("frontmatter", {}), + } + return payload + + +def _write_aggregate( + *, + output_path: Path, + benchmark_model: Optional[str], + judge_model: str, + thinking_level: Optional[str], + benchmark_version: str, + run_id: str, + suite: str, + runs_per_task: int, + results: List[Dict[str, Any]], + grades_by_task_id: Dict[str, Any], + tasks_by_id: Dict[str, Task], + mode: str = "benchmark", + source_results: Optional[str] = None, +) -> None: + tasks_payload = [] + for result in results: + task_id = result.get("task_id") + task_obj = tasks_by_id.get(task_id) if task_id else None + grading = grades_by_task_id.get(task_id) + tasks_payload.append( + _build_task_payload( + result=result, + grading=grading, + frontmatter=task_obj.frontmatter if task_obj else result.get("frontmatter", {}), + ) + ) + + aggregate = { + "mode": mode, + "source_results": source_results, + "model": benchmark_model, + "judge_model": judge_model, + "thinking": thinking_level, + "benchmark_version": benchmark_version, + "run_id": run_id, + "timestamp": time.time(), + "suite": suite, + "runs_per_task": runs_per_task, + "tasks": tasks_payload, + } + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(aggregate, indent=2), encoding="utf-8") + + def _colorize_gradient(ascii_art: str) -> str: if not _supports_truecolor(): return ascii_art @@ -302,8 +410,8 @@ def main(): sys.exit(1) args = _parse_args() - if not args.model and not args.register and not args.upload: - logger.error("Missing required argument: --model (unless using --register or --upload)") + if not args.model and not args.register and not args.upload and not args.judge_only: + logger.error("Missing required argument: --model (unless using --register, --upload, or --judge-only)") sys.exit(2) if args.register: @@ -345,7 +453,207 @@ def main(): logger.info("📂 Loading tasks from directory...") runner.load_tasks() - model_slug = slugify_model(args.model) + skill_dir = skill_root + + if args.judge_only: + source_path = Path(args.judge_only) + if not source_path.exists(): + logger.error("--judge-only file not found: %s", source_path) + sys.exit(2) + try: + source_payload = json.loads(source_path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + logger.error("Failed to parse --judge-only JSON: %s", exc) + sys.exit(2) + + source_tasks_raw = 
source_payload.get("tasks", []) if isinstance(source_payload, dict) else [] + if not isinstance(source_tasks_raw, list) or not source_tasks_raw: + logger.error("--judge-only JSON has no tasks to grade") + sys.exit(2) + + try: + judge_model = ensure_model_available(normalize_model_id(args.judge_model), role="Judge") + except (ValueError, RuntimeError) as exc: + logger.error(str(exc)) + sys.exit(2) + + source_tasks: List[Dict[str, Any]] = [] + source_by_id: Dict[str, Dict[str, Any]] = {} + for task_entry in source_tasks_raw: + if not isinstance(task_entry, dict): + continue + task_id = task_entry.get("task_id") + if not isinstance(task_id, str) or not task_id: + continue + source_tasks.append(task_entry) + if task_id not in source_by_id: + source_by_id[task_id] = task_entry + + requested_task_ids = _select_task_ids(runner.tasks, args.suite) + if requested_task_ids is None: + target_task_ids = [task.get("task_id") for task in source_tasks if task.get("task_id")] + else: + missing_task_ids = [task_id for task_id in requested_task_ids if task_id not in source_by_id] + if missing_task_ids: + logger.error( + "--judge-only input is missing requested task IDs: %s", + ", ".join(missing_task_ids), + ) + sys.exit(2) + target_task_ids = requested_task_ids + + if not target_task_ids: + logger.error("No tasks selected for --judge-only run") + sys.exit(2) + + tasks_by_id = {task.task_id: task for task in runner.tasks} + run_root = Path("/tmp/pinchbench") + run_id = _next_run_id(run_root) + + benchmark_model = source_payload.get("model") if isinstance(source_payload, dict) else None + if benchmark_model is not None and not isinstance(benchmark_model, str): + benchmark_model = None + thinking_level = source_payload.get("thinking") if isinstance(source_payload, dict) else None + if thinking_level is not None and not isinstance(thinking_level, str): + thinking_level = None + + results: List[Dict[str, Any]] = [] + grades_by_task_id: Dict[str, Any] = {} + + for i, task_id in enumerate(target_task_ids, 1): + task = tasks_by_id.get(task_id) + if task is None: + logger.error("Task id %s not found in local task set", task_id) + sys.exit(2) + + source_task = source_by_id[task_id] + if source_task.get("status") != "success": + logger.error( + "Cannot judge task %s because execution status is %s (must be success)", + task_id, + source_task.get("status"), + ) + sys.exit(1) + + transcript = source_task.get("transcript") + if not isinstance(transcript, list): + logger.error( + "Task %s in --judge-only input has no transcript array. 
" + "Use a results/checkpoint file produced by this updated benchmark.", + task_id, + ) + sys.exit(2) + + execution_result = { + "agent_id": source_task.get("agent_id", ""), + "task_id": task_id, + "status": source_task.get("status", "success"), + "requested_model": source_task.get("requested_model"), + "runtime_model": source_task.get("runtime_model"), + "transcript": transcript, + "usage": source_task.get("usage", {}), + "workspace": source_task.get("workspace", ""), + "exit_code": source_task.get("exit_code", 0), + "timed_out": bool(source_task.get("timed_out", False)), + "execution_time": float(source_task.get("execution_time", 0.0)), + "stdout": source_task.get("stdout", ""), + "stderr": source_task.get("stderr", ""), + "frontmatter": source_task.get("frontmatter", {}), + } + + logger.info("\n%s", "=" * 80) + logger.info("📋 Judge-only task %s/%s: %s", i, len(target_task_ids), task_id) + logger.info("%s", "=" * 80) + + try: + grade = grade_task( + task=task, + execution_result=execution_result, + skill_dir=skill_dir, + judge_model=judge_model, + clear_judge_sessions=args.clear_sessions, + ) + except Exception as exc: + logger.error("Judge-only grading failed for %s: %s", task_id, exc) + sys.exit(1) + + results.append(execution_result) + grades_by_task_id[task_id] = { + "runs": [grade.to_dict()], + "mean": grade.score, + "std": 0.0, + "min": grade.score, + "max": grade.score, + } + + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + model_slug = slugify_model(benchmark_model or "judge-only") + output_path = output_dir / f"{run_id}_{model_slug}_judge-only.json" + + _write_aggregate( + output_path=output_path, + benchmark_model=benchmark_model, + judge_model=judge_model, + thinking_level=thinking_level, + benchmark_version=_get_git_version(skill_root), + run_id=run_id, + suite=args.suite, + runs_per_task=1, + results=results, + grades_by_task_id=grades_by_task_id, + tasks_by_id=tasks_by_id, + mode="judge-only", + source_results=str(source_path.resolve()), + ) + + logger.info("Saved judge-only results to %s", output_path) + if args.no_upload: + logger.info("Skipping upload (--no-upload)") + else: + try: + from lib_upload import UploadError, upload_results + + uploaded = upload_results(output_path) + if uploaded.rank is not None: + logger.info("Uploaded to leaderboard: rank #%s", uploaded.rank) + if uploaded.leaderboard_url: + logger.info("View at: %s", uploaded.leaderboard_url) + except UploadError as exc: + logger.warning("Upload failed: %s", exc) + return + + try: + benchmark_model = ensure_model_available(args.model, role="Benchmark") + judge_model = normalize_model_id(args.judge_model) + except (ValueError, RuntimeError) as exc: + logger.error(str(exc)) + sys.exit(2) + + allowed_thinking_levels = {"off", "minimal", "low", "medium", "high", "xhigh", "adaptive"} + thinking_level: Optional[str] = None + if args.thinking: + parsed_levels = [level.strip().lower() for level in args.thinking.split(",") if level.strip()] + invalid_levels = [level for level in parsed_levels if level not in allowed_thinking_levels] + if invalid_levels: + logger.error( + "Invalid --thinking value(s): %s. Allowed values: %s", + ", ".join(invalid_levels), + ", ".join(sorted(allowed_thinking_levels)), + ) + sys.exit(2) + if not parsed_levels: + logger.error("--thinking was provided but no valid thinking levels were parsed") + sys.exit(2) + if len(parsed_levels) > 1: + logger.error( + "Multiple thinking levels in one run are not supported in this branch. 
" + "Run separate benchmarks per level (e.g., --thinking off, then --thinking low)." + ) + sys.exit(2) + thinking_level = parsed_levels[0] + + model_slug = slugify_model(benchmark_model) run_root = Path("/tmp/pinchbench") run_id = _next_run_id(run_root) skill_dir = skill_root @@ -353,8 +661,9 @@ def main(): # Use a shared workspace for the agent - we'll copy fixtures per task agent_workspace = Path(f"/tmp/pinchbench/{run_id}/agent_workspace") - ensure_agent_exists(agent_id, args.model, agent_workspace) - cleanup_agent_sessions(agent_id) + ensure_agent_exists(agent_id, benchmark_model, agent_workspace) + if args.clear_sessions: + cleanup_agent_sessions(agent_id) task_ids = _select_task_ids(runner.tasks, args.suite) results = [] @@ -365,7 +674,34 @@ def main(): tasks_to_run = [task for task in runner.tasks if task.task_id in task_ids] tasks_by_id = {task.task_id: task for task in tasks_to_run} + if any(task.grading_type in ("llm_judge", "hybrid") for task in tasks_to_run): + try: + judge_model = ensure_model_available(judge_model, role="Judge") + except (ValueError, RuntimeError) as exc: + logger.error(str(exc)) + sys.exit(2) + runs_per_task = max(1, args.runs) + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + checkpoint_path = output_dir / f"{run_id}_{model_slug}.checkpoint.json" + + def _flush_checkpoint() -> None: + _write_aggregate( + output_path=checkpoint_path, + benchmark_model=benchmark_model, + judge_model=judge_model, + thinking_level=thinking_level, + benchmark_version=_get_git_version(skill_root), + run_id=run_id, + suite=args.suite, + runs_per_task=runs_per_task, + results=results, + grades_by_task_id=grades_by_task_id, + tasks_by_id=tasks_by_id, + mode="checkpoint", + ) + for i, task in enumerate(tasks_to_run, 1): task_grades = [] for run_index in range(runs_per_task): @@ -378,52 +714,48 @@ def main(): runs_per_task, ) logger.info("%s", "=" * 80) - execution_error = None try: result = execute_openclaw_task( task=task, agent_id=agent_id, - model_id=args.model, + model_id=benchmark_model, run_id=f"{run_id}-{run_index + 1}", timeout_multiplier=args.timeout_multiplier, skill_dir=skill_dir, + thinking_level=thinking_level, + clear_sessions=args.clear_sessions, ) except Exception as exc: - execution_error = str(exc) - logger.warning( - "Task execution failed for %s, continuing: %s", task.task_id, exc + logger.error("Task execution failed for %s: %s", task.task_id, exc) + sys.exit(1) + + results.append(result) + _flush_checkpoint() + + if result.get("status") != "success": + logger.error( + "Task %s failed determinism checks or execution. 
requested_model=%s runtime_model=%s stderr=%s", + task.task_id, + result.get("requested_model"), + result.get("runtime_model"), + result.get("stderr"), ) - result = { - "agent_id": agent_id, - "task_id": task.task_id, - "status": "error", - "transcript": [], - "usage": {}, - "workspace": "", - "exit_code": -1, - "timed_out": False, - "execution_time": 0.0, - "stdout": "", - "stderr": execution_error, - } + sys.exit(1) + try: - grade = grade_task(task=task, execution_result=result, skill_dir=skill_dir) - except Exception as exc: - if execution_error: - note = f"Execution failed: {execution_error}; Grading failed: {exc}" - else: - note = f"Grading failed: {exc}" - logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc) - grade = GradeResult( - task_id=task.task_id, - score=0.0, - max_score=1.0, - grading_type=task.grading_type, - breakdown={}, - notes=note, + grade = grade_task( + task=task, + execution_result=result, + skill_dir=skill_dir, + judge_model=judge_model, + clear_judge_sessions=args.clear_sessions, ) + except Exception as exc: + logger.error("Task grading failed for %s: %s", task.task_id, exc) + _flush_checkpoint() + sys.exit(1) + task_grades.append(grade) - results.append(result) task_scores = [grade.score for grade in task_grades] grades_by_task_id[task.task_id] = { @@ -433,47 +765,37 @@ def main(): "min": min(task_scores), "max": max(task_scores), } - - output_dir = Path(args.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - aggregate = { - "model": args.model, - "benchmark_version": _get_git_version(skill_root), - "run_id": run_id, - "timestamp": time.time(), - "suite": args.suite, - "runs_per_task": runs_per_task, - "tasks": [ - { - "task_id": result["task_id"], - "status": result["status"], - "timed_out": result["timed_out"], - "execution_time": result["execution_time"], - "transcript_length": len(result["transcript"]), - "usage": result.get("usage", {}), - "workspace": result["workspace"], - "grading": grades_by_task_id[result["task_id"]], - "frontmatter": tasks_by_id[result["task_id"]].frontmatter, - } - for result in results - ], - } + _flush_checkpoint() output_path = output_dir / f"{run_id}_{model_slug}.json" - output_path.write_text(json.dumps(aggregate, indent=2), encoding="utf-8") + _write_aggregate( + output_path=output_path, + benchmark_model=benchmark_model, + judge_model=judge_model, + thinking_level=thinking_level, + benchmark_version=_get_git_version(skill_root), + run_id=run_id, + suite=args.suite, + runs_per_task=runs_per_task, + results=results, + grades_by_task_id=grades_by_task_id, + tasks_by_id=tasks_by_id, + mode="benchmark", + ) logger.info("Saved results to %s", output_path) + logger.info("Checkpoint file: %s", checkpoint_path) if args.no_upload: logger.info("Skipping upload (--no-upload)") else: try: from lib_upload import UploadError, upload_results - result = upload_results(output_path) - if result.rank is not None: - logger.info("Uploaded to leaderboard: rank #%s", result.rank) - if result.leaderboard_url: - logger.info("View at: %s", result.leaderboard_url) + uploaded = upload_results(output_path) + if uploaded.rank is not None: + logger.info("Uploaded to leaderboard: rank #%s", uploaded.rank) + if uploaded.leaderboard_url: + logger.info("View at: %s", uploaded.leaderboard_url) except UploadError as exc: logger.warning("Upload failed: %s", exc) diff --git a/scripts/lib_agent.py b/scripts/lib_agent.py index 46e3355..adebb9e 100644 --- a/scripts/lib_agent.py +++ b/scripts/lib_agent.py @@ -8,8 +8,9 @@ import 
logging import subprocess import time +from datetime import datetime from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from lib_tasks import Task @@ -23,12 +24,92 @@ def slugify_model(model_id: str) -> str: def normalize_model_id(model_id: str) -> str: - """Ensure model id is provider-qualified for OpenClaw.""" - if "/" not in model_id: - return model_id - if model_id.startswith("openrouter/"): - return model_id - return f"openrouter/{model_id}" + """Normalize a model ref while preserving provider routing. + + PinchBench is a deterministic benchmark tool: it must not silently rewrite + provider-qualified model refs (for example by forcing openrouter/...). + """ + normalized = model_id.strip() + while normalized.startswith("/"): + normalized = normalized[1:] + if not normalized or "/" not in normalized: + raise ValueError( + "Model must be provider-qualified (provider/model), " + f"got: {model_id!r}" + ) + return normalized + + +_MODEL_CATALOG_CACHE: Optional[Dict[str, Dict[str, Any]]] = None + + +def _load_model_catalog() -> Dict[str, Dict[str, Any]]: + global _MODEL_CATALOG_CACHE + if _MODEL_CATALOG_CACHE is not None: + return _MODEL_CATALOG_CACHE + + try: + result = subprocess.run( + ["openclaw", "models", "list", "--all", "--json"], + capture_output=True, + text=True, + check=False, + ) + except FileNotFoundError as exc: + raise RuntimeError(f"openclaw CLI not found while loading model catalog: {exc}") from exc + + if result.returncode != 0: + raise RuntimeError( + "Failed to load model catalog via `openclaw models list --all --json`: " + f"{result.stderr.strip() or result.stdout.strip()}" + ) + + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"Failed to parse model catalog JSON: {exc}") from exc + + models = payload.get("models", []) if isinstance(payload, dict) else [] + catalog: Dict[str, Dict[str, Any]] = {} + for entry in models: + if not isinstance(entry, dict): + continue + key = entry.get("key") + if not isinstance(key, str): + continue + catalog[key.lower()] = entry + + _MODEL_CATALOG_CACHE = catalog + return catalog + + +def _suggest_model_ref(model_ref: str, catalog: Dict[str, Dict[str, Any]]) -> Optional[str]: + if model_ref.lower().startswith("openrouter/"): + return None + candidate = f"openrouter/{model_ref}".lower() + if candidate in catalog: + return f"openrouter/{model_ref}" + return None + + +def ensure_model_available(model_id: str, *, role: str) -> str: + """Validate model ref format and availability in local OpenClaw config.""" + normalized_model = normalize_model_id(model_id) + catalog = _load_model_catalog() + entry = catalog.get(normalized_model.lower()) + if entry is None: + suggestion = _suggest_model_ref(normalized_model, catalog) + extra = f" Maybe you meant `{suggestion}`." if suggestion else "" + raise ValueError( + f"{role} model `{normalized_model}` is not in the OpenClaw model catalog.{extra}" + ) + if not entry.get("available", False): + suggestion = _suggest_model_ref(normalized_model, catalog) + extra = f" Maybe you meant `{suggestion}`." 
if suggestion else "" + raise ValueError( + f"{role} model `{normalized_model}` exists but is not available/configured in this OpenClaw instance.{extra}" + ) + return normalized_model def _get_agent_workspace(agent_id: str) -> Path | None: @@ -277,6 +358,25 @@ def _find_recent_session_path(agent_dir: Path, started_at: float) -> Path | None return max(pool, key=lambda path: path.stat().st_mtime) +def _entry_timestamp_epoch(entry: Dict[str, Any]) -> Optional[float]: + ts = entry.get("timestamp") + if isinstance(ts, (int, float)): + # OpenClaw sometimes stores ms in nested message.timestamp, but the + # top-level timestamp is typically ISO. Keep numeric support just in case. + return ts / 1000.0 if ts > 10_000_000_000 else float(ts) + if isinstance(ts, str): + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")).timestamp() + except ValueError: + return None + msg = entry.get("message") + if isinstance(msg, dict): + mts = msg.get("timestamp") + if isinstance(mts, (int, float)): + return mts / 1000.0 if mts > 10_000_000_000 else float(mts) + return None + + def _load_transcript(agent_id: str, session_id: str, started_at: float) -> List[Dict[str, Any]]: agent_dir = _get_agent_store_dir(agent_id) transcript_path = None @@ -352,6 +452,18 @@ def _load_transcript(agent_id: str, session_id: str, started_at: float) -> List[ except json.JSONDecodeError as exc: logger.warning("Failed to parse transcript line: %s", exc) transcript.append({"raw": line, "parse_error": str(exc)}) + + # When sessions are persistent, multiple task turns can share one session + # file. Keep only entries for this invocation window when possible. + cutoff = started_at - 2.0 + recent_entries = [] + for entry in transcript: + entry_ts = _entry_timestamp_epoch(entry) + if entry_ts is not None and entry_ts >= cutoff: + recent_entries.append(entry) + if recent_entries: + return recent_entries + return transcript @@ -386,6 +498,67 @@ def _extract_usage_from_transcript(transcript: List[Dict[str, Any]]) -> Dict[str return totals +def _extract_runtime_model_ref(transcript: List[Dict[str, Any]]) -> Optional[str]: + """Extract the last provider/model seen at runtime from transcript.""" + latest_model_ref: Optional[str] = None + + for entry in transcript: + if entry.get("type") != "custom": + continue + if entry.get("customType") != "model-snapshot": + continue + data = entry.get("data", {}) + provider = data.get("provider") + model_id = data.get("modelId") + if isinstance(provider, str) and isinstance(model_id, str): + latest_model_ref = f"{provider}/{model_id}" + + if latest_model_ref: + return latest_model_ref + + for entry in transcript: + if entry.get("type") != "message": + continue + msg = entry.get("message", {}) + if msg.get("role") != "assistant": + continue + provider = msg.get("provider") + model_id = msg.get("model") + if isinstance(provider, str) and isinstance(model_id, str): + latest_model_ref = f"{provider}/{model_id}" + + return latest_model_ref + + +def _extract_terminal_assistant_error(transcript: List[Dict[str, Any]]) -> Optional[str]: + """Return terminal assistant error if the *last* assistant message ended in error. + + OpenClaw may recover from transient provider/auth errors mid-turn (profile + rotation). For deterministic benchmark status we only treat the run as an + assistant error when the last assistant event is itself an error. 
+ """ + last_assistant_msg: Optional[Dict[str, Any]] = None + for entry in transcript: + if entry.get("type") != "message": + continue + msg = entry.get("message", {}) + if msg.get("role") != "assistant": + continue + if isinstance(msg, dict): + last_assistant_msg = msg + + if not last_assistant_msg: + return None + + error_message = last_assistant_msg.get("errorMessage") + stop_reason = last_assistant_msg.get("stopReason") + if isinstance(error_message, str) and error_message.strip(): + return error_message.strip() + if stop_reason == "error": + return "assistant stopReason=error" + return None + + def execute_openclaw_task( *, task: Task, @@ -394,14 +567,17 @@ def execute_openclaw_task( run_id: str, timeout_multiplier: float, skill_dir: Path, + thinking_level: Optional[str] = None, + clear_sessions: bool = False, ) -> Dict[str, Any]: logger.info("🤖 Agent [%s] starting task: %s", agent_id, task.task_id) logger.info(" Task: %s", task.name) logger.info(" Category: %s", task.category) - # Clean up previous session transcripts so we can reliably find this task's - # transcript (OpenClaw uses its own UUID-based naming, not our session ID). - cleanup_agent_sessions(agent_id) + # Optional cleanup for deterministic fresh session directories. + # Default is to preserve sessions so runs are resumable / auditable. + if clear_sessions: + cleanup_agent_sessions(agent_id) start_time = time.time() workspace = prepare_task_workspace(skill_dir, run_id, task, agent_id) @@ -413,17 +589,21 @@ def execute_openclaw_task( timed_out = False try: + command = [ + "openclaw", + "agent", + "--agent", + agent_id, + "--session-id", + session_id, + "--message", + task.prompt, + ] + if thinking_level: + command.extend(["--thinking", thinking_level]) + result = subprocess.run( - [ - "openclaw", - "agent", - "--agent", - agent_id, - "--session-id", - session_id, - "--message", - task.prompt, - ], + command, capture_output=True, text=True, cwd=str(workspace), @@ -444,6 +624,10 @@ def execute_openclaw_task( usage = _extract_usage_from_transcript(transcript) execution_time = time.time() - start_time + requested_model = normalize_model_id(model_id) + runtime_model = _extract_runtime_model_ref(transcript) + assistant_error = _extract_terminal_assistant_error(transcript) + status = "success" if timed_out: status = "timeout" @@ -453,11 +637,24 @@ def execute_openclaw_task( status = "error" if stderr and "openclaw command not found" in str(stderr): status = "error" + if assistant_error: + status = "error" + stderr = f"{stderr}\nAssistant error: {assistant_error}".strip() + if runtime_model is None: + status = "error" + stderr = f"{stderr}\nCould not verify runtime provider/model from transcript.".strip() + elif runtime_model.lower() != requested_model.lower(): + status = "error" + stderr = ( + f"{stderr}\nModel mismatch: requested `{requested_model}` but runtime used `{runtime_model}`." + ).strip() return { "agent_id": agent_id, "task_id": task.task_id, "status": status, + "requested_model": requested_model, + "runtime_model": runtime_model, "transcript": transcript, "usage": usage, "workspace": str(workspace), @@ -475,11 +672,14 @@ def run_openclaw_prompt( prompt: str, workspace: Path, timeout_seconds: float, + expected_model_ref: Optional[str] = None, + clear_sessions: bool = False, ) -> Dict[str, Any]: """Run a single OpenClaw prompt for helper agents like the judge.""" - # Clean up previous session transcripts so we can reliably find this - # prompt's transcript (OpenClaw uses its own UUID-based naming). 
- cleanup_agent_sessions(agent_id) + # Optional cleanup for deterministic fresh session directories. + # Default is to preserve judge sessions for audit/debug/resume. + if clear_sessions: + cleanup_agent_sessions(agent_id) start_time = time.time() workspace.mkdir(parents=True, exist_ok=True) @@ -552,6 +752,8 @@ def run_openclaw_prompt( transcript = _load_transcript(agent_id, session_id, start_time) execution_time = time.time() - start_time + runtime_model = _extract_runtime_model_ref(transcript) + assistant_error = _extract_terminal_assistant_error(transcript) status = "success" if timed_out: @@ -562,10 +764,27 @@ def run_openclaw_prompt( status = "error" if stderr and "openclaw command not found" in str(stderr): status = "error" + if assistant_error: + status = "error" + stderr = f"{stderr}\nAssistant error: {assistant_error}".strip() + + expected_model = normalize_model_id(expected_model_ref) if expected_model_ref else None + if expected_model: + if runtime_model is None: + status = "error" + stderr = f"{stderr}\nCould not verify runtime provider/model from judge transcript.".strip() + elif runtime_model.lower() != expected_model.lower(): + status = "error" + stderr = ( + f"{stderr}\nJudge model mismatch: requested `{expected_model}` " + f"but runtime used `{runtime_model}`." + ).strip() return { "agent_id": agent_id, "status": status, + "requested_model": expected_model, + "runtime_model": runtime_model, "transcript": transcript, "workspace": str(workspace), "exit_code": exit_code, diff --git a/scripts/lib_grading.py b/scripts/lib_grading.py index 773ac33..0b538ec 100644 --- a/scripts/lib_grading.py +++ b/scripts/lib_grading.py @@ -51,6 +51,7 @@ def grade_task( judge_model: str = DEFAULT_JUDGE_MODEL, judge_agent_prefix: str = DEFAULT_JUDGE_AGENT_PREFIX, judge_timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS, + clear_judge_sessions: bool = False, ) -> GradeResult: grading_type = task.grading_type if grading_type == "automated": @@ -63,6 +64,7 @@ def grade_task( judge_agent_prefix=judge_agent_prefix, judge_timeout_seconds=judge_timeout_seconds, skill_dir=skill_dir, + clear_judge_sessions=clear_judge_sessions, ) if grading_type == "hybrid": auto_result = _grade_automated(task, execution_result) @@ -73,6 +75,7 @@ def grade_task( judge_agent_prefix=judge_agent_prefix, judge_timeout_seconds=judge_timeout_seconds, skill_dir=skill_dir, + clear_judge_sessions=clear_judge_sessions, ) return _combine_grades(task, auto_result, llm_result) raise ValueError(f"Unknown grading type: {grading_type}") @@ -129,6 +132,7 @@ def _grade_llm_judge( judge_agent_prefix: str, judge_timeout_seconds: float, skill_dir: Path, + clear_judge_sessions: bool = False, ) -> GradeResult: transcript_summary = _summarize_transcript(execution_result.get("transcript", [])) rubric = task.llm_judge_rubric or _format_grading_criteria(task) @@ -141,7 +145,16 @@ def _grade_llm_judge( prompt=prompt, workspace=judge_workspace, timeout_seconds=judge_timeout_seconds, + expected_model_ref=judge_model, + clear_sessions=clear_judge_sessions, ) + if judge_result.get("status") != "success": + raise RuntimeError( + "Judge run failed determinism checks or execution: " + f"requested_model={judge_result.get('requested_model')} " + f"runtime_model={judge_result.get('runtime_model')} " + f"stderr={judge_result.get('stderr')}" + ) parsed = _parse_judge_response(judge_result.get("transcript", [])) breakdown = parsed.get("scores", {})
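For illustration, here is a minimal sketch of reading the aggregate JSON that `_write_aggregate` produces (the final results file and the rolling `*.checkpoint.json` share this shape). The filename below is a placeholder, since run IDs and model slugs are generated at runtime, and `grading` may be `None` for a task that was executed but not yet graded when a checkpoint was flushed.

```python
import json
from pathlib import Path

# Placeholder path: real files are named results/<run_id>_<model_slug>.json
# (or <run_id>_<model_slug>.checkpoint.json for the rolling checkpoint).
results_path = Path("results/001_anthropic-claude-sonnet-4.json")

payload = json.loads(results_path.read_text(encoding="utf-8"))
print(f"mode={payload.get('mode')} model={payload.get('model')} judge={payload.get('judge_model')}")

for task in payload.get("tasks", []):
    grading = task.get("grading")  # None if this task was not graded yet (checkpoint case)
    mean = grading.get("mean") if grading else None
    print(
        f"{task.get('task_id')}: status={task.get('status')} "
        f"runtime_model={task.get('runtime_model')} mean={mean}"
    )
```

A file with this shape is also what `--judge-only` consumes when regrading with a different `--judge-model`.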