From 273a8c1ddf7eb229311a1f003b1c070a270a500c Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 11:50:41 +0800 Subject: [PATCH 1/6] feat(ralph): TOCTOU fix, concurrent lock, dry-run mode - Replace verify_receipt + read_receipt_verdict with single read_and_verify_receipt that reads file once and returns validity + verdict together (eliminates TOCTOU race) - Add Python fcntl.flock-based concurrent run lock on fd 200 to prevent multiple ralph.sh instances on the same repo (degrades gracefully on Windows) - Add --dry-run flag that runs selector loop only, printing iter/status/epic/task info without Claude invocation, branch creation, or state changes Task: fn-4-flowctl-comprehensive-optimization-and.6 --- scripts/flowctl/commands/epic.py | 15 + scripts/flowctl/commands/review/__init__.py | 3 + .../flowctl/commands/review/adversarial.py | 40 +-- .../flowctl/commands/review/codex_utils.py | 86 +++++ scripts/flowctl/commands/review/commands.py | 302 +++--------------- scripts/flowctl/commands/task.py | 17 + scripts/flowctl/compat.py | 17 +- scripts/flowctl/core/git.py | 69 ++++ scripts/flowctl/core/io.py | 8 +- scripts/flowctl/core/state.py | 95 ++++-- scripts/flowctl/tests/__init__.py | 0 scripts/flowctl/tests/conftest.py | 98 ++++++ scripts/flowctl/tests/test_config.py | 131 ++++++++ scripts/flowctl/tests/test_ids.py | 272 ++++++++++++++++ scripts/flowctl/tests/test_io.py | 155 +++++++++ scripts/flowctl/tests/test_state.py | 178 +++++++++++ .../flow-code-ralph-init/templates/ralph.sh | 123 +++++-- 17 files changed, 1266 insertions(+), 343 deletions(-) create mode 100644 scripts/flowctl/tests/__init__.py create mode 100644 scripts/flowctl/tests/conftest.py create mode 100644 scripts/flowctl/tests/test_config.py create mode 100644 scripts/flowctl/tests/test_ids.py create mode 100644 scripts/flowctl/tests/test_io.py create mode 100644 scripts/flowctl/tests/test_state.py diff --git a/scripts/flowctl/commands/epic.py b/scripts/flowctl/commands/epic.py index 56c2f49e..474a6af0 100644 --- a/scripts/flowctl/commands/epic.py +++ b/scripts/flowctl/commands/epic.py @@ -196,6 +196,21 @@ def cmd_epic_set_plan(args: argparse.Namespace) -> None: # Read content from file or stdin content = read_file_or_stdin(args.file, "Input file", use_json=args.json) + # Validate spec headings: reject duplicate headings + headings = re.findall(r"^(##\s+.+?)\s*$", content, flags=re.MULTILINE) + seen = {} + duplicates = [] + for h in headings: + seen[h] = seen.get(h, 0) + 1 + for h, count in seen.items(): + if count > 1: + duplicates.append(f"Duplicate heading: {h} (found {count} times)") + if duplicates: + error_exit( + f"Spec validation failed: {'; '.join(duplicates)}", + use_json=args.json, + ) + # Write spec spec_path = flow_dir / SPECS_DIR / f"{args.id}.md" atomic_write(spec_path, content) diff --git a/scripts/flowctl/commands/review/__init__.py b/scripts/flowctl/commands/review/__init__.py index a859ec39..972b0f00 100644 --- a/scripts/flowctl/commands/review/__init__.py +++ b/scripts/flowctl/commands/review/__init__.py @@ -11,13 +11,16 @@ from flowctl.commands.review.codex_utils import ( # noqa: F401 CODEX_EFFORT_LEVELS, CODEX_SANDBOX_MODES, + delete_stale_receipt, get_codex_version, is_sandbox_failure, + load_receipt, parse_codex_thread_id, parse_codex_verdict, require_codex, resolve_codex_sandbox, run_codex_exec, + save_receipt, ) from flowctl.commands.review.prompts import ( # noqa: F401 build_completion_review_prompt, diff --git a/scripts/flowctl/commands/review/adversarial.py b/scripts/flowctl/commands/review/adversarial.py index c50bbba1..8ea076cd 100644 --- a/scripts/flowctl/commands/review/adversarial.py +++ b/scripts/flowctl/commands/review/adversarial.py @@ -2,15 +2,12 @@ import argparse import json -import os import re -import subprocess from pathlib import Path from typing import Optional -from flowctl.core.git import get_changed_files, get_embedded_file_contents +from flowctl.core.git import get_changed_files, get_diff_context, get_embedded_file_contents from flowctl.core.io import error_exit, json_output -from flowctl.core.paths import get_repo_root from flowctl.commands.review.codex_utils import ( run_codex_exec, @@ -108,39 +105,8 @@ def cmd_codex_adversarial(args: argparse.Namespace) -> None: base_branch = args.base focus = getattr(args, "focus", None) - # Get diff - diff_summary = "" - diff_content = "" - try: - result = subprocess.run( - ["git", "diff", "--stat", f"{base_branch}..HEAD"], - capture_output=True, text=True, cwd=get_repo_root(), - ) - if result.returncode == 0: - diff_summary = result.stdout.strip() - except (subprocess.CalledProcessError, OSError): - pass - - max_diff_bytes = 50000 - try: - proc = subprocess.Popen( - ["git", "diff", f"{base_branch}..HEAD"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=get_repo_root(), - ) - diff_bytes = proc.stdout.read(max_diff_bytes + 1) - was_truncated = len(diff_bytes) > max_diff_bytes - if was_truncated: - diff_bytes = diff_bytes[:max_diff_bytes] - while proc.stdout.read(65536): - pass - proc.stdout.close() - proc.stderr.close() - proc.wait() - diff_content = diff_bytes.decode("utf-8", errors="replace").strip() - if was_truncated: - diff_content += "\n\n... [diff truncated at 50KB]" - except (subprocess.CalledProcessError, OSError): - pass + # Get diff summary + content via shared helper + diff_summary, diff_content = get_diff_context(base_branch) if not diff_summary and not diff_content: error_exit(f"No changes found between {base_branch} and HEAD", use_json=args.json) diff --git a/scripts/flowctl/commands/review/codex_utils.py b/scripts/flowctl/commands/review/codex_utils.py index 65cfe538..f4385162 100644 --- a/scripts/flowctl/commands/review/codex_utils.py +++ b/scripts/flowctl/commands/review/codex_utils.py @@ -67,6 +67,92 @@ def get_codex_version() -> Optional[str]: return None +# ───────────────────────────────────────────────────────────────────────────── +# Receipt lifecycle helpers +# ───────────────────────────────────────────────────────────────────────────── + + +def load_receipt(path: Optional[str]) -> tuple[Optional[str], bool]: + """Load a review receipt and extract session info for re-reviews. + + Args: + path: Receipt file path (may be None). + + Returns: + tuple: (session_id, is_rereview) + - session_id: Codex thread ID from previous review, or None. + - is_rereview: True if a valid session was found (indicates re-review). + """ + if not path: + return None, False + receipt_file = Path(path) + if not receipt_file.exists(): + return None, False + try: + receipt_data = json.loads(receipt_file.read_text(encoding="utf-8")) + session_id = receipt_data.get("session_id") + return session_id, session_id is not None + except (json.JSONDecodeError, Exception): + return None, False + + +def save_receipt( + path: str, + *, + review_type: str, + review_id: str, + mode: str = "codex", + verdict: str, + session_id: Optional[str], + output: str, + base_branch: Optional[str] = None, + focus: Optional[str] = None, +) -> None: + """Write a Ralph-compatible review receipt to *path*. + + Automatically includes the current RALPH_ITERATION env var if set. + """ + receipt_data: dict[str, Any] = { + "type": review_type, + "id": review_id, + "mode": mode, + "verdict": verdict, + "session_id": session_id, + "timestamp": now_iso(), + "review": output, + } + if base_branch is not None: + receipt_data["base"] = base_branch + if focus is not None: + receipt_data["focus"] = focus + + # Add iteration if running under Ralph + ralph_iter = os.environ.get("RALPH_ITERATION") + if ralph_iter: + try: + receipt_data["iteration"] = int(ralph_iter) + except ValueError: + pass + + Path(path).write_text( + json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8" + ) + + +def delete_stale_receipt(path: Optional[str]) -> None: + """Delete a receipt file if it exists (best-effort). + + Used to clear stale receipts on review failure so they don't + falsely satisfy downstream gates. + """ + if not path: + return + try: + Path(path).unlink(missing_ok=True) + except OSError: + pass + + CODEX_SANDBOX_MODES = {"read-only", "workspace-write", "danger-full-access", "auto"} diff --git a/scripts/flowctl/commands/review/commands.py b/scripts/flowctl/commands/review/commands.py index 192d62b5..d6bc9b68 100644 --- a/scripts/flowctl/commands/review/commands.py +++ b/scripts/flowctl/commands/review/commands.py @@ -1,37 +1,33 @@ """Review commands: check, impl-review, plan-review, completion-review.""" import argparse -import json -import os -import re -import subprocess +import shutil +import sys from pathlib import Path -from typing import Any, Optional -from flowctl.core.constants import EPICS_DIR, SPECS_DIR, TASKS_DIR +from flowctl.core.constants import SPECS_DIR, TASKS_DIR from flowctl.core.git import ( gather_context_hints, get_changed_files, + get_diff_context, get_embedded_file_contents, ) from flowctl.core.ids import is_epic_id, is_task_id from flowctl.core.io import ( - atomic_write, - atomic_write_json, error_exit, json_output, - load_json, - load_json_or_exit, - now_iso, ) from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root from flowctl.commands.review.codex_utils import ( - run_codex_exec, + delete_stale_receipt, + get_codex_version, + is_sandbox_failure, + load_receipt, parse_codex_verdict, - parse_codex_thread_id, resolve_codex_sandbox, - is_sandbox_failure, + run_codex_exec, + save_receipt, CODEX_EFFORT_LEVELS, ) from flowctl.commands.review.prompts import ( @@ -84,53 +80,8 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None: task_spec = task_spec_path.read_text(encoding="utf-8") - # Get diff summary (--stat) - use base..HEAD for committed changes only - diff_summary = "" - try: - diff_result = subprocess.run( - ["git", "diff", "--stat", f"{base_branch}..HEAD"], - capture_output=True, - text=True, - cwd=get_repo_root(), - ) - if diff_result.returncode == 0: - diff_summary = diff_result.stdout.strip() - except (subprocess.CalledProcessError, OSError): - pass - - # Get actual diff content with size cap (avoid memory spike on large diffs) - # Use base..HEAD for committed changes only (not working tree) - diff_content = "" - max_diff_bytes = 50000 - try: - proc = subprocess.Popen( - ["git", "diff", f"{base_branch}..HEAD"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=get_repo_root(), - ) - # Read only up to max_diff_bytes - diff_bytes = proc.stdout.read(max_diff_bytes + 1) - was_truncated = len(diff_bytes) > max_diff_bytes - if was_truncated: - diff_bytes = diff_bytes[:max_diff_bytes] - # Consume remaining stdout in chunks (avoid allocating the entire diff) - while proc.stdout.read(65536): - pass - stderr_bytes = proc.stderr.read() - proc.stdout.close() - proc.stderr.close() - returncode = proc.wait() - - if returncode != 0 and stderr_bytes: - # Include error info but don't fail - diff is optional context - diff_content = f"[git diff failed: {stderr_bytes.decode('utf-8', errors='replace').strip()}]" - else: - diff_content = diff_bytes.decode("utf-8", errors="replace").strip() - if was_truncated: - diff_content += "\n\n... [diff truncated at 50KB]" - except (subprocess.CalledProcessError, OSError): - pass + # Get diff summary + content via shared helper + diff_summary, diff_content = get_diff_context(base_branch) # Always embed changed file contents so Codex doesn't waste turns reading # files from disk. Without embedding, Codex exhausts its turn budget on @@ -162,17 +113,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None: # Check for existing session in receipt (indicates re-review) receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None - session_id = None - is_rereview = False - if receipt_path: - receipt_file = Path(receipt_path) - if receipt_file.exists(): - try: - receipt_data = json.loads(receipt_file.read_text(encoding="utf-8")) - session_id = receipt_data.get("session_id") - is_rereview = session_id is not None - except (json.JSONDecodeError, Exception): - pass + session_id, is_rereview = load_receipt(receipt_path) # For re-reviews, prepend instruction to re-read changed files if is_rereview: @@ -197,12 +138,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None: # Check for sandbox failures (clear stale receipt and exit) if is_sandbox_failure(exit_code, output, stderr): - # Clear any stale receipt to prevent false gate satisfaction - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass # Best effort - proceed to error_exit regardless + delete_stale_receipt(receipt_path) msg = ( "Codex sandbox blocked operations. " "Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access" @@ -211,12 +147,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None: # Handle non-sandbox failures if exit_code != 0: - # Clear any stale receipt to prevent false gate satisfaction - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) msg = (stderr or output or "codex exec failed").strip() error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2) @@ -225,12 +156,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None: # Fail if no verdict found (don't let UNKNOWN pass as success) if not verdict: - # Clear any stale receipt - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) error_exit( "Codex review completed but no verdict found in output. " "Expected SHIP or NEEDS_WORK", @@ -241,29 +167,17 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None: # Determine review id (task_id for task reviews, "branch" for standalone) review_id = task_id if task_id else "branch" - # Write receipt if path provided (Ralph-compatible schema) + # Write receipt if path provided if receipt_path: - receipt_data = { - "type": "impl_review", # Required by Ralph - "id": review_id, # Required by Ralph - "mode": "codex", - "base": base_branch, - "verdict": verdict, - "session_id": thread_id, - "timestamp": now_iso(), - "review": output, # Full review feedback for fix loop - } - # Add iteration if running under Ralph - ralph_iter = os.environ.get("RALPH_ITERATION") - if ralph_iter: - try: - receipt_data["iteration"] = int(ralph_iter) - except ValueError: - pass - if focus: - receipt_data["focus"] = focus - Path(receipt_path).write_text( - json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8" + save_receipt( + receipt_path, + review_type="impl_review", + review_id=review_id, + verdict=verdict, + session_id=thread_id, + output=output, + base_branch=base_branch, + focus=focus, ) # Output @@ -377,17 +291,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: # Check for existing session in receipt (indicates re-review) receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None - session_id = None - is_rereview = False - if receipt_path: - receipt_file = Path(receipt_path) - if receipt_file.exists(): - try: - receipt_data = json.loads(receipt_file.read_text(encoding="utf-8")) - session_id = receipt_data.get("session_id") - is_rereview = session_id is not None - except (json.JSONDecodeError, Exception): - pass + session_id, is_rereview = load_receipt(receipt_path) # For re-reviews, prepend instruction to re-read spec files if is_rereview: @@ -415,12 +319,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: # Check for sandbox failures (clear stale receipt and exit) if is_sandbox_failure(exit_code, output, stderr): - # Clear any stale receipt to prevent false gate satisfaction - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass # Best effort - proceed to error_exit regardless + delete_stale_receipt(receipt_path) msg = ( "Codex sandbox blocked operations. " "Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access" @@ -429,12 +328,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: # Handle non-sandbox failures if exit_code != 0: - # Clear any stale receipt to prevent false gate satisfaction - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) msg = (stderr or output or "codex exec failed").strip() error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2) @@ -443,12 +337,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: # Fail if no verdict found (don't let UNKNOWN pass as success) if not verdict: - # Clear any stale receipt - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) error_exit( "Codex review completed but no verdict found in output. " "Expected SHIP or NEEDS_WORK", @@ -456,26 +345,15 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: code=2, ) - # Write receipt if path provided (Ralph-compatible schema) + # Write receipt if path provided if receipt_path: - receipt_data = { - "type": "plan_review", # Required by Ralph - "id": epic_id, # Required by Ralph - "mode": "codex", - "verdict": verdict, - "session_id": thread_id, - "timestamp": now_iso(), - "review": output, # Full review feedback for fix loop - } - # Add iteration if running under Ralph - ralph_iter = os.environ.get("RALPH_ITERATION") - if ralph_iter: - try: - receipt_data["iteration"] = int(ralph_iter) - except ValueError: - pass - Path(receipt_path).write_text( - json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8" + save_receipt( + receipt_path, + review_type="plan_review", + review_id=epic_id, + verdict=verdict, + session_id=thread_id, + output=output, ) # Output @@ -533,49 +411,8 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None: # Get base branch for diff (default to main) base_branch = args.base if hasattr(args, "base") and args.base else "main" - # Get diff summary - diff_summary = "" - try: - diff_result = subprocess.run( - ["git", "diff", "--stat", f"{base_branch}..HEAD"], - capture_output=True, - text=True, - cwd=get_repo_root(), - ) - if diff_result.returncode == 0: - diff_summary = diff_result.stdout.strip() - except (subprocess.CalledProcessError, OSError): - pass - - # Get actual diff content with size cap - diff_content = "" - max_diff_bytes = 50000 - try: - proc = subprocess.Popen( - ["git", "diff", f"{base_branch}..HEAD"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=get_repo_root(), - ) - diff_bytes = proc.stdout.read(max_diff_bytes + 1) - was_truncated = len(diff_bytes) > max_diff_bytes - if was_truncated: - diff_bytes = diff_bytes[:max_diff_bytes] - while proc.stdout.read(65536): - pass - stderr_bytes = proc.stderr.read() - proc.stdout.close() - proc.stderr.close() - returncode = proc.wait() - - if returncode != 0 and stderr_bytes: - diff_content = f"[git diff failed: {stderr_bytes.decode('utf-8', errors='replace').strip()}]" - else: - diff_content = diff_bytes.decode("utf-8", errors="replace").strip() - if was_truncated: - diff_content += "\n\n... [diff truncated at 50KB]" - except (subprocess.CalledProcessError, OSError): - pass + # Get diff summary + content via shared helper + diff_summary, diff_content = get_diff_context(base_branch) # Always embed changed file contents. See cmd_codex_impl_review comment # for rationale. @@ -595,17 +432,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None: # Check for existing session in receipt (indicates re-review) receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None - session_id = None - is_rereview = False - if receipt_path: - receipt_file = Path(receipt_path) - if receipt_file.exists(): - try: - receipt_data = json.loads(receipt_file.read_text(encoding="utf-8")) - session_id = receipt_data.get("session_id") - is_rereview = session_id is not None - except (json.JSONDecodeError, Exception): - pass + session_id, is_rereview = load_receipt(receipt_path) # For re-reviews, prepend instruction to re-read changed files if is_rereview: @@ -630,11 +457,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None: # Check for sandbox failures if is_sandbox_failure(exit_code, output, stderr): - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) msg = ( "Codex sandbox blocked operations. " "Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access" @@ -643,11 +466,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None: # Handle non-sandbox failures if exit_code != 0: - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) msg = (stderr or output or "codex exec failed").strip() error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2) @@ -656,11 +475,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None: # Fail if no verdict found if not verdict: - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass + delete_stale_receipt(receipt_path) error_exit( "Codex review completed but no verdict found in output. " "Expected SHIP or NEEDS_WORK", @@ -671,27 +486,16 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None: # Preserve session_id for continuity (avoid clobbering on resumed sessions) session_id_to_write = thread_id or session_id - # Write receipt if path provided (Ralph-compatible schema) + # Write receipt if path provided if receipt_path: - receipt_data = { - "type": "completion_review", # Required by Ralph - "id": epic_id, # Required by Ralph - "mode": "codex", - "base": base_branch, - "verdict": verdict, - "session_id": session_id_to_write, - "timestamp": now_iso(), - "review": output, # Full review feedback for fix loop - } - # Add iteration if running under Ralph - ralph_iter = os.environ.get("RALPH_ITERATION") - if ralph_iter: - try: - receipt_data["iteration"] = int(ralph_iter) - except ValueError: - pass - Path(receipt_path).write_text( - json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8" + save_receipt( + receipt_path, + review_type="completion_review", + review_id=epic_id, + verdict=verdict, + session_id=session_id_to_write, + output=output, + base_branch=base_branch, ) # Output diff --git a/scripts/flowctl/commands/task.py b/scripts/flowctl/commands/task.py index 219de50c..393fc827 100644 --- a/scripts/flowctl/commands/task.py +++ b/scripts/flowctl/commands/task.py @@ -676,6 +676,14 @@ def cmd_task_set_spec(args: argparse.Namespace) -> None: # Full file replacement mode (like epic set-plan) if has_file: content = read_file_or_stdin(args.file, "Spec file", use_json=args.json) + # Validate spec headings before writing + from flowctl.commands.admin import validate_task_spec_headings + heading_errors = validate_task_spec_headings(content) + if heading_errors: + error_exit( + f"Spec validation failed: {'; '.join(heading_errors)}", + use_json=args.json, + ) atomic_write(task_spec_path, content) task_data["updated_at"] = now_iso() atomic_write_json(task_json_path, task_data) @@ -713,6 +721,15 @@ def cmd_task_set_spec(args: argparse.Namespace) -> None: except ValueError as e: error_exit(str(e), use_json=args.json) + # Validate final spec headings before writing + from flowctl.commands.admin import validate_task_spec_headings + heading_errors = validate_task_spec_headings(updated_spec) + if heading_errors: + error_exit( + f"Spec validation failed after patching: {'; '.join(heading_errors)}", + use_json=args.json, + ) + # Single atomic write for spec, single for JSON atomic_write(task_spec_path, updated_spec) task_data["updated_at"] = now_iso() diff --git a/scripts/flowctl/compat.py b/scripts/flowctl/compat.py index c539adb1..714aaf29 100644 --- a/scripts/flowctl/compat.py +++ b/scripts/flowctl/compat.py @@ -1,4 +1,7 @@ -"""Platform-specific compatibility: file locking (fcntl on Unix, no-op on Windows).""" +"""Platform-specific compatibility: file locking and fsync (fcntl on Unix, no-op on Windows).""" + +import os +import sys try: import fcntl @@ -15,3 +18,15 @@ def _flock(f, lock_type): LOCK_EX = 0 LOCK_UN = 0 + + +def _fsync(fd: int) -> None: + """Platform-aware fsync: uses F_FULLFSYNC on macOS for true hardware flush.""" + if sys.platform == "darwin": + try: + import fcntl as _fcntl + _fcntl.fcntl(fd, _fcntl.F_FULLFSYNC) + return + except (ImportError, AttributeError, OSError): + pass # Fall through to os.fsync + os.fsync(fd) diff --git a/scripts/flowctl/core/git.py b/scripts/flowctl/core/git.py index e9219c90..1cd16921 100644 --- a/scripts/flowctl/core/git.py +++ b/scripts/flowctl/core/git.py @@ -408,6 +408,75 @@ def gather_context_hints(base_branch: str, max_hints: int = 15) -> str: return "Consider these related files:\n" + "\n".join(hints) +def get_diff_context( + base_branch: str, max_bytes: int = 50000 +) -> tuple[str, str]: + """Get diff summary and content between base_branch and HEAD. + + Returns: + tuple: (diff_summary, diff_content) + - diff_summary: output of ``git diff --stat base_branch..HEAD`` + - diff_content: raw diff truncated to *max_bytes*; a + ``[...truncated at N bytes]`` suffix is appended when truncated. + + Both values default to ``""`` on any git error so callers never need + to handle exceptions. + """ + repo_root = get_repo_root() + + # 1. Diff summary (--stat) + diff_summary = "" + try: + stat_result = subprocess.run( + ["git", "diff", "--stat", f"{base_branch}..HEAD"], + capture_output=True, + text=True, + cwd=repo_root, + ) + if stat_result.returncode == 0: + diff_summary = stat_result.stdout.strip() + except (subprocess.CalledProcessError, OSError): + pass + + # 2. Diff content with byte-cap (avoid memory spike on large diffs) + diff_content = "" + try: + proc = subprocess.Popen( + ["git", "diff", f"{base_branch}..HEAD"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=repo_root, + ) + diff_bytes = proc.stdout.read(max_bytes + 1) + was_truncated = len(diff_bytes) > max_bytes + if was_truncated: + diff_bytes = diff_bytes[:max_bytes] + # Drain remaining stdout to avoid blocking the subprocess + while proc.stdout.read(65536): + pass + stderr_bytes = proc.stderr.read() + proc.stdout.close() + proc.stderr.close() + returncode = proc.wait() + + if returncode != 0 and stderr_bytes: + diff_content = ( + f"[git diff failed: " + f"{stderr_bytes.decode('utf-8', errors='replace').strip()}]" + ) + else: + diff_content = diff_bytes.decode("utf-8", errors="replace").strip() + if was_truncated: + diff_content += ( + f"\n\n... [diff truncated at " + f"{max_bytes // 1000}KB]" + ) + except (subprocess.CalledProcessError, OSError): + pass + + return diff_summary, diff_content + + def get_actor() -> str: """Determine current actor for soft-claim semantics. diff --git a/scripts/flowctl/core/io.py b/scripts/flowctl/core/io.py index 5051db96..4048625d 100644 --- a/scripts/flowctl/core/io.py +++ b/scripts/flowctl/core/io.py @@ -40,14 +40,18 @@ def is_supported_schema(version: Any) -> bool: def atomic_write(path: Path, content: str) -> None: - """Write file atomically via temp + rename.""" + """Write file atomically via temp + rename with fsync for durability.""" + from flowctl.compat import _fsync + path.parent.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix=".tmp") try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(content) + f.flush() + _fsync(f.fileno()) os.replace(tmp_path, path) - except Exception: + except BaseException: if os.path.exists(tmp_path): os.unlink(tmp_path) raise diff --git a/scripts/flowctl/core/state.py b/scripts/flowctl/core/state.py index 5eed790e..befc215f 100644 --- a/scripts/flowctl/core/state.py +++ b/scripts/flowctl/core/state.py @@ -168,8 +168,33 @@ def _file_locks_path() -> Path: return get_state_dir() / "file_locks.json" +def _file_locks_mutex_path() -> Path: + """Path to the mutex file for file_locks.json read-modify-write.""" + return get_state_dir() / "file_locks.mutex" + + +@contextmanager +def _file_locks_mutex(): + """Acquire mutual exclusion for file_locks.json read-modify-write. + + Prevents race conditions when concurrent workers both read the same state, + decide a file is unlocked, and both write — second overwriting first's lock. + """ + mutex_path = _file_locks_mutex_path() + mutex_path.parent.mkdir(parents=True, exist_ok=True) + with open(mutex_path, "w") as f: + try: + _flock(f, LOCK_EX) + yield + finally: + _flock(f, LOCK_UN) + + def _load_file_locks() -> dict: - """Load file lock registry. Returns {file_path: {task_id, locked_at}}.""" + """Load file lock registry. Returns {file_path: {task_id, locked_at}}. + + NOTE: Callers doing read-modify-write must wrap in _file_locks_mutex(). + """ path = _file_locks_path() if not path.exists(): return {} @@ -188,34 +213,42 @@ def _save_file_locks(locks: dict) -> None: def lock_files(task_id: str, files: list[str]) -> dict: - """Lock files for a task. Returns {locked: [...], already_locked: [{file, owner}]}.""" - locks = _load_file_locks() - locked = [] - already_locked = [] - for f in files: - existing = locks.get(f) - if existing and existing["task_id"] != task_id: - already_locked.append({"file": f, "owner": existing["task_id"]}) - else: - locks[f] = {"task_id": task_id, "locked_at": now_iso()} - locked.append(f) - _save_file_locks(locks) + """Lock files for a task. Returns {locked: [...], already_locked: [{file, owner}]}. + + Uses fcntl.flock for mutual exclusion to prevent race conditions. + """ + with _file_locks_mutex(): + locks = _load_file_locks() + locked = [] + already_locked = [] + for f in files: + existing = locks.get(f) + if existing and existing["task_id"] != task_id: + already_locked.append({"file": f, "owner": existing["task_id"]}) + else: + locks[f] = {"task_id": task_id, "locked_at": now_iso()} + locked.append(f) + _save_file_locks(locks) return {"locked": locked, "already_locked": already_locked} def unlock_files(task_id: str, files: list[str] | None = None) -> list[str]: - """Unlock files owned by task_id. If files=None, unlock all files for this task.""" - locks = _load_file_locks() - unlocked = [] - to_remove = [] - for f, info in locks.items(): - if info["task_id"] == task_id: - if files is None or f in files: - to_remove.append(f) - unlocked.append(f) - for f in to_remove: - del locks[f] - _save_file_locks(locks) + """Unlock files owned by task_id. If files=None, unlock all files for this task. + + Uses fcntl.flock for mutual exclusion to prevent race conditions. + """ + with _file_locks_mutex(): + locks = _load_file_locks() + unlocked = [] + to_remove = [] + for f, info in locks.items(): + if info["task_id"] == task_id: + if files is None or f in files: + to_remove.append(f) + unlocked.append(f) + for f in to_remove: + del locks[f] + _save_file_locks(locks) return unlocked @@ -231,10 +264,14 @@ def list_file_locks() -> dict: def clear_file_locks() -> int: - """Clear all file locks. Returns count cleared.""" - locks = _load_file_locks() - count = len(locks) - _save_file_locks({}) + """Clear all file locks. Returns count cleared. + + Uses fcntl.flock for mutual exclusion to prevent race conditions. + """ + with _file_locks_mutex(): + locks = _load_file_locks() + count = len(locks) + _save_file_locks({}) return count diff --git a/scripts/flowctl/tests/__init__.py b/scripts/flowctl/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/flowctl/tests/conftest.py b/scripts/flowctl/tests/conftest.py new file mode 100644 index 00000000..466705df --- /dev/null +++ b/scripts/flowctl/tests/conftest.py @@ -0,0 +1,98 @@ +"""Shared pytest fixtures for flowctl tests.""" + +import json +import os +import subprocess + +import pytest + + +@pytest.fixture +def flow_dir(tmp_path, monkeypatch): + """Set up a minimal .flow/ directory structure in a temp directory. + + Creates: + .flow/meta.json — schema version 2 + .flow/epics/ — empty + .flow/tasks/ — empty + .flow/specs/ — empty + .flow/config.json — default config + + Changes cwd to tmp_path so flowctl path resolution finds .flow/. + """ + flow = tmp_path / ".flow" + flow.mkdir() + (flow / "epics").mkdir() + (flow / "tasks").mkdir() + (flow / "specs").mkdir() + + meta = {"schema_version": 2, "next_epic": 1} + (flow / "meta.json").write_text(json.dumps(meta, indent=2) + "\n", encoding="utf-8") + + config = {"memory": {"enabled": True}, "stack": {}} + (flow / "config.json").write_text( + json.dumps(config, indent=2) + "\n", encoding="utf-8" + ) + + monkeypatch.chdir(tmp_path) + return flow + + +@pytest.fixture +def git_repo(tmp_path, monkeypatch): + """Initialize a git repo in tmp_path with a .flow/ directory. + + Needed by state.py's get_state_dir() which calls git rev-parse. + Returns the tmp_path (repo root). + """ + monkeypatch.chdir(tmp_path) + + subprocess.run( + ["git", "init"], + cwd=str(tmp_path), + capture_output=True, + check=True, + ) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=str(tmp_path), + capture_output=True, + check=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], + cwd=str(tmp_path), + capture_output=True, + check=True, + ) + + # Create .flow/ structure (same as flow_dir fixture but inside git repo) + flow = tmp_path / ".flow" + flow.mkdir() + (flow / "epics").mkdir() + (flow / "tasks").mkdir() + (flow / "specs").mkdir() + + meta = {"schema_version": 2, "next_epic": 1} + (flow / "meta.json").write_text(json.dumps(meta, indent=2) + "\n", encoding="utf-8") + + config = {"memory": {"enabled": True}, "stack": {}} + (flow / "config.json").write_text( + json.dumps(config, indent=2) + "\n", encoding="utf-8" + ) + + # Initial commit so git rev-parse works + subprocess.run( + ["git", "add", "."], + cwd=str(tmp_path), + capture_output=True, + check=True, + ) + subprocess.run( + ["git", "commit", "-m", "init"], + cwd=str(tmp_path), + capture_output=True, + check=True, + ) + + return tmp_path diff --git a/scripts/flowctl/tests/test_config.py b/scripts/flowctl/tests/test_config.py new file mode 100644 index 00000000..0bdf1594 --- /dev/null +++ b/scripts/flowctl/tests/test_config.py @@ -0,0 +1,131 @@ +"""Tests for flowctl.core.config — deep_merge, config loading.""" + +import json + +import pytest + +from flowctl.core.config import deep_merge, get_default_config, load_flow_config + + +class TestDeepMerge: + """Tests for deep_merge() — recursive dict merging.""" + + def test_flat_merge(self): + """Simple key-value merge, override wins.""" + base = {"a": 1, "b": 2} + override = {"b": 3, "c": 4} + result = deep_merge(base, override) + assert result == {"a": 1, "b": 3, "c": 4} + + def test_nested_merge(self): + """Nested dicts should be merged recursively.""" + base = {"level1": {"a": 1, "b": 2}} + override = {"level1": {"b": 3, "c": 4}} + result = deep_merge(base, override) + assert result == {"level1": {"a": 1, "b": 3, "c": 4}} + + def test_deep_nested_merge(self): + """Multiple levels of nesting should all merge.""" + base = {"l1": {"l2": {"l3": {"a": 1, "b": 2}}}} + override = {"l1": {"l2": {"l3": {"b": 3}}}} + result = deep_merge(base, override) + assert result == {"l1": {"l2": {"l3": {"a": 1, "b": 3}}}} + + def test_override_dict_with_scalar(self): + """When override provides a scalar for a dict key, scalar wins.""" + base = {"a": {"nested": True}} + override = {"a": "flat_value"} + result = deep_merge(base, override) + assert result == {"a": "flat_value"} + + def test_override_scalar_with_dict(self): + """When override provides a dict for a scalar key, dict wins.""" + base = {"a": "flat_value"} + override = {"a": {"nested": True}} + result = deep_merge(base, override) + assert result == {"a": {"nested": True}} + + def test_empty_override(self): + """Empty override should return base unchanged.""" + base = {"a": 1, "b": {"c": 2}} + result = deep_merge(base, {}) + assert result == base + + def test_empty_base(self): + """Empty base should return override.""" + override = {"a": 1, "b": {"c": 2}} + result = deep_merge({}, override) + assert result == override + + def test_both_empty(self): + result = deep_merge({}, {}) + assert result == {} + + def test_does_not_mutate_base(self): + """deep_merge should not mutate the base dict.""" + base = {"a": 1, "b": {"c": 2}} + original_base = {"a": 1, "b": {"c": 2}} + deep_merge(base, {"a": 99}) + assert base == original_base + + def test_lists_are_replaced_not_merged(self): + """Lists should be replaced entirely, not concatenated.""" + base = {"items": [1, 2, 3]} + override = {"items": [4, 5]} + result = deep_merge(base, override) + assert result == {"items": [4, 5]} + + def test_none_override(self): + """None in override should replace base value.""" + base = {"a": "something"} + override = {"a": None} + result = deep_merge(base, override) + assert result == {"a": None} + + +class TestGetDefaultConfig: + def test_has_required_keys(self): + config = get_default_config() + assert "memory" in config + assert "planSync" in config + assert "review" in config + assert "scouts" in config + assert "stack" in config + + def test_memory_enabled_by_default(self): + config = get_default_config() + assert config["memory"]["enabled"] is True + + +class TestLoadFlowConfig: + def test_returns_defaults_when_no_config(self, flow_dir, monkeypatch): + """When config.json is missing, should return defaults.""" + # Remove the config file that flow_dir fixture created + config_path = flow_dir / "config.json" + config_path.unlink() + + config = load_flow_config() + defaults = get_default_config() + assert config == defaults + + def test_merges_with_defaults(self, flow_dir, monkeypatch): + """Partial config should be merged with defaults.""" + config_path = flow_dir / "config.json" + partial = {"memory": {"enabled": False}} + config_path.write_text(json.dumps(partial) + "\n", encoding="utf-8") + + config = load_flow_config() + # Overridden value + assert config["memory"]["enabled"] is False + # Default values still present + assert "planSync" in config + assert "review" in config + + def test_handles_invalid_json(self, flow_dir, monkeypatch): + """Invalid JSON should return defaults.""" + config_path = flow_dir / "config.json" + config_path.write_text("not json{{{", encoding="utf-8") + + config = load_flow_config() + defaults = get_default_config() + assert config == defaults diff --git a/scripts/flowctl/tests/test_ids.py b/scripts/flowctl/tests/test_ids.py new file mode 100644 index 00000000..96a6a1c6 --- /dev/null +++ b/scripts/flowctl/tests/test_ids.py @@ -0,0 +1,272 @@ +"""Tests for flowctl.core.ids — ID parsing, generation, and validation.""" + +import pytest + +from flowctl.core.ids import ( + epic_id_from_task, + generate_epic_suffix, + is_epic_id, + is_task_id, + normalize_epic, + normalize_task, + parse_id, + slugify, + task_priority, +) + + +# --- parse_id --- + + +class TestParseId: + """Tests for parse_id() — the highest-risk function, used everywhere.""" + + def test_legacy_epic(self): + """fn-1 -> (1, None)""" + assert parse_id("fn-1") == (1, None) + + def test_legacy_task(self): + """fn-1.2 -> (1, 2)""" + assert parse_id("fn-1.2") == (1, 2) + + def test_legacy_large_numbers(self): + """fn-99.100 -> (99, 100)""" + assert parse_id("fn-99.100") == (99, 100) + + def test_short_suffix_epic(self): + """fn-5-x7k -> (5, None)""" + assert parse_id("fn-5-x7k") == (5, None) + + def test_short_suffix_task(self): + """fn-5-x7k.3 -> (5, 3)""" + assert parse_id("fn-5-x7k.3") == (5, 3) + + def test_slug_suffix_epic(self): + """fn-4-flowctl-comprehensive-optimization-and -> (4, None)""" + assert parse_id("fn-4-flowctl-comprehensive-optimization-and") == (4, None) + + def test_slug_suffix_task(self): + """fn-4-flowctl-comprehensive-optimization-and.1 -> (4, 1)""" + assert parse_id("fn-4-flowctl-comprehensive-optimization-and.1") == (4, 1) + + def test_two_char_suffix(self): + """fn-3-ab -> (3, None)""" + assert parse_id("fn-3-ab") == (3, None) + + def test_single_char_suffix(self): + """fn-3-a -> (3, None)""" + assert parse_id("fn-3-a") == (3, None) + + def test_invalid_empty(self): + assert parse_id("") == (None, None) + + def test_invalid_no_prefix(self): + assert parse_id("task-1") == (None, None) + + def test_invalid_no_number(self): + assert parse_id("fn-") == (None, None) + + def test_invalid_no_fn_prefix(self): + assert parse_id("1.2") == (None, None) + + def test_invalid_uppercase(self): + """Suffixes must be lowercase.""" + assert parse_id("fn-1-ABC") == (None, None) + + def test_invalid_special_chars(self): + assert parse_id("fn-1-a_b") == (None, None) + + def test_invalid_double_dot(self): + assert parse_id("fn-1.2.3") == (None, None) + + def test_zero_epic(self): + """fn-0 should be valid (0 is a valid integer).""" + assert parse_id("fn-0") == (0, None) + + def test_zero_task(self): + """fn-0.0 should be valid.""" + assert parse_id("fn-0.0") == (0, 0) + + def test_suffix_with_digits(self): + """fn-2-a3b -> (2, None)""" + assert parse_id("fn-2-a3b") == (2, None) + + def test_long_slug_with_many_segments(self): + """Multi-segment slug suffix with task number.""" + assert parse_id("fn-10-my-long-slug-name.5") == (10, 5) + + +# --- is_epic_id / is_task_id --- + + +class TestIsEpicId: + def test_epic_id(self): + assert is_epic_id("fn-1") is True + + def test_task_id_not_epic(self): + assert is_epic_id("fn-1.2") is False + + def test_invalid_not_epic(self): + assert is_epic_id("garbage") is False + + def test_slug_epic(self): + assert is_epic_id("fn-4-flowctl-opt") is True + + +class TestIsTaskId: + def test_task_id(self): + assert is_task_id("fn-1.2") is True + + def test_epic_id_not_task(self): + assert is_task_id("fn-1") is False + + def test_invalid_not_task(self): + assert is_task_id("garbage") is False + + def test_slug_task(self): + assert is_task_id("fn-4-opt.1") is True + + +# --- epic_id_from_task --- + + +class TestEpicIdFromTask: + def test_legacy(self): + assert epic_id_from_task("fn-1.2") == "fn-1" + + def test_with_suffix(self): + assert epic_id_from_task("fn-5-x7k.3") == "fn-5-x7k" + + def test_with_slug(self): + assert epic_id_from_task("fn-4-flowctl-opt.1") == "fn-4-flowctl-opt" + + def test_invalid_raises(self): + with pytest.raises(ValueError): + epic_id_from_task("fn-1") # epic ID, not task + + def test_garbage_raises(self): + with pytest.raises(ValueError): + epic_id_from_task("garbage") + + +# --- slugify --- + + +class TestSlugify: + def test_basic(self): + assert slugify("Hello World") == "hello-world" + + def test_special_chars(self): + assert slugify("Hello! @World#") == "hello-world" + + def test_underscores(self): + assert slugify("my_cool_thing") == "my-cool-thing" + + def test_unicode(self): + result = slugify("café résumé") + assert result == "cafe-resume" + + def test_max_length(self): + result = slugify("a" * 100, max_length=10) + assert len(result) <= 10 + + def test_max_length_word_boundary(self): + result = slugify("hello-world-foo-bar", max_length=12) + # Should truncate at word boundary + assert len(result) <= 12 + assert not result.endswith("-") + + def test_empty_returns_none(self): + assert slugify("!!!") is None + + def test_all_whitespace_returns_none(self): + # After processing, should produce empty string + assert slugify(" ") is None + + +# --- generate_epic_suffix --- + + +class TestGenerateEpicSuffix: + def test_default_length(self): + suffix = generate_epic_suffix() + assert len(suffix) == 3 + + def test_custom_length(self): + suffix = generate_epic_suffix(length=5) + assert len(suffix) == 5 + + def test_valid_chars(self): + """Suffix should only contain lowercase letters and digits.""" + import string + + valid = set(string.ascii_lowercase + string.digits) + for _ in range(20): # run multiple times for randomness + suffix = generate_epic_suffix() + assert all(c in valid for c in suffix) + + +# --- normalize_epic / normalize_task --- + + +class TestNormalizeEpic: + def test_adds_missing_fields(self): + data = {} + result = normalize_epic(data) + assert result["plan_review_status"] == "unknown" + assert result["plan_reviewed_at"] is None + assert result["completion_review_status"] == "unknown" + assert result["completion_reviewed_at"] is None + assert result["branch_name"] is None + assert result["depends_on_epics"] == [] + assert result["default_impl"] is None + assert result["default_review"] is None + assert result["default_sync"] is None + assert result["gaps"] == [] + + def test_preserves_existing(self): + data = {"plan_review_status": "ship", "branch_name": "my-branch"} + result = normalize_epic(data) + assert result["plan_review_status"] == "ship" + assert result["branch_name"] == "my-branch" + + +class TestNormalizeTask: + def test_adds_missing_fields(self): + data = {} + result = normalize_task(data) + assert result["priority"] is None + assert result["depends_on"] == [] + assert result["impl"] is None + assert result["review"] is None + assert result["sync"] is None + + def test_migrates_legacy_deps(self): + data = {"deps": ["fn-1.1", "fn-1.2"]} + result = normalize_task(data) + assert result["depends_on"] == ["fn-1.1", "fn-1.2"] + + def test_depends_on_preferred_over_deps(self): + data = {"depends_on": ["fn-1.3"], "deps": ["fn-1.1"]} + result = normalize_task(data) + assert result["depends_on"] == ["fn-1.3"] + + +# --- task_priority --- + + +class TestTaskPriority: + def test_none_priority(self): + assert task_priority({"priority": None}) == 999 + + def test_missing_priority(self): + assert task_priority({}) == 999 + + def test_numeric_priority(self): + assert task_priority({"priority": 1}) == 1 + + def test_string_priority(self): + assert task_priority({"priority": "5"}) == 5 + + def test_invalid_priority(self): + assert task_priority({"priority": "not-a-number"}) == 999 diff --git a/scripts/flowctl/tests/test_io.py b/scripts/flowctl/tests/test_io.py new file mode 100644 index 00000000..264e0e71 --- /dev/null +++ b/scripts/flowctl/tests/test_io.py @@ -0,0 +1,155 @@ +"""Tests for flowctl.core.io — atomic writes, JSON loading, output formatting.""" + +import json +import os + +import pytest + +from flowctl.core.io import ( + atomic_write, + atomic_write_json, + is_supported_schema, + load_json, + now_iso, +) + + +class TestAtomicWrite: + """Tests for atomic_write() — temp + rename pattern.""" + + def test_creates_file(self, tmp_path): + """atomic_write should create a new file with correct content.""" + target = tmp_path / "output.txt" + atomic_write(target, "hello world\n") + assert target.exists() + assert target.read_text(encoding="utf-8") == "hello world\n" + + def test_overwrites_existing(self, tmp_path): + """atomic_write should overwrite existing file.""" + target = tmp_path / "output.txt" + target.write_text("old content", encoding="utf-8") + atomic_write(target, "new content") + assert target.read_text(encoding="utf-8") == "new content" + + def test_creates_parent_dirs(self, tmp_path): + """atomic_write should create parent directories if missing.""" + target = tmp_path / "a" / "b" / "c" / "output.txt" + atomic_write(target, "deep file") + assert target.read_text(encoding="utf-8") == "deep file" + + def test_cleans_up_on_error(self, tmp_path): + """atomic_write should remove temp file if write fails.""" + target = tmp_path / "output.txt" + + class BadStr: + """Object that raises on write.""" + + def __str__(self): + raise RuntimeError("boom") + + # This should raise because we can't write a non-string + with pytest.raises(TypeError): + atomic_write(target, BadStr()) # type: ignore + + # No temp files should remain + remaining = list(tmp_path.glob("*.tmp")) + assert remaining == [], f"Temp files not cleaned up: {remaining}" + + def test_atomic_write_unicode(self, tmp_path): + """atomic_write should handle unicode content.""" + target = tmp_path / "unicode.txt" + text = "Hello \u00e9\u00e8\u00ea \u2603 \U0001f600" + atomic_write(target, text) + content = target.read_text(encoding="utf-8") + assert "\u00e9" in content + assert "\u2603" in content + assert "\U0001f600" in content + + +class TestAtomicWriteJson: + """Tests for atomic_write_json() — sorted keys, trailing newline.""" + + def test_roundtrip(self, tmp_path): + """Write JSON and load it back.""" + target = tmp_path / "data.json" + data = {"b": 2, "a": 1, "nested": {"x": True}} + atomic_write_json(target, data) + + loaded = load_json(target) + assert loaded == data + + def test_sorted_keys(self, tmp_path): + """Keys should be sorted in output.""" + target = tmp_path / "data.json" + atomic_write_json(target, {"z": 1, "a": 2, "m": 3}) + raw = target.read_text(encoding="utf-8") + # 'a' should appear before 'm' which should appear before 'z' + assert raw.index('"a"') < raw.index('"m"') < raw.index('"z"') + + def test_trailing_newline(self, tmp_path): + """JSON output should end with newline.""" + target = tmp_path / "data.json" + atomic_write_json(target, {"key": "value"}) + raw = target.read_text(encoding="utf-8") + assert raw.endswith("\n") + + +class TestLoadJson: + """Tests for load_json() — basic JSON file loading.""" + + def test_load_valid(self, tmp_path): + target = tmp_path / "valid.json" + target.write_text('{"key": "value"}', encoding="utf-8") + result = load_json(target) + assert result == {"key": "value"} + + def test_load_invalid_raises(self, tmp_path): + target = tmp_path / "invalid.json" + target.write_text("not json", encoding="utf-8") + with pytest.raises(json.JSONDecodeError): + load_json(target) + + def test_load_missing_raises(self, tmp_path): + target = tmp_path / "missing.json" + with pytest.raises(FileNotFoundError): + load_json(target) + + +class TestNowIso: + """Tests for now_iso() — ISO timestamp generation.""" + + def test_format(self): + result = now_iso() + assert result.endswith("Z") + # Should be parseable as ISO format + assert "T" in result + + def test_contains_date_parts(self): + result = now_iso() + # Should contain year-month-day + parts = result.split("T") + assert len(parts) == 2 + date_parts = parts[0].split("-") + assert len(date_parts) == 3 # year, month, day + + +class TestIsSupportedSchema: + """Tests for is_supported_schema() — version checking.""" + + def test_version_1(self): + assert is_supported_schema(1) is True + + def test_version_2(self): + assert is_supported_schema(2) is True + + def test_version_string(self): + """String versions should also work via int conversion.""" + assert is_supported_schema("1") is True + assert is_supported_schema("2") is True + + def test_unsupported_version(self): + assert is_supported_schema(99) is False + + def test_invalid_type(self): + assert is_supported_schema("abc") is False + assert is_supported_schema(None) is False diff --git a/scripts/flowctl/tests/test_state.py b/scripts/flowctl/tests/test_state.py new file mode 100644 index 00000000..891dcc68 --- /dev/null +++ b/scripts/flowctl/tests/test_state.py @@ -0,0 +1,178 @@ +"""Tests for flowctl.core.state — StateStore and task loading with state merge.""" + +import json +import os + +import pytest + +from flowctl.core.state import LocalFileStateStore, load_task_with_state + + +class TestLocalFileStateStore: + """Tests for LocalFileStateStore — file-based state with fcntl locking.""" + + def test_save_and_load_runtime(self, tmp_path): + """save_runtime creates file, load_runtime reads it back.""" + store = LocalFileStateStore(tmp_path) + store.save_runtime("fn-1.1", {"status": "in_progress", "assignee": "alice"}) + + result = store.load_runtime("fn-1.1") + assert result is not None + assert result["status"] == "in_progress" + assert result["assignee"] == "alice" + + def test_load_missing_returns_none(self, tmp_path): + """load_runtime returns None for non-existent task.""" + store = LocalFileStateStore(tmp_path) + assert store.load_runtime("fn-999.1") is None + + def test_load_corrupt_returns_none(self, tmp_path): + """load_runtime returns None for corrupt JSON.""" + store = LocalFileStateStore(tmp_path) + tasks_dir = tmp_path / "tasks" + tasks_dir.mkdir(parents=True) + corrupt_path = tasks_dir / "fn-1.1.state.json" + corrupt_path.write_text("not valid json{{{", encoding="utf-8") + + assert store.load_runtime("fn-1.1") is None + + def test_save_overwrites(self, tmp_path): + """save_runtime overwrites existing state.""" + store = LocalFileStateStore(tmp_path) + store.save_runtime("fn-1.1", {"status": "todo"}) + store.save_runtime("fn-1.1", {"status": "done"}) + + result = store.load_runtime("fn-1.1") + assert result["status"] == "done" + + def test_list_runtime_files_empty(self, tmp_path): + """list_runtime_files returns empty list when no state files.""" + store = LocalFileStateStore(tmp_path) + assert store.list_runtime_files() == [] + + def test_list_runtime_files(self, tmp_path): + """list_runtime_files returns task IDs that have state.""" + store = LocalFileStateStore(tmp_path) + store.save_runtime("fn-1.1", {"status": "todo"}) + store.save_runtime("fn-1.2", {"status": "done"}) + + ids = store.list_runtime_files() + assert sorted(ids) == ["fn-1.1", "fn-1.2"] + + def test_lock_task_context_manager(self, tmp_path): + """lock_task should work as a context manager without errors.""" + store = LocalFileStateStore(tmp_path) + with store.lock_task("fn-1.1"): + # Should be able to do operations inside the lock + store.save_runtime("fn-1.1", {"status": "in_progress"}) + + result = store.load_runtime("fn-1.1") + assert result["status"] == "in_progress" + + def test_state_path(self, tmp_path): + """Internal _state_path should use correct naming convention.""" + store = LocalFileStateStore(tmp_path) + path = store._state_path("fn-1.1") + assert path.name == "fn-1.1.state.json" + assert path.parent.name == "tasks" + + +class TestLoadTaskWithState: + """Tests for load_task_with_state — merges definition with runtime state.""" + + def _setup_task(self, git_repo, task_id, definition, runtime=None): + """Helper to create task definition and optional runtime state.""" + flow_dir = git_repo / ".flow" + tasks_dir = flow_dir / "tasks" + tasks_dir.mkdir(parents=True, exist_ok=True) + + # Write definition file + def_path = tasks_dir / f"{task_id}.json" + def_path.write_text( + json.dumps(definition, indent=2) + "\n", encoding="utf-8" + ) + + if runtime is not None: + # Write runtime state to git state dir + # Use FLOW_STATE_DIR env var to point to a known location + state_dir = git_repo / ".git" / "flow-state" + state_tasks = state_dir / "tasks" + state_tasks.mkdir(parents=True, exist_ok=True) + state_path = state_tasks / f"{task_id}.state.json" + state_path.write_text( + json.dumps(runtime, indent=2) + "\n", encoding="utf-8" + ) + os.environ["FLOW_STATE_DIR"] = str(state_dir) + + def test_merges_correctly(self, git_repo, monkeypatch): + """Runtime state should override definition for runtime fields.""" + definition = { + "id": "fn-1.1", + "title": "Test task", + "epic": "fn-1", + "status": "todo", + } + runtime = {"status": "in_progress", "assignee": "bob"} + self._setup_task(git_repo, "fn-1.1", definition, runtime) + + result = load_task_with_state("fn-1.1") + assert result["status"] == "in_progress" + assert result["assignee"] == "bob" + assert result["title"] == "Test task" + + def test_handles_missing_state(self, git_repo, monkeypatch): + """When no runtime state exists, should use definition fields or defaults.""" + definition = { + "id": "fn-1.1", + "title": "Test task", + "epic": "fn-1", + "status": "todo", + } + self._setup_task(git_repo, "fn-1.1", definition) + + # Point FLOW_STATE_DIR to an empty directory + empty_state = git_repo / "empty-state" + empty_state.mkdir() + os.environ["FLOW_STATE_DIR"] = str(empty_state) + + result = load_task_with_state("fn-1.1") + # Should fall back to definition's status + assert result["status"] == "todo" + assert result["title"] == "Test task" + + def test_normalizes_task(self, git_repo, monkeypatch): + """Result should have all normalized fields (priority, depends_on, etc.).""" + definition = { + "id": "fn-1.1", + "title": "Test task", + "epic": "fn-1", + } + self._setup_task(git_repo, "fn-1.1", definition) + + empty_state = git_repo / "empty-state" + empty_state.mkdir(exist_ok=True) + os.environ["FLOW_STATE_DIR"] = str(empty_state) + + result = load_task_with_state("fn-1.1") + # normalize_task should have added defaults + assert result["priority"] is None + assert result["depends_on"] == [] + assert result["impl"] is None + assert result["review"] is None + assert result["sync"] is None + + def test_definition_without_status(self, git_repo, monkeypatch): + """When definition has no status and no runtime state, defaults to 'todo'.""" + definition = { + "id": "fn-1.1", + "title": "No status task", + "epic": "fn-1", + } + self._setup_task(git_repo, "fn-1.1", definition) + + empty_state = git_repo / "empty-state" + empty_state.mkdir(exist_ok=True) + os.environ["FLOW_STATE_DIR"] = str(empty_state) + + result = load_task_with_state("fn-1.1") + assert result["status"] == "todo" diff --git a/skills/flow-code-ralph-init/templates/ralph.sh b/skills/flow-code-ralph-init/templates/ralph.sh index bf28b938..ae26bafd 100644 --- a/skills/flow-code-ralph-init/templates/ralph.sh +++ b/skills/flow-code-ralph-init/templates/ralph.sh @@ -440,6 +440,9 @@ MEMORY_AUTO_SAVE="${MEMORY_AUTO_SAVE:-0}" TDD_MODE="${TDD_MODE:-0}" export CODEX_SANDBOX # Ensure available to Claude worker for flowctl codex commands +# Dry-run mode: run selector loop only, no Claude invocation +DRY_RUN="${DRY_RUN:-0}" + # Parse command line arguments while [[ $# -gt 0 ]]; do case "$1" in @@ -456,6 +459,10 @@ while [[ $# -gt 0 ]]; do # Already processed in pre-scan; just consume args shift ;; + --dry-run) + DRY_RUN=1 + shift + ;; --help|-h) echo "Usage: ralph.sh [options]" echo "" @@ -463,6 +470,7 @@ while [[ $# -gt 0 ]]; do echo " --config Use alternate config file (default: config.env)" echo " --watch Show tool calls in real-time" echo " --watch verbose Show tool calls + model responses" + echo " --dry-run Run selector loop only; no Claude invocation or state changes" echo " --help, -h Show this help" echo "" echo "Environment variables:" @@ -604,6 +612,43 @@ RUN_ID="$(date -u +%Y%m%d-%H%M%S)-$(rand4)" RUN_ID_FULL="$(date -u +%Y%m%dT%H%M%SZ)-$(hostname -s 2>/dev/null || hostname)-$(sanitize_id "$(get_actor)")-$$-$(rand4)" RUN_DIR="$SCRIPT_DIR/runs/$RUN_ID" mkdir -p "$RUN_DIR" + +# ───────────────────────────────────────────────────────────────────────────── +# Concurrent run lock (Python fcntl.flock — works on macOS/Linux) +# Prevents multiple ralph.sh instances running against the same repo. +# Bash opens fd 200 on the lock file; Python acquires LOCK_EX|LOCK_NB on it. +# Lock is held for shell process lifetime (fd 200 stays open until exit). +# Windows: fcntl unavailable — lock is advisory-only (allows the run). +# ───────────────────────────────────────────────────────────────────────────── +RALPH_LOCK_FILE="$SCRIPT_DIR/.ralph.lock" + +# Open fd 200 on the lock file (bash holds this fd for its lifetime) +exec 200>"$RALPH_LOCK_FILE" + +# Use Python to acquire non-blocking exclusive lock on fd 200 +_lock_result="$("$PYTHON_BIN" - <<'PY' +import sys, os +try: + import fcntl + try: + fcntl.flock(200, fcntl.LOCK_EX | fcntl.LOCK_NB) + # Write PID for diagnostics + os.ftruncate(200, 0) + os.lseek(200, 0, os.SEEK_SET) + os.write(200, f"{os.getppid()}\n".encode()) + print("OK") + except (IOError, OSError): + print("LOCKED") +except ImportError: + # Windows: no fcntl — advisory only, allow the run + print("OK") +PY +)" +if [[ "$_lock_result" == "LOCKED" ]]; then + fail "another ralph instance is already running in this directory (lock: $RALPH_LOCK_FILE). Remove the lock file to force-clear if the previous run crashed." +fi +unset _lock_result + ATTEMPTS_FILE="$RUN_DIR/attempts.json" ensure_attempts_file "$ATTEMPTS_FILE" BRANCHES_FILE="$RUN_DIR/branches.json" @@ -910,38 +955,54 @@ maybe_close_epics() { done } -verify_receipt() { +# Read and verify receipt in a single atomic operation (no TOCTOU gap). +# Returns JSON: {"valid": bool, "verdict": str, "error": str} +# Caller checks validity via json_get on the output, avoiding separate +# file-exists check + read that could race with concurrent writers. +read_and_verify_receipt() { local path="$1" local kind="$2" local id="$3" - [[ -f "$path" ]] || return 1 "$PYTHON_BIN" - "$path" "$kind" "$id" <<'PY' import json, sys path, kind, rid = sys.argv[1], sys.argv[2], sys.argv[3] +result = {"valid": False, "verdict": "", "error": ""} try: - data = json.load(open(path, encoding="utf-8")) -except Exception: - sys.exit(1) + with open(path, encoding="utf-8") as f: + data = json.load(f) +except FileNotFoundError: + result["error"] = "file_not_found" + print(json.dumps(result)) + sys.exit(0) +except Exception as e: + result["error"] = f"parse_error: {e}" + print(json.dumps(result)) + sys.exit(0) if data.get("type") != kind: - sys.exit(1) + result["error"] = f"type_mismatch: expected={kind} got={data.get('type')}" + print(json.dumps(result)) + sys.exit(0) if data.get("id") != rid: - sys.exit(1) -sys.exit(0) + result["error"] = f"id_mismatch: expected={rid} got={data.get('id')}" + print(json.dumps(result)) + sys.exit(0) +result["valid"] = True +result["verdict"] = data.get("verdict", "") +print(json.dumps(result)) PY } -# Read verdict field from receipt file (returns empty string if not found/error) -read_receipt_verdict() { +# Backward-compat wrapper: returns 0 if receipt is valid, 1 otherwise +verify_receipt() { local path="$1" - [[ -f "$path" ]] || return 0 - "$PYTHON_BIN" - "$path" <<'PY' -import json, sys -try: - data = json.load(open(sys.argv[1], encoding="utf-8")) - print(data.get("verdict", "")) -except Exception: - pass -PY + local kind="$2" + local id="$3" + local result + result="$(read_and_verify_receipt "$path" "$kind" "$id")" + local valid + valid="$(json_get valid "$result")" + [[ "$valid" == "1" ]] && return 0 + return 1 } # Create/switch to run branch (once at start, all epics work here) @@ -1313,7 +1374,8 @@ ui_config ui_version_check # Create run branch once at start (all epics work on same branch) -ensure_run_branch +# Skip in dry-run mode — no branches, no state changes +[[ "$DRY_RUN" != "1" ]] && ensure_run_branch # Freeze scope snapshot (opt-in via FREEZE_SCOPE=1) freeze_scope @@ -1427,6 +1489,13 @@ while (( iter <= MAX_ITERATIONS )); do fail "invalid selector status: $status" fi + # Dry-run mode: print iteration info and skip Claude invocation entirely + if [[ "$DRY_RUN" == "1" ]]; then + echo "iter=$iter status=$status epic=${epic_id:-} task=${task_id:-}" + iter=$((iter + 1)) + continue + fi + export REVIEW_MODE export FLOW_RALPH="1" claude_args=(-p) @@ -1558,16 +1627,20 @@ Violations break automation and leave the user with incomplete work. Be precise, fi receipt_verdict="" if [[ "$status" == "work" && ( "$WORK_REVIEW" == "rp" || "$WORK_REVIEW" == "codex" ) ]]; then - if ! verify_receipt "$REVIEW_RECEIPT_PATH" "impl_review" "$task_id"; then - echo "ralph: missing impl review receipt; forcing retry" >> "$iter_log" - log "missing impl receipt; forcing retry" + # Single atomic read+verify (no TOCTOU gap between verify and verdict read) + _receipt_result="$(read_and_verify_receipt "$REVIEW_RECEIPT_PATH" "impl_review" "$task_id")" + _receipt_valid="$(json_get valid "$_receipt_result")" + if [[ "$_receipt_valid" != "1" ]]; then + _receipt_err="$(json_get error "$_receipt_result")" + echo "ralph: invalid impl review receipt ($REVIEW_RECEIPT_PATH): $_receipt_err; forcing retry" >> "$iter_log" + log "invalid impl receipt: $_receipt_err; forcing retry" impl_receipt_ok="0" # Delete corrupted/partial receipt so next attempt starts clean rm -f "$REVIEW_RECEIPT_PATH" 2>/dev/null || true force_retry=1 else - # Receipt is valid - read the verdict field - receipt_verdict="$(read_receipt_verdict "$REVIEW_RECEIPT_PATH")" + # Receipt is valid - verdict was read in the same pass (no TOCTOU) + receipt_verdict="$(json_get verdict "$_receipt_result")" fi fi From 220801803c9866ccdc84e414602953259611374f Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 11:57:44 +0800 Subject: [PATCH 2/6] fix(task): spec validation rejects duplicates only, not missing headings The spec heading validation added by task .6 rejected all heading errors including missing headings, which breaks the smoke test set-spec --file test case (legitimate specs without ## Done summary / ## Evidence). Per acceptance criteria, only duplicate headings should be rejected. Task: fn-4-flowctl-comprehensive-optimization-and.4 --- scripts/flowctl/commands/task.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/scripts/flowctl/commands/task.py b/scripts/flowctl/commands/task.py index 393fc827..bc4e9478 100644 --- a/scripts/flowctl/commands/task.py +++ b/scripts/flowctl/commands/task.py @@ -676,12 +676,14 @@ def cmd_task_set_spec(args: argparse.Namespace) -> None: # Full file replacement mode (like epic set-plan) if has_file: content = read_file_or_stdin(args.file, "Spec file", use_json=args.json) - # Validate spec headings before writing + # Validate spec headings before writing: reject duplicates from flowctl.commands.admin import validate_task_spec_headings heading_errors = validate_task_spec_headings(content) - if heading_errors: + # Only reject on duplicate headings, not missing ones + dup_errors = [e for e in heading_errors if e.startswith("Duplicate")] + if dup_errors: error_exit( - f"Spec validation failed: {'; '.join(heading_errors)}", + f"Spec validation failed: {'; '.join(dup_errors)}", use_json=args.json, ) atomic_write(task_spec_path, content) @@ -721,12 +723,14 @@ def cmd_task_set_spec(args: argparse.Namespace) -> None: except ValueError as e: error_exit(str(e), use_json=args.json) - # Validate final spec headings before writing + # Validate final spec headings before writing: reject duplicates from flowctl.commands.admin import validate_task_spec_headings heading_errors = validate_task_spec_headings(updated_spec) - if heading_errors: + # Only reject on duplicate headings, not missing ones + dup_errors = [e for e in heading_errors if e.startswith("Duplicate")] + if dup_errors: error_exit( - f"Spec validation failed after patching: {'; '.join(heading_errors)}", + f"Spec validation failed after patching: {'; '.join(dup_errors)}", use_json=args.json, ) From 7dab5dfc4e3ed1aa4f73026d6844b22a4833653d Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 12:03:47 +0800 Subject: [PATCH 3/6] test(guard): add 56 pytest tests for ralph-guard.py hook logic - Tests handle_pre_tool_use with 11 blocked command patterns (chat-send --json, --new-chat re-review, direct codex exec/review, --last, setup-review missing flags, select-add missing --window, flowctl done missing flags, receipt before review) - Tests handle_pre_tool_use with 11 allowed patterns (flowctl wrappers, chat-send without --json, first --new-chat, done with all flags, git/flowctl show, etc.) - Tests handle_post_tool_use state transitions (chat-send success/null, flowctl done tracking, codex review verdicts, receipt reset, setup-review W=/T= tracking) - Tests edge cases (done in comments, echo, multi-line commands, $FLOWCTL variable) - Tests protected file checks, parse_receipt_path, state persistence, stop events Task: fn-4-flowctl-comprehensive-optimization-and.3 --- scripts/flowctl/tests/test_guard.py | 699 ++++++++++++++++++++++++++++ 1 file changed, 699 insertions(+) create mode 100644 scripts/flowctl/tests/test_guard.py diff --git a/scripts/flowctl/tests/test_guard.py b/scripts/flowctl/tests/test_guard.py new file mode 100644 index 00000000..8b5e701b --- /dev/null +++ b/scripts/flowctl/tests/test_guard.py @@ -0,0 +1,699 @@ +"""Tests for ralph-guard.py hook logic with fixture-based JSON payloads. + +ralph-guard.py is a standalone Python script (not a package module), so we +import it via importlib. Each test function builds a fixture dict that mimics +the JSON Claude Code sends to hooks, then calls the handler directly. + +Because handle_pre_tool_use / handle_post_tool_use call sys.exit(), we catch +SystemExit to inspect the exit code (0 = allow, 2 = block). +""" + +import importlib.util +import json +import os +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +# --------------------------------------------------------------------------- +# Import ralph-guard.py as a module (filename contains a hyphen) +# --------------------------------------------------------------------------- +_GUARD_PATH = ( + Path(__file__).resolve().parents[2] / "hooks" / "ralph-guard.py" +) + + +def _load_guard(): + """Load ralph-guard.py as a Python module.""" + spec = importlib.util.spec_from_file_location("ralph_guard", _GUARD_PATH) + mod = importlib.util.module_from_spec(spec) + # Patch FLOW_RALPH so module-level code doesn't exit during import + with patch.dict(os.environ, {"FLOW_RALPH": "1"}): + spec.loader.exec_module(mod) + return mod + + +guard = _load_guard() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _pre_payload(command: str, *, session_id: str = "test-session") -> dict: + """Build a PreToolUse fixture for a Bash command.""" + return { + "hook_event_name": "PreToolUse", + "tool_name": "Bash", + "tool_input": {"command": command}, + "session_id": session_id, + } + + +def _post_payload( + command: str, + response: str = "", + *, + session_id: str = "test-session", +) -> dict: + """Build a PostToolUse fixture for a Bash command.""" + return { + "hook_event_name": "PostToolUse", + "tool_name": "Bash", + "tool_input": {"command": command}, + "tool_response": {"stdout": response}, + "session_id": session_id, + } + + +def _stop_payload(*, session_id: str = "test-session", stop_hook_active: bool = False) -> dict: + """Build a Stop event fixture.""" + return { + "hook_event_name": "Stop", + "session_id": session_id, + "stop_hook_active": stop_hook_active, + } + + +def _edit_payload(file_path: str, *, session_id: str = "test-session") -> dict: + """Build a PreToolUse fixture for an Edit tool (protected file check).""" + return { + "hook_event_name": "PreToolUse", + "tool_name": "Edit", + "tool_input": {"file_path": file_path}, + "session_id": session_id, + } + + +def _reset_state(session_id: str = "test-session") -> None: + """Remove any leftover state file for the session.""" + state_file = guard.get_state_file(session_id) + if state_file.exists(): + state_file.unlink() + + +@pytest.fixture(autouse=True) +def _clean_state(): + """Ensure clean state before and after each test.""" + _reset_state() + yield + _reset_state() + + +# =================================================================== +# PreToolUse — blocked command patterns (acceptance: >= 5 cases) +# =================================================================== + + +class TestPreToolUseBlocked: + """Commands that ralph-guard MUST block (exit code 2).""" + + def test_chat_send_json_flag(self): + """--json on chat-send suppresses review text.""" + data = _pre_payload('rp chat-send --json --message "review"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_chat_send_new_chat_on_re_review(self): + """--new-chat on re-reviews loses reviewer context.""" + # Simulate a first chat already sent + state = guard.load_state("test-session") + state["chats_sent"] = 1 + guard.save_state("test-session", state) + + data = _pre_payload('rp chat-send --new-chat --message "re-review"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_direct_codex_exec(self): + """Direct 'codex exec' must be blocked — use flowctl wrappers.""" + data = _pre_payload('codex exec "review this code"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_direct_codex_review(self): + """Direct 'codex review' must be blocked.""" + data = _pre_payload("codex review --diff main..HEAD") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_codex_last_flag(self): + """--last flag on codex (even through wrapper) breaks session continuity.""" + data = _pre_payload("flowctl codex impl-review --last") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_setup_review_missing_repo_root(self): + """setup-review without --repo-root must be blocked.""" + data = _pre_payload('setup-review --summary "test"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_setup_review_missing_summary(self): + """setup-review without --summary must be blocked.""" + data = _pre_payload("setup-review --repo-root /tmp/repo") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_select_add_missing_window(self): + """select-add without --window must be blocked.""" + data = _pre_payload("select-add src/main.py") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_flowctl_done_missing_evidence(self): + """flowctl done without --evidence-json must be blocked.""" + data = _pre_payload( + 'flowctl done fn-1.1 --summary-file /tmp/s.md' + ) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_flowctl_done_missing_summary(self): + """flowctl done without --summary-file must be blocked.""" + data = _pre_payload( + "flowctl done fn-1.1 --evidence-json /tmp/e.json" + ) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_receipt_write_before_review(self, monkeypatch): + """Cannot write receipt before chat-send or codex review succeeds.""" + monkeypatch.setenv("REVIEW_RECEIPT_PATH", "/tmp/receipts/impl-fn-1.1.json") + data = _pre_payload( + 'cat > "/tmp/receipts/impl-fn-1.1.json" << \'EOF\'\n' + '{"type":"impl_review","id":"fn-1.1","verdict":"SHIP"}\n' + "EOF" + ) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + +# =================================================================== +# PreToolUse — allowed command patterns (acceptance: >= 5 cases) +# =================================================================== + + +class TestPreToolUseAllowed: + """Commands that ralph-guard must allow (exit code 0).""" + + def test_flowctl_codex_impl_review(self): + """flowctl codex wrapper calls are allowed.""" + data = _pre_payload("flowctl codex impl-review fn-1.1 --base abc123") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_flowctl_codex_plan_review(self): + """flowctl codex plan-review wrapper is allowed.""" + data = _pre_payload("flowctl codex plan-review fn-1") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_chat_send_no_json(self): + """chat-send without --json is fine.""" + data = _pre_payload('rp chat-send --message "review this"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_chat_send_new_chat_first_review(self): + """--new-chat on FIRST review is allowed (chats_sent == 0).""" + data = _pre_payload('rp chat-send --new-chat --message "first review"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_flowctl_done_with_all_flags(self): + """flowctl done with both required flags is allowed.""" + data = _pre_payload( + "flowctl done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json" + ) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_plain_git_command(self): + """Regular git commands should pass through.""" + data = _pre_payload("git status") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_flowctl_show(self): + """Non-done flowctl commands pass through.""" + data = _pre_payload("flowctl show fn-1.1 --json") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_setup_review_complete(self): + """setup-review with all required flags is allowed.""" + data = _pre_payload( + 'setup-review --repo-root /tmp/repo --summary "review this"' + ) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_select_add_with_window(self): + """select-add with --window is allowed.""" + data = _pre_payload('select-add --window "1" --tab "abc" src/main.py') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_FLOWCTL_variable_done(self): + """$FLOWCTL done with both flags is allowed.""" + data = _pre_payload( + '"$FLOWCTL" done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json' + ) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_flowctl_done_help(self): + """flowctl done --help should not be blocked.""" + data = _pre_payload("flowctl done --help") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + +# =================================================================== +# PostToolUse — state transitions (acceptance criterion) +# =================================================================== + + +class TestPostToolUseStateTransitions: + """Verify state tracking across PostToolUse events.""" + + def test_chat_send_success_sets_state(self): + """Successful chat-send sets chat_send_succeeded and increments chats_sent.""" + data = _post_payload( + 'rp chat-send --message "review"', + "Chat Send completed\nNEEDS_WORK\nFix the tests.", + ) + # handle_post_tool_use calls sys.exit(0) at the end + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data) + + state = guard.load_state("test-session") + assert state["chat_send_succeeded"] is True + assert state["chats_sent"] == 1 + + def test_chat_send_null_clears_state(self): + """chat-send returning {"chat": null} clears chat_send_succeeded.""" + # First, simulate a previous success + initial = guard.load_state("test-session") + initial["chat_send_succeeded"] = True + guard.save_state("test-session", initial) + + data = _post_payload( + 'rp chat-send --json --message "review"', + '{"chat": null}', + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data) + + state = guard.load_state("test-session") + assert state["chat_send_succeeded"] is False + + def test_flowctl_done_tracks_task_id(self): + """flowctl done adds task to flowctl_done_called set.""" + data = _post_payload( + "flowctl done fn-1.2 --summary-file /tmp/s.md --evidence-json /tmp/e.json", + '{"status": "done", "task": "fn-1.2"}', + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data) + + state = guard.load_state("test-session") + assert "fn-1.2" in state["flowctl_done_called"] + + def test_flowctl_done_accumulates(self): + """Multiple flowctl done calls accumulate task IDs.""" + # First task + data1 = _post_payload( + "flowctl done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json", + '{"status": "done", "task": "fn-1.1"}', + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data1) + + # Second task + data2 = _post_payload( + "flowctl done fn-1.2 --summary-file /tmp/s.md --evidence-json /tmp/e.json", + '{"status": "done", "task": "fn-1.2"}', + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data2) + + state = guard.load_state("test-session") + assert "fn-1.1" in state["flowctl_done_called"] + assert "fn-1.2" in state["flowctl_done_called"] + + def test_codex_review_success(self): + """Codex review with verdict sets codex_review_succeeded.""" + data = _post_payload( + "flowctl codex impl-review fn-1.1 --base abc123", + "Review output...\nSHIP\nAll good.", + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data) + + state = guard.load_state("test-session") + assert state["codex_review_succeeded"] is True + assert state["last_verdict"] == "SHIP" + + def test_codex_needs_work_verdict(self): + """Codex review with NEEDS_WORK verdict.""" + data = _post_payload( + "flowctl codex impl-review fn-1.1 --base abc123", + "Review output...\nNEEDS_WORK\nFix errors.", + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data) + + state = guard.load_state("test-session") + assert state["codex_review_succeeded"] is True + assert state["last_verdict"] == "NEEDS_WORK" + + def test_receipt_write_resets_review_state(self, monkeypatch): + """Writing a receipt resets chat_send_succeeded and codex_review_succeeded.""" + monkeypatch.setenv("REVIEW_RECEIPT_PATH", "/tmp/receipts/impl-fn-1.1.json") + + # Set up state as if review succeeded + initial = guard.load_state("test-session") + initial["chat_send_succeeded"] = True + initial["codex_review_succeeded"] = True + guard.save_state("test-session", initial) + + data = _post_payload( + 'cat > "/tmp/receipts/impl-fn-1.1.json" <MAJOR_RETHINK tag.", + ) + with pytest.raises(SystemExit): + guard.handle_post_tool_use(data) + + state = guard.load_state("test-session") + assert state["last_verdict"] == "MAJOR_RETHINK" + + +# =================================================================== +# Edge cases (acceptance criterion) +# =================================================================== + + +class TestEdgeCases: + """Edge cases: done in comments, quoted strings, multi-line commands.""" + + def test_done_in_comment_not_blocked(self): + """'done' in a shell comment should not trigger flowctl done checks.""" + # This command has "done" but NOT as "flowctl done" or "FLOWCTL done" + data = _pre_payload("echo 'task is done' # just a comment") + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_done_in_echo_not_blocked(self): + """'done' as part of an echo statement (no flowctl context) is fine.""" + data = _pre_payload('echo "all tasks done successfully"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + def test_codex_in_string_blocked(self): + """'codex' word boundary match catches standalone usage.""" + # The guard uses \\bcodex\\b, so 'codex' as a standalone word matches + data = _pre_payload('codex exec "check code"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_flowctl_done_in_multiline(self): + """flowctl done in a multi-line command is still checked.""" + cmd = ( + "# First line\n" + "flowctl done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json\n" + "# Last line" + ) + data = _pre_payload(cmd) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 # Allowed because all flags present + + def test_flowctl_done_multiline_missing_evidence(self): + """flowctl done in multi-line without evidence is blocked.""" + cmd = ( + "# First line\n" + "flowctl done fn-1.1 --summary-file /tmp/s.md\n" + "# Last line" + ) + data = _pre_payload(cmd) + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_FLOWCTL_env_var_done(self): + """$FLOWCTL done is still validated (environment variable invocation).""" + data = _pre_payload('"$FLOWCTL" done fn-1.1 --summary-file /tmp/s.md') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 # Missing --evidence-json + + def test_chat_send_json_in_message_body(self): + """--json flag after chat-send is blocked even with surrounding text.""" + data = _pre_payload('rp chat-send --mode review --json --message "test"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 2 + + def test_new_chat_allowed_when_zero_chats(self): + """--new-chat is only blocked on re-reviews (chats_sent > 0).""" + # chats_sent defaults to 0 + data = _pre_payload('rp chat-send --new-chat --message "first"') + with pytest.raises(SystemExit) as exc: + guard.handle_pre_tool_use(data) + assert exc.value.code == 0 + + +# =================================================================== +# Protected file checks +# =================================================================== + + +class TestProtectedFiles: + """Block Edit/Write to protected workflow files.""" + + def test_edit_ralph_guard_blocked(self): + """Cannot edit ralph-guard.py.""" + data = _edit_payload("/path/to/scripts/hooks/ralph-guard.py") + with pytest.raises(SystemExit) as exc: + guard.handle_protected_file_check(data) + assert exc.value.code == 2 + + def test_edit_flowctl_py_blocked(self): + """Cannot edit flowctl.py.""" + data = _edit_payload("/path/to/scripts/flowctl.py") + with pytest.raises(SystemExit) as exc: + guard.handle_protected_file_check(data) + assert exc.value.code == 2 + + def test_edit_hooks_json_blocked(self): + """Cannot edit hooks/hooks.json.""" + data = _edit_payload("/path/to/hooks/hooks.json") + with pytest.raises(SystemExit) as exc: + guard.handle_protected_file_check(data) + assert exc.value.code == 2 + + def test_edit_flowctl_dir_blocked(self): + """Cannot edit the flowctl directory itself (endswith /flowctl/).""" + # The pattern "/flowctl/" matches paths ending with "/flowctl/" + # (e.g., a directory path), not files inside it + data = _edit_payload("/path/to/scripts/flowctl/") + with pytest.raises(SystemExit) as exc: + guard.handle_protected_file_check(data) + assert exc.value.code == 2 + + def test_edit_normal_file_allowed(self): + """Editing a normal file is not blocked (no exit).""" + data = _edit_payload("/path/to/src/main.py") + # Should return None (no exit) since file is not protected + result = guard.handle_protected_file_check(data) + assert result is None + + +# =================================================================== +# parse_receipt_path +# =================================================================== + + +class TestParseReceiptPath: + """Test receipt path parsing for type/id extraction.""" + + def test_plan_receipt_legacy(self): + """plan-fn-1.json -> (plan_review, fn-1)""" + assert guard.parse_receipt_path("plan-fn-1.json") == ("plan_review", "fn-1") + + def test_impl_receipt_legacy(self): + """impl-fn-1.2.json -> (impl_review, fn-1.2)""" + assert guard.parse_receipt_path("impl-fn-1.2.json") == ("impl_review", "fn-1.2") + + def test_completion_receipt_legacy(self): + """completion-fn-1.json -> (completion_review, fn-1)""" + assert guard.parse_receipt_path("completion-fn-1.json") == ( + "completion_review", + "fn-1", + ) + + def test_impl_receipt_with_slug(self): + """impl-fn-4-flowctl-comprehensive-optimization-and.3.json -> (impl_review, fn-4-..-.3)""" + rtype, rid = guard.parse_receipt_path( + "impl-fn-4-flowctl-comprehensive-optimization-and.3.json" + ) + assert rtype == "impl_review" + assert rid.startswith("fn-4") + assert rid.endswith(".3") + + def test_plan_receipt_with_slug(self): + """plan-fn-4-flowctl-comprehensive.json""" + rtype, rid = guard.parse_receipt_path( + "plan-fn-4-flowctl-comprehensive.json" + ) + assert rtype == "plan_review" + assert "fn-4" in rid + + def test_unknown_format_fallback(self): + """Unknown filename pattern returns fallback.""" + assert guard.parse_receipt_path("random-file.json") == ("impl_review", "UNKNOWN") + + +# =================================================================== +# State persistence +# =================================================================== + + +class TestStatePersistence: + """Test state load/save round-trip and defaults.""" + + def test_fresh_state_defaults(self): + """Fresh state has expected defaults.""" + state = guard.load_state("fresh-session") + assert state["chats_sent"] == 0 + assert state["last_verdict"] is None + assert state["chat_send_succeeded"] is False + assert state["codex_review_succeeded"] is False + assert isinstance(state["flowctl_done_called"], set) + assert len(state["flowctl_done_called"]) == 0 + # Clean up + _reset_state("fresh-session") + + def test_round_trip_with_set(self): + """State with set survives JSON round-trip.""" + state = guard.load_state("rt-session") + state["flowctl_done_called"] = {"fn-1.1", "fn-1.2"} + state["chat_send_succeeded"] = True + guard.save_state("rt-session", state) + + loaded = guard.load_state("rt-session") + assert loaded["chat_send_succeeded"] is True + assert "fn-1.1" in loaded["flowctl_done_called"] + assert "fn-1.2" in loaded["flowctl_done_called"] + # Clean up + sf = guard.get_state_file("rt-session") + if sf.exists(): + sf.unlink() + + def test_corrupt_state_returns_defaults(self): + """Corrupt state file returns default state.""" + state_file = guard.get_state_file("corrupt-session") + state_file.write_text("not valid json{{{") + state = guard.load_state("corrupt-session") + assert state["chats_sent"] == 0 + assert state["chat_send_succeeded"] is False + # Clean up + if state_file.exists(): + state_file.unlink() + + +# =================================================================== +# Stop event +# =================================================================== + + +class TestStopEvent: + """Test Stop handler behavior.""" + + def test_stop_with_no_receipt_path(self): + """Stop with no REVIEW_RECEIPT_PATH set exits cleanly.""" + data = _stop_payload() + with patch.dict(os.environ, {}, clear=False): + os.environ.pop("REVIEW_RECEIPT_PATH", None) + with pytest.raises(SystemExit) as exc: + guard.handle_stop(data) + assert exc.value.code == 0 + + def test_stop_hook_active_prevents_loop(self): + """stop_hook_active=True exits immediately (infinite loop guard).""" + data = _stop_payload(stop_hook_active=True) + with pytest.raises(SystemExit) as exc: + guard.handle_stop(data) + assert exc.value.code == 0 + + def test_stop_cleans_up_state_file(self): + """Stop cleans up the session state file.""" + # Create state + state = guard.load_state("test-session") + state["chats_sent"] = 5 + guard.save_state("test-session", state) + assert guard.get_state_file("test-session").exists() + + data = _stop_payload() + with patch.dict(os.environ, {}, clear=False): + os.environ.pop("REVIEW_RECEIPT_PATH", None) + with pytest.raises(SystemExit): + guard.handle_stop(data) + + assert not guard.get_state_file("test-session").exists() From fd38ca32b4a3124168df04c5b41eb4d2d8cd6401 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 12:04:02 +0800 Subject: [PATCH 4/6] fix(review): remove dead code from prompts.py and add command tests - Remove dead cmd_codex_completion_review (~220 lines) from prompts.py (active version lives in commands.py, dead copy was from pre-refactor) - Remove 6 stale imports only used by the dead function (json, Any, EPICS_DIR, SPECS_DIR, TASKS_DIR, load_json_or_exit, get_flow_dir, gather_context_hints) - Add test_admin.py: 12 tests for validate_task_spec_headings (valid, missing, duplicate, edge cases) - Add test_task.py: 10 tests for patch_task_section (replace, add, duplicate error, heading strip, edge cases) - Add test_review.py: 33 tests for parse_codex_verdict, resolve_codex_sandbox, is_sandbox_failure (all branches, env vars, JSON output parsing) Note: import shutil was already present in commands.py (fixed by prior commit) Task: fn-4-flowctl-comprehensive-optimization-and.2 --- scripts/flowctl/cli.py | 14 ++ scripts/flowctl/commands/admin.py | 234 ++++++++++++++++++++- scripts/flowctl/commands/epic.py | 59 ++++++ scripts/flowctl/commands/review/prompts.py | 229 +------------------- scripts/flowctl/tests/test_admin.py | 106 ++++++++++ scripts/flowctl/tests/test_review.py | 195 +++++++++++++++++ scripts/flowctl/tests/test_task.py | 114 ++++++++++ 7 files changed, 720 insertions(+), 231 deletions(-) create mode 100644 scripts/flowctl/tests/test_admin.py create mode 100644 scripts/flowctl/tests/test_review.py create mode 100644 scripts/flowctl/tests/test_task.py diff --git a/scripts/flowctl/cli.py b/scripts/flowctl/cli.py index 5556222a..6b4b1cc7 100644 --- a/scripts/flowctl/cli.py +++ b/scripts/flowctl/cli.py @@ -18,6 +18,7 @@ from flowctl.commands.admin import ( cmd_init, cmd_detect, + cmd_doctor, cmd_status, cmd_ralph_pause, cmd_ralph_resume, @@ -45,6 +46,7 @@ cmd_epic_rm_dep, cmd_epic_set_backend, cmd_epic_close, + cmd_epic_reopen, cmd_epic_archive, cmd_epic_clean, ) @@ -336,6 +338,11 @@ def main() -> None: p_epic_close.add_argument("--json", action="store_true", help="JSON output") p_epic_close.set_defaults(func=cmd_epic_close) + p_epic_reopen = epic_sub.add_parser("reopen", help="Reopen a closed epic") + p_epic_reopen.add_argument("id", help="Epic ID (e.g., fn-1, fn-1-add-auth)") + p_epic_reopen.add_argument("--json", action="store_true", help="JSON output") + p_epic_reopen.set_defaults(func=cmd_epic_reopen) + p_epic_archive = epic_sub.add_parser( "archive", help="Archive closed epic to .flow/.archive/" ) @@ -700,6 +707,13 @@ def main() -> None: p_validate.add_argument("--json", action="store_true", help="JSON output") p_validate.set_defaults(func=cmd_validate) + # doctor + p_doctor = subparsers.add_parser( + "doctor", help="Run comprehensive state health diagnostics" + ) + p_doctor.add_argument("--json", action="store_true", help="JSON output") + p_doctor.set_defaults(func=cmd_doctor) + # checkpoint p_checkpoint = subparsers.add_parser("checkpoint", help="Checkpoint commands") checkpoint_sub = p_checkpoint.add_subparsers(dest="checkpoint_cmd", required=True) diff --git a/scripts/flowctl/commands/admin.py b/scripts/flowctl/commands/admin.py index c6163aad..77868a0f 100644 --- a/scripts/flowctl/commands/admin.py +++ b/scripts/flowctl/commands/admin.py @@ -1,10 +1,13 @@ -"""Admin commands: init, detect, status, ralph control, config, review-backend, validate.""" +"""Admin commands: init, detect, status, ralph control, config, review-backend, validate, doctor.""" import argparse import json import os import re +import subprocess import sys +import tempfile +from datetime import datetime from pathlib import Path from typing import Optional @@ -25,6 +28,7 @@ deep_merge, get_config, get_default_config, + load_flow_config, set_config, ) from flowctl.core.ids import is_epic_id, is_task_id, normalize_epic @@ -36,8 +40,8 @@ load_json, load_json_or_exit, ) -from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root -from flowctl.core.state import load_task_with_state +from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root, get_state_dir +from flowctl.core.state import get_state_store, load_task_with_state from flowctl.commands.stack import detect_stack @@ -949,3 +953,227 @@ def cmd_validate(args: argparse.Namespace) -> None: # Exit with non-zero if validation failed if not valid: sys.exit(1) + + +# --- Doctor command --- + + +def cmd_doctor(args: argparse.Namespace) -> None: + """Run comprehensive state health diagnostics (superset of validate --all).""" + if not ensure_flow_exists(): + error_exit( + ".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json + ) + + flow_dir = get_flow_dir() + checks: list[dict] = [] + + def add_check(name: str, status: str, message: str) -> None: + checks.append({"name": name, "status": status, "message": message}) + + # --- Check 1: Run validate --all internally --- + import io as _io + import contextlib + + fake_args = argparse.Namespace(epic=None, all=True, json=True) + validate_output = _io.StringIO() + validate_passed = True + try: + with contextlib.redirect_stdout(validate_output): + cmd_validate(fake_args) + except SystemExit as e: + if e.code != 0: + validate_passed = False + + if validate_passed: + add_check("validate", "pass", "All epics and tasks validated successfully") + else: + # Parse the validate output for details + try: + vdata = json.loads(validate_output.getvalue()) + err_count = vdata.get("total_errors", 0) + add_check( + "validate", "fail", + f"Validation found {err_count} error(s). Run 'flowctl validate --all' for details" + ) + except (json.JSONDecodeError, ValueError): + add_check("validate", "fail", "Validation failed (could not parse output)") + + # --- Check 2: State-dir accessibility --- + try: + state_dir = get_state_dir() + state_dir.mkdir(parents=True, exist_ok=True) + # Test write access + test_file = state_dir / ".doctor-probe" + test_file.write_text("probe", encoding="utf-8") + test_file.unlink() + add_check("state_dir_access", "pass", f"State dir accessible: {state_dir}") + except (OSError, PermissionError) as e: + add_check("state_dir_access", "fail", f"State dir not accessible: {e}") + + # --- Check 3: Orphaned state files --- + try: + store = get_state_store() + runtime_ids = store.list_runtime_files() + tasks_dir = flow_dir / TASKS_DIR + orphaned = [] + for rid in runtime_ids: + task_def_path = tasks_dir / f"{rid}.json" + if not task_def_path.exists(): + orphaned.append(rid) + if orphaned: + add_check( + "orphaned_state", "warn", + f"{len(orphaned)} orphaned state file(s): {', '.join(orphaned[:5])}" + + (f" (+{len(orphaned) - 5} more)" if len(orphaned) > 5 else "") + ) + else: + add_check("orphaned_state", "pass", "No orphaned state files") + except Exception as e: + add_check("orphaned_state", "warn", f"Could not check orphaned state: {e}") + + # --- Check 4: Stale in_progress tasks (>7 days) --- + try: + stale = [] + tasks_dir = flow_dir / TASKS_DIR + if tasks_dir.exists(): + for task_file in tasks_dir.glob("fn-*.json"): + task_id = task_file.stem + if not is_task_id(task_id): + continue + try: + task_data = load_task_with_state(task_id, use_json=True) + except SystemExit: + continue + if task_data.get("status") != "in_progress": + continue + updated = task_data.get("updated_at") or task_data.get("claimed_at") + if updated: + try: + # Parse ISO timestamp + ts = updated.replace("Z", "+00:00") + task_time = datetime.fromisoformat(ts) + now = datetime.utcnow().replace( + tzinfo=task_time.tzinfo + ) + age_days = (now - task_time).days + if age_days > 7: + stale.append(f"{task_id} ({age_days}d)") + except (ValueError, TypeError): + pass + if stale: + add_check( + "stale_tasks", "warn", + f"{len(stale)} task(s) in_progress for >7 days: {', '.join(stale[:5])}" + + (f" (+{len(stale) - 5} more)" if len(stale) > 5 else "") + ) + else: + add_check("stale_tasks", "pass", "No stale in_progress tasks") + except Exception as e: + add_check("stale_tasks", "warn", f"Could not check stale tasks: {e}") + + # --- Check 5: Lock file accumulation --- + try: + state_dir = get_state_dir() + locks_dir = state_dir / "locks" + lock_count = 0 + if locks_dir.exists(): + lock_count = sum(1 for _ in locks_dir.glob("*.lock")) + if lock_count > 50: + add_check( + "lock_files", "warn", + f"{lock_count} lock files in state dir (consider cleanup)" + ) + else: + add_check( + "lock_files", "pass", + f"{lock_count} lock file(s) in state dir" + ) + except Exception as e: + add_check("lock_files", "warn", f"Could not check lock files: {e}") + + # --- Check 6: Config validity --- + try: + config_path = flow_dir / CONFIG_FILE + if config_path.exists(): + raw_text = config_path.read_text(encoding="utf-8") + parsed = json.loads(raw_text) + if not isinstance(parsed, dict): + add_check("config", "fail", "config.json is not a JSON object") + else: + # Check for known top-level keys + known_keys = set(get_default_config().keys()) + unknown = set(parsed.keys()) - known_keys + if unknown: + add_check( + "config", "warn", + f"Unknown config keys: {', '.join(sorted(unknown))}" + ) + else: + add_check("config", "pass", "config.json valid with known keys") + else: + add_check("config", "warn", "config.json missing (run 'flowctl init')") + except json.JSONDecodeError as e: + add_check("config", "fail", f"config.json invalid JSON: {e}") + except Exception as e: + add_check("config", "warn", f"Could not check config: {e}") + + # --- Check 7: git-common-dir reachability --- + try: + result = subprocess.run( + ["git", "rev-parse", "--git-common-dir", "--path-format=absolute"], + capture_output=True, text=True, check=True, + ) + common_dir = Path(result.stdout.strip()) + if common_dir.exists(): + add_check( + "git_common_dir", "pass", + f"git common-dir reachable: {common_dir}" + ) + else: + add_check( + "git_common_dir", "warn", + f"git common-dir path does not exist: {common_dir}" + ) + except subprocess.CalledProcessError: + add_check( + "git_common_dir", "warn", + "Not in a git repository (git common-dir unavailable)" + ) + except FileNotFoundError: + add_check( + "git_common_dir", "warn", + "git not found on PATH" + ) + + # --- Build summary --- + summary = {"pass": 0, "warn": 0, "fail": 0} + for c in checks: + summary[c["status"]] += 1 + + overall_healthy = summary["fail"] == 0 + + if args.json: + json_output( + { + "checks": checks, + "summary": summary, + "healthy": overall_healthy, + }, + success=overall_healthy, + ) + else: + print("Doctor diagnostics:") + for c in checks: + icon = {"pass": "OK", "warn": "WARN", "fail": "FAIL"}[c["status"]] + print(f" [{icon}] {c['name']}: {c['message']}") + print() + print( + f"Summary: {summary['pass']} pass, " + f"{summary['warn']} warn, {summary['fail']} fail" + ) + if not overall_healthy: + print("Health check FAILED — resolve fail items above.") + + if not overall_healthy: + sys.exit(1) diff --git a/scripts/flowctl/commands/epic.py b/scripts/flowctl/commands/epic.py index 474a6af0..fe2c8133 100644 --- a/scripts/flowctl/commands/epic.py +++ b/scripts/flowctl/commands/epic.py @@ -872,6 +872,65 @@ def cmd_epic_archive(args: argparse.Namespace) -> None: print(f" {f}") +def cmd_epic_reopen(args: argparse.Namespace) -> None: + """Reopen a closed epic (sets status back to open).""" + if not ensure_flow_exists(): + error_exit( + ".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json + ) + + epic_id = args.id + if not is_epic_id(epic_id): + error_exit( + f"Invalid epic ID: {epic_id}. Expected format: fn-N or fn-N-slug " + f"(e.g., fn-1, fn-1-add-auth)", + use_json=args.json, + ) + + flow_dir = get_flow_dir() + epic_path = flow_dir / EPICS_DIR / f"{epic_id}.json" + + if not epic_path.exists(): + # Check if archived + archive_path = flow_dir / ".archive" / epic_id + if archive_path.exists(): + error_exit( + f"Epic {epic_id} is archived. Unarchive it first before reopening.", + use_json=args.json, + ) + error_exit(f"Epic {epic_id} not found", use_json=args.json) + + epic_data = normalize_epic( + load_json_or_exit(epic_path, f"Epic {epic_id}", use_json=args.json) + ) + + previous_status = epic_data.get("status", "unknown") + + if previous_status == "open": + error_exit( + f"Epic {epic_id} is already open (no-op protection)", + use_json=args.json, + ) + + # Set status back to open and reset completion review + epic_data["status"] = "open" + epic_data["completion_review_status"] = "unknown" + epic_data["updated_at"] = now_iso() + atomic_write_json(epic_path, epic_data) + + if args.json: + json_output( + { + "id": epic_id, + "previous_status": previous_status, + "new_status": "open", + "message": f"Epic {epic_id} reopened", + } + ) + else: + print(f"Epic {epic_id} reopened (was: {previous_status})") + + def cmd_epic_clean(args: argparse.Namespace) -> None: """Archive all closed epics at once.""" if not ensure_flow_exists(): diff --git a/scripts/flowctl/commands/review/prompts.py b/scripts/flowctl/commands/review/prompts.py index b2bf127a..25277aa4 100644 --- a/scripts/flowctl/commands/review/prompts.py +++ b/scripts/flowctl/commands/review/prompts.py @@ -1,12 +1,6 @@ """Review prompt builders for impl, plan, completion, and standalone reviews.""" -import json -from typing import Any, Optional - -from flowctl.core.constants import EPICS_DIR, SPECS_DIR, TASKS_DIR -from flowctl.core.io import load_json_or_exit -from flowctl.core.paths import get_flow_dir -from flowctl.core.git import gather_context_hints +from typing import Optional def build_review_prompt( review_type: str, @@ -567,224 +561,3 @@ def build_completion_review_prompt( return "\n\n".join(parts) - -def cmd_codex_completion_review(args: argparse.Namespace) -> None: - """Run epic completion review via codex exec. - - Verifies that all epic requirements are implemented before closing. - Two-phase approach: extract requirements, then verify coverage. - """ - if not ensure_flow_exists(): - error_exit(".flow/ does not exist", use_json=args.json) - - epic_id = args.epic - - # Validate epic ID - if not is_epic_id(epic_id): - error_exit(f"Invalid epic ID: {epic_id}", use_json=args.json) - - flow_dir = get_flow_dir() - - # Load epic spec - epic_spec_path = flow_dir / SPECS_DIR / f"{epic_id}.md" - if not epic_spec_path.exists(): - error_exit(f"Epic spec not found: {epic_spec_path}", use_json=args.json) - - epic_spec = epic_spec_path.read_text(encoding="utf-8") - - # Load task specs for this epic - tasks_dir = flow_dir / TASKS_DIR - task_specs_parts = [] - for task_file in sorted(tasks_dir.glob(f"{epic_id}.*.md")): - task_id = task_file.stem - task_content = task_file.read_text(encoding="utf-8") - task_specs_parts.append(f"### {task_id}\n\n{task_content}") - - task_specs = "\n\n---\n\n".join(task_specs_parts) if task_specs_parts else "" - - # Get base branch for diff (default to main) - base_branch = args.base if hasattr(args, "base") and args.base else "main" - - # Get diff summary - diff_summary = "" - try: - diff_result = subprocess.run( - ["git", "diff", "--stat", f"{base_branch}..HEAD"], - capture_output=True, - text=True, - cwd=get_repo_root(), - ) - if diff_result.returncode == 0: - diff_summary = diff_result.stdout.strip() - except (subprocess.CalledProcessError, OSError): - pass - - # Get actual diff content with size cap - diff_content = "" - max_diff_bytes = 50000 - try: - proc = subprocess.Popen( - ["git", "diff", f"{base_branch}..HEAD"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=get_repo_root(), - ) - diff_bytes = proc.stdout.read(max_diff_bytes + 1) - was_truncated = len(diff_bytes) > max_diff_bytes - if was_truncated: - diff_bytes = diff_bytes[:max_diff_bytes] - while proc.stdout.read(65536): - pass - stderr_bytes = proc.stderr.read() - proc.stdout.close() - proc.stderr.close() - returncode = proc.wait() - - if returncode != 0 and stderr_bytes: - diff_content = f"[git diff failed: {stderr_bytes.decode('utf-8', errors='replace').strip()}]" - else: - diff_content = diff_bytes.decode("utf-8", errors="replace").strip() - if was_truncated: - diff_content += "\n\n... [diff truncated at 50KB]" - except (subprocess.CalledProcessError, OSError): - pass - - # Always embed changed file contents. See cmd_codex_impl_review comment - # for rationale. - changed_files = get_changed_files(base_branch) - embedded_content, embed_stats = get_embedded_file_contents(changed_files) - - # Only forbid disk reads when ALL files were fully embedded. - files_embedded = not embed_stats.get("budget_skipped") and not embed_stats.get("truncated") - prompt = build_completion_review_prompt( - epic_spec, - task_specs, - diff_summary, - diff_content, - embedded_files=embedded_content, - files_embedded=files_embedded, - ) - - # Check for existing session in receipt (indicates re-review) - receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None - session_id = None - is_rereview = False - if receipt_path: - receipt_file = Path(receipt_path) - if receipt_file.exists(): - try: - receipt_data = json.loads(receipt_file.read_text(encoding="utf-8")) - session_id = receipt_data.get("session_id") - is_rereview = session_id is not None - except (json.JSONDecodeError, Exception): - pass - - # For re-reviews, prepend instruction to re-read changed files - if is_rereview: - changed_files = get_changed_files(base_branch) - if changed_files: - rereview_preamble = build_rereview_preamble( - changed_files, "completion", files_embedded - ) - prompt = rereview_preamble + prompt - - # Resolve sandbox mode - try: - sandbox = resolve_codex_sandbox(getattr(args, "sandbox", "auto")) - except ValueError as e: - error_exit(str(e), use_json=args.json, code=2) - - # Run codex - effort = getattr(args, "effort", "high") - output, thread_id, exit_code, stderr = run_codex_exec( - prompt, session_id=session_id, sandbox=sandbox, effort=effort - ) - - # Check for sandbox failures - if is_sandbox_failure(exit_code, output, stderr): - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass - msg = ( - "Codex sandbox blocked operations. " - "Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access" - ) - error_exit(msg, use_json=args.json, code=3) - - # Handle non-sandbox failures - if exit_code != 0: - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass - msg = (stderr or output or "codex exec failed").strip() - error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2) - - # Parse verdict - verdict = parse_codex_verdict(output) - - # Fail if no verdict found - if not verdict: - if receipt_path: - try: - Path(receipt_path).unlink(missing_ok=True) - except OSError: - pass - error_exit( - "Codex review completed but no verdict found in output. " - "Expected SHIP or NEEDS_WORK", - use_json=args.json, - code=2, - ) - - # Preserve session_id for continuity (avoid clobbering on resumed sessions) - session_id_to_write = thread_id or session_id - - # Write receipt if path provided (Ralph-compatible schema) - if receipt_path: - receipt_data = { - "type": "completion_review", # Required by Ralph - "id": epic_id, # Required by Ralph - "mode": "codex", - "base": base_branch, - "verdict": verdict, - "session_id": session_id_to_write, - "timestamp": now_iso(), - "review": output, # Full review feedback for fix loop - } - # Add iteration if running under Ralph - ralph_iter = os.environ.get("RALPH_ITERATION") - if ralph_iter: - try: - receipt_data["iteration"] = int(ralph_iter) - except ValueError: - pass - Path(receipt_path).write_text( - json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8" - ) - - # Output - if args.json: - json_output( - { - "type": "completion_review", - "id": epic_id, - "base": base_branch, - "verdict": verdict, - "session_id": session_id_to_write, - "mode": "codex", - "review": output, - } - ) - else: - print(output) - print(f"\nVERDICT={verdict or 'UNKNOWN'}") - - -# ───────────────────────────────────────────────────────────────────────────── -# Checkpoint commands -# ───────────────────────────────────────────────────────────────────────────── - diff --git a/scripts/flowctl/tests/test_admin.py b/scripts/flowctl/tests/test_admin.py new file mode 100644 index 00000000..4f09dd3b --- /dev/null +++ b/scripts/flowctl/tests/test_admin.py @@ -0,0 +1,106 @@ +"""Tests for flowctl.commands.admin — validate_task_spec_headings.""" + +import pytest + +from flowctl.commands.admin import validate_task_spec_headings + + +VALID_SPEC = """\ +# fn-1.1 Some task title + +## Description +This is the description. + +## Acceptance +- [ ] Criterion A +- [ ] Criterion B + +## Done summary +TBD + +## Evidence +- Commits: +- Tests: +- PRs: +""" + + +class TestValidateTaskSpecHeadings: + """Tests for validate_task_spec_headings().""" + + def test_valid_spec_no_errors(self): + """A well-formed spec should produce zero errors.""" + errors = validate_task_spec_headings(VALID_SPEC) + assert errors == [] + + def test_missing_description(self): + """Spec without ## Description should report missing heading.""" + content = VALID_SPEC.replace("## Description\n", "") + errors = validate_task_spec_headings(content) + assert any("Missing required heading: ## Description" in e for e in errors) + + def test_missing_acceptance(self): + """Spec without ## Acceptance should report missing heading.""" + content = VALID_SPEC.replace("## Acceptance\n", "") + errors = validate_task_spec_headings(content) + assert any("Missing required heading: ## Acceptance" in e for e in errors) + + def test_missing_done_summary(self): + """Spec without ## Done summary should report missing heading.""" + content = VALID_SPEC.replace("## Done summary\n", "") + errors = validate_task_spec_headings(content) + assert any("Missing required heading: ## Done summary" in e for e in errors) + + def test_missing_evidence(self): + """Spec without ## Evidence should report missing heading.""" + content = VALID_SPEC.replace("## Evidence\n", "") + errors = validate_task_spec_headings(content) + assert any("Missing required heading: ## Evidence" in e for e in errors) + + def test_missing_all_headings(self): + """Spec with no headings at all should report 4 missing.""" + content = "# Title\n\nJust some text, no sections.\n" + errors = validate_task_spec_headings(content) + assert len(errors) == 4 + assert all("Missing required heading" in e for e in errors) + + def test_duplicate_description(self): + """Spec with duplicate ## Description should report duplicate.""" + content = VALID_SPEC + "\n## Description\nAnother description.\n" + errors = validate_task_spec_headings(content) + assert any("Duplicate heading: ## Description" in e for e in errors) + + def test_duplicate_acceptance(self): + """Spec with duplicate ## Acceptance should report duplicate.""" + content = VALID_SPEC + "\n## Acceptance\n- [ ] Extra\n" + errors = validate_task_spec_headings(content) + assert any("Duplicate heading: ## Acceptance" in e for e in errors) + + def test_heading_inside_code_block_not_matched(self): + """Headings inside code blocks should still be matched (regex is line-anchored). + + The current implementation uses line-anchored regex, so ## Description + inside a code block on its own line WILL be detected. This test documents + that known limitation rather than asserting it doesn't match. + """ + content = VALID_SPEC + "\n```\n## Description\n```\n" + errors = validate_task_spec_headings(content) + # The heading appears twice now (once real, once in code block) + assert any("Duplicate heading: ## Description" in e for e in errors) + + def test_heading_with_trailing_whitespace(self): + """Headings with trailing whitespace should still be recognized.""" + content = VALID_SPEC.replace("## Description\n", "## Description \n") + errors = validate_task_spec_headings(content) + assert errors == [] + + def test_empty_content(self): + """Empty content should report all headings as missing.""" + errors = validate_task_spec_headings("") + assert len(errors) == 4 + + def test_similar_heading_not_confused(self): + """## Descriptions (plural) should NOT satisfy ## Description.""" + content = VALID_SPEC.replace("## Description\n", "## Descriptions\n") + errors = validate_task_spec_headings(content) + assert any("Missing required heading: ## Description" in e for e in errors) diff --git a/scripts/flowctl/tests/test_review.py b/scripts/flowctl/tests/test_review.py new file mode 100644 index 00000000..7568cca4 --- /dev/null +++ b/scripts/flowctl/tests/test_review.py @@ -0,0 +1,195 @@ +"""Tests for flowctl.commands.review.codex_utils — pure function tests.""" + +import os + +import pytest + +from flowctl.commands.review.codex_utils import ( + is_sandbox_failure, + parse_codex_verdict, + resolve_codex_sandbox, +) + + +# --- parse_codex_verdict --- + + +class TestParseCodexVerdict: + """Tests for parse_codex_verdict().""" + + def test_ship_verdict(self): + output = "Review looks good.\nSHIP\n" + assert parse_codex_verdict(output) == "SHIP" + + def test_needs_work_verdict(self): + output = "Found issues.\nNEEDS_WORK\n" + assert parse_codex_verdict(output) == "NEEDS_WORK" + + def test_major_rethink_verdict(self): + output = "Fundamental problems.\nMAJOR_RETHINK\n" + assert parse_codex_verdict(output) == "MAJOR_RETHINK" + + def test_no_verdict_returns_none(self): + output = "Some review output without a verdict tag." + assert parse_codex_verdict(output) is None + + def test_empty_output_returns_none(self): + assert parse_codex_verdict("") is None + + def test_malformed_verdict_tag(self): + """Verdict tag with wrong content should return None.""" + output = "APPROVE" + assert parse_codex_verdict(output) is None + + def test_verdict_in_middle_of_text(self): + """Verdict embedded in longer output.""" + output = ( + "## Review\n\nLooks great.\n\n" + "### Summary\nAll checks pass.\n\n" + "SHIP\n\n" + "End of review." + ) + assert parse_codex_verdict(output) == "SHIP" + + def test_multiple_verdicts_returns_first(self): + """If multiple verdict tags exist, return the first one.""" + output = "NEEDS_WORK\nAfter fixes:\nSHIP" + # re.search returns the first match + assert parse_codex_verdict(output) == "NEEDS_WORK" + + def test_verdict_case_sensitive(self): + """Verdict values are case-sensitive — lowercase should not match.""" + output = "ship" + assert parse_codex_verdict(output) is None + + def test_partial_verdict_tag(self): + """Incomplete verdict tag should not match.""" + output = "SHIP" + assert parse_codex_verdict(output) is None + + +# --- resolve_codex_sandbox --- + + +class TestResolveCodexSandbox: + """Tests for resolve_codex_sandbox().""" + + def test_explicit_read_only(self): + assert resolve_codex_sandbox("read-only") == "read-only" + + def test_explicit_danger_full_access(self): + assert resolve_codex_sandbox("danger-full-access") == "danger-full-access" + + def test_explicit_workspace_write(self): + assert resolve_codex_sandbox("workspace-write") == "workspace-write" + + def test_invalid_mode_raises(self): + with pytest.raises(ValueError, match="Invalid sandbox value"): + resolve_codex_sandbox("invalid-mode") + + def test_auto_resolves_on_unix(self, monkeypatch): + """On unix (os.name != 'nt'), auto resolves to read-only.""" + monkeypatch.setattr(os, "name", "posix") + monkeypatch.delenv("CODEX_SANDBOX", raising=False) + assert resolve_codex_sandbox("auto") == "read-only" + + def test_auto_resolves_on_windows(self, monkeypatch): + """On Windows (os.name == 'nt'), auto resolves to danger-full-access.""" + monkeypatch.setattr(os, "name", "nt") + monkeypatch.delenv("CODEX_SANDBOX", raising=False) + assert resolve_codex_sandbox("auto") == "danger-full-access" + + def test_env_var_overrides_auto(self, monkeypatch): + """CODEX_SANDBOX env var should override auto resolution.""" + monkeypatch.setenv("CODEX_SANDBOX", "workspace-write") + assert resolve_codex_sandbox("auto") == "workspace-write" + + def test_explicit_overrides_env(self, monkeypatch): + """Explicit CLI value should override env var.""" + monkeypatch.setenv("CODEX_SANDBOX", "workspace-write") + assert resolve_codex_sandbox("read-only") == "read-only" + + def test_invalid_env_var_raises(self, monkeypatch): + """Invalid CODEX_SANDBOX env value should raise ValueError.""" + monkeypatch.setenv("CODEX_SANDBOX", "bad-value") + with pytest.raises(ValueError, match="Invalid CODEX_SANDBOX value"): + resolve_codex_sandbox("auto") + + def test_empty_string_treated_as_auto(self, monkeypatch): + """Empty string should resolve like auto.""" + monkeypatch.setattr(os, "name", "posix") + monkeypatch.delenv("CODEX_SANDBOX", raising=False) + assert resolve_codex_sandbox("") == "read-only" + + def test_whitespace_stripped(self, monkeypatch): + """Leading/trailing whitespace should be stripped.""" + assert resolve_codex_sandbox(" read-only ") == "read-only" + + +# --- is_sandbox_failure --- + + +class TestIsSandboxFailure: + """Tests for is_sandbox_failure().""" + + def test_success_exit_code_never_sandbox_failure(self): + """Exit code 0 should never be detected as sandbox failure.""" + assert is_sandbox_failure(0, "", "blocked by policy") is False + + def test_blocked_by_policy_in_stderr(self): + assert is_sandbox_failure(1, "", "Error: blocked by policy") is True + + def test_rejected_by_policy_in_stderr(self): + assert is_sandbox_failure(1, "", "rejected by policy: read") is True + + def test_filesystem_read_blocked_in_stderr(self): + assert is_sandbox_failure(1, "", "filesystem read is blocked") is True + + def test_filesystem_write_blocked_in_stderr(self): + assert is_sandbox_failure(1, "", "filesystem write is blocked") is True + + def test_shell_command_blocked_in_stderr(self): + assert is_sandbox_failure(1, "", "shell command was blocked") is True + + def test_appcontainer_in_stderr(self): + assert is_sandbox_failure(1, "", "AppContainer restriction") is True + + def test_unrelated_error_not_sandbox(self): + """Non-sandbox errors should return False.""" + assert is_sandbox_failure(1, "", "connection refused") is False + + def test_empty_stderr_not_sandbox(self): + assert is_sandbox_failure(1, "", "") is False + + def test_json_stdout_failed_item_with_policy(self): + """Failed item in JSON stdout with rejection message.""" + import json + + item = { + "type": "item.completed", + "item": { + "status": "failed", + "aggregated_output": "rejected by policy: filesystem read", + }, + } + stdout = json.dumps(item) + assert is_sandbox_failure(1, stdout, "") is True + + def test_json_stdout_success_item_not_sandbox(self): + """Successful item in JSON stdout should not be sandbox failure.""" + import json + + item = { + "type": "item.completed", + "item": { + "status": "completed", + "aggregated_output": "all good", + }, + } + stdout = json.dumps(item) + assert is_sandbox_failure(1, stdout, "") is False + + def test_case_insensitive_pattern_matching(self): + """Patterns should match case-insensitively.""" + assert is_sandbox_failure(1, "", "BLOCKED BY POLICY") is True + assert is_sandbox_failure(1, "", "Blocked By Policy") is True diff --git a/scripts/flowctl/tests/test_task.py b/scripts/flowctl/tests/test_task.py new file mode 100644 index 00000000..3c1e9056 --- /dev/null +++ b/scripts/flowctl/tests/test_task.py @@ -0,0 +1,114 @@ +"""Tests for flowctl.commands.task — patch_task_section.""" + +import pytest + +from flowctl.commands.task import patch_task_section + + +SAMPLE_SPEC = """\ +# fn-1.1 Some task + +## Description +Old description content. + +## Acceptance +- [ ] Criterion A + +## Done summary +TBD + +## Evidence +- Commits: +- Tests: +- PRs:""" + + +class TestPatchTaskSection: + """Tests for patch_task_section().""" + + def test_replace_description(self): + """Replacing ## Description content should preserve other sections.""" + result = patch_task_section(SAMPLE_SPEC, "## Description", "New description.") + assert "New description." in result + assert "Old description content." not in result + # Other sections preserved + assert "## Acceptance" in result + assert "Criterion A" in result + assert "## Done summary" in result + assert "## Evidence" in result + + def test_replace_acceptance(self): + """Replacing ## Acceptance content.""" + result = patch_task_section( + SAMPLE_SPEC, "## Acceptance", "- [ ] New criterion" + ) + assert "- [ ] New criterion" in result + assert "Criterion A" not in result + + def test_replace_done_summary(self): + """Replacing ## Done summary content.""" + result = patch_task_section( + SAMPLE_SPEC, "## Done summary", "Implemented the feature." + ) + assert "Implemented the feature." in result + assert "\nTBD\n" not in result + + def test_add_missing_section(self): + """Patching a section that doesn't exist should auto-append it.""" + content = "# Title\n\n## Description\nSome text.\n" + result = patch_task_section(content, "## New Section", "New content here.") + assert "## New Section" in result + assert "New content here." in result + # Original content preserved + assert "## Description" in result + assert "Some text." in result + + def test_duplicate_heading_raises(self): + """Patching when target heading appears multiple times should raise ValueError.""" + content = SAMPLE_SPEC + "\n\n## Description\nDuplicate." + with pytest.raises(ValueError, match="duplicate heading"): + patch_task_section(content, "## Description", "New text") + + def test_strips_heading_from_new_content(self): + """If new_content starts with the section heading, it should be stripped.""" + result = patch_task_section( + SAMPLE_SPEC, "## Description", "## Description\nContent after heading." + ) + # Should not have double heading + lines = result.split("\n") + desc_count = sum(1 for l in lines if l.strip() == "## Description") + assert desc_count == 1 + assert "Content after heading." in result + + def test_multiline_replacement(self): + """Multi-line content should be preserved correctly.""" + new_content = "Line 1\nLine 2\nLine 3" + result = patch_task_section(SAMPLE_SPEC, "## Description", new_content) + assert "Line 1" in result + assert "Line 2" in result + assert "Line 3" in result + + def test_empty_replacement(self): + """Replacing with empty content should clear the section body.""" + result = patch_task_section(SAMPLE_SPEC, "## Description", "") + # Section heading still present, but body cleared + assert "## Description" in result + # Next section should follow + assert "## Acceptance" in result + + def test_preserves_title_line(self): + """The title line (# fn-1.1 ...) should be preserved.""" + result = patch_task_section(SAMPLE_SPEC, "## Description", "New desc.") + assert result.startswith("# fn-1.1 Some task") + + def test_trailing_whitespace_stripped(self): + """New content trailing whitespace should be stripped.""" + result = patch_task_section( + SAMPLE_SPEC, "## Description", "Content with trailing \n\n\n" + ) + # Check that trailing newlines from new_content are stripped + desc_idx = result.index("## Description") + acc_idx = result.index("## Acceptance") + between = result[desc_idx:acc_idx] + # Should not have excessive trailing newlines + assert not between.endswith("\n\n\n\n") From c4483b62be134a9638da2fbe0ac74ccd0fa172fa Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 12:14:46 +0800 Subject: [PATCH 5/6] feat(perf): batch task loading, memoize get_state_dir, DX prefixes - Memoize get_state_dir() per cwd with _reset_state_dir_cache() for tests - Add load_all_tasks_with_state() using os.scandir for single-pass loading - Replace per-file task loading in cmd_tasks/cmd_list/cmd_epics with batch - Add [iter N] [fn-X.Y] prefix to watch-filter output from env vars - Use $RUN_DIR/guard-debug.log when RUN_DIR is set in ralph-guard Task: fn-4-flowctl-comprehensive-optimization-and.8 --- scripts/flowctl/commands/query.py | 114 +++++++----------- scripts/flowctl/core/paths.py | 28 ++++- scripts/flowctl/core/state.py | 64 +++++++++- scripts/hooks/ralph-guard.py | 17 ++- .../templates/watch-filter.py | 11 +- 5 files changed, 157 insertions(+), 77 deletions(-) diff --git a/scripts/flowctl/commands/query.py b/scripts/flowctl/commands/query.py index 09718624..0a8d58aa 100644 --- a/scripts/flowctl/commands/query.py +++ b/scripts/flowctl/commands/query.py @@ -1,8 +1,6 @@ """Cross-cutting query commands: show, epics, files, tasks, list, cat.""" import argparse -import re -from pathlib import Path from flowctl.core.constants import ( EPICS_DIR, @@ -23,6 +21,7 @@ ) from flowctl.core.paths import ensure_flow_exists, get_flow_dir from flowctl.core.state import ( + load_all_tasks_with_state, load_task_with_state, lock_files, unlock_files, @@ -132,19 +131,10 @@ def cmd_epics(args: argparse.Namespace) -> None: epic_file, f"Epic {epic_file.stem}", use_json=args.json ) ) - # Count tasks (with merged runtime state) - tasks_dir = flow_dir / TASKS_DIR - task_count = 0 - done_count = 0 - if tasks_dir.exists(): - for task_file in tasks_dir.glob(f"{epic_data['id']}.*.json"): - task_id = task_file.stem - if not is_task_id(task_id): - continue # Skip non-task files (e.g., fn-1.2-review.json) - task_data = load_task_with_state(task_id, use_json=args.json) - task_count += 1 - if task_data.get("status") == "done": - done_count += 1 + # Count tasks via batch loading (single directory scan per epic) + epic_tasks = load_all_tasks_with_state(epic_id=epic_data['id']) + task_count = len(epic_tasks) + done_count = sum(1 for t in epic_tasks.values() if t.get("status") == "done") epics.append( { @@ -252,37 +242,30 @@ def cmd_tasks(args: argparse.Namespace) -> None: ".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json ) - flow_dir = get_flow_dir() - tasks_dir = flow_dir / TASKS_DIR + # Batch load all tasks in a single directory scan + all_loaded = load_all_tasks_with_state(epic_id=args.epic if args.epic else None) tasks = [] - if tasks_dir.exists(): - pattern = f"{args.epic}.*.json" if args.epic else "fn-*.json" - for task_file in sorted(tasks_dir.glob(pattern)): - task_id = task_file.stem - if not is_task_id(task_id): - continue # Skip non-task files (e.g., fn-1.2-review.json) - # Load task with merged runtime state - task_data = load_task_with_state(task_id, use_json=args.json) - if "id" not in task_data: - continue # Skip artifact files (GH-21) - # Filter by status if requested - if args.status and task_data["status"] != args.status: - continue - # Filter by domain if requested - if hasattr(args, "domain") and args.domain and task_data.get("domain") != args.domain: - continue - tasks.append( - { - "id": task_data["id"], - "epic": task_data["epic"], - "title": task_data["title"], - "status": task_data["status"], - "priority": task_data.get("priority"), - "domain": task_data.get("domain"), - "depends_on": task_data.get("depends_on", task_data.get("deps", [])), - } - ) + for task_id, task_data in all_loaded.items(): + if "id" not in task_data: + continue # Skip artifact files (GH-21) + # Filter by status if requested + if args.status and task_data["status"] != args.status: + continue + # Filter by domain if requested + if hasattr(args, "domain") and args.domain and task_data.get("domain") != args.domain: + continue + tasks.append( + { + "id": task_data["id"], + "epic": task_data["epic"], + "title": task_data["title"], + "status": task_data["status"], + "priority": task_data.get("priority"), + "domain": task_data.get("domain"), + "depends_on": task_data.get("depends_on", task_data.get("deps", [])), + } + ) # Sort tasks by epic number then task number def task_sort_key(t): @@ -321,7 +304,6 @@ def cmd_list(args: argparse.Namespace) -> None: flow_dir = get_flow_dir() epics_dir = flow_dir / EPICS_DIR - tasks_dir = flow_dir / TASKS_DIR # Load all epics epics = [] @@ -341,31 +323,27 @@ def epic_sort_key(e): epics.sort(key=epic_sort_key) - # Load all tasks grouped by epic (with merged runtime state) + # Batch load all tasks in a single directory scan (with merged runtime state) + all_loaded = load_all_tasks_with_state() tasks_by_epic = {} all_tasks = [] - if tasks_dir.exists(): - for task_file in sorted(tasks_dir.glob("fn-*.json")): - task_id = task_file.stem - if not is_task_id(task_id): - continue # Skip non-task files (e.g., fn-1.2-review.json) - task_data = load_task_with_state(task_id, use_json=args.json) - if "id" not in task_data or "epic" not in task_data: - continue # Skip artifact files (GH-21) - epic_id = task_data["epic"] - if epic_id not in tasks_by_epic: - tasks_by_epic[epic_id] = [] - tasks_by_epic[epic_id].append(task_data) - all_tasks.append( - { - "id": task_data["id"], - "epic": task_data["epic"], - "title": task_data["title"], - "status": task_data["status"], - "priority": task_data.get("priority"), - "depends_on": task_data.get("depends_on", task_data.get("deps", [])), - } - ) + for task_id, task_data in all_loaded.items(): + if "id" not in task_data or "epic" not in task_data: + continue # Skip artifact files (GH-21) + epic_id = task_data["epic"] + if epic_id not in tasks_by_epic: + tasks_by_epic[epic_id] = [] + tasks_by_epic[epic_id].append(task_data) + all_tasks.append( + { + "id": task_data["id"], + "epic": task_data["epic"], + "title": task_data["title"], + "status": task_data["status"], + "priority": task_data.get("priority"), + "depends_on": task_data.get("depends_on", task_data.get("deps", [])), + } + ) # Sort tasks within each epic for epic_id in tasks_by_epic: diff --git a/scripts/flowctl/core/paths.py b/scripts/flowctl/core/paths.py index 811b8f0f..4ebb0d1e 100644 --- a/scripts/flowctl/core/paths.py +++ b/scripts/flowctl/core/paths.py @@ -6,6 +6,15 @@ from flowctl.core.constants import FLOW_DIR +# Module-level cache for get_state_dir(), keyed by cwd string. +# CLI invocations are short-lived so no expiry needed. +_state_dir_cache: dict[str, Path] = {} + + +def _reset_state_dir_cache() -> None: + """Clear the get_state_dir() memoization cache. For testing.""" + _state_dir_cache.clear() + def get_repo_root() -> Path: """Find git repo root.""" @@ -35,14 +44,23 @@ def ensure_flow_exists() -> bool: def get_state_dir() -> Path: """Get state directory for runtime task state. + Results are memoized per working directory. Call _reset_state_dir_cache() + to clear (e.g. in tests that change directories). + Resolution order: 1. FLOW_STATE_DIR env var (explicit override for orchestrators) 2. git common-dir (shared across all worktrees automatically) 3. Fallback to .flow/state for non-git repos """ + cache_key = os.getcwd() + if cache_key in _state_dir_cache: + return _state_dir_cache[cache_key] + # 1. Explicit override if state_dir := os.environ.get("FLOW_STATE_DIR"): - return Path(state_dir).resolve() + resolved = Path(state_dir).resolve() + _state_dir_cache[cache_key] = resolved + return resolved # 2. Git common-dir (shared across worktrees) try: @@ -53,9 +71,13 @@ def get_state_dir() -> Path: check=True, ) common = result.stdout.strip() - return Path(common) / "flow-state" + resolved = Path(common) / "flow-state" + _state_dir_cache[cache_key] = resolved + return resolved except subprocess.CalledProcessError: pass # 3. Fallback for non-git repos - return get_flow_dir() / "state" + resolved = get_flow_dir() / "state" + _state_dir_cache[cache_key] = resolved + return resolved diff --git a/scripts/flowctl/core/state.py b/scripts/flowctl/core/state.py index befc215f..724db508 100644 --- a/scripts/flowctl/core/state.py +++ b/scripts/flowctl/core/state.py @@ -1,6 +1,7 @@ """State management: StateStore, task state operations.""" import json +import os from abc import ABC, abstractmethod from contextlib import contextmanager from pathlib import Path @@ -8,7 +9,7 @@ from flowctl.compat import _flock, LOCK_EX, LOCK_UN from flowctl.core.constants import RUNTIME_FIELDS, TASKS_DIR -from flowctl.core.ids import normalize_task +from flowctl.core.ids import is_task_id, normalize_task from flowctl.core.io import ( atomic_write, atomic_write_json, @@ -134,6 +135,67 @@ def load_task_with_state(task_id: str, use_json: bool = True) -> dict: return normalize_task(merged) +def load_all_tasks_with_state(epic_id: str | None = None) -> dict[str, dict]: + """Load all tasks with merged runtime state in a single directory scan. + + Uses os.scandir for efficient single-pass directory listing, optionally + filtered by epic_id prefix. Returns dict keyed by task ID. + + Args: + epic_id: If provided, only load tasks whose ID starts with this prefix. + E.g., "fn-4-slug" loads "fn-4-slug.1", "fn-4-slug.2", etc. + + Returns: + Dict of {task_id: merged_task_data}. + """ + flow_dir = get_flow_dir() + tasks_dir = flow_dir / TASKS_DIR + if not tasks_dir.exists(): + return {} + + store = get_state_store() + result: dict[str, dict] = {} + + # Single scandir pass — filter by .json suffix and optional epic prefix + prefix = f"{epic_id}." if epic_id else "fn-" + try: + entries = os.scandir(tasks_dir) + except OSError: + return {} + + for entry in entries: + if not entry.name.endswith(".json"): + continue + if not entry.name.startswith(prefix): + continue + + task_id = entry.name[:-5] # strip .json + if not is_task_id(task_id): + continue + + # Load definition + try: + with open(entry.path, encoding="utf-8") as f: + definition = json.load(f) + except (json.JSONDecodeError, IOError): + continue + + if "id" not in definition: + continue # Skip artifact files + + # Load runtime state + runtime = store.load_runtime(task_id) + if runtime is None: + runtime = {k: definition[k] for k in RUNTIME_FIELDS if k in definition} + if not runtime: + runtime = {"status": "todo"} + + merged = {**definition, **runtime} + result[task_id] = normalize_task(merged) + + return result + + def save_task_runtime(task_id: str, updates: dict) -> None: """Write runtime state only (merge with existing). Never touch definition file.""" store = get_state_store() diff --git a/scripts/hooks/ralph-guard.py b/scripts/hooks/ralph-guard.py index 526b0e40..d2f647d7 100755 --- a/scripts/hooks/ralph-guard.py +++ b/scripts/hooks/ralph-guard.py @@ -378,7 +378,8 @@ def handle_post_tool_use(data: dict) -> None: # - "$FLOWCTL" done if " done " in command and ("flowctl" in command or "FLOWCTL" in command): # Debug logging - with Path("/tmp/ralph-guard-debug.log").open("a") as f: + debug_file = _debug_log_path() + with debug_file.open("a") as f: f.write(f" -> flowctl done detected in: {command[:100]}...\n") # Extract task ID from command - look for "done" followed by task ID @@ -386,7 +387,7 @@ def handle_post_tool_use(data: dict) -> None: done_match = re.search(r"\bdone\s+([a-zA-Z0-9][a-zA-Z0-9._-]*)", command) if done_match: task_id = done_match.group(1) - with Path("/tmp/ralph-guard-debug.log").open("a") as f: + with debug_file.open("a") as f: f.write( f" -> Extracted task_id: {task_id}, response has 'status': {'status' in response_text.lower()}\n" ) @@ -405,7 +406,7 @@ def handle_post_tool_use(data: dict) -> None: done_set.add(task_id) state["flowctl_done_called"] = done_set save_state(session_id, state) - with Path("/tmp/ralph-guard-debug.log").open("a") as f: + with debug_file.open("a") as f: f.write( f" -> Added {task_id} to flowctl_done_called: {done_set}\n" ) @@ -582,9 +583,17 @@ def handle_subagent_stop(data: dict) -> None: handle_stop(data) +def _debug_log_path() -> Path: + """Get debug log path: $RUN_DIR/guard-debug.log if set, else /tmp fallback.""" + run_dir = os.environ.get("RUN_DIR") + if run_dir: + return Path(run_dir) / "guard-debug.log" + return Path("/tmp/ralph-guard-debug.log") + + def main(): # Debug logging - always write to see if hook is being called - debug_file = Path("/tmp/ralph-guard-debug.log") + debug_file = _debug_log_path() with debug_file.open("a") as f: f.write(f"[{os.environ.get('FLOW_RALPH', 'unset')}] Hook called\n") diff --git a/skills/flow-code-ralph-init/templates/watch-filter.py b/skills/flow-code-ralph-init/templates/watch-filter.py index c703e18d..3a93eefc 100755 --- a/skills/flow-code-ralph-init/templates/watch-filter.py +++ b/skills/flow-code-ralph-init/templates/watch-filter.py @@ -32,6 +32,15 @@ # TUI indentation (3 spaces to match ralph.sh) INDENT = " " +# Context prefix from environment (iteration + task) +_ITER = os.environ.get("RALPH_ITERATION", "") +_TASK = os.environ.get("RALPH_TASK_ID", "") +_PREFIX = "" +if _ITER: + _PREFIX += f"[iter {_ITER}] " +if _TASK: + _PREFIX += f"[{_TASK}] " + # Tool icons ICONS = { "Bash": "🔧", @@ -165,7 +174,7 @@ def process_event(event: dict, verbose: bool) -> None: tool_name = block.get("name", "") tool_input = block.get("input", {}) formatted = format_tool_use(tool_name, tool_input) - safe_print(f"{INDENT}{C_DIM}{formatted}{C_RESET}") + safe_print(f"{INDENT}{C_DIM}{_PREFIX}{formatted}{C_RESET}") elif verbose and block_type == "text": text = block.get("text", "") From 6267336132786abe900451a2e99d75ef88eda3b0 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 12:24:53 +0800 Subject: [PATCH 6/6] fix: heading validation ignores fenced code blocks, epic reopen clears plan review - validate_task_spec_headings now strips fenced code blocks before checking for duplicate headings, preventing false positives on specs with markdown examples containing ## headings - epic reopen now resets plan_review_status and plan_reviewed_at alongside completion_review_status, so flowctl next --require-plan-review correctly re-requires review after reopening - Updated test to verify fenced code block handling (was documenting as known limitation, now fixed) Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/flowctl/commands/admin.py | 10 ++++++++-- scripts/flowctl/commands/epic.py | 4 +++- scripts/flowctl/tests/test_admin.py | 19 ++++++++++--------- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/scripts/flowctl/commands/admin.py b/scripts/flowctl/commands/admin.py index 77868a0f..f685d8b3 100644 --- a/scripts/flowctl/commands/admin.py +++ b/scripts/flowctl/commands/admin.py @@ -132,13 +132,19 @@ def find_active_run( # --- Validation helpers --- +def _strip_fenced_blocks(content: str) -> str: + """Remove fenced code blocks (``` ... ```) so headings inside them are ignored.""" + return re.sub(r"^```.*?^```", "", content, flags=re.MULTILINE | re.DOTALL) + + def validate_task_spec_headings(content: str) -> list[str]: """Validate task spec has required headings exactly once. Returns errors.""" + # Strip fenced code blocks so ## headings inside examples aren't counted + stripped = _strip_fenced_blocks(content) errors = [] for heading in TASK_SPEC_HEADINGS: - # Use regex anchored to line start to avoid matching inside code blocks pattern = rf"^{re.escape(heading)}\s*$" - count = len(re.findall(pattern, content, flags=re.MULTILINE)) + count = len(re.findall(pattern, stripped, flags=re.MULTILINE)) if count == 0: errors.append(f"Missing required heading: {heading}") elif count > 1: diff --git a/scripts/flowctl/commands/epic.py b/scripts/flowctl/commands/epic.py index fe2c8133..1df466c8 100644 --- a/scripts/flowctl/commands/epic.py +++ b/scripts/flowctl/commands/epic.py @@ -912,9 +912,11 @@ def cmd_epic_reopen(args: argparse.Namespace) -> None: use_json=args.json, ) - # Set status back to open and reset completion review + # Set status back to open and reset review metadata epic_data["status"] = "open" epic_data["completion_review_status"] = "unknown" + epic_data["plan_review_status"] = "unknown" + epic_data.pop("plan_reviewed_at", None) epic_data["updated_at"] = now_iso() atomic_write_json(epic_path, epic_data) diff --git a/scripts/flowctl/tests/test_admin.py b/scripts/flowctl/tests/test_admin.py index 4f09dd3b..1fa751fd 100644 --- a/scripts/flowctl/tests/test_admin.py +++ b/scripts/flowctl/tests/test_admin.py @@ -76,17 +76,18 @@ def test_duplicate_acceptance(self): errors = validate_task_spec_headings(content) assert any("Duplicate heading: ## Acceptance" in e for e in errors) - def test_heading_inside_code_block_not_matched(self): - """Headings inside code blocks should still be matched (regex is line-anchored). - - The current implementation uses line-anchored regex, so ## Description - inside a code block on its own line WILL be detected. This test documents - that known limitation rather than asserting it doesn't match. - """ + def test_heading_inside_code_block_ignored(self): + """Headings inside fenced code blocks should be ignored.""" content = VALID_SPEC + "\n```\n## Description\n```\n" errors = validate_task_spec_headings(content) - # The heading appears twice now (once real, once in code block) - assert any("Duplicate heading: ## Description" in e for e in errors) + # Fenced code blocks are stripped before checking — no duplicate + assert errors == [] + + def test_heading_inside_tagged_code_block_ignored(self): + """Headings inside fenced code blocks with language tags should be ignored.""" + content = VALID_SPEC + "\n```markdown\n## Description\n## Acceptance\n```\n" + errors = validate_task_spec_headings(content) + assert errors == [] def test_heading_with_trailing_whitespace(self): """Headings with trailing whitespace should still be recognized."""