diff --git a/CHANGELOG.md b/CHANGELOG.md index b96273a..b67328d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed - **`browser-use` and `stagehand` agent providers**: Both have been retired due to upstream churn. Agent mode now offers only `auto` (Playwright MCP) and `chrome-mcp` (Chrome DevTools MCP). Configs with `agent_provider` set to `browser-use` or `stagehand` migrate to `auto`; the `browser_use_model` and `stagehand_model` keys are dropped silently. The `[agent]` optional-dependency extra is removed. +- **Tag system (`@record-only`, `@codegen`, `@id`, `@docs`, `@help`)**: The inline tag syntax inside REPL prompts has been removed. Surviving capabilities have first-class CLI flags / arguments instead: + - `@record-only` → `manual --no-engineer` (already existed) + - `@id ` → `engineer ` (positional, already existed) + - `@id ` → `engineer --prompt "..."` (new flag; layered as additional instructions on top of the captured run's original goal) + - `@id --fresh ` → `engineer --fresh --prompt "..."` (new flag; with `--fresh`, `--prompt` fully replaces the original goal) + - `@codegen` and Playwright-action recording: removed entirely with the `ActionRecorder` and `playwright_codegen` modules (low usage, replaced by the standard capture+engineer flow) + - `@docs` (OpenAPI generation): removed for now; will return as a dedicated `docs` subcommand in a follow-up + - `@help`: superseded by the existing `/help` slash command in the REPL +- **`--reverse-engineer/--no-engineer` flag on `agent`**: was parsed but never propagated to the auto-capture path — removed. Agent mode runs an integrated capture + engineering pipeline; use `manual --no-engineer` for HAR-only recordings. + +### Added +- **`engineer --prompt`** and **`engineer --fresh`** CLI flags to replace the equivalent `@id [--fresh] ` REPL syntax. ## [0.7.1] - 2026-04-06 diff --git a/README.md b/README.md index f475c2a..b160bac 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,6 @@ No more manual reverse engineering—just browse, capture, and get clean API cod - [Engineer Mode](#engineer-mode) - [Agent Mode](#agent-mode) - [Collector Mode](#collector-mode) -- [Tags](#tags) - [Configuration](#-configuration) - [Model Selection](#model-selection) - [Agent Configuration](#agent-configuration) @@ -57,7 +56,6 @@ No more manual reverse engineering—just browse, capture, and get clean API cod - 📦 **Production Ready**: Generated scripts include error handling, type hints, and documentation - 💾 **Session History**: All runs saved locally with full message logs - 💰 **Cost Tracking**: Detailed token usage and cost estimation with cache support -- 🏷️ **Tag System**: Powerful tags for fine-grained control (@record-only, @codegen, @docs, @id) ### Limitations @@ -200,42 +198,6 @@ $ reverse-api-engineer # Data saved to: ./collected/js_frameworks/ ``` -## 🏷️ Tags - -Tags provide additional control and functionality within each mode: - -### Manual/Agent Mode Tags - -- **`@record-only`** - Record HAR file only, skip reverse engineering step - - Example: `@record-only navigate checkout flow` - - Useful when you want to capture traffic for later analysis - -- **`@codegen`** - Record browser actions and generate Playwright automation script - - Example: `@codegen navigate to google` - - Captures clicks, fills, and navigations to create a reusable Playwright script - -### Engineer Mode Tags - -- **`@id `** - Switch context to a specific run ID - - Example: `@id abc123` - - Loads a previous capture session for re-engineering - -- **`@id `** - Run engineer on a specific run with instructions - - Example: `@id abc123 extract user profile` - - Re-processes a capture with new instructions - -- **`@id --fresh `** - Start fresh (ignore previous scripts) - - Example: `@id abc123 --fresh restart analysis` - - Generates new code from scratch, ignoring previous implementations - -- **`@docs`** - Generate API documentation (OpenAPI spec) for the latest run - - Example: `@docs` - - Creates OpenAPI specification from captured traffic - -- **`@id @docs`** - Generate API documentation for a specific run - - Example: `@id abc123 @docs` - - Creates OpenAPI specification for a specific capture session - ## 🔧 Configuration Settings stored in `~/.reverse-api/config.json`: diff --git a/src/reverse_api/action_recorder.py b/src/reverse_api/action_recorder.py deleted file mode 100644 index 490e2e1..0000000 --- a/src/reverse_api/action_recorder.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Action recording infrastructure for manual browser sessions.""" - -import json -from dataclasses import asdict, dataclass -from pathlib import Path -from typing import List, Optional - - -@dataclass -class RecordedAction: - """A single recorded browser action.""" - - type: str # "click", "fill", "navigate", "press" - selector: Optional[str] = None - value: Optional[str] = None - url: Optional[str] = None - timestamp: float = 0.0 - metadata: Optional[dict] = None - - -class ActionRecorder: - """Records browser actions during manual sessions.""" - - def __init__(self): - self.actions: List[RecordedAction] = [] - - def add_action(self, action: RecordedAction) -> None: - """Add an action to the recording.""" - self.actions.append(action) - - def get_actions(self) -> List[RecordedAction]: - """Get all recorded actions.""" - return self.actions - - def save(self, path: Path) -> None: - """Save actions to a JSON file.""" - data = [asdict(action) for action in self.actions] - with open(path, "w") as f: - json.dump(data, f, indent=2) - - @classmethod - def load(cls, path: Path) -> "ActionRecorder": - """Load actions from a JSON file.""" - recorder = cls() - if path.exists(): - with open(path) as f: - data = json.load(f) - for item in data: - recorder.add_action(RecordedAction(**item)) - return recorder - diff --git a/src/reverse_api/base_engineer.py b/src/reverse_api/base_engineer.py index 4fd91b4..dba7d56 100644 --- a/src/reverse_api/base_engineer.py +++ b/src/reverse_api/base_engineer.py @@ -504,7 +504,6 @@ def _build_prompts(self) -> tuple[str, str]: scripts_dir=str(self.scripts_dir), existing_client_guidance=self._get_existing_client_guidance(), additional_instructions=additional_instructions, - tag_extra="@docs" if is_docs else "", tag_mode_label="Documentation" if is_docs else "Re-engineer", run_id=self.run_id, har_parent=str(self.har_path.parent), diff --git a/src/reverse_api/browser.py b/src/reverse_api/browser.py index 548f895..8986147 100644 --- a/src/reverse_api/browser.py +++ b/src/reverse_api/browser.py @@ -12,7 +12,6 @@ from rich.console import Console from rich.status import Status -from .action_recorder import ActionRecorder, RecordedAction from .utils import get_har_dir, get_timestamp console = Console() @@ -188,13 +187,11 @@ def __init__( prompt: str, output_dir: str | None = None, use_real_chrome: bool = True, # New option to use real Chrome - enable_action_recording: bool = False, ): self.run_id = run_id self.prompt = prompt self.output_dir = output_dir self.use_real_chrome = use_real_chrome - self.enable_action_recording = enable_action_recording self.har_dir = get_har_dir(run_id, output_dir) self.har_path = self.har_dir / "recording.har" @@ -207,193 +204,6 @@ def __init__( self._user_agent = random.choice(USER_AGENTS) self._using_persistent = False # Track if using persistent context - self.action_recorder = ActionRecorder() if enable_action_recording else None - - def _inject_action_recorder(self, page: Page) -> None: - """Inject action recording script into page. - - Uses console.log + page.on('console') for reliable capture. - Works best with stealth Chromium mode (not real Chrome). - """ - if not self.enable_action_recording: - return - - # Simple JS that logs actions to console with a special prefix - recorder_js = """ - window.__recordedActions = []; - window.__lastUrl = null; - - document.addEventListener('click', (e) => { - const el = e.target; - - // Build a robust selector by traversing up to find parent with ID - function buildSelector(element) { - // Priority 1: data-testid on element or close parent - let current = element; - for (let i = 0; i < 3 && current; i++) { - if (current.dataset && current.dataset.testid) { - const path = i === 0 ? '' : ' ' + getPathFromAncestor(current, element); - return '[data-testid="' + current.dataset.testid.replace(/"/g, '\\"') + '"]' + path; - } - current = current.parentElement; - } - - // Priority 2: element has short ID - if (element.id && element.id.length < 20) { - return '#' + element.id; - } - - // Priority 3: find parent with ID and build path - current = element.parentElement; - let depth = 1; - while (current && depth < 5) { - if (current.id && current.id.length < 20) { - const path = getPathFromAncestor(current, element); - const selector = '#' + current.id + ' > ' + path; - // Verify it's unique - if (document.querySelectorAll(selector).length === 1) { - return selector; - } - } - current = current.parentElement; - depth++; - } - - // Priority 4: name attribute - if (element.name) { - return '[name="' + element.name.replace(/"/g, '\\"') + '"]'; - } - - // Priority 5: aria-label - if (element.getAttribute && element.getAttribute('aria-label')) { - return '[aria-label="' + element.getAttribute('aria-label').replace(/"/g, '\\"') + '"]'; - } - - // Priority 6: role + text for buttons - if ((element.tagName === 'BUTTON' || element.role === 'button') && element.innerText) { - const text = element.innerText.trim().substring(0, 30); - if (text && !text.includes('\\n')) { - return 'button:has-text(' + JSON.stringify(text) + ')'; - } - } - - // Priority 7: link text - if (element.tagName === 'A' && element.innerText) { - const text = element.innerText.trim().substring(0, 30); - if (text && !text.includes('\\n')) { - return 'a:has-text(' + JSON.stringify(text) + ')'; - } - } - - // Priority 8: tag + class - if (element.className && typeof element.className === 'string') { - const cls = element.className.split(' ').filter(c => c && c.length < 30 && !c.includes('hover') && !c.includes('active'))[0]; - if (cls) { - const baseSelector = element.tagName.toLowerCase() + '.' + cls; - const matches = document.querySelectorAll(baseSelector); - if (matches.length === 1) return baseSelector; - const idx = Array.from(matches).indexOf(element); - if (idx >= 0) return baseSelector + ' >> nth=' + idx; - } - } - - // Fallback: tag name - return element.tagName.toLowerCase(); - } - - // Get path from ancestor to descendant - function getPathFromAncestor(ancestor, descendant) { - if (ancestor === descendant) return descendant.tagName.toLowerCase(); - - let path = []; - let current = descendant; - while (current && current !== ancestor) { - path.unshift(current.tagName.toLowerCase()); - current = current.parentElement; - } - return path.join(' > '); - } - - const selector = buildSelector(el); - const action = {type: 'click', selector: selector, timestamp: Date.now()}; - window.__recordedActions.push(action); - console.log('__ACTION__' + JSON.stringify(action)); - }, true); - - document.addEventListener('input', (e) => { - if (e.target.tagName === 'INPUT' || e.target.tagName === 'TEXTAREA') { - const el = e.target; - let selector = ''; - if (el.id && el.id.length < 20) selector = '#' + el.id; - else if (el.name) selector = '[name="' + el.name.replace(/"/g, '\\"') + '"]'; - else if (el.placeholder) selector = '[placeholder="' + el.placeholder.replace(/"/g, '\\"') + '"]'; - else selector = el.tagName.toLowerCase(); - - const action = {type: 'fill', selector: selector, value: el.value, timestamp: Date.now()}; - window.__recordedActions.push(action); - console.log('__ACTION__' + JSON.stringify(action)); - } - }, true); - - document.addEventListener('keydown', (e) => { - if (e.key === 'Enter') { - const el = e.target; - let selector = ''; - if (el.id && el.id.length < 20) selector = '#' + el.id; - else if (el.name) selector = '[name="' + el.name.replace(/"/g, '\\"') + '"]'; - else selector = el.tagName.toLowerCase(); - - const action = {type: 'press', selector: selector, value: 'Enter', timestamp: Date.now()}; - window.__recordedActions.push(action); - console.log('__ACTION__' + JSON.stringify(action)); - } - }, true); - - // Only log navigation for top-level main frame, not iframes - if (window === window.top) { - const url = window.location.href; - // Skip about:blank, blob:, data:, service workers, embeds - if (url && !url.startsWith('about:') && !url.startsWith('blob:') && - !url.startsWith('data:') && !url.includes('/embed') && - !url.includes('service_worker') && !url.includes('googletagmanager')) { - // Only log if URL changed - if (url !== window.__lastUrl) { - window.__lastUrl = url; - console.log('__ACTION__' + JSON.stringify({type: 'navigate', url: url, timestamp: Date.now()})); - } - } - } - """ - - # Listen to console for actions - import json - - last_url = [None] # Mutable to track last URL - - def on_console(msg): - text = msg.text - if text.startswith("__ACTION__"): - try: - action_json = text[10:] # Remove '__ACTION__' prefix - action_data = json.loads(action_json) - - # Filter duplicate navigations - if action_data.get("type") == "navigate": - url = action_data.get("url", "") - if url == last_url[0]: - return # Skip duplicate - last_url[0] = url - - if self.action_recorder: - self.action_recorder.add_action(RecordedAction(**action_data)) - except Exception as e: - console.print(f" [dim]action parse error: {e}[/dim]") - - page.on("console", on_console) - page.add_init_script(recorder_js) - - console.print(" [dim]action recording enabled[/dim]") - def _save_metadata(self, end_time: str) -> None: """Save run metadata to JSON file.""" metadata = { @@ -460,9 +270,6 @@ def _start_with_real_chrome(self, start_url: str | None = None) -> Path: # For HAR recording & context page = self._context.new_page() - if self.enable_action_recording: - self._inject_action_recorder(page) - if start_url: page.goto(start_url, wait_until="domcontentloaded") else: @@ -555,9 +362,6 @@ def _start_with_stealth_chromium(self, start_url: str | None = None) -> Path: # Open initial page page = self._context.new_page() - if self.enable_action_recording: - self._inject_action_recorder(page) - if start_url: page.goto(start_url, wait_until="domcontentloaded") else: @@ -652,13 +456,5 @@ def close(self) -> Path: console.print(" [dim]capture saved[/dim]") console.print(" [dim]metadata synced[/dim]") - if self.action_recorder: - try: - actions_path = self.har_dir / "actions.json" - self.action_recorder.save(actions_path) - console.print(" [dim]actions saved[/dim]") - except Exception as e: - console.print(f" [yellow]warning: error saving actions: {e}[/yellow]") - return self.har_path diff --git a/src/reverse_api/cli.py b/src/reverse_api/cli.py index 7716f9d..0d97a3f 100644 --- a/src/reverse_api/cli.py +++ b/src/reverse_api/cli.py @@ -1,7 +1,6 @@ import asyncio import json import random -import re from pathlib import Path import click @@ -21,7 +20,6 @@ from .config import ConfigManager from .engineer import run_reverse_engineering from .messages import MessageStore -from .playwright_codegen import PlaywrightCodeGenerator from .session import SessionManager from .tui import ( ERROR_CTA, @@ -38,17 +36,12 @@ discover_scripts, generate_folder_name, generate_run_id, - get_actions_path, get_base_output_dir, get_config_path, get_har_dir, get_history_path, get_messages_path, - get_scripts_dir, get_timestamp, - parse_codegen_tag, - parse_engineer_prompt, - parse_record_only_tag, resolve_run, ) @@ -139,60 +132,15 @@ def get_completions(self, document, complete_event): for cmd in commands: if cmd.startswith(text): yield Completion(cmd, start_position=-len(text)) - # Tag completion for manual/agent modes - elif mode_state["mode"] in ("manual", "agent") and text.startswith("@"): - # Tags for manual/agent modes with descriptions - tags = [ - ("@record-only", "record HAR only, skip reverse engineering"), - ("@codegen", "record actions and generate Playwright script"), - ("@help", "show mode-specific help"), - ] - for tag, meta in tags: - if tag.startswith(text): + # Run-id completion in engineer mode + elif mode_state["mode"] == "engineer" and text: + for run_id in self._get_run_ids(): + if run_id.startswith(text): yield Completion( - tag, + run_id, start_position=-len(text), - display_meta=meta, + display_meta=self._get_run_meta(run_id), ) - # Tag completion in engineer mode - elif mode_state["mode"] == "engineer" and text: - if text.startswith("@"): - # Tag completion with descriptions - tags = [ - ("@id", "switch context to run ID"), - ("@docs", "generate API documentation"), - ("@help", "show engineer mode help"), - ] - - # specific check for @id completion - id_match = re.match(r"@id\s+(.*)", text) - if id_match: - prefix = id_match.group(1) - for run_id in self._get_run_ids(): - if run_id.startswith(prefix): - yield Completion( - run_id, - start_position=-len(prefix), - display_meta=self._get_run_meta(run_id), - ) - else: - # Suggest tags with descriptions - for tag, meta in tags: - if tag.startswith(text): - yield Completion( - tag, - start_position=-len(text), - display_meta=meta, - ) - - else: - for run_id in self._get_run_ids(): - if run_id.startswith(text): - yield Completion( - run_id, - start_position=-len(text), - display_meta=self._get_run_meta(run_id), - ) def _get_run_ids(self): """Get all run IDs from history (newest first).""" @@ -291,9 +239,6 @@ def get_prompt(): if prompt.startswith("/"): return {"command": prompt.lower(), "mode": mode_state["mode"]} - if prompt.strip() == "@help": - return {"command": "@help", "mode": mode_state["mode"]} - # Return mode in all cases result_mode = mode_state["mode"] @@ -439,15 +384,6 @@ def repl_loop(): handle_history(mode_color) elif cmd == "/help" or cmd == "/commands": handle_help(mode_color) - elif cmd == "@help": - if current_mode == "engineer": - handle_engineer_help(mode_color) - elif current_mode == "agent": - handle_agent_help(mode_color) - elif current_mode == "collector": - handle_collector_help(mode_color) - elif current_mode == "manual": - handle_manual_help(mode_color) elif cmd.startswith("/messages"): parts = cmd.split(maxsplit=1) if len(parts) > 1: @@ -464,77 +400,30 @@ def repl_loop(): # Handle different modes if mode == "engineer": - # Engineer mode raw_input = options.get("run_id") - # Handle empty input if not raw_input: - console.print(" [dim]Usage:[/dim] @id [instructions]") - console.print(" [dim] [/dim] @docs (generate docs for latest run)") - console.print(" [dim] [/dim] @id @docs [prompt]") - console.print(" [dim] [/dim] (to switch context)") - continue - - # Parse tag with session_manager to resolve latest run centrally - parsed = parse_engineer_prompt(raw_input, session_manager) - - # Handle parser errors - if parsed["error"]: - console.print(f" [red]error:[/red] {parsed['error']}") + console.print(" [dim]Usage:[/dim] (switch context)") + console.print(" [dim] [/dim] (re-engineer the latest run)") continue - target_run_id = parsed["run_id"] - is_fresh = parsed["fresh"] - is_docs = parsed["docs"] - user_text = parsed["prompt"] - - main_prompt = None - add_instr = None - - if parsed["is_tag_command"]: - # Explicit @id or @docs command - # Validate HAR file exists for @docs mode - if is_docs and target_run_id: - run_data = session_manager.get_run(target_run_id) - if run_data: - paths = run_data.get("paths", {}) - har_dir = Path(paths.get("har_dir", get_har_dir(target_run_id, None))) - else: - har_dir = get_har_dir(target_run_id, None) - - har_path = har_dir / "recording.har" - if not har_path.exists(): - console.print(f" [red]error:[/red] run {target_run_id} has no HAR file") - console.print(" [dim]tip:[/dim] use @id @docs to specify a run with captured traffic") - continue - - if not target_run_id: - console.print(" [red]error:[/red] invalid @id syntax") - continue - - # If fresh, user text is new prompt. Else, it's additive. - if is_fresh: - main_prompt = user_text if user_text else None - else: - add_instr = user_text if user_text else None - + # Two cases: input matches a known run_id (switch context) or + # it's free text (additive instructions on the latest run). + if session_manager.get_run(raw_input): + target_run_id = raw_input + add_instr = None else: - # Implicit mode - parser already resolved latest run - # Check if input is just a run_id (switching context) - if session_manager.get_run(user_text): - target_run_id = user_text - # Just switching run, no new instructions - else: - # Parser resolved latest run, treat input as additive instructions - add_instr = user_text + latest = session_manager.get_history(limit=1) + if not latest: + console.print(" [red]error:[/red] no runs found in history") + continue + target_run_id = latest[0]["run_id"] + add_instr = raw_input run_engineer( target_run_id, - prompt=main_prompt, model=options.get("model"), additional_instructions=add_instr, - is_fresh=is_fresh, - output_mode="docs" if is_docs else "client", ) continue @@ -543,7 +432,6 @@ def repl_loop(): run_agent_capture( prompt=options["prompt"], url=options.get("url"), - reverse_engineer=True, # Enable reverse engineering model=options.get("model"), ) continue @@ -904,130 +792,6 @@ def handle_history(mode_color=THEME_PRIMARY): console.print(" [dim]> not found[/dim]") -def handle_manual_help(mode_color=THEME_PRIMARY): - """Show help specific to manual mode.""" - from rich.table import Table - - console.print() - console.print(" [bold white]Manual Mode Help[/bold white]") - console.print(" [dim]Launch a browser for manual interaction and capture traffic.[/dim]") - console.print() - - table = Table(show_header=False, box=None, padding=(0, 1)) - table.add_column(style=f"{mode_color} bold", justify="left", width=30) - table.add_column(style="white", justify="left") - - table.add_row("", "Describe the task/goal for the session.\n[dim]Example: extract jobs from apple.com[/dim]") - table.add_row("", "") - - table.add_row("@record-only [prompt]", "Record HAR only, skip reverse engineering.\n[dim]Example: @record-only[/dim]") - table.add_row("", "") - - table.add_row("@codegen [prompt]", "Record actions and generate Playwright script.\n[dim]Example: @codegen navigate to google[/dim]") - table.add_row("", "") - - table.add_row("Shift+Tab", "Cycle to other modes (Engineer, Agent).") - - console.print(table) - console.print() - - -def handle_agent_help(mode_color=THEME_PRIMARY): - """Show help specific to agent mode.""" - from rich.table import Table - - console.print() - console.print(" [bold white]Agent Mode Help[/bold white]") - console.print(" [dim]Launch an autonomous AI agent to navigate and perform tasks.[/dim]") - console.print() - - table = Table(show_header=False, box=None, padding=(0, 1)) - table.add_column(style=f"{mode_color} bold", justify="left", width=30) - table.add_column(style="white", justify="left") - - table.add_row("", "Instruction for the autonomous agent.\n[dim]Example: Go to google.com and search for 'OpenAI'[/dim]") - table.add_row("", "") - - table.add_row("@record-only ", "Record HAR only, skip reverse engineering.\n[dim]Example: @record-only navigate checkout flow[/dim]") - table.add_row("", "") - - table.add_row("Shift+Tab", "Cycle to other modes (Manual, Engineer).") - table.add_row("", "") - - table.add_row("Ctrl+R", "Fill prompt with a random task suggestion.\n[dim]Press multiple times to cycle through ideas.[/dim]") - - console.print(table) - console.print() - - -def handle_collector_help(mode_color=THEME_PRIMARY): - """Show help specific to collector mode.""" - from rich.table import Table - - console.print() - console.print(" [bold white]Collector Mode Help[/bold white]") - console.print(" [dim]AI-powered web data collection. Describe what data you want, get JSON/CSV output.[/dim]") - console.print() - - table = Table(show_header=False, box=None, padding=(0, 1)) - table.add_column(style=f"{mode_color} bold", justify="left", width=30) - table.add_column(style="white", justify="left") - - table.add_row( - "", "Describe data to collect in natural language.\n[dim]Example: Find 10 YC W24 AI startups with name, website, funding[/dim]" - ) - table.add_row("", "") - - table.add_row("Output", "JSON + CSV saved to ./collected//") - table.add_row("", "") - - table.add_row("Shift+Tab", "Cycle to other modes.") - - console.print(table) - console.print() - - -def handle_engineer_help(mode_color=THEME_PRIMARY): - """Show help specific to engineer mode.""" - from rich.table import Table - - console.print() - console.print(" [bold white]Engineer Mode Help[/bold white]") - console.print(" [dim]Reverse engineer APIs from captured sessions (HAR files).[/dim]") - console.print() - - # Syntax table - table = Table(show_header=False, box=None, padding=(0, 1)) - table.add_column(style=f"{mode_color} bold", justify="left", width=30) - table.add_column(style="white", justify="left") - - table.add_row("@id ", "Switch context to a specific run ID.\n[dim]Example: @id abc123[/dim]") - table.add_row("", "") - - table.add_row("@id ", "Run engineer on a specific run with instructions.\n[dim]Example: @id abc123 extract user profile[/dim]") - table.add_row("", "") - - table.add_row( - "@id --fresh ", - "Start fresh (ignore previous scripts) with new instructions.\n[dim]Example: @id abc123 --fresh restart analysis[/dim]", - ) - table.add_row("", "") - - table.add_row("", "Quick context switch (same as @id ).\n[dim]Example: abc123[/dim]") - table.add_row("", "") - - table.add_row("", "Run engineer on the *current* context/latest run.\n[dim]Example: improve error handling[/dim]") - table.add_row("", "") - - table.add_row("@docs", "Generate API documentation (OpenAPI spec) for the latest run.\n[dim]Example: @docs[/dim]") - table.add_row("", "") - - table.add_row("@id @docs", "Generate API documentation for a specific run.\n[dim]Example: @id abc123 @docs[/dim]") - - console.print(table) - console.print() - - def handle_help(mode_color=THEME_PRIMARY): """Show enhanced help with command details and examples.""" from rich.table import Table @@ -1167,12 +931,6 @@ def manual(prompt, url, reverse_engineer, model, output_dir): @main.command() @click.option("--prompt", "-p", default=None, help="Instruction for the autonomous agent.") @click.option("--url", "-u", default=None, help="Optional starting URL.") -@click.option( - "--reverse-engineer/--no-engineer", - "reverse_engineer", - default=True, - help="Run reverse engineering after capture.", -) @click.option( "--model", "-m", @@ -1180,9 +938,13 @@ def manual(prompt, url, reverse_engineer, model, output_dir): default=None, ) @click.option("--output-dir", "-o", default=None, help="Custom output directory.") -def agent(prompt, url, reverse_engineer, model, output_dir): - """Run autonomous agent browser session.""" - run_agent_capture(prompt=prompt, url=url, reverse_engineer=reverse_engineer, model=model, output_dir=output_dir) +def agent(prompt, url, model, output_dir): + """Run autonomous agent browser session. + + Agent mode runs an integrated capture + reverse-engineering pipeline; if you + only want a HAR recording, use `manual --no-engineer` instead. + """ + run_agent_capture(prompt=prompt, url=url, model=model, output_dir=output_dir) def run_manual_capture(prompt=None, url=None, reverse_engineer=True, model=None, output_dir=None): @@ -1203,15 +965,6 @@ def run_manual_capture(prompt=None, url=None, reverse_engineer=True, model=None, reverse_engineer = options["reverse_engineer"] model = options["model"] - # Parse @record-only tag - if present, skip reverse engineering - prompt, is_record_only = parse_record_only_tag(prompt) - prompt, is_codegen = parse_codegen_tag(prompt) - - if is_record_only: - reverse_engineer = False - if is_codegen: - reverse_engineer = False - run_id = generate_run_id() timestamp = get_timestamp() sdk = config_manager.get("sdk", "claude") @@ -1228,7 +981,7 @@ def run_manual_capture(prompt=None, url=None, reverse_engineer=True, model=None, paths={"har_dir": str(get_har_dir(run_id, output_dir))}, ) - browser = ManualBrowser(run_id=run_id, prompt=prompt, output_dir=output_dir, enable_action_recording=is_codegen) + browser = ManualBrowser(run_id=run_id, prompt=prompt, output_dir=output_dir) har_path = browser.start(start_url=url) if reverse_engineer: @@ -1247,17 +1000,13 @@ def run_manual_capture(prompt=None, url=None, reverse_engineer=True, model=None, usage=result.get("usage", {}), paths={"script_path": result.get("script_path")}, ) - elif is_codegen: - # Generate Playwright script from recorded actions - run_playwright_codegen(run_id, prompt, output_dir, start_url=url) - elif is_record_only: - # Show helpful message for record-only mode + else: console.print(" [dim]>[/dim] [white]recording complete[/white]") console.print(f" [dim]>[/dim] [white]run_id: {run_id}[/white]") - console.print(f" [dim]>[/dim] [dim]use @id {run_id} to engineer later[/dim]\n") + console.print(f" [dim]>[/dim] [dim]use 'reverse-api-engineer engineer {run_id}' to engineer later[/dim]\n") -def run_agent_capture(prompt=None, url=None, reverse_engineer=False, model=None, output_dir=None): +def run_agent_capture(prompt=None, url=None, model=None, output_dir=None): """Shared logic for agent capture mode.""" output_dir = output_dir or config_manager.get("output_dir") @@ -1265,26 +1014,15 @@ def run_agent_capture(prompt=None, url=None, reverse_engineer=False, model=None, options = prompt_interactive_options( prompt=prompt, url=url, - reverse_engineer=reverse_engineer, + reverse_engineer=False, model=model, ) if "command" in options: return prompt = options["prompt"] url = options["url"] - reverse_engineer = options["reverse_engineer"] model = options["model"] - # Parse @record-only tag - if present, skip reverse engineering - prompt, is_record_only = parse_record_only_tag(prompt) - if is_record_only: - reverse_engineer = False - # Agent mode requires a prompt for @record-only - if not prompt or not prompt.strip(): - console.print(" [red]error:[/red] @record-only requires a prompt in agent mode") - console.print(" [dim]tip:[/dim] @record-only - e.g., @record-only navigate checkout flow") - return - agent_provider = config_manager.get("agent_provider", "auto") return run_auto_capture( prompt=prompt, @@ -1466,48 +1204,23 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p return None -def run_playwright_codegen(run_id: str, prompt: str, output_dir: str | None = None, start_url: str | None = None): - """Generate Playwright script from recorded actions.""" - actions_path = get_actions_path(run_id, output_dir) - if not actions_path.exists(): - console.print(" [red]error:[/red] no actions recorded") - return - - from .action_recorder import ActionRecorder - - actions = ActionRecorder.load(actions_path) - action_list = actions.get_actions() - - # If no explicit start_url, extract from first navigate action - if not start_url and action_list: - if action_list[0].type == "navigate" and action_list[0].url: - start_url = action_list[0].url - - generator = PlaywrightCodeGenerator(action_list, start_url=start_url) - script = generator.generate() - - scripts_dir = get_scripts_dir(run_id, output_dir) - - # Handle duplicate file paths - script_path = scripts_dir / "automation.py" - if script_path.exists(): - i = 1 - while (scripts_dir / f"automation_{i}.py").exists(): - i += 1 - script_path = scripts_dir / f"automation_{i}.py" - - script_path.write_text(script) - - # Also write requirements.txt - (scripts_dir / "requirements.txt").write_text("playwright\n") - - console.print(" [dim]>[/dim] [white]codegen complete[/white]") - console.print(f" [dim]>[/dim] [white]{script_path}[/white]") - console.print(f" [dim]>[/dim] [dim]run with: uv run python {script_path}[/dim]\n") - @main.command() @click.argument("run_id") +@click.option( + "--prompt", + "-p", + default=None, + help=( + "With --fresh: replace the captured run's original goal. " + "Without --fresh: layered as additional instructions on top of the original prompt." + ), +) +@click.option( + "--fresh", + is_flag=True, + help="Ignore previously generated scripts and re-engineer from scratch.", +) @click.option( "--model", "-m", @@ -1515,9 +1228,20 @@ def run_playwright_codegen(run_id: str, prompt: str, output_dir: str | None = No default=None, ) @click.option("--output-dir", "-o", default=None, help="Custom output directory.") -def engineer(run_id, model, output_dir): +def engineer(run_id, prompt, fresh, model, output_dir): """Run reverse engineering on a previous run.""" - run_engineer(run_id, model=model, output_dir=output_dir) + # --fresh treats --prompt as a full replacement of the original goal; + # without --fresh, --prompt is additive so the captured run's context is preserved. + main_prompt = prompt if fresh else None + additional = prompt if (prompt and not fresh) else None + run_engineer( + run_id, + prompt=main_prompt, + additional_instructions=additional, + model=model, + output_dir=output_dir, + is_fresh=fresh, + ) def run_engineer( diff --git a/src/reverse_api/playwright_codegen.py b/src/reverse_api/playwright_codegen.py deleted file mode 100644 index 3e1d63c..0000000 --- a/src/reverse_api/playwright_codegen.py +++ /dev/null @@ -1,133 +0,0 @@ -"""Generates Playwright automation scripts from recorded actions.""" - -import json -from typing import List - -from .action_recorder import RecordedAction - - -class PlaywrightCodeGenerator: - """Generates Playwright automation scripts from recorded actions.""" - - def __init__(self, actions: List[RecordedAction], start_url: str | None = None): - self.actions = self._clean_actions(actions) - self.start_url = start_url - - def _clean_actions(self, actions: List[RecordedAction]) -> List[RecordedAction]: - """Clean up actions (deduplicate fills, remove redundant events).""" - cleaned = [] - - for i, action in enumerate(actions): - # For 'fill' actions on the same selector, only keep the last one - if action.type == "fill": - # Look ahead to see if there's another fill for this selector - is_last_fill = True - for next_action in actions[i + 1 :]: - if next_action.type == "fill" and next_action.selector == action.selector: - is_last_fill = False - break - if next_action.type != "fill": - # If interaction changes (e.g. click), we must keep the current fill - break - - if is_last_fill: - cleaned.append(action) - else: - cleaned.append(action) - - return cleaned - - def _get_base_url(self, url: str | None) -> str | None: - """Extract base URL (without query params) for comparison.""" - if not url: - return None - from urllib.parse import urlparse - - parsed = urlparse(url) - return f"{parsed.scheme}://{parsed.netloc}{parsed.path}" - - def generate(self) -> str: - """Generate complete Playwright script.""" - lines = [ - '"""', - "Playwright automation script", - "Generated by reverse-api-engineer", - '"""', - "", - "from playwright.sync_api import sync_playwright", - "", - "", - "# Stealth Chrome arguments to avoid bot detection", - "STEALTH_ARGS = [", - ' "--disable-blink-features=AutomationControlled",', - ' "--disable-infobars",', - ' "--disable-dev-shm-usage",', - ' "--disable-extensions",', - ' "--no-first-run",', - ' "--no-default-browser-check",', - "]", - "", - "", - "def main():", - " with sync_playwright() as p:", - " # Launch browser with stealth settings", - " browser = p.chromium.launch(", - " headless=False,", - " args=STEALTH_ARGS,", - ' ignore_default_args=["--enable-automation"],', - " )", - " context = browser.new_context(", - ' user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",', - " )", - " page = context.new_page()", - "", - ] - - if self.start_url: - lines.append(f" # Start URL") - lines.append(f" page.goto({json.dumps(self.start_url)})") - lines.append("") - - current_url = self.start_url - last_nav_base = self._get_base_url(self.start_url) if self.start_url else None - - for action in self.actions: - if action.type == "click" and action.selector: - lines.append(f" page.click({json.dumps(action.selector)})") - - elif action.type == "fill" and action.selector and action.value is not None: - # Use json.dumps to safely escape newlines, quotes, and backslashes - lines.append(f" page.fill({json.dumps(action.selector)}, {json.dumps(action.value)})") - - elif action.type == "press" and action.selector and action.value is not None: - lines.append(f" page.press({json.dumps(action.selector)}, {json.dumps(action.value)})") - - elif action.type == "navigate": - # Skip if same as start URL or very similar (just different query params) - nav_base = self._get_base_url(action.url) - if nav_base != last_nav_base: - lines.append(f" page.goto({json.dumps(action.url)})") - last_nav_base = nav_base - else: - continue # Skip duplicate navigation, don't add delay - - # Add a small delay between actions as requested - lines.append(" page.wait_for_timeout(1000)") - - lines.extend( - [ - "", - " # Keep browser open for a moment", - " page.wait_for_timeout(2000)", - "", - " context.close()", - " browser.close()", - "", - "", - "if __name__ == '__main__':", - " main()", - "", - ] - ) - - return "\n".join(lines) diff --git a/src/reverse_api/prompts/engineer/user.md b/src/reverse_api/prompts/engineer/user.md index b265f47..5fe5f17 100644 --- a/src/reverse_api/prompts/engineer/user.md +++ b/src/reverse_api/prompts/engineer/user.md @@ -14,16 +14,14 @@ Here is the output directory where you should save your generated files: {existing_client_guidance} {additional_instructions} -## Tag-Based Workflows +## Run Context -This session uses tag-based context loading: - -- **@id ** {tag_extra}: {tag_mode_label} mode active - - Target run: {run_id} - - HAR location: {har_parent} - - Existing {existing_label}: {scripts_dir} - - Message history: {messages_path} (available for reference if needed) - - Fresh mode: {is_fresh} +- Mode: {tag_mode_label} +- Target run: {run_id} +- HAR location: {har_parent} +- Existing {existing_label}: {scripts_dir} +- Message history: {messages_path} (available for reference if needed) +- Fresh mode: {is_fresh} By default, treat this as an iterative refinement. The user's prompt describes changes or improvements to make to the existing {existing_artifact}. If fresh mode is enabled, diff --git a/src/reverse_api/utils.py b/src/reverse_api/utils.py index e6b4da1..3f74752 100644 --- a/src/reverse_api/utils.py +++ b/src/reverse_api/utils.py @@ -253,136 +253,6 @@ def _slugify(text: str) -> str: return "_".join(words)[:50] -def parse_engineer_prompt(input_text: str, session_manager=None) -> dict: - """Parse engineer mode input for tags. - - Args: - input_text: The raw input text to parse - session_manager: Optional SessionManager to resolve latest run_id when needed - - Returns: - dict: { - "run_id": str | None, - "fresh": bool, - "docs": bool, - "prompt": str, - "is_tag_command": bool, - "error": str | None # Error message if validation failed - } - """ - if not input_text: - return { - "run_id": None, - "fresh": False, - "docs": False, - "prompt": "", - "is_tag_command": False, - "error": None, - } - - # Check for standalone @docs first (no prompt parameter) - if input_text.strip() == "@docs": - # Resolve latest run if session_manager provided - run_id = None - error = None - if session_manager: - latest_runs = session_manager.get_history(limit=1) - if not latest_runs: - error = "no runs found in history" - else: - run_id = latest_runs[0]["run_id"] - - return { - "run_id": run_id, - "fresh": False, - "docs": True, - "prompt": "", - "is_tag_command": True, - "error": error, - } - - # Enhanced regex for @id [--fresh] [@docs] - # Group 1: run_id - # Group 2: fresh flag (optional) - # Group 3: docs flag (optional) - # Group 4: prompt (optional) - pattern = r"@id\s+([a-zA-Z0-9-_]+)(?:\s+(--fresh))?(?:\s+(@docs))?(?:\s+(.*))?" - match = re.match(pattern, input_text.strip()) - - if match: - run_id = match.group(1) - fresh = bool(match.group(2)) - docs = bool(match.group(3)) - remaining_prompt = match.group(4) or "" - return { - "run_id": run_id, - "fresh": fresh, - "docs": docs, - "prompt": remaining_prompt, - "is_tag_command": True, - "error": None, - } - - # Implicit mode - resolve latest run if session_manager provided - run_id = None - error = None - if session_manager: - latest_runs = session_manager.get_history(limit=1) - if not latest_runs: - error = "no runs found in history" - else: - run_id = latest_runs[0]["run_id"] - - return { - "run_id": run_id, - "fresh": False, - "docs": False, - "prompt": input_text.strip(), - "is_tag_command": False, - "error": error, - } - - -def parse_record_only_tag(prompt: str) -> tuple[str, bool]: - """Parse @record-only tag from prompt. - - When present, skips reverse engineering and only records HAR. - - Args: - prompt: The user prompt that may contain @record-only tag - - Returns: - tuple: (cleaned_prompt, is_record_only) - """ - if not prompt: - return "", False - - # Match @record-only anywhere in the prompt (case-insensitive) - pattern = r"@record-only\s*" - if re.search(pattern, prompt, re.IGNORECASE): - cleaned = re.sub(pattern, "", prompt, flags=re.IGNORECASE).strip() - return cleaned, True - return prompt, False - - -def parse_codegen_tag(prompt: str) -> tuple[str, bool]: - """Parse @codegen tag from prompt. - - When present, records actions and generates Playwright script instead of API client. - - Returns: - tuple: (cleaned_prompt, is_codegen) - """ - if not prompt: - return "", False - - pattern = r"@codegen\s*" - if re.search(pattern, prompt, re.IGNORECASE): - cleaned = re.sub(pattern, "", prompt, flags=re.IGNORECASE).strip() - return cleaned, True - return prompt, False - - def generate_run_id() -> str: """Generate a unique run ID using a short UUID format.""" return uuid.uuid4().hex[:12] @@ -459,12 +329,6 @@ def get_har_dir(run_id: str, output_dir: str | None = None) -> Path: return har_dir -def get_actions_path(run_id: str, output_dir: str | None = None) -> Path: - """Get the actions JSON file path for a specific run.""" - har_dir = get_har_dir(run_id, output_dir) - return har_dir / "actions.json" - - def get_scripts_dir(run_id: str, output_dir: str | None = None) -> Path: """Get the scripts directory for a specific run. diff --git a/tests/test_action_recorder.py b/tests/test_action_recorder.py deleted file mode 100644 index 001be45..0000000 --- a/tests/test_action_recorder.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Tests for action_recorder.py and playwright_codegen.py.""" - -import json -from pathlib import Path - -import pytest - -from reverse_api.action_recorder import ActionRecorder, RecordedAction -from reverse_api.playwright_codegen import PlaywrightCodeGenerator - - -class TestRecordedAction: - """Test RecordedAction dataclass.""" - - def test_default_values(self): - """RecordedAction has correct defaults.""" - action = RecordedAction(type="click") - assert action.type == "click" - assert action.selector is None - assert action.value is None - assert action.url is None - assert action.timestamp == 0.0 - assert action.metadata is None - - def test_all_fields(self): - """RecordedAction accepts all fields.""" - action = RecordedAction( - type="fill", - selector="#input", - value="test", - url="https://example.com", - timestamp=1.5, - metadata={"key": "value"}, - ) - assert action.type == "fill" - assert action.selector == "#input" - assert action.value == "test" - assert action.url == "https://example.com" - assert action.timestamp == 1.5 - assert action.metadata == {"key": "value"} - - -class TestActionRecorder: - """Test ActionRecorder class.""" - - def test_empty_recorder(self): - """New recorder has no actions.""" - recorder = ActionRecorder() - assert recorder.get_actions() == [] - - def test_add_action(self): - """Add action to recorder.""" - recorder = ActionRecorder() - action = RecordedAction(type="click", selector="#btn") - recorder.add_action(action) - assert len(recorder.get_actions()) == 1 - assert recorder.get_actions()[0].selector == "#btn" - - def test_add_multiple_actions(self): - """Add multiple actions.""" - recorder = ActionRecorder() - recorder.add_action(RecordedAction(type="click", selector="#btn1")) - recorder.add_action(RecordedAction(type="fill", selector="#input", value="text")) - recorder.add_action(RecordedAction(type="press", selector="#input", value="Enter")) - assert len(recorder.get_actions()) == 3 - - def test_save_and_load(self, tmp_path): - """Save and load actions round-trip.""" - recorder = ActionRecorder() - recorder.add_action(RecordedAction(type="click", selector="#btn", timestamp=1.0)) - recorder.add_action(RecordedAction(type="fill", selector="#input", value="hello", timestamp=2.0)) - - save_path = tmp_path / "actions.json" - recorder.save(save_path) - - # Verify saved format - with open(save_path) as f: - data = json.load(f) - assert len(data) == 2 - assert data[0]["type"] == "click" - assert data[1]["value"] == "hello" - - # Load back - loaded = ActionRecorder.load(save_path) - assert len(loaded.get_actions()) == 2 - assert loaded.get_actions()[0].type == "click" - assert loaded.get_actions()[1].value == "hello" - - def test_load_nonexistent_file(self, tmp_path): - """Loading from nonexistent file returns empty recorder.""" - loaded = ActionRecorder.load(tmp_path / "nonexistent.json") - assert loaded.get_actions() == [] - - def test_save_with_metadata(self, tmp_path): - """Save action with metadata.""" - recorder = ActionRecorder() - recorder.add_action( - RecordedAction(type="click", selector="#btn", metadata={"visible": True}) - ) - save_path = tmp_path / "actions.json" - recorder.save(save_path) - - loaded = ActionRecorder.load(save_path) - assert loaded.get_actions()[0].metadata == {"visible": True} - - -class TestPlaywrightCodeGenerator: - """Test PlaywrightCodeGenerator class.""" - - def test_empty_actions(self): - """Generate script with no actions.""" - gen = PlaywrightCodeGenerator([]) - code = gen.generate() - assert "playwright" in code.lower() - assert "def main():" in code - assert "browser.close()" in code - - def test_with_start_url(self): - """Generate script with start URL.""" - gen = PlaywrightCodeGenerator([], start_url="https://example.com") - code = gen.generate() - assert "https://example.com" in code - assert "page.goto" in code - - def test_click_action(self): - """Generate click action.""" - actions = [RecordedAction(type="click", selector="#submit-btn")] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - assert 'page.click("#submit-btn")' in code - - def test_fill_action(self): - """Generate fill action.""" - actions = [RecordedAction(type="fill", selector="#email", value="test@example.com")] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - assert 'page.fill("#email", "test@example.com")' in code - - def test_press_action(self): - """Generate press action.""" - actions = [RecordedAction(type="press", selector="#input", value="Enter")] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - assert 'page.press("#input", "Enter")' in code - - def test_navigate_action(self): - """Generate navigate action.""" - actions = [RecordedAction(type="navigate", url="https://example.com/page2")] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - assert "https://example.com/page2" in code - - def test_navigate_deduplication(self): - """Duplicate navigations to same base URL are skipped.""" - actions = [ - RecordedAction(type="navigate", url="https://example.com/page?q=1"), - RecordedAction(type="navigate", url="https://example.com/page?q=2"), - ] - gen = PlaywrightCodeGenerator(actions, start_url="https://example.com") - code = gen.generate() - # The two navigations have same base path, so second should be skipped - goto_count = code.count("page.goto") - # start_url goto + first navigate only (second deduplicated) - assert goto_count >= 1 - - def test_fill_deduplication(self): - """Consecutive fills to same selector keeps only last.""" - actions = [ - RecordedAction(type="fill", selector="#input", value="first"), - RecordedAction(type="fill", selector="#input", value="second"), - RecordedAction(type="fill", selector="#input", value="final"), - ] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - # Only the last fill should remain - assert '"final"' in code - assert '"first"' not in code - assert '"second"' not in code - - def test_fill_kept_when_interrupted(self): - """Fill is kept when followed by different action type.""" - actions = [ - RecordedAction(type="fill", selector="#input", value="kept"), - RecordedAction(type="click", selector="#submit"), - RecordedAction(type="fill", selector="#input", value="also_kept"), - ] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - assert '"kept"' in code - assert '"also_kept"' in code - - def test_wait_between_actions(self): - """Actions have wait_for_timeout between them.""" - actions = [ - RecordedAction(type="click", selector="#btn1"), - RecordedAction(type="click", selector="#btn2"), - ] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - assert "wait_for_timeout(1000)" in code - - def test_stealth_args(self): - """Generated script includes stealth arguments.""" - gen = PlaywrightCodeGenerator([]) - code = gen.generate() - assert "STEALTH_ARGS" in code - assert "AutomationControlled" in code - - def test_main_guard(self): - """Generated script has __main__ guard.""" - gen = PlaywrightCodeGenerator([]) - code = gen.generate() - assert "if __name__" in code - - def test_special_chars_in_value(self): - """Special characters in fill value are properly escaped.""" - actions = [RecordedAction(type="fill", selector="#input", value='test"with\nnewline')] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - # json.dumps should handle escaping - assert "page.fill" in code - # Should not have raw newline in the fill call - assert "\\n" in code or "newline" in code - - def test_get_base_url_none(self): - """_get_base_url returns None for None input.""" - gen = PlaywrightCodeGenerator([]) - assert gen._get_base_url(None) is None - assert gen._get_base_url("") is None - - def test_get_base_url_strips_query(self): - """_get_base_url strips query parameters.""" - gen = PlaywrightCodeGenerator([]) - result = gen._get_base_url("https://example.com/path?q=1&r=2") - assert result == "https://example.com/path" - - def test_unknown_action_type(self): - """Unknown action type is handled gracefully.""" - actions = [RecordedAction(type="unknown_action", selector="#btn")] - gen = PlaywrightCodeGenerator(actions) - code = gen.generate() - # Should still generate valid code without crashing - assert "def main():" in code diff --git a/tests/test_base_engineer.py b/tests/test_base_engineer.py index e08ff65..b82b306 100644 --- a/tests/test_base_engineer.py +++ b/tests/test_base_engineer.py @@ -293,11 +293,11 @@ def test_prompt_includes_additional_instructions(self, tmp_path): system_prompt, user_message = eng._build_prompts() assert "Focus on auth" in user_message - def test_prompt_includes_tag_context(self, tmp_path): - """User message includes tag-based workflow context.""" + def test_prompt_includes_run_context(self, tmp_path): + """User message includes run context (target run id, mode label).""" eng = self._make_engineer(tmp_path) system_prompt, user_message = eng._build_prompts() - assert "Tag-Based Workflows" in user_message + assert "Run Context" in user_message assert eng.run_id in user_message def test_prompt_includes_existing_client_guidance(self, tmp_path): diff --git a/tests/test_cli_engineer_command.py b/tests/test_cli_engineer_command.py new file mode 100644 index 0000000..fb2acf1 --- /dev/null +++ b/tests/test_cli_engineer_command.py @@ -0,0 +1,65 @@ +"""Tests for the `engineer` click command wiring. + +Specifically: --prompt and --fresh must combine the way the old `@id +[--fresh] ` REPL syntax did. Without --fresh, --prompt is layered as +*additional instructions* so the captured run's original goal is preserved; +with --fresh, --prompt fully replaces the original goal. + +Regression for chatgpt-codex-connector PR #63 review (P2). +""" + +from unittest.mock import patch + +from click.testing import CliRunner + +from reverse_api.cli import engineer + + +class TestEngineerCommandPromptWiring: + """`engineer [--prompt] [--fresh]` flag combinations.""" + + def test_no_prompt_no_fresh(self): + runner = CliRunner() + with patch("reverse_api.cli.run_engineer") as mock_run: + result = runner.invoke(engineer, ["abc123"]) + assert result.exit_code == 0, result.output + kwargs = mock_run.call_args.kwargs + assert kwargs["prompt"] is None + assert kwargs["additional_instructions"] is None + assert kwargs["is_fresh"] is False + + def test_prompt_without_fresh_is_additive(self): + """Without --fresh, --prompt becomes additional_instructions so the + run's original prompt is still loaded by run_engineer. + """ + runner = CliRunner() + with patch("reverse_api.cli.run_engineer") as mock_run: + result = runner.invoke(engineer, ["abc123", "--prompt", "add pagination"]) + assert result.exit_code == 0, result.output + kwargs = mock_run.call_args.kwargs + assert kwargs["prompt"] is None, "must NOT pass user text as the main prompt without --fresh" + assert kwargs["additional_instructions"] == "add pagination" + assert kwargs["is_fresh"] is False + + def test_fresh_without_prompt(self): + runner = CliRunner() + with patch("reverse_api.cli.run_engineer") as mock_run: + result = runner.invoke(engineer, ["abc123", "--fresh"]) + assert result.exit_code == 0, result.output + kwargs = mock_run.call_args.kwargs + assert kwargs["prompt"] is None + assert kwargs["additional_instructions"] is None + assert kwargs["is_fresh"] is True + + def test_fresh_with_prompt_replaces_main(self): + """With --fresh, --prompt fully replaces the run's original goal.""" + runner = CliRunner() + with patch("reverse_api.cli.run_engineer") as mock_run: + result = runner.invoke( + engineer, ["abc123", "--fresh", "--prompt", "reverse engineer the auth flow only"] + ) + assert result.exit_code == 0, result.output + kwargs = mock_run.call_args.kwargs + assert kwargs["prompt"] == "reverse engineer the auth flow only" + assert kwargs["additional_instructions"] is None + assert kwargs["is_fresh"] is True diff --git a/tests/test_prompts.py b/tests/test_prompts.py index 42dcb63..750080e 100644 --- a/tests/test_prompts.py +++ b/tests/test_prompts.py @@ -99,7 +99,6 @@ def test_engineer_user_loads(self): scripts_dir="/tmp/scripts", existing_client_guidance="", additional_instructions="", - tag_extra="", tag_mode_label="Re-engineer", run_id="test123", har_parent="/tmp", diff --git a/tests/test_utils.py b/tests/test_utils.py index 663a91b..6f1f3be 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -11,7 +11,6 @@ check_for_updates, generate_folder_name, generate_run_id, - get_actions_path, get_app_dir, get_base_output_dir, get_collected_dir, @@ -22,9 +21,6 @@ get_messages_path, get_project_root, get_timestamp, - parse_codegen_tag, - parse_engineer_prompt, - parse_record_only_tag, ) @@ -68,177 +64,6 @@ def test_only_special_chars(self): assert result == "" -class TestParseEngineerPrompt: - """Test parse_engineer_prompt function.""" - - def test_empty_input(self): - """Empty input returns defaults.""" - result = parse_engineer_prompt("") - assert result["run_id"] is None - assert result["fresh"] is False - assert result["docs"] is False - assert result["prompt"] == "" - assert result["is_tag_command"] is False - assert result["error"] is None - - def test_none_input(self): - """None input returns defaults.""" - result = parse_engineer_prompt(None) - assert result["prompt"] == "" - - def test_standalone_docs_with_history(self): - """@docs resolves latest run from session manager.""" - mock_sm = MagicMock() - mock_sm.get_history.return_value = [{"run_id": "latest123"}] - result = parse_engineer_prompt("@docs", session_manager=mock_sm) - assert result["run_id"] == "latest123" - assert result["docs"] is True - assert result["is_tag_command"] is True - - def test_standalone_docs_no_history(self): - """@docs with empty history returns error.""" - mock_sm = MagicMock() - mock_sm.get_history.return_value = [] - result = parse_engineer_prompt("@docs", session_manager=mock_sm) - assert result["error"] == "no runs found in history" - - def test_standalone_docs_no_session_manager(self): - """@docs without session_manager returns None run_id.""" - result = parse_engineer_prompt("@docs") - assert result["run_id"] is None - assert result["docs"] is True - - def test_id_tag_basic(self): - """@id parses correctly.""" - result = parse_engineer_prompt("@id abc123 improve the auth") - assert result["run_id"] == "abc123" - assert result["prompt"] == "improve the auth" - assert result["is_tag_command"] is True - - def test_id_tag_with_fresh(self): - """@id with --fresh flag.""" - result = parse_engineer_prompt("@id abc123 --fresh start over") - assert result["run_id"] == "abc123" - assert result["fresh"] is True - assert result["prompt"] == "start over" - - def test_id_tag_with_docs(self): - """@id with @docs flag.""" - result = parse_engineer_prompt("@id abc123 @docs generate openapi") - assert result["run_id"] == "abc123" - assert result["docs"] is True - assert result["prompt"] == "generate openapi" - - def test_id_tag_with_fresh_and_docs(self): - """@id with both --fresh and @docs.""" - result = parse_engineer_prompt("@id abc123 --fresh @docs regenerate") - assert result["run_id"] == "abc123" - assert result["fresh"] is True - assert result["docs"] is True - assert result["prompt"] == "regenerate" - - def test_id_tag_no_prompt(self): - """@id with no trailing prompt.""" - result = parse_engineer_prompt("@id abc123") - assert result["run_id"] == "abc123" - assert result["prompt"] == "" - - def test_plain_text_with_session_manager(self): - """Plain text resolves latest run via session manager.""" - mock_sm = MagicMock() - mock_sm.get_history.return_value = [{"run_id": "latest456"}] - result = parse_engineer_prompt("fix the auth handler", session_manager=mock_sm) - assert result["run_id"] == "latest456" - assert result["prompt"] == "fix the auth handler" - assert result["is_tag_command"] is False - - def test_plain_text_no_session_manager(self): - """Plain text without session manager returns None run_id.""" - result = parse_engineer_prompt("fix the auth handler") - assert result["run_id"] is None - assert result["prompt"] == "fix the auth handler" - - def test_plain_text_empty_history(self): - """Plain text with empty history returns error.""" - mock_sm = MagicMock() - mock_sm.get_history.return_value = [] - result = parse_engineer_prompt("fix something", session_manager=mock_sm) - assert result["error"] == "no runs found in history" - - -class TestParseRecordOnlyTag: - """Test parse_record_only_tag function.""" - - def test_empty_string(self): - """Empty string returns empty prompt and False.""" - prompt, is_record = parse_record_only_tag("") - assert prompt == "" - assert is_record is False - - def test_no_tag(self): - """No tag returns original prompt.""" - prompt, is_record = parse_record_only_tag("capture the api") - assert prompt == "capture the api" - assert is_record is False - - def test_with_tag(self): - """@record-only tag is detected and removed.""" - prompt, is_record = parse_record_only_tag("@record-only capture traffic") - assert prompt == "capture traffic" - assert is_record is True - - def test_tag_case_insensitive(self): - """Tag is case insensitive.""" - prompt, is_record = parse_record_only_tag("@RECORD-ONLY capture") - assert is_record is True - assert prompt == "capture" - - def test_tag_at_end(self): - """Tag at end of prompt.""" - prompt, is_record = parse_record_only_tag("capture traffic @record-only") - assert is_record is True - assert prompt == "capture traffic" - - def test_none_input(self): - """None input returns empty and False.""" - prompt, is_record = parse_record_only_tag(None) - assert prompt == "" - assert is_record is False - - -class TestParseCodegenTag: - """Test parse_codegen_tag function.""" - - def test_empty_string(self): - """Empty string returns empty prompt and False.""" - prompt, is_codegen = parse_codegen_tag("") - assert prompt == "" - assert is_codegen is False - - def test_no_tag(self): - """No tag returns original prompt.""" - prompt, is_codegen = parse_codegen_tag("automate this") - assert prompt == "automate this" - assert is_codegen is False - - def test_with_tag(self): - """@codegen tag is detected and removed.""" - prompt, is_codegen = parse_codegen_tag("@codegen login flow") - assert prompt == "login flow" - assert is_codegen is True - - def test_tag_case_insensitive(self): - """Tag is case insensitive.""" - prompt, is_codegen = parse_codegen_tag("@CODEGEN login") - assert is_codegen is True - - def test_none_input(self): - """None input returns empty and False.""" - prompt, is_codegen = parse_codegen_tag(None) - assert prompt == "" - assert is_codegen is False - - class TestGenerateRunId: """Test generate_run_id function.""" @@ -422,17 +247,6 @@ def test_too_long_raises(self): get_docs_dir("x" * 65) -class TestGetActionsPath: - """Test get_actions_path.""" - - def test_returns_actions_json(self, tmp_path): - """Returns actions.json inside har dir.""" - with patch("reverse_api.utils.get_base_output_dir", return_value=tmp_path): - path = get_actions_path("run123") - assert path.name == "actions.json" - assert "har" in str(path) - - class TestGetCollectedDir: """Test get_collected_dir."""