From 11ea3828c6e60ed9abf14bb0d40d91b628be9807 Mon Sep 17 00:00:00 2001 From: DuTao Date: Wed, 8 Apr 2026 00:30:21 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E7=9A=84=E4=B8=80=E9=94=AE=E8=AF=84=E6=B5=8B=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/locomo/openclaw/eval.py | 17 +- benchmark/locomo/openclaw/judge.py | 203 +++++++++++++++++++++ benchmark/locomo/openclaw/run_full_eval.sh | 75 ++++++++ benchmark/locomo/vikingbot/import_to_ov.py | 19 +- 4 files changed, 293 insertions(+), 21 deletions(-) create mode 100644 benchmark/locomo/openclaw/judge.py create mode 100755 benchmark/locomo/openclaw/run_full_eval.sh diff --git a/benchmark/locomo/openclaw/eval.py b/benchmark/locomo/openclaw/eval.py index 744d441eb..22a565710 100644 --- a/benchmark/locomo/openclaw/eval.py +++ b/benchmark/locomo/openclaw/eval.py @@ -22,6 +22,7 @@ import os import sys import time +from pathlib import Path import requests @@ -570,7 +571,7 @@ def run_sample_qa( if not qas: print(f"\n=== Sample {sample_id} [{sample_idx}] (user={user_key}) ===", file=sys.stderr) print(f" All QA questions already executed, skipping sample.", file=sys.stderr) - return [], {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + return [], {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} jsonl_path = f"{args.output}.{sample_idx}.jsonl" if args.output else None @@ -725,7 +726,7 @@ def save_record_to_csv(csv_path: str, record: dict) -> None: "sample_id", "sample_idx", "qi", "question", "expected", "response", "category", "evidence", "input_tokens", "output_tokens", "cacheRead", "cacheWrite", "total_tokens", - "timestamp", "jsonl_filename" + "timestamp", "jsonl_filename", "result", "reasoning" ] # Flatten usage fields @@ -738,6 +739,8 @@ def save_record_to_csv(csv_path: str, record: dict) -> None: flat_record["total_tokens"] = usage.get("total_tokens", 0) flat_record["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S") flat_record["jsonl_filename"] = flat_record.get("jsonl_filename", "") + flat_record["result"] = "" # 默认为空,由 judge.py 填充 + flat_record["reasoning"] = "" # 默认为空,由 judge.py 填充 try: with open(csv_path, "a", encoding="utf-8", newline="") as f: @@ -765,7 +768,9 @@ def run_qa( print(f" running in single-thread mode", file=sys.stderr) # Load already executed records from CSV - csv_path = f"{args.output}.csv" if args.output else "qa_results.csv" + csv_path = f"{args.output}.csv" if args.output else args.default_csv_path + # 确保输出目录存在 + os.makedirs(os.path.dirname(csv_path), exist_ok=True) executed_records = load_executed_records(csv_path) print(f" Loaded {len(executed_records)} already executed records from {csv_path}", file=sys.stderr) @@ -820,6 +825,10 @@ def parse_session_range(s: str) -> tuple[int, int]: def main(): + # 基于脚本所在目录计算默认 CSV 路径 + script_dir = Path(__file__).parent.resolve() + default_csv_path = str(script_dir / "result" / "qa_results.csv") + parser = argparse.ArgumentParser(description="Evaluate OpenClaw responses") parser.add_argument("mode", choices=["ingest", "qa"], help="Mode: ingest (load conversations) or qa (run QA eval)") parser.add_argument("input", help="Path to test file (.txt or .json)") @@ -901,6 +910,8 @@ def main(): help="Clear all existing ingest records before running", ) args = parser.parse_args() + # 添加默认 CSV 路径到 args + args.default_csv_path = default_csv_path if not args.token and not getattr(args, "viking", False): print("Error: --token or OPENCLAW_GATEWAY_TOKEN env var is required", file=sys.stderr) diff --git a/benchmark/locomo/openclaw/judge.py b/benchmark/locomo/openclaw/judge.py new file mode 100644 index 000000000..f89bbc688 --- /dev/null +++ b/benchmark/locomo/openclaw/judge.py @@ -0,0 +1,203 @@ +import argparse +import csv +import json +import os +import asyncio +from openai import AsyncOpenAI +from dotenv import load_dotenv +from pathlib import Path + +# 加载本地环境变量文件 +env_file = Path.home() / ".openviking_benchmark_env" +load_dotenv(env_file) + + +async def grade_answer( + llm_client, model: str, question: str, gold_answer: str, response: str +) -> tuple[bool, str]: + system_prompt = """ + You are an expert grader that determines if answers to questions match a gold standard answer + """ + + ACCURACY_PROMPT = f""" + Your task is to label an answer to a question as 'CORRECT' or 'WRONG'. You will be given the following data: + (1) a question (posed by one user to another user), + (2) a 'gold' (ground truth) answer, + (3) a generated answer + which you will score as CORRECT/WRONG. + + The point of the question is to ask about something one user should know about the other user based on their prior conversations. + The gold answer will usually be a concise and short answer that includes the referenced topic, for example: + Question: Do you remember what I got the last time I went to Hawaii? + Gold answer: A shell necklace + The generated answer might be much longer, but you should be generous with your grading - as long as it touches on the same topic as the gold answer, it should be counted as CORRECT. + + For time related questions, the gold answer will be a specific date, month, year, etc. The generated answer might be much longer or use relative time references (like "last Tuesday" or "next month"), but you should be generous with your grading - as long as it refers to the same date or time period as the gold answer, it should be counted as CORRECT. Even if the format differs (e.g., "May 7th" vs "7 May"), consider it CORRECT if it's the same date. + + Now it's time for the real question: + Question: {question} + Gold answer: {gold_answer} + Generated answer: {response} + + First, provide a short (one sentence) explanation of your reasoning, then finish with CORRECT or WRONG. + Do NOT include both CORRECT and WRONG in your response, or it will break the evaluation script. + + Respond with JSON only: {{"is_correct": "CORRECT" or "WRONG", "reasoning": "your explanation"}} + """ + + try: + resp = await llm_client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": ACCURACY_PROMPT}, + ], + temperature=0, + timeout=60, + ) + content = resp.choices[0].message.content.strip() + # 提取JSON内容 + start_idx = content.find("{") + end_idx = content.rfind("}") + if start_idx != -1 and end_idx != -1: + json_str = content[start_idx : end_idx + 1].strip() + result = json.loads(json_str) + is_correct = result.get("is_correct", "WRONG").strip().upper() == "CORRECT" + reasoning = result.get("reasoning", "") + return is_correct, reasoning + return False, f"[PARSE ERROR] Invalid response: {content}" + except Exception as e: + return False, f"[API ERROR] {str(e)}" + + +def load_answers(input_path: str) -> tuple[list[dict], list[str]]: + """加载待评分的回答,返回所有行和表头""" + if not os.path.exists(input_path): + raise FileNotFoundError(f"Input file not found: {input_path}") + + with open(input_path, "r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f) + fieldnames = reader.fieldnames.copy() + # 新增reasoning列如果不存在 + if "reasoning" not in fieldnames: + fieldnames.append("reasoning") + rows = list(reader) + return rows, fieldnames + + +async def main(): + parser = argparse.ArgumentParser( + description="VikingBot QA judge script, same logic as openclaw evaluation" + ) + parser.add_argument( + "--input", + default="./result/locomo_qa_result_only_sys_memory.csv", + help="Path to QA result csv file, default: ./result/locomo_qa_result.csv", + ) + parser.add_argument( + "--base-url", + default="https://ark.cn-beijing.volces.com/api/v3", + help="Volcengine API base URL, default: https://ark.cn-beijing.volces.com/api/v3", + ) + parser.add_argument( + "--token", + default=os.getenv("ARK_API_KEY", os.getenv("OPENAI_API_KEY", "")), + help="Volcengine API token, default from ARK_API_KEY or OPENAI_API_KEY env var", + ) + parser.add_argument( + "--model", + default="doubao-seed-2-0-pro-260215", + help="Judge model name, default: doubao-seed-2-0-pro-260215", + ) + parser.add_argument( + "--parallel", type=int, default=5, help="Parallel request count, default: 5" + ) + args = parser.parse_args() + + if not args.token: + print("Error: API token is required") + print("\n请通过以下方式设置 API key:") + print(" 1. 创建 ~/.openviking_benchmark_env 文件,内容如下:") + print(" ARK_API_KEY=你的key") + print(" 2. 或者通过 --token 参数传入") + print(" 3. 或者设置环境变量: export ARK_API_KEY=你的key") + exit(1) + + # 加载数据 + rows, fieldnames = load_answers(args.input) + + # 筛选掉 category=5 的行,只处理未评分的行 + valid_rows = [] + ungraded = [] + for i, row in enumerate(rows): + category = row.get("category", "") + if category == "5": + continue + valid_rows.append(i) + if not row.get("result"): + ungraded.append(i) + + total = len(rows) + valid_total = len(valid_rows) + print(f"Total answers: {total}, valid (category != 5): {valid_total}, ungraded: {len(ungraded)}") + + if not ungraded: + print("All valid answers already graded, exit") + return + + # 初始化OpenAI客户端 + client = AsyncOpenAI(base_url=args.base_url, api_key=args.token) + + # 并发处理 + semaphore = asyncio.Semaphore(args.parallel) + file_lock = asyncio.Lock() # 用于同步文件写入 + + async def save_results(): + """保存当前所有结果到CSV文件,使用临时文件+原子替换避免文件损坏""" + async with file_lock: + temp_file = f"{args.input}.tmp" + with open(temp_file, "w", encoding="utf-8", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + os.replace(temp_file, args.input) + + async def process_row(idx): + async with semaphore: + row = rows[idx] + question = row["question"] + # 兼容两种列名: expected (eval.py) 或 answer (vikingbot) + gold = row.get("expected") or row.get("answer") + response = row["response"] + print(f"Grading {idx + 1}/{total}: {question[:60]}...") + is_correct, reasoning = await grade_answer(client, args.model, question, gold, response) + row["result"] = "CORRECT" if is_correct else "WRONG" + row["reasoning"] = reasoning + + # 处理完一条就立即保存结果 + await save_results() + print(f"Saved result for {idx + 1}/{total}: {row['result']}") + + return idx, row + + tasks = [process_row(idx) for idx in ungraded] + await asyncio.gather(*tasks) + + # 统计结果 + correct = 0 + total_graded = 0 + for row in rows: + category = row.get("category", "") + if category == "5": + continue + if row.get("result"): + total_graded += 1 + if row.get("result") == "CORRECT": + correct += 1 + accuracy = correct / total_graded if total_graded > 0 else 0.0 + print(f"\nGrading completed: {correct}/{total_graded} correct, accuracy: {accuracy:.2%}") + print(f"All results saved to {args.input}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/benchmark/locomo/openclaw/run_full_eval.sh b/benchmark/locomo/openclaw/run_full_eval.sh new file mode 100755 index 000000000..c8f660afb --- /dev/null +++ b/benchmark/locomo/openclaw/run_full_eval.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +set -e + +: ' +OpenClaw 完整评估流程脚本 + +用法: + ./run_full_eval.sh # 只导入 OpenViking + ./run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw + ./run_full_eval.sh --skip-import # 跳过导入步骤 +' + +# 基于脚本所在目录计算数据文件路径 +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +INPUT_FILE="$SCRIPT_DIR/../data/locomo10.json" +RESULT_DIR="$SCRIPT_DIR/result" +OUTPUT_CSV="$RESULT_DIR/qa_results.csv" +GATEWAY_TOKEN="90f2d2dc2f7b4d50cb943d3d3345e667bb3e9bcb7ec3a1fb" + + +# 解析参数 +SKIP_IMPORT=false +WITH_CLAW_IMPORT=false +for arg in "$@"; do + if [ "$arg" = "--skip-import" ]; then + SKIP_IMPORT=true + elif [ "$arg" = "--with-claw-import" ]; then + WITH_CLAW_IMPORT=true + fi +done + +# 确保结果目录存在 +mkdir -p "$RESULT_DIR" + +# Step 1: 导入数据 +if [ "$SKIP_IMPORT" = false ]; then + if [ "$WITH_CLAW_IMPORT" = true ]; then + echo "[1/5] 导入数据到 OpenViking 和 OpenClaw..." + + # 后台运行 OpenViking 导入 + python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" --force-ingest > "$RESULT_DIR/import_ov.log" 2>&1 & + PID_OV=$! + + # 后台运行 OpenClaw 导入 + python "$SCRIPT_DIR/eval.py" ingest "$INPUT_FILE" --force-ingest --token "$GATEWAY_TOKEN" > "$RESULT_DIR/import_claw.log" 2>&1 & + PID_CLAW=$! + + # 等待两个导入任务完成 + wait $PID_OV $PID_CLAW + else + echo "[1/5] 导入数据到 OpenViking..." + python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" --force-ingest + fi + + echo "导入完成,等待 1 分钟..." + sleep 60 +else + echo "[1/5] 跳过导入数据..." +fi + +# Step 2: 运行 QA 模型(默认输出到 result/qa_results.csv) +echo "[2/5] 运行 QA 评估..." +python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" + +# Step 3: 裁判打分 +echo "[3/5] 裁判打分..." +python "$SCRIPT_DIR/judge.py" --input "$OUTPUT_CSV" --parallel 40 + +# Step 4: 计算结果 +echo "[4/5] 计算结果..." +python "$SCRIPT_DIR/../vikingbot/stat_judge_result.py" --input "$OUTPUT_CSV" + +echo "[5/5] 完成!" +echo "结果文件: $OUTPUT_CSV" diff --git a/benchmark/locomo/vikingbot/import_to_ov.py b/benchmark/locomo/vikingbot/import_to_ov.py index 509f93b2a..a6b23c461 100644 --- a/benchmark/locomo/vikingbot/import_to_ov.py +++ b/benchmark/locomo/vikingbot/import_to_ov.py @@ -360,17 +360,6 @@ async def viking_ingest( await client.close() -def sync_viking_ingest( - messages: List[Dict[str, Any]], openviking_url: str, session_time: Optional[str] = None -) -> Dict[str, int]: - """Synchronous wrapper for viking_ingest to maintain existing API.""" - return asyncio.run(viking_ingest(messages, openviking_url, session_time)) - -# --------------------------------------------------------------------------- -# Main import logic -# --------------------------------------------------------------------------- - - def parse_session_range(s: str) -> Tuple[int, int]: """Parse '1-4' or '3' into (lo, hi) inclusive tuple.""" if "-" in s: @@ -613,7 +602,7 @@ async def process_sample(item): # 等待所有 sample 处理完成 print( - f"\n[INFO] Starting import with {args.parallel} concurrent workers, {len(tasks)} tasks to process", + f"\n[INFO] Starting import with {len(tasks)} tasks to process", file=sys.stderr, ) await asyncio.gather(*tasks, return_exceptions=True) @@ -679,12 +668,6 @@ def main(): default="http://localhost:1933", help="OpenViking service URL (default: http://localhost:1933)", ) - parser.add_argument( - "--parallel", - type=int, - default=5, - help="Number of concurrent import workers (default: 5)", - ) parser.add_argument( "--sample", type=int, From e41e3b23c9be31d99800e71a591eb4e591381a46 Mon Sep 17 00:00:00 2001 From: DuTao Date: Wed, 8 Apr 2026 00:35:09 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E7=9A=84=E4=B8=80=E9=94=AE=E8=AF=84=E6=B5=8B=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/locomo/README.md | 37 +++++++++++++++++++++- benchmark/locomo/openclaw/run_full_eval.sh | 2 +- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/benchmark/locomo/README.md b/benchmark/locomo/README.md index fd42a177d..9b85563b2 100644 --- a/benchmark/locomo/README.md +++ b/benchmark/locomo/README.md @@ -16,7 +16,10 @@ benchmark/locomo/ │ ├── data/ # 测试数据目录 │ └── result/ # 评测结果目录 └── openclaw/ # OpenClaw 评测脚本 - └── eval.py # OpenClaw 评估脚本 + ├── eval.py # OpenClaw 评估脚本 + ├── judge.py # LLM 裁判打分(适配 OpenClaw) + ├── run_full_eval.sh # 一键运行完整评测流程 + └── result/ # 评测结果目录 ``` --- @@ -149,6 +152,38 @@ python stat_judge_result.py --input <评分结果文件> ## OpenClaw 评测流程 +### 完整一键评测 + +使用 `openclaw/run_full_eval.sh` 可以一键运行完整评测流程: + +```bash +cd benchmark/locomo/openclaw +bash run_full_eval.sh # 只导入 OpenViking +bash run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw(并行执行) +bash run_full_eval.sh --skip-import # 跳过导入步骤,直接运行 QA 评估 +``` + +**脚本参数说明:** + +**脚本执行流程:** +1. 导入数据到 OpenViking(可选同时导入 OpenClaw) +2. 等待 60 秒确保数据导入完成 +3. 运行 QA 评估(`eval.py qa`,输出到 `result/qa_results.csv`) +4. 裁判打分(`judge.py`,并行度 40) +5. 统计结果(`stat_judge_result.py`) + +**脚本内部配置参数:** + +在 `run_full_eval.sh` 脚本顶部可以修改以下配置: + +| 变量 | 说明 | 默认值 | +|------|------|---------------------------| +| `INPUT_FILE` | 输入数据文件路径 | `../data/locomo10.json` | +| `RESULT_DIR` | 结果输出目录 | `./result` | +| `GATEWAY_TOKEN` | OpenClaw Gateway Token | 需要设置为实际 openclaw 网关 token | + +### 分步使用说明 + 使用 `openclaw/eval.py` 进行 OpenClaw 评测,该脚本有两种模式: ### 模式 1: ingest - 导入对话数据到OpenClaw diff --git a/benchmark/locomo/openclaw/run_full_eval.sh b/benchmark/locomo/openclaw/run_full_eval.sh index c8f660afb..a14cfa413 100755 --- a/benchmark/locomo/openclaw/run_full_eval.sh +++ b/benchmark/locomo/openclaw/run_full_eval.sh @@ -16,7 +16,7 @@ SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" INPUT_FILE="$SCRIPT_DIR/../data/locomo10.json" RESULT_DIR="$SCRIPT_DIR/result" OUTPUT_CSV="$RESULT_DIR/qa_results.csv" -GATEWAY_TOKEN="90f2d2dc2f7b4d50cb943d3d3345e667bb3e9bcb7ec3a1fb" +GATEWAY_TOKEN="your_gateway_token" # 解析参数 From 68985b01a71d48fc08cab54332e5ccaf1fb9238f Mon Sep 17 00:00:00 2001 From: DuTao Date: Wed, 8 Apr 2026 18:33:49 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=AF=84=E6=B5=8B?= =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/locomo/openclaw/eval.py | 500 +++++++------ benchmark/locomo/openclaw/import_to_ov.py | 667 ++++++++++++++++++ benchmark/locomo/openclaw/run_full_eval.sh | 59 +- .../locomo/openclaw/stat_judge_result.py | 88 +++ 4 files changed, 1075 insertions(+), 239 deletions(-) create mode 100644 benchmark/locomo/openclaw/import_to_ov.py create mode 100644 benchmark/locomo/openclaw/stat_judge_result.py diff --git a/benchmark/locomo/openclaw/eval.py b/benchmark/locomo/openclaw/eval.py index 22a565710..317004ac5 100644 --- a/benchmark/locomo/openclaw/eval.py +++ b/benchmark/locomo/openclaw/eval.py @@ -22,16 +22,19 @@ import os import sys import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +from threading import Lock import requests # Configuration constants DEFAULT_BASE_URL = "http://127.0.0.1:18789" -DEFAULT_SESSION_KEY = "eval-test-2" -DEFAULT_AGENT_ID = "locomo-eval" +DEFAULT_AGENT_ID = "main" DEFAULT_INGEST_RECORD_PATH = ".ingest_record.json" -DEFAULT_OV_COMMAND = ["ov", "add-memory"] + +# CSV write lock for thread safety +csv_lock = Lock() # --------------------------------------------------------------------------- @@ -261,6 +264,51 @@ def extract_response_text(response_json: dict) -> str: return f"[ERROR: could not extract text from response: {response_json}]" +def get_session_id_from_key(session_key: str, user: str, agent_id: str = "main") -> str | None: + """Search all agents' sessions.json files for the session_key and return sessionFile path. + Returns the full path to the session JSONL file if found, None otherwise. + """ + agents_base_dir = os.path.expanduser("~/.openclaw/agents") + + if not os.path.exists(agents_base_dir): + print(f" [session] Agents directory not found: {agents_base_dir}", file=sys.stderr) + return None + + # Iterate through all agent directories + for agent_name in os.listdir(agents_base_dir): + agent_dir = os.path.join(agents_base_dir, agent_name) + if not os.path.isdir(agent_dir): + continue + + sessions_dir = os.path.join(agent_dir, "sessions") + sessions_file = os.path.join(sessions_dir, "sessions.json") + + if not os.path.exists(sessions_file): + continue + + try: + with open(sessions_file, "r") as f: + data = json.load(f) + + # Search for the session_key in this sessions.json + for key, value in data.items(): + if session_key in key and isinstance(value, dict): + session_file = value.get("sessionFile") + if session_file: + print(f" [session] Found sessionFile in agent '{agent_name}': {session_file}", file=sys.stderr) + return session_file + + except json.JSONDecodeError as e: + print(f" [session] Error parsing {sessions_file}: {e}", file=sys.stderr) + continue + except IOError as e: + print(f" [session] Error reading {sessions_file}: {e}", file=sys.stderr) + continue + + print(f" [session] session_key '{session_key}' not found in any agent's sessions.json", file=sys.stderr) + return None + + def get_session_id(user: str, agent_id: str = "main") -> str | None: """Read the current session ID for the given user from sessions.json.""" sessions_file = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions/sessions.json") @@ -280,46 +328,85 @@ def get_session_id(user: str, agent_id: str = "main") -> str | None: return None -def reset_session(session_id: str, agent_id: str = "main") -> str | None: - """Archive the session .jsonl file by renaming it with a timestamp suffix. +def reset_session(session_path: str, agent_id: str = "main") -> str | None: + """Rename the session .jsonl file with a timestamp suffix. + Accepts either a session_id or a full path to the session file. Returns the new filename if successful, None otherwise. """ - sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions") - src = os.path.join(sessions_dir, f"{session_id}.jsonl") - dst = f"{src}.{int(time.time())}" + # Check if session_path is already a full path + if os.path.isabs(session_path) and os.path.exists(session_path): + src = session_path + else: + # Treat as session_id + sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions") + src = os.path.join(sessions_dir, f"{session_path}.jsonl") + + if not os.path.exists(src): + print(f" [backup] Session file not found: {src}", file=sys.stderr) + return None + + timestamp = time.strftime("%Y%m%d_%H%M%S") + dst = f"{src}.{timestamp}" try: os.rename(src, dst) new_filename = os.path.basename(dst) - print(f" [reset] archived {session_id}.jsonl -> {new_filename}", file=sys.stderr) + print(f" [backup] renamed {os.path.basename(src)} -> {new_filename}", file=sys.stderr) return new_filename - except FileNotFoundError: - print(f" [reset] Session file not found: {src}", file=sys.stderr) - return None except IOError as e: - print(f" [reset] could not archive session file: {e}", file=sys.stderr) + print(f" [backup] could not rename session file: {e}", file=sys.stderr) return None -def viking_ingest(msg: str) -> None: - """Save a message to OpenViking via `ov add-memory`.""" - import subprocess - result = subprocess.run( - DEFAULT_OV_COMMAND + [msg], - capture_output=True, - text=True, - ) - if result.returncode != 0: - raise RuntimeError(result.stderr.strip() or f"ov exited with code {result.returncode}") +def calculate_usage_from_jsonl(jsonl_filename: str, agent_id: str = "main") -> dict: + """Calculate token usage from archived JSONL file.""" + # Check if jsonl_filename is already a full path + if os.path.isabs(jsonl_filename) and os.path.exists(jsonl_filename): + jsonl_full_path = jsonl_filename + else: + sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions") + jsonl_full_path = os.path.join(sessions_dir, jsonl_filename) + + usage = { + "input_tokens": 0, + "output_tokens": 0, + "cacheRead": 0, + "cacheWrite": 0, + "total_tokens": 0, + } + + if not os.path.exists(jsonl_full_path): + return usage + + try: + with open(jsonl_full_path, "r", encoding="utf-8") as f: + for line in f: + if not line.strip(): + continue + entry = json.loads(line) + if entry.get("type") == "message" and entry.get("message", {}).get("role") == "assistant": + entry_usage = entry.get("message", {}).get("usage", {}) + usage["input_tokens"] += entry_usage.get("input", 0) + usage["output_tokens"] += entry_usage.get("output", 0) + usage["cacheRead"] += entry_usage.get("cacheRead", 0) + usage["cacheWrite"] += entry_usage.get("cacheWrite", 0) + usage["total_tokens"] += entry_usage.get("totalTokens", 0) + except json.JSONDecodeError as e: + print(f" [usage] Error parsing JSONL file: {e}", file=sys.stderr) + except IOError as e: + print(f" [usage] Error reading JSONL file: {e}", file=sys.stderr) + + return usage def send_message_with_retry( - base_url: str, token: str, user: str, message: str, retries: int = 2, agent_id: str = DEFAULT_AGENT_ID + base_url: str, token: str, user: str, message: str, retries: int = 2, + agent_id: str = DEFAULT_AGENT_ID, session_key: str | None = None ) -> tuple[str, dict]: """Call send_message with up to `retries` retries on failure.""" last_exc = None for attempt in range(retries + 1): try: - return send_message(base_url, token, user, message, agent_id) + return send_message(base_url, token, user, message, agent_id, session_key) except Exception as e: last_exc = e if attempt < retries: @@ -328,7 +415,8 @@ def send_message_with_retry( def send_message( - base_url: str, token: str, user: str, message: str, agent_id: str = DEFAULT_AGENT_ID + base_url: str, token: str, user: str, message: str, + agent_id: str = DEFAULT_AGENT_ID, session_key: str | None = None ) -> tuple[str, dict]: """Send a single message to the OpenClaw responses API. @@ -338,9 +426,10 @@ def send_message( headers = { "Content-Type": "application/json", "Authorization": f"Bearer {token}", - "X-OpenClaw-Agent-ID": agent_id, - "X-OpenClaw-Session-Key": DEFAULT_SESSION_KEY + "X-OpenClaw-Agent-ID": agent_id } + if session_key: + headers["X-OpenClaw-Session-Key"] = session_key payload = { "model": "openclaw", "input": message, @@ -414,62 +503,36 @@ def run_ingest( preview = msg.replace("\n", " | ")[:80] print(f" [{label}] {preview}...", file=sys.stderr) - if args.viking: - try: - viking_ingest(msg) - print(f" -> [viking] saved", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": "[viking] saved", - "usage": {}, - }) - # Mark as successfully ingested - mark_ingested(args.agent_id, user_key, sample_id, meta['session_key'], ingest_record, { - "mode": "viking", - "date_time": meta['date_time'] - }) - except Exception as e: - print(f" -> [ERROR] {e}", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": f"[ERROR] {e}", - "usage": {}, - }) - else: - try: - reply, usage = send_message(args.base_url, args.token, user_key, msg, args.agent_id) - print(f" -> {reply[:80]}{'...' if len(reply) > 80 else ''}", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": reply, - "usage": usage, - }) - # Mark as successfully ingested - mark_ingested(args.agent_id, user_key, sample_id, meta['session_key'], ingest_record, { - "mode": "openclaw", - "date_time": meta['date_time'], - "usage": usage - }) - except Exception as e: - print(f" -> [ERROR] {e}", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": f"[ERROR] {e}", - "usage": {}, - }) - - if session_id is None: - session_id = get_session_id(user_key, args.agent_id) - if session_id: - reset_session(session_id, args.agent_id) + try: + reply, usage = send_message(args.base_url, args.token, user_key, msg, args.agent_id) + print(f" -> {reply[:80]}{'...' if len(reply) > 80 else ''}", file=sys.stderr) + results.append({ + "sample_id": sample_id, + "session": meta["session_key"], + "user": user_key, + "reply": reply, + "usage": usage, + }) + # Mark as successfully ingested + mark_ingested(args.agent_id, user_key, sample_id, meta['session_key'], ingest_record, { + "mode": "openclaw", + "date_time": meta['date_time'], + "usage": usage + }) + except Exception as e: + print(f" -> [ERROR] {e}", file=sys.stderr) + results.append({ + "sample_id": sample_id, + "session": meta["session_key"], + "user": user_key, + "reply": f"[ERROR] {e}", + "usage": {}, + }) + + if session_id is None: + session_id = get_session_id(user_key, args.agent_id) + if session_id: + reset_session(session_id, args.agent_id) if args.output: try: @@ -545,6 +608,83 @@ def run_ingest( # QA: run QA questions and compare with expected answers # --------------------------------------------------------------------------- +def process_single_question( + sample_id: str, + sample_idx: int, + original_qi: int, + qa: dict, + args: argparse.Namespace, + csv_path: str, +) -> dict: + """Process a single QA question. Returns the record.""" + question = qa["question"] + expected = str(qa["answer"]) + category = qa.get("category", "") + evidence = qa.get("evidence", []) + + # Generate unique session_key based on sample_id + question_index + session_key = f"qa-{sample_id}-q{original_qi}" + user_key = args.user or f"eval-{sample_idx}" + + print(f" [{sample_idx}] Q{original_qi}: {question[:60]}{'...' if len(question) > 60 else ''}", file=sys.stderr) + + jsonl_filename = "" + try: + response, api_usage = send_message_with_retry( + args.base_url, args.token, sample_id, question, 2, args.agent_id, session_key + ) + print(f" [{sample_idx}] A: {response[:60]}{'...' if len(response) > 60 else ''}", file=sys.stderr) + + # Get sessionFile path from sessions.json using session_key + session_file_path = get_session_id_from_key(session_key, user_key, args.agent_id) + jsonl_filename = "" + + # Archive the session file if we found it + if session_file_path: + jsonl_filename = reset_session(session_file_path, args.agent_id) + + # Calculate usage from JSONL file if available, otherwise use API usage + if jsonl_filename and session_file_path: + # Use the directory from session_file_path and the archived filename + usage = calculate_usage_from_jsonl(os.path.join(os.path.dirname(session_file_path), jsonl_filename), args.agent_id) + print(f" [{sample_idx}] tokens (from JSONL): in={usage['input_tokens']} out={usage['output_tokens']} cacheRead={usage['cacheRead']} cacheWrite={usage['cacheWrite']} total={usage['total_tokens']}", file=sys.stderr) + else: + usage = { + "input_tokens": api_usage.get("input_tokens", 0), + "output_tokens": api_usage.get("output_tokens", 0), + "cacheRead": api_usage.get("cacheRead", 0), + "cacheWrite": api_usage.get("cacheWrite", 0), + "total_tokens": api_usage.get("total_tokens", 0), + } + print(f" [{sample_idx}] tokens (from API): in={usage['input_tokens']} out={usage['output_tokens']} cacheRead={usage['cacheRead']} cacheWrite={usage['cacheWrite']} total={usage['total_tokens']}", file=sys.stderr) + + except Exception as e: + response = f"[ERROR] {e}" + usage = {} + jsonl_filename = "" + print(f" [{sample_idx}] A: {response}", file=sys.stderr) + + record = { + "sample_id": sample_id, + "sample_idx": sample_idx, + "qi": original_qi, + "question": question, + "expected": expected, + "response": response, + "category": category, + "evidence": evidence, + "usage": usage, + "jsonl_filename": jsonl_filename, + } + + # Save to CSV with lock for thread safety + with csv_lock: + save_record_to_csv(csv_path, record) + print(f" [{sample_idx}] Saved to CSV: Q{original_qi}", file=sys.stderr) + + return record + + def run_sample_qa( item: dict, sample_idx: int, @@ -552,7 +692,7 @@ def run_sample_qa( executed_records: set, csv_path: str, ) -> tuple[list[dict], dict]: - """Process QA for a single sample. Returns (records, sample_usage).""" + """Process QA for a single sample with concurrent question execution. Returns (records, sample_usage).""" sample_id = item["sample_id"] user_key = args.user or f"eval-{sample_idx}" qas = [q for q in item.get("qa", []) if str(q.get("category", "")) != "5"] @@ -573,131 +713,33 @@ def run_sample_qa( print(f" All QA questions already executed, skipping sample.", file=sys.stderr) return [], {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} - jsonl_path = f"{args.output}.{sample_idx}.jsonl" if args.output else None - - sample_usage = {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} - records = [] - session_id = None - print(f"\n=== Sample {sample_id} [{sample_idx}] (user={user_key}) ===", file=sys.stderr) - print(f" Running {len(qas)} QA question(s)...", file=sys.stderr) + print(f" Running {len(qas)} QA question(s) with max {args.parallel} workers...", file=sys.stderr) - jsonl_file = None - if jsonl_path: - try: - jsonl_file = open(jsonl_path, "w", encoding="utf-8") - except IOError as e: - print(f"Warning: Could not open JSONL file {jsonl_path}: {e}", file=sys.stderr) + records = [] + sample_usage = {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} - try: + # Use ThreadPoolExecutor for concurrent question execution + with ThreadPoolExecutor(max_workers=args.parallel) as executor: + futures = [] for original_qi, qa in qas: - question = qa["question"] - expected = str(qa["answer"]) - category = qa.get("category", "") - evidence = qa.get("evidence", []) - - print(f" [{sample_idx}] Q{original_qi}: {question[:60]}{'...' if len(question) > 60 else ''}", file=sys.stderr) - - jsonl_filename = "" + future = executor.submit( + process_single_question, + sample_id, sample_idx, original_qi, qa, args, csv_path + ) + futures.append(future) + + # Collect results + for future in as_completed(futures): try: - response, api_usage = send_message_with_retry( - args.base_url, args.token, user_key, question, 2, args.agent_id, - ) - print(f" [{sample_idx}] A: {response[:60]}{'...' if len(response) > 60 else ''}", file=sys.stderr) - - # Use provided session_id if available, otherwise get from system - if args.session_id: - session_id = args.session_id - elif session_id is None: - session_id = get_session_id(user_key, args.agent_id) - - # Reset session and get archived filename - if session_id: - jsonl_filename = reset_session(session_id, args.agent_id) - - # Use API usage by default - usage = api_usage - # Calculate usage from JSONL file if session_id is provided and we have the archived file - if args.session_id and jsonl_filename: - # Parse the archived JSONL file to calculate usage - sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{args.agent_id}/sessions") - jsonl_full_path = os.path.join(sessions_dir, jsonl_filename) - if os.path.exists(jsonl_full_path): - total_input = 0 - total_output = 0 - total_cache_read = 0 - total_cache_write = 0 - total_total_tokens = 0 - try: - with open(jsonl_full_path, "r", encoding="utf-8") as f: - for line in f: - if not line.strip(): - continue - entry = json.loads(line) - if entry.get("type") == "message" and entry.get("message", {}).get("role") == "assistant": - entry_usage = entry.get("message", {}).get("usage", {}) - total_input += entry_usage.get("input", 0) - total_output += entry_usage.get("output", 0) - total_cache_read += entry_usage.get("cacheRead", 0) - total_cache_write += entry_usage.get("cacheWrite", 0) - total_total_tokens += entry_usage.get("totalTokens", 0) - usage = { - "input_tokens": total_input, - "output_tokens": total_output, - "cacheRead": total_cache_read, - "cacheWrite": total_cache_write, - "total_tokens": total_total_tokens, - } - print(f" [{sample_idx}] tokens (from JSONL): in={total_input} out={total_output} cacheRead={total_cache_read} cacheWrite={total_cache_write} total={total_total_tokens}", file=sys.stderr) - except json.JSONDecodeError as e: - print(f" [{sample_idx}] Error parsing JSONL file: {e}, using API usage", file=sys.stderr) - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - except IOError as e: - print(f" [{sample_idx}] Error reading JSONL file: {e}, using API usage", file=sys.stderr) - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - else: - print(f" [{sample_idx}] JSONL file not found: {jsonl_full_path}, using API usage", file=sys.stderr) - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - else: - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - + record = future.result() + records.append(record) + # Accumulate usage + usage = record.get("usage", {}) for k in sample_usage: sample_usage[k] += usage.get(k, 0) except Exception as e: - response = f"[ERROR] {e}" - usage = {} - jsonl_filename = "" - print(f" [{sample_idx}] A: {response}", file=sys.stderr) - - record = { - "sample_id": sample_id, - "sample_idx": sample_idx, - "qi": original_qi, - "question": question, - "expected": expected, - "response": response, - "category": category, - "evidence": evidence, - "usage": usage, - "jsonl_filename": jsonl_filename, - } - records.append(record) - - # Save to CSV immediately after successful execution - save_record_to_csv(csv_path, record) - print(f" [{sample_idx}] Saved to CSV: Q{original_qi}", file=sys.stderr) - - if jsonl_file: - try: - jsonl_file.write(json.dumps(record, ensure_ascii=False) + "\n") - jsonl_file.flush() - except IOError as e: - print(f"Warning: Error writing to JSONL file: {e}", file=sys.stderr) - - finally: - if jsonl_file: - jsonl_file.close() - print(f" [{sample_idx}] written to {jsonl_path}", file=sys.stderr) + print(f" [{sample_idx}] Error in question task: {e}", file=sys.stderr) return records, sample_usage @@ -763,9 +805,12 @@ def run_qa( print("Error: QA mode only works with LoCoMo JSON files", file=sys.stderr) sys.exit(1) + # Ensure parallel is within reasonable bounds (1-40) + args.parallel = max(1, min(40, args.parallel)) + samples = load_locomo_data(args.input, args.sample) print(f" user: {args.user or 'eval-{sample_idx}'}", file=sys.stderr) - print(f" running in single-thread mode", file=sys.stderr) + print(f" running with {args.parallel} concurrent workers", file=sys.stderr) # Load already executed records from CSV csv_path = f"{args.output}.csv" if args.output else args.default_csv_path @@ -774,17 +819,6 @@ def run_qa( executed_records = load_executed_records(csv_path) print(f" Loaded {len(executed_records)} already executed records from {csv_path}", file=sys.stderr) - # Clean up existing session file if session_id is provided - if args.session_id: - sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{args.agent_id}/sessions") - session_file = os.path.join(sessions_dir, f"{args.session_id}.jsonl") - if os.path.exists(session_file): - try: - os.remove(session_file) - print(f" Cleaned up existing session file: {os.path.basename(session_file)}", file=sys.stderr) - except Exception as e: - print(f" Warning: Could not remove existing session file: {e}", file=sys.stderr) - results_list = [] for idx, item in enumerate(samples): result = run_sample_qa(item, idx + 1, args, executed_records, csv_path) @@ -797,7 +831,31 @@ def run_qa( print(f"\n total tokens: in={total_usage['input_tokens']} out={total_usage['output_tokens']} total={total_usage['total_tokens']}", file=sys.stderr) + # Generate timestamp once for all backups + timestamp = time.strftime("%Y%m%d_%H%M%S") + import shutil + + # Backup CSV file with timestamp + if os.path.exists(csv_path): + csv_path_obj = Path(csv_path) + backup_csv_path = csv_path_obj.parent / f"{csv_path_obj.stem}_{timestamp}{csv_path_obj.suffix}" + try: + shutil.copy2(csv_path, backup_csv_path) + print(f" CSV backed up to: {backup_csv_path}", file=sys.stderr) + except Exception as e: + print(f"Warning: Failed to backup CSV file: {e}", file=sys.stderr) + if args.output: + # Backup output summary file too + if os.path.exists(args.output): + output_path_obj = Path(args.output) + backup_output_path = output_path_obj.parent / f"{output_path_obj.stem}_{timestamp}{output_path_obj.suffix}" + try: + shutil.copy2(args.output, backup_output_path) + print(f" Summary backed up to: {backup_output_path}", file=sys.stderr) + except Exception as e: + print(f"Warning: Failed to backup summary file: {e}", file=sys.stderr) + try: with open(args.output, "w", encoding="utf-8") as f: f.write("=== TOTAL USAGE ===\n") @@ -877,15 +935,9 @@ def main(): parser.add_argument( "-p", "--parallel", type=int, - default=1, + default=10, metavar="N", - help="QA mode: number of samples to process concurrently (max 10, default 1).", - ) - parser.add_argument( - "--viking", - action="store_true", - default=False, - help="Ingest mode: save to OpenViking via `ov add-memory` instead of OpenClaw.", + help="QA mode: number of questions to process concurrently (max 40, default 10).", ) parser.add_argument( "--agent-id", @@ -895,7 +947,7 @@ def main(): parser.add_argument( "--session-id", default=None, - help="Session ID for API requests. If provided, will use this session ID and calculate token usage from corresponding JSONL file.", + help="Session ID for API requests (ingest mode only).", ) parser.add_argument( "--force-ingest", @@ -913,7 +965,7 @@ def main(): # 添加默认 CSV 路径到 args args.default_csv_path = default_csv_path - if not args.token and not getattr(args, "viking", False): + if not args.token: print("Error: --token or OPENCLAW_GATEWAY_TOKEN env var is required", file=sys.stderr) sys.exit(1) diff --git a/benchmark/locomo/openclaw/import_to_ov.py b/benchmark/locomo/openclaw/import_to_ov.py new file mode 100644 index 000000000..adca77dc4 --- /dev/null +++ b/benchmark/locomo/openclaw/import_to_ov.py @@ -0,0 +1,667 @@ +""" +OpenViking data import tool. + +Import conversations from LoCoMo JSON or plain text files into OpenViking memory. + +Usage: + # Import LoCoMo JSON conversations + uv run python import_to_ov.py locomo10.json --sample 0 --sessions 1-4 + + # Import plain text conversations + uv run python import_to_ov.py example.txt +""" + +import argparse +import asyncio +import csv +import json +import sys +import time +import traceback +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Dict, Any, Tuple, Optional + +import openviking as ov + + +def _get_session_number(session_key: str) -> int: + """Extract session number from session key.""" + return int(session_key.split("_")[1]) + + +def parse_test_file(path: str) -> List[Dict[str, Any]]: + """Parse txt test file into sessions. + + Each session is a dict with: + - messages: list of user message strings + """ + with open(path, "r", encoding="utf-8") as f: + content = f.read() + + raw_sessions = content.split("---\n") + sessions = [] + + for raw in raw_sessions: + lines = [line for line in raw.strip().splitlines() if line.strip()] + if not lines: + continue + + messages = [] + for line in lines: + if not line.startswith("eval:"): # Skip eval lines + messages.append(line) + + if messages: + sessions.append({"messages": messages}) + + return sessions + + +def load_locomo_data( + path: str, + sample_index: Optional[int] = None, +) -> List[Dict[str, Any]]: + """Load LoCoMo JSON and optionally filter to one sample.""" + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + + if sample_index is not None: + if sample_index < 0 or sample_index >= len(data): + raise ValueError(f"Sample index {sample_index} out of range (0-{len(data) - 1})") + return [data[sample_index]] + return data + + +def build_session_messages( + item: Dict[str, Any], + session_range: Optional[Tuple[int, int]] = None, +) -> List[Dict[str, Any]]: + """Build session messages for one LoCoMo sample. + + Returns list of dicts with keys: messages, meta. + Each dict represents a session with multiple messages (user/assistant role). + """ + conv = item["conversation"] + speakers = f"{conv['speaker_a']} & {conv['speaker_b']}" + + session_keys = sorted( + [k for k in conv if k.startswith("session_") and not k.endswith("_date_time")], + key=_get_session_number, + ) + + sessions = [] + for sk in session_keys: + sess_num = _get_session_number(sk) + if session_range: + lo, hi = session_range + if sess_num < lo or sess_num > hi: + continue + + dt_key = f"{sk}_date_time" + date_time = conv.get(dt_key, "") + + # Extract messages with all as user role, including speaker in content + messages = [] + for idx, msg in enumerate(conv[sk]): + speaker = msg.get("speaker", "unknown") + text = msg.get("text", "") + messages.append( + {"role": "user", "text": f"[{speaker}]: {text}", "speaker": speaker, "index": idx} + ) + + sessions.append( + { + "messages": messages, + "meta": { + "sample_id": item["sample_id"], + "session_key": sk, + "date_time": date_time, + "speakers": speakers, + }, + } + ) + + return sessions + + +# --------------------------------------------------------------------------- +# Ingest record helpers (avoid duplicate ingestion) +# --------------------------------------------------------------------------- + + +def load_success_csv(csv_path: str = "./result/import_success.csv") -> set: + """加载成功导入的CSV记录,返回已成功的键集合""" + success_keys = set() + if Path(csv_path).exists(): + with open(csv_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + key = f"viking:{row['sample_id']}:{row['session']}" + success_keys.add(key) + return success_keys + + +def write_success_record( + record: Dict[str, Any], csv_path: str = "./result/import_success.csv" +) -> None: + """写入成功记录到CSV文件""" + file_exists = Path(csv_path).exists() + fieldnames = [ + "timestamp", + "sample_id", + "session", + "date_time", + "speakers", + "embedding_tokens", + "vlm_tokens", + "llm_input_tokens", + "llm_output_tokens", + "total_tokens", + ] + + with open(csv_path, "a", encoding="utf-8", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + if not file_exists: + writer.writeheader() + + writer.writerow( + { + "timestamp": record["timestamp"], + "sample_id": record["sample_id"], + "session": record["session"], + "date_time": record.get("meta", {}).get("date_time", ""), + "speakers": record.get("meta", {}).get("speakers", ""), + "embedding_tokens": record["token_usage"].get("embedding", 0), + "vlm_tokens": record["token_usage"].get("vlm", 0), + "llm_input_tokens": record["token_usage"].get("llm_input", 0), + "llm_output_tokens": record["token_usage"].get("llm_output", 0), + "total_tokens": record["token_usage"].get("total", 0), + } + ) + + +def write_error_record( + record: Dict[str, Any], error_path: str = "./result/import_errors.log" +) -> None: + """写入错误记录到日志文件""" + with open(error_path, "a", encoding="utf-8") as f: + timestamp = record["timestamp"] + sample_id = record["sample_id"] + session = record["session"] + error = record["error"] + f.write(f"[{timestamp}] ERROR [{sample_id}/{session}]: {error}\n") + + +def is_already_ingested( + sample_id: str | int, + session_key: str, + success_keys: Optional[set] = None, +) -> bool: + """Check if a specific session has already been successfully ingested.""" + key = f"viking:{sample_id}:{session_key}" + return success_keys is not None and key in success_keys + + +# --------------------------------------------------------------------------- +# OpenViking import +# --------------------------------------------------------------------------- +def _parse_token_usage(commit_result: Dict[str, Any]) -> Dict[str, int]: + """解析Token使用数据(从commit返回的telemetry或task result中提取)""" + # 尝试从 task result 中提取(task 完成后包含完整 token_usage) + if "result" in commit_result: + result = commit_result["result"] + if "token_usage" in result: + tu = result["token_usage"] + embedding = tu.get("embedding", {}) + llm = tu.get("llm", {}) + # embedding 格式可能是 {"total": N} 或 {"total_tokens": N} + embed_total = embedding.get("total", embedding.get("total_tokens", 0)) + llm_total = llm.get("total", llm.get("total_tokens", 0)) + return { + "embedding": embed_total, + "vlm": llm_total, + "llm_input": llm.get("input", 0), + "llm_output": llm.get("output", 0), + "total": tu.get("total", {}).get("total_tokens", embed_total + llm_total), + } + + # 从 commit 响应的 telemetry 中提取 + telemetry = commit_result.get("telemetry", {}).get("summary", {}) + tokens = telemetry.get("tokens", {}) + return { + "embedding": tokens.get("embedding", {}).get("total", 0), + "vlm": tokens.get("llm", {}).get("total", 0), + "llm_input": tokens.get("llm", {}).get("input", 0), + "llm_output": tokens.get("llm", {}).get("output", 0), + "total": tokens.get("total", 0), + } + + +async def viking_ingest( + messages: List[Dict[str, Any]], + openviking_url: str, + session_time: Optional[str] = None, + user_id: Optional[str] = None, + agent_id: Optional[str] = None, +) -> Dict[str, int]: + """Save messages to OpenViking via OpenViking SDK client. + Returns token usage dict with embedding and vlm token counts. + + Args: + messages: List of message dicts with role and text + openviking_url: OpenViking service URL + session_time: Session time string (e.g., "9:36 am on 2 April, 2023") + user_id: User identifier for separate userspace (e.g., "conv-26") + agent_id: Agent identifier for separate agentspace (e.g., "conv-26") + """ + # 解析 session_time - 为每条消息计算递增的时间戳 + base_datetime = None + if session_time: + try: + base_datetime = datetime.strptime(session_time, "%I:%M %p on %d %B, %Y") + except ValueError: + print(f"Warning: Failed to parse session_time: {session_time}", file=sys.stderr) + + # Create client + client_kwargs = {"url": openviking_url} + if user_id is not None: + client_kwargs["user"] = user_id + if agent_id is not None: + client_kwargs["agent_id"] = agent_id + client = ov.AsyncHTTPClient(**client_kwargs) + await client.initialize() + + try: + # Create session + create_res = await client.create_session() + session_id = create_res["session_id"] + + # Add messages one by one with created_at + for idx, msg in enumerate(messages): + msg_created_at = None + if base_datetime: + # 每条消息递增1秒,确保时间顺序 + msg_dt = base_datetime + timedelta(seconds=idx) + msg_created_at = msg_dt.isoformat() + + await client.add_message( + session_id=session_id, + role=msg["role"], + parts=[{"type": "text", "text": msg["text"]}], + created_at=msg_created_at, + ) + + # Commit + result = await client.commit_session(session_id, telemetry=True) + + # Accept both "committed" and "accepted" as success - accepted means the session was archived + if result.get("status") not in ("committed", "accepted"): + raise RuntimeError(f"Commit failed: {result}") + + # 等待 task 完成以获取准确 token 消耗 + task_id = result.get("task_id") + if task_id: + # 轮询任务状态直到完成 + max_attempts = 1200 # 最多等待20分钟 + for attempt in range(max_attempts): + task = await client.get_task(task_id) + status = task.get("status") if task else "unknown" + if status == "completed": + token_usage = _parse_token_usage(task) + break + elif status in ("failed", "cancelled", "unknown"): + raise RuntimeError(f"Task {task_id} {status}: {task}") + await asyncio.sleep(1) + else: + raise RuntimeError(f"Task {task_id} timed out after {max_attempts} attempts") + else: + token_usage = {"embedding": 0, "vlm": 0, "total": 0} + + # Get trace_id from commit result + trace_id = result.get("trace_id", "") + return {"token_usage": token_usage, "task_id": task_id, "trace_id": trace_id} + + finally: + await client.close() + + +def parse_session_range(s: str) -> Tuple[int, int]: + """Parse '1-4' or '3' into (lo, hi) inclusive tuple.""" + if "-" in s: + lo, hi = s.split("-", 1) + return int(lo), int(hi) + n = int(s) + return n, n + + +async def process_single_session( + messages: List[Dict[str, Any]], + sample_id: str | int, + session_key: str, + meta: Dict[str, Any], + run_time: str, + args: argparse.Namespace, +) -> Dict[str, Any]: + """处理单个会话的导入任务""" + try: + # 根据参数决定是否使用 sample_id 作为 user_id 和 agent_id + user_id = str(sample_id) if not args.no_user_agent_id else None + agent_id = str(sample_id) if not args.no_user_agent_id else None + result = await viking_ingest( + messages, + args.openviking_url, + meta.get("date_time"), + user_id=user_id, + agent_id=agent_id, + ) + token_usage = result["token_usage"] + task_id = result.get("task_id") + trace_id = result.get("trace_id", "") + embedding_tokens = token_usage.get("embedding", 0) + vlm_tokens = token_usage.get("vlm", 0) + print( + f" -> [COMPLETED] [{sample_id}/{session_key}] embed={embedding_tokens}, vlm={vlm_tokens}, task_id={task_id}, trace_id={trace_id}", + file=sys.stderr, + ) + + # Write success record + result = { + "timestamp": run_time, + "sample_id": sample_id, + "session": session_key, + "status": "success", + "meta": meta, + "token_usage": token_usage, + "embedding_tokens": embedding_tokens, + "vlm_tokens": vlm_tokens, + "task_id": task_id, + "trace_id": trace_id, + } + + # 写入成功CSV + write_success_record(result, args.success_csv) + + return result + + except Exception as e: + print(f" -> [ERROR] [{sample_id}/{session_key}] {e}", file=sys.stderr) + traceback.print_exc(file=sys.stderr) + + # Write error record + result = { + "timestamp": run_time, + "sample_id": sample_id, + "session": session_key, + "status": "error", + "error": str(e), + } + + # 写入错误日志 + write_error_record(result, args.error_log) + + return result + + +async def run_import(args: argparse.Namespace) -> None: + session_range = parse_session_range(args.sessions) if args.sessions else None + + # 如果指定了 question-index,自动从 evidence 推断需要的 session + if args.question_index is not None and not args.sessions: + # 加载数据获取 question 的 evidence + with open(args.input, "r", encoding="utf-8") as f: + data = json.load(f) + + # 获取 sample + sample_idx = args.sample if args.sample is not None else 0 + if sample_idx < 0 or sample_idx >= len(data): + raise ValueError(f"sample index {sample_idx} out of range") + sample = data[sample_idx] + + # 获取 question 的 evidence + qa_items = sample.get("qa", []) + if args.question_index < 0 or args.question_index >= len(qa_items): + raise ValueError(f"question index {args.question_index} out of range") + qa = qa_items[args.question_index] + evidence_list = qa.get("evidence", []) + + # 从 evidence 提取 session 号 (D1:3 -> session 1) + session_nums = set() + for ev in evidence_list: + try: + # D1:3 -> session 1 + sess_num = int(ev.split(":")[0][1:]) + session_nums.add(sess_num) + except (ValueError, IndexError): + pass + + if session_nums: + min_sess = min(session_nums) + max_sess = max(session_nums) + session_range = (min_sess, max_sess) + print( + f"[INFO] Auto-detected sessions from evidence: {min_sess}-{max_sess}", + file=sys.stderr, + ) + + # 加载成功CSV记录用于去重 + success_keys = set() + if not args.force_ingest: + success_keys = load_success_csv(args.success_csv) + print( + f"[INFO] Loaded {len(success_keys)} existing success records from {args.success_csv}", + file=sys.stderr, + ) + + # Write run header + run_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + skipped_count = 0 + success_count = 0 + error_count = 0 + total_embedding_tokens = 0 + total_vlm_tokens = 0 + + if args.input.endswith(".json"): + # LoCoMo JSON format + samples = load_locomo_data(args.input, args.sample) + + # 为每个 sample 创建独立的处理协程 + async def process_sample(item): + sample_id = item["sample_id"] + sessions = build_session_messages(item, session_range) + + print(f"\n=== Sample {sample_id} ===", file=sys.stderr) + print(f" {len(sessions)} session(s) to import", file=sys.stderr) + + # 同一 sample 内串行处理所有 sessions + for sess in sessions: + meta = sess["meta"] + messages = sess["messages"] + session_key = meta["session_key"] + label = f"{session_key} ({meta['date_time']})" + + # Skip already ingested sessions unless force-ingest is enabled + if not args.force_ingest and is_already_ingested( + sample_id, session_key, success_keys + ): + print( + f" [{label}] [SKIP] already imported (use --force-ingest to reprocess)", + file=sys.stderr, + ) + continue + + # Preview messages + preview = " | ".join( + [f"{msg['role']}: {msg['text'][:30]}..." for msg in messages[:3]] + ) + print(f" [{label}] {preview}", file=sys.stderr) + + # 串行执行(等待完成后再处理下一个 session) + await process_single_session( + messages=messages, + sample_id=sample_id, + session_key=session_key, + meta=meta, + run_time=run_time, + args=args, + ) + + # 不同 sample 之间并行执行 + tasks = [asyncio.create_task(process_sample(item)) for item in samples] + results = await asyncio.gather(*tasks, return_exceptions=True) + + else: + # Plain text format + sessions = parse_test_file(args.input) + print(f"Found {len(sessions)} session(s) in text file", file=sys.stderr) + + for idx, session in enumerate(sessions, start=1): + session_key = f"txt-session-{idx}" + print(f"\n=== Text Session {idx} ===", file=sys.stderr) + + # Skip already ingested sessions unless force-ingest is enabled + if not args.force_ingest and is_already_ingested( + "txt", session_key, success_keys + ): + print( + f" [SKIP] already imported (use --force-ingest to reprocess)", file=sys.stderr + ) + skipped_count += 1 + continue + + # For plain text, all messages as user role + messages = [] + for i, text in enumerate(session["messages"]): + messages.append( + {"role": "user", "text": text.strip(), "speaker": "user", "index": i} + ) + + preview = " | ".join([f"{msg['role']}: {msg['text'][:30]}..." for msg in messages[:3]]) + print(f" {preview}", file=sys.stderr) + + # 创建异步任务 + task = asyncio.create_task( + process_single_session( + messages=messages, + sample_id="txt", + session_key=session_key, + meta={"session_index": idx}, + run_time=run_time, + args=args, + ) + ) + tasks.append(task) + + # 等待所有 sample 处理完成 + print( + f"\n[INFO] Starting import with {len(tasks)} tasks to process", + file=sys.stderr, + ) + await asyncio.gather(*tasks, return_exceptions=True) + + # 从成功 CSV 统计结果 + if Path(args.success_csv).exists(): + with open(args.success_csv, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + success_count += 1 + total_embedding_tokens += int(row.get("embedding_tokens", 0) or 0) + total_vlm_tokens += int(row.get("vlm_tokens", 0) or 0) + + # Final summary + total_processed = success_count + error_count + skipped_count + print(f"\n=== Import summary ===", file=sys.stderr) + print(f"Total sessions: {total_processed}", file=sys.stderr) + print(f"Successfully imported: {success_count}", file=sys.stderr) + print(f"Failed: {error_count}", file=sys.stderr) + print(f"Skipped (already imported): {skipped_count}", file=sys.stderr) + print(f"\n=== Token usage summary ===", file=sys.stderr) + print(f"Total Embedding tokens: {total_embedding_tokens}", file=sys.stderr) + print(f"Total VLM tokens: {total_vlm_tokens}", file=sys.stderr) + if success_count > 0: + print( + f"Average Embedding per session: {total_embedding_tokens // success_count}", + file=sys.stderr, + ) + print(f"Average VLM per session: {total_vlm_tokens // success_count}", file=sys.stderr) + print(f"\nResults saved to:", file=sys.stderr) + print(f" - Success records: {args.success_csv}", file=sys.stderr) + print(f" - Error logs: {args.error_log}", file=sys.stderr) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(): + # 基于脚本所在目录计算默认数据文件路径 + script_dir = Path(__file__).parent.resolve() + default_input = str(script_dir / ".." / "data" / "locomo10.json") + + parser = argparse.ArgumentParser(description="Import conversations into OpenViking") + parser.add_argument( + "--input", + default=default_input, + help="Path to input file (.txt or LoCoMo .json)", + ) + parser.add_argument( + "--success-csv", + default="./result/import_success.csv", + help="Path to success records CSV file (default: import_success.csv)", + ) + parser.add_argument( + "--error-log", + default="./result/import_errors.log", + help="Path to error log file (default: import_errors.log)", + ) + parser.add_argument( + "--openviking-url", + default="http://localhost:1933", + help="OpenViking service URL (default: http://localhost:1933)", + ) + parser.add_argument( + "--sample", + type=int, + default=None, + help="LoCoMo JSON: sample index (0-based). Default: all samples.", + ) + parser.add_argument( + "--sessions", + default=None, + help="LoCoMo JSON: session range, e.g. '1-4' or '3'. Default: all sessions.", + ) + parser.add_argument( + "--question-index", + type=int, + default=None, + help="LoCoMo JSON: question index (0-based). When specified, auto-detect required sessions from question's evidence.", + ) + parser.add_argument( + "--force-ingest", + action="store_true", + default=False, + help="Force re-import even if already recorded as completed", + ) + parser.add_argument( + "--no-user-agent-id", + action="store_true", + default=False, + help="Do not pass user_id and agent_id to OpenViking client", + ) + args = parser.parse_args() + + # 确保输出目录存在 + Path(args.success_csv).parent.mkdir(parents=True, exist_ok=True) + Path(args.error_log).parent.mkdir(parents=True, exist_ok=True) + + try: + asyncio.run(run_import(args)) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/benchmark/locomo/openclaw/run_full_eval.sh b/benchmark/locomo/openclaw/run_full_eval.sh index a14cfa413..f8ca932e2 100755 --- a/benchmark/locomo/openclaw/run_full_eval.sh +++ b/benchmark/locomo/openclaw/run_full_eval.sh @@ -6,9 +6,11 @@ set -e OpenClaw 完整评估流程脚本 用法: - ./run_full_eval.sh # 只导入 OpenViking - ./run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw - ./run_full_eval.sh --skip-import # 跳过导入步骤 + ./run_full_eval.sh # 只导入 OpenViking (所有 samples) + ./run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw (所有 samples) + ./run_full_eval.sh --skip-import # 跳过导入步骤 (所有 samples) + ./run_full_eval.sh --sample 0 # 只处理第 0 个 sample + ./run_full_eval.sh --sample 1 --with-claw-import # 只处理第 1 个 sample,同时导入 OpenClaw ' # 基于脚本所在目录计算数据文件路径 @@ -16,20 +18,47 @@ SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" INPUT_FILE="$SCRIPT_DIR/../data/locomo10.json" RESULT_DIR="$SCRIPT_DIR/result" OUTPUT_CSV="$RESULT_DIR/qa_results.csv" -GATEWAY_TOKEN="your_gateway_token" +GATEWAY_TOKEN="90f2d2dc2f7b4d50cb943d3d3345e667bb3e9bcb7ec3a1fb" # 解析参数 SKIP_IMPORT=false WITH_CLAW_IMPORT=false -for arg in "$@"; do - if [ "$arg" = "--skip-import" ]; then - SKIP_IMPORT=true - elif [ "$arg" = "--with-claw-import" ]; then - WITH_CLAW_IMPORT=true - fi +SAMPLE_IDX="" + +while [[ $# -gt 0 ]]; do + case $1 in + --skip-import) + SKIP_IMPORT=true + shift + ;; + --with-claw-import) + WITH_CLAW_IMPORT=true + shift + ;; + --sample) + if [ -z "$2" ] || [[ "$2" == --* ]]; then + echo "错误: --sample 需要一个参数 (sample index, 0-based)" + exit 1 + fi + SAMPLE_IDX="$2" + shift 2 + ;; + *) + echo "警告: 未知参数 $1" + shift + ;; + esac done +# 构建 sample 参数 +SAMPLE_ARG="" +if [ -n "$SAMPLE_IDX" ]; then + SAMPLE_ARG="--sample $SAMPLE_IDX" + # 如果指定了 sample,修改输出文件名以避免覆盖 + OUTPUT_CSV="$RESULT_DIR/qa_results_sample${SAMPLE_IDX}.csv" +fi + # 确保结果目录存在 mkdir -p "$RESULT_DIR" @@ -39,18 +68,18 @@ if [ "$SKIP_IMPORT" = false ]; then echo "[1/5] 导入数据到 OpenViking 和 OpenClaw..." # 后台运行 OpenViking 导入 - python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" --force-ingest > "$RESULT_DIR/import_ov.log" 2>&1 & + python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" --force-ingest $SAMPLE_ARG > "$RESULT_DIR/import_ov.log" 2>&1 & PID_OV=$! # 后台运行 OpenClaw 导入 - python "$SCRIPT_DIR/eval.py" ingest "$INPUT_FILE" --force-ingest --token "$GATEWAY_TOKEN" > "$RESULT_DIR/import_claw.log" 2>&1 & + python "$SCRIPT_DIR/eval.py" ingest "$INPUT_FILE" --force-ingest --token "$GATEWAY_TOKEN" $SAMPLE_ARG > "$RESULT_DIR/import_claw.log" 2>&1 & PID_CLAW=$! # 等待两个导入任务完成 wait $PID_OV $PID_CLAW else echo "[1/5] 导入数据到 OpenViking..." - python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" --force-ingest + python "$SCRIPT_DIR/import_to_ov.py" --no-user-agent-id --input "$INPUT_FILE" --force-ingest $SAMPLE_ARG fi echo "导入完成,等待 1 分钟..." @@ -61,7 +90,7 @@ fi # Step 2: 运行 QA 模型(默认输出到 result/qa_results.csv) echo "[2/5] 运行 QA 评估..." -python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" +python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" $SAMPLE_ARG --output "${OUTPUT_CSV%.csv}" # Step 3: 裁判打分 echo "[3/5] 裁判打分..." @@ -69,7 +98,7 @@ python "$SCRIPT_DIR/judge.py" --input "$OUTPUT_CSV" --parallel 40 # Step 4: 计算结果 echo "[4/5] 计算结果..." -python "$SCRIPT_DIR/../vikingbot/stat_judge_result.py" --input "$OUTPUT_CSV" +python "$SCRIPT_DIR/stat_judge_result.py" --input "$OUTPUT_CSV" echo "[5/5] 完成!" echo "结果文件: $OUTPUT_CSV" diff --git a/benchmark/locomo/openclaw/stat_judge_result.py b/benchmark/locomo/openclaw/stat_judge_result.py new file mode 100644 index 000000000..013f94840 --- /dev/null +++ b/benchmark/locomo/openclaw/stat_judge_result.py @@ -0,0 +1,88 @@ +import argparse +import csv +import os + + +def main(): + parser = argparse.ArgumentParser(description="Statistics for judge result csv") + parser.add_argument( + "--input", + default="./result/qa_results_sample0.csv", + help="Path to judge result csv file, default: ./result/qa_results_sample0.csv", + ) + args = parser.parse_args() + + if not os.path.exists(args.input): + print(f"Error: File not found: {args.input}") + exit(1) + + # 统计所有题目 (排除 category=5) + correct = 0 + wrong = 0 + total_input_tokens = 0 + total_output_tokens = 0 + total_tokens = 0 + valid_rows = 0 + + with open(args.input, "r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f) + for row in reader: + # 检查 category 是否为 5,跳过 + category = row.get("category", "") + if category == "5": + continue + + valid_rows += 1 + + # 统计结果 + result = row.get("result", "").strip().upper() + if result == "CORRECT": + correct += 1 + elif result == "WRONG": + wrong += 1 + + # 统计token + try: + total_input_tokens += int(row.get("input_tokens", 0)) + total_output_tokens += int(row.get("output_tokens", 0)) + total_tokens += int(row.get("total_tokens", 0)) + except (ValueError, TypeError): + pass + + total_graded = correct + wrong + accuracy = correct / total_graded if total_graded > 0 else 0.0 + + # 平均 token 消耗 + avg_input_tokens = total_input_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_output_tokens = total_output_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_total_tokens = total_tokens / valid_rows if valid_rows > 0 else 0.0 + + output_lines = [ + "=== Judge Result Statistics (excluding category=5) ===", + f"Total rows: {valid_rows}", + f"Graded rows: {total_graded}", + f"Correct: {correct}", + f"Wrong: {wrong}", + f"Accuracy: {accuracy:.2%}", + f"\nToken usage:", + f" Total input tokens: {total_input_tokens}", + f" Total output tokens: {total_output_tokens}", + f" Total tokens: {total_tokens}", + f" Avg input tokens: {avg_input_tokens:.2f}", + f" Avg output tokens: {avg_output_tokens:.2f}", + f" Avg total tokens: {avg_total_tokens:.2f}", + ] + + # 打印到控制台 + for line in output_lines: + print(line) + + # 写入summary.txt + summary_path = os.path.join(os.path.dirname(args.input), "summary.txt") + with open(summary_path, "w", encoding="utf-8") as f: + f.write("\n".join(output_lines) + "\n") + print(f"\nSummary saved to {summary_path}") + + +if __name__ == "__main__": + main() From 00c02d788b6037e231445bcf60e399feb3540d2a Mon Sep 17 00:00:00 2001 From: DuTao Date: Wed, 8 Apr 2026 20:34:13 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=AF=84=E6=B5=8B?= =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/locomo/README.md | 131 +++++++++++++++-- benchmark/locomo/openclaw/eval.py | 66 ++++++++- benchmark/locomo/openclaw/import_to_ov.py | 2 + benchmark/locomo/openclaw/run_full_eval.sh | 20 ++- .../locomo/openclaw/stat_judge_result.py | 139 +++++++++++++----- 5 files changed, 302 insertions(+), 56 deletions(-) diff --git a/benchmark/locomo/README.md b/benchmark/locomo/README.md index 9b85563b2..8a0737745 100644 --- a/benchmark/locomo/README.md +++ b/benchmark/locomo/README.md @@ -16,9 +16,12 @@ benchmark/locomo/ │ ├── data/ # 测试数据目录 │ └── result/ # 评测结果目录 └── openclaw/ # OpenClaw 评测脚本 - ├── eval.py # OpenClaw 评估脚本 + ├── import_to_ov.py # 导入数据到 OpenViking + ├── eval.py # OpenClaw 评估脚本 (ingest/qa) ├── judge.py # LLM 裁判打分(适配 OpenClaw) + ├── stat_judge_result.py # 统计评分结果和 token 使用 ├── run_full_eval.sh # 一键运行完整评测流程 + ├── data/ # 测试数据目录 └── result/ # 评测结果目录 ``` @@ -158,19 +161,28 @@ python stat_judge_result.py --input <评分结果文件> ```bash cd benchmark/locomo/openclaw -bash run_full_eval.sh # 只导入 OpenViking +bash run_full_eval.sh # 只导入 OpenViking(跳过已导入的) bash run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw(并行执行) bash run_full_eval.sh --skip-import # 跳过导入步骤,直接运行 QA 评估 +bash run_full_eval.sh --force-ingest # 强制重新导入所有数据 +bash run_full_eval.sh --sample 0 # 只处理第 0 个 sample ``` **脚本参数说明:** +| 参数 | 说明 | +|------|------| +| `--skip-import` | 跳过导入步骤,直接运行 QA 评估 | +| `--with-claw-import` | 同时导入 OpenViking 和 OpenClaw(并行执行) | +| `--force-ingest` | 强制重新导入所有数据(忽略已导入记录) | +| `--sample ` | 只处理指定的 sample(0-based) | + **脚本执行流程:** 1. 导入数据到 OpenViking(可选同时导入 OpenClaw) 2. 等待 60 秒确保数据导入完成 3. 运行 QA 评估(`eval.py qa`,输出到 `result/qa_results.csv`) 4. 裁判打分(`judge.py`,并行度 40) -5. 统计结果(`stat_judge_result.py`) +5. 统计结果(`stat_judge_result.py`,同时统计 QA 和 Import 的 token 使用) **脚本内部配置参数:** @@ -184,9 +196,50 @@ bash run_full_eval.sh --skip-import # 跳过导入步骤,直接运行 Q ### 分步使用说明 -使用 `openclaw/eval.py` 进行 OpenClaw 评测,该脚本有两种模式: +OpenClaw 评测包含以下脚本: +- `import_to_ov.py`: 导入数据到 OpenViking +- `eval.py`: OpenClaw 评估脚本(ingest/qa 两种模式) +- `judge.py`: LLM 裁判打分 +- `stat_judge_result.py`: 统计评分结果和 token 使用 + +--- + +#### import_to_ov.py - 导入对话数据到 OpenViking + +```bash +python import_to_ov.py [选项] +``` -### 模式 1: ingest - 导入对话数据到OpenClaw +**参数说明:** +- `--input`: 输入文件路径(JSON 或 TXT),默认 `../data/locomo10.json` +- `--sample`: 指定样本索引(0-based) +- `--sessions`: 指定会话范围,如 `1-4` +- `--question-index`: 根据 question 的 evidence 自动推断需要的 session +- `--force-ingest`: 强制重新导入 +- `--no-user-agent-id`: 不传入 user_id 和 agent_id 给 OpenViking 客户端 +- `--openviking-url`: OpenViking 服务地址,默认 `http://localhost:1933` +- `--success-csv`: 成功记录 CSV 路径,默认 `./result/import_success.csv` +- `--error-log`: 错误日志路径,默认 `./result/import_errors.log` + +**示例:** +```bash +# 导入所有数据(跳过已导入的) +python import_to_ov.py + +# 强制重新导入,不使用 user/agent id +python import_to_ov.py --force-ingest --no-user-agent-id + +# 只导入第 0 个 sample +python import_to_ov.py --sample 0 +``` + +--- + +#### eval.py - OpenClaw 评估脚本 + +该脚本有两种模式: + +##### 模式 1: ingest - 导入对话数据到 OpenClaw ```bash python eval.py ingest <输入文件> [选项] @@ -195,37 +248,83 @@ python eval.py ingest <输入文件> [选项] **参数说明:** - `--sample`: 指定样本索引 - `--sessions`: 指定会话范围,如 `1-4` -- `--viking`: 使用 OpenViking 而非 OpenClaw 导入 - `--force-ingest`: 强制重新导入 - `--agent-id`: Agent ID,默认 `locomo-eval` +- `--token`: OpenClaw Gateway Token **示例:** ```bash # 导入第一个样本的 1-4 会话到 OpenClaw -python eval.py ingest locomo10.json --sample 0 --sessions 1-4 - -# 导入到 OpenViking -python eval.py ingest locomo10.json --sample 0 --viking +python eval.py ingest locomo10.json --sample 0 --sessions 1-4 --token ``` -### 模式 2: qa - 运行 QA 评估 -- 该评测制定了指定了`X-OpenClaw-Session-Key`,确保每次openclaw使用相同的session_id。Token计算将统计`session.jsonl`文件中的所有assistant轮次的Token消耗。每道题目执行完后会清空session.jsonl文件。 -- 该评测仅支持单线程运行,不支持并发。 -- 需先执行一次,查看`.openclaw/agents/{your_agent_id}/sessions/`下的session文件ID,作为`--session-id`参数的值开始完整评测。 +##### 模式 2: qa - 运行 QA 评估 + +- 该评测指定了 `X-OpenClaw-Session-Key`,确保每次 OpenClaw 使用相同的 session_id +- Token 计算统计 `session.jsonl` 文件中的所有 assistant 轮次的 Token 消耗 +- 每道题目执行完后会归档 session 文件 +- 支持并发运行(`--parallel` 参数) +- 问题会自动添加时间上下文(从最后一个 session 提取) + ```bash python eval.py qa <输入文件> [选项] ``` **参数说明:** -- `--output`: 输出文件路径 +- `--output`: 输出文件路径(不含 .csv 后缀) - `--sample`: 指定样本索引 - `--count`: 运行的 QA 问题数量 - `--user`: 用户 ID,默认 `eval-1` +- `--parallel`: 并发数,默认 10,最大 40 - `--token`: OpenClaw Gateway Token(或设置 `OPENCLAW_GATEWAY_TOKEN` 环境变量) **示例:** ```bash -python eval.py qa locomo10.json --sample 0 --output qa_results.txt +# 运行所有 sample 的 QA 评估 +python eval.py qa locomo10.json --token --parallel 15 + +# 只运行第 0 个 sample +python eval.py qa locomo10.json --sample 0 --output qa_results_sample0 +``` + +--- + +#### judge.py - LLM 裁判打分 + +```bash +python judge.py [选项] +``` + +**参数说明:** +- `--input`: QA 结果 CSV 文件路径 +- `--parallel`: 并发请求数,默认 40 + +**示例:** +```bash +python judge.py --input ./result/qa_results.csv --parallel 40 +``` + +--- + +#### stat_judge_result.py - 统计结果 + +同时统计 QA 结果和 OpenViking Import 的 token 使用: + +```bash +python stat_judge_result.py [选项] +``` + +**参数说明:** +- `--input`: QA 结果 CSV 文件路径,默认 `./result/qa_results_sample0.csv` +- `--import-csv`: Import 成功 CSV 文件路径,默认 `./result/import_success.csv` + +**输出统计包括:** +- QA 结果统计:正确率、token 使用(no-cache、cacheRead、output) +- OpenViking Import 统计:embedding_tokens、vlm_tokens、total_tokens + +**示例:** +```bash +python stat_judge_result.py --input ./result/qa_results_sample0.csv --import-csv ./result/import_success.csv ``` --- diff --git a/benchmark/locomo/openclaw/eval.py b/benchmark/locomo/openclaw/eval.py index 317004ac5..4bc323da9 100644 --- a/benchmark/locomo/openclaw/eval.py +++ b/benchmark/locomo/openclaw/eval.py @@ -23,6 +23,7 @@ import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime from pathlib import Path from threading import Lock @@ -30,7 +31,7 @@ # Configuration constants DEFAULT_BASE_URL = "http://127.0.0.1:18789" -DEFAULT_AGENT_ID = "main" +DEFAULT_AGENT_ID = "locomo-eval" DEFAULT_INGEST_RECORD_PATH = ".ingest_record.json" # CSV write lock for thread safety @@ -184,6 +185,56 @@ def build_session_messages( return sessions +# --------------------------------------------------------------------------- +# Question time helpers +# --------------------------------------------------------------------------- + +def parse_locomo_datetime(date_str: str) -> datetime | None: + """解析 LoCoMo 时间格式,如 '1:56 pm on 8 May, 2023'""" + try: + # 移除时间部分,只保留日期 "8 May, 2023" + if " on " in date_str: + date_part = date_str.split(" on ")[-1] + return datetime.strptime(date_part.strip(), "%d %B, %Y") + except ValueError: + pass + return None + + +def get_sample_question_time(sample: dict) -> str | None: + """从 sample 的 conversation 中提取最后一个有内容 session 的时间,返回 ISO 格式日期""" + conversation = sample.get("conversation", {}) + + # 找所有 session_N 字段(非 date_time) + session_keys = [ + k for k in conversation.keys() if k.startswith("session_") and "date_time" not in k + ] + if not session_keys: + return None + + # 按 session 编号排序,找到最后一个有内容的 + def get_session_num(key): + try: + return int(key.replace("session_", "")) + except ValueError: + return 0 + + session_keys.sort(key=get_session_num, reverse=True) + + for session_key in session_keys: + if conversation.get(session_key): # 有内容 + # 找到对应的 date_time + session_num = get_session_num(session_key) + dt_key = f"session_{session_num}_date_time" + date_str = conversation.get(dt_key) + if date_str: + dt = parse_locomo_datetime(date_str) + if dt: + return dt.strftime("%Y-%m-%d") + + return None + + # --------------------------------------------------------------------------- # Ingest record helpers (avoid duplicate ingestion) # --------------------------------------------------------------------------- @@ -615,6 +666,7 @@ def process_single_question( qa: dict, args: argparse.Namespace, csv_path: str, + question_time: str | None = None, ) -> dict: """Process a single QA question. Returns the record.""" question = qa["question"] @@ -627,11 +679,16 @@ def process_single_question( user_key = args.user or f"eval-{sample_idx}" print(f" [{sample_idx}] Q{original_qi}: {question[:60]}{'...' if len(question) > 60 else ''}", file=sys.stderr) + # 如果有 question_time,注入到 prompt 中 + if question_time: + input_msg = f"Current date: {question_time}. Answer the question directly: {question}" + else: + input_msg = f"Answer the question directly: {question}" jsonl_filename = "" try: response, api_usage = send_message_with_retry( - args.base_url, args.token, sample_id, question, 2, args.agent_id, session_key + args.base_url, args.token, sample_id, input_msg, 2, args.agent_id, session_key ) print(f" [{sample_idx}] A: {response[:60]}{'...' if len(response) > 60 else ''}", file=sys.stderr) @@ -695,6 +752,7 @@ def run_sample_qa( """Process QA for a single sample with concurrent question execution. Returns (records, sample_usage).""" sample_id = item["sample_id"] user_key = args.user or f"eval-{sample_idx}" + question_time = get_sample_question_time(item) qas = [q for q in item.get("qa", []) if str(q.get("category", "")) != "5"] if args.count is not None: qas = qas[:args.count] @@ -714,6 +772,8 @@ def run_sample_qa( return [], {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} print(f"\n=== Sample {sample_id} [{sample_idx}] (user={user_key}) ===", file=sys.stderr) + if question_time: + print(f" Question time context: {question_time}", file=sys.stderr) print(f" Running {len(qas)} QA question(s) with max {args.parallel} workers...", file=sys.stderr) records = [] @@ -725,7 +785,7 @@ def run_sample_qa( for original_qi, qa in qas: future = executor.submit( process_single_question, - sample_id, sample_idx, original_qi, qa, args, csv_path + sample_id, sample_idx, original_qi, qa, args, csv_path, question_time ) futures.append(future) diff --git a/benchmark/locomo/openclaw/import_to_ov.py b/benchmark/locomo/openclaw/import_to_ov.py index adca77dc4..14a51cb21 100644 --- a/benchmark/locomo/openclaw/import_to_ov.py +++ b/benchmark/locomo/openclaw/import_to_ov.py @@ -489,6 +489,8 @@ async def process_sample(item): f" [{label}] [SKIP] already imported (use --force-ingest to reprocess)", file=sys.stderr, ) + nonlocal skipped_count + skipped_count += 1 continue # Preview messages diff --git a/benchmark/locomo/openclaw/run_full_eval.sh b/benchmark/locomo/openclaw/run_full_eval.sh index f8ca932e2..3f1edff11 100755 --- a/benchmark/locomo/openclaw/run_full_eval.sh +++ b/benchmark/locomo/openclaw/run_full_eval.sh @@ -11,6 +11,7 @@ OpenClaw 完整评估流程脚本 ./run_full_eval.sh --skip-import # 跳过导入步骤 (所有 samples) ./run_full_eval.sh --sample 0 # 只处理第 0 个 sample ./run_full_eval.sh --sample 1 --with-claw-import # 只处理第 1 个 sample,同时导入 OpenClaw + ./run_full_eval.sh --force-ingest # 强制重新导入所有数据 ' # 基于脚本所在目录计算数据文件路径 @@ -24,6 +25,7 @@ GATEWAY_TOKEN="90f2d2dc2f7b4d50cb943d3d3345e667bb3e9bcb7ec3a1fb" # 解析参数 SKIP_IMPORT=false WITH_CLAW_IMPORT=false +FORCE_INGEST=false SAMPLE_IDX="" while [[ $# -gt 0 ]]; do @@ -36,6 +38,10 @@ while [[ $# -gt 0 ]]; do WITH_CLAW_IMPORT=true shift ;; + --force-ingest) + FORCE_INGEST=true + shift + ;; --sample) if [ -z "$2" ] || [[ "$2" == --* ]]; then echo "错误: --sample 需要一个参数 (sample index, 0-based)" @@ -59,6 +65,12 @@ if [ -n "$SAMPLE_IDX" ]; then OUTPUT_CSV="$RESULT_DIR/qa_results_sample${SAMPLE_IDX}.csv" fi +# 构建 force-ingest 参数 +FORCE_INGEST_ARG="" +if [ "$FORCE_INGEST" = true ]; then + FORCE_INGEST_ARG="--force-ingest" +fi + # 确保结果目录存在 mkdir -p "$RESULT_DIR" @@ -68,18 +80,18 @@ if [ "$SKIP_IMPORT" = false ]; then echo "[1/5] 导入数据到 OpenViking 和 OpenClaw..." # 后台运行 OpenViking 导入 - python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" --force-ingest $SAMPLE_ARG > "$RESULT_DIR/import_ov.log" 2>&1 & + python "$SCRIPT_DIR/../vikingbot/import_to_ov.py" --input "$INPUT_FILE" $FORCE_INGEST_ARG $SAMPLE_ARG > "$RESULT_DIR/import_ov.log" 2>&1 & PID_OV=$! # 后台运行 OpenClaw 导入 - python "$SCRIPT_DIR/eval.py" ingest "$INPUT_FILE" --force-ingest --token "$GATEWAY_TOKEN" $SAMPLE_ARG > "$RESULT_DIR/import_claw.log" 2>&1 & + python "$SCRIPT_DIR/eval.py" ingest "$INPUT_FILE" $FORCE_INGEST_ARG --token "$GATEWAY_TOKEN" $SAMPLE_ARG > "$RESULT_DIR/import_claw.log" 2>&1 & PID_CLAW=$! # 等待两个导入任务完成 wait $PID_OV $PID_CLAW else echo "[1/5] 导入数据到 OpenViking..." - python "$SCRIPT_DIR/import_to_ov.py" --no-user-agent-id --input "$INPUT_FILE" --force-ingest $SAMPLE_ARG + python "$SCRIPT_DIR/import_to_ov.py" --no-user-agent-id --input "$INPUT_FILE" $FORCE_INGEST_ARG $SAMPLE_ARG fi echo "导入完成,等待 1 分钟..." @@ -90,7 +102,7 @@ fi # Step 2: 运行 QA 模型(默认输出到 result/qa_results.csv) echo "[2/5] 运行 QA 评估..." -python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" $SAMPLE_ARG --output "${OUTPUT_CSV%.csv}" +python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" $SAMPLE_ARG --parallel 15 --output "${OUTPUT_CSV%.csv}" # Step 3: 裁判打分 echo "[3/5] 裁判打分..." diff --git a/benchmark/locomo/openclaw/stat_judge_result.py b/benchmark/locomo/openclaw/stat_judge_result.py index 013f94840..63816e004 100644 --- a/benchmark/locomo/openclaw/stat_judge_result.py +++ b/benchmark/locomo/openclaw/stat_judge_result.py @@ -10,21 +10,61 @@ def main(): default="./result/qa_results_sample0.csv", help="Path to judge result csv file, default: ./result/qa_results_sample0.csv", ) + parser.add_argument( + "--import-csv", + default="./result/import_success.csv", + help="Path to import_success.csv file for OpenViking token stats, default: ./result/import_success.csv", + ) args = parser.parse_args() - if not os.path.exists(args.input): - print(f"Error: File not found: {args.input}") - exit(1) + output_lines = [] + + # 统计 QA 结果 + if os.path.exists(args.input): + qa_stats = process_qa_results(args.input) + output_lines.extend(qa_stats) + else: + output_lines.append(f"Warning: QA result file not found: {args.input}") + # 统计 Import token + if os.path.exists(args.import_csv): + if output_lines: + output_lines.append("") + import_stats = process_import_csv(args.import_csv) + output_lines.extend(import_stats) + else: + output_lines.append(f"Warning: Import CSV file not found: {args.import_csv}") + + # 打印到控制台 + for line in output_lines: + print(line) + + # 写入summary.txt + if args.input: + summary_path = os.path.join(os.path.dirname(args.input), "summary.txt") + elif args.import_csv: + summary_path = os.path.join(os.path.dirname(args.import_csv), "summary.txt") + else: + summary_path = "./result/summary.txt" + + os.makedirs(os.path.dirname(summary_path), exist_ok=True) + with open(summary_path, "w", encoding="utf-8") as f: + f.write("\n".join(output_lines) + "\n") + print(f"\nSummary saved to {summary_path}") + + +def process_qa_results(input_path: str) -> list[str]: + """处理 QA 结果 CSV""" # 统计所有题目 (排除 category=5) correct = 0 wrong = 0 - total_input_tokens = 0 - total_output_tokens = 0 - total_tokens = 0 + total_no_cache_tokens = 0 # input_tokens + total_cache_read_tokens = 0 # cacheRead + total_output_tokens = 0 # output_tokens + total_input_tokens = 0 # no_cache + cacheRead valid_rows = 0 - with open(args.input, "r", encoding="utf-8", newline="") as f: + with open(input_path, "r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f) for row in reader: # 检查 category 是否为 5,跳过 @@ -43,9 +83,14 @@ def main(): # 统计token try: - total_input_tokens += int(row.get("input_tokens", 0)) - total_output_tokens += int(row.get("output_tokens", 0)) - total_tokens += int(row.get("total_tokens", 0)) + no_cache = int(row.get("input_tokens", 0)) + cache_read = int(row.get("cacheRead", 0)) + output = int(row.get("output_tokens", 0)) + + total_no_cache_tokens += no_cache + total_cache_read_tokens += cache_read + total_output_tokens += output + total_input_tokens += no_cache + cache_read except (ValueError, TypeError): pass @@ -53,35 +98,63 @@ def main(): accuracy = correct / total_graded if total_graded > 0 else 0.0 # 平均 token 消耗 - avg_input_tokens = total_input_tokens / valid_rows if valid_rows > 0 else 0.0 - avg_output_tokens = total_output_tokens / valid_rows if valid_rows > 0 else 0.0 - avg_total_tokens = total_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_no_cache = total_no_cache_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_cache_read = total_cache_read_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_output = total_output_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_total_input = total_input_tokens / valid_rows if valid_rows > 0 else 0.0 - output_lines = [ + return [ "=== Judge Result Statistics (excluding category=5) ===", - f"Total rows: {valid_rows}", - f"Graded rows: {total_graded}", - f"Correct: {correct}", - f"Wrong: {wrong}", + f"Total rows: {valid_rows:,}", + f"Graded rows: {total_graded:,}", + f"Correct: {correct:,}", + f"Wrong: {wrong:,}", f"Accuracy: {accuracy:.2%}", - f"\nToken usage:", - f" Total input tokens: {total_input_tokens}", - f" Total output tokens: {total_output_tokens}", - f" Total tokens: {total_tokens}", - f" Avg input tokens: {avg_input_tokens:.2f}", - f" Avg output tokens: {avg_output_tokens:.2f}", - f" Avg total tokens: {avg_total_tokens:.2f}", + f"\nToken usage (QA):", + f" Total no-cache tokens (input_tokens): {total_no_cache_tokens:,}", + f" Total cacheRead tokens: {total_cache_read_tokens:,}", + f" Total output tokens: {total_output_tokens:,}", + f" Total input tokens (no-cache + cacheRead): {total_input_tokens:,}", + f" Avg no-cache tokens: {avg_no_cache:,.2f}", + f" Avg cacheRead tokens: {avg_cache_read:,.2f}", + f" Avg output tokens: {avg_output:,.2f}", + f" Avg total input tokens: {avg_total_input:,.2f}", ] - # 打印到控制台 - for line in output_lines: - print(line) - # 写入summary.txt - summary_path = os.path.join(os.path.dirname(args.input), "summary.txt") - with open(summary_path, "w", encoding="utf-8") as f: - f.write("\n".join(output_lines) + "\n") - print(f"\nSummary saved to {summary_path}") +def process_import_csv(input_path: str) -> list[str]: + """处理 import_success.csv 的 token 统计""" + total_embedding = 0 + total_vlm = 0 + total_total = 0 + valid_rows = 0 + + with open(input_path, "r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f) + for row in reader: + valid_rows += 1 + try: + total_embedding += int(row.get("embedding_tokens", 0)) + total_vlm += int(row.get("vlm_tokens", 0)) + total_total += int(row.get("total_tokens", 0)) + except (ValueError, TypeError): + pass + + avg_embedding = total_embedding / valid_rows if valid_rows > 0 else 0.0 + avg_vlm = total_vlm / valid_rows if valid_rows > 0 else 0.0 + avg_total = total_total / valid_rows if valid_rows > 0 else 0.0 + + return [ + "=== OpenViking Import Token Statistics ===", + f"Total sessions: {valid_rows:,}", + f"\nToken usage (Import):", + f" Total embedding tokens: {total_embedding:,}", + f" Total VLM tokens: {total_vlm:,}", + f" Total tokens: {total_total:,}", + f" Avg embedding tokens: {avg_embedding:,.2f}", + f" Avg VLM tokens: {avg_vlm:,.2f}", + f" Avg total tokens: {avg_total:,.2f}", + ] if __name__ == "__main__": From d68f7d86840eaa22303a08d58b662aa9a920711a Mon Sep 17 00:00:00 2001 From: DuTao Date: Wed, 8 Apr 2026 20:35:01 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=AF=84=E6=B5=8B?= =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/locomo/openclaw/eval.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/locomo/openclaw/eval.py b/benchmark/locomo/openclaw/eval.py index 4bc323da9..3e6c92c5f 100644 --- a/benchmark/locomo/openclaw/eval.py +++ b/benchmark/locomo/openclaw/eval.py @@ -31,7 +31,7 @@ # Configuration constants DEFAULT_BASE_URL = "http://127.0.0.1:18789" -DEFAULT_AGENT_ID = "locomo-eval" +DEFAULT_AGENT_ID = "main" DEFAULT_INGEST_RECORD_PATH = ".ingest_record.json" # CSV write lock for thread safety