diff --git a/benchmark/locomo/README.md b/benchmark/locomo/README.md index fd42a177d..9b85563b2 100644 --- a/benchmark/locomo/README.md +++ b/benchmark/locomo/README.md @@ -16,7 +16,10 @@ benchmark/locomo/ │ ├── data/ # 测试数据目录 │ └── result/ # 评测结果目录 └── openclaw/ # OpenClaw 评测脚本 - └── eval.py # OpenClaw 评估脚本 + ├── eval.py # OpenClaw 评估脚本 + ├── judge.py # LLM 裁判打分(适配 OpenClaw) + ├── run_full_eval.sh # 一键运行完整评测流程 + └── result/ # 评测结果目录 ``` --- @@ -149,6 +152,44 @@ python stat_judge_result.py --input <评分结果文件> ## OpenClaw 评测流程 +### 完整一键评测 + +使用 `openclaw/run_full_eval.sh` 可以一键运行完整评测流程: + +```bash +cd benchmark/locomo/openclaw +bash run_full_eval.sh # 只导入 OpenViking +bash run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw(并行执行) +bash run_full_eval.sh --skip-import # 跳过导入步骤,直接运行 QA 评估 +``` + +**脚本参数说明:** + +| 参数 | 说明 | +|------|------| +| (不带参数) | 仅导入数据到 OpenViking,然后运行评测 | +| `--with-claw-import` | 同时导入数据到 OpenViking 和 OpenClaw(两个导入任务并行执行) | +| `--skip-import` | 跳过导入步骤,直接运行 QA 评估 | + +**脚本执行流程:** +1. 导入数据到 OpenViking(可选同时导入 OpenClaw) +2. 等待 60 秒确保数据导入完成 +3. 运行 QA 评估(`eval.py qa`,输出到 `result/qa_results.csv`) +4. 裁判打分(`judge.py`,并行度 40) +5. 统计结果(`stat_judge_result.py`) + +**脚本内部配置参数:** + +在 `run_full_eval.sh` 脚本顶部可以修改以下配置: + +| 变量 | 说明 | 默认值 | +|------|------|---------------------------| +| `INPUT_FILE` | 输入数据文件路径 | `../data/locomo10.json` | +| `RESULT_DIR` | 结果输出目录 | `./result` | +| `GATEWAY_TOKEN` | OpenClaw Gateway Token | 需要设置为实际 openclaw 网关 token | + +### 分步使用说明 + 使用 `openclaw/eval.py` 进行 OpenClaw 评测,该脚本有两种模式: ### 模式 1: ingest - 导入对话数据到OpenClaw diff --git a/benchmark/locomo/openclaw/eval.py b/benchmark/locomo/openclaw/eval.py index 744d441eb..22a565710 100644 --- a/benchmark/locomo/openclaw/eval.py +++ b/benchmark/locomo/openclaw/eval.py @@ -22,6 +22,7 @@ import os import sys import time +from pathlib import Path import requests @@ -570,7 +571,7 @@ def run_sample_qa( if not qas: print(f"\n=== Sample {sample_id} [{sample_idx}] (user={user_key}) ===", file=sys.stderr) print(f" All QA questions already executed, skipping sample.", file=sys.stderr) - return [], {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + return [], 
{"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} jsonl_path = f"{args.output}.{sample_idx}.jsonl" if args.output else None @@ -725,7 +726,7 @@ def save_record_to_csv(csv_path: str, record: dict) -> None: "sample_id", "sample_idx", "qi", "question", "expected", "response", "category", "evidence", "input_tokens", "output_tokens", "cacheRead", "cacheWrite", "total_tokens", - "timestamp", "jsonl_filename" + "timestamp", "jsonl_filename", "result", "reasoning" ] # Flatten usage fields @@ -738,6 +739,8 @@ def save_record_to_csv(csv_path: str, record: dict) -> None: flat_record["total_tokens"] = usage.get("total_tokens", 0) flat_record["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S") flat_record["jsonl_filename"] = flat_record.get("jsonl_filename", "") + flat_record["result"] = "" # 默认为空,由 judge.py 填充 + flat_record["reasoning"] = "" # 默认为空,由 judge.py 填充 try: with open(csv_path, "a", encoding="utf-8", newline="") as f: @@ -765,7 +768,9 @@ def run_qa( print(f" running in single-thread mode", file=sys.stderr) # Load already executed records from CSV - csv_path = f"{args.output}.csv" if args.output else "qa_results.csv" + csv_path = f"{args.output}.csv" if args.output else args.default_csv_path + # 确保输出目录存在 + os.makedirs(os.path.dirname(csv_path), exist_ok=True) executed_records = load_executed_records(csv_path) print(f" Loaded {len(executed_records)} already executed records from {csv_path}", file=sys.stderr) @@ -820,6 +825,10 @@ def parse_session_range(s: str) -> tuple[int, int]: def main(): + # 基于脚本所在目录计算默认 CSV 路径 + script_dir = Path(__file__).parent.resolve() + default_csv_path = str(script_dir / "result" / "qa_results.csv") + parser = argparse.ArgumentParser(description="Evaluate OpenClaw responses") parser.add_argument("mode", choices=["ingest", "qa"], help="Mode: ingest (load conversations) or qa (run QA eval)") parser.add_argument("input", help="Path to test file (.txt or .json)") @@ -901,6 +910,8 @@ def main(): help="Clear 
def _parse_judge_reply(content: str) -> tuple[bool, str]:
    """Convert the judge model's raw reply into ``(is_correct, reasoning)``.

    The reply is expected to contain one JSON object of the form
    ``{"is_correct": "CORRECT" | "WRONG", "reasoning": "..."}``; any text
    around the outermost braces is ignored.  A reply without a parsable JSON
    object yields ``(False, "[PARSE ERROR] ...")``.
    """
    start_idx = content.find("{")
    end_idx = content.rfind("}")
    if start_idx == -1 or end_idx < start_idx:
        return False, f"[PARSE ERROR] Invalid response: {content}"
    try:
        verdict = json.loads(content[start_idx : end_idx + 1])
    except json.JSONDecodeError:
        return False, f"[PARSE ERROR] Invalid response: {content}"

    label = verdict.get("is_correct", "WRONG")
    if isinstance(label, bool):
        # Some models answer with a JSON boolean instead of the requested string.
        is_correct = label
    else:
        is_correct = str(label).strip().upper() == "CORRECT"
    reasoning = verdict.get("reasoning")
    return is_correct, "" if reasoning is None else str(reasoning)


async def grade_answer(
    llm_client, model: str, question: str, gold_answer: str, response: str
) -> tuple[bool, str]:
    """Ask the judge model whether ``response`` matches ``gold_answer``.

    Args:
        llm_client: Object exposing ``chat.completions.create(...)`` as an
            awaitable (``main`` passes an ``AsyncOpenAI`` client).
        model: Judge model name forwarded to the API.
        question: The question that was asked.
        gold_answer: The ground-truth answer.
        response: The generated answer to grade.

    Returns:
        ``(is_correct, reasoning)``.  This function does not raise on request
        or parsing problems: a failed request returns
        ``(False, "[API ERROR] ...")`` and an unparsable reply returns
        ``(False, "[PARSE ERROR] ...")``.
    """
    system_prompt = """
    You are an expert grader that determines if answers to questions match a gold standard answer
    """

    accuracy_prompt = f"""
    Your task is to label an answer to a question as 'CORRECT' or 'WRONG'. You will be given the following data:
    (1) a question (posed by one user to another user),
    (2) a 'gold' (ground truth) answer,
    (3) a generated answer
    which you will score as CORRECT/WRONG.

    The point of the question is to ask about something one user should know about the other user based on their prior conversations.
    The gold answer will usually be a concise and short answer that includes the referenced topic, for example:
    Question: Do you remember what I got the last time I went to Hawaii?
    Gold answer: A shell necklace
    The generated answer might be much longer, but you should be generous with your grading - as long as it touches on the same topic as the gold answer, it should be counted as CORRECT.

    For time related questions, the gold answer will be a specific date, month, year, etc. The generated answer might be much longer or use relative time references (like "last Tuesday" or "next month"), but you should be generous with your grading - as long as it refers to the same date or time period as the gold answer, it should be counted as CORRECT. Even if the format differs (e.g., "May 7th" vs "7 May"), consider it CORRECT if it's the same date.

    Now it's time for the real question:
    Question: {question}
    Gold answer: {gold_answer}
    Generated answer: {response}

    First, provide a short (one sentence) explanation of your reasoning, then finish with CORRECT or WRONG.
    Do NOT include both CORRECT and WRONG in your response, or it will break the evaluation script.

    Respond with JSON only: {{"is_correct": "CORRECT" or "WRONG", "reasoning": "your explanation"}}
    """

    try:
        resp = await llm_client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": accuracy_prompt},
            ],
            temperature=0,
            timeout=60,
        )
        # ``content`` may be None (e.g. an empty completion); treat it as empty text.
        content = (resp.choices[0].message.content or "").strip()
    except Exception as e:  # request boundary: one failed call must not abort the batch
        return False, f"[API ERROR] {str(e)}"

    # Parsing happens outside the try block so that a malformed reply is
    # reported as a parse error rather than mislabelled as an API error.
    return _parse_judge_reply(content)


