From 0075cf8784c9509b214adf1e74967a48778d77db Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Wed, 18 Feb 2026 10:22:55 -0500 Subject: [PATCH 1/2] feat: migrate opencode scripts from API to sqlite --- scripts/opencode-find-session | 76 +++--- scripts/opencode-get-message | 98 +++++++ scripts/opencode-session-timeline | 411 ++++++++---------------------- scripts/opencode_api.py | 249 ++++++++++++++++++ 4 files changed, 489 insertions(+), 345 deletions(-) create mode 100755 scripts/opencode-get-message create mode 100644 scripts/opencode_api.py diff --git a/scripts/opencode-find-session b/scripts/opencode-find-session index 5b7e2087..9e5f3de2 100755 --- a/scripts/opencode-find-session +++ b/scripts/opencode-find-session @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Find OpenCode session IDs by title search. +Find OpenCode session IDs by title search using the OpenCode database. Returns matching session IDs ordered by last usage time. Usage: opencode-find-session [--exact] [--json] @@ -8,46 +8,28 @@ Usage: opencode-find-session [--exact] [--json] import json import argparse -from pathlib import Path from datetime import datetime +from opencode_api import APIError, add_api_arguments, create_client_from_args, list_sessions_across_projects -def get_all_sessions(storage: Path) -> list[dict]: - """Get all sessions with their metadata.""" - session_dir = storage / "session" - message_dir = storage / "message" - - if not session_dir.exists(): - return [] - + +def get_all_sessions(client, session_list_limit: int) -> list[dict]: + """Get all sessions with normalized metadata.""" + api_sessions = list_sessions_across_projects(client, per_project_limit=session_list_limit) sessions = [] - - for app_dir in session_dir.iterdir(): - if not app_dir.is_dir(): - continue - - for session_file in app_dir.glob("*.json"): - try: - session = json.loads(session_file.read_text()) - session_id = session_file.stem - - # Get last modified time from message directory - msg_path = message_dir / session_id - if msg_path.exists(): - mtime = msg_path.stat().st_mtime - else: - mtime = session_file.stat().st_mtime - - sessions.append({ - "id": session_id, - "title": session.get("title", "Untitled"), - "created_at": session.get("createdAt"), - "last_used": mtime, - "last_used_iso": datetime.fromtimestamp(mtime).isoformat() - }) - except (json.JSONDecodeError, IOError): - pass - + for session in api_sessions: + time_data = session.get("time", {}) + updated_ms = time_data.get("updated") or time_data.get("created") or 0 + last_used = updated_ms / 1000 if updated_ms else 0 + sessions.append( + { + "id": session.get("id", ""), + "title": session.get("title", "Untitled"), + "created_at": time_data.get("created"), + "last_used": last_used, + "last_used_iso": datetime.fromtimestamp(last_used).isoformat() if last_used else None, + } + ) return sessions @@ -101,6 +83,7 @@ def main(): ) parser.add_argument( "search_term", + nargs="?", type=str, help="Text to search for in session titles" ) @@ -119,16 +102,19 @@ def main(): action="store_true", help="Show all sessions (ignore search term)" ) + add_api_arguments(parser) args = parser.parse_args() - - storage = Path.home() / ".local/share/opencode/storage" - - if not storage.exists(): - print("Error: OpenCode storage not found at", storage) + + if not args.all and not args.search_term: + parser.error("search_term is required unless --all is used") + + try: + with create_client_from_args(args) as client: + sessions = get_all_sessions(client, args.session_list_limit) + except APIError as 
err: + print(f"Error: {err}") return 1 - - sessions = get_all_sessions(storage) - + if args.all: results = sorted(sessions, key=lambda s: s["last_used"], reverse=True) else: diff --git a/scripts/opencode-get-message b/scripts/opencode-get-message new file mode 100755 index 00000000..ac39a623 --- /dev/null +++ b/scripts/opencode-get-message @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +""" +Get full OpenCode message payload(s) by message ID from the OpenCode database. + +Usage: + opencode-get-message [message-id ...] + opencode-get-message --session [message-id ...] +""" + +import argparse +import json + +from opencode_api import APIError, add_api_arguments, create_client_from_args, list_sessions_across_projects + + +def normalize_message_payload(payload: dict) -> dict: + """Normalize API response into expected output shape.""" + info = payload.get("info", {}) + parts = payload.get("parts", []) + return {"info": info, "parts": parts} + + +def not_found_result(message_id: str) -> dict: + return {"id": message_id, "error": "message_not_found"} + + +def get_message_for_session(client, session: dict, message_id: str) -> dict: + """Get a message payload within a known session.""" + session_id = session.get("id", "") + directory = session.get("directory") + try: + payload = client.get_session_message(session_id, message_id, directory=directory) + return normalize_message_payload(payload) + except APIError as err: + if err.status_code == 404: + return not_found_result(message_id) + raise + + +def find_messages_without_session(client, message_ids: list[str], scan_sessions: int, session_list_limit: int) -> list[dict]: + """Search recent sessions for requested message IDs.""" + wanted = set(message_ids) + found: dict[str, dict] = {} + + sessions = list_sessions_across_projects(client, per_project_limit=session_list_limit) + if scan_sessions > 0: + sessions = sessions[:scan_sessions] + + for session in sessions: + if not wanted: + break + messages = client.get_session_messages(session.get("id", ""), directory=session.get("directory")) + for message in messages: + info = message.get("info", {}) + mid = info.get("id") + if mid in wanted: + found[mid] = normalize_message_payload(message) + wanted.remove(mid) + + return [found.get(message_id, not_found_result(message_id)) for message_id in message_ids] + + +def main() -> int: + parser = argparse.ArgumentParser(description="Get full OpenCode message payload by message ID") + parser.add_argument("--session", "-s", type=str, default=None, help="Session ID for direct lookup") + parser.add_argument( + "--scan-sessions", + type=int, + default=200, + help="When --session is omitted, scan this many recent sessions for message IDs", + ) + parser.add_argument("message_ids", nargs="+", help="One or more message IDs") + add_api_arguments(parser) + args = parser.parse_args() + + try: + with create_client_from_args(args) as client: + if args.session: + session = client.get_session(args.session) + results = [get_message_for_session(client, session, message_id) for message_id in args.message_ids] + else: + results = find_messages_without_session( + client, + args.message_ids, + scan_sessions=args.scan_sessions, + session_list_limit=args.session_list_limit, + ) + except APIError as err: + print(f"Error: {err}") + return 1 + + output = results[0] if len(results) == 1 else results + print(json.dumps(output, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/opencode-session-timeline b/scripts/opencode-session-timeline index 
a3683cea..3a24a049 100755 --- a/scripts/opencode-session-timeline +++ b/scripts/opencode-session-timeline @@ -3,22 +3,30 @@ Analyze token values at each step within a single OpenCode session. Shows cache growth over time and highlights DCP tool usage that causes cache drops. -Usage: opencode-session-timeline [--session ID] [--json] [--no-color] +Queries the OpenCode SQLite database directly for fast, offline access. + +Usage: opencode-session-timeline [--session ID] [--json] [--no-color] [--db PATH] """ -import json import argparse -from pathlib import Path +import json from typing import Optional -from datetime import datetime -# DCP tool names (tools that prune context and reduce cache) +from opencode_api import APIError, add_api_arguments, create_client_from_args, list_sessions_across_projects + + DCP_TOOLS = { - "prune", "discard", "extract", "context_pruning", - "squash", "compress", "consolidate", "distill" + "compress", + "prune", + "distill", + "discard", + "extract", + "context_pruning", + "squash", + "consolidate", } -# ANSI colors + class Colors: RESET = "\033[0m" BOLD = "\033[1m" @@ -26,82 +34,35 @@ class Colors: RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" - BLUE = "\033[34m" - MAGENTA = "\033[35m" CYAN = "\033[36m" + NO_COLOR = Colors() for attr in dir(NO_COLOR): - if not attr.startswith('_'): + if not attr.startswith("_"): setattr(NO_COLOR, attr, "") -def format_duration(ms: Optional[int], colors: Colors = None) -> str: - """Format milliseconds as human-readable duration.""" +def format_duration(ms: Optional[int]) -> str: if ms is None: return "-" - seconds = ms / 1000 if seconds < 60: return f"{seconds:.1f}s" - elif seconds < 3600: + if seconds < 3600: minutes = int(seconds // 60) secs = seconds % 60 return f"{minutes}m{secs:.0f}s" - else: - hours = int(seconds // 3600) - minutes = int((seconds % 3600) // 60) - return f"{hours}h{minutes}m" - - -def get_session_messages(storage: Path, session_id: str) -> list[dict]: - """Get all messages for a session, sorted by creation order.""" - message_dir = storage / "message" / session_id - if not message_dir.exists(): - return [] - - messages = [] - for msg_file in message_dir.glob("*.json"): - try: - msg = json.loads(msg_file.read_text()) - msg["_file"] = str(msg_file) - msg["_id"] = msg_file.stem - # Extract timing info - time_info = msg.get("time", {}) - msg["_created"] = time_info.get("created") - msg["_completed"] = time_info.get("completed") - messages.append(msg) - except (json.JSONDecodeError, IOError): - pass - - return sorted(messages, key=lambda m: m.get("_id", "")) - - -def get_message_parts(storage: Path, message_id: str) -> list[dict]: - """Get all parts for a message, sorted by creation order.""" - parts_dir = storage / "part" / message_id - if not parts_dir.exists(): - return [] - - parts = [] - for part_file in parts_dir.glob("*.json"): - try: - part = json.loads(part_file.read_text()) - part["_file"] = str(part_file) - part["_id"] = part_file.stem - parts.append(part) - except (json.JSONDecodeError, IOError): - pass - - return sorted(parts, key=lambda p: p.get("_id", "")) + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + return f"{hours}h{minutes}m" def extract_step_data(parts: list[dict]) -> Optional[dict]: - """Extract step-finish data and tool calls from message parts.""" step_finish = None tools_used = [] dcp_tools_used = [] - + for part in parts: if part.get("type") == "step-finish" and "tokens" in part: step_finish = part @@ -110,13 +71,12 @@ def extract_step_data(parts: 
list[dict]) -> Optional[dict]: tools_used.append(tool_name) if tool_name in DCP_TOOLS: dcp_tools_used.append(tool_name) - + if step_finish is None: return None - + tokens = step_finish.get("tokens", {}) cache = tokens.get("cache", {}) - return { "input": tokens.get("input", 0), "output": tokens.get("output", 0), @@ -127,286 +87,137 @@ def extract_step_data(parts: list[dict]) -> Optional[dict]: "reason": step_finish.get("reason", "unknown"), "tools_used": tools_used, "dcp_tools_used": dcp_tools_used, - "has_dcp": len(dcp_tools_used) > 0 + "has_dcp": len(dcp_tools_used) > 0, } -def get_most_recent_session(storage: Path) -> Optional[str]: - """Get the most recent session ID.""" - message_dir = storage / "message" - if not message_dir.exists(): - return None - - sessions = sorted(message_dir.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True) - return sessions[0].name if sessions else None - - -def get_session_title(storage: Path, session_id: str) -> str: - """Get session title from metadata.""" - session_dir = storage / "session" - if not session_dir.exists(): - return "Unknown" - - for s_dir in session_dir.iterdir(): - s_file = s_dir / f"{session_id}.json" - if s_file.exists(): - try: - sess = json.loads(s_file.read_text()) - return sess.get("title", "Untitled") - except (json.JSONDecodeError, IOError): - pass - return "Unknown" - - -def analyze_session(storage: Path, session_id: str) -> dict: - """Analyze a single session step by step.""" - messages = get_session_messages(storage, session_id) - title = get_session_title(storage, session_id) - +def get_most_recent_session(client, session_list_limit: int) -> Optional[dict]: + sessions = list_sessions_across_projects(client, per_project_limit=session_list_limit) + return sessions[0] if sessions else None + + +def analyze_session(client, session: dict) -> dict: + session_id = session["id"] + messages = client.get_session_messages(session_id, directory=session.get("directory")) + title = session.get("title", "Unknown") + steps = [] - for msg in messages: - msg_id = msg.get("_id", "") - parts = get_message_parts(storage, msg_id) + for message in messages: + info = message.get("info", {}) + parts = message.get("parts", []) step_data = extract_step_data(parts) - - if step_data: - step_data["message_id"] = msg_id - step_data["created"] = msg.get("_created") - step_data["completed"] = msg.get("_completed") - steps.append(step_data) - - # Calculate deltas - for i, step in enumerate(steps): - if i == 0: + if not step_data: + continue + time_info = info.get("time", {}) + step_data["message_id"] = info.get("id", "") + step_data["created"] = time_info.get("created") + step_data["completed"] = time_info.get("completed") + steps.append(step_data) + + for idx, step in enumerate(steps): + if idx == 0: step["cache_read_delta"] = step["cache_read"] step["input_delta"] = step["input"] + step["time_since_prev_ms"] = None else: - prev = steps[i - 1] + prev = steps[idx - 1] step["cache_read_delta"] = step["cache_read"] - prev["cache_read"] step["input_delta"] = step["input"] - prev["input"] - - # Calculate cache hit rate - total_context = step["input"] + step["cache_read"] - step["cache_hit_rate"] = (step["cache_read"] / total_context * 100) if total_context > 0 else 0 - - # Calculate step duration and time since previous step + prev_completed = prev.get("completed") + created = step.get("created") + step["time_since_prev_ms"] = (created - prev_completed) if (prev_completed and created) else None + created = step.get("created") completed = 
step.get("completed") - - if created and completed: - step["duration_ms"] = completed - created - else: - step["duration_ms"] = None - - if i == 0: - step["time_since_prev_ms"] = None - else: - prev_completed = steps[i - 1].get("completed") - if prev_completed and created: - step["time_since_prev_ms"] = created - prev_completed - else: - step["time_since_prev_ms"] = None - + step["duration_ms"] = (completed - created) if (created and completed) else None + + total_context = step["input"] + step["cache_read"] + step["cache_hit_rate"] = (step["cache_read"] / total_context * 100) if total_context > 0 else 0 + return { "session_id": session_id, "title": title, "steps": steps, - "total_steps": len(steps) + "total_steps": len(steps), } def print_timeline(result: dict, colors: Colors): - """Print the step-by-step timeline.""" c = colors - print(f"{c.BOLD}{'=' * 130}{c.RESET}") print(f"{c.BOLD}SESSION TIMELINE: Token Values at Each Step{c.RESET}") - print(f"{c.BOLD}{'=' * 130}{c.RESET}") - print() + print(f"{c.BOLD}{'=' * 130}{c.RESET}\n") print(f" Session: {c.CYAN}{result['session_id']}{c.RESET}") print(f" Title: {result['title']}") - print(f" Steps: {result['total_steps']}") - print() - + print(f" Steps: {result['total_steps']}\n") + if not result["steps"]: print(" No steps found in this session.") return - - # Header - print(f"{c.BOLD}{'Step':<6} {'Cache Read':>12} {'Δ Cache':>12} {'Input':>10} {'Output':>10} {'Cache %':>9} {'Duration':>10} {'Gap':>10} {'DCP Tools':<15} {'Reason':<12}{c.RESET}") + + print( + f"{c.BOLD}{'Step':<6} {'Cache Read':>12} {'Δ Cache':>12} {'Input':>10} {'Output':>10} {'Cache %':>9} {'Duration':>10} {'Gap':>10} {'DCP Tools':<15} {'Reason':<12}{c.RESET}" + ) print("-" * 130) - - prev_cache = 0 - for i, step in enumerate(result["steps"], 1): - cache_read = step["cache_read"] + + for idx, step in enumerate(result["steps"], 1): cache_delta = step["cache_read_delta"] - input_tokens = step["input"] - output_tokens = step["output"] - cache_pct = step["cache_hit_rate"] - has_dcp = step["has_dcp"] - dcp_tools = step["dcp_tools_used"] - reason = step["reason"] - - # Color the delta based on direction if cache_delta > 0: - delta_str = f"{c.GREEN}+{cache_delta:,}{c.RESET}" + delta = f"{c.GREEN}{'+' + f'{cache_delta:,}':>11}{c.RESET}" elif cache_delta < 0: - delta_str = f"{c.RED}{cache_delta:,}{c.RESET}" - else: - delta_str = f"{c.DIM}0{c.RESET}" - - # Pad delta string for alignment (accounting for color codes) - delta_display = f"{cache_delta:+,}" if cache_delta != 0 else "0" - delta_padded = f"{delta_str:>22}" if cache_delta != 0 else f"{c.DIM}{'0':>12}{c.RESET}" - - # Highlight DCP rows - if has_dcp: - row_prefix = f"{c.YELLOW}{c.BOLD}" - row_suffix = c.RESET - dcp_str = f"{c.YELLOW}{', '.join(dcp_tools)}{c.RESET}" + delta = f"{c.RED}{f'{cache_delta:,}':>12}{c.RESET}" else: - row_prefix = "" - row_suffix = "" - dcp_str = f"{c.DIM}-{c.RESET}" - - # Cache percentage coloring - if cache_pct >= 80: - pct_str = f"{c.GREEN}{cache_pct:>8.1f}%{c.RESET}" - elif cache_pct >= 50: - pct_str = f"{c.YELLOW}{cache_pct:>8.1f}%{c.RESET}" - else: - pct_str = f"{c.RED}{cache_pct:>8.1f}%{c.RESET}" - - # Format delta with proper width - if cache_delta > 0: - delta_formatted = f"{c.GREEN}{'+' + f'{cache_delta:,}':>11}{c.RESET}" - elif cache_delta < 0: - delta_formatted = f"{c.RED}{f'{cache_delta:,}':>12}{c.RESET}" + delta = f"{c.DIM}{'0':>12}{c.RESET}" + + pct = step["cache_hit_rate"] + if pct >= 80: + pct_str = f"{c.GREEN}{pct:>8.1f}%{c.RESET}" + elif pct >= 50: + pct_str = 
f"{c.YELLOW}{pct:>8.1f}%{c.RESET}" else: - delta_formatted = f"{c.DIM}{'0':>12}{c.RESET}" - - print(f"{row_prefix}{i:<6}{row_suffix} {cache_read:>12,} {delta_formatted} {input_tokens:>10,} {output_tokens:>10,} {pct_str} {format_duration(step.get('duration_ms')):>10} {format_duration(step.get('time_since_prev_ms')):>10} {dcp_str:<15} {reason:<12}") - - prev_cache = cache_read - + pct_str = f"{c.RED}{pct:>8.1f}%{c.RESET}" + + dcp_str = f"{c.YELLOW}{', '.join(step['dcp_tools_used'])}{c.RESET}" if step["has_dcp"] else f"{c.DIM}-{c.RESET}" + row_prefix = f"{c.YELLOW}{c.BOLD}" if step["has_dcp"] else "" + row_suffix = c.RESET if step["has_dcp"] else "" + + print( + f"{row_prefix}{idx:<6}{row_suffix} {step['cache_read']:>12,} {delta} {step['input']:>10,} {step['output']:>10,} {pct_str} {format_duration(step.get('duration_ms')):>10} {format_duration(step.get('time_since_prev_ms')):>10} {dcp_str:<15} {step['reason']:<12}" + ) + print("-" * 130) - print() - - # Summary statistics - steps = result["steps"] - total_input = sum(s["input"] for s in steps) - total_output = sum(s["output"] for s in steps) - total_cache_read = sum(s["cache_read"] for s in steps) - - dcp_steps = [s for s in steps if s["has_dcp"]] - cache_increases = [s for s in steps if s["cache_read_delta"] > 0] - cache_decreases = [s for s in steps if s["cache_read_delta"] < 0] - - # Overall cache hit rate - total_context = total_input + total_cache_read - overall_cache_rate = (total_cache_read / total_context * 100) if total_context > 0 else 0 - - print(f"{c.BOLD}CACHE BEHAVIOR SUMMARY{c.RESET}") - print("-" * 50) - - # Overall cache hit rate with coloring - if overall_cache_rate >= 80: - rate_str = f"{c.GREEN}{overall_cache_rate:.1f}%{c.RESET}" - elif overall_cache_rate >= 50: - rate_str = f"{c.YELLOW}{overall_cache_rate:.1f}%{c.RESET}" - else: - rate_str = f"{c.RED}{overall_cache_rate:.1f}%{c.RESET}" - - print(f" {c.BOLD}Overall cache hit rate: {rate_str}{c.RESET}") - print(f" Total input tokens: {total_input:>12,}") - print(f" Total cache read tokens: {total_cache_read:>12,}") - print() - print(f" Steps with cache increase: {c.GREEN}{len(cache_increases):>5}{c.RESET}") - print(f" Steps with cache decrease: {c.RED}{len(cache_decreases):>5}{c.RESET}") - print(f" Steps with DCP tools: {c.YELLOW}{len(dcp_steps):>5}{c.RESET}") - print() - - if dcp_steps: - dcp_decreases = [s for s in dcp_steps if s["cache_read_delta"] < 0] - print(f" DCP steps with cache drop: {len(dcp_decreases)}/{len(dcp_steps)}") - if dcp_decreases: - avg_drop = sum(s["cache_read_delta"] for s in dcp_decreases) / len(dcp_decreases) - print(f" Avg cache drop on DCP: {c.RED}{avg_drop:,.0f}{c.RESET} tokens") - - print() - - # Cache growth verification - if len(steps) >= 2: - first_cache = steps[0]["cache_read"] - last_cache = steps[-1]["cache_read"] - max_cache = max(s["cache_read"] for s in steps) - - print(f"{c.BOLD}CACHE GROWTH VERIFICATION{c.RESET}") - print("-" * 50) - print(f" First step cache read: {first_cache:>12,}") - print(f" Last step cache read: {last_cache:>12,}") - print(f" Max cache read observed: {max_cache:>12,}") - - if last_cache > first_cache: - growth = last_cache - first_cache - print(f" Net cache growth: {c.GREEN}+{growth:>11,}{c.RESET}") - print(f"\n {c.GREEN}✓ Provider caching appears to be working{c.RESET}") - elif last_cache < first_cache: - loss = first_cache - last_cache - print(f" Net cache loss: {c.RED}-{loss:>11,}{c.RESET}") - if dcp_steps: - print(f"\n {c.YELLOW}⚠ Cache decreased (likely due to DCP pruning){c.RESET}") - else: - print(f"\n 
{c.RED}⚠ Cache decreased without DCP - investigate{c.RESET}") - else: - print(f"\n {c.DIM}Cache unchanged between first and last step{c.RESET}") - - print() - print(f"{c.BOLD}{'=' * 130}{c.RESET}") def main(): - parser = argparse.ArgumentParser( - description="Analyze token values at each step within an OpenCode session" - ) - parser.add_argument( - "--session", "-s", type=str, default=None, - help="Session ID to analyze (default: most recent)" - ) - parser.add_argument( - "--json", "-j", action="store_true", - help="Output as JSON" - ) - parser.add_argument( - "--no-color", action="store_true", - help="Disable colored output" - ) + parser = argparse.ArgumentParser(description="Analyze token values at each step within an OpenCode session") + parser.add_argument("--session", "-s", type=str, default=None, help="Session ID to analyze (default: most recent)") + parser.add_argument("--json", "-j", action="store_true", help="Output as JSON") + parser.add_argument("--no-color", action="store_true", help="Disable colored output") + add_api_arguments(parser) args = parser.parse_args() - - storage = Path.home() / ".local/share/opencode/storage" - - if not storage.exists(): - print("Error: OpenCode storage not found at", storage) + + try: + with create_client_from_args(args) as client: + if args.session is None: + session = get_most_recent_session(client, args.session_list_limit) + if session is None: + print("Error: No sessions found") + return 1 + else: + session = client.get_session(args.session) + result = analyze_session(client, session) + except APIError as err: + print(f"Error: {err}") return 1 - - session_id = args.session - if session_id is None: - session_id = get_most_recent_session(storage) - if session_id is None: - print("Error: No sessions found") - return 1 - - result = analyze_session(storage, session_id) - + if args.json: - # Remove non-serializable fields print(json.dumps(result, indent=2, default=str)) else: colors = NO_COLOR if args.no_color else Colors() print_timeline(result, colors) - + return 0 if __name__ == "__main__": - exit(main()) + raise SystemExit(main()) diff --git a/scripts/opencode_api.py b/scripts/opencode_api.py new file mode 100644 index 00000000..8b01987f --- /dev/null +++ b/scripts/opencode_api.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +"""Shared helpers for querying the OpenCode SQLite database from scripts.""" + +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path +from typing import Any + + +DEFAULT_DB_PATH = Path.home() / ".local/share/opencode/opencode.db" +DEFAULT_SESSION_LIST_LIMIT = 5000 + + +class APIError(RuntimeError): + """Script data access error (kept for backwards compatibility).""" + + def __init__(self, message: str, *, status_code: int | None = None): + super().__init__(message) + self.status_code = status_code + + +class OpencodeAPI: + """Compatibility wrapper with the old API client interface.""" + + def __init__(self, db_path: Path): + self.db_path = db_path + self.conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + self.conn.row_factory = sqlite3.Row + + def close(self): + self.conn.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + + def health(self) -> dict[str, Any]: + self.conn.execute("SELECT 1").fetchone() + return {"status": "ok"} + + def list_projects(self) -> list[dict[str, Any]]: + rows = self.conn.execute( + "SELECT id, worktree FROM project ORDER BY time_updated DESC" + ).fetchall() + return [{"id": row["id"], 
"worktree": row["worktree"]} for row in rows] + + def _format_session_row(self, row: sqlite3.Row) -> dict[str, Any]: + return { + "id": row["id"], + "projectID": row["project_id"], + "parentID": row["parent_id"], + "directory": row["directory"], + "title": row["title"], + "time": {"created": row["time_created"], "updated": row["time_updated"]}, + } + + def list_sessions( + self, + *, + directory: str | None = None, + roots: bool | None = None, + start: int | None = None, + search: str | None = None, + limit: int | None = None, + ) -> list[dict[str, Any]]: + clauses: list[str] = [] + params: list[Any] = [] + + if directory: + clauses.append("directory = ?") + params.append(directory) + if roots is True: + clauses.append("parent_id IS NULL") + elif roots is False: + clauses.append("parent_id IS NOT NULL") + if search: + clauses.append("LOWER(title) LIKE ?") + params.append(f"%{search.lower()}%") + + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + query = f""" + SELECT id, project_id, parent_id, directory, title, time_created, time_updated + FROM session + {where} + ORDER BY time_updated DESC + """ + + if start is not None and start > 0: + query += " LIMIT -1 OFFSET ?" + params.append(start) + if limit is not None and limit > 0: + query = query.replace("LIMIT -1", "LIMIT ?", 1) + params[-1] = limit + params.append(start) + elif limit is not None and limit > 0: + query += " LIMIT ?" + params.append(limit) + + rows = self.conn.execute(query, params).fetchall() + return [self._format_session_row(row) for row in rows] + + def get_session(self, session_id: str, *, directory: str | None = None) -> dict[str, Any]: + params: list[Any] = [session_id] + query = """ + SELECT id, project_id, parent_id, directory, title, time_created, time_updated + FROM session + WHERE id = ? + """ + if directory: + query += " AND directory = ?" + params.append(directory) + row = self.conn.execute(query, params).fetchone() + if row is None: + raise APIError(f"Session not found: {session_id}", status_code=404) + return self._format_session_row(row) + + def get_session_messages( + self, + session_id: str, + *, + directory: str | None = None, + limit: int | None = None, + ) -> list[dict[str, Any]]: + if directory: + self.get_session(session_id, directory=directory) + + message_query = """ + SELECT id, data + FROM message + WHERE session_id = ? + ORDER BY time_created ASC + """ + message_params: list[Any] = [session_id] + if limit is not None and limit > 0: + message_query += " LIMIT ?" + message_params.append(limit) + message_rows = self.conn.execute(message_query, message_params).fetchall() + + part_rows = self.conn.execute( + """ + SELECT message_id, data + FROM part + WHERE session_id = ? 
+ ORDER BY time_created ASC + """, + [session_id], + ).fetchall() + + parts_by_message: dict[str, list[dict[str, Any]]] = {} + for row in part_rows: + try: + payload = json.loads(row["data"]) + except (json.JSONDecodeError, TypeError): + continue + parts_by_message.setdefault(row["message_id"], []).append(payload) + + messages: list[dict[str, Any]] = [] + for row in message_rows: + try: + info = json.loads(row["data"]) + except (json.JSONDecodeError, TypeError): + info = {} + info["id"] = row["id"] + messages.append({"info": info, "parts": parts_by_message.get(row["id"], [])}) + return messages + + def get_session_message( + self, + session_id: str, + message_id: str, + *, + directory: str | None = None, + ) -> dict[str, Any]: + if directory: + self.get_session(session_id, directory=directory) + + row = self.conn.execute( + """ + SELECT data + FROM message + WHERE session_id = ? AND id = ? + """, + [session_id, message_id], + ).fetchone() + if row is None: + raise APIError(f"Message not found: {message_id}", status_code=404) + + try: + info = json.loads(row["data"]) + except (json.JSONDecodeError, TypeError): + info = {} + info["id"] = message_id + + part_rows = self.conn.execute( + """ + SELECT data + FROM part + WHERE session_id = ? AND message_id = ? + ORDER BY time_created ASC + """, + [session_id, message_id], + ).fetchall() + + parts: list[dict[str, Any]] = [] + for part_row in part_rows: + try: + parts.append(json.loads(part_row["data"])) + except (json.JSONDecodeError, TypeError): + continue + + return {"info": info, "parts": parts} + + +def add_api_arguments(parser): + parser.add_argument( + "--db", + type=str, + default=str(DEFAULT_DB_PATH), + help=f"Path to OpenCode SQLite database (default: {DEFAULT_DB_PATH})", + ) + parser.add_argument( + "--session-list-limit", + type=int, + default=DEFAULT_SESSION_LIST_LIMIT, + help="Max sessions scanned", + ) + + +def create_client_from_args(args) -> OpencodeAPI: + db_path = Path(getattr(args, "db", DEFAULT_DB_PATH)).expanduser() + if not db_path.exists(): + raise APIError(f"OpenCode database not found: {db_path}") + client = OpencodeAPI(db_path) + client.health() + return client + + +def list_sessions_across_projects( + client: OpencodeAPI, + *, + search: str | None = None, + roots: bool | None = None, + per_project_limit: int = DEFAULT_SESSION_LIST_LIMIT, +) -> list[dict[str, Any]]: + return client.list_sessions(search=search, roots=roots, limit=per_project_limit) From 9da5765b021c16fde4efefb5ab335b8ceddd3bcf Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Wed, 18 Feb 2026 22:21:28 -0500 Subject: [PATCH 2/2] fix: guard malformed compress summaries --- lib/messages/prune.ts | 54 +++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/lib/messages/prune.ts b/lib/messages/prune.ts index 783647ff..9d317f4e 100644 --- a/lib/messages/prune.ts +++ b/lib/messages/prune.ts @@ -179,33 +179,41 @@ const filterCompressedRanges = ( const msgId = msg.info.id // Check if there's a summary to inject at this anchor point - const summary = state.compressSummaries?.find((s) => s.anchorMessageId === msgId) + const summary = state.compressSummaries?.find((s) => s?.anchorMessageId === msgId) if (summary) { - // Find user message for variant and as base for synthetic message - const msgIndex = messages.indexOf(msg) - const userMessage = getLastUserMessage(messages, msgIndex) - - if (userMessage) { - const userInfo = userMessage.info as UserMessage - const summaryContent = summary.summary - const summarySeed 
= `${summary.blockId}:${summary.anchorMessageId}` - result.push( - createSyntheticUserMessage( - userMessage, - summaryContent, - userInfo.variant, - summarySeed, - ), - ) - - logger.info("Injected compress summary", { + const rawSummaryContent = (summary as { summary?: unknown }).summary + if (typeof rawSummaryContent !== "string" || rawSummaryContent.length === 0) { + logger.warn("Skipping malformed compress summary", { anchorMessageId: msgId, - summaryLength: summary.summary.length, + blockId: (summary as { blockId?: unknown }).blockId, }) } else { - logger.warn("No user message found for compress summary", { - anchorMessageId: msgId, - }) + // Find user message for variant and as base for synthetic message + const msgIndex = messages.indexOf(msg) + const userMessage = getLastUserMessage(messages, msgIndex) + + if (userMessage) { + const userInfo = userMessage.info as UserMessage + const summaryContent = rawSummaryContent + const summarySeed = `${summary.blockId}:${summary.anchorMessageId}` + result.push( + createSyntheticUserMessage( + userMessage, + summaryContent, + userInfo.variant, + summarySeed, + ), + ) + + logger.info("Injected compress summary", { + anchorMessageId: msgId, + summaryLength: summaryContent.length, + }) + } else { + logger.warn("No user message found for compress summary", { + anchorMessageId: msgId, + }) + } } }
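
For reviewers who want to try the shared helper module from the first patch outside the three CLI scripts, here is a minimal usage sketch. It is an illustration only, not part of the patch: it assumes the scripts/ directory is on PYTHONPATH and uses only names defined in opencode_api.py above (add_api_arguments, create_client_from_args, APIError, list_sessions).

#!/usr/bin/env python3
# Illustrative sketch (not part of the patch): list the five most recently
# updated root sessions via the opencode_api helpers added above.
# Assumes scripts/ is importable; every imported name exists in opencode_api.py.
import argparse

from opencode_api import APIError, add_api_arguments, create_client_from_args


def main() -> int:
    parser = argparse.ArgumentParser(description="List the five most recent root sessions")
    add_api_arguments(parser)  # adds --db and --session-list-limit
    args = parser.parse_args()

    try:
        # create_client_from_args opens the SQLite database read-only and
        # raises APIError if the file is missing.
        with create_client_from_args(args) as client:
            for session in client.list_sessions(roots=True, limit=5):
                print(session["id"], session["title"])
    except APIError as err:
        print(f"Error: {err}")
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())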