diff --git a/src/reverse_api/auto_engineer.py b/src/reverse_api/auto_engineer.py index 71311d1..05191f8 100644 --- a/src/reverse_api/auto_engineer.py +++ b/src/reverse_api/auto_engineer.py @@ -37,6 +37,9 @@ def __init__( **kwargs, ): """Initialize auto engineer with expected HAR path (created by MCP).""" + # `headless` is auto-engineer specific (controls the MCP-spawned browser), + # not BaseEngineer concept; pop before super() to avoid an unknown kwarg. + headless = kwargs.pop("headless", False) har_dir = get_har_dir(run_id, output_dir) har_path = har_dir / "recording.har" @@ -50,6 +53,7 @@ def __init__( ) self.mcp_run_id = run_id self.agent_provider = agent_provider + self.headless = headless def _build_auto_prompts(self) -> tuple[str, str]: """Build (system_prompt, user_message) for auto mode. @@ -112,22 +116,35 @@ async def _handle_tool_permission(self, tool_name: str, input_data: dict[str, An return PermissionResultAllow(updated_input=input_data) def _get_mcp_config(self) -> tuple[str, dict]: - """Return (server_name, mcp_config) based on agent_provider.""" + """Return (server_name, mcp_config) based on agent_provider. + + Auto-connect requires a real headed Chrome instance with a remote + debugging server, so it is dropped in headless mode and the MCP + spawns its own headless Chromium instead. + """ if self.agent_provider == "chrome-mcp": + args = ["chrome-devtools-mcp@latest", "--no-usage-statistics"] + if self.headless: + args.append("--headless") + else: + args.append("--autoConnect") return "chrome-devtools", { "type": "stdio", "command": "npx", - "args": ["chrome-devtools-mcp@latest", "--autoConnect", "--no-usage-statistics"], + "args": args, } + playwright_args = [ + "rae-playwright-mcp@latest", + "run-mcp-server", + "--run-id", + self.mcp_run_id, + ] + if self.headless: + playwright_args.append("--headless") return "playwright", { "type": "stdio", "command": "npx", - "args": [ - "rae-playwright-mcp@latest", - "run-mcp-server", - "--run-id", - self.mcp_run_id, - ], + "args": playwright_args, } async def analyze_and_generate(self) -> dict[str, Any] | None: @@ -210,6 +227,7 @@ class OpenCodeAutoEngineer(OpenCodeEngineer): def __init__(self, run_id: str, prompt: str, output_dir: str | None = None, agent_provider: str = "auto", **kwargs): """Initialize auto engineer with expected HAR path (created by MCP).""" + headless = kwargs.pop("headless", False) har_dir = get_har_dir(run_id, output_dir) har_path = har_dir / "recording.har" @@ -223,36 +241,50 @@ def __init__(self, run_id: str, prompt: str, output_dir: str | None = None, agen self.mcp_run_id = run_id self.agent_provider = agent_provider self.mcp_name = None + self.headless = headless def _get_active_prompts(self) -> tuple[str, str]: return ClaudeAutoEngineer._build_auto_prompts(self) def _get_opencode_mcp_config(self) -> dict: - """Return OpenCode MCP registration payload based on agent_provider.""" + """Return OpenCode MCP registration payload based on agent_provider. + + Auto-connect requires a headed Chrome with a remote debugging server, + so it is dropped in headless mode in favor of an MCP-spawned headless + Chromium. + """ if self.agent_provider == "chrome-mcp": self.mcp_name = f"chrome-devtools-{self._session_id}" + cmd = ["npx", "-y", "chrome-devtools-mcp@latest", "--no-usage-statistics"] + if self.headless: + cmd.append("--headless") + else: + cmd.append("--autoConnect") return { "name": self.mcp_name, "config": { "type": "local", - "command": ["npx", "-y", "chrome-devtools-mcp@latest", "--autoConnect", "--no-usage-statistics"], + "command": cmd, "enabled": True, "timeout": 30000, }, } self.mcp_name = f"playwright-{self._session_id}" + cmd = [ + "npx", + "-y", + "rae-playwright-mcp@latest", + "run-mcp-server", + "--run-id", + self.mcp_run_id, + ] + if self.headless: + cmd.append("--headless") return { "name": self.mcp_name, "config": { "type": "local", - "command": [ - "npx", - "-y", - "rae-playwright-mcp@latest", - "run-mcp-server", - "--run-id", - self.mcp_run_id, - ], + "command": cmd, "enabled": True, "timeout": 30000, }, @@ -450,6 +482,7 @@ def __init__( ): from .copilot_engineer import CopilotEngineer + headless = kwargs.pop("headless", False) har_dir = get_har_dir(run_id, output_dir) har_path = har_dir / "recording.har" @@ -463,6 +496,7 @@ def __init__( ) self.mcp_run_id = run_id self.agent_provider = agent_provider + self.headless = headless def start_sync(self) -> None: self._engineer.start_sync() @@ -526,25 +560,33 @@ def on_event(event: Any) -> None: if self.agent_provider == "chrome-mcp": mcp_server_name = "chrome-devtools" + chrome_args = ["-y", "chrome-devtools-mcp@latest", "--no-usage-statistics"] + if self.headless: + chrome_args.append("--headless") + else: + chrome_args.append("--autoConnect") mcp_config = { "type": "local", "command": "npx", - "args": ["-y", "chrome-devtools-mcp@latest", "--autoConnect", "--no-usage-statistics"], + "args": chrome_args, "tools": ["*"], "timeout": 30000, } else: mcp_server_name = "playwright" + pw_args = [ + "-y", + "rae-playwright-mcp@latest", + "run-mcp-server", + "--run-id", + self.mcp_run_id, + ] + if self.headless: + pw_args.append("--headless") mcp_config = { "type": "local", "command": "npx", - "args": [ - "-y", - "rae-playwright-mcp@latest", - "run-mcp-server", - "--run-id", - self.mcp_run_id, - ], + "args": pw_args, "tools": ["*"], "timeout": 30000, } diff --git a/src/reverse_api/base_engineer.py b/src/reverse_api/base_engineer.py index dba7d56..0f83ae1 100644 --- a/src/reverse_api/base_engineer.py +++ b/src/reverse_api/base_engineer.py @@ -42,6 +42,7 @@ def __init__( is_fresh: bool = False, output_language: str = "python", output_mode: str = "client", + interactive: bool = True, ): self.run_id = run_id self.har_path = har_path @@ -67,6 +68,10 @@ def __init__( self.sync_watcher: FileSyncWatcher | None = None self.local_scripts_dir: Path | None = None self._stderr_error_shown = False + # When False, _prompt_follow_up() returns None immediately so the + # conversation loop in subclasses ends after the first generation. + # Set this from --json / --no-interactive entry points. + self.interactive = interactive def _handle_cli_stderr(self, line: str) -> None: """Filter CLI subprocess stderr. Shows full output in DEBUG mode, otherwise shows a single clean error.""" @@ -279,9 +284,16 @@ async def _ask_user_interactive(self, questions: list[dict[str, Any]]) -> dict[s async def _prompt_follow_up(self) -> str | None: """Prompt user for a follow-up message. Returns None to finish. - Uses plain input() via executor instead of questionary to avoid - terminal state issues after the SDK subprocess exits. + In non-interactive mode (e.g. --json / --no-interactive) returns None + immediately so the conversation loop terminates after the first + generation. Otherwise uses plain input() via executor instead of + questionary to avoid terminal state issues after the SDK subprocess + exits. """ + if not self.interactive: + # Still flush sync so any partial output reaches disk before we exit. + self.flush_sync() + return None # Ensure all files are synced locally before waiting for user input self.flush_sync() self.ui.console.print() diff --git a/src/reverse_api/cli.py b/src/reverse_api/cli.py index 8cc0a14..be94944 100644 --- a/src/reverse_api/cli.py +++ b/src/reverse_api/cli.py @@ -119,6 +119,36 @@ def _build_agent_payload( "error": final_error, } + +def _build_engineer_payload( + result: dict | None, + *, + run_id: str, + prompt: str | None, + fresh: bool, + error: str | None = None, +) -> dict: + """Normalize an engineer-mode result into a stable JSON shape. + + Mirrors `_build_agent_payload`'s contract minus the agent-specific fields + (no `url`, no `mode`, no `har_path`). `prompt` is the value the user passed + to --prompt (which may have been used as either a full replacement or as + additional instructions depending on --fresh). + """ + result = result if result is not None else {} + inner_error = result.get("error") if isinstance(result, dict) else None + final_error = error or inner_error or (None if result else "engineering produced no result") + return { + "schema_version": AGENT_JSON_SCHEMA_VERSION, + "status": "error" if final_error else "ok", + "run_id": run_id, + "prompt": prompt, + "fresh": fresh, + "script_path": result.get("script_path") if isinstance(result, dict) else None, + "usage": (result.get("usage") if isinstance(result, dict) else None) or {}, + "error": final_error, + } + # Mode definitions MODES = ["agent", "manual", "engineer", "collector"] MODE_DESCRIPTIONS = { @@ -391,8 +421,23 @@ def main(ctx: click.Context): Run without a subcommand to start the interactive REPL; use agent, manual, or engineer for CLI mode. + + Most subcommands accept --json and --no-interactive for scripted use; see + ` --help` for per-command details. """ if ctx.invoked_subcommand is None: + # Refuse to drop into the prompt_toolkit REPL when stdin is not a TTY: + # without an interactive terminal the REPL would block on stdin + # forever (e.g. CI invocations, agent wrappers that forgot the + # subcommand). Print --help to stderr and exit 2 (misuse). + if not sys.stdin.isatty(): + click.echo(ctx.get_help(), err=True) + click.echo( + "\nerror: no subcommand given and stdin is not a TTY; " + "the interactive REPL requires a terminal.", + err=True, + ) + sys.exit(2) repl_loop() @@ -991,7 +1036,18 @@ def handle_messages(run_id: str, mode_color=THEME_PRIMARY): ) @click.option("--output-dir", "-o", default=None, help="Custom output directory.") def manual(prompt, url, reverse_engineer, model, output_dir): - """Start a manual browser session.""" + """Start a manual browser session. + + \b + Interactive only — this mode requires a human in front of a real, + visible browser to navigate, click, and submit forms. It cannot be + driven by an agent or run on a headless machine. There is no + --headless / --json / --no-interactive flag here on purpose. + + For scripted / agent / CI use cases, use `agent --json --headless` + instead, which runs an autonomous AI-driven capture without needing + a human or an X server. + """ run_manual_capture(prompt, url, reverse_engineer, model, output_dir) @@ -1044,7 +1100,12 @@ def manual(prompt, url, reverse_engineer, model, output_dir): is_flag=True, help="Emit a single JSON result on stdout (logs go to stderr). Implies --no-interactive.", ) -def agent(prompt, url, model, output_dir, no_interactive, as_json): +@click.option( + "--headless", + is_flag=True, + help="Launch the MCP-controlled browser in headless mode (required on machines without an X server). For chrome-mcp this drops --autoConnect since auto-connect requires a headed Chrome instance.", +) +def agent(prompt, url, model, output_dir, no_interactive, as_json, headless): """Run autonomous agent browser session. Agent mode runs an integrated capture + reverse-engineering pipeline; if you @@ -1066,8 +1127,19 @@ def agent(prompt, url, model, output_dir, no_interactive, as_json): click.echo("error: --prompt is required when --no-interactive is set", err=True) sys.exit(2) + # Either flag must suppress the post-generation follow-up prompt that would + # otherwise block on stdin (`input(" > ")`) inside ClaudeAutoEngineer. + interactive = not (as_json or no_interactive) + if not as_json: - run_agent_capture(prompt=prompt, url=url, model=model, output_dir=output_dir) + run_agent_capture( + prompt=prompt, + url=url, + model=model, + output_dir=output_dir, + interactive=interactive, + headless=headless, + ) return payload: dict @@ -1078,6 +1150,8 @@ def agent(prompt, url, model, output_dir, no_interactive, as_json): url=url, model=model, output_dir=output_dir, + interactive=interactive, + headless=headless, ) payload = _build_agent_payload(result, prompt=prompt, url=url, output_dir=output_dir) except KeyboardInterrupt: @@ -1153,7 +1227,7 @@ def run_manual_capture(prompt=None, url=None, reverse_engineer=True, model=None, console.print(f" [dim]>[/dim] [dim]use 'reverse-api-engineer engineer {run_id}' to engineer later[/dim]\n") -def run_agent_capture(prompt=None, url=None, model=None, output_dir=None): +def run_agent_capture(prompt=None, url=None, model=None, output_dir=None, interactive=True, headless=False): """Shared logic for agent capture mode.""" output_dir = output_dir or config_manager.get("output_dir") @@ -1177,6 +1251,8 @@ def run_agent_capture(prompt=None, url=None, model=None, output_dir=None): model=model, output_dir=output_dir, agent_provider=agent_provider, + interactive=interactive, + headless=headless, ) @@ -1229,7 +1305,15 @@ def run_collector(prompt=None, model=None, output_dir=None): traceback.print_exc() -def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_provider="auto"): +def run_auto_capture( + prompt=None, + url=None, + model=None, + output_dir=None, + agent_provider="auto", + interactive=True, + headless=False, +): """Auto mode: LLM-driven browser automation + real-time reverse engineering.""" output_dir = output_dir or config_manager.get("output_dir") @@ -1246,7 +1330,7 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p url = options.get("url") model = options["model"] - if agent_provider == "chrome-mcp": + if agent_provider == "chrome-mcp" and not headless: console.print() console.print(" [dim]chrome devtools mcp (auto-connect)[/dim]") console.print(" [dim]controlling your real chrome browser[/dim]") @@ -1261,6 +1345,11 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p console.print(" [dim]warning: the agent will execute actions on your browser[/dim]") console.print(" [dim]avoid browsing sensitive sites during the session[/dim]") console.print() + elif agent_provider == "chrome-mcp" and headless: + console.print() + console.print(" [dim]chrome devtools mcp (headless)[/dim]") + console.print(" [dim]auto-connect disabled — mcp will spawn its own headless chrome[/dim]") + console.print() run_id = generate_run_id() timestamp = get_timestamp() @@ -1293,6 +1382,8 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p enable_sync=config_manager.get("real_time_sync", False), sdk=sdk, output_language=output_language, + interactive=interactive, + headless=headless, ) elif sdk == "copilot": from .auto_engineer import CopilotAutoEngineer @@ -1306,6 +1397,8 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p enable_sync=config_manager.get("real_time_sync", False), sdk=sdk, output_language=output_language, + interactive=interactive, + headless=headless, ) else: from .auto_engineer import ClaudeAutoEngineer @@ -1319,6 +1412,8 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p enable_sync=config_manager.get("real_time_sync", False), sdk=sdk, output_language=output_language, + interactive=interactive, + headless=headless, ) # Start sync before analysis @@ -1366,7 +1461,34 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p -@main.command() +@main.command( + epilog="""\b +Examples: + reverse-api-engineer engineer abc123def456 + reverse-api-engineer engineer abc123def456 -p "add pagination support" + reverse-api-engineer engineer abc123def456 --fresh -p "extract auth flow only" + reverse-api-engineer engineer abc123def456 --json | jq + +\b +JSON output schema (--json): + { + "schema_version": 1, + "status": "ok" | "error", + "run_id": "", + "prompt": "..." | null, + "fresh": false, + "script_path": "/abs/path/api_client.py" | null, + "usage": { "input_tokens": ..., "output_tokens": ..., "total_cost": ... }, + "error": "" | null + } + +\b +Exit codes: + 0 success + 1 runtime error (engineering failed, run not found) + 2 misuse +""" +) @click.argument("run_id") @click.option( "--prompt", @@ -1389,20 +1511,64 @@ def run_auto_capture(prompt=None, url=None, model=None, output_dir=None, agent_p default=None, ) @click.option("--output-dir", "-o", default=None, help="Custom output directory.") -def engineer(run_id, prompt, fresh, model, output_dir): +@click.option( + "--no-interactive", + is_flag=True, + help="Reserved for symmetry with `agent`/`run`; engineer mode is non-interactive by design.", +) +@click.option( + "--json", + "as_json", + is_flag=True, + help="Emit a single JSON result on stdout (logs go to stderr). Implies --no-interactive.", +) +def engineer(run_id, prompt, fresh, model, output_dir, no_interactive, as_json): """Run reverse engineering on a previous run.""" # --fresh treats --prompt as a full replacement of the original goal; # without --fresh, --prompt is additive so the captured run's context is preserved. main_prompt = prompt if fresh else None additional = prompt if (prompt and not fresh) else None - run_engineer( - run_id, - prompt=main_prompt, - additional_instructions=additional, - model=model, - output_dir=output_dir, - is_fresh=fresh, - ) + # Either flag must suppress the post-generation follow-up prompt that + # would otherwise block on stdin (`input(" > ")`) inside ClaudeEngineer. + interactive = not (as_json or no_interactive) + + if not as_json: + run_engineer( + run_id, + prompt=main_prompt, + additional_instructions=additional, + model=model, + output_dir=output_dir, + is_fresh=fresh, + interactive=interactive, + ) + return + + payload: dict + with _quiet_consoles_for_json() as real_stdout: + try: + result = run_engineer( + run_id, + prompt=main_prompt, + additional_instructions=additional, + model=model, + output_dir=output_dir, + is_fresh=fresh, + interactive=interactive, + ) + payload = _build_engineer_payload(result, run_id=run_id, prompt=prompt, fresh=fresh) + except KeyboardInterrupt: + payload = _build_engineer_payload( + None, run_id=run_id, prompt=prompt, fresh=fresh, error="interrupted" + ) + except Exception as e: + payload = _build_engineer_payload( + None, run_id=run_id, prompt=prompt, fresh=fresh, error=str(e) + ) + + real_stdout.write(json.dumps(payload) + "\n") + real_stdout.flush() + sys.exit(0 if payload["status"] == "ok" else 1) def run_engineer( @@ -1414,6 +1580,7 @@ def run_engineer( additional_instructions=None, is_fresh=False, output_mode="client", + interactive=True, ): """Shared logic for reverse engineering.""" if not har_path or not prompt: @@ -1455,6 +1622,7 @@ def run_engineer( is_fresh=is_fresh, output_language=output_language, output_mode=output_mode, + interactive=interactive, ) elif sdk == "copilot": result = run_reverse_engineering( @@ -1470,6 +1638,7 @@ def run_engineer( is_fresh=is_fresh, output_language=output_language, output_mode=output_mode, + interactive=interactive, ) else: result = run_reverse_engineering( @@ -1484,6 +1653,7 @@ def run_engineer( is_fresh=is_fresh, output_language=output_language, output_mode=output_mode, + interactive=interactive, ) if result: diff --git a/src/reverse_api/engineer.py b/src/reverse_api/engineer.py index b958f4f..2a11792 100644 --- a/src/reverse_api/engineer.py +++ b/src/reverse_api/engineer.py @@ -214,6 +214,7 @@ def run_reverse_engineering( is_fresh: bool = False, output_language: str = "python", output_mode: str = "client", + interactive: bool = True, ) -> dict[str, Any] | None: """Run reverse engineering with the specified SDK. @@ -245,6 +246,7 @@ def run_reverse_engineering( is_fresh=is_fresh, output_language=output_language, output_mode=output_mode, + interactive=interactive, ) elif sdk == "copilot": from .copilot_engineer import CopilotEngineer @@ -263,6 +265,7 @@ def run_reverse_engineering( output_language=output_language, output_mode=output_mode, copilot_model=copilot_model, + interactive=interactive, ) else: engineer = ClaudeEngineer( @@ -278,6 +281,7 @@ def run_reverse_engineering( is_fresh=is_fresh, output_language=output_language, output_mode=output_mode, + interactive=interactive, ) # Start sync before analysis diff --git a/tests/test_cli_followups.py b/tests/test_cli_followups.py new file mode 100644 index 0000000..c4d99af --- /dev/null +++ b/tests/test_cli_followups.py @@ -0,0 +1,412 @@ +"""Tests for the issue #62 follow-up items. + +Covers: +1. TTY-detect at REPL entry (no subcommand + non-TTY stdin → exit 2 + help) +2. `engineer --json` / `--no-interactive` parity with `agent --json` +""" + +import json +import subprocess +from unittest.mock import patch + +from click.testing import CliRunner + +from reverse_api.cli import ( + AGENT_JSON_SCHEMA_VERSION, + _build_engineer_payload, + engineer, + main, +) + +EXPECTED_ENGINEER_KEYS = { + "schema_version", + "status", + "run_id", + "prompt", + "fresh", + "script_path", + "usage", + "error", +} + + +# --------------------------------------------------------------------------- +# Item #1: TTY-detect at REPL entry +# --------------------------------------------------------------------------- + + +class TestTtyDetectionAtReplEntry: + """Without a TTY and no subcommand, the REPL must NOT block on prompt_toolkit.""" + + def test_no_tty_no_subcommand_exits_2(self): + """End-to-end: invoke the installed binary with stdin redirected from /dev/null.""" + result = subprocess.run( + ["uv", "run", "reverse-api-engineer"], + stdin=subprocess.DEVNULL, + capture_output=True, + text=True, + timeout=10, + ) + assert result.returncode == 2 + assert "stdin is not a TTY" in result.stderr + # Help is printed too so a wrapper can self-discover the subcommands + assert "agent" in result.stderr + assert "engineer" in result.stderr + + def test_no_tty_with_subcommand_does_not_trip(self): + """Subcommands work fine without a TTY (this is the whole point of agent --json).""" + result = subprocess.run( + ["uv", "run", "reverse-api-engineer", "list", "--json"], + stdin=subprocess.DEVNULL, + capture_output=True, + text=True, + timeout=10, + ) + # list --json on empty history should succeed with [] + assert result.returncode == 0 + # Nothing about TTY in the output + assert "TTY" not in result.stderr + + +# --------------------------------------------------------------------------- +# Item #2: engineer --json +# --------------------------------------------------------------------------- + + +class TestBuildEngineerPayload: + """Stable shape for the `engineer --json` payload.""" + + def test_success_shape(self): + payload = _build_engineer_payload( + {"script_path": "/abs/api_client.py", "usage": {"total_cost": 0.001}}, + run_id="abc123", + prompt="add pagination", + fresh=False, + ) + assert payload["schema_version"] == AGENT_JSON_SCHEMA_VERSION + assert payload["status"] == "ok" + assert payload["run_id"] == "abc123" + assert payload["prompt"] == "add pagination" + assert payload["fresh"] is False + assert payload["script_path"] == "/abs/api_client.py" + assert payload["usage"]["total_cost"] == 0.001 + assert payload["error"] is None + assert set(payload.keys()) == EXPECTED_ENGINEER_KEYS + + def test_none_result_is_error(self): + payload = _build_engineer_payload(None, run_id="abc", prompt=None, fresh=False) + assert payload["status"] == "error" + assert payload["error"] + assert set(payload.keys()) == EXPECTED_ENGINEER_KEYS + + def test_explicit_error_overrides(self): + payload = _build_engineer_payload( + {"script_path": "/x.py"}, run_id="abc", prompt=None, fresh=False, error="boom" + ) + assert payload["status"] == "error" + assert payload["error"] == "boom" + + def test_inner_error_propagates(self): + payload = _build_engineer_payload( + {"error": "engine crashed"}, run_id="abc", prompt=None, fresh=False + ) + assert payload["status"] == "error" + assert payload["error"] == "engine crashed" + + +class TestEngineerCommandJson: + """`engineer` click command --json wiring.""" + + def test_json_emits_payload_on_success(self): + runner = CliRunner() + fake_result = {"script_path": "/abs/api_client.py", "usage": {"total_cost": 0.0}} + with patch("reverse_api.cli.run_engineer", return_value=fake_result): + result = runner.invoke(engineer, ["abc123", "--json"]) + assert result.exit_code == 0, result.output + payload = json.loads(result.stdout.strip().splitlines()[-1]) + assert payload["status"] == "ok" + assert payload["run_id"] == "abc123" + assert payload["script_path"] == "/abs/api_client.py" + assert set(payload.keys()) == EXPECTED_ENGINEER_KEYS + + def test_json_emits_error_on_not_found(self): + runner = CliRunner() + # run_engineer returns None when the run can't be located + with patch("reverse_api.cli.run_engineer", return_value=None): + result = runner.invoke(engineer, ["doesnotexist", "--json"]) + assert result.exit_code == 1 + payload = json.loads(result.stdout.strip().splitlines()[-1]) + assert payload["status"] == "error" + assert payload["run_id"] == "doesnotexist" + assert payload["error"] + assert set(payload.keys()) == EXPECTED_ENGINEER_KEYS + + def test_json_emits_error_on_keyboard_interrupt(self): + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", side_effect=KeyboardInterrupt): + result = runner.invoke(engineer, ["abc123", "--json"]) + assert result.exit_code == 1 + payload = json.loads(result.stdout.strip().splitlines()[-1]) + assert payload["status"] == "error" + assert payload["error"] == "interrupted" + + def test_json_emits_error_on_exception(self): + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", side_effect=RuntimeError("boom")): + result = runner.invoke(engineer, ["abc123", "--json"]) + assert result.exit_code == 1 + payload = json.loads(result.stdout.strip().splitlines()[-1]) + assert payload["status"] == "error" + assert payload["error"] == "boom" + + def test_prompt_without_fresh_threaded_as_additional(self): + """--prompt without --fresh must reach run_engineer as additional_instructions + even on the JSON path (regression for the cubic-dev-ai catch on PR #63).""" + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", return_value={"script_path": "/x.py"}) as mock_run: + result = runner.invoke(engineer, ["abc123", "--json", "-p", "add pagination"]) + assert result.exit_code == 0, result.output + kwargs = mock_run.call_args.kwargs + assert kwargs["prompt"] is None + assert kwargs["additional_instructions"] == "add pagination" + assert kwargs["is_fresh"] is False + + def test_fresh_with_prompt_threaded_as_main(self): + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", return_value={"script_path": "/x.py"}) as mock_run: + result = runner.invoke( + engineer, ["abc123", "--json", "--fresh", "-p", "extract auth"] + ) + assert result.exit_code == 0, result.output + kwargs = mock_run.call_args.kwargs + assert kwargs["prompt"] == "extract auth" + assert kwargs["additional_instructions"] is None + assert kwargs["is_fresh"] is True + + def test_no_interactive_flag_accepted(self): + """--no-interactive is reserved for symmetry; should not crash.""" + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", return_value={"script_path": "/x.py"}): + result = runner.invoke(engineer, ["abc123", "--no-interactive"]) + assert result.exit_code == 0, result.output + + +class TestRootHelpMentionsScripted: + """Item #6 partial: root --help should advertise scripted features.""" + + def test_root_help_mentions_json_and_no_interactive(self): + runner = CliRunner() + result = runner.invoke(main, ["--help"]) + assert result.exit_code == 0 + assert "--json" in result.output + assert "--no-interactive" in result.output + + +# --------------------------------------------------------------------------- +# Follow-up suppression in non-interactive mode +# --------------------------------------------------------------------------- + + +class TestFollowUpPromptSuppressed: + """Regression for chatgpt-codex-connector PR #65 review (P2): + + `BaseEngineer._prompt_follow_up()` blocks on `input(" > ")` after the + first generation. In --json / --no-interactive mode the conversation loop + must terminate immediately so stdin is never read; otherwise scripted + invocations like `engineer --json | jq` hang before emitting the + payload. + """ + + def test_prompt_follow_up_returns_none_when_not_interactive(self, tmp_path): + """The base engineer's follow-up prompt short-circuits when + interactive=False, without ever touching stdin.""" + import asyncio + + from reverse_api.base_engineer import BaseEngineer + + class _Eng(BaseEngineer): + def _build_prompts(self): + return ("", "") + + async def analyze_and_generate(self): + return None + + har_path = tmp_path / "test.har" + har_path.touch() + + with patch("reverse_api.base_engineer.get_scripts_dir", return_value=tmp_path): + with patch("reverse_api.base_engineer.MessageStore"): + with patch("reverse_api.base_engineer.SessionManager") as mock_sm: + mock_sm.return_value.get_run.return_value = None + eng = _Eng( + run_id="test123", + har_path=har_path, + prompt="x", + interactive=False, + ) + + # Patch input() at the builtin level — the test fails loudly if it's + # ever reached, proving the short-circuit happens before stdin access. + def _explode(*_args, **_kwargs): + raise AssertionError("input() must not be called when interactive=False") + + with patch("builtins.input", side_effect=_explode): + result = asyncio.run(eng._prompt_follow_up()) + + assert result is None + + def test_engineer_default_interactive_true(self, tmp_path): + """Sanity: the default value of interactive on BaseEngineer is True + so existing REPL UX is unchanged.""" + from reverse_api.base_engineer import BaseEngineer + + class _Eng(BaseEngineer): + def _build_prompts(self): + return ("", "") + + async def analyze_and_generate(self): + return None + + har_path = tmp_path / "test.har" + har_path.touch() + + with patch("reverse_api.base_engineer.get_scripts_dir", return_value=tmp_path): + with patch("reverse_api.base_engineer.MessageStore"): + with patch("reverse_api.base_engineer.SessionManager") as mock_sm: + mock_sm.return_value.get_run.return_value = None + eng = _Eng(run_id="test123", har_path=har_path, prompt="x") + assert eng.interactive is True + + def test_engineer_command_threads_interactive_through_run_engineer(self): + """`engineer --json` must pass interactive=False to run_engineer so + BaseEngineer drops the follow-up loop in the SDK.""" + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", return_value={"script_path": "/x.py"}) as mock_run: + result = runner.invoke(engineer, ["abc123", "--json"]) + assert result.exit_code == 0, result.output + assert mock_run.call_args.kwargs["interactive"] is False + + def test_engineer_command_no_interactive_threads_through(self): + """`engineer --no-interactive` (without --json) also disables follow-up.""" + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", return_value={"script_path": "/x.py"}) as mock_run: + result = runner.invoke(engineer, ["abc123", "--no-interactive"]) + assert result.exit_code == 0, result.output + assert mock_run.call_args.kwargs["interactive"] is False + + def test_engineer_default_threads_interactive_true(self): + """Without flags, run_engineer is called with interactive=True + (preserves the current REPL UX).""" + runner = CliRunner() + with patch("reverse_api.cli.run_engineer", return_value={"script_path": "/x.py"}) as mock_run: + result = runner.invoke(engineer, ["abc123"]) + assert result.exit_code == 0, result.output + assert mock_run.call_args.kwargs["interactive"] is True + + def test_agent_command_threads_interactive_through_run_agent_capture(self): + """`agent --json` must propagate interactive=False so + ClaudeAutoEngineer drops the follow-up loop too (same code path).""" + from reverse_api.cli import agent as agent_cmd + + runner = CliRunner() + with patch( + "reverse_api.cli.run_agent_capture", + return_value={"run_id": "abc", "mode": "auto", "script_path": None, "usage": {}}, + ) as mock_run: + result = runner.invoke(agent_cmd, ["--json", "-p", "x"]) + assert result.exit_code == 0, result.output + assert mock_run.call_args.kwargs["interactive"] is False + + +# --------------------------------------------------------------------------- +# --headless flag (CI / VPS / scripted) +# --------------------------------------------------------------------------- + + +class TestHeadlessFlag: + """`--headless` wires through to auto-engineer constructors (agent only — + manual mode is intentionally human-only and rejects --headless).""" + + def test_manual_rejects_headless_flag(self): + """`manual --headless` must fail: manual mode requires a human and a + visible browser; agents should use `agent --headless` instead.""" + from reverse_api.cli import manual as manual_cmd + + runner = CliRunner() + result = runner.invoke(manual_cmd, ["-p", "x", "-u", "https://example.com", "--headless"]) + assert result.exit_code != 0 + assert "no such option" in result.output.lower() or "--headless" in result.output + + def test_manual_help_mentions_human_only(self): + """`manual --help` must make it explicit that the mode requires a + human and is not scriptable (so agents inspecting --help self-route + to `agent` instead).""" + from reverse_api.cli import manual as manual_cmd + + runner = CliRunner() + result = runner.invoke(manual_cmd, ["--help"]) + assert result.exit_code == 0 + out = result.output.lower() + assert "human" in out + assert "agent" in out # tells the agent where to go instead + + def test_agent_command_threads_headless(self): + from reverse_api.cli import agent as agent_cmd + + runner = CliRunner() + with patch("reverse_api.cli.run_agent_capture", return_value={"run_id": "x", "mode": "auto"}) as mock_run: + result = runner.invoke(agent_cmd, ["-p", "x", "--json", "--headless"]) + assert result.exit_code == 0, result.output + assert mock_run.call_args.kwargs["headless"] is True + + def test_agent_default_headless_false(self): + from reverse_api.cli import agent as agent_cmd + + runner = CliRunner() + with patch("reverse_api.cli.run_agent_capture", return_value={"run_id": "x", "mode": "auto"}) as mock_run: + result = runner.invoke(agent_cmd, ["-p", "x", "--json"]) + assert result.exit_code == 0, result.output + assert mock_run.call_args.kwargs["headless"] is False + + +class TestHeadlessMcpConfig: + """ClaudeAutoEngineer._get_mcp_config arg construction for headed vs headless.""" + + def _make(self, agent_provider: str, headless: bool, tmp_path): + """Build a ClaudeAutoEngineer without invoking heavy init paths.""" + from reverse_api.auto_engineer import ClaudeAutoEngineer + + with patch("reverse_api.base_engineer.get_scripts_dir", return_value=tmp_path): + with patch("reverse_api.base_engineer.MessageStore"): + with patch("reverse_api.base_engineer.SessionManager"): + with patch("reverse_api.auto_engineer.get_har_dir", return_value=tmp_path): + eng = ClaudeAutoEngineer( + run_id="r", + prompt="x", + model="claude-sonnet-4-6", + agent_provider=agent_provider, + headless=headless, + ) + return eng + + def test_chrome_mcp_headed_uses_autoconnect(self, tmp_path): + eng = self._make("chrome-mcp", headless=False, tmp_path=tmp_path) + _, cfg = eng._get_mcp_config() + assert "--autoConnect" in cfg["args"] + assert "--headless" not in cfg["args"] + + def test_chrome_mcp_headless_drops_autoconnect_adds_headless(self, tmp_path): + eng = self._make("chrome-mcp", headless=True, tmp_path=tmp_path) + _, cfg = eng._get_mcp_config() + assert "--autoConnect" not in cfg["args"], "auto-connect cannot work without a real headed Chrome" + assert "--headless" in cfg["args"] + + def test_playwright_headless_adds_flag(self, tmp_path): + eng = self._make("auto", headless=True, tmp_path=tmp_path) + _, cfg = eng._get_mcp_config() + assert "--headless" in cfg["args"] + + def test_playwright_headed_no_headless_flag(self, tmp_path): + eng = self._make("auto", headless=False, tmp_path=tmp_path) + _, cfg = eng._get_mcp_config() + assert "--headless" not in cfg["args"]