def load_answers(input_path: str) -> tuple[list[dict], list[str]]:
    """Load the QA result CSV that is to be graded.

    Args:
        input_path: Path to the CSV file.

    Returns:
        ``(rows, fieldnames)`` where ``rows`` is one dict per data row and
        ``fieldnames`` is the header extended with ``result`` and
        ``reasoning`` when the file does not have them yet, so that graded
        rows can be written back with ``csv.DictWriter``.

    Raises:
        FileNotFoundError: ``input_path`` does not exist.
        ValueError: The file is empty and therefore has no header row.
    """
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input file not found: {input_path}")

    with open(input_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        if reader.fieldnames is None:
            raise ValueError(f"Input file has no CSV header: {input_path}")
        fieldnames = list(reader.fieldnames)
        # Both columns are assigned during grading; DictWriter rejects keys
        # that are missing from its fieldnames, so make sure they exist.
        for column in ("result", "reasoning"):
            if column not in fieldnames:
                fieldnames.append(column)
        rows = list(reader)
    return rows, fieldnames


def _select_rows(rows: list[dict]) -> tuple[list[int], list[int]]:
    """Return indices of gradable rows (category != "5") and of those still ungraded."""
    valid_rows = []
    ungraded = []
    for i, row in enumerate(rows):
        if row.get("category", "") == "5":
            continue
        valid_rows.append(i)
        if not row.get("result"):
            ungraded.append(i)
    return valid_rows, ungraded


def _count_graded(rows: list[dict]) -> tuple[int, int]:
    """Return ``(correct, total_graded)`` over rows whose category is not "5"."""
    graded = [
        row["result"]
        for row in rows
        if row.get("category", "") != "5" and row.get("result")
    ]
    return graded.count("CORRECT"), len(graded)


def _write_rows_atomically(path: str, fieldnames: list[str], rows: list[dict]) -> None:
    """Write all rows to a temp file next to ``path`` and atomically replace ``path``."""
    temp_file = f"{path}.tmp"
    with open(temp_file, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    os.replace(temp_file, path)


async def main():
    """Grade every ungraded answer in the input CSV with an LLM judge.

    Rows with ``category == "5"`` are skipped and rows that already have a
    ``result`` are left untouched, so an interrupted run can be resumed.  The
    CSV is rewritten in place after every graded row.
    """
    # Local import: this block only adds sys for sys.exit(); the ``exit``
    # builtin is injected by the ``site`` module and is not always available.
    import sys

    parser = argparse.ArgumentParser(
        description="VikingBot QA judge script, same logic as openclaw evaluation"
    )
    parser.add_argument(
        "--input",
        default="./result/locomo_qa_result_only_sys_memory.csv",
        help=(
            "Path to QA result csv file, "
            "default: ./result/locomo_qa_result_only_sys_memory.csv"
        ),
    )
    parser.add_argument(
        "--base-url",
        default="https://ark.cn-beijing.volces.com/api/v3",
        help="Volcengine API base URL, default: https://ark.cn-beijing.volces.com/api/v3",
    )
    parser.add_argument(
        "--token",
        default=os.getenv("ARK_API_KEY", os.getenv("OPENAI_API_KEY", "")),
        help="Volcengine API token, default from ARK_API_KEY or OPENAI_API_KEY env var",
    )
    parser.add_argument(
        "--model",
        default="doubao-seed-2-0-pro-260215",
        help="Judge model name, default: doubao-seed-2-0-pro-260215",
    )
    parser.add_argument(
        "--parallel", type=int, default=5, help="Parallel request count, default: 5"
    )
    args = parser.parse_args()

    if args.parallel < 1:
        # Semaphore(0) would block every task forever; negative values raise.
        parser.error("--parallel must be >= 1")

    if not args.token:
        print("Error: API token is required")
        print("\n请通过以下方式设置 API key:")
        print(" 1. 创建 ~/.openviking_benchmark_env 文件,内容如下:")
        print(" ARK_API_KEY=你的key")
        print(" 2. 或者通过 --token 参数传入")
        print(" 3. 或者设置环境变量: export ARK_API_KEY=你的key")
        sys.exit(1)

    # Load the data, then drop category=5 rows and keep only ungraded ones.
    rows, fieldnames = load_answers(args.input)
    valid_rows, ungraded = _select_rows(rows)

    total = len(rows)
    valid_total = len(valid_rows)
    print(f"Total answers: {total}, valid (category != 5): {valid_total}, ungraded: {len(ungraded)}")

    if not ungraded:
        print("All valid answers already graded, exit")
        return

    # AsyncOpenAI comes from the file-level imports of this script.
    client = AsyncOpenAI(base_url=args.base_url, api_key=args.token)

    semaphore = asyncio.Semaphore(args.parallel)  # caps in-flight judge requests
    file_lock = asyncio.Lock()  # serialises rewrites of the CSV
    failed = []  # indices whose grading hit an API or parse error in this run

    async def save_results():
        """Persist all rows; temp file + atomic replace avoids a corrupt CSV."""
        async with file_lock:
            _write_rows_atomically(args.input, fieldnames, rows)

    async def process_row(idx):
        async with semaphore:
            row = rows[idx]
            question = row["question"]
            # Two column names are accepted: expected (eval.py) or answer (vikingbot).
            gold = row.get("expected") or row.get("answer")
            response = row["response"]
            print(f"Grading {idx + 1}/{total}: {question[:60]}...")
            is_correct, reasoning = await grade_answer(client, args.model, question, gold, response)
            row["result"] = "CORRECT" if is_correct else "WRONG"
            row["reasoning"] = reasoning
            if reasoning.startswith(("[API ERROR]", "[PARSE ERROR]")):
                failed.append(idx)

            # Save after every row so an interrupted run loses no finished work.
            await save_results()
            print(f"Saved result for {idx + 1}/{total}: {row['result']}")

            return idx, row

    tasks = [process_row(idx) for idx in ungraded]
    await asyncio.gather(*tasks)

    correct, total_graded = _count_graded(rows)
    accuracy = correct / total_graded if total_graded > 0 else 0.0
    print(f"\nGrading completed: {correct}/{total_graded} correct, accuracy: {accuracy:.2%}")
    if failed:
        # These rows are stored as WRONG and will be skipped by later runs.
        print(
            f"Warning: {len(failed)} answers hit API/parse errors and were recorded as WRONG; "
            "clear their 'result' cells and rerun to grade them again"
        )
    print(f"All results saved to {args.input}")


if __name__ == "__main__":
    asyncio.run(main())
+python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" + +# Step 3: 裁判打分 +echo "[3/5] 裁判打分..." +python "$SCRIPT_DIR/judge.py" --input "$OUTPUT_CSV" --parallel 40 + +# Step 4: 计算结果 +echo "[4/5] 计算结果..." +python "$SCRIPT_DIR/../vikingbot/stat_judge_result.py" --input "$OUTPUT_CSV" + +echo "[5/5] 完成!" +echo "结果文件: $OUTPUT_CSV" diff --git a/benchmark/locomo/vikingbot/import_to_ov.py b/benchmark/locomo/vikingbot/import_to_ov.py index 509f93b2a..a6b23c461 100644 --- a/benchmark/locomo/vikingbot/import_to_ov.py +++ b/benchmark/locomo/vikingbot/import_to_ov.py @@ -360,17 +360,6 @@ async def viking_ingest( await client.close() -def sync_viking_ingest( - messages: List[Dict[str, Any]], openviking_url: str, session_time: Optional[str] = None -) -> Dict[str, int]: - """Synchronous wrapper for viking_ingest to maintain existing API.""" - return asyncio.run(viking_ingest(messages, openviking_url, session_time)) - -# --------------------------------------------------------------------------- -# Main import logic -# --------------------------------------------------------------------------- - - def parse_session_range(s: str) -> Tuple[int, int]: """Parse '1-4' or '3' into (lo, hi) inclusive tuple.""" if "-" in s: @@ -613,7 +602,7 @@ async def process_sample(item): # 等待所有 sample 处理完成 print( - f"\n[INFO] Starting import with {args.parallel} concurrent workers, {len(tasks)} tasks to process", + f"\n[INFO] Starting import with {len(tasks)} tasks to process", file=sys.stderr, ) await asyncio.gather(*tasks, return_exceptions=True) @@ -679,12 +668,6 @@ def main(): default="http://localhost:1933", help="OpenViking service URL (default: http://localhost:1933)", ) - parser.add_argument( - "--parallel", - type=int, - default=5, - help="Number of concurrent import workers (default: 5)", - ) parser.add_argument( "--sample", type=int,