diff --git a/src/reverse_api/browser.py b/src/reverse_api/browser.py index abc247c..f614b74 100644 --- a/src/reverse_api/browser.py +++ b/src/reverse_api/browser.py @@ -39,6 +39,7 @@ def _null_logger(message: dict) -> None: """Null logger that discards all messages.""" pass + # Realistic Chrome user agents (updated for late 2024/2025) USER_AGENTS = [ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", @@ -593,7 +594,8 @@ def __init__( prompt: str, output_dir: str | None = None, timeout: int = 300, - agent_model: str = "bu-llm", + browser_use_model: str = "bu-llm", + stagehand_model: str = "openai/computer-use-preview-2025-03-11", agent_provider: str = "browser-use", start_url: Optional[str] = None, ): @@ -602,7 +604,8 @@ def __init__( self.prompt = prompt self.output_dir = output_dir self.timeout = timeout - self.agent_model = agent_model + self.browser_use_model = browser_use_model + self.stagehand_model = stagehand_model self.agent_provider = agent_provider self.start_url = start_url @@ -616,12 +619,21 @@ def __init__( def _save_metadata(self, end_time: str, result: dict | None = None) -> None: """Save run metadata to JSON file.""" + # Select the appropriate model based on agent_provider + agent_model = ( + self.stagehand_model + if self.agent_provider == "stagehand" + else self.browser_use_model + ) + metadata = { "run_id": self.run_id, "prompt": self.prompt, "mode": "agent", "agent_provider": self.agent_provider, - "agent_model": self.agent_model, + "agent_model": agent_model, + "browser_use_model": self.browser_use_model, + "stagehand_model": self.stagehand_model, "start_time": self._start_time, "end_time": end_time, "har_file": str(self.har_path), @@ -709,7 +721,7 @@ def emit(self, record): # Parse agent model and validate API key try: provider, model_name = parse_agent_model( - self.agent_model, self.agent_provider + self.browser_use_model, self.agent_provider ) except ValueError as e: result["error"] = str(e) @@ -842,7 +854,7 @@ async def _run_with_stagehand(self) -> dict: try: try: provider, model_name = parse_agent_model( - self.agent_model, self.agent_provider + self.stagehand_model, self.agent_provider ) except ValueError as e: result["error"] = str(e) @@ -1052,7 +1064,8 @@ def run_agent_browser( prompt: str, output_dir: str | None = None, timeout: int = 300, - agent_model: str = "bu-llm", + browser_use_model: str = "bu-llm", + stagehand_model: str = "openai/computer-use-preview-2025-03-11", agent_provider: str = "browser-use", start_url: Optional[str] = None, ) -> Path: @@ -1062,7 +1075,8 @@ def run_agent_browser( prompt=prompt, output_dir=output_dir, timeout=timeout, - agent_model=agent_model, + browser_use_model=browser_use_model, + stagehand_model=stagehand_model, agent_provider=agent_provider, start_url=start_url, ) diff --git a/src/reverse_api/cli.py b/src/reverse_api/cli.py index 3dd1ede..a136ec5 100644 --- a/src/reverse_api/cli.py +++ b/src/reverse_api/cli.py @@ -150,7 +150,8 @@ def get_prompt(): return { "mode": result_mode, "run_id": prompt, - "model": model or config_manager.get("model", "claude-sonnet-4-5"), + "model": model + or config_manager.get("claude_code_model", "claude-sonnet-4-5"), } # Agent mode: similar to manual but uses autonomous browser @@ -174,7 +175,7 @@ def get_prompt(): raise click.Abort() if model is None: - model = config_manager.get("model", "claude-sonnet-4-5") + model = config_manager.get("claude_code_model", "claude-sonnet-4-5") return { "mode": result_mode, @@ -208,7 +209,7 @@ def get_prompt(): reverse_engineer = True if model is None: - model = config_manager.get("model", "claude-sonnet-4-5") + model = config_manager.get("claude_code_model", "claude-sonnet-4-5") return { "mode": result_mode, @@ -315,10 +316,13 @@ def handle_settings(): action = questionary.select( "", choices=[ - Choice(title="> change model", value="model"), + Choice(title="> change claude code model", value="claude_code_model"), Choice(title="> change sdk", value="sdk"), + Choice(title="> opencode provider", value="opencode_provider"), + Choice(title="> opencode model", value="opencode_model"), Choice(title="> agent provider", value="agent_provider"), - Choice(title="> agent model", value="agent_model"), + Choice(title="> browser-use model", value="browser_use_model"), + Choice(title="> stagehand model", value="stagehand_model"), Choice(title="> output directory", value="output_dir"), Choice(title="> back", value="back"), ], @@ -335,7 +339,7 @@ def handle_settings(): if action is None or action == "back": return - if action == "model": + if action == "claude_code_model": model_choices = [ Choice(title=f"> {c['name'].lower()}", value=c["value"]) for c in get_model_choices() @@ -353,7 +357,7 @@ def handle_settings(): ), ).ask() if model and model != "back": - config_manager.set("model", model) + config_manager.set("claude_code_model", model) console.print(f" [dim]updated[/dim] {model}\n") elif action == "sdk": @@ -397,35 +401,109 @@ def handle_settings(): if provider and provider != "back": config_manager.set("agent_provider", provider) console.print(f" [dim]updated[/dim] agent provider: {provider}\n") - # If switching to stagehand, validate current model - if provider == "stagehand": - current_model = config_manager.get("agent_model", "bu-llm") - try: - from .browser import parse_agent_model - parse_agent_model(current_model, provider) - except ValueError: + elif action == "opencode_provider": + current = config_manager.get("opencode_provider", "anthropic") + new_provider = questionary.text( + " > opencode provider", + default=current or "anthropic", + instruction="(e.g., 'anthropic', 'openai', 'google')", + qmark="", + style=questionary.Style( + [ + ("question", f"fg:{THEME_SECONDARY}"), + ("instruction", f"fg:{THEME_DIM} italic"), + ] + ), + ).ask() + if new_provider is not None: + new_provider = new_provider.strip() + if not new_provider: + console.print( + " [yellow]error:[/yellow] opencode provider cannot be empty\n" + ) + else: + config_manager.set("opencode_provider", new_provider) + console.print( + f" [dim]updated[/dim] opencode provider: {new_provider}\n" + ) + + elif action == "opencode_model": + current = config_manager.get("opencode_model", "claude-sonnet-4-5") + new_model = questionary.text( + " > opencode model", + default=current or "claude-sonnet-4-5", + instruction="(e.g., 'claude-sonnet-4-5', 'claude-opus-4-5')", + qmark="", + style=questionary.Style( + [ + ("question", f"fg:{THEME_SECONDARY}"), + ("instruction", f"fg:{THEME_DIM} italic"), + ] + ), + ).ask() + if new_model is not None: + new_model = new_model.strip() + if not new_model: + console.print( + " [yellow]error:[/yellow] opencode model cannot be empty\n" + ) + else: + config_manager.set("opencode_model", new_model) + console.print(f" [dim]updated[/dim] opencode model: {new_model}\n") + + elif action == "browser_use_model": + from .browser import parse_agent_model + + current = config_manager.get("browser_use_model", "bu-llm") + instruction = "(Format: 'bu-llm' or 'provider/model', e.g., 'openai/gpt-4')" + + new_model = questionary.text( + " > browser-use model", + default=current or "bu-llm", + instruction=instruction, + qmark="", + style=questionary.Style( + [ + ("question", f"fg:{THEME_SECONDARY}"), + ("instruction", f"fg:{THEME_DIM} italic"), + ] + ), + ).ask() + if new_model is not None: + new_model = new_model.strip() + if not new_model: + console.print( + " [yellow]error:[/yellow] browser-use model cannot be empty\n" + ) + else: + # Validate format for browser-use + try: + parse_agent_model(new_model, "browser-use") + config_manager.set("browser_use_model", new_model) console.print( - f" [yellow]warning:[/yellow] Current agent model '{current_model}' may not be compatible with stagehand.\n" + f" [dim]updated[/dim] browser-use model: {new_model}\n" ) + except ValueError as e: + console.print(f" [yellow]error:[/yellow] {e}\n") console.print( - f" [dim]Stagehand supports OpenAI and Anthropic Computer Use models[/dim]\n" - f" [dim]Examples: openai/computer-use-preview-2025-03-11, anthropic/claude-sonnet-4-5-20250929[/dim]\n" + " [dim]Valid formats:[/dim]\n" + " [dim] - bu-llm[/dim]\n" + " [dim] - openai/model_name (e.g., openai/gpt-4)[/dim]\n" + " [dim] - google/model_name (e.g., google/gemini-pro)[/dim]\n" ) - elif action == "agent_model": + elif action == "stagehand_model": from .browser import parse_agent_model - current = config_manager.get("agent_model", "bu-llm") - agent_provider = config_manager.get("agent_provider", "browser-use") - - instruction = "(Format: 'bu-llm' or 'provider/model', e.g., 'openai/gpt-4')" - if agent_provider == "stagehand": - instruction = "(Format: 'openai/model' or 'anthropic/model', e.g., 'openai/computer-use-preview-2025-03-11' or 'anthropic/claude-sonnet-4-5-20250929')" + current = config_manager.get( + "stagehand_model", "openai/computer-use-preview-2025-03-11" + ) + instruction = "(Format: 'openai/model' or 'anthropic/model', e.g., 'openai/computer-use-preview-2025-03-11' or 'anthropic/claude-sonnet-4-5-20250929')" new_model = questionary.text( - " > agent model", - default=current or "bu-llm", + " > stagehand model", + default=current or "openai/computer-use-preview-2025-03-11", instruction=instruction, qmark="", style=questionary.Style( @@ -438,30 +516,24 @@ def handle_settings(): if new_model is not None: new_model = new_model.strip() if not new_model: - console.print(" [yellow]error:[/yellow] agent model cannot be empty\n") + console.print( + " [yellow]error:[/yellow] stagehand model cannot be empty\n" + ) else: - # Validate format with current agent_provider + # Validate format for stagehand try: - parse_agent_model(new_model, agent_provider) - config_manager.set("agent_model", new_model) - console.print(f" [dim]updated[/dim] agent model: {new_model}\n") + parse_agent_model(new_model, "stagehand") + config_manager.set("stagehand_model", new_model) + console.print(f" [dim]updated[/dim] stagehand model: {new_model}\n") except ValueError as e: console.print(f" [yellow]error:[/yellow] {e}\n") - if agent_provider == "stagehand": - console.print( - " [dim]Valid formats for stagehand:[/dim]\n" - " [dim] - openai/computer-use-preview-2025-03-11[/dim]\n" - " [dim] - anthropic/claude-sonnet-4-5-20250929[/dim]\n" - " [dim] - anthropic/claude-haiku-4-5-20251001[/dim]\n" - " [dim] - anthropic/claude-opus-4-5-20251101[/dim]\n" - ) - else: - console.print( - " [dim]Valid formats:[/dim]\n" - " [dim] - bu-llm[/dim]\n" - " [dim] - openai/model_name (e.g., openai/gpt-4)[/dim]\n" - " [dim] - google/model_name (e.g., google/gemini-pro)[/dim]\n" - ) + console.print( + " [dim]Valid formats for stagehand:[/dim]\n" + " [dim] - openai/computer-use-preview-2025-03-11[/dim]\n" + " [dim] - anthropic/claude-sonnet-4-5-20250929[/dim]\n" + " [dim] - anthropic/claude-haiku-4-5-20251001[/dim]\n" + " [dim] - anthropic/claude-opus-4-5-20251101[/dim]\n" + ) elif action == "output_dir": current = config_manager.get("output_dir") @@ -518,7 +590,9 @@ def handle_history(): if run: console.print(Panel(json.dumps(run, indent=2), border_style=THEME_DIM)) if questionary.confirm(" > recode?").ask(): - model = run.get("model") or config_manager.get("model", "claude-sonnet-4-5") + model = run.get("model") or config_manager.get( + "claude_code_model", "claude-sonnet-4-5" + ) run_engineer(run_id, model=model) else: console.print(" [dim]> not found[/dim]") @@ -692,8 +766,11 @@ def run_agent_capture( run_id = generate_run_id() timestamp = get_timestamp() - # Get agent model and provider from config - agent_model = config_manager.get("agent_model", "bu-llm") + # Get agent models and provider from config + browser_use_model = config_manager.get("browser_use_model", "bu-llm") + stagehand_model = config_manager.get( + "stagehand_model", "openai/computer-use-preview-2025-03-11" + ) agent_provider = config_manager.get("agent_provider", "browser-use") # Record initial session @@ -713,7 +790,8 @@ def run_agent_capture( run_id=run_id, prompt=prompt, output_dir=output_dir, - agent_model=agent_model, + browser_use_model=browser_use_model, + stagehand_model=stagehand_model, agent_provider=agent_provider, start_url=url, ) @@ -813,14 +891,27 @@ def run_engineer(run_id, har_path=None, prompt=None, model=None, output_dir=None har_dir = Path(paths.get("har_dir", get_har_dir(run_id, None))) har_path = har_dir / "recording.har" - result = run_reverse_engineering( - run_id=run_id, - har_path=har_path, - prompt=prompt, - model=model or config_manager.get("model", "claude-sonnet-4-5"), - output_dir=output_dir, - sdk=config_manager.get("sdk", "opencode"), - ) + sdk = config_manager.get("sdk", "claude") + if sdk == "opencode": + result = run_reverse_engineering( + run_id=run_id, + har_path=har_path, + prompt=prompt, + model=model, + output_dir=output_dir, + sdk=sdk, + opencode_provider=config_manager.get("opencode_provider", "anthropic"), + opencode_model=config_manager.get("opencode_model", "claude-sonnet-4-5"), + ) + else: + result = run_reverse_engineering( + run_id=run_id, + har_path=har_path, + prompt=prompt, + model=model or config_manager.get("claude_code_model", "claude-sonnet-4-5"), + output_dir=output_dir, + sdk=sdk, + ) if result: # Automatically copy scripts to current directory with a readable name diff --git a/src/reverse_api/config.py b/src/reverse_api/config.py index b20fc5f..153f53b 100644 --- a/src/reverse_api/config.py +++ b/src/reverse_api/config.py @@ -5,12 +5,14 @@ from typing import Any, Dict DEFAULT_CONFIG = { - "model": "claude-sonnet-4-5", + "claude_code_model": "claude-sonnet-4-5", + "opencode_provider": "anthropic", + "opencode_model": "claude-sonnet-4-5", "output_dir": None, # None means use ~/.reverse-api/runs "sdk": "claude", # "opencode" or "claude" "agent_provider": "browser-use", - # We support openai & google as model providers - "agent_model": "bu-llm", # "bu-llm" or "{provider}/{model_name}" (e.g. "openai/gpt-5-mini") + "browser_use_model": "bu-llm", # "bu-llm" or "{provider}/{model_name}" (e.g. "openai/gpt-5-mini") + "stagehand_model": "openai/computer-use-preview-2025-03-11", # "{provider}/{model_name}" format } @@ -28,8 +30,35 @@ def load(self): try: with open(self.config_path, "r") as f: user_config = json.load(f) + + # Backward compatibility: migrate old config keys + # Migrate "model" -> "claude_code_model" + if ( + "model" in user_config + and "claude_code_model" not in user_config + ): + user_config["claude_code_model"] = user_config["model"] + + # Migrate "agent_model" -> "browser_use_model" and "stagehand_model" + if "agent_model" in user_config: + agent_provider = user_config.get( + "agent_provider", "browser-use" + ) + if agent_provider == "stagehand": + if "stagehand_model" not in user_config: + user_config["stagehand_model"] = user_config[ + "agent_model" + ] + else: + if "browser_use_model" not in user_config: + user_config["browser_use_model"] = user_config[ + "agent_model" + ] + # Only keep valid keys - valid_config = {k: v for k, v in user_config.items() if k in self.config} + valid_config = { + k: v for k, v in user_config.items() if k in self.config + } self.config.update(valid_config) except (json.JSONDecodeError, OSError): # Fallback to defaults if file is corrupted diff --git a/src/reverse_api/engineer.py b/src/reverse_api/engineer.py index ee2303e..ee8c6bb 100644 --- a/src/reverse_api/engineer.py +++ b/src/reverse_api/engineer.py @@ -24,12 +24,18 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: """Run the reverse engineering analysis with Claude.""" self.ui.header(self.run_id, self.prompt, self.model) self.ui.start_analysis() - - # Save the prompt to messages self.message_store.save_prompt(self._build_analysis_prompt()) options = ClaudeAgentOptions( - allowed_tools=["Read", "Write", "Bash", "Glob", "Grep", "WebSearch", "WebFetch"], + allowed_tools=[ + "Read", + "Write", + "Bash", + "Glob", + "Grep", + "WebSearch", + "WebFetch", + ], permission_mode="acceptEdits", cwd=str(self.scripts_dir.parent.parent), # Project root model=self.model, @@ -42,8 +48,10 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: # Process response and show progress with TUI async for message in client.receive_response(): # Check for usage metadata in message if applicable - if hasattr(message, 'usage') and isinstance(getattr(message, 'usage'), dict): - self.usage_metadata.update(getattr(message, 'usage')) + if hasattr(message, "usage") and isinstance( + getattr(message, "usage"), dict + ): + self.usage_metadata.update(getattr(message, "usage")) if isinstance(message, AssistantMessage): last_tool_name = None @@ -51,70 +59,94 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: if isinstance(block, ToolUseBlock): last_tool_name = block.name self.ui.tool_start(block.name, block.input) - self.message_store.save_tool_start(block.name, block.input) + self.message_store.save_tool_start( + block.name, block.input + ) elif isinstance(block, ToolResultBlock): is_error = block.is_error if block.is_error else False - + # Extract output from ToolResultBlock output = None - if hasattr(block, 'content'): + if hasattr(block, "content"): output = block.content - elif hasattr(block, 'result'): + elif hasattr(block, "result"): output = block.result - elif hasattr(block, 'output'): + elif hasattr(block, "output"): output = block.output - + tool_name = last_tool_name or "Tool" self.ui.tool_result(tool_name, is_error, output) - self.message_store.save_tool_result(tool_name, is_error, str(output) if output else None) + self.message_store.save_tool_result( + tool_name, is_error, str(output) if output else None + ) elif isinstance(block, TextBlock): self.ui.thinking(block.text) self.message_store.save_thinking(block.text) - + elif isinstance(message, ResultMessage): if message.is_error: self.ui.error(message.result or "Unknown error") - self.message_store.save_error(message.result or "Unknown error") + self.message_store.save_error( + message.result or "Unknown error" + ) return None else: - script_path = str(self.scripts_dir / 'api_client.py') + script_path = str(self.scripts_dir / "api_client.py") self.ui.success(script_path) - + # Calculate estimated cost if we have usage data if self.usage_metadata: - input_tokens = self.usage_metadata.get("input_tokens", 0) - output_tokens = self.usage_metadata.get("output_tokens", 0) - cache_creation_tokens = self.usage_metadata.get("cache_creation_input_tokens", 0) - cache_read_tokens = self.usage_metadata.get("cache_read_input_tokens", 0) - + input_tokens = self.usage_metadata.get( + "input_tokens", 0 + ) + output_tokens = self.usage_metadata.get( + "output_tokens", 0 + ) + cache_creation_tokens = self.usage_metadata.get( + "cache_creation_input_tokens", 0 + ) + cache_read_tokens = self.usage_metadata.get( + "cache_read_input_tokens", 0 + ) + # Claude Sonnet 4.5 pricing per million tokens: # - Regular input: $3.00 # - Cache creation: $3.75 # - Cache read: $0.30 # - Output: $15.00 cost = ( - (input_tokens / 1_000_000 * 3.0) + - (cache_creation_tokens / 1_000_000 * 3.75) + - (cache_read_tokens / 1_000_000 * 0.30) + - (output_tokens / 1_000_000 * 15.0) + (input_tokens / 1_000_000 * 3.0) + + (cache_creation_tokens / 1_000_000 * 3.75) + + (cache_read_tokens / 1_000_000 * 0.30) + + (output_tokens / 1_000_000 * 15.0) ) self.usage_metadata["estimated_cost_usd"] = cost - + # Display usage breakdown self.ui.console.print(f" [dim]Usage:[/dim]") if input_tokens > 0: - self.ui.console.print(f" [dim] input: {input_tokens:,} tokens[/dim]") + self.ui.console.print( + f" [dim] input: {input_tokens:,} tokens[/dim]" + ) if cache_creation_tokens > 0: - self.ui.console.print(f" [dim] cache creation: {cache_creation_tokens:,} tokens[/dim]") + self.ui.console.print( + f" [dim] cache creation: {cache_creation_tokens:,} tokens[/dim]" + ) if cache_read_tokens > 0: - self.ui.console.print(f" [dim] cache read: {cache_read_tokens:,} tokens[/dim]") + self.ui.console.print( + f" [dim] cache read: {cache_read_tokens:,} tokens[/dim]" + ) if output_tokens > 0: - self.ui.console.print(f" [dim] output: {output_tokens:,} tokens[/dim]") - self.ui.console.print(f" [dim] total cost: ${cost:.4f}[/dim]") + self.ui.console.print( + f" [dim] output: {output_tokens:,} tokens[/dim]" + ) + self.ui.console.print( + f" [dim] total cost: ${cost:.4f}[/dim]" + ) result: Dict[str, Any] = { "script_path": script_path, - "usage": self.usage_metadata + "usage": self.usage_metadata, } self.message_store.save_result(result) return result @@ -144,14 +176,19 @@ def run_reverse_engineering( output_dir: Optional[str] = None, verbose: bool = True, sdk: str = "claude", + opencode_provider: Optional[str] = None, + opencode_model: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Run reverse engineering with the specified SDK. - + Args: sdk: "opencode" or "claude" - determines which SDK to use + opencode_provider: Provider ID for OpenCode (e.g., "anthropic") + opencode_model: Model ID for OpenCode (e.g., "claude-sonnet-4-5") """ if sdk == "opencode": from .opencode_engineer import OpenCodeEngineer + engineer = OpenCodeEngineer( run_id=run_id, har_path=har_path, @@ -160,6 +197,8 @@ def run_reverse_engineering( additional_instructions=additional_instructions, output_dir=output_dir, verbose=verbose, + opencode_provider=opencode_provider, + opencode_model=opencode_model, ) else: engineer = ClaudeEngineer( @@ -171,5 +210,5 @@ def run_reverse_engineering( output_dir=output_dir, verbose=verbose, ) - + return asyncio.run(engineer.analyze_and_generate()) diff --git a/src/reverse_api/opencode_engineer.py b/src/reverse_api/opencode_engineer.py index 649dd51..baf4ebe 100644 --- a/src/reverse_api/opencode_engineer.py +++ b/src/reverse_api/opencode_engineer.py @@ -19,6 +19,7 @@ # Enable debug mode with OPENCODE_DEBUG=1 DEBUG = os.environ.get("OPENCODE_DEBUG", "0") == "1" + def debug_log(msg: str): """Print debug message if DEBUG mode is enabled.""" if DEBUG: @@ -28,9 +29,9 @@ def debug_log(msg: str): class OpenCodeEngineer(BaseEngineer): """Uses OpenCode AI to analyze HAR files and generate Python API scripts.""" - + BASE_URL = "http://127.0.0.1:4096" - + # Map short model names to full Anthropic model IDs MODEL_MAP = { "sonnet": "claude-sonnet-4-5", @@ -43,63 +44,66 @@ def __init__(self, *args, **kwargs): # Override UI with OpenCode-specific version self.opencode_ui = OpenCodeUI(verbose=kwargs.get("verbose", True)) self.ui = self.opencode_ui # Ensure base class uses our specialized UI + self.opencode_provider = kwargs.get("opencode_provider", "anthropic") + self.opencode_model = kwargs.get("opencode_model", "claude-sonnet-4-5") self._last_error: Optional[str] = None self._session_id: Optional[str] = None self._last_event_time = 0.0 async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: """Run the reverse engineering analysis with OpenCode.""" - self.opencode_ui.header(self.run_id, self.prompt, self.model) + self.opencode_ui.header(self.run_id, self.prompt, self.opencode_model) self.opencode_ui.start_analysis() - + # Save the prompt to messages self.message_store.save_prompt(self._build_analysis_prompt()) try: - async with httpx.AsyncClient(base_url=self.BASE_URL, timeout=600.0) as client: + async with httpx.AsyncClient( + base_url=self.BASE_URL, timeout=600.0 + ) as client: # Create a new session r = await client.post("/session", json={}) r.raise_for_status() session_data = r.json() self._session_id = session_data["id"] - + self.opencode_ui.session_created(self._session_id) - + # Start event stream BEFORE sending message event_task = asyncio.create_task(self._stream_events(client)) - + # Give event stream a moment to connect await asyncio.sleep(0.1) - + # Send prompt with correct format # POST /session/:id/message with model object - # Resolve short model name to full Anthropic ID - model_id = self.MODEL_MAP.get(self.model, self.model) if self.model else "claude-sonnet-4-5-20250514" - + # Resolve short model name to full model ID if needed + model_id = self.MODEL_MAP.get(self.opencode_model, self.opencode_model) + prompt_body = { "model": { - "providerID": "anthropic", - "modelID": model_id + "providerID": self.opencode_provider, + "modelID": model_id, }, - "parts": [{"type": "text", "text": self._build_analysis_prompt()}] + "parts": [{"type": "text", "text": self._build_analysis_prompt()}], } - + prompt_r = await client.post( - f"/session/{self._session_id}/message", - json=prompt_body + f"/session/{self._session_id}/message", json=prompt_body ) prompt_r.raise_for_status() - + # Wait for events to complete try: await asyncio.wait_for(event_task, timeout=600.0) except asyncio.TimeoutError: self._last_error = "Session timed out (10 min)" self.opencode_ui.error(self._last_error) - + # Stop streaming UI self.opencode_ui.stop_streaming() - + # Check for errors if self._last_error: self.opencode_ui.error(self._last_error) @@ -107,9 +111,9 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: return None # Success - script_path = str(self.scripts_dir / 'api_client.py') + script_path = str(self.scripts_dir / "api_client.py") self.opencode_ui.success(script_path) - + result_data: Dict[str, Any] = { "script_path": script_path, "usage": self.usage_metadata, @@ -120,10 +124,12 @@ async def analyze_and_generate(self) -> Optional[Dict[str, Any]]: except httpx.ConnectError: self.opencode_ui.error("Connection error") - self.opencode_ui.console.print("\n[dim]Make sure OpenCode is running: opencode[/dim]") + self.opencode_ui.console.print( + "\n[dim]Make sure OpenCode is running: opencode[/dim]" + ) self.message_store.save_error("Connection error") return None - + except Exception as e: error_msg = str(e) if str(e) else "Unknown error" self.opencode_ui.error(error_msg) @@ -134,13 +140,14 @@ async def _stream_events(self, client: httpx.AsyncClient): """Stream events from OpenCode and update UI.""" seen_parts: set = set() # Track part IDs to avoid duplicates import time + self._last_event_time = time.time() - + debug_log("Starting event stream...") - + # Start the live display self.opencode_ui.start_streaming() - + try: debug_log(f"Connecting to GET /event") async with client.stream("GET", "/event", timeout=None) as response: @@ -148,7 +155,7 @@ async def _stream_events(self, client: httpx.AsyncClient): async for line in response.aiter_lines(): if not line: continue - + # SSE format: "data: {...}" if not line.startswith("data: "): if line.startswith("data:"): @@ -158,40 +165,44 @@ async def _stream_events(self, client: httpx.AsyncClient): continue else: line_data = line[6:].strip() - + if not line_data: continue - + self._last_event_time = time.time() - + try: data = json.loads(line_data) except json.JSONDecodeError as e: debug_log(f"JSON decode error: {e}, data: {line_data[:100]}") continue - + event_type = data.get("type") properties = data.get("properties", {}) - + debug_log(f"Event: {event_type}") - + # Handle different event types if event_type == "message.part.updated": await self._handle_part_update(properties, seen_parts) - + elif event_type == "session.idle": event_sid = properties.get("sessionID") - debug_log(f"session.idle: sessionID={event_sid}, our session={self._session_id}") + debug_log( + f"session.idle: sessionID={event_sid}, our session={self._session_id}" + ) if event_sid == self._session_id: debug_log("Our session is idle, returning!") self.opencode_ui.session_status("idle") return # Done! - + elif event_type == "session.status": event_sid = properties.get("sessionID") status = properties.get("status", {}) status_type = status.get("type", "idle") - debug_log(f"session.status: sessionID={event_sid}, status={status_type}") + debug_log( + f"session.status: sessionID={event_sid}, status={status_type}" + ) if event_sid == self._session_id: if status_type == "retry": attempt = status.get("attempt", 1) @@ -199,78 +210,86 @@ async def _stream_events(self, client: httpx.AsyncClient): self.opencode_ui.session_retry(attempt, message) else: self.opencode_ui.session_status(status_type) - + if status_type == "idle": debug_log("Our session status is idle, returning!") return # Done! - + elif event_type == "permission.updated": # Auto-approve permissions so the agent can proceed permission_id = properties.get("id") perm_session = properties.get("sessionID") perm_type = properties.get("type", "") perm_title = properties.get("title", "") - - debug_log(f"permission.updated: id={permission_id}, type={perm_type}, title={perm_title}") - + + debug_log( + f"permission.updated: id={permission_id}, type={perm_type}, title={perm_title}" + ) + if perm_session == self._session_id and permission_id: # Show permission request in UI self.opencode_ui.permission_requested(perm_type, perm_title) - + # Auto-approve the permission # OpenCode expects: "once" | "always" | "reject" debug_log(f"Auto-approving permission {permission_id}") try: perm_response = await client.post( f"/session/{self._session_id}/permissions/{permission_id}", - json={"response": "always"} # "once", "always", or "reject" + json={ + "response": "always" + }, # "once", "always", or "reject" + ) + debug_log( + f"Permission response: {perm_response.status_code}" ) - debug_log(f"Permission response: {perm_response.status_code}") if perm_response.status_code == 200: self.opencode_ui.permission_approved(perm_type) except Exception as pe: debug_log(f"Permission approval failed: {pe}") - + elif event_type == "todo.updated": todos = properties.get("todos", []) event_sid = properties.get("sessionID") if event_sid == self._session_id and todos: debug_log(f"todo.updated: {len(todos)} todos") self.opencode_ui.todo_updated(todos) - + elif event_type == "file.edited": file_path = properties.get("file", "") if file_path: debug_log(f"file.edited: {file_path}") self.opencode_ui.file_edited(file_path) - + elif event_type == "session.diff": event_sid = properties.get("sessionID") diffs = properties.get("diff", []) if event_sid == self._session_id and diffs: debug_log(f"session.diff: {len(diffs)} files changed") self.opencode_ui.session_diff(diffs) - + elif event_type == "session.compacted": event_sid = properties.get("sessionID") if event_sid == self._session_id: debug_log("session.compacted") self.opencode_ui.session_compacted() - + elif event_type == "session.error": event_sid = properties.get("sessionID") if event_sid and event_sid != self._session_id: - debug_log(f"session.error for other session {event_sid}, ignoring") + debug_log( + f"session.error for other session {event_sid}, ignoring" + ) continue - + error_obj = properties.get("error", {}) debug_log(f"session.error: {error_obj}") - + # Parse error with type-specific handling if isinstance(error_obj, dict): error_name = error_obj.get("name", "UnknownError") error_data = error_obj.get("data", {}) - + if error_name == "ProviderAuthError": provider = error_data.get("providerID", "unknown") msg = error_data.get("message", "Authentication failed") @@ -282,14 +301,20 @@ async def _stream_events(self, client: httpx.AsyncClient): elif error_name == "MessageAbortedError": self._last_error = "Aborted" else: - msg = error_data.get("message", "") if isinstance(error_data, dict) else str(error_data) - self._last_error = f"{error_name}: {msg}" if msg else error_name + msg = ( + error_data.get("message", "") + if isinstance(error_data, dict) + else str(error_data) + ) + self._last_error = ( + f"{error_name}: {msg}" if msg else error_name + ) else: self._last_error = str(error_obj) - + self.opencode_ui.error(self._last_error) return - + except httpx.ReadError as e: self._last_error = f"Stream disconnected: {e}" except Exception as e: @@ -299,63 +324,75 @@ async def _handle_part_update(self, properties: dict, seen_parts: set): """Handle message.part.updated events.""" part = properties.get("part", {}) delta = properties.get("delta") # Incremental text update - + part_id = part.get("id", "") part_type = part.get("type") part_session = part.get("sessionID") - + # Only process parts for our session if part_session != self._session_id: return - + if part_type == "text": text = part.get("text", "") - debug_log(f"Handling text part: id={part_id}, delta={'yes' if delta else 'no'}, len={len(text)}") + debug_log( + f"Handling text part: id={part_id}, delta={'yes' if delta else 'no'}, len={len(text)}" + ) # Use delta for incremental updates if available self.opencode_ui.update_text(text, delta) - + # Save to message store (only significant updates) if len(text) > 50 and part_id not in seen_parts: seen_parts.add(part_id) self.message_store.save_thinking(text) - + elif part_type == "tool": tool_name = part.get("tool", "tool") state = part.get("state", {}) status = state.get("status") - - debug_log(f"Handling tool part: id={part_id}, tool={tool_name}, status={status}") - + + debug_log( + f"Handling tool part: id={part_id}, tool={tool_name}, status={status}" + ) + if status == "running" and part_id not in seen_parts: seen_parts.add(part_id) tool_input = state.get("input", {}) self.opencode_ui.tool_start(tool_name, tool_input) self.message_store.save_tool_start(tool_name, tool_input) - + elif status == "completed": output = state.get("output", "") self.opencode_ui.tool_result(tool_name, False, output) self.message_store.save_tool_result(tool_name, False, output) - + elif status == "error": error = state.get("error", "Tool error") self.opencode_ui.tool_result(tool_name, True, error) self.message_store.save_tool_result(tool_name, True, error) - + elif part_type == "step-finish": debug_log("Handling step-finish part") # Extract usage stats cost = part.get("cost", 0) tokens = part.get("tokens", {}) - + self.opencode_ui.step_finish(cost, tokens) - + # Update usage metadata - self.usage_metadata["input_tokens"] = self.usage_metadata.get("input_tokens", 0) + tokens.get("input", 0) - self.usage_metadata["output_tokens"] = self.usage_metadata.get("output_tokens", 0) + tokens.get("output", 0) + self.usage_metadata["input_tokens"] = self.usage_metadata.get( + "input_tokens", 0 + ) + tokens.get("input", 0) + self.usage_metadata["output_tokens"] = self.usage_metadata.get( + "output_tokens", 0 + ) + tokens.get("output", 0) cache = tokens.get("cache", {}) - self.usage_metadata["cache_read_tokens"] = self.usage_metadata.get("cache_read_tokens", 0) + cache.get("read", 0) - self.usage_metadata["cache_creation_tokens"] = self.usage_metadata.get("cache_creation_tokens", 0) + cache.get("write", 0) + self.usage_metadata["cache_read_tokens"] = self.usage_metadata.get( + "cache_read_tokens", 0 + ) + cache.get("read", 0) + self.usage_metadata["cache_creation_tokens"] = self.usage_metadata.get( + "cache_creation_tokens", 0 + ) + cache.get("write", 0) self.usage_metadata["cost"] = self.usage_metadata.get("cost", 0) + cost @@ -367,6 +404,8 @@ def run_opencode_engineering( additional_instructions: Optional[str] = None, output_dir: Optional[str] = None, verbose: bool = True, + opencode_provider: Optional[str] = None, + opencode_model: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Synchronous wrapper for OpenCode reverse engineering.""" engineer = OpenCodeEngineer( @@ -377,5 +416,7 @@ def run_opencode_engineering( additional_instructions=additional_instructions, output_dir=output_dir, verbose=verbose, + opencode_provider=opencode_provider, + opencode_model=opencode_model, ) return asyncio.run(engineer.analyze_and_generate())