diff --git a/benchmark/locomo/README.md b/benchmark/locomo/README.md index 9b85563b2..8a0737745 100644 --- a/benchmark/locomo/README.md +++ b/benchmark/locomo/README.md @@ -16,9 +16,12 @@ benchmark/locomo/ │ ├── data/ # 测试数据目录 │ └── result/ # 评测结果目录 └── openclaw/ # OpenClaw 评测脚本 - ├── eval.py # OpenClaw 评估脚本 + ├── import_to_ov.py # 导入数据到 OpenViking + ├── eval.py # OpenClaw 评估脚本 (ingest/qa) ├── judge.py # LLM 裁判打分(适配 OpenClaw) + ├── stat_judge_result.py # 统计评分结果和 token 使用 ├── run_full_eval.sh # 一键运行完整评测流程 + ├── data/ # 测试数据目录 └── result/ # 评测结果目录 ``` @@ -158,19 +161,28 @@ python stat_judge_result.py --input <评分结果文件> ```bash cd benchmark/locomo/openclaw -bash run_full_eval.sh # 只导入 OpenViking +bash run_full_eval.sh # 只导入 OpenViking(跳过已导入的) bash run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw(并行执行) bash run_full_eval.sh --skip-import # 跳过导入步骤,直接运行 QA 评估 +bash run_full_eval.sh --force-ingest # 强制重新导入所有数据 +bash run_full_eval.sh --sample 0 # 只处理第 0 个 sample ``` **脚本参数说明:** +| 参数 | 说明 | +|------|------| +| `--skip-import` | 跳过导入步骤,直接运行 QA 评估 | +| `--with-claw-import` | 同时导入 OpenViking 和 OpenClaw(并行执行) | +| `--force-ingest` | 强制重新导入所有数据(忽略已导入记录) | +| `--sample ` | 只处理指定的 sample(0-based) | + **脚本执行流程:** 1. 导入数据到 OpenViking(可选同时导入 OpenClaw) 2. 等待 60 秒确保数据导入完成 3. 运行 QA 评估(`eval.py qa`,输出到 `result/qa_results.csv`) 4. 裁判打分(`judge.py`,并行度 40) -5. 统计结果(`stat_judge_result.py`) +5. 
统计结果(`stat_judge_result.py`,同时统计 QA 和 Import 的 token 使用) **脚本内部配置参数:** @@ -184,9 +196,50 @@ bash run_full_eval.sh --skip-import # 跳过导入步骤,直接运行 Q ### 分步使用说明 -使用 `openclaw/eval.py` 进行 OpenClaw 评测,该脚本有两种模式: +OpenClaw 评测包含以下脚本: +- `import_to_ov.py`: 导入数据到 OpenViking +- `eval.py`: OpenClaw 评估脚本(ingest/qa 两种模式) +- `judge.py`: LLM 裁判打分 +- `stat_judge_result.py`: 统计评分结果和 token 使用 + +--- + +#### import_to_ov.py - 导入对话数据到 OpenViking + +```bash +python import_to_ov.py [选项] +``` -### 模式 1: ingest - 导入对话数据到OpenClaw +**参数说明:** +- `--input`: 输入文件路径(JSON 或 TXT),默认 `../data/locomo10.json` +- `--sample`: 指定样本索引(0-based) +- `--sessions`: 指定会话范围,如 `1-4` +- `--question-index`: 根据 question 的 evidence 自动推断需要的 session +- `--force-ingest`: 强制重新导入 +- `--no-user-agent-id`: 不传入 user_id 和 agent_id 给 OpenViking 客户端 +- `--openviking-url`: OpenViking 服务地址,默认 `http://localhost:1933` +- `--success-csv`: 成功记录 CSV 路径,默认 `./result/import_success.csv` +- `--error-log`: 错误日志路径,默认 `./result/import_errors.log` + +**示例:** +```bash +# 导入所有数据(跳过已导入的) +python import_to_ov.py + +# 强制重新导入,不使用 user/agent id +python import_to_ov.py --force-ingest --no-user-agent-id + +# 只导入第 0 个 sample +python import_to_ov.py --sample 0 +``` + +--- + +#### eval.py - OpenClaw 评估脚本 + +该脚本有两种模式: + +##### 模式 1: ingest - 导入对话数据到 OpenClaw ```bash python eval.py ingest <输入文件> [选项] @@ -195,37 +248,83 @@ python eval.py ingest <输入文件> [选项] **参数说明:** - `--sample`: 指定样本索引 - `--sessions`: 指定会话范围,如 `1-4` -- `--viking`: 使用 OpenViking 而非 OpenClaw 导入 - `--force-ingest`: 强制重新导入 - `--agent-id`: Agent ID,默认 `locomo-eval` +- `--token`: OpenClaw Gateway Token **示例:** ```bash # 导入第一个样本的 1-4 会话到 OpenClaw -python eval.py ingest locomo10.json --sample 0 --sessions 1-4 - -# 导入到 OpenViking -python eval.py ingest locomo10.json --sample 0 --viking +python eval.py ingest locomo10.json --sample 0 --sessions 1-4 --token ``` -### 模式 2: qa - 运行 QA 评估 -- 
该评测制定了指定了`X-OpenClaw-Session-Key`,确保每次openclaw使用相同的session_id。Token计算将统计`session.jsonl`文件中的所有assistant轮次的Token消耗。每道题目执行完后会清空session.jsonl文件。 -- 该评测仅支持单线程运行,不支持并发。 -- 需先执行一次,查看`.openclaw/agents/{your_agent_id}/sessions/`下的session文件ID,作为`--session-id`参数的值开始完整评测。 +##### 模式 2: qa - 运行 QA 评估 + +- 该评测指定了 `X-OpenClaw-Session-Key`,确保每次 OpenClaw 使用相同的 session_id +- Token 计算统计 `session.jsonl` 文件中的所有 assistant 轮次的 Token 消耗 +- 每道题目执行完后会归档 session 文件 +- 支持并发运行(`--parallel` 参数) +- 问题会自动添加时间上下文(从最后一个 session 提取) + ```bash python eval.py qa <输入文件> [选项] ``` **参数说明:** -- `--output`: 输出文件路径 +- `--output`: 输出文件路径(不含 .csv 后缀) - `--sample`: 指定样本索引 - `--count`: 运行的 QA 问题数量 - `--user`: 用户 ID,默认 `eval-1` +- `--parallel`: 并发数,默认 10,最大 40 - `--token`: OpenClaw Gateway Token(或设置 `OPENCLAW_GATEWAY_TOKEN` 环境变量) **示例:** ```bash -python eval.py qa locomo10.json --sample 0 --output qa_results.txt +# 运行所有 sample 的 QA 评估 +python eval.py qa locomo10.json --token --parallel 15 + +# 只运行第 0 个 sample +python eval.py qa locomo10.json --sample 0 --output qa_results_sample0 +``` + +--- + +#### judge.py - LLM 裁判打分 + +```bash +python judge.py [选项] +``` + +**参数说明:** +- `--input`: QA 结果 CSV 文件路径 +- `--parallel`: 并发请求数,默认 40 + +**示例:** +```bash +python judge.py --input ./result/qa_results.csv --parallel 40 +``` + +--- + +#### stat_judge_result.py - 统计结果 + +同时统计 QA 结果和 OpenViking Import 的 token 使用: + +```bash +python stat_judge_result.py [选项] +``` + +**参数说明:** +- `--input`: QA 结果 CSV 文件路径,默认 `./result/qa_results_sample0.csv` +- `--import-csv`: Import 成功 CSV 文件路径,默认 `./result/import_success.csv` + +**输出统计包括:** +- QA 结果统计:正确率、token 使用(no-cache、cacheRead、output) +- OpenViking Import 统计:embedding_tokens、vlm_tokens、total_tokens + +**示例:** +```bash +python stat_judge_result.py --input ./result/qa_results_sample0.csv --import-csv ./result/import_success.csv ``` --- diff --git a/benchmark/locomo/openclaw/eval.py b/benchmark/locomo/openclaw/eval.py index 22a565710..3e6c92c5f 100644 --- a/benchmark/locomo/openclaw/eval.py 
+++ b/benchmark/locomo/openclaw/eval.py @@ -22,16 +22,20 @@ import os import sys import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime from pathlib import Path +from threading import Lock import requests # Configuration constants DEFAULT_BASE_URL = "http://127.0.0.1:18789" -DEFAULT_SESSION_KEY = "eval-test-2" -DEFAULT_AGENT_ID = "locomo-eval" +DEFAULT_AGENT_ID = "main" DEFAULT_INGEST_RECORD_PATH = ".ingest_record.json" -DEFAULT_OV_COMMAND = ["ov", "add-memory"] + +# CSV write lock for thread safety +csv_lock = Lock() # --------------------------------------------------------------------------- @@ -181,6 +185,56 @@ def build_session_messages( return sessions +# --------------------------------------------------------------------------- +# Question time helpers +# --------------------------------------------------------------------------- + +def parse_locomo_datetime(date_str: str) -> datetime | None: + """解析 LoCoMo 时间格式,如 '1:56 pm on 8 May, 2023'""" + try: + # 移除时间部分,只保留日期 "8 May, 2023" + if " on " in date_str: + date_part = date_str.split(" on ")[-1] + return datetime.strptime(date_part.strip(), "%d %B, %Y") + except ValueError: + pass + return None + + +def get_sample_question_time(sample: dict) -> str | None: + """从 sample 的 conversation 中提取最后一个有内容 session 的时间,返回 ISO 格式日期""" + conversation = sample.get("conversation", {}) + + # 找所有 session_N 字段(非 date_time) + session_keys = [ + k for k in conversation.keys() if k.startswith("session_") and "date_time" not in k + ] + if not session_keys: + return None + + # 按 session 编号排序,找到最后一个有内容的 + def get_session_num(key): + try: + return int(key.replace("session_", "")) + except ValueError: + return 0 + + session_keys.sort(key=get_session_num, reverse=True) + + for session_key in session_keys: + if conversation.get(session_key): # 有内容 + # 找到对应的 date_time + session_num = get_session_num(session_key) + dt_key = f"session_{session_num}_date_time" + date_str = 
conversation.get(dt_key) + if date_str: + dt = parse_locomo_datetime(date_str) + if dt: + return dt.strftime("%Y-%m-%d") + + return None + + # --------------------------------------------------------------------------- # Ingest record helpers (avoid duplicate ingestion) # --------------------------------------------------------------------------- @@ -261,6 +315,51 @@ def extract_response_text(response_json: dict) -> str: return f"[ERROR: could not extract text from response: {response_json}]" +def get_session_id_from_key(session_key: str, user: str, agent_id: str = "main") -> str | None: + """Search all agents' sessions.json files for the session_key and return sessionFile path. + Returns the full path to the session JSONL file if found, None otherwise. + """ + agents_base_dir = os.path.expanduser("~/.openclaw/agents") + + if not os.path.exists(agents_base_dir): + print(f" [session] Agents directory not found: {agents_base_dir}", file=sys.stderr) + return None + + # Iterate through all agent directories + for agent_name in os.listdir(agents_base_dir): + agent_dir = os.path.join(agents_base_dir, agent_name) + if not os.path.isdir(agent_dir): + continue + + sessions_dir = os.path.join(agent_dir, "sessions") + sessions_file = os.path.join(sessions_dir, "sessions.json") + + if not os.path.exists(sessions_file): + continue + + try: + with open(sessions_file, "r") as f: + data = json.load(f) + + # Search for the session_key in this sessions.json + for key, value in data.items(): + if session_key in key and isinstance(value, dict): + session_file = value.get("sessionFile") + if session_file: + print(f" [session] Found sessionFile in agent '{agent_name}': {session_file}", file=sys.stderr) + return session_file + + except json.JSONDecodeError as e: + print(f" [session] Error parsing {sessions_file}: {e}", file=sys.stderr) + continue + except IOError as e: + print(f" [session] Error reading {sessions_file}: {e}", file=sys.stderr) + continue + + print(f" [session] session_key 
'{session_key}' not found in any agent's sessions.json", file=sys.stderr) + return None + + def get_session_id(user: str, agent_id: str = "main") -> str | None: """Read the current session ID for the given user from sessions.json.""" sessions_file = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions/sessions.json") @@ -280,46 +379,85 @@ def get_session_id(user: str, agent_id: str = "main") -> str | None: return None -def reset_session(session_id: str, agent_id: str = "main") -> str | None: - """Archive the session .jsonl file by renaming it with a timestamp suffix. +def reset_session(session_path: str, agent_id: str = "main") -> str | None: + """Rename the session .jsonl file with a timestamp suffix. + Accepts either a session_id or a full path to the session file. Returns the new filename if successful, None otherwise. """ - sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions") - src = os.path.join(sessions_dir, f"{session_id}.jsonl") - dst = f"{src}.{int(time.time())}" + # Check if session_path is already a full path + if os.path.isabs(session_path) and os.path.exists(session_path): + src = session_path + else: + # Treat as session_id + sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions") + src = os.path.join(sessions_dir, f"{session_path}.jsonl") + + if not os.path.exists(src): + print(f" [backup] Session file not found: {src}", file=sys.stderr) + return None + + timestamp = time.strftime("%Y%m%d_%H%M%S") + dst = f"{src}.{timestamp}" try: os.rename(src, dst) new_filename = os.path.basename(dst) - print(f" [reset] archived {session_id}.jsonl -> {new_filename}", file=sys.stderr) + print(f" [backup] renamed {os.path.basename(src)} -> {new_filename}", file=sys.stderr) return new_filename - except FileNotFoundError: - print(f" [reset] Session file not found: {src}", file=sys.stderr) - return None except IOError as e: - print(f" [reset] could not archive session file: {e}", file=sys.stderr) + print(f" [backup] 
could not rename session file: {e}", file=sys.stderr) return None -def viking_ingest(msg: str) -> None: - """Save a message to OpenViking via `ov add-memory`.""" - import subprocess - result = subprocess.run( - DEFAULT_OV_COMMAND + [msg], - capture_output=True, - text=True, - ) - if result.returncode != 0: - raise RuntimeError(result.stderr.strip() or f"ov exited with code {result.returncode}") +def calculate_usage_from_jsonl(jsonl_filename: str, agent_id: str = "main") -> dict: + """Calculate token usage from archived JSONL file.""" + # Check if jsonl_filename is already a full path + if os.path.isabs(jsonl_filename) and os.path.exists(jsonl_filename): + jsonl_full_path = jsonl_filename + else: + sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{agent_id}/sessions") + jsonl_full_path = os.path.join(sessions_dir, jsonl_filename) + + usage = { + "input_tokens": 0, + "output_tokens": 0, + "cacheRead": 0, + "cacheWrite": 0, + "total_tokens": 0, + } + + if not os.path.exists(jsonl_full_path): + return usage + + try: + with open(jsonl_full_path, "r", encoding="utf-8") as f: + for line in f: + if not line.strip(): + continue + entry = json.loads(line) + if entry.get("type") == "message" and entry.get("message", {}).get("role") == "assistant": + entry_usage = entry.get("message", {}).get("usage", {}) + usage["input_tokens"] += entry_usage.get("input", 0) + usage["output_tokens"] += entry_usage.get("output", 0) + usage["cacheRead"] += entry_usage.get("cacheRead", 0) + usage["cacheWrite"] += entry_usage.get("cacheWrite", 0) + usage["total_tokens"] += entry_usage.get("totalTokens", 0) + except json.JSONDecodeError as e: + print(f" [usage] Error parsing JSONL file: {e}", file=sys.stderr) + except IOError as e: + print(f" [usage] Error reading JSONL file: {e}", file=sys.stderr) + + return usage def send_message_with_retry( - base_url: str, token: str, user: str, message: str, retries: int = 2, agent_id: str = DEFAULT_AGENT_ID + base_url: str, token: str, user: str, 
message: str, retries: int = 2, + agent_id: str = DEFAULT_AGENT_ID, session_key: str | None = None ) -> tuple[str, dict]: """Call send_message with up to `retries` retries on failure.""" last_exc = None for attempt in range(retries + 1): try: - return send_message(base_url, token, user, message, agent_id) + return send_message(base_url, token, user, message, agent_id, session_key) except Exception as e: last_exc = e if attempt < retries: @@ -328,7 +466,8 @@ def send_message_with_retry( def send_message( - base_url: str, token: str, user: str, message: str, agent_id: str = DEFAULT_AGENT_ID + base_url: str, token: str, user: str, message: str, + agent_id: str = DEFAULT_AGENT_ID, session_key: str | None = None ) -> tuple[str, dict]: """Send a single message to the OpenClaw responses API. @@ -338,9 +477,10 @@ def send_message( headers = { "Content-Type": "application/json", "Authorization": f"Bearer {token}", - "X-OpenClaw-Agent-ID": agent_id, - "X-OpenClaw-Session-Key": DEFAULT_SESSION_KEY + "X-OpenClaw-Agent-ID": agent_id } + if session_key: + headers["X-OpenClaw-Session-Key"] = session_key payload = { "model": "openclaw", "input": message, @@ -414,62 +554,36 @@ def run_ingest( preview = msg.replace("\n", " | ")[:80] print(f" [{label}] {preview}...", file=sys.stderr) - if args.viking: - try: - viking_ingest(msg) - print(f" -> [viking] saved", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": "[viking] saved", - "usage": {}, - }) - # Mark as successfully ingested - mark_ingested(args.agent_id, user_key, sample_id, meta['session_key'], ingest_record, { - "mode": "viking", - "date_time": meta['date_time'] - }) - except Exception as e: - print(f" -> [ERROR] {e}", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": f"[ERROR] {e}", - "usage": {}, - }) - else: - try: - reply, usage = send_message(args.base_url, args.token, 
user_key, msg, args.agent_id) - print(f" -> {reply[:80]}{'...' if len(reply) > 80 else ''}", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": reply, - "usage": usage, - }) - # Mark as successfully ingested - mark_ingested(args.agent_id, user_key, sample_id, meta['session_key'], ingest_record, { - "mode": "openclaw", - "date_time": meta['date_time'], - "usage": usage - }) - except Exception as e: - print(f" -> [ERROR] {e}", file=sys.stderr) - results.append({ - "sample_id": sample_id, - "session": meta["session_key"], - "user": user_key, - "reply": f"[ERROR] {e}", - "usage": {}, - }) - - if session_id is None: - session_id = get_session_id(user_key, args.agent_id) - if session_id: - reset_session(session_id, args.agent_id) + try: + reply, usage = send_message(args.base_url, args.token, user_key, msg, args.agent_id) + print(f" -> {reply[:80]}{'...' if len(reply) > 80 else ''}", file=sys.stderr) + results.append({ + "sample_id": sample_id, + "session": meta["session_key"], + "user": user_key, + "reply": reply, + "usage": usage, + }) + # Mark as successfully ingested + mark_ingested(args.agent_id, user_key, sample_id, meta['session_key'], ingest_record, { + "mode": "openclaw", + "date_time": meta['date_time'], + "usage": usage + }) + except Exception as e: + print(f" -> [ERROR] {e}", file=sys.stderr) + results.append({ + "sample_id": sample_id, + "session": meta["session_key"], + "user": user_key, + "reply": f"[ERROR] {e}", + "usage": {}, + }) + + if session_id is None: + session_id = get_session_id(user_key, args.agent_id) + if session_id: + reset_session(session_id, args.agent_id) if args.output: try: @@ -545,6 +659,89 @@ def run_ingest( # QA: run QA questions and compare with expected answers # --------------------------------------------------------------------------- +def process_single_question( + sample_id: str, + sample_idx: int, + original_qi: int, + qa: dict, + args: 
argparse.Namespace, + csv_path: str, + question_time: str | None = None, +) -> dict: + """Process a single QA question. Returns the record.""" + question = qa["question"] + expected = str(qa["answer"]) + category = qa.get("category", "") + evidence = qa.get("evidence", []) + + # Generate unique session_key based on sample_id + question_index + session_key = f"qa-{sample_id}-q{original_qi}" + user_key = args.user or f"eval-{sample_idx}" + + print(f" [{sample_idx}] Q{original_qi}: {question[:60]}{'...' if len(question) > 60 else ''}", file=sys.stderr) + # 如果有 question_time,注入到 prompt 中 + if question_time: + input_msg = f"Current date: {question_time}. Answer the question directly: {question}" + else: + input_msg = f"Answer the question directly: {question}" + + jsonl_filename = "" + try: + response, api_usage = send_message_with_retry( + args.base_url, args.token, sample_id, input_msg, 2, args.agent_id, session_key + ) + print(f" [{sample_idx}] A: {response[:60]}{'...' if len(response) > 60 else ''}", file=sys.stderr) + + # Get sessionFile path from sessions.json using session_key + session_file_path = get_session_id_from_key(session_key, user_key, args.agent_id) + jsonl_filename = "" + + # Archive the session file if we found it + if session_file_path: + jsonl_filename = reset_session(session_file_path, args.agent_id) + + # Calculate usage from JSONL file if available, otherwise use API usage + if jsonl_filename and session_file_path: + # Use the directory from session_file_path and the archived filename + usage = calculate_usage_from_jsonl(os.path.join(os.path.dirname(session_file_path), jsonl_filename), args.agent_id) + print(f" [{sample_idx}] tokens (from JSONL): in={usage['input_tokens']} out={usage['output_tokens']} cacheRead={usage['cacheRead']} cacheWrite={usage['cacheWrite']} total={usage['total_tokens']}", file=sys.stderr) + else: + usage = { + "input_tokens": api_usage.get("input_tokens", 0), + "output_tokens": api_usage.get("output_tokens", 0), + 
"cacheRead": api_usage.get("cacheRead", 0), + "cacheWrite": api_usage.get("cacheWrite", 0), + "total_tokens": api_usage.get("total_tokens", 0), + } + print(f" [{sample_idx}] tokens (from API): in={usage['input_tokens']} out={usage['output_tokens']} cacheRead={usage['cacheRead']} cacheWrite={usage['cacheWrite']} total={usage['total_tokens']}", file=sys.stderr) + + except Exception as e: + response = f"[ERROR] {e}" + usage = {} + jsonl_filename = "" + print(f" [{sample_idx}] A: {response}", file=sys.stderr) + + record = { + "sample_id": sample_id, + "sample_idx": sample_idx, + "qi": original_qi, + "question": question, + "expected": expected, + "response": response, + "category": category, + "evidence": evidence, + "usage": usage, + "jsonl_filename": jsonl_filename, + } + + # Save to CSV with lock for thread safety + with csv_lock: + save_record_to_csv(csv_path, record) + print(f" [{sample_idx}] Saved to CSV: Q{original_qi}", file=sys.stderr) + + return record + + def run_sample_qa( item: dict, sample_idx: int, @@ -552,9 +749,10 @@ def run_sample_qa( executed_records: set, csv_path: str, ) -> tuple[list[dict], dict]: - """Process QA for a single sample. Returns (records, sample_usage).""" + """Process QA for a single sample with concurrent question execution. 
Returns (records, sample_usage).""" sample_id = item["sample_id"] user_key = args.user or f"eval-{sample_idx}" + question_time = get_sample_question_time(item) qas = [q for q in item.get("qa", []) if str(q.get("category", "")) != "5"] if args.count is not None: qas = qas[:args.count] @@ -573,131 +771,35 @@ def run_sample_qa( print(f" All QA questions already executed, skipping sample.", file=sys.stderr) return [], {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} - jsonl_path = f"{args.output}.{sample_idx}.jsonl" if args.output else None - - sample_usage = {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} - records = [] - session_id = None - print(f"\n=== Sample {sample_id} [{sample_idx}] (user={user_key}) ===", file=sys.stderr) - print(f" Running {len(qas)} QA question(s)...", file=sys.stderr) + if question_time: + print(f" Question time context: {question_time}", file=sys.stderr) + print(f" Running {len(qas)} QA question(s) with max {args.parallel} workers...", file=sys.stderr) - jsonl_file = None - if jsonl_path: - try: - jsonl_file = open(jsonl_path, "w", encoding="utf-8") - except IOError as e: - print(f"Warning: Could not open JSONL file {jsonl_path}: {e}", file=sys.stderr) + records = [] + sample_usage = {"input_tokens": 0, "output_tokens": 0, "cacheRead": 0, "cacheWrite": 0, "total_tokens": 0} - try: + # Use ThreadPoolExecutor for concurrent question execution + with ThreadPoolExecutor(max_workers=args.parallel) as executor: + futures = [] for original_qi, qa in qas: - question = qa["question"] - expected = str(qa["answer"]) - category = qa.get("category", "") - evidence = qa.get("evidence", []) - - print(f" [{sample_idx}] Q{original_qi}: {question[:60]}{'...' 
if len(question) > 60 else ''}", file=sys.stderr) - - jsonl_filename = "" + future = executor.submit( + process_single_question, + sample_id, sample_idx, original_qi, qa, args, csv_path, question_time + ) + futures.append(future) + + # Collect results + for future in as_completed(futures): try: - response, api_usage = send_message_with_retry( - args.base_url, args.token, user_key, question, 2, args.agent_id, - ) - print(f" [{sample_idx}] A: {response[:60]}{'...' if len(response) > 60 else ''}", file=sys.stderr) - - # Use provided session_id if available, otherwise get from system - if args.session_id: - session_id = args.session_id - elif session_id is None: - session_id = get_session_id(user_key, args.agent_id) - - # Reset session and get archived filename - if session_id: - jsonl_filename = reset_session(session_id, args.agent_id) - - # Use API usage by default - usage = api_usage - # Calculate usage from JSONL file if session_id is provided and we have the archived file - if args.session_id and jsonl_filename: - # Parse the archived JSONL file to calculate usage - sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{args.agent_id}/sessions") - jsonl_full_path = os.path.join(sessions_dir, jsonl_filename) - if os.path.exists(jsonl_full_path): - total_input = 0 - total_output = 0 - total_cache_read = 0 - total_cache_write = 0 - total_total_tokens = 0 - try: - with open(jsonl_full_path, "r", encoding="utf-8") as f: - for line in f: - if not line.strip(): - continue - entry = json.loads(line) - if entry.get("type") == "message" and entry.get("message", {}).get("role") == "assistant": - entry_usage = entry.get("message", {}).get("usage", {}) - total_input += entry_usage.get("input", 0) - total_output += entry_usage.get("output", 0) - total_cache_read += entry_usage.get("cacheRead", 0) - total_cache_write += entry_usage.get("cacheWrite", 0) - total_total_tokens += entry_usage.get("totalTokens", 0) - usage = { - "input_tokens": total_input, - "output_tokens": 
total_output, - "cacheRead": total_cache_read, - "cacheWrite": total_cache_write, - "total_tokens": total_total_tokens, - } - print(f" [{sample_idx}] tokens (from JSONL): in={total_input} out={total_output} cacheRead={total_cache_read} cacheWrite={total_cache_write} total={total_total_tokens}", file=sys.stderr) - except json.JSONDecodeError as e: - print(f" [{sample_idx}] Error parsing JSONL file: {e}, using API usage", file=sys.stderr) - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - except IOError as e: - print(f" [{sample_idx}] Error reading JSONL file: {e}, using API usage", file=sys.stderr) - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - else: - print(f" [{sample_idx}] JSONL file not found: {jsonl_full_path}, using API usage", file=sys.stderr) - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - else: - print(f" [{sample_idx}] tokens (from API): in={usage.get('input_tokens',0)} out={usage.get('output_tokens',0)} cacheRead={usage.get('cacheRead',0)} cacheWrite={usage.get('cacheWrite',0)} total={usage.get('total_tokens',0)}", file=sys.stderr) - + record = future.result() + records.append(record) + # Accumulate usage + usage = record.get("usage", {}) for k in sample_usage: sample_usage[k] += usage.get(k, 0) except Exception as e: - response = f"[ERROR] {e}" - usage = {} - jsonl_filename = "" - print(f" [{sample_idx}] A: {response}", file=sys.stderr) - - record = { - "sample_id": 
sample_id, - "sample_idx": sample_idx, - "qi": original_qi, - "question": question, - "expected": expected, - "response": response, - "category": category, - "evidence": evidence, - "usage": usage, - "jsonl_filename": jsonl_filename, - } - records.append(record) - - # Save to CSV immediately after successful execution - save_record_to_csv(csv_path, record) - print(f" [{sample_idx}] Saved to CSV: Q{original_qi}", file=sys.stderr) - - if jsonl_file: - try: - jsonl_file.write(json.dumps(record, ensure_ascii=False) + "\n") - jsonl_file.flush() - except IOError as e: - print(f"Warning: Error writing to JSONL file: {e}", file=sys.stderr) - - finally: - if jsonl_file: - jsonl_file.close() - print(f" [{sample_idx}] written to {jsonl_path}", file=sys.stderr) + print(f" [{sample_idx}] Error in question task: {e}", file=sys.stderr) return records, sample_usage @@ -763,9 +865,12 @@ def run_qa( print("Error: QA mode only works with LoCoMo JSON files", file=sys.stderr) sys.exit(1) + # Ensure parallel is within reasonable bounds (1-40) + args.parallel = max(1, min(40, args.parallel)) + samples = load_locomo_data(args.input, args.sample) print(f" user: {args.user or 'eval-{sample_idx}'}", file=sys.stderr) - print(f" running in single-thread mode", file=sys.stderr) + print(f" running with {args.parallel} concurrent workers", file=sys.stderr) # Load already executed records from CSV csv_path = f"{args.output}.csv" if args.output else args.default_csv_path @@ -774,17 +879,6 @@ def run_qa( executed_records = load_executed_records(csv_path) print(f" Loaded {len(executed_records)} already executed records from {csv_path}", file=sys.stderr) - # Clean up existing session file if session_id is provided - if args.session_id: - sessions_dir = os.path.expanduser(f"~/.openclaw/agents/{args.agent_id}/sessions") - session_file = os.path.join(sessions_dir, f"{args.session_id}.jsonl") - if os.path.exists(session_file): - try: - os.remove(session_file) - print(f" Cleaned up existing session file: 
{os.path.basename(session_file)}", file=sys.stderr) - except Exception as e: - print(f" Warning: Could not remove existing session file: {e}", file=sys.stderr) - results_list = [] for idx, item in enumerate(samples): result = run_sample_qa(item, idx + 1, args, executed_records, csv_path) @@ -797,7 +891,31 @@ def run_qa( print(f"\n total tokens: in={total_usage['input_tokens']} out={total_usage['output_tokens']} total={total_usage['total_tokens']}", file=sys.stderr) + # Generate timestamp once for all backups + timestamp = time.strftime("%Y%m%d_%H%M%S") + import shutil + + # Backup CSV file with timestamp + if os.path.exists(csv_path): + csv_path_obj = Path(csv_path) + backup_csv_path = csv_path_obj.parent / f"{csv_path_obj.stem}_{timestamp}{csv_path_obj.suffix}" + try: + shutil.copy2(csv_path, backup_csv_path) + print(f" CSV backed up to: {backup_csv_path}", file=sys.stderr) + except Exception as e: + print(f"Warning: Failed to backup CSV file: {e}", file=sys.stderr) + if args.output: + # Backup output summary file too + if os.path.exists(args.output): + output_path_obj = Path(args.output) + backup_output_path = output_path_obj.parent / f"{output_path_obj.stem}_{timestamp}{output_path_obj.suffix}" + try: + shutil.copy2(args.output, backup_output_path) + print(f" Summary backed up to: {backup_output_path}", file=sys.stderr) + except Exception as e: + print(f"Warning: Failed to backup summary file: {e}", file=sys.stderr) + try: with open(args.output, "w", encoding="utf-8") as f: f.write("=== TOTAL USAGE ===\n") @@ -877,15 +995,9 @@ def main(): parser.add_argument( "-p", "--parallel", type=int, - default=1, + default=10, metavar="N", - help="QA mode: number of samples to process concurrently (max 10, default 1).", - ) - parser.add_argument( - "--viking", - action="store_true", - default=False, - help="Ingest mode: save to OpenViking via `ov add-memory` instead of OpenClaw.", + help="QA mode: number of questions to process concurrently (max 40, default 10).", ) 
parser.add_argument( "--agent-id", @@ -895,7 +1007,7 @@ def main(): parser.add_argument( "--session-id", default=None, - help="Session ID for API requests. If provided, will use this session ID and calculate token usage from corresponding JSONL file.", + help="Session ID for API requests (ingest mode only).", ) parser.add_argument( "--force-ingest", @@ -913,7 +1025,7 @@ def main(): # 添加默认 CSV 路径到 args args.default_csv_path = default_csv_path - if not args.token and not getattr(args, "viking", False): + if not args.token: print("Error: --token or OPENCLAW_GATEWAY_TOKEN env var is required", file=sys.stderr) sys.exit(1) diff --git a/benchmark/locomo/openclaw/import_to_ov.py b/benchmark/locomo/openclaw/import_to_ov.py new file mode 100644 index 000000000..14a51cb21 --- /dev/null +++ b/benchmark/locomo/openclaw/import_to_ov.py @@ -0,0 +1,669 @@ +""" +OpenViking data import tool. + +Import conversations from LoCoMo JSON or plain text files into OpenViking memory. + +Usage: + # Import LoCoMo JSON conversations + uv run python import_to_ov.py locomo10.json --sample 0 --sessions 1-4 + + # Import plain text conversations + uv run python import_to_ov.py example.txt +""" + +import argparse +import asyncio +import csv +import json +import sys +import time +import traceback +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Dict, Any, Tuple, Optional + +import openviking as ov + + +def _get_session_number(session_key: str) -> int: + """Extract session number from session key.""" + return int(session_key.split("_")[1]) + + +def parse_test_file(path: str) -> List[Dict[str, Any]]: + """Parse txt test file into sessions. 
+ + Each session is a dict with: + - messages: list of user message strings + """ + with open(path, "r", encoding="utf-8") as f: + content = f.read() + + raw_sessions = content.split("---\n") + sessions = [] + + for raw in raw_sessions: + lines = [line for line in raw.strip().splitlines() if line.strip()] + if not lines: + continue + + messages = [] + for line in lines: + if not line.startswith("eval:"): # Skip eval lines + messages.append(line) + + if messages: + sessions.append({"messages": messages}) + + return sessions + + +def load_locomo_data( + path: str, + sample_index: Optional[int] = None, +) -> List[Dict[str, Any]]: + """Load LoCoMo JSON and optionally filter to one sample.""" + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + + if sample_index is not None: + if sample_index < 0 or sample_index >= len(data): + raise ValueError(f"Sample index {sample_index} out of range (0-{len(data) - 1})") + return [data[sample_index]] + return data + + +def build_session_messages( + item: Dict[str, Any], + session_range: Optional[Tuple[int, int]] = None, +) -> List[Dict[str, Any]]: + """Build session messages for one LoCoMo sample. + + Returns list of dicts with keys: messages, meta. + Each dict represents a session with multiple messages (user/assistant role). 
+ """ + conv = item["conversation"] + speakers = f"{conv['speaker_a']} & {conv['speaker_b']}" + + session_keys = sorted( + [k for k in conv if k.startswith("session_") and not k.endswith("_date_time")], + key=_get_session_number, + ) + + sessions = [] + for sk in session_keys: + sess_num = _get_session_number(sk) + if session_range: + lo, hi = session_range + if sess_num < lo or sess_num > hi: + continue + + dt_key = f"{sk}_date_time" + date_time = conv.get(dt_key, "") + + # Extract messages with all as user role, including speaker in content + messages = [] + for idx, msg in enumerate(conv[sk]): + speaker = msg.get("speaker", "unknown") + text = msg.get("text", "") + messages.append( + {"role": "user", "text": f"[{speaker}]: {text}", "speaker": speaker, "index": idx} + ) + + sessions.append( + { + "messages": messages, + "meta": { + "sample_id": item["sample_id"], + "session_key": sk, + "date_time": date_time, + "speakers": speakers, + }, + } + ) + + return sessions + + +# --------------------------------------------------------------------------- +# Ingest record helpers (avoid duplicate ingestion) +# --------------------------------------------------------------------------- + + +def load_success_csv(csv_path: str = "./result/import_success.csv") -> set: + """加载成功导入的CSV记录,返回已成功的键集合""" + success_keys = set() + if Path(csv_path).exists(): + with open(csv_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + key = f"viking:{row['sample_id']}:{row['session']}" + success_keys.add(key) + return success_keys + + +def write_success_record( + record: Dict[str, Any], csv_path: str = "./result/import_success.csv" +) -> None: + """写入成功记录到CSV文件""" + file_exists = Path(csv_path).exists() + fieldnames = [ + "timestamp", + "sample_id", + "session", + "date_time", + "speakers", + "embedding_tokens", + "vlm_tokens", + "llm_input_tokens", + "llm_output_tokens", + "total_tokens", + ] + + with open(csv_path, "a", encoding="utf-8", newline="") as f: + 
writer = csv.DictWriter(f, fieldnames=fieldnames) + if not file_exists: + writer.writeheader() + + writer.writerow( + { + "timestamp": record["timestamp"], + "sample_id": record["sample_id"], + "session": record["session"], + "date_time": record.get("meta", {}).get("date_time", ""), + "speakers": record.get("meta", {}).get("speakers", ""), + "embedding_tokens": record["token_usage"].get("embedding", 0), + "vlm_tokens": record["token_usage"].get("vlm", 0), + "llm_input_tokens": record["token_usage"].get("llm_input", 0), + "llm_output_tokens": record["token_usage"].get("llm_output", 0), + "total_tokens": record["token_usage"].get("total", 0), + } + ) + + +def write_error_record( + record: Dict[str, Any], error_path: str = "./result/import_errors.log" +) -> None: + """写入错误记录到日志文件""" + with open(error_path, "a", encoding="utf-8") as f: + timestamp = record["timestamp"] + sample_id = record["sample_id"] + session = record["session"] + error = record["error"] + f.write(f"[{timestamp}] ERROR [{sample_id}/{session}]: {error}\n") + + +def is_already_ingested( + sample_id: str | int, + session_key: str, + success_keys: Optional[set] = None, +) -> bool: + """Check if a specific session has already been successfully ingested.""" + key = f"viking:{sample_id}:{session_key}" + return success_keys is not None and key in success_keys + + +# --------------------------------------------------------------------------- +# OpenViking import +# --------------------------------------------------------------------------- +def _parse_token_usage(commit_result: Dict[str, Any]) -> Dict[str, int]: + """解析Token使用数据(从commit返回的telemetry或task result中提取)""" + # 尝试从 task result 中提取(task 完成后包含完整 token_usage) + if "result" in commit_result: + result = commit_result["result"] + if "token_usage" in result: + tu = result["token_usage"] + embedding = tu.get("embedding", {}) + llm = tu.get("llm", {}) + # embedding 格式可能是 {"total": N} 或 {"total_tokens": N} + embed_total = embedding.get("total", 
embedding.get("total_tokens", 0)) + llm_total = llm.get("total", llm.get("total_tokens", 0)) + return { + "embedding": embed_total, + "vlm": llm_total, + "llm_input": llm.get("input", 0), + "llm_output": llm.get("output", 0), + "total": tu.get("total", {}).get("total_tokens", embed_total + llm_total), + } + + # 从 commit 响应的 telemetry 中提取 + telemetry = commit_result.get("telemetry", {}).get("summary", {}) + tokens = telemetry.get("tokens", {}) + return { + "embedding": tokens.get("embedding", {}).get("total", 0), + "vlm": tokens.get("llm", {}).get("total", 0), + "llm_input": tokens.get("llm", {}).get("input", 0), + "llm_output": tokens.get("llm", {}).get("output", 0), + "total": tokens.get("total", 0), + } + + +async def viking_ingest( + messages: List[Dict[str, Any]], + openviking_url: str, + session_time: Optional[str] = None, + user_id: Optional[str] = None, + agent_id: Optional[str] = None, +) -> Dict[str, int]: + """Save messages to OpenViking via OpenViking SDK client. + Returns token usage dict with embedding and vlm token counts. 
+ + Args: + messages: List of message dicts with role and text + openviking_url: OpenViking service URL + session_time: Session time string (e.g., "9:36 am on 2 April, 2023") + user_id: User identifier for separate userspace (e.g., "conv-26") + agent_id: Agent identifier for separate agentspace (e.g., "conv-26") + """ + # 解析 session_time - 为每条消息计算递增的时间戳 + base_datetime = None + if session_time: + try: + base_datetime = datetime.strptime(session_time, "%I:%M %p on %d %B, %Y") + except ValueError: + print(f"Warning: Failed to parse session_time: {session_time}", file=sys.stderr) + + # Create client + client_kwargs = {"url": openviking_url} + if user_id is not None: + client_kwargs["user"] = user_id + if agent_id is not None: + client_kwargs["agent_id"] = agent_id + client = ov.AsyncHTTPClient(**client_kwargs) + await client.initialize() + + try: + # Create session + create_res = await client.create_session() + session_id = create_res["session_id"] + + # Add messages one by one with created_at + for idx, msg in enumerate(messages): + msg_created_at = None + if base_datetime: + # 每条消息递增1秒,确保时间顺序 + msg_dt = base_datetime + timedelta(seconds=idx) + msg_created_at = msg_dt.isoformat() + + await client.add_message( + session_id=session_id, + role=msg["role"], + parts=[{"type": "text", "text": msg["text"]}], + created_at=msg_created_at, + ) + + # Commit + result = await client.commit_session(session_id, telemetry=True) + + # Accept both "committed" and "accepted" as success - accepted means the session was archived + if result.get("status") not in ("committed", "accepted"): + raise RuntimeError(f"Commit failed: {result}") + + # 等待 task 完成以获取准确 token 消耗 + task_id = result.get("task_id") + if task_id: + # 轮询任务状态直到完成 + max_attempts = 1200 # 最多等待20分钟 + for attempt in range(max_attempts): + task = await client.get_task(task_id) + status = task.get("status") if task else "unknown" + if status == "completed": + token_usage = _parse_token_usage(task) + break + elif status in 
("failed", "cancelled", "unknown"): + raise RuntimeError(f"Task {task_id} {status}: {task}") + await asyncio.sleep(1) + else: + raise RuntimeError(f"Task {task_id} timed out after {max_attempts} attempts") + else: + token_usage = {"embedding": 0, "vlm": 0, "total": 0} + + # Get trace_id from commit result + trace_id = result.get("trace_id", "") + return {"token_usage": token_usage, "task_id": task_id, "trace_id": trace_id} + + finally: + await client.close() + + +def parse_session_range(s: str) -> Tuple[int, int]: + """Parse '1-4' or '3' into (lo, hi) inclusive tuple.""" + if "-" in s: + lo, hi = s.split("-", 1) + return int(lo), int(hi) + n = int(s) + return n, n + + +async def process_single_session( + messages: List[Dict[str, Any]], + sample_id: str | int, + session_key: str, + meta: Dict[str, Any], + run_time: str, + args: argparse.Namespace, +) -> Dict[str, Any]: + """处理单个会话的导入任务""" + try: + # 根据参数决定是否使用 sample_id 作为 user_id 和 agent_id + user_id = str(sample_id) if not args.no_user_agent_id else None + agent_id = str(sample_id) if not args.no_user_agent_id else None + result = await viking_ingest( + messages, + args.openviking_url, + meta.get("date_time"), + user_id=user_id, + agent_id=agent_id, + ) + token_usage = result["token_usage"] + task_id = result.get("task_id") + trace_id = result.get("trace_id", "") + embedding_tokens = token_usage.get("embedding", 0) + vlm_tokens = token_usage.get("vlm", 0) + print( + f" -> [COMPLETED] [{sample_id}/{session_key}] embed={embedding_tokens}, vlm={vlm_tokens}, task_id={task_id}, trace_id={trace_id}", + file=sys.stderr, + ) + + # Write success record + result = { + "timestamp": run_time, + "sample_id": sample_id, + "session": session_key, + "status": "success", + "meta": meta, + "token_usage": token_usage, + "embedding_tokens": embedding_tokens, + "vlm_tokens": vlm_tokens, + "task_id": task_id, + "trace_id": trace_id, + } + + # 写入成功CSV + write_success_record(result, args.success_csv) + + return result + + except 
Exception as e: + print(f" -> [ERROR] [{sample_id}/{session_key}] {e}", file=sys.stderr) + traceback.print_exc(file=sys.stderr) + + # Write error record + result = { + "timestamp": run_time, + "sample_id": sample_id, + "session": session_key, + "status": "error", + "error": str(e), + } + + # 写入错误日志 + write_error_record(result, args.error_log) + + return result + + +async def run_import(args: argparse.Namespace) -> None: + session_range = parse_session_range(args.sessions) if args.sessions else None + + # 如果指定了 question-index,自动从 evidence 推断需要的 session + if args.question_index is not None and not args.sessions: + # 加载数据获取 question 的 evidence + with open(args.input, "r", encoding="utf-8") as f: + data = json.load(f) + + # 获取 sample + sample_idx = args.sample if args.sample is not None else 0 + if sample_idx < 0 or sample_idx >= len(data): + raise ValueError(f"sample index {sample_idx} out of range") + sample = data[sample_idx] + + # 获取 question 的 evidence + qa_items = sample.get("qa", []) + if args.question_index < 0 or args.question_index >= len(qa_items): + raise ValueError(f"question index {args.question_index} out of range") + qa = qa_items[args.question_index] + evidence_list = qa.get("evidence", []) + + # 从 evidence 提取 session 号 (D1:3 -> session 1) + session_nums = set() + for ev in evidence_list: + try: + # D1:3 -> session 1 + sess_num = int(ev.split(":")[0][1:]) + session_nums.add(sess_num) + except (ValueError, IndexError): + pass + + if session_nums: + min_sess = min(session_nums) + max_sess = max(session_nums) + session_range = (min_sess, max_sess) + print( + f"[INFO] Auto-detected sessions from evidence: {min_sess}-{max_sess}", + file=sys.stderr, + ) + + # 加载成功CSV记录用于去重 + success_keys = set() + if not args.force_ingest: + success_keys = load_success_csv(args.success_csv) + print( + f"[INFO] Loaded {len(success_keys)} existing success records from {args.success_csv}", + file=sys.stderr, + ) + + # Write run header + run_time = 
datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + skipped_count = 0 + success_count = 0 + error_count = 0 + total_embedding_tokens = 0 + total_vlm_tokens = 0 + + if args.input.endswith(".json"): + # LoCoMo JSON format + samples = load_locomo_data(args.input, args.sample) + + # 为每个 sample 创建独立的处理协程 + async def process_sample(item): + sample_id = item["sample_id"] + sessions = build_session_messages(item, session_range) + + print(f"\n=== Sample {sample_id} ===", file=sys.stderr) + print(f" {len(sessions)} session(s) to import", file=sys.stderr) + + # 同一 sample 内串行处理所有 sessions + for sess in sessions: + meta = sess["meta"] + messages = sess["messages"] + session_key = meta["session_key"] + label = f"{session_key} ({meta['date_time']})" + + # Skip already ingested sessions unless force-ingest is enabled + if not args.force_ingest and is_already_ingested( + sample_id, session_key, success_keys + ): + print( + f" [{label}] [SKIP] already imported (use --force-ingest to reprocess)", + file=sys.stderr, + ) + nonlocal skipped_count + skipped_count += 1 + continue + + # Preview messages + preview = " | ".join( + [f"{msg['role']}: {msg['text'][:30]}..." 
for msg in messages[:3]] + ) + print(f" [{label}] {preview}", file=sys.stderr) + + # 串行执行(等待完成后再处理下一个 session) + await process_single_session( + messages=messages, + sample_id=sample_id, + session_key=session_key, + meta=meta, + run_time=run_time, + args=args, + ) + + # 不同 sample 之间并行执行 + tasks = [asyncio.create_task(process_sample(item)) for item in samples] + results = await asyncio.gather(*tasks, return_exceptions=True) + + else: + # Plain text format + sessions = parse_test_file(args.input) + print(f"Found {len(sessions)} session(s) in text file", file=sys.stderr) + + for idx, session in enumerate(sessions, start=1): + session_key = f"txt-session-{idx}" + print(f"\n=== Text Session {idx} ===", file=sys.stderr) + + # Skip already ingested sessions unless force-ingest is enabled + if not args.force_ingest and is_already_ingested( + "txt", session_key, success_keys + ): + print( + f" [SKIP] already imported (use --force-ingest to reprocess)", file=sys.stderr + ) + skipped_count += 1 + continue + + # For plain text, all messages as user role + messages = [] + for i, text in enumerate(session["messages"]): + messages.append( + {"role": "user", "text": text.strip(), "speaker": "user", "index": i} + ) + + preview = " | ".join([f"{msg['role']}: {msg['text'][:30]}..." 
for msg in messages[:3]]) + print(f" {preview}", file=sys.stderr) + + # 创建异步任务 + task = asyncio.create_task( + process_single_session( + messages=messages, + sample_id="txt", + session_key=session_key, + meta={"session_index": idx}, + run_time=run_time, + args=args, + ) + ) + tasks.append(task) + + # 等待所有 sample 处理完成 + print( + f"\n[INFO] Starting import with {len(tasks)} tasks to process", + file=sys.stderr, + ) + await asyncio.gather(*tasks, return_exceptions=True) + + # 从成功 CSV 统计结果 + if Path(args.success_csv).exists(): + with open(args.success_csv, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + success_count += 1 + total_embedding_tokens += int(row.get("embedding_tokens", 0) or 0) + total_vlm_tokens += int(row.get("vlm_tokens", 0) or 0) + + # Final summary + total_processed = success_count + error_count + skipped_count + print(f"\n=== Import summary ===", file=sys.stderr) + print(f"Total sessions: {total_processed}", file=sys.stderr) + print(f"Successfully imported: {success_count}", file=sys.stderr) + print(f"Failed: {error_count}", file=sys.stderr) + print(f"Skipped (already imported): {skipped_count}", file=sys.stderr) + print(f"\n=== Token usage summary ===", file=sys.stderr) + print(f"Total Embedding tokens: {total_embedding_tokens}", file=sys.stderr) + print(f"Total VLM tokens: {total_vlm_tokens}", file=sys.stderr) + if success_count > 0: + print( + f"Average Embedding per session: {total_embedding_tokens // success_count}", + file=sys.stderr, + ) + print(f"Average VLM per session: {total_vlm_tokens // success_count}", file=sys.stderr) + print(f"\nResults saved to:", file=sys.stderr) + print(f" - Success records: {args.success_csv}", file=sys.stderr) + print(f" - Error logs: {args.error_log}", file=sys.stderr) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(): + # 基于脚本所在目录计算默认数据文件路径 + 
script_dir = Path(__file__).parent.resolve() + default_input = str(script_dir / ".." / "data" / "locomo10.json") + + parser = argparse.ArgumentParser(description="Import conversations into OpenViking") + parser.add_argument( + "--input", + default=default_input, + help="Path to input file (.txt or LoCoMo .json)", + ) + parser.add_argument( + "--success-csv", + default="./result/import_success.csv", + help="Path to success records CSV file (default: import_success.csv)", + ) + parser.add_argument( + "--error-log", + default="./result/import_errors.log", + help="Path to error log file (default: import_errors.log)", + ) + parser.add_argument( + "--openviking-url", + default="http://localhost:1933", + help="OpenViking service URL (default: http://localhost:1933)", + ) + parser.add_argument( + "--sample", + type=int, + default=None, + help="LoCoMo JSON: sample index (0-based). Default: all samples.", + ) + parser.add_argument( + "--sessions", + default=None, + help="LoCoMo JSON: session range, e.g. '1-4' or '3'. Default: all sessions.", + ) + parser.add_argument( + "--question-index", + type=int, + default=None, + help="LoCoMo JSON: question index (0-based). 
When specified, auto-detect required sessions from question's evidence.", + ) + parser.add_argument( + "--force-ingest", + action="store_true", + default=False, + help="Force re-import even if already recorded as completed", + ) + parser.add_argument( + "--no-user-agent-id", + action="store_true", + default=False, + help="Do not pass user_id and agent_id to OpenViking client", + ) + args = parser.parse_args() + + # 确保输出目录存在 + Path(args.success_csv).parent.mkdir(parents=True, exist_ok=True) + Path(args.error_log).parent.mkdir(parents=True, exist_ok=True) + + try: + asyncio.run(run_import(args)) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/benchmark/locomo/openclaw/run_full_eval.sh b/benchmark/locomo/openclaw/run_full_eval.sh index a14cfa413..3f1edff11 100755 --- a/benchmark/locomo/openclaw/run_full_eval.sh +++ b/benchmark/locomo/openclaw/run_full_eval.sh @@ -6,9 +6,12 @@ set -e OpenClaw 完整评估流程脚本 用法: - ./run_full_eval.sh # 只导入 OpenViking - ./run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw - ./run_full_eval.sh --skip-import # 跳过导入步骤 + ./run_full_eval.sh # 只导入 OpenViking (所有 samples) + ./run_full_eval.sh --with-claw-import # 同时导入 OpenViking 和 OpenClaw (所有 samples) + ./run_full_eval.sh --skip-import # 跳过导入步骤 (所有 samples) + ./run_full_eval.sh --sample 0 # 只处理第 0 个 sample + ./run_full_eval.sh --sample 1 --with-claw-import # 只处理第 1 个 sample,同时导入 OpenClaw + ./run_full_eval.sh --force-ingest # 强制重新导入所有数据 ' # 基于脚本所在目录计算数据文件路径 @@ -16,20 +19,58 @@ SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" INPUT_FILE="$SCRIPT_DIR/../data/locomo10.json" RESULT_DIR="$SCRIPT_DIR/result" OUTPUT_CSV="$RESULT_DIR/qa_results.csv" -GATEWAY_TOKEN="your_gateway_token" +GATEWAY_TOKEN="90f2d2dc2f7b4d50cb943d3d3345e667bb3e9bcb7ec3a1fb" # 解析参数 SKIP_IMPORT=false WITH_CLAW_IMPORT=false -for arg in "$@"; do - if [ "$arg" = "--skip-import" ]; then - SKIP_IMPORT=true - elif [ "$arg" = "--with-claw-import" ]; 
# --- CLI flags --------------------------------------------------------------
# SECURITY: never hardcode a real gateway token in the repository.  Read it
# from the environment and fall back to an obvious placeholder so a missing
# token fails loudly instead of leaking credentials.
GATEWAY_TOKEN="${OPENCLAW_GATEWAY_TOKEN:-your_gateway_token}"

SKIP_IMPORT=false
WITH_CLAW_IMPORT=false
FORCE_INGEST=false
SAMPLE_IDX=""

while [[ $# -gt 0 ]]; do
    case $1 in
        --skip-import)
            SKIP_IMPORT=true
            shift
            ;;
        --with-claw-import)
            WITH_CLAW_IMPORT=true
            shift
            ;;
        --force-ingest)
            FORCE_INGEST=true
            shift
            ;;
        --sample)
            # --sample consumes one value: a 0-based sample index.
            if [ -z "$2" ] || [[ "$2" == --* ]]; then
                echo "错误: --sample 需要一个参数 (sample index, 0-based)"
                exit 1
            fi
            SAMPLE_IDX="$2"
            shift 2
            ;;
        *)
            echo "警告: 未知参数 $1"
            shift
            ;;
    esac
done

# Build the optional --sample argument; a per-sample run writes to its own
# CSV so it never clobbers the full-run results.
SAMPLE_ARG=""
if [ -n "$SAMPLE_IDX" ]; then
    SAMPLE_ARG="--sample $SAMPLE_IDX"
    OUTPUT_CSV="$RESULT_DIR/qa_results_sample${SAMPLE_IDX}.csv"
fi

# Build the optional --force-ingest argument.
FORCE_INGEST_ARG=""
if [ "$FORCE_INGEST" = true ]; then
    FORCE_INGEST_ARG="--force-ingest"
fi

# Make sure the result directory exists before any log/CSV is written.
mkdir -p "$RESULT_DIR"

# Step 1: import data (unless skipped)
if [ "$SKIP_IMPORT" = false ]; then
    if [ "$WITH_CLAW_IMPORT" = true ]; then
        echo "[1/5] 导入数据到 OpenViking 和 OpenClaw..."

        # OpenViking import in the background.  Both branches use the local
        # openclaw/import_to_ov.py (the old ../vikingbot copy is obsolete).
        python "$SCRIPT_DIR/import_to_ov.py" --input "$INPUT_FILE" $FORCE_INGEST_ARG $SAMPLE_ARG > "$RESULT_DIR/import_ov.log" 2>&1 &
        PID_OV=$!

        # OpenClaw import in the background, in parallel with the above.
        python "$SCRIPT_DIR/eval.py" ingest "$INPUT_FILE" $FORCE_INGEST_ARG --token "$GATEWAY_TOKEN" $SAMPLE_ARG > "$RESULT_DIR/import_claw.log" 2>&1 &
        PID_CLAW=$!

        # Wait for both import jobs to finish (fails the script if either fails).
        wait $PID_OV $PID_CLAW
    else
        echo "[1/5] 导入数据到 OpenViking..."
        python "$SCRIPT_DIR/import_to_ov.py" --no-user-agent-id --input "$INPUT_FILE" $FORCE_INGEST_ARG $SAMPLE_ARG
    fi

    echo "导入完成,等待 1 分钟..."
    sleep 60
fi

# Step 2: run the QA model (writes to result/qa_results*.csv)
echo "[2/5] 运行 QA 评估..."
-python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" +python "$SCRIPT_DIR/eval.py" qa "$INPUT_FILE" --token "$GATEWAY_TOKEN" $SAMPLE_ARG --parallel 15 --output "${OUTPUT_CSV%.csv}" # Step 3: 裁判打分 echo "[3/5] 裁判打分..." @@ -69,7 +110,7 @@ python "$SCRIPT_DIR/judge.py" --input "$OUTPUT_CSV" --parallel 40 # Step 4: 计算结果 echo "[4/5] 计算结果..." -python "$SCRIPT_DIR/../vikingbot/stat_judge_result.py" --input "$OUTPUT_CSV" +python "$SCRIPT_DIR/stat_judge_result.py" --input "$OUTPUT_CSV" echo "[5/5] 完成!" echo "结果文件: $OUTPUT_CSV" diff --git a/benchmark/locomo/openclaw/stat_judge_result.py b/benchmark/locomo/openclaw/stat_judge_result.py new file mode 100644 index 000000000..63816e004 --- /dev/null +++ b/benchmark/locomo/openclaw/stat_judge_result.py @@ -0,0 +1,161 @@ +import argparse +import csv +import os + + +def main(): + parser = argparse.ArgumentParser(description="Statistics for judge result csv") + parser.add_argument( + "--input", + default="./result/qa_results_sample0.csv", + help="Path to judge result csv file, default: ./result/qa_results_sample0.csv", + ) + parser.add_argument( + "--import-csv", + default="./result/import_success.csv", + help="Path to import_success.csv file for OpenViking token stats, default: ./result/import_success.csv", + ) + args = parser.parse_args() + + output_lines = [] + + # 统计 QA 结果 + if os.path.exists(args.input): + qa_stats = process_qa_results(args.input) + output_lines.extend(qa_stats) + else: + output_lines.append(f"Warning: QA result file not found: {args.input}") + + # 统计 Import token + if os.path.exists(args.import_csv): + if output_lines: + output_lines.append("") + import_stats = process_import_csv(args.import_csv) + output_lines.extend(import_stats) + else: + output_lines.append(f"Warning: Import CSV file not found: {args.import_csv}") + + # 打印到控制台 + for line in output_lines: + print(line) + + # 写入summary.txt + if args.input: + summary_path = os.path.join(os.path.dirname(args.input), "summary.txt") + elif 
args.import_csv: + summary_path = os.path.join(os.path.dirname(args.import_csv), "summary.txt") + else: + summary_path = "./result/summary.txt" + + os.makedirs(os.path.dirname(summary_path), exist_ok=True) + with open(summary_path, "w", encoding="utf-8") as f: + f.write("\n".join(output_lines) + "\n") + print(f"\nSummary saved to {summary_path}") + + +def process_qa_results(input_path: str) -> list[str]: + """处理 QA 结果 CSV""" + # 统计所有题目 (排除 category=5) + correct = 0 + wrong = 0 + total_no_cache_tokens = 0 # input_tokens + total_cache_read_tokens = 0 # cacheRead + total_output_tokens = 0 # output_tokens + total_input_tokens = 0 # no_cache + cacheRead + valid_rows = 0 + + with open(input_path, "r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f) + for row in reader: + # 检查 category 是否为 5,跳过 + category = row.get("category", "") + if category == "5": + continue + + valid_rows += 1 + + # 统计结果 + result = row.get("result", "").strip().upper() + if result == "CORRECT": + correct += 1 + elif result == "WRONG": + wrong += 1 + + # 统计token + try: + no_cache = int(row.get("input_tokens", 0)) + cache_read = int(row.get("cacheRead", 0)) + output = int(row.get("output_tokens", 0)) + + total_no_cache_tokens += no_cache + total_cache_read_tokens += cache_read + total_output_tokens += output + total_input_tokens += no_cache + cache_read + except (ValueError, TypeError): + pass + + total_graded = correct + wrong + accuracy = correct / total_graded if total_graded > 0 else 0.0 + + # 平均 token 消耗 + avg_no_cache = total_no_cache_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_cache_read = total_cache_read_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_output = total_output_tokens / valid_rows if valid_rows > 0 else 0.0 + avg_total_input = total_input_tokens / valid_rows if valid_rows > 0 else 0.0 + + return [ + "=== Judge Result Statistics (excluding category=5) ===", + f"Total rows: {valid_rows:,}", + f"Graded rows: {total_graded:,}", + f"Correct: {correct:,}", 
+ f"Wrong: {wrong:,}", + f"Accuracy: {accuracy:.2%}", + f"\nToken usage (QA):", + f" Total no-cache tokens (input_tokens): {total_no_cache_tokens:,}", + f" Total cacheRead tokens: {total_cache_read_tokens:,}", + f" Total output tokens: {total_output_tokens:,}", + f" Total input tokens (no-cache + cacheRead): {total_input_tokens:,}", + f" Avg no-cache tokens: {avg_no_cache:,.2f}", + f" Avg cacheRead tokens: {avg_cache_read:,.2f}", + f" Avg output tokens: {avg_output:,.2f}", + f" Avg total input tokens: {avg_total_input:,.2f}", + ] + + +def process_import_csv(input_path: str) -> list[str]: + """处理 import_success.csv 的 token 统计""" + total_embedding = 0 + total_vlm = 0 + total_total = 0 + valid_rows = 0 + + with open(input_path, "r", encoding="utf-8", newline="") as f: + reader = csv.DictReader(f) + for row in reader: + valid_rows += 1 + try: + total_embedding += int(row.get("embedding_tokens", 0)) + total_vlm += int(row.get("vlm_tokens", 0)) + total_total += int(row.get("total_tokens", 0)) + except (ValueError, TypeError): + pass + + avg_embedding = total_embedding / valid_rows if valid_rows > 0 else 0.0 + avg_vlm = total_vlm / valid_rows if valid_rows > 0 else 0.0 + avg_total = total_total / valid_rows if valid_rows > 0 else 0.0 + + return [ + "=== OpenViking Import Token Statistics ===", + f"Total sessions: {valid_rows:,}", + f"\nToken usage (Import):", + f" Total embedding tokens: {total_embedding:,}", + f" Total VLM tokens: {total_vlm:,}", + f" Total tokens: {total_total:,}", + f" Avg embedding tokens: {avg_embedding:,.2f}", + f" Avg VLM tokens: {avg_vlm:,.2f}", + f" Avg total tokens: {avg_total:,.2f}", + ] + + +if __name__ == "__main__": + main()