From ef7c7d41b4cf37be9a251f94d002df035c16ddcc Mon Sep 17 00:00:00 2001 From: kalil0321 Date: Fri, 26 Dec 2025 20:31:42 +0100 Subject: [PATCH 1/2] fix: remove hardcoded version display --- src/reverse_api/browser.py | 10 ++- src/reverse_api/opencode_ui.py | 116 ++++++++++++++++++--------------- 2 files changed, 72 insertions(+), 54 deletions(-) diff --git a/src/reverse_api/browser.py b/src/reverse_api/browser.py index 8ac7cf4..a549c98 100644 --- a/src/reverse_api/browser.py +++ b/src/reverse_api/browser.py @@ -19,6 +19,7 @@ from rich.status import Status from .utils import get_har_dir, get_timestamp +from . import __version__ console = Console() @@ -386,7 +387,7 @@ def _start_with_stealth_chromium(self, start_url: Optional[str] = None) -> Path: # Wait for browser to close try: while self._context.pages: - self._context.pages[0].wait_for_timeout(100) + self._context.pages[0].wait_for_timeout(100) except Exception: pass @@ -435,7 +436,7 @@ def close(self) -> Path: time.sleep(1) status.update(" [dim]saving har file...[/dim]") - self._context.close() + self._context.close() if self.har_path.exists(): har_size = self.har_path.stat().st_size @@ -908,7 +909,10 @@ async def _run_with_stagehand(self) -> dict: empty_har = { "log": { "version": "1.2", - "creator": {"name": "reverse-api-engineer", "version": "0.2.0"}, + "creator": { + "name": "reverse-api-engineer", + "version": __version__, + }, "pages": [], "entries": [], } diff --git a/src/reverse_api/opencode_ui.py b/src/reverse_api/opencode_ui.py index 76a2774..c26260f 100644 --- a/src/reverse_api/opencode_ui.py +++ b/src/reverse_api/opencode_ui.py @@ -17,7 +17,7 @@ class OpenCodeUI: """Terminal UI for OpenCode with live streaming support.""" - + def __init__(self, console: Optional[Console] = None, verbose: bool = True): self.console = console or Console() self.verbose = verbose @@ -27,25 +27,29 @@ def __init__(self, console: Optional[Console] = None, verbose: bool = True): self._tool_status: str = "" self._session_status: str = "idle" self._tools_used: list[str] = [] - + def header(self, run_id: str, prompt: str, model: Optional[str] = None) -> None: """Display the session header.""" + from . import __version__ + self.console.print() - self.console.print(f" [white]reverse-api[/white] [dim]v0.1.0[/dim]") + self.console.print(f" [white]reverse-api[/white] [dim]v{__version__}[/dim]") self.console.print(f" [dim]━[/dim] [white]{run_id}[/white]") self.console.print(f" [dim]model[/dim] [white]{model or '---'}[/white]") - self.console.print(f" [{THEME_PRIMARY}]task[/{THEME_PRIMARY}] [white]{prompt}[/white]") + self.console.print( + f" [{THEME_PRIMARY}]task[/{THEME_PRIMARY}] [white]{prompt}[/white]" + ) self.console.print() - + def start_analysis(self) -> None: """Display analysis start message.""" self.console.print(f" [dim]decoding starting...[/dim]") self.console.print() - + def session_created(self, session_id: str) -> None: """Display session creation.""" self.console.print(f" [dim]session: {session_id[:16]}...[/dim]") - + def start_streaming(self) -> None: """Start the live display for streaming updates.""" self._live = Live( @@ -55,70 +59,74 @@ def start_streaming(self) -> None: transient=True, ) self._live.start() - + def stop_streaming(self) -> None: """Stop the live display.""" if self._live: self._live.stop() self._live = None - + def _build_display(self) -> Text: """Build the current display state.""" display = Text() - + # Show current tool if running if self._current_tool and self._tool_status == "running": display.append(f" ⟳ ", style=THEME_PRIMARY) display.append(f"{self._current_tool}", style="white") display.append(" running...\n", style=THEME_DIM) - + # Show streaming text (last 200 chars) if self._current_text: text_preview = self._current_text[-200:].replace("\n", " ").strip() if len(self._current_text) > 200: text_preview = "..." + text_preview display.append(f" {text_preview}\n", style=THEME_DIM) - + return display - + def update_text(self, text: str, delta: Optional[str] = None) -> None: """Update the streaming text display.""" if delta: self._current_text += delta else: self._current_text = text - + if self._live: self._live.update(self._build_display()) - + def tool_start(self, tool_name: str, tool_input: Optional[Dict] = None) -> None: """Display when a tool starts execution.""" self._current_tool = tool_name self._tool_status = "running" self._tools_used.append(tool_name) - + # Print tool start (this stays in terminal) input_summary = self._summarize_input(tool_name, tool_input or {}) self.console.print(f" [dim]>[/dim] {tool_name.lower():12} {input_summary}") - + if self._live: self._live.update(self._build_display()) - - def tool_result(self, tool_name: str, is_error: bool = False, output: Optional[str] = None) -> None: + + def tool_result( + self, tool_name: str, is_error: bool = False, output: Optional[str] = None + ) -> None: """Display when a tool completes.""" self._current_tool = None self._tool_status = "completed" if not is_error else "error" - + if self._live: self._live.update(self._build_display()) - + if is_error: - self.console.print(f" [red]![/red] {tool_name.lower()} failed", style=THEME_DIM) + self.console.print( + f" [red]![/red] {tool_name.lower()} failed", style=THEME_DIM + ) if output: # Show first 100 chars of error error_preview = str(output)[:100].replace("\n", " ") self.console.print(f" {error_preview}", style=THEME_DIM) - + def step_finish(self, cost: float, tokens: Dict[str, Any]) -> None: """Display step completion with usage stats.""" input_tokens = tokens.get("input", 0) @@ -126,15 +134,17 @@ def step_finish(self, cost: float, tokens: Dict[str, Any]) -> None: cache = tokens.get("cache", {}) cache_read = cache.get("read", 0) cache_write = cache.get("write", 0) - - self.console.print(f" [dim]step: {input_tokens:,}in/{output_tokens:,}out ${cost:.4f}[/dim]") - + + self.console.print( + f" [dim]step: {input_tokens:,}in/{output_tokens:,}out ${cost:.4f}[/dim]" + ) + def session_status(self, status_type: str) -> None: """Update session status.""" self._session_status = status_type if self._live: self._live.update(self._build_display()) - + def thinking(self, text: str) -> None: """Display thinking text (for compatibility with ClaudeUI).""" # For streaming, we use update_text instead @@ -144,37 +154,39 @@ def thinking(self, text: str) -> None: if len(text) > 100: display_text += "..." self.console.print(f" .. {display_text}", style=THEME_DIM) - + def success(self, script_path: str) -> None: """Display success message.""" self.console.print() self.console.print(f" [dim]decoding complete[/dim]") self.console.print(f" [white]{script_path}[/white]") self.console.print() - + def error(self, message: str) -> None: """Display error message.""" self.console.print() self.console.print(f" [dim]![/dim] [red]error:[/red] {message}") - + def permission_requested(self, perm_type: str, title: str) -> None: """Display when a permission is requested.""" - self.console.print(f" [yellow]?[/yellow] [dim]permission:[/dim] {title}", style=THEME_DIM) - + self.console.print( + f" [yellow]?[/yellow] [dim]permission:[/dim] {title}", style=THEME_DIM + ) + def permission_approved(self, perm_type: str) -> None: """Display when a permission is auto-approved.""" self.console.print(f" [green]✓[/green] [dim]auto-approved {perm_type}[/dim]") - + def todo_updated(self, todos: list) -> None: """Display todo list updates.""" if not todos: return - + # Count by status pending = sum(1 for t in todos if t.get("status") == "pending") completed = sum(1 for t in todos if t.get("status") == "completed") in_progress = sum(1 for t in todos if t.get("status") == "in_progress") - + parts = [] if in_progress: parts.append(f"{in_progress} active") @@ -182,56 +194,58 @@ def todo_updated(self, todos: list) -> None: parts.append(f"{pending} pending") if completed: parts.append(f"{completed} done") - + status_str = ", ".join(parts) if parts else f"{len(todos)} items" self.console.print(f" [dim]tasks:[/dim] {status_str}") - + def file_edited(self, file_path: str) -> None: """Display when a file is edited.""" short_path = self._truncate_path(file_path, 40) self.console.print(f" [dim]✎[/dim] {short_path}", style=THEME_DIM) - + def session_busy(self) -> None: """Display busy indicator.""" # This is handled by the live display spinner pass - + def session_idle(self) -> None: """Display idle state.""" # Usually means we're done pass - + def session_diff(self, diffs: list) -> None: """Display file changes summary.""" if not diffs: return - + total_add = sum(d.get("additions", 0) for d in diffs) total_del = sum(d.get("deletions", 0) for d in diffs) - + files_summary = f"{len(diffs)} file{'s' if len(diffs) > 1 else ''}" changes = [] if total_add: changes.append(f"+{total_add}") if total_del: changes.append(f"-{total_del}") - + change_str = " ".join(changes) if changes else "" self.console.print(f" [dim]diff:[/dim] {files_summary} {change_str}") - + def session_compacted(self) -> None: """Display context compaction notification.""" self.console.print(f" [dim]context compacted[/dim]") - + def session_retry(self, attempt: int, message: str) -> None: """Display retry status.""" reason = message if message else "retrying..." - self.console.print(f" [yellow]⟳[/yellow] [dim]attempt {attempt}:[/dim] {reason}") - + self.console.print( + f" [yellow]⟳[/yellow] [dim]attempt {attempt}:[/dim] {reason}" + ) + def _summarize_input(self, tool_name: str, tool_input: dict) -> str: """Create a brief summary of tool input.""" tool_lower = tool_name.lower() - + if tool_lower in ("read", "file_read"): path = tool_input.get("path", tool_input.get("file_path", "")) return f"[dim]{self._truncate_path(path)}[/dim]" @@ -250,11 +264,11 @@ def _summarize_input(self, tool_name: str, tool_input: dict) -> str: elif tool_lower == "todowrite": todos = tool_input.get("todos", []) return f"[dim]{len(todos)} items[/dim]" - + return "" - + def _truncate_path(self, path: str, max_len: int = 50) -> str: """Truncate a path for display.""" if len(path) <= max_len: return path - return "..." + path[-(max_len - 3):] + return "..." + path[-(max_len - 3) :] From 2be6d1662607b0ae57e94ee84f0a2b73b6f4d006 Mon Sep 17 00:00:00 2001 From: kalil0321 Date: Sat, 27 Dec 2025 00:27:09 +0100 Subject: [PATCH 2/2] fix: remove external logs --- src/reverse_api/__init__.py | 2 +- src/reverse_api/browser.py | 119 +++++++++++------------------------- src/reverse_api/engineer.py | 5 ++ src/reverse_api/utils.py | 5 ++ 4 files changed, 47 insertions(+), 84 deletions(-) diff --git a/src/reverse_api/__init__.py b/src/reverse_api/__init__.py index aba7ec4..8275de8 100644 --- a/src/reverse_api/__init__.py +++ b/src/reverse_api/__init__.py @@ -1,3 +1,3 @@ """Reverse API - Browser traffic capture for API reverse engineering.""" -__version__ = "0.2.2" +__version__ = "0.2.3" diff --git a/src/reverse_api/browser.py b/src/reverse_api/browser.py index a549c98..090dc1f 100644 --- a/src/reverse_api/browser.py +++ b/src/reverse_api/browser.py @@ -676,69 +676,42 @@ async def _run_with_har_capture(self, cdp_url: str = None) -> dict: async def _run_with_browser_use(self) -> dict: """Run agent with HAR recording via browser-use's built-in HAR capture.""" import logging + import os - # Custom handler to capture memory logs - class MemoryLogHandler(logging.Handler): - def __init__(self, console_instance): - super().__init__() - self.console = console_instance - - def emit(self, record): - try: - msg = record.getMessage() - - if "🧠 Memory:" in msg or ("Memory:" in msg and "🧠" not in msg): - if "🧠 Memory:" in msg: - memory_text = msg.split("🧠 Memory:", 1)[-1].strip() - elif "Memory:" in msg: - memory_text = msg.split("Memory:", 1)[-1].strip() - else: - memory_text = msg - - memory_text = " ".join(memory_text.split()) - - if memory_text: - self.console.print(f" [dim]{memory_text}[/dim]") - except Exception: - pass # Silently ignore handler errors - - # Suppress all browser-use logs completely - # Keep INFO level for memory capture, but prevent propagation to parent handlers - null_handler = logging.NullHandler() - - browser_use_logger = logging.getLogger("browser_use") - browser_use_logger.setLevel(logging.INFO) - browser_use_logger.handlers.clear() # Remove existing handlers - browser_use_logger.propagate = False - - agent_logger = logging.getLogger("Agent") - agent_logger.setLevel(logging.INFO) - agent_logger.handlers.clear() # Remove existing handlers - agent_logger.propagate = False - - # Suppress these loggers completely - for logger_name in ["BrowserSession", "service", "tools"]: - logger = logging.getLogger(logger_name) - logger.setLevel(logging.CRITICAL) - logger.handlers.clear() - logger.addHandler(null_handler) - logger.propagate = False + # Set browser-use logging level to warning to suppress INFO logs + os.environ['BROWSER_USE_LOGGING_LEVEL'] = 'WARNING' try: from browser_use import Agent, Browser from browser_use import ChatBrowserUse + + # Suppress all browser-use loggers after import + def suppress_browser_use_logs(): + for logger_name in [ + "browser_use", + "Agent", + "BrowserSession", + "service", + "tools", + ]: + logger = logging.getLogger(logger_name) + logger.setLevel(logging.CRITICAL) + logger.handlers.clear() + logger.addHandler(logging.NullHandler()) + logger.propagate = False + + suppress_browser_use_logs() except ImportError: result = { "success": False, "message": None, - "error": "browser-use is required for agent mode. Install it with: pip install 'reverse-api-engineer[agent]' or pip install browser-use", + "error": "browser-use is required for agent mode. Install it with: pip install git+https://github.com/browser-use/browser-use.git@49a345fb19e9f12befc5cc1658e0033873892455", } console.print(f" [red]error:[/red] {result['error']}") return result result = {"success": False, "message": None, "error": None} browser = None - memory_handler = None try: # Parse agent model and validate API key @@ -787,60 +760,40 @@ def emit(self, record): console.print(f" [red]error:[/red] {result['error']}") return result + suppress_browser_use_logs() + console.print(f" [dim]starting browser with har...[/dim]") browser = Browser( record_har_path=str(self.har_path), record_har_mode="full", record_har_content="attach", ) + + suppress_browser_use_logs() + await browser.start() + suppress_browser_use_logs() + console.print(f" [dim]browser started[/dim]") task = self.prompt - # Set up memory log handler - memory_handler = MemoryLogHandler(console) - memory_handler.setLevel(logging.INFO) - browser_use_logger.addHandler(memory_handler) - agent_logger.addHandler(memory_handler) - agent = Agent(task=task, llm=llm, browser=browser) - agent_result = await agent.run() - if memory_handler: - browser_use_logger.removeHandler(memory_handler) - agent_logger.removeHandler(memory_handler) + suppress_browser_use_logs() + + agent_result = await agent.run() - # Extract final result from agent_result + # Extract final result using browser-use's built-in method final_message = None - if agent_result: - if hasattr(agent_result, "all_results") and agent_result.all_results: - last_result = agent_result.all_results[-1] - if hasattr(last_result, "text") and last_result.text: - final_message = last_result.text - elif hasattr(last_result, "result") and last_result.result: - final_message = str(last_result.result) - elif hasattr(agent_result, "result"): - final_message = str(agent_result.result) - elif hasattr(agent_result, "text"): - final_message = agent_result.text - else: - msg_str = str(agent_result) - if "Final Result:" in msg_str: - parts = msg_str.split("Final Result:") - if len(parts) > 1: - final_message = parts[-1].strip() - else: - final_message = msg_str + if agent_result and hasattr(agent_result, "final_result"): + final_message = agent_result.final_result() result["success"] = True result["message"] = final_message or "Task completed" except Exception as e: - if memory_handler: - browser_use_logger.removeHandler(memory_handler) - agent_logger.removeHandler(memory_handler) result["error"] = str(e) console.print(f" [yellow]agent error: {e}[/yellow]") finally: @@ -1066,11 +1019,11 @@ def start(self) -> Path: if result.get("success"): console.print(f" [green]agent task completed[/green]") - if result.get("message"): + if result.get("message") and result["message"] != "Task completed": msg = result["message"] if len(msg) > 500: msg = msg[:500] + "..." - console.print(f" [dim]result:[/dim] [white]{msg}[/white]") + console.print(f" [dim]result:[/dim]\n [white]{msg}[/white]") else: error = result.get("error", "unknown error") console.print(f" [yellow]agent error: {error}[/yellow]") diff --git a/src/reverse_api/engineer.py b/src/reverse_api/engineer.py index ee8c6bb..ef7744b 100644 --- a/src/reverse_api/engineer.py +++ b/src/reverse_api/engineer.py @@ -1,6 +1,7 @@ """Reverse engineering module with SDK dispatch.""" import asyncio +import logging from pathlib import Path from typing import Optional, Dict, Any @@ -16,6 +17,10 @@ from .base_engineer import BaseEngineer +# Suppress claude_agent_sdk logs +logging.getLogger("claude_agent_sdk").setLevel(logging.WARNING) +logging.getLogger("claude_agent_sdk._internal.transport.subprocess_cli").setLevel(logging.WARNING) + class ClaudeEngineer(BaseEngineer): """Uses Claude Agent SDK to analyze HAR files and generate Python API scripts.""" diff --git a/src/reverse_api/utils.py b/src/reverse_api/utils.py index f3396dd..f7e00b4 100644 --- a/src/reverse_api/utils.py +++ b/src/reverse_api/utils.py @@ -24,8 +24,13 @@ def generate_folder_name(prompt: str) -> str: async def _generate_folder_name_async(prompt: str) -> str: """Async helper to generate folder name using Claude Agent SDK.""" + import logging from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, AssistantMessage, TextBlock + # Suppress claude_agent_sdk logs + logging.getLogger("claude_agent_sdk").setLevel(logging.WARNING) + logging.getLogger("claude_agent_sdk._internal.transport.subprocess_cli").setLevel(logging.WARNING) + options = ClaudeAgentOptions( allowed_tools=[], # No tools needed for simple text generation permission_mode="dontAsk",