diff --git a/README.md b/README.md index f783962..2ce77f5 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ CLI tool that captures browser traffic and automatically generates production-re - 🌐 **Browser Automation**: Built on Playwright with stealth mode for realistic browsing - πŸ“Š **HAR Recording**: Captures all network traffic in HTTP Archive format - πŸ€– **AI-Powered Generation**: Uses Claude 4.5 to analyze traffic and generate clean Python code +- πŸ”Œ **OpenCode SDK Support**: Native integration with OpenCode SDK for more flexibility - πŸ’» **Interactive CLI**: Minimalist terminal interface with mode cycling (Shift+Tab) - πŸ“¦ **Production Ready**: Generated scripts include error handling, type hints, and documentation - πŸ’Ύ **Session History**: All runs saved locally with full message logs @@ -115,10 +116,19 @@ Settings are stored in `~/.reverse-api/config.json`: ```json { "model": "claude-sonnet-4-5", + "sdk": "claude", "output_dir": null } ``` +### SDK Selection + +Choose between two SDKs: +- **OpenCode**: Uses OpenCode SDK for AI-powered reverse engineering. Requires OpenCode to be running locally. +- **Claude** (default): Direct integration with Anthropic's Claude API. + +Change SDK in `/settings` or edit `config.json` directly. When using OpenCode SDK, ensure OpenCode is running (`opencode` command). + ## πŸ“ Project Structure ``` @@ -160,11 +170,9 @@ Generated `api_client.py` includes: ## πŸ—ΊοΈ Roadmap ### SDK Support -Expanding support for additional SDKs and platforms: -- **OpenCode** - Integration with OpenCode SDK -- **Cursor Agent CLI** - Support for Cursor's agent CLI -- **Droid** - Android SDK integration -- **Codex** - Codex SDK support +- βœ… **Claude** - Integration with Claude Code +- βœ… **OpenCode** - Integration with OpenCode +- πŸ”„ **Codex** - Codex SDK support ### Fully Automated Extraction Adding browser agent capabilities for fully automated API extraction: @@ -193,7 +201,7 @@ uv build ## πŸ” Requirements - Python 3.10+ -- Claude Code +- Claude Code / OpenCode - Playwright browsers installed ## 🀝 Contributing diff --git a/src/reverse_api/base_engineer.py b/src/reverse_api/base_engineer.py new file mode 100644 index 0000000..880c1e8 --- /dev/null +++ b/src/reverse_api/base_engineer.py @@ -0,0 +1,70 @@ +"""Abstract base class for API reverse engineering.""" + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Optional, Dict, Any + +from .utils import get_scripts_dir, get_timestamp +from .tui import ClaudeUI +from .messages import MessageStore + + +class BaseEngineer(ABC): + """Abstract base class for API reverse engineering implementations.""" + + def __init__( + self, + run_id: str, + har_path: Path, + prompt: str, + model: Optional[str] = None, + additional_instructions: Optional[str] = None, + output_dir: Optional[str] = None, + verbose: bool = True, + ): + self.run_id = run_id + self.har_path = har_path + self.prompt = prompt + self.model = model + self.additional_instructions = additional_instructions + self.scripts_dir = get_scripts_dir(run_id, output_dir) + self.ui = ClaudeUI(verbose=verbose) + self.usage_metadata: Dict[str, Any] = {} + self.message_store = MessageStore(run_id, output_dir) + + def _build_analysis_prompt(self) -> str: + """Build the prompt for analyzing the HAR file.""" + base_prompt = f"""Analyze the HAR file at {self.har_path} and reverse engineer the APIs captured. + +Original user prompt: {self.prompt} + +Your task: +1. Read and analyze the HAR file to understand the API calls made +2. Identify authentication patterns (cookies, tokens, headers) +3. Extract request/response patterns for each endpoint +4. Generate a clean, well-documented Python script that replicates these API calls + +The Python script should: +- Use the `requests` library +- Include proper authentication handling +- Have functions for each distinct API endpoint +- Include type hints and docstrings +- Handle errors gracefully +- Be production-ready + +Save the generated Python script to: {self.scripts_dir / 'api_client.py'} +Also create a brief README.md in the same folder explaining the APIs discovered. +Always test your implementation to ensure it works. If it doesn't try again if you think you can fix it. You can go up to 5 attempts. +Sometimes websites have bot detection and that kind of things so keep in mind. +If you see you can't achieve with requests, feel free to use playwright with the real user browser with CDP to bypass bot detection. +No matter which implementation you choose, always try to make it production ready and test it. +""" + if self.additional_instructions: + base_prompt += f"\n\nAdditional instructions:\n{self.additional_instructions}" + + return base_prompt + + @abstractmethod + async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: + """Run the reverse engineering analysis. Must be implemented by subclasses.""" + pass diff --git a/src/reverse_api/browser.py b/src/reverse_api/browser.py index b097902..b9b5f0d 100644 --- a/src/reverse_api/browser.py +++ b/src/reverse_api/browser.py @@ -10,6 +10,7 @@ from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page from playwright_stealth import Stealth from rich.console import Console +from rich.status import Status from .utils import get_har_dir, get_timestamp from .tui import THEME_PRIMARY, THEME_DIM, THEME_SUCCESS @@ -264,7 +265,7 @@ def _start_with_real_chrome(self, start_url: Optional[str] = None) -> Path: # Wait for browser to close try: while self._context.pages: - self._context.pages[0].wait_for_timeout(500) + self._context.pages[0].wait_for_timeout(100) # Faster polling except Exception: pass @@ -354,7 +355,7 @@ def _start_with_stealth_chromium(self, start_url: Optional[str] = None) -> Path: # Wait for browser to close try: while self._context.pages: - self._context.pages[0].wait_for_timeout(500) + self._context.pages[0].wait_for_timeout(100) # Faster polling except Exception: pass @@ -388,12 +389,16 @@ def close(self) -> Path: """Close the browser and save HAR file. Returns HAR path.""" end_time = get_timestamp() + console.print(f" [dim]browser closed[/dim]") + if self._context: - try: - self._context.close() # This saves the HAR file - except Exception: - pass - self._context = None + with Status(" [dim]handling har... can take a bit[/dim]", console=console, spinner="dots") as status: + try: + status.update(" [dim]saving har file...[/dim]") + self._context.close() # This saves the HAR file + except Exception: + pass + self._context = None # Only close browser if not using persistent context if self._browser and not self._using_persistent: diff --git a/src/reverse_api/cli.py b/src/reverse_api/cli.py index 7ee3abb..c12618e 100644 --- a/src/reverse_api/cli.py +++ b/src/reverse_api/cli.py @@ -1,16 +1,11 @@ -import sys from pathlib import Path import json import click import questionary from questionary import Choice -from prompt_toolkit.completion import WordCompleter from rich.console import Console -from rich.table import Table from rich.panel import Panel -from rich.text import Text -from rich.box import MINIMAL, ROUNDED from .browser import ManualBrowser from .utils import ( @@ -19,7 +14,6 @@ get_config_path, get_history_path, get_har_dir, - get_scripts_dir, get_timestamp, ) from .tui import ( @@ -29,8 +23,6 @@ THEME_PRIMARY, THEME_SECONDARY, THEME_DIM, - THEME_SUCCESS, - THEME_ERROR, ) from .config import ConfigManager from .session import SessionManager @@ -41,6 +33,8 @@ from prompt_toolkit.key_binding import KeyBindings from prompt_toolkit.styles import Style as PtStyle from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.completion import Completer, Completion +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory console = Console() @@ -68,9 +62,25 @@ def prompt_interactive_options( """ # Slash command completer - command_completer = WordCompleter([ + commands = [ "/settings", "/history", "/messages", "/help", "/exit", "/quit", "/commands" - ], ignore_case=True) + ] + + class FilteredCompleter(Completer): + def get_completions(self, document, complete_event): + text = document.text_before_cursor + if not text.startswith('/'): + return + + # Only suggest if we are still on the first word (the command) + if ' ' in text: + return + + for cmd in commands: + if cmd.startswith(text): + yield Completion(cmd, start_position=-len(text)) + + command_completer = FilteredCompleter() # Track mode state (mutable container for closure) mode_state = {"mode": current_mode, "mode_index": MODES.index(current_mode)} @@ -101,6 +111,8 @@ def get_prompt(): session = PromptSession( message=get_prompt, # Dynamic prompt function completer=command_completer, + auto_suggest=AutoSuggestFromHistory(), + complete_while_typing=True, style=pt_style, key_bindings=kb, ) @@ -168,7 +180,7 @@ def main(ctx: click.Context): def repl_loop(): """Main interactive loop for the CLI.""" display_banner(console) - console.print(f" [dim]shift+tab to cycle modes: manual | engineer[/dim]") + console.print(" [dim]shift+tab to cycle modes: manual | engineer[/dim]") display_footer(console) current_mode = "manual" @@ -197,7 +209,7 @@ def repl_loop(): if len(parts) > 1: handle_messages(parts[1].strip()) else: - console.print(f" [dim]![/dim] [red]usage:[/red] /messages ") + console.print(" [dim]![/dim] [red]usage:[/red] /messages ") else: console.print(f" [dim]![/dim] [red]unknown command:[/red] {cmd}") continue @@ -209,7 +221,7 @@ def repl_loop(): # Engineer mode: only run reverse engineering on existing run_id run_id = options.get("run_id") if not run_id: - console.print(f" [dim]![/dim] [red]error:[/red] enter a run_id to reverse engineer") + console.print(" [dim]![/dim] [red]error:[/red] enter a run_id to reverse engineer") continue run_engineer(run_id, model=options.get("model")) continue @@ -223,7 +235,7 @@ def repl_loop(): ) except (click.Abort, KeyboardInterrupt): - console.print(f"\n [dim]terminated[/dim]") + console.print("\n [dim]terminated[/dim]") return except Exception as e: console.print(f" [dim]![/dim] [red]error:[/red] {e}") @@ -239,6 +251,7 @@ def handle_settings(): "", choices=[ Choice(title="> change model", value="model"), + Choice(title="> change sdk", value="sdk"), Choice(title="> output directory", value="output_dir"), Choice(title="> back", value="back"), ], @@ -269,10 +282,29 @@ def handle_settings(): config_manager.set("model", model) console.print(f" [dim]updated[/dim] {model}\n") + elif action == "sdk": + sdk_choices = [ + Choice(title="> opencode", value="opencode"), + Choice(title="> claude", value="claude"), + Choice(title="> back", value="back"), + ] + sdk = questionary.select( + "", + choices=sdk_choices, + pointer="", + qmark="", + style=questionary.Style([ + ('highlighted', f'fg:{THEME_PRIMARY} bold'), + ]) + ).ask() + if sdk and sdk != "back": + config_manager.set("sdk", sdk) + console.print(f" [dim]updated[/dim] sdk: {sdk}\n") + elif action == "output_dir": current = config_manager.get("output_dir") new_dir = questionary.text( - f" > output directory", + " > output directory", default=current or "", instruction="(Enter for default ~/.reverse-api/runs)", qmark="", @@ -283,14 +315,14 @@ def handle_settings(): ).ask() if new_dir is not None: config_manager.set("output_dir", new_dir if new_dir.strip() else None) - console.print(f" [dim]updated[/dim] output directory\n") + console.print(" [dim]updated[/dim] output directory\n") def handle_history(): """Display history of runs.""" history = session_manager.get_history(limit=15) if not history: - console.print(f" [dim]> no logs found[/dim]") + console.print(" [dim]> no logs found[/dim]") return choices = [] @@ -323,22 +355,22 @@ def handle_history(): model = run.get("model") or config_manager.get("model", "claude-sonnet-4-5") run_engineer(run_id, model=model) else: - console.print(f" [dim]> not found[/dim]") + console.print(" [dim]> not found[/dim]") def handle_help(): """Show help for slash commands.""" console.print() - console.print(f" [white]commands[/white]") - console.print(f" [dim]>[/dim] /settings [dim]system[/dim]") - console.print(f" [dim]>[/dim] /history [dim]logs[/dim]") - console.print(f" [dim]>[/dim] /messages [dim]view run messages[/dim]") - console.print(f" [dim]>[/dim] /help [dim]help[/dim]") - console.print(f" [dim]>[/dim] /exit [dim]quit[/dim]") + console.print(" [white]commands[/white]") + console.print(" [dim]>[/dim] /settings [dim]system[/dim]") + console.print(" [dim]>[/dim] /history [dim]logs[/dim]") + console.print(" [dim]>[/dim] /messages [dim]view run messages[/dim]") + console.print(" [dim]>[/dim] /help [dim]help[/dim]") + console.print(" [dim]>[/dim] /exit [dim]quit[/dim]") console.print() - console.print(f" [white]modes[/white] [dim](shift+tab to cycle)[/dim]") - console.print(f" [dim]>[/dim] manual [dim]full pipeline: browser + reverse engineering[/dim]") - console.print(f" [dim]>[/dim] engineer [dim]reverse engineer only (enter run_id)[/dim]") + console.print(" [white]modes[/white] [dim](shift+tab to cycle)[/dim]") + console.print(" [dim]>[/dim] manual [dim]full pipeline: browser + reverse engineering[/dim]") + console.print(" [dim]>[/dim] engineer [dim]reverse engineer only (enter run_id)[/dim]") console.print() @@ -490,7 +522,8 @@ def run_engineer(run_id, har_path=None, prompt=None, model=None, output_dir=None har_path=har_path, prompt=prompt, model=model or config_manager.get("model", "claude-sonnet-4-5"), - output_dir=output_dir + output_dir=output_dir, + sdk=config_manager.get("sdk", "opencode"), ) if result: @@ -514,7 +547,7 @@ def run_engineer(run_id, har_path=None, prompt=None, model=None, output_dir=None if item.is_file(): shutil.copy2(item, local_dir / item.name) - console.print(f" [dim]>[/dim] [white]decoding complete[/white]") + console.print(" [dim]>[/dim] [white]decoding complete[/white]") console.print(f" [dim]>[/dim] [white]{result['script_path']}[/white]") console.print(f" [dim]>[/dim] [white]copied to ./scripts/{folder_name}[/white]\n") diff --git a/src/reverse_api/config.py b/src/reverse_api/config.py index 722b6d1..20dd328 100644 --- a/src/reverse_api/config.py +++ b/src/reverse_api/config.py @@ -7,6 +7,7 @@ DEFAULT_CONFIG = { "model": "claude-sonnet-4-5", "output_dir": None, # None means use ~/.reverse-api/runs + "sdk": "claude", # "opencode" or "claude" } diff --git a/src/reverse_api/engineer.py b/src/reverse_api/engineer.py index d54f3f0..ee2303e 100644 --- a/src/reverse_api/engineer.py +++ b/src/reverse_api/engineer.py @@ -1,7 +1,6 @@ -"""Reverse engineering module using Claude Agent SDK.""" +"""Reverse engineering module with SDK dispatch.""" import asyncio -import json from pathlib import Path from typing import Optional, Dict, Any @@ -15,65 +14,11 @@ ResultMessage, ) -from .utils import get_scripts_dir, get_timestamp -from .tui import ClaudeUI -from .messages import MessageStore - - -class APIReverseEngineer: - """Uses Claude to analyze HAR files and generate Python API scripts.""" - - def __init__( - self, - run_id: str, - har_path: Path, - prompt: str, - model: Optional[str] = None, - additional_instructions: Optional[str] = None, - output_dir: Optional[str] = None, - verbose: bool = True, - ): - self.run_id = run_id - self.har_path = har_path - self.prompt = prompt - self.model = model - self.additional_instructions = additional_instructions - self.scripts_dir = get_scripts_dir(run_id, output_dir) - self.ui = ClaudeUI(verbose=verbose) - self.usage_metadata: Dict[str, Any] = {} - self.message_store = MessageStore(run_id, output_dir) - - def _build_analysis_prompt(self) -> str: - """Build the prompt for Claude to analyze the HAR file.""" - base_prompt = f"""Analyze the HAR file at {self.har_path} and reverse engineer the APIs captured. - -Original user prompt: {self.prompt} - -Your task: -1. Read and analyze the HAR file to understand the API calls made -2. Identify authentication patterns (cookies, tokens, headers) -3. Extract request/response patterns for each endpoint -4. Generate a clean, well-documented Python script that replicates these API calls - -The Python script should: -- Use the `requests` library -- Include proper authentication handling -- Have functions for each distinct API endpoint -- Include type hints and docstrings -- Handle errors gracefully -- Be production-ready - -Save the generated Python script to: {self.scripts_dir / 'api_client.py'} -Also create a brief README.md in the same folder explaining the APIs discovered. -Always test your implementation to ensure it works. If it doesn't try again if you think you can fix it. You can go up to 5 attempts. -Sometimes websites have bot detection and that kind of things so keep in mind. -If you see you can't achieve with requests, feel free to use playwright with the real user browser with CDP to bypass bot detection. -No matter which implementation you choose, always try to make it production ready and test it. -""" - if self.additional_instructions: - base_prompt += f"\n\nAdditional instructions:\n{self.additional_instructions}" - - return base_prompt +from .base_engineer import BaseEngineer + + +class ClaudeEngineer(BaseEngineer): + """Uses Claude Agent SDK to analyze HAR files and generate Python API scripts.""" async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: """Run the reverse engineering analysis with Claude.""" @@ -97,7 +42,6 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: # Process response and show progress with TUI async for message in client.receive_response(): # Check for usage metadata in message if applicable - # (Note: Current SDK might not expose it easily, but we prepare for it) if hasattr(message, 'usage') and isinstance(getattr(message, 'usage'), dict): self.usage_metadata.update(getattr(message, 'usage')) @@ -187,6 +131,10 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: return None +# Keep old class name for backwards compatibility +APIReverseEngineer = ClaudeEngineer + + def run_reverse_engineering( run_id: str, har_path: Path, @@ -195,15 +143,33 @@ def run_reverse_engineering( additional_instructions: Optional[str] = None, output_dir: Optional[str] = None, verbose: bool = True, + sdk: str = "claude", ) -> Optional[Dict[str, Any]]: - """Synchronous wrapper for reverse engineering.""" - engineer = APIReverseEngineer( - run_id=run_id, - har_path=har_path, - prompt=prompt, - model=model, - additional_instructions=additional_instructions, - output_dir=output_dir, - verbose=verbose, - ) + """Run reverse engineering with the specified SDK. + + Args: + sdk: "opencode" or "claude" - determines which SDK to use + """ + if sdk == "opencode": + from .opencode_engineer import OpenCodeEngineer + engineer = OpenCodeEngineer( + run_id=run_id, + har_path=har_path, + prompt=prompt, + model=model, + additional_instructions=additional_instructions, + output_dir=output_dir, + verbose=verbose, + ) + else: + engineer = ClaudeEngineer( + run_id=run_id, + har_path=har_path, + prompt=prompt, + model=model, + additional_instructions=additional_instructions, + output_dir=output_dir, + verbose=verbose, + ) + return asyncio.run(engineer.analyze_and_generate()) diff --git a/src/reverse_api/opencode_engineer.py b/src/reverse_api/opencode_engineer.py new file mode 100644 index 0000000..649dd51 --- /dev/null +++ b/src/reverse_api/opencode_engineer.py @@ -0,0 +1,381 @@ +"""OpenCode SDK implementation for API reverse engineering. + +Uses direct httpx calls with correct API format based on OpenCode +TypeScript SDK documentation. +""" + +import asyncio +import json +import os +from datetime import datetime +from pathlib import Path +from typing import Optional, Dict, Any + +import httpx + +from .base_engineer import BaseEngineer +from .opencode_ui import OpenCodeUI + +# Enable debug mode with OPENCODE_DEBUG=1 +DEBUG = os.environ.get("OPENCODE_DEBUG", "0") == "1" + +def debug_log(msg: str): + """Print debug message if DEBUG mode is enabled.""" + if DEBUG: + ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] + print(f"[{ts}] [DEBUG] {msg}") + + +class OpenCodeEngineer(BaseEngineer): + """Uses OpenCode AI to analyze HAR files and generate Python API scripts.""" + + BASE_URL = "http://127.0.0.1:4096" + + # Map short model names to full Anthropic model IDs + MODEL_MAP = { + "sonnet": "claude-sonnet-4-5", + "opus": "claude-opus-4-5", + "haiku": "claude-haiku-4-5", + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Override UI with OpenCode-specific version + self.opencode_ui = OpenCodeUI(verbose=kwargs.get("verbose", True)) + self.ui = self.opencode_ui # Ensure base class uses our specialized UI + self._last_error: Optional[str] = None + self._session_id: Optional[str] = None + self._last_event_time = 0.0 + + async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: + """Run the reverse engineering analysis with OpenCode.""" + self.opencode_ui.header(self.run_id, self.prompt, self.model) + self.opencode_ui.start_analysis() + + # Save the prompt to messages + self.message_store.save_prompt(self._build_analysis_prompt()) + + try: + async with httpx.AsyncClient(base_url=self.BASE_URL, timeout=600.0) as client: + # Create a new session + r = await client.post("/session", json={}) + r.raise_for_status() + session_data = r.json() + self._session_id = session_data["id"] + + self.opencode_ui.session_created(self._session_id) + + # Start event stream BEFORE sending message + event_task = asyncio.create_task(self._stream_events(client)) + + # Give event stream a moment to connect + await asyncio.sleep(0.1) + + # Send prompt with correct format + # POST /session/:id/message with model object + # Resolve short model name to full Anthropic ID + model_id = self.MODEL_MAP.get(self.model, self.model) if self.model else "claude-sonnet-4-5-20250514" + + prompt_body = { + "model": { + "providerID": "anthropic", + "modelID": model_id + }, + "parts": [{"type": "text", "text": self._build_analysis_prompt()}] + } + + prompt_r = await client.post( + f"/session/{self._session_id}/message", + json=prompt_body + ) + prompt_r.raise_for_status() + + # Wait for events to complete + try: + await asyncio.wait_for(event_task, timeout=600.0) + except asyncio.TimeoutError: + self._last_error = "Session timed out (10 min)" + self.opencode_ui.error(self._last_error) + + # Stop streaming UI + self.opencode_ui.stop_streaming() + + # Check for errors + if self._last_error: + self.opencode_ui.error(self._last_error) + self.message_store.save_error(self._last_error) + return None + + # Success + script_path = str(self.scripts_dir / 'api_client.py') + self.opencode_ui.success(script_path) + + result_data: Dict[str, Any] = { + "script_path": script_path, + "usage": self.usage_metadata, + "session_id": self._session_id, + } + self.message_store.save_result(result_data) + return result_data + + except httpx.ConnectError: + self.opencode_ui.error("Connection error") + self.opencode_ui.console.print("\n[dim]Make sure OpenCode is running: opencode[/dim]") + self.message_store.save_error("Connection error") + return None + + except Exception as e: + error_msg = str(e) if str(e) else "Unknown error" + self.opencode_ui.error(error_msg) + self.message_store.save_error(error_msg) + return None + + async def _stream_events(self, client: httpx.AsyncClient): + """Stream events from OpenCode and update UI.""" + seen_parts: set = set() # Track part IDs to avoid duplicates + import time + self._last_event_time = time.time() + + debug_log("Starting event stream...") + + # Start the live display + self.opencode_ui.start_streaming() + + try: + debug_log(f"Connecting to GET /event") + async with client.stream("GET", "/event", timeout=None) as response: + debug_log(f"Event stream connected, status={response.status_code}") + async for line in response.aiter_lines(): + if not line: + continue + + # SSE format: "data: {...}" + if not line.startswith("data: "): + if line.startswith("data:"): + line_data = line[5:].strip() + else: + debug_log(f"Skipping non-data line: {line[:50]}") + continue + else: + line_data = line[6:].strip() + + if not line_data: + continue + + self._last_event_time = time.time() + + try: + data = json.loads(line_data) + except json.JSONDecodeError as e: + debug_log(f"JSON decode error: {e}, data: {line_data[:100]}") + continue + + event_type = data.get("type") + properties = data.get("properties", {}) + + debug_log(f"Event: {event_type}") + + # Handle different event types + if event_type == "message.part.updated": + await self._handle_part_update(properties, seen_parts) + + elif event_type == "session.idle": + event_sid = properties.get("sessionID") + debug_log(f"session.idle: sessionID={event_sid}, our session={self._session_id}") + if event_sid == self._session_id: + debug_log("Our session is idle, returning!") + self.opencode_ui.session_status("idle") + return # Done! + + elif event_type == "session.status": + event_sid = properties.get("sessionID") + status = properties.get("status", {}) + status_type = status.get("type", "idle") + debug_log(f"session.status: sessionID={event_sid}, status={status_type}") + if event_sid == self._session_id: + if status_type == "retry": + attempt = status.get("attempt", 1) + message = status.get("message", "") + self.opencode_ui.session_retry(attempt, message) + else: + self.opencode_ui.session_status(status_type) + + if status_type == "idle": + debug_log("Our session status is idle, returning!") + return # Done! + + elif event_type == "permission.updated": + # Auto-approve permissions so the agent can proceed + permission_id = properties.get("id") + perm_session = properties.get("sessionID") + perm_type = properties.get("type", "") + perm_title = properties.get("title", "") + + debug_log(f"permission.updated: id={permission_id}, type={perm_type}, title={perm_title}") + + if perm_session == self._session_id and permission_id: + # Show permission request in UI + self.opencode_ui.permission_requested(perm_type, perm_title) + + # Auto-approve the permission + # OpenCode expects: "once" | "always" | "reject" + debug_log(f"Auto-approving permission {permission_id}") + try: + perm_response = await client.post( + f"/session/{self._session_id}/permissions/{permission_id}", + json={"response": "always"} # "once", "always", or "reject" + ) + debug_log(f"Permission response: {perm_response.status_code}") + if perm_response.status_code == 200: + self.opencode_ui.permission_approved(perm_type) + except Exception as pe: + debug_log(f"Permission approval failed: {pe}") + + elif event_type == "todo.updated": + todos = properties.get("todos", []) + event_sid = properties.get("sessionID") + if event_sid == self._session_id and todos: + debug_log(f"todo.updated: {len(todos)} todos") + self.opencode_ui.todo_updated(todos) + + elif event_type == "file.edited": + file_path = properties.get("file", "") + if file_path: + debug_log(f"file.edited: {file_path}") + self.opencode_ui.file_edited(file_path) + + elif event_type == "session.diff": + event_sid = properties.get("sessionID") + diffs = properties.get("diff", []) + if event_sid == self._session_id and diffs: + debug_log(f"session.diff: {len(diffs)} files changed") + self.opencode_ui.session_diff(diffs) + + elif event_type == "session.compacted": + event_sid = properties.get("sessionID") + if event_sid == self._session_id: + debug_log("session.compacted") + self.opencode_ui.session_compacted() + + elif event_type == "session.error": + event_sid = properties.get("sessionID") + if event_sid and event_sid != self._session_id: + debug_log(f"session.error for other session {event_sid}, ignoring") + continue + + error_obj = properties.get("error", {}) + debug_log(f"session.error: {error_obj}") + + # Parse error with type-specific handling + if isinstance(error_obj, dict): + error_name = error_obj.get("name", "UnknownError") + error_data = error_obj.get("data", {}) + + if error_name == "ProviderAuthError": + provider = error_data.get("providerID", "unknown") + msg = error_data.get("message", "Authentication failed") + self._last_error = f"Auth error ({provider}): {msg}" + elif error_name == "APIError": + msg = error_data.get("message", "API error") + status = error_data.get("statusCode", "") + self._last_error = f"API error{' (' + str(status) + ')' if status else ''}: {msg}" + elif error_name == "MessageAbortedError": + self._last_error = "Aborted" + else: + msg = error_data.get("message", "") if isinstance(error_data, dict) else str(error_data) + self._last_error = f"{error_name}: {msg}" if msg else error_name + else: + self._last_error = str(error_obj) + + self.opencode_ui.error(self._last_error) + return + + except httpx.ReadError as e: + self._last_error = f"Stream disconnected: {e}" + except Exception as e: + self._last_error = str(e) if str(e) else "Stream error" + + async def _handle_part_update(self, properties: dict, seen_parts: set): + """Handle message.part.updated events.""" + part = properties.get("part", {}) + delta = properties.get("delta") # Incremental text update + + part_id = part.get("id", "") + part_type = part.get("type") + part_session = part.get("sessionID") + + # Only process parts for our session + if part_session != self._session_id: + return + + if part_type == "text": + text = part.get("text", "") + debug_log(f"Handling text part: id={part_id}, delta={'yes' if delta else 'no'}, len={len(text)}") + # Use delta for incremental updates if available + self.opencode_ui.update_text(text, delta) + + # Save to message store (only significant updates) + if len(text) > 50 and part_id not in seen_parts: + seen_parts.add(part_id) + self.message_store.save_thinking(text) + + elif part_type == "tool": + tool_name = part.get("tool", "tool") + state = part.get("state", {}) + status = state.get("status") + + debug_log(f"Handling tool part: id={part_id}, tool={tool_name}, status={status}") + + if status == "running" and part_id not in seen_parts: + seen_parts.add(part_id) + tool_input = state.get("input", {}) + self.opencode_ui.tool_start(tool_name, tool_input) + self.message_store.save_tool_start(tool_name, tool_input) + + elif status == "completed": + output = state.get("output", "") + self.opencode_ui.tool_result(tool_name, False, output) + self.message_store.save_tool_result(tool_name, False, output) + + elif status == "error": + error = state.get("error", "Tool error") + self.opencode_ui.tool_result(tool_name, True, error) + self.message_store.save_tool_result(tool_name, True, error) + + elif part_type == "step-finish": + debug_log("Handling step-finish part") + # Extract usage stats + cost = part.get("cost", 0) + tokens = part.get("tokens", {}) + + self.opencode_ui.step_finish(cost, tokens) + + # Update usage metadata + self.usage_metadata["input_tokens"] = self.usage_metadata.get("input_tokens", 0) + tokens.get("input", 0) + self.usage_metadata["output_tokens"] = self.usage_metadata.get("output_tokens", 0) + tokens.get("output", 0) + cache = tokens.get("cache", {}) + self.usage_metadata["cache_read_tokens"] = self.usage_metadata.get("cache_read_tokens", 0) + cache.get("read", 0) + self.usage_metadata["cache_creation_tokens"] = self.usage_metadata.get("cache_creation_tokens", 0) + cache.get("write", 0) + self.usage_metadata["cost"] = self.usage_metadata.get("cost", 0) + cost + + +def run_opencode_engineering( + run_id: str, + har_path: Path, + prompt: str, + model: Optional[str] = None, + additional_instructions: Optional[str] = None, + output_dir: Optional[str] = None, + verbose: bool = True, +) -> Optional[Dict[str, Any]]: + """Synchronous wrapper for OpenCode reverse engineering.""" + engineer = OpenCodeEngineer( + run_id=run_id, + har_path=har_path, + prompt=prompt, + model=model, + additional_instructions=additional_instructions, + output_dir=output_dir, + verbose=verbose, + ) + return asyncio.run(engineer.analyze_and_generate()) diff --git a/src/reverse_api/opencode_ui.py b/src/reverse_api/opencode_ui.py new file mode 100644 index 0000000..76a2774 --- /dev/null +++ b/src/reverse_api/opencode_ui.py @@ -0,0 +1,260 @@ +"""OpenCode-specific Terminal UI with live streaming updates.""" + +from typing import Optional, Dict, Any +from rich.console import Console +from rich.live import Live +from rich.text import Text +from rich.spinner import Spinner +from rich.table import Table +from rich.panel import Panel + +# Theme configuration (matching tui.py) +THEME_PRIMARY = "#ff5f50" +THEME_SECONDARY = "white" +THEME_DIM = "#555555" +THEME_SUCCESS = "#ff5f50" + + +class OpenCodeUI: + """Terminal UI for OpenCode with live streaming support.""" + + def __init__(self, console: Optional[Console] = None, verbose: bool = True): + self.console = console or Console() + self.verbose = verbose + self._live: Optional[Live] = None + self._current_text = "" + self._current_tool: Optional[str] = None + self._tool_status: str = "" + self._session_status: str = "idle" + self._tools_used: list[str] = [] + + def header(self, run_id: str, prompt: str, model: Optional[str] = None) -> None: + """Display the session header.""" + self.console.print() + self.console.print(f" [white]reverse-api[/white] [dim]v0.1.0[/dim]") + self.console.print(f" [dim]━[/dim] [white]{run_id}[/white]") + self.console.print(f" [dim]model[/dim] [white]{model or '---'}[/white]") + self.console.print(f" [{THEME_PRIMARY}]task[/{THEME_PRIMARY}] [white]{prompt}[/white]") + self.console.print() + + def start_analysis(self) -> None: + """Display analysis start message.""" + self.console.print(f" [dim]decoding starting...[/dim]") + self.console.print() + + def session_created(self, session_id: str) -> None: + """Display session creation.""" + self.console.print(f" [dim]session: {session_id[:16]}...[/dim]") + + def start_streaming(self) -> None: + """Start the live display for streaming updates.""" + self._live = Live( + self._build_display(), + console=self.console, + refresh_per_second=10, + transient=True, + ) + self._live.start() + + def stop_streaming(self) -> None: + """Stop the live display.""" + if self._live: + self._live.stop() + self._live = None + + def _build_display(self) -> Text: + """Build the current display state.""" + display = Text() + + # Show current tool if running + if self._current_tool and self._tool_status == "running": + display.append(f" ⟳ ", style=THEME_PRIMARY) + display.append(f"{self._current_tool}", style="white") + display.append(" running...\n", style=THEME_DIM) + + # Show streaming text (last 200 chars) + if self._current_text: + text_preview = self._current_text[-200:].replace("\n", " ").strip() + if len(self._current_text) > 200: + text_preview = "..." + text_preview + display.append(f" {text_preview}\n", style=THEME_DIM) + + return display + + def update_text(self, text: str, delta: Optional[str] = None) -> None: + """Update the streaming text display.""" + if delta: + self._current_text += delta + else: + self._current_text = text + + if self._live: + self._live.update(self._build_display()) + + def tool_start(self, tool_name: str, tool_input: Optional[Dict] = None) -> None: + """Display when a tool starts execution.""" + self._current_tool = tool_name + self._tool_status = "running" + self._tools_used.append(tool_name) + + # Print tool start (this stays in terminal) + input_summary = self._summarize_input(tool_name, tool_input or {}) + self.console.print(f" [dim]>[/dim] {tool_name.lower():12} {input_summary}") + + if self._live: + self._live.update(self._build_display()) + + def tool_result(self, tool_name: str, is_error: bool = False, output: Optional[str] = None) -> None: + """Display when a tool completes.""" + self._current_tool = None + self._tool_status = "completed" if not is_error else "error" + + if self._live: + self._live.update(self._build_display()) + + if is_error: + self.console.print(f" [red]![/red] {tool_name.lower()} failed", style=THEME_DIM) + if output: + # Show first 100 chars of error + error_preview = str(output)[:100].replace("\n", " ") + self.console.print(f" {error_preview}", style=THEME_DIM) + + def step_finish(self, cost: float, tokens: Dict[str, Any]) -> None: + """Display step completion with usage stats.""" + input_tokens = tokens.get("input", 0) + output_tokens = tokens.get("output", 0) + cache = tokens.get("cache", {}) + cache_read = cache.get("read", 0) + cache_write = cache.get("write", 0) + + self.console.print(f" [dim]step: {input_tokens:,}in/{output_tokens:,}out ${cost:.4f}[/dim]") + + def session_status(self, status_type: str) -> None: + """Update session status.""" + self._session_status = status_type + if self._live: + self._live.update(self._build_display()) + + def thinking(self, text: str) -> None: + """Display thinking text (for compatibility with ClaudeUI).""" + # For streaming, we use update_text instead + # This is a fallback for non-streaming scenarios + if not self._live and self.verbose and len(text) > 20: + display_text = text[:100].replace("\n", " ").strip() + if len(text) > 100: + display_text += "..." + self.console.print(f" .. {display_text}", style=THEME_DIM) + + def success(self, script_path: str) -> None: + """Display success message.""" + self.console.print() + self.console.print(f" [dim]decoding complete[/dim]") + self.console.print(f" [white]{script_path}[/white]") + self.console.print() + + def error(self, message: str) -> None: + """Display error message.""" + self.console.print() + self.console.print(f" [dim]![/dim] [red]error:[/red] {message}") + + def permission_requested(self, perm_type: str, title: str) -> None: + """Display when a permission is requested.""" + self.console.print(f" [yellow]?[/yellow] [dim]permission:[/dim] {title}", style=THEME_DIM) + + def permission_approved(self, perm_type: str) -> None: + """Display when a permission is auto-approved.""" + self.console.print(f" [green]βœ“[/green] [dim]auto-approved {perm_type}[/dim]") + + def todo_updated(self, todos: list) -> None: + """Display todo list updates.""" + if not todos: + return + + # Count by status + pending = sum(1 for t in todos if t.get("status") == "pending") + completed = sum(1 for t in todos if t.get("status") == "completed") + in_progress = sum(1 for t in todos if t.get("status") == "in_progress") + + parts = [] + if in_progress: + parts.append(f"{in_progress} active") + if pending: + parts.append(f"{pending} pending") + if completed: + parts.append(f"{completed} done") + + status_str = ", ".join(parts) if parts else f"{len(todos)} items" + self.console.print(f" [dim]tasks:[/dim] {status_str}") + + def file_edited(self, file_path: str) -> None: + """Display when a file is edited.""" + short_path = self._truncate_path(file_path, 40) + self.console.print(f" [dim]✎[/dim] {short_path}", style=THEME_DIM) + + def session_busy(self) -> None: + """Display busy indicator.""" + # This is handled by the live display spinner + pass + + def session_idle(self) -> None: + """Display idle state.""" + # Usually means we're done + pass + + def session_diff(self, diffs: list) -> None: + """Display file changes summary.""" + if not diffs: + return + + total_add = sum(d.get("additions", 0) for d in diffs) + total_del = sum(d.get("deletions", 0) for d in diffs) + + files_summary = f"{len(diffs)} file{'s' if len(diffs) > 1 else ''}" + changes = [] + if total_add: + changes.append(f"+{total_add}") + if total_del: + changes.append(f"-{total_del}") + + change_str = " ".join(changes) if changes else "" + self.console.print(f" [dim]diff:[/dim] {files_summary} {change_str}") + + def session_compacted(self) -> None: + """Display context compaction notification.""" + self.console.print(f" [dim]context compacted[/dim]") + + def session_retry(self, attempt: int, message: str) -> None: + """Display retry status.""" + reason = message if message else "retrying..." + self.console.print(f" [yellow]⟳[/yellow] [dim]attempt {attempt}:[/dim] {reason}") + + def _summarize_input(self, tool_name: str, tool_input: dict) -> str: + """Create a brief summary of tool input.""" + tool_lower = tool_name.lower() + + if tool_lower in ("read", "file_read"): + path = tool_input.get("path", tool_input.get("file_path", "")) + return f"[dim]{self._truncate_path(path)}[/dim]" + elif tool_lower in ("write", "file_write", "edit"): + path = tool_input.get("path", tool_input.get("file_path", "")) + return f"[dim]β†’ {self._truncate_path(path)}[/dim]" + elif tool_lower in ("bash", "shell"): + cmd = str(tool_input.get("command", ""))[:60] + return f"[dim]$ {cmd}{'...' if len(cmd) >= 60 else ''}[/dim]" + elif tool_lower in ("glob", "find"): + pattern = tool_input.get("pattern", tool_input.get("query", "")) + return f"[dim]'{pattern}'[/dim]" + elif tool_lower in ("webfetch", "web_fetch"): + url = str(tool_input.get("url", ""))[:50] + return f"[dim]{url}{'...' if len(url) >= 50 else ''}[/dim]" + elif tool_lower == "todowrite": + todos = tool_input.get("todos", []) + return f"[dim]{len(todos)} items[/dim]" + + return "" + + def _truncate_path(self, path: str, max_len: int = 50) -> str: + """Truncate a path for display.""" + if len(path) <= max_len: + return path + return "..." + path[-(max_len - 3):]