diff --git a/scripts/flowctl/cli.py b/scripts/flowctl/cli.py
index 5556222a..6b4b1cc7 100644
--- a/scripts/flowctl/cli.py
+++ b/scripts/flowctl/cli.py
@@ -18,6 +18,7 @@
from flowctl.commands.admin import (
cmd_init,
cmd_detect,
+ cmd_doctor,
cmd_status,
cmd_ralph_pause,
cmd_ralph_resume,
@@ -45,6 +46,7 @@
cmd_epic_rm_dep,
cmd_epic_set_backend,
cmd_epic_close,
+ cmd_epic_reopen,
cmd_epic_archive,
cmd_epic_clean,
)
@@ -336,6 +338,11 @@ def main() -> None:
p_epic_close.add_argument("--json", action="store_true", help="JSON output")
p_epic_close.set_defaults(func=cmd_epic_close)
+ p_epic_reopen = epic_sub.add_parser("reopen", help="Reopen a closed epic")
+ p_epic_reopen.add_argument("id", help="Epic ID (e.g., fn-1, fn-1-add-auth)")
+ p_epic_reopen.add_argument("--json", action="store_true", help="JSON output")
+ p_epic_reopen.set_defaults(func=cmd_epic_reopen)
+
p_epic_archive = epic_sub.add_parser(
"archive", help="Archive closed epic to .flow/.archive/"
)
@@ -700,6 +707,13 @@ def main() -> None:
p_validate.add_argument("--json", action="store_true", help="JSON output")
p_validate.set_defaults(func=cmd_validate)
+ # doctor
+ p_doctor = subparsers.add_parser(
+ "doctor", help="Run comprehensive state health diagnostics"
+ )
+ p_doctor.add_argument("--json", action="store_true", help="JSON output")
+ p_doctor.set_defaults(func=cmd_doctor)
+
# checkpoint
p_checkpoint = subparsers.add_parser("checkpoint", help="Checkpoint commands")
checkpoint_sub = p_checkpoint.add_subparsers(dest="checkpoint_cmd", required=True)
diff --git a/scripts/flowctl/commands/admin.py b/scripts/flowctl/commands/admin.py
index c6163aad..f685d8b3 100644
--- a/scripts/flowctl/commands/admin.py
+++ b/scripts/flowctl/commands/admin.py
@@ -1,10 +1,13 @@
-"""Admin commands: init, detect, status, ralph control, config, review-backend, validate."""
+"""Admin commands: init, detect, status, ralph control, config, review-backend, validate, doctor."""
import argparse
import json
import os
import re
+import subprocess
import sys
+import tempfile
+from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
@@ -25,6 +28,7 @@
deep_merge,
get_config,
get_default_config,
+ load_flow_config,
set_config,
)
from flowctl.core.ids import is_epic_id, is_task_id, normalize_epic
@@ -36,8 +40,8 @@
load_json,
load_json_or_exit,
)
-from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root
-from flowctl.core.state import load_task_with_state
+from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root, get_state_dir
+from flowctl.core.state import get_state_store, load_task_with_state
from flowctl.commands.stack import detect_stack
@@ -128,13 +132,19 @@ def find_active_run(
# --- Validation helpers ---
+def _strip_fenced_blocks(content: str) -> str:
+ """Remove fenced code blocks (``` ... ```) so headings inside them are ignored."""
+ return re.sub(r"^```.*?^```", "", content, flags=re.MULTILINE | re.DOTALL)
+
+
def validate_task_spec_headings(content: str) -> list[str]:
"""Validate task spec has required headings exactly once. Returns errors."""
+ # Strip fenced code blocks so ## headings inside examples aren't counted
+ stripped = _strip_fenced_blocks(content)
errors = []
for heading in TASK_SPEC_HEADINGS:
- # Use regex anchored to line start to avoid matching inside code blocks
pattern = rf"^{re.escape(heading)}\s*$"
- count = len(re.findall(pattern, content, flags=re.MULTILINE))
+ count = len(re.findall(pattern, stripped, flags=re.MULTILINE))
if count == 0:
errors.append(f"Missing required heading: {heading}")
elif count > 1:
@@ -949,3 +959,227 @@ def cmd_validate(args: argparse.Namespace) -> None:
# Exit with non-zero if validation failed
if not valid:
sys.exit(1)
+
+
+# --- Doctor command ---
+
+
+def cmd_doctor(args: argparse.Namespace) -> None:
+ """Run comprehensive state health diagnostics (superset of validate --all)."""
+ if not ensure_flow_exists():
+ error_exit(
+ ".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json
+ )
+
+ flow_dir = get_flow_dir()
+ checks: list[dict] = []
+
+ def add_check(name: str, status: str, message: str) -> None:
+ checks.append({"name": name, "status": status, "message": message})
+
+ # --- Check 1: Run validate --all internally ---
+ import io as _io
+ import contextlib
+
+ fake_args = argparse.Namespace(epic=None, all=True, json=True)
+ validate_output = _io.StringIO()
+ validate_passed = True
+ try:
+ with contextlib.redirect_stdout(validate_output):
+ cmd_validate(fake_args)
+ except SystemExit as e:
+ if e.code != 0:
+ validate_passed = False
+
+ if validate_passed:
+ add_check("validate", "pass", "All epics and tasks validated successfully")
+ else:
+ # Parse the validate output for details
+ try:
+ vdata = json.loads(validate_output.getvalue())
+ err_count = vdata.get("total_errors", 0)
+ add_check(
+ "validate", "fail",
+ f"Validation found {err_count} error(s). Run 'flowctl validate --all' for details"
+ )
+ except (json.JSONDecodeError, ValueError):
+ add_check("validate", "fail", "Validation failed (could not parse output)")
+
+ # --- Check 2: State-dir accessibility ---
+ try:
+ state_dir = get_state_dir()
+ state_dir.mkdir(parents=True, exist_ok=True)
+ # Test write access
+ test_file = state_dir / ".doctor-probe"
+ test_file.write_text("probe", encoding="utf-8")
+ test_file.unlink()
+ add_check("state_dir_access", "pass", f"State dir accessible: {state_dir}")
+ except (OSError, PermissionError) as e:
+ add_check("state_dir_access", "fail", f"State dir not accessible: {e}")
+
+ # --- Check 3: Orphaned state files ---
+ try:
+ store = get_state_store()
+ runtime_ids = store.list_runtime_files()
+ tasks_dir = flow_dir / TASKS_DIR
+ orphaned = []
+ for rid in runtime_ids:
+ task_def_path = tasks_dir / f"{rid}.json"
+ if not task_def_path.exists():
+ orphaned.append(rid)
+ if orphaned:
+ add_check(
+ "orphaned_state", "warn",
+ f"{len(orphaned)} orphaned state file(s): {', '.join(orphaned[:5])}"
+ + (f" (+{len(orphaned) - 5} more)" if len(orphaned) > 5 else "")
+ )
+ else:
+ add_check("orphaned_state", "pass", "No orphaned state files")
+ except Exception as e:
+ add_check("orphaned_state", "warn", f"Could not check orphaned state: {e}")
+
+ # --- Check 4: Stale in_progress tasks (>7 days) ---
+ try:
+ stale = []
+ tasks_dir = flow_dir / TASKS_DIR
+ if tasks_dir.exists():
+ for task_file in tasks_dir.glob("fn-*.json"):
+ task_id = task_file.stem
+ if not is_task_id(task_id):
+ continue
+                try:
+                    # Suppress error output so a failed load can't pollute doctor's report
+                    with contextlib.redirect_stdout(_io.StringIO()):
+                        task_data = load_task_with_state(task_id, use_json=True)
+                except SystemExit:
+                    continue
+                if task_data.get("status") != "in_progress":
+                    continue
+                updated = task_data.get("updated_at") or task_data.get("claimed_at")
+                if updated:
+                    try:
+                        # Parse ISO timestamp (handle trailing Z); compare in UTC
+                        task_time = datetime.fromisoformat(updated.replace("Z", "+00:00"))
+                        if task_time.tzinfo is None:
+                            task_time = task_time.replace(tzinfo=timezone.utc)
+                        now = datetime.now(timezone.utc)
+                        age_days = (now - task_time).days
+                        if age_days > 7:
+                            stale.append(f"{task_id} ({age_days}d)")
+                    except (ValueError, TypeError):
+                        pass
+ if stale:
+ add_check(
+ "stale_tasks", "warn",
+ f"{len(stale)} task(s) in_progress for >7 days: {', '.join(stale[:5])}"
+ + (f" (+{len(stale) - 5} more)" if len(stale) > 5 else "")
+ )
+ else:
+ add_check("stale_tasks", "pass", "No stale in_progress tasks")
+ except Exception as e:
+ add_check("stale_tasks", "warn", f"Could not check stale tasks: {e}")
+
+ # --- Check 5: Lock file accumulation ---
+ try:
+ state_dir = get_state_dir()
+ locks_dir = state_dir / "locks"
+ lock_count = 0
+ if locks_dir.exists():
+ lock_count = sum(1 for _ in locks_dir.glob("*.lock"))
+ if lock_count > 50:
+ add_check(
+ "lock_files", "warn",
+ f"{lock_count} lock files in state dir (consider cleanup)"
+ )
+ else:
+ add_check(
+ "lock_files", "pass",
+ f"{lock_count} lock file(s) in state dir"
+ )
+ except Exception as e:
+ add_check("lock_files", "warn", f"Could not check lock files: {e}")
+
+ # --- Check 6: Config validity ---
+ try:
+ config_path = flow_dir / CONFIG_FILE
+ if config_path.exists():
+ raw_text = config_path.read_text(encoding="utf-8")
+ parsed = json.loads(raw_text)
+ if not isinstance(parsed, dict):
+ add_check("config", "fail", "config.json is not a JSON object")
+ else:
+ # Check for known top-level keys
+ known_keys = set(get_default_config().keys())
+ unknown = set(parsed.keys()) - known_keys
+ if unknown:
+ add_check(
+ "config", "warn",
+ f"Unknown config keys: {', '.join(sorted(unknown))}"
+ )
+ else:
+ add_check("config", "pass", "config.json valid with known keys")
+ else:
+ add_check("config", "warn", "config.json missing (run 'flowctl init')")
+ except json.JSONDecodeError as e:
+ add_check("config", "fail", f"config.json invalid JSON: {e}")
+ except Exception as e:
+ add_check("config", "warn", f"Could not check config: {e}")
+
+ # --- Check 7: git-common-dir reachability ---
+ try:
+ result = subprocess.run(
+ ["git", "rev-parse", "--git-common-dir", "--path-format=absolute"],
+ capture_output=True, text=True, check=True,
+ )
+ common_dir = Path(result.stdout.strip())
+ if common_dir.exists():
+ add_check(
+ "git_common_dir", "pass",
+ f"git common-dir reachable: {common_dir}"
+ )
+ else:
+ add_check(
+ "git_common_dir", "warn",
+ f"git common-dir path does not exist: {common_dir}"
+ )
+ except subprocess.CalledProcessError:
+ add_check(
+ "git_common_dir", "warn",
+ "Not in a git repository (git common-dir unavailable)"
+ )
+ except FileNotFoundError:
+ add_check(
+ "git_common_dir", "warn",
+ "git not found on PATH"
+ )
+
+ # --- Build summary ---
+ summary = {"pass": 0, "warn": 0, "fail": 0}
+ for c in checks:
+ summary[c["status"]] += 1
+
+ overall_healthy = summary["fail"] == 0
+
+ if args.json:
+ json_output(
+ {
+ "checks": checks,
+ "summary": summary,
+ "healthy": overall_healthy,
+ },
+ success=overall_healthy,
+ )
+ else:
+ print("Doctor diagnostics:")
+ for c in checks:
+ icon = {"pass": "OK", "warn": "WARN", "fail": "FAIL"}[c["status"]]
+ print(f" [{icon}] {c['name']}: {c['message']}")
+ print()
+ print(
+ f"Summary: {summary['pass']} pass, "
+ f"{summary['warn']} warn, {summary['fail']} fail"
+ )
+ if not overall_healthy:
+ print("Health check FAILED — resolve fail items above.")
+
+ if not overall_healthy:
+ sys.exit(1)
diff --git a/scripts/flowctl/commands/epic.py b/scripts/flowctl/commands/epic.py
index 56c2f49e..1df466c8 100644
--- a/scripts/flowctl/commands/epic.py
+++ b/scripts/flowctl/commands/epic.py
@@ -196,6 +196,21 @@ def cmd_epic_set_plan(args: argparse.Namespace) -> None:
# Read content from file or stdin
content = read_file_or_stdin(args.file, "Input file", use_json=args.json)
+    # Validate spec headings: reject duplicates, ignoring fenced code blocks
+    stripped = re.sub(r"^```.*?^```", "", content, flags=re.MULTILINE | re.DOTALL)
+    headings = re.findall(r"^(##\s+.+?)\s*$", stripped, flags=re.MULTILINE)
+    seen = {}
+    for h in headings:
+        seen[h] = seen.get(h, 0) + 1
+    duplicates = [
+        f"Duplicate heading: {h} (found {n} times)" for h, n in seen.items() if n > 1
+    ]
+    if duplicates:
+        error_exit(
+            f"Spec validation failed: {'; '.join(duplicates)}",
+            use_json=args.json,
+        )
+
# Write spec
spec_path = flow_dir / SPECS_DIR / f"{args.id}.md"
atomic_write(spec_path, content)
@@ -857,6 +872,67 @@ def cmd_epic_archive(args: argparse.Namespace) -> None:
print(f" {f}")
+def cmd_epic_reopen(args: argparse.Namespace) -> None:
+ """Reopen a closed epic (sets status back to open)."""
+ if not ensure_flow_exists():
+ error_exit(
+ ".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json
+ )
+
+ epic_id = args.id
+ if not is_epic_id(epic_id):
+ error_exit(
+ f"Invalid epic ID: {epic_id}. Expected format: fn-N or fn-N-slug "
+ f"(e.g., fn-1, fn-1-add-auth)",
+ use_json=args.json,
+ )
+
+ flow_dir = get_flow_dir()
+ epic_path = flow_dir / EPICS_DIR / f"{epic_id}.json"
+
+ if not epic_path.exists():
+ # Check if archived
+ archive_path = flow_dir / ".archive" / epic_id
+ if archive_path.exists():
+ error_exit(
+ f"Epic {epic_id} is archived. Unarchive it first before reopening.",
+ use_json=args.json,
+ )
+ error_exit(f"Epic {epic_id} not found", use_json=args.json)
+
+ epic_data = normalize_epic(
+ load_json_or_exit(epic_path, f"Epic {epic_id}", use_json=args.json)
+ )
+
+ previous_status = epic_data.get("status", "unknown")
+
+ if previous_status == "open":
+ error_exit(
+ f"Epic {epic_id} is already open (no-op protection)",
+ use_json=args.json,
+ )
+
+ # Set status back to open and reset review metadata
+ epic_data["status"] = "open"
+ epic_data["completion_review_status"] = "unknown"
+ epic_data["plan_review_status"] = "unknown"
+ epic_data.pop("plan_reviewed_at", None)
+ epic_data["updated_at"] = now_iso()
+ atomic_write_json(epic_path, epic_data)
+
+ if args.json:
+ json_output(
+ {
+ "id": epic_id,
+ "previous_status": previous_status,
+ "new_status": "open",
+ "message": f"Epic {epic_id} reopened",
+ }
+ )
+ else:
+ print(f"Epic {epic_id} reopened (was: {previous_status})")
+
+
def cmd_epic_clean(args: argparse.Namespace) -> None:
"""Archive all closed epics at once."""
if not ensure_flow_exists():
diff --git a/scripts/flowctl/commands/query.py b/scripts/flowctl/commands/query.py
index 09718624..0a8d58aa 100644
--- a/scripts/flowctl/commands/query.py
+++ b/scripts/flowctl/commands/query.py
@@ -1,8 +1,6 @@
"""Cross-cutting query commands: show, epics, files, tasks, list, cat."""
import argparse
-import re
-from pathlib import Path
from flowctl.core.constants import (
EPICS_DIR,
@@ -23,6 +21,7 @@
)
from flowctl.core.paths import ensure_flow_exists, get_flow_dir
from flowctl.core.state import (
+ load_all_tasks_with_state,
load_task_with_state,
lock_files,
unlock_files,
@@ -132,19 +131,10 @@ def cmd_epics(args: argparse.Namespace) -> None:
epic_file, f"Epic {epic_file.stem}", use_json=args.json
)
)
- # Count tasks (with merged runtime state)
- tasks_dir = flow_dir / TASKS_DIR
- task_count = 0
- done_count = 0
- if tasks_dir.exists():
- for task_file in tasks_dir.glob(f"{epic_data['id']}.*.json"):
- task_id = task_file.stem
- if not is_task_id(task_id):
- continue # Skip non-task files (e.g., fn-1.2-review.json)
- task_data = load_task_with_state(task_id, use_json=args.json)
- task_count += 1
- if task_data.get("status") == "done":
- done_count += 1
+ # Count tasks via batch loading (single directory scan per epic)
+ epic_tasks = load_all_tasks_with_state(epic_id=epic_data['id'])
+ task_count = len(epic_tasks)
+ done_count = sum(1 for t in epic_tasks.values() if t.get("status") == "done")
epics.append(
{
@@ -252,37 +242,30 @@ def cmd_tasks(args: argparse.Namespace) -> None:
".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json
)
- flow_dir = get_flow_dir()
- tasks_dir = flow_dir / TASKS_DIR
+ # Batch load all tasks in a single directory scan
+ all_loaded = load_all_tasks_with_state(epic_id=args.epic if args.epic else None)
tasks = []
- if tasks_dir.exists():
- pattern = f"{args.epic}.*.json" if args.epic else "fn-*.json"
- for task_file in sorted(tasks_dir.glob(pattern)):
- task_id = task_file.stem
- if not is_task_id(task_id):
- continue # Skip non-task files (e.g., fn-1.2-review.json)
- # Load task with merged runtime state
- task_data = load_task_with_state(task_id, use_json=args.json)
- if "id" not in task_data:
- continue # Skip artifact files (GH-21)
- # Filter by status if requested
- if args.status and task_data["status"] != args.status:
- continue
- # Filter by domain if requested
- if hasattr(args, "domain") and args.domain and task_data.get("domain") != args.domain:
- continue
- tasks.append(
- {
- "id": task_data["id"],
- "epic": task_data["epic"],
- "title": task_data["title"],
- "status": task_data["status"],
- "priority": task_data.get("priority"),
- "domain": task_data.get("domain"),
- "depends_on": task_data.get("depends_on", task_data.get("deps", [])),
- }
- )
+ for task_id, task_data in all_loaded.items():
+ if "id" not in task_data:
+ continue # Skip artifact files (GH-21)
+ # Filter by status if requested
+ if args.status and task_data["status"] != args.status:
+ continue
+ # Filter by domain if requested
+ if hasattr(args, "domain") and args.domain and task_data.get("domain") != args.domain:
+ continue
+ tasks.append(
+ {
+ "id": task_data["id"],
+ "epic": task_data["epic"],
+ "title": task_data["title"],
+ "status": task_data["status"],
+ "priority": task_data.get("priority"),
+ "domain": task_data.get("domain"),
+ "depends_on": task_data.get("depends_on", task_data.get("deps", [])),
+ }
+ )
# Sort tasks by epic number then task number
def task_sort_key(t):
@@ -321,7 +304,6 @@ def cmd_list(args: argparse.Namespace) -> None:
flow_dir = get_flow_dir()
epics_dir = flow_dir / EPICS_DIR
- tasks_dir = flow_dir / TASKS_DIR
# Load all epics
epics = []
@@ -341,31 +323,27 @@ def epic_sort_key(e):
epics.sort(key=epic_sort_key)
- # Load all tasks grouped by epic (with merged runtime state)
+ # Batch load all tasks in a single directory scan (with merged runtime state)
+ all_loaded = load_all_tasks_with_state()
tasks_by_epic = {}
all_tasks = []
- if tasks_dir.exists():
- for task_file in sorted(tasks_dir.glob("fn-*.json")):
- task_id = task_file.stem
- if not is_task_id(task_id):
- continue # Skip non-task files (e.g., fn-1.2-review.json)
- task_data = load_task_with_state(task_id, use_json=args.json)
- if "id" not in task_data or "epic" not in task_data:
- continue # Skip artifact files (GH-21)
- epic_id = task_data["epic"]
- if epic_id not in tasks_by_epic:
- tasks_by_epic[epic_id] = []
- tasks_by_epic[epic_id].append(task_data)
- all_tasks.append(
- {
- "id": task_data["id"],
- "epic": task_data["epic"],
- "title": task_data["title"],
- "status": task_data["status"],
- "priority": task_data.get("priority"),
- "depends_on": task_data.get("depends_on", task_data.get("deps", [])),
- }
- )
+ for task_id, task_data in all_loaded.items():
+ if "id" not in task_data or "epic" not in task_data:
+ continue # Skip artifact files (GH-21)
+ epic_id = task_data["epic"]
+ if epic_id not in tasks_by_epic:
+ tasks_by_epic[epic_id] = []
+ tasks_by_epic[epic_id].append(task_data)
+ all_tasks.append(
+ {
+ "id": task_data["id"],
+ "epic": task_data["epic"],
+ "title": task_data["title"],
+ "status": task_data["status"],
+ "priority": task_data.get("priority"),
+ "depends_on": task_data.get("depends_on", task_data.get("deps", [])),
+ }
+ )
# Sort tasks within each epic
for epic_id in tasks_by_epic:
diff --git a/scripts/flowctl/commands/review/__init__.py b/scripts/flowctl/commands/review/__init__.py
index a859ec39..972b0f00 100644
--- a/scripts/flowctl/commands/review/__init__.py
+++ b/scripts/flowctl/commands/review/__init__.py
@@ -11,13 +11,16 @@
from flowctl.commands.review.codex_utils import ( # noqa: F401
CODEX_EFFORT_LEVELS,
CODEX_SANDBOX_MODES,
+ delete_stale_receipt,
get_codex_version,
is_sandbox_failure,
+ load_receipt,
parse_codex_thread_id,
parse_codex_verdict,
require_codex,
resolve_codex_sandbox,
run_codex_exec,
+ save_receipt,
)
from flowctl.commands.review.prompts import ( # noqa: F401
build_completion_review_prompt,
diff --git a/scripts/flowctl/commands/review/adversarial.py b/scripts/flowctl/commands/review/adversarial.py
index c50bbba1..8ea076cd 100644
--- a/scripts/flowctl/commands/review/adversarial.py
+++ b/scripts/flowctl/commands/review/adversarial.py
@@ -2,15 +2,12 @@
import argparse
import json
-import os
import re
-import subprocess
from pathlib import Path
from typing import Optional
-from flowctl.core.git import get_changed_files, get_embedded_file_contents
+from flowctl.core.git import get_changed_files, get_diff_context, get_embedded_file_contents
from flowctl.core.io import error_exit, json_output
-from flowctl.core.paths import get_repo_root
from flowctl.commands.review.codex_utils import (
run_codex_exec,
@@ -108,39 +105,8 @@ def cmd_codex_adversarial(args: argparse.Namespace) -> None:
base_branch = args.base
focus = getattr(args, "focus", None)
- # Get diff
- diff_summary = ""
- diff_content = ""
- try:
- result = subprocess.run(
- ["git", "diff", "--stat", f"{base_branch}..HEAD"],
- capture_output=True, text=True, cwd=get_repo_root(),
- )
- if result.returncode == 0:
- diff_summary = result.stdout.strip()
- except (subprocess.CalledProcessError, OSError):
- pass
-
- max_diff_bytes = 50000
- try:
- proc = subprocess.Popen(
- ["git", "diff", f"{base_branch}..HEAD"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=get_repo_root(),
- )
- diff_bytes = proc.stdout.read(max_diff_bytes + 1)
- was_truncated = len(diff_bytes) > max_diff_bytes
- if was_truncated:
- diff_bytes = diff_bytes[:max_diff_bytes]
- while proc.stdout.read(65536):
- pass
- proc.stdout.close()
- proc.stderr.close()
- proc.wait()
- diff_content = diff_bytes.decode("utf-8", errors="replace").strip()
- if was_truncated:
- diff_content += "\n\n... [diff truncated at 50KB]"
- except (subprocess.CalledProcessError, OSError):
- pass
+ # Get diff summary + content via shared helper
+ diff_summary, diff_content = get_diff_context(base_branch)
if not diff_summary and not diff_content:
error_exit(f"No changes found between {base_branch} and HEAD", use_json=args.json)
diff --git a/scripts/flowctl/commands/review/codex_utils.py b/scripts/flowctl/commands/review/codex_utils.py
index 65cfe538..f4385162 100644
--- a/scripts/flowctl/commands/review/codex_utils.py
+++ b/scripts/flowctl/commands/review/codex_utils.py
@@ -67,6 +67,92 @@ def get_codex_version() -> Optional[str]:
return None
+# ─────────────────────────────────────────────────────────────────────────────
+# Receipt lifecycle helpers
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+def load_receipt(path: Optional[str]) -> tuple[Optional[str], bool]:
+    """Load a review receipt and extract session info for re-reviews.
+
+    Args:
+        path: Receipt file path (may be None).
+
+    Returns:
+        tuple: (session_id, is_rereview)
+        - session_id: Codex thread ID from previous review, or None.
+        - is_rereview: True if a valid session was found (indicates re-review).
+    """
+    if not path:
+        return None, False
+    receipt_file = Path(path)
+    if not receipt_file.exists():
+        return None, False
+    try:
+        receipt_data = json.loads(receipt_file.read_text(encoding="utf-8"))
+    except (OSError, ValueError):  # unreadable file or invalid/undecodable JSON
+        return None, False
+    session_id = receipt_data.get("session_id") if isinstance(receipt_data, dict) else None
+    return session_id, session_id is not None
+
+
+def save_receipt(
+ path: str,
+ *,
+ review_type: str,
+ review_id: str,
+ mode: str = "codex",
+ verdict: str,
+ session_id: Optional[str],
+ output: str,
+ base_branch: Optional[str] = None,
+ focus: Optional[str] = None,
+) -> None:
+ """Write a Ralph-compatible review receipt to *path*.
+
+ Automatically includes the current RALPH_ITERATION env var if set.
+ """
+ receipt_data: dict[str, Any] = {
+ "type": review_type,
+ "id": review_id,
+ "mode": mode,
+ "verdict": verdict,
+ "session_id": session_id,
+ "timestamp": now_iso(),
+ "review": output,
+ }
+ if base_branch is not None:
+ receipt_data["base"] = base_branch
+ if focus is not None:
+ receipt_data["focus"] = focus
+
+ # Add iteration if running under Ralph
+ ralph_iter = os.environ.get("RALPH_ITERATION")
+ if ralph_iter:
+ try:
+ receipt_data["iteration"] = int(ralph_iter)
+ except ValueError:
+ pass
+
+ Path(path).write_text(
+ json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8"
+ )
+
+
+def delete_stale_receipt(path: Optional[str]) -> None:
+ """Delete a receipt file if it exists (best-effort).
+
+ Used to clear stale receipts on review failure so they don't
+ falsely satisfy downstream gates.
+ """
+ if not path:
+ return
+ try:
+ Path(path).unlink(missing_ok=True)
+ except OSError:
+ pass
+
+
CODEX_SANDBOX_MODES = {"read-only", "workspace-write", "danger-full-access", "auto"}
diff --git a/scripts/flowctl/commands/review/commands.py b/scripts/flowctl/commands/review/commands.py
index 192d62b5..d6bc9b68 100644
--- a/scripts/flowctl/commands/review/commands.py
+++ b/scripts/flowctl/commands/review/commands.py
@@ -1,37 +1,33 @@
"""Review commands: check, impl-review, plan-review, completion-review."""
import argparse
-import json
-import os
-import re
-import subprocess
+import shutil
+import sys
from pathlib import Path
-from typing import Any, Optional
-from flowctl.core.constants import EPICS_DIR, SPECS_DIR, TASKS_DIR
+from flowctl.core.constants import SPECS_DIR, TASKS_DIR
from flowctl.core.git import (
gather_context_hints,
get_changed_files,
+ get_diff_context,
get_embedded_file_contents,
)
from flowctl.core.ids import is_epic_id, is_task_id
from flowctl.core.io import (
- atomic_write,
- atomic_write_json,
error_exit,
json_output,
- load_json,
- load_json_or_exit,
- now_iso,
)
from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root
from flowctl.commands.review.codex_utils import (
- run_codex_exec,
+ delete_stale_receipt,
+ get_codex_version,
+ is_sandbox_failure,
+ load_receipt,
parse_codex_verdict,
- parse_codex_thread_id,
resolve_codex_sandbox,
- is_sandbox_failure,
+ run_codex_exec,
+ save_receipt,
CODEX_EFFORT_LEVELS,
)
from flowctl.commands.review.prompts import (
@@ -84,53 +80,8 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None:
task_spec = task_spec_path.read_text(encoding="utf-8")
- # Get diff summary (--stat) - use base..HEAD for committed changes only
- diff_summary = ""
- try:
- diff_result = subprocess.run(
- ["git", "diff", "--stat", f"{base_branch}..HEAD"],
- capture_output=True,
- text=True,
- cwd=get_repo_root(),
- )
- if diff_result.returncode == 0:
- diff_summary = diff_result.stdout.strip()
- except (subprocess.CalledProcessError, OSError):
- pass
-
- # Get actual diff content with size cap (avoid memory spike on large diffs)
- # Use base..HEAD for committed changes only (not working tree)
- diff_content = ""
- max_diff_bytes = 50000
- try:
- proc = subprocess.Popen(
- ["git", "diff", f"{base_branch}..HEAD"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=get_repo_root(),
- )
- # Read only up to max_diff_bytes
- diff_bytes = proc.stdout.read(max_diff_bytes + 1)
- was_truncated = len(diff_bytes) > max_diff_bytes
- if was_truncated:
- diff_bytes = diff_bytes[:max_diff_bytes]
- # Consume remaining stdout in chunks (avoid allocating the entire diff)
- while proc.stdout.read(65536):
- pass
- stderr_bytes = proc.stderr.read()
- proc.stdout.close()
- proc.stderr.close()
- returncode = proc.wait()
-
- if returncode != 0 and stderr_bytes:
- # Include error info but don't fail - diff is optional context
- diff_content = f"[git diff failed: {stderr_bytes.decode('utf-8', errors='replace').strip()}]"
- else:
- diff_content = diff_bytes.decode("utf-8", errors="replace").strip()
- if was_truncated:
- diff_content += "\n\n... [diff truncated at 50KB]"
- except (subprocess.CalledProcessError, OSError):
- pass
+ # Get diff summary + content via shared helper
+ diff_summary, diff_content = get_diff_context(base_branch)
# Always embed changed file contents so Codex doesn't waste turns reading
# files from disk. Without embedding, Codex exhausts its turn budget on
@@ -162,17 +113,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None:
# Check for existing session in receipt (indicates re-review)
receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None
- session_id = None
- is_rereview = False
- if receipt_path:
- receipt_file = Path(receipt_path)
- if receipt_file.exists():
- try:
- receipt_data = json.loads(receipt_file.read_text(encoding="utf-8"))
- session_id = receipt_data.get("session_id")
- is_rereview = session_id is not None
- except (json.JSONDecodeError, Exception):
- pass
+ session_id, is_rereview = load_receipt(receipt_path)
# For re-reviews, prepend instruction to re-read changed files
if is_rereview:
@@ -197,12 +138,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None:
# Check for sandbox failures (clear stale receipt and exit)
if is_sandbox_failure(exit_code, output, stderr):
- # Clear any stale receipt to prevent false gate satisfaction
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass # Best effort - proceed to error_exit regardless
+ delete_stale_receipt(receipt_path)
msg = (
"Codex sandbox blocked operations. "
"Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access"
@@ -211,12 +147,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None:
# Handle non-sandbox failures
if exit_code != 0:
- # Clear any stale receipt to prevent false gate satisfaction
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
msg = (stderr or output or "codex exec failed").strip()
error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2)
@@ -225,12 +156,7 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None:
# Fail if no verdict found (don't let UNKNOWN pass as success)
if not verdict:
- # Clear any stale receipt
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
error_exit(
"Codex review completed but no verdict found in output. "
"Expected SHIP or NEEDS_WORK",
@@ -241,29 +167,17 @@ def cmd_codex_impl_review(args: argparse.Namespace) -> None:
# Determine review id (task_id for task reviews, "branch" for standalone)
review_id = task_id if task_id else "branch"
- # Write receipt if path provided (Ralph-compatible schema)
+ # Write receipt if path provided
if receipt_path:
- receipt_data = {
- "type": "impl_review", # Required by Ralph
- "id": review_id, # Required by Ralph
- "mode": "codex",
- "base": base_branch,
- "verdict": verdict,
- "session_id": thread_id,
- "timestamp": now_iso(),
- "review": output, # Full review feedback for fix loop
- }
- # Add iteration if running under Ralph
- ralph_iter = os.environ.get("RALPH_ITERATION")
- if ralph_iter:
- try:
- receipt_data["iteration"] = int(ralph_iter)
- except ValueError:
- pass
- if focus:
- receipt_data["focus"] = focus
- Path(receipt_path).write_text(
- json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8"
+ save_receipt(
+ receipt_path,
+ review_type="impl_review",
+ review_id=review_id,
+ verdict=verdict,
+ session_id=thread_id,
+ output=output,
+ base_branch=base_branch,
+ focus=focus,
)
# Output
@@ -377,17 +291,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None:
# Check for existing session in receipt (indicates re-review)
receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None
- session_id = None
- is_rereview = False
- if receipt_path:
- receipt_file = Path(receipt_path)
- if receipt_file.exists():
- try:
- receipt_data = json.loads(receipt_file.read_text(encoding="utf-8"))
- session_id = receipt_data.get("session_id")
- is_rereview = session_id is not None
- except (json.JSONDecodeError, Exception):
- pass
+ session_id, is_rereview = load_receipt(receipt_path)
# For re-reviews, prepend instruction to re-read spec files
if is_rereview:
@@ -415,12 +319,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None:
# Check for sandbox failures (clear stale receipt and exit)
if is_sandbox_failure(exit_code, output, stderr):
- # Clear any stale receipt to prevent false gate satisfaction
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass # Best effort - proceed to error_exit regardless
+ delete_stale_receipt(receipt_path)
msg = (
"Codex sandbox blocked operations. "
"Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access"
@@ -429,12 +328,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None:
# Handle non-sandbox failures
if exit_code != 0:
- # Clear any stale receipt to prevent false gate satisfaction
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
msg = (stderr or output or "codex exec failed").strip()
error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2)
@@ -443,12 +337,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None:
# Fail if no verdict found (don't let UNKNOWN pass as success)
if not verdict:
- # Clear any stale receipt
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
error_exit(
"Codex review completed but no verdict found in output. "
"Expected SHIP or NEEDS_WORK",
@@ -456,26 +345,15 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None:
code=2,
)
- # Write receipt if path provided (Ralph-compatible schema)
+ # Write receipt if path provided
if receipt_path:
- receipt_data = {
- "type": "plan_review", # Required by Ralph
- "id": epic_id, # Required by Ralph
- "mode": "codex",
- "verdict": verdict,
- "session_id": thread_id,
- "timestamp": now_iso(),
- "review": output, # Full review feedback for fix loop
- }
- # Add iteration if running under Ralph
- ralph_iter = os.environ.get("RALPH_ITERATION")
- if ralph_iter:
- try:
- receipt_data["iteration"] = int(ralph_iter)
- except ValueError:
- pass
- Path(receipt_path).write_text(
- json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8"
+ save_receipt(
+ receipt_path,
+ review_type="plan_review",
+ review_id=epic_id,
+ verdict=verdict,
+ session_id=thread_id,
+ output=output,
)
# Output
@@ -533,49 +411,8 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None:
# Get base branch for diff (default to main)
base_branch = args.base if hasattr(args, "base") and args.base else "main"
- # Get diff summary
- diff_summary = ""
- try:
- diff_result = subprocess.run(
- ["git", "diff", "--stat", f"{base_branch}..HEAD"],
- capture_output=True,
- text=True,
- cwd=get_repo_root(),
- )
- if diff_result.returncode == 0:
- diff_summary = diff_result.stdout.strip()
- except (subprocess.CalledProcessError, OSError):
- pass
-
- # Get actual diff content with size cap
- diff_content = ""
- max_diff_bytes = 50000
- try:
- proc = subprocess.Popen(
- ["git", "diff", f"{base_branch}..HEAD"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=get_repo_root(),
- )
- diff_bytes = proc.stdout.read(max_diff_bytes + 1)
- was_truncated = len(diff_bytes) > max_diff_bytes
- if was_truncated:
- diff_bytes = diff_bytes[:max_diff_bytes]
- while proc.stdout.read(65536):
- pass
- stderr_bytes = proc.stderr.read()
- proc.stdout.close()
- proc.stderr.close()
- returncode = proc.wait()
-
- if returncode != 0 and stderr_bytes:
- diff_content = f"[git diff failed: {stderr_bytes.decode('utf-8', errors='replace').strip()}]"
- else:
- diff_content = diff_bytes.decode("utf-8", errors="replace").strip()
- if was_truncated:
- diff_content += "\n\n... [diff truncated at 50KB]"
- except (subprocess.CalledProcessError, OSError):
- pass
+ # Get diff summary + content via shared helper
+ diff_summary, diff_content = get_diff_context(base_branch)
# Always embed changed file contents. See cmd_codex_impl_review comment
# for rationale.
@@ -595,17 +432,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None:
# Check for existing session in receipt (indicates re-review)
receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None
- session_id = None
- is_rereview = False
- if receipt_path:
- receipt_file = Path(receipt_path)
- if receipt_file.exists():
- try:
- receipt_data = json.loads(receipt_file.read_text(encoding="utf-8"))
- session_id = receipt_data.get("session_id")
- is_rereview = session_id is not None
- except (json.JSONDecodeError, Exception):
- pass
+ session_id, is_rereview = load_receipt(receipt_path)
# For re-reviews, prepend instruction to re-read changed files
if is_rereview:
@@ -630,11 +457,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None:
# Check for sandbox failures
if is_sandbox_failure(exit_code, output, stderr):
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
msg = (
"Codex sandbox blocked operations. "
"Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access"
@@ -643,11 +466,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None:
# Handle non-sandbox failures
if exit_code != 0:
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
msg = (stderr or output or "codex exec failed").strip()
error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2)
@@ -656,11 +475,7 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None:
# Fail if no verdict found
if not verdict:
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
+ delete_stale_receipt(receipt_path)
error_exit(
"Codex review completed but no verdict found in output. "
"Expected SHIP or NEEDS_WORK",
@@ -671,27 +486,16 @@ def cmd_codex_completion_review(args: argparse.Namespace) -> None:
# Preserve session_id for continuity (avoid clobbering on resumed sessions)
session_id_to_write = thread_id or session_id
- # Write receipt if path provided (Ralph-compatible schema)
+ # Write receipt if path provided
if receipt_path:
- receipt_data = {
- "type": "completion_review", # Required by Ralph
- "id": epic_id, # Required by Ralph
- "mode": "codex",
- "base": base_branch,
- "verdict": verdict,
- "session_id": session_id_to_write,
- "timestamp": now_iso(),
- "review": output, # Full review feedback for fix loop
- }
- # Add iteration if running under Ralph
- ralph_iter = os.environ.get("RALPH_ITERATION")
- if ralph_iter:
- try:
- receipt_data["iteration"] = int(ralph_iter)
- except ValueError:
- pass
- Path(receipt_path).write_text(
- json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8"
+ save_receipt(
+ receipt_path,
+ review_type="completion_review",
+ review_id=epic_id,
+ verdict=verdict,
+ session_id=session_id_to_write,
+ output=output,
+ base_branch=base_branch,
)
# Output
diff --git a/scripts/flowctl/commands/review/prompts.py b/scripts/flowctl/commands/review/prompts.py
index b2bf127a..25277aa4 100644
--- a/scripts/flowctl/commands/review/prompts.py
+++ b/scripts/flowctl/commands/review/prompts.py
@@ -1,12 +1,6 @@
"""Review prompt builders for impl, plan, completion, and standalone reviews."""
-import json
-from typing import Any, Optional
-
-from flowctl.core.constants import EPICS_DIR, SPECS_DIR, TASKS_DIR
-from flowctl.core.io import load_json_or_exit
-from flowctl.core.paths import get_flow_dir
-from flowctl.core.git import gather_context_hints
+from typing import Optional
def build_review_prompt(
review_type: str,
@@ -567,224 +561,3 @@ def build_completion_review_prompt(
return "\n\n".join(parts)
-
-def cmd_codex_completion_review(args: argparse.Namespace) -> None:
- """Run epic completion review via codex exec.
-
- Verifies that all epic requirements are implemented before closing.
- Two-phase approach: extract requirements, then verify coverage.
- """
- if not ensure_flow_exists():
- error_exit(".flow/ does not exist", use_json=args.json)
-
- epic_id = args.epic
-
- # Validate epic ID
- if not is_epic_id(epic_id):
- error_exit(f"Invalid epic ID: {epic_id}", use_json=args.json)
-
- flow_dir = get_flow_dir()
-
- # Load epic spec
- epic_spec_path = flow_dir / SPECS_DIR / f"{epic_id}.md"
- if not epic_spec_path.exists():
- error_exit(f"Epic spec not found: {epic_spec_path}", use_json=args.json)
-
- epic_spec = epic_spec_path.read_text(encoding="utf-8")
-
- # Load task specs for this epic
- tasks_dir = flow_dir / TASKS_DIR
- task_specs_parts = []
- for task_file in sorted(tasks_dir.glob(f"{epic_id}.*.md")):
- task_id = task_file.stem
- task_content = task_file.read_text(encoding="utf-8")
- task_specs_parts.append(f"### {task_id}\n\n{task_content}")
-
- task_specs = "\n\n---\n\n".join(task_specs_parts) if task_specs_parts else ""
-
- # Get base branch for diff (default to main)
- base_branch = args.base if hasattr(args, "base") and args.base else "main"
-
- # Get diff summary
- diff_summary = ""
- try:
- diff_result = subprocess.run(
- ["git", "diff", "--stat", f"{base_branch}..HEAD"],
- capture_output=True,
- text=True,
- cwd=get_repo_root(),
- )
- if diff_result.returncode == 0:
- diff_summary = diff_result.stdout.strip()
- except (subprocess.CalledProcessError, OSError):
- pass
-
- # Get actual diff content with size cap
- diff_content = ""
- max_diff_bytes = 50000
- try:
- proc = subprocess.Popen(
- ["git", "diff", f"{base_branch}..HEAD"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=get_repo_root(),
- )
- diff_bytes = proc.stdout.read(max_diff_bytes + 1)
- was_truncated = len(diff_bytes) > max_diff_bytes
- if was_truncated:
- diff_bytes = diff_bytes[:max_diff_bytes]
- while proc.stdout.read(65536):
- pass
- stderr_bytes = proc.stderr.read()
- proc.stdout.close()
- proc.stderr.close()
- returncode = proc.wait()
-
- if returncode != 0 and stderr_bytes:
- diff_content = f"[git diff failed: {stderr_bytes.decode('utf-8', errors='replace').strip()}]"
- else:
- diff_content = diff_bytes.decode("utf-8", errors="replace").strip()
- if was_truncated:
- diff_content += "\n\n... [diff truncated at 50KB]"
- except (subprocess.CalledProcessError, OSError):
- pass
-
- # Always embed changed file contents. See cmd_codex_impl_review comment
- # for rationale.
- changed_files = get_changed_files(base_branch)
- embedded_content, embed_stats = get_embedded_file_contents(changed_files)
-
- # Only forbid disk reads when ALL files were fully embedded.
- files_embedded = not embed_stats.get("budget_skipped") and not embed_stats.get("truncated")
- prompt = build_completion_review_prompt(
- epic_spec,
- task_specs,
- diff_summary,
- diff_content,
- embedded_files=embedded_content,
- files_embedded=files_embedded,
- )
-
- # Check for existing session in receipt (indicates re-review)
- receipt_path = args.receipt if hasattr(args, "receipt") and args.receipt else None
- session_id = None
- is_rereview = False
- if receipt_path:
- receipt_file = Path(receipt_path)
- if receipt_file.exists():
- try:
- receipt_data = json.loads(receipt_file.read_text(encoding="utf-8"))
- session_id = receipt_data.get("session_id")
- is_rereview = session_id is not None
- except (json.JSONDecodeError, Exception):
- pass
-
- # For re-reviews, prepend instruction to re-read changed files
- if is_rereview:
- changed_files = get_changed_files(base_branch)
- if changed_files:
- rereview_preamble = build_rereview_preamble(
- changed_files, "completion", files_embedded
- )
- prompt = rereview_preamble + prompt
-
- # Resolve sandbox mode
- try:
- sandbox = resolve_codex_sandbox(getattr(args, "sandbox", "auto"))
- except ValueError as e:
- error_exit(str(e), use_json=args.json, code=2)
-
- # Run codex
- effort = getattr(args, "effort", "high")
- output, thread_id, exit_code, stderr = run_codex_exec(
- prompt, session_id=session_id, sandbox=sandbox, effort=effort
- )
-
- # Check for sandbox failures
- if is_sandbox_failure(exit_code, output, stderr):
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
- msg = (
- "Codex sandbox blocked operations. "
- "Try --sandbox danger-full-access (or auto) or set CODEX_SANDBOX=danger-full-access"
- )
- error_exit(msg, use_json=args.json, code=3)
-
- # Handle non-sandbox failures
- if exit_code != 0:
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
- msg = (stderr or output or "codex exec failed").strip()
- error_exit(f"codex exec failed: {msg}", use_json=args.json, code=2)
-
- # Parse verdict
- verdict = parse_codex_verdict(output)
-
- # Fail if no verdict found
- if not verdict:
- if receipt_path:
- try:
- Path(receipt_path).unlink(missing_ok=True)
- except OSError:
- pass
- error_exit(
- "Codex review completed but no verdict found in output. "
- "Expected SHIP or NEEDS_WORK",
- use_json=args.json,
- code=2,
- )
-
- # Preserve session_id for continuity (avoid clobbering on resumed sessions)
- session_id_to_write = thread_id or session_id
-
- # Write receipt if path provided (Ralph-compatible schema)
- if receipt_path:
- receipt_data = {
- "type": "completion_review", # Required by Ralph
- "id": epic_id, # Required by Ralph
- "mode": "codex",
- "base": base_branch,
- "verdict": verdict,
- "session_id": session_id_to_write,
- "timestamp": now_iso(),
- "review": output, # Full review feedback for fix loop
- }
- # Add iteration if running under Ralph
- ralph_iter = os.environ.get("RALPH_ITERATION")
- if ralph_iter:
- try:
- receipt_data["iteration"] = int(ralph_iter)
- except ValueError:
- pass
- Path(receipt_path).write_text(
- json.dumps(receipt_data, indent=2) + "\n", encoding="utf-8"
- )
-
- # Output
- if args.json:
- json_output(
- {
- "type": "completion_review",
- "id": epic_id,
- "base": base_branch,
- "verdict": verdict,
- "session_id": session_id_to_write,
- "mode": "codex",
- "review": output,
- }
- )
- else:
- print(output)
- print(f"\nVERDICT={verdict or 'UNKNOWN'}")
-
-
-# ─────────────────────────────────────────────────────────────────────────────
-# Checkpoint commands
-# ─────────────────────────────────────────────────────────────────────────────
-
diff --git a/scripts/flowctl/commands/task.py b/scripts/flowctl/commands/task.py
index 219de50c..bc4e9478 100644
--- a/scripts/flowctl/commands/task.py
+++ b/scripts/flowctl/commands/task.py
@@ -676,6 +676,16 @@ def cmd_task_set_spec(args: argparse.Namespace) -> None:
# Full file replacement mode (like epic set-plan)
if has_file:
content = read_file_or_stdin(args.file, "Spec file", use_json=args.json)
+ # Validate spec headings before writing: reject duplicates
+ from flowctl.commands.admin import validate_task_spec_headings
+ heading_errors = validate_task_spec_headings(content)
+ # Only reject on duplicate headings, not missing ones
+ dup_errors = [e for e in heading_errors if e.startswith("Duplicate")]
+ if dup_errors:
+ error_exit(
+ f"Spec validation failed: {'; '.join(dup_errors)}",
+ use_json=args.json,
+ )
atomic_write(task_spec_path, content)
task_data["updated_at"] = now_iso()
atomic_write_json(task_json_path, task_data)
@@ -713,6 +723,17 @@ def cmd_task_set_spec(args: argparse.Namespace) -> None:
except ValueError as e:
error_exit(str(e), use_json=args.json)
+ # Validate final spec headings before writing: reject duplicates
+ from flowctl.commands.admin import validate_task_spec_headings
+ heading_errors = validate_task_spec_headings(updated_spec)
+ # Only reject on duplicate headings, not missing ones
+ dup_errors = [e for e in heading_errors if e.startswith("Duplicate")]
+ if dup_errors:
+ error_exit(
+ f"Spec validation failed after patching: {'; '.join(dup_errors)}",
+ use_json=args.json,
+ )
+
# Single atomic write for spec, single for JSON
atomic_write(task_spec_path, updated_spec)
task_data["updated_at"] = now_iso()
diff --git a/scripts/flowctl/compat.py b/scripts/flowctl/compat.py
index c539adb1..714aaf29 100644
--- a/scripts/flowctl/compat.py
+++ b/scripts/flowctl/compat.py
@@ -1,4 +1,7 @@
-"""Platform-specific compatibility: file locking (fcntl on Unix, no-op on Windows)."""
+"""Platform-specific compatibility: file locking and fsync (fcntl on Unix, no-op on Windows)."""
+
+import os
+import sys
try:
import fcntl
@@ -15,3 +18,15 @@ def _flock(f, lock_type):
LOCK_EX = 0
LOCK_UN = 0
+
+
+def _fsync(fd: int) -> None:
+ """Platform-aware fsync: uses F_FULLFSYNC on macOS for true hardware flush."""
+ if sys.platform == "darwin":
+ try:
+ import fcntl as _fcntl
+ _fcntl.fcntl(fd, _fcntl.F_FULLFSYNC)
+ return
+ except (ImportError, AttributeError, OSError):
+ pass # Fall through to os.fsync
+ os.fsync(fd)
diff --git a/scripts/flowctl/core/git.py b/scripts/flowctl/core/git.py
index e9219c90..1cd16921 100644
--- a/scripts/flowctl/core/git.py
+++ b/scripts/flowctl/core/git.py
@@ -408,6 +408,75 @@ def gather_context_hints(base_branch: str, max_hints: int = 15) -> str:
return "Consider these related files:\n" + "\n".join(hints)
+def get_diff_context(
+ base_branch: str, max_bytes: int = 50000
+) -> tuple[str, str]:
+ """Get diff summary and content between base_branch and HEAD.
+
+ Returns:
+ tuple: (diff_summary, diff_content)
+ - diff_summary: output of ``git diff --stat base_branch..HEAD``
+        - diff_content: raw diff truncated to *max_bytes*; a
+          ``... [diff truncated at NKB]`` suffix is appended when truncated.
+
+ Both values default to ``""`` on any git error so callers never need
+ to handle exceptions.
+ """
+ repo_root = get_repo_root()
+
+ # 1. Diff summary (--stat)
+ diff_summary = ""
+ try:
+ stat_result = subprocess.run(
+ ["git", "diff", "--stat", f"{base_branch}..HEAD"],
+ capture_output=True,
+ text=True,
+ cwd=repo_root,
+ )
+ if stat_result.returncode == 0:
+ diff_summary = stat_result.stdout.strip()
+ except (subprocess.CalledProcessError, OSError):
+ pass
+
+ # 2. Diff content with byte-cap (avoid memory spike on large diffs)
+ diff_content = ""
+ try:
+ proc = subprocess.Popen(
+ ["git", "diff", f"{base_branch}..HEAD"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ cwd=repo_root,
+ )
+ diff_bytes = proc.stdout.read(max_bytes + 1)
+ was_truncated = len(diff_bytes) > max_bytes
+ if was_truncated:
+ diff_bytes = diff_bytes[:max_bytes]
+ # Drain remaining stdout to avoid blocking the subprocess
+ while proc.stdout.read(65536):
+ pass
+ stderr_bytes = proc.stderr.read()
+ proc.stdout.close()
+ proc.stderr.close()
+ returncode = proc.wait()
+
+ if returncode != 0 and stderr_bytes:
+ diff_content = (
+ f"[git diff failed: "
+ f"{stderr_bytes.decode('utf-8', errors='replace').strip()}]"
+ )
+ else:
+ diff_content = diff_bytes.decode("utf-8", errors="replace").strip()
+ if was_truncated:
+ diff_content += (
+ f"\n\n... [diff truncated at "
+ f"{max_bytes // 1000}KB]"
+ )
+ except (subprocess.CalledProcessError, OSError):
+ pass
+
+ return diff_summary, diff_content
+
+
def get_actor() -> str:
"""Determine current actor for soft-claim semantics.
diff --git a/scripts/flowctl/core/io.py b/scripts/flowctl/core/io.py
index 5051db96..4048625d 100644
--- a/scripts/flowctl/core/io.py
+++ b/scripts/flowctl/core/io.py
@@ -40,14 +40,18 @@ def is_supported_schema(version: Any) -> bool:
def atomic_write(path: Path, content: str) -> None:
- """Write file atomically via temp + rename."""
+ """Write file atomically via temp + rename with fsync for durability."""
+ from flowctl.compat import _fsync
+
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(content)
+ f.flush()
+ _fsync(f.fileno())
os.replace(tmp_path, path)
- except Exception:
+ except BaseException:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
raise
diff --git a/scripts/flowctl/core/paths.py b/scripts/flowctl/core/paths.py
index 811b8f0f..4ebb0d1e 100644
--- a/scripts/flowctl/core/paths.py
+++ b/scripts/flowctl/core/paths.py
@@ -6,6 +6,15 @@
from flowctl.core.constants import FLOW_DIR
+# Module-level cache for get_state_dir(), keyed by cwd string.
+# CLI invocations are short-lived so no expiry needed.
+_state_dir_cache: dict[str, Path] = {}
+
+
+def _reset_state_dir_cache() -> None:
+ """Clear the get_state_dir() memoization cache. For testing."""
+ _state_dir_cache.clear()
+
def get_repo_root() -> Path:
"""Find git repo root."""
@@ -35,14 +44,23 @@ def ensure_flow_exists() -> bool:
def get_state_dir() -> Path:
"""Get state directory for runtime task state.
+ Results are memoized per working directory. Call _reset_state_dir_cache()
+ to clear (e.g. in tests that change directories).
+
Resolution order:
1. FLOW_STATE_DIR env var (explicit override for orchestrators)
2. git common-dir (shared across all worktrees automatically)
3. Fallback to .flow/state for non-git repos
"""
+ cache_key = os.getcwd()
+ if cache_key in _state_dir_cache:
+ return _state_dir_cache[cache_key]
+
# 1. Explicit override
if state_dir := os.environ.get("FLOW_STATE_DIR"):
- return Path(state_dir).resolve()
+ resolved = Path(state_dir).resolve()
+ _state_dir_cache[cache_key] = resolved
+ return resolved
# 2. Git common-dir (shared across worktrees)
try:
@@ -53,9 +71,13 @@ def get_state_dir() -> Path:
check=True,
)
common = result.stdout.strip()
- return Path(common) / "flow-state"
+ resolved = Path(common) / "flow-state"
+ _state_dir_cache[cache_key] = resolved
+ return resolved
except subprocess.CalledProcessError:
pass
# 3. Fallback for non-git repos
- return get_flow_dir() / "state"
+ resolved = get_flow_dir() / "state"
+ _state_dir_cache[cache_key] = resolved
+ return resolved
diff --git a/scripts/flowctl/core/state.py b/scripts/flowctl/core/state.py
index 5eed790e..724db508 100644
--- a/scripts/flowctl/core/state.py
+++ b/scripts/flowctl/core/state.py
@@ -1,6 +1,7 @@
"""State management: StateStore, task state operations."""
import json
+import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
@@ -8,7 +9,7 @@
from flowctl.compat import _flock, LOCK_EX, LOCK_UN
from flowctl.core.constants import RUNTIME_FIELDS, TASKS_DIR
-from flowctl.core.ids import normalize_task
+from flowctl.core.ids import is_task_id, normalize_task
from flowctl.core.io import (
atomic_write,
atomic_write_json,
@@ -134,6 +135,67 @@ def load_task_with_state(task_id: str, use_json: bool = True) -> dict:
return normalize_task(merged)
+def load_all_tasks_with_state(epic_id: str | None = None) -> dict[str, dict]:
+ """Load all tasks with merged runtime state in a single directory scan.
+
+ Uses os.scandir for efficient single-pass directory listing, optionally
+ filtered by epic_id prefix. Returns dict keyed by task ID.
+
+ Args:
+ epic_id: If provided, only load tasks whose ID starts with this prefix.
+ E.g., "fn-4-slug" loads "fn-4-slug.1", "fn-4-slug.2", etc.
+
+ Returns:
+ Dict of {task_id: merged_task_data}.
+ """
+ flow_dir = get_flow_dir()
+ tasks_dir = flow_dir / TASKS_DIR
+ if not tasks_dir.exists():
+ return {}
+
+ store = get_state_store()
+ result: dict[str, dict] = {}
+
+ # Single scandir pass — filter by .json suffix and optional epic prefix
+ prefix = f"{epic_id}." if epic_id else "fn-"
+ try:
+ entries = os.scandir(tasks_dir)
+ except OSError:
+ return {}
+
+ for entry in entries:
+ if not entry.name.endswith(".json"):
+ continue
+ if not entry.name.startswith(prefix):
+ continue
+
+ task_id = entry.name[:-5] # strip .json
+ if not is_task_id(task_id):
+ continue
+
+ # Load definition
+ try:
+ with open(entry.path, encoding="utf-8") as f:
+ definition = json.load(f)
+ except (json.JSONDecodeError, IOError):
+ continue
+
+ if "id" not in definition:
+ continue # Skip artifact files
+
+ # Load runtime state
+ runtime = store.load_runtime(task_id)
+ if runtime is None:
+ runtime = {k: definition[k] for k in RUNTIME_FIELDS if k in definition}
+ if not runtime:
+ runtime = {"status": "todo"}
+
+ merged = {**definition, **runtime}
+ result[task_id] = normalize_task(merged)
+
+ return result
+
+
def save_task_runtime(task_id: str, updates: dict) -> None:
"""Write runtime state only (merge with existing). Never touch definition file."""
store = get_state_store()
@@ -168,8 +230,33 @@ def _file_locks_path() -> Path:
return get_state_dir() / "file_locks.json"
+def _file_locks_mutex_path() -> Path:
+ """Path to the mutex file for file_locks.json read-modify-write."""
+ return get_state_dir() / "file_locks.mutex"
+
+
+@contextmanager
+def _file_locks_mutex():
+ """Acquire mutual exclusion for file_locks.json read-modify-write.
+
+ Prevents race conditions when concurrent workers both read the same state,
+ decide a file is unlocked, and both write — second overwriting first's lock.
+ """
+ mutex_path = _file_locks_mutex_path()
+ mutex_path.parent.mkdir(parents=True, exist_ok=True)
+ with open(mutex_path, "w") as f:
+ try:
+ _flock(f, LOCK_EX)
+ yield
+ finally:
+ _flock(f, LOCK_UN)
+
+
def _load_file_locks() -> dict:
- """Load file lock registry. Returns {file_path: {task_id, locked_at}}."""
+ """Load file lock registry. Returns {file_path: {task_id, locked_at}}.
+
+ NOTE: Callers doing read-modify-write must wrap in _file_locks_mutex().
+ """
path = _file_locks_path()
if not path.exists():
return {}
@@ -188,34 +275,42 @@ def _save_file_locks(locks: dict) -> None:
def lock_files(task_id: str, files: list[str]) -> dict:
- """Lock files for a task. Returns {locked: [...], already_locked: [{file, owner}]}."""
- locks = _load_file_locks()
- locked = []
- already_locked = []
- for f in files:
- existing = locks.get(f)
- if existing and existing["task_id"] != task_id:
- already_locked.append({"file": f, "owner": existing["task_id"]})
- else:
- locks[f] = {"task_id": task_id, "locked_at": now_iso()}
- locked.append(f)
- _save_file_locks(locks)
+ """Lock files for a task. Returns {locked: [...], already_locked: [{file, owner}]}.
+
+    Uses advisory file locking (_flock: fcntl.flock on Unix, no-op on Windows) for mutual exclusion.
+ """
+ with _file_locks_mutex():
+ locks = _load_file_locks()
+ locked = []
+ already_locked = []
+ for f in files:
+ existing = locks.get(f)
+ if existing and existing["task_id"] != task_id:
+ already_locked.append({"file": f, "owner": existing["task_id"]})
+ else:
+ locks[f] = {"task_id": task_id, "locked_at": now_iso()}
+ locked.append(f)
+ _save_file_locks(locks)
return {"locked": locked, "already_locked": already_locked}
def unlock_files(task_id: str, files: list[str] | None = None) -> list[str]:
- """Unlock files owned by task_id. If files=None, unlock all files for this task."""
- locks = _load_file_locks()
- unlocked = []
- to_remove = []
- for f, info in locks.items():
- if info["task_id"] == task_id:
- if files is None or f in files:
- to_remove.append(f)
- unlocked.append(f)
- for f in to_remove:
- del locks[f]
- _save_file_locks(locks)
+ """Unlock files owned by task_id. If files=None, unlock all files for this task.
+
+    Uses advisory file locking (_flock: fcntl.flock on Unix, no-op on Windows) for mutual exclusion.
+ """
+ with _file_locks_mutex():
+ locks = _load_file_locks()
+ unlocked = []
+ to_remove = []
+ for f, info in locks.items():
+ if info["task_id"] == task_id:
+ if files is None or f in files:
+ to_remove.append(f)
+ unlocked.append(f)
+ for f in to_remove:
+ del locks[f]
+ _save_file_locks(locks)
return unlocked
@@ -231,10 +326,14 @@ def list_file_locks() -> dict:
def clear_file_locks() -> int:
- """Clear all file locks. Returns count cleared."""
- locks = _load_file_locks()
- count = len(locks)
- _save_file_locks({})
+ """Clear all file locks. Returns count cleared.
+
+    Uses advisory file locking (_flock: fcntl.flock on Unix, no-op on Windows) for mutual exclusion.
+ """
+ with _file_locks_mutex():
+ locks = _load_file_locks()
+ count = len(locks)
+ _save_file_locks({})
return count
diff --git a/scripts/flowctl/tests/__init__.py b/scripts/flowctl/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/scripts/flowctl/tests/conftest.py b/scripts/flowctl/tests/conftest.py
new file mode 100644
index 00000000..466705df
--- /dev/null
+++ b/scripts/flowctl/tests/conftest.py
@@ -0,0 +1,98 @@
+"""Shared pytest fixtures for flowctl tests."""
+
+import json
+import os
+import subprocess
+
+import pytest
+
+
+@pytest.fixture
+def flow_dir(tmp_path, monkeypatch):
+ """Set up a minimal .flow/ directory structure in a temp directory.
+
+ Creates:
+ .flow/meta.json — schema version 2
+ .flow/epics/ — empty
+ .flow/tasks/ — empty
+ .flow/specs/ — empty
+ .flow/config.json — default config
+
+ Changes cwd to tmp_path so flowctl path resolution finds .flow/.
+ """
+ flow = tmp_path / ".flow"
+ flow.mkdir()
+ (flow / "epics").mkdir()
+ (flow / "tasks").mkdir()
+ (flow / "specs").mkdir()
+
+ meta = {"schema_version": 2, "next_epic": 1}
+ (flow / "meta.json").write_text(json.dumps(meta, indent=2) + "\n", encoding="utf-8")
+
+ config = {"memory": {"enabled": True}, "stack": {}}
+ (flow / "config.json").write_text(
+ json.dumps(config, indent=2) + "\n", encoding="utf-8"
+ )
+
+ monkeypatch.chdir(tmp_path)
+ return flow
+
+
+@pytest.fixture
+def git_repo(tmp_path, monkeypatch):
+ """Initialize a git repo in tmp_path with a .flow/ directory.
+
+    Needed by paths.py's get_state_dir() which calls git rev-parse.
+ Returns the tmp_path (repo root).
+ """
+ monkeypatch.chdir(tmp_path)
+
+ subprocess.run(
+ ["git", "init"],
+ cwd=str(tmp_path),
+ capture_output=True,
+ check=True,
+ )
+ subprocess.run(
+ ["git", "config", "user.email", "test@test.com"],
+ cwd=str(tmp_path),
+ capture_output=True,
+ check=True,
+ )
+ subprocess.run(
+ ["git", "config", "user.name", "Test"],
+ cwd=str(tmp_path),
+ capture_output=True,
+ check=True,
+ )
+
+ # Create .flow/ structure (same as flow_dir fixture but inside git repo)
+ flow = tmp_path / ".flow"
+ flow.mkdir()
+ (flow / "epics").mkdir()
+ (flow / "tasks").mkdir()
+ (flow / "specs").mkdir()
+
+ meta = {"schema_version": 2, "next_epic": 1}
+ (flow / "meta.json").write_text(json.dumps(meta, indent=2) + "\n", encoding="utf-8")
+
+ config = {"memory": {"enabled": True}, "stack": {}}
+ (flow / "config.json").write_text(
+ json.dumps(config, indent=2) + "\n", encoding="utf-8"
+ )
+
+ # Initial commit so git rev-parse works
+ subprocess.run(
+ ["git", "add", "."],
+ cwd=str(tmp_path),
+ capture_output=True,
+ check=True,
+ )
+ subprocess.run(
+ ["git", "commit", "-m", "init"],
+ cwd=str(tmp_path),
+ capture_output=True,
+ check=True,
+ )
+
+ return tmp_path
diff --git a/scripts/flowctl/tests/test_admin.py b/scripts/flowctl/tests/test_admin.py
new file mode 100644
index 00000000..1fa751fd
--- /dev/null
+++ b/scripts/flowctl/tests/test_admin.py
@@ -0,0 +1,107 @@
+"""Tests for flowctl.commands.admin — validate_task_spec_headings."""
+
+import pytest
+
+from flowctl.commands.admin import validate_task_spec_headings
+
+
+VALID_SPEC = """\
+# fn-1.1 Some task title
+
+## Description
+This is the description.
+
+## Acceptance
+- [ ] Criterion A
+- [ ] Criterion B
+
+## Done summary
+TBD
+
+## Evidence
+- Commits:
+- Tests:
+- PRs:
+"""
+
+
+class TestValidateTaskSpecHeadings:
+ """Tests for validate_task_spec_headings()."""
+
+ def test_valid_spec_no_errors(self):
+ """A well-formed spec should produce zero errors."""
+ errors = validate_task_spec_headings(VALID_SPEC)
+ assert errors == []
+
+ def test_missing_description(self):
+ """Spec without ## Description should report missing heading."""
+ content = VALID_SPEC.replace("## Description\n", "")
+ errors = validate_task_spec_headings(content)
+ assert any("Missing required heading: ## Description" in e for e in errors)
+
+ def test_missing_acceptance(self):
+ """Spec without ## Acceptance should report missing heading."""
+ content = VALID_SPEC.replace("## Acceptance\n", "")
+ errors = validate_task_spec_headings(content)
+ assert any("Missing required heading: ## Acceptance" in e for e in errors)
+
+ def test_missing_done_summary(self):
+ """Spec without ## Done summary should report missing heading."""
+ content = VALID_SPEC.replace("## Done summary\n", "")
+ errors = validate_task_spec_headings(content)
+ assert any("Missing required heading: ## Done summary" in e for e in errors)
+
+ def test_missing_evidence(self):
+ """Spec without ## Evidence should report missing heading."""
+ content = VALID_SPEC.replace("## Evidence\n", "")
+ errors = validate_task_spec_headings(content)
+ assert any("Missing required heading: ## Evidence" in e for e in errors)
+
+ def test_missing_all_headings(self):
+ """Spec with no headings at all should report 4 missing."""
+ content = "# Title\n\nJust some text, no sections.\n"
+ errors = validate_task_spec_headings(content)
+ assert len(errors) == 4
+ assert all("Missing required heading" in e for e in errors)
+
+ def test_duplicate_description(self):
+ """Spec with duplicate ## Description should report duplicate."""
+ content = VALID_SPEC + "\n## Description\nAnother description.\n"
+ errors = validate_task_spec_headings(content)
+ assert any("Duplicate heading: ## Description" in e for e in errors)
+
+ def test_duplicate_acceptance(self):
+ """Spec with duplicate ## Acceptance should report duplicate."""
+ content = VALID_SPEC + "\n## Acceptance\n- [ ] Extra\n"
+ errors = validate_task_spec_headings(content)
+ assert any("Duplicate heading: ## Acceptance" in e for e in errors)
+
+ def test_heading_inside_code_block_ignored(self):
+ """Headings inside fenced code blocks should be ignored."""
+ content = VALID_SPEC + "\n```\n## Description\n```\n"
+ errors = validate_task_spec_headings(content)
+ # Fenced code blocks are stripped before checking — no duplicate
+ assert errors == []
+
+ def test_heading_inside_tagged_code_block_ignored(self):
+ """Headings inside fenced code blocks with language tags should be ignored."""
+ content = VALID_SPEC + "\n```markdown\n## Description\n## Acceptance\n```\n"
+ errors = validate_task_spec_headings(content)
+ assert errors == []
+
+ def test_heading_with_trailing_whitespace(self):
+ """Headings with trailing whitespace should still be recognized."""
+ content = VALID_SPEC.replace("## Description\n", "## Description \n")
+ errors = validate_task_spec_headings(content)
+ assert errors == []
+
+ def test_empty_content(self):
+ """Empty content should report all headings as missing."""
+ errors = validate_task_spec_headings("")
+ assert len(errors) == 4
+
+ def test_similar_heading_not_confused(self):
+ """## Descriptions (plural) should NOT satisfy ## Description."""
+ content = VALID_SPEC.replace("## Description\n", "## Descriptions\n")
+ errors = validate_task_spec_headings(content)
+ assert any("Missing required heading: ## Description" in e for e in errors)
diff --git a/scripts/flowctl/tests/test_config.py b/scripts/flowctl/tests/test_config.py
new file mode 100644
index 00000000..0bdf1594
--- /dev/null
+++ b/scripts/flowctl/tests/test_config.py
@@ -0,0 +1,131 @@
+"""Tests for flowctl.core.config — deep_merge, config loading."""
+
+import json
+
+import pytest
+
+from flowctl.core.config import deep_merge, get_default_config, load_flow_config
+
+
+class TestDeepMerge:
+ """Tests for deep_merge() — recursive dict merging."""
+
+ def test_flat_merge(self):
+ """Simple key-value merge, override wins."""
+ base = {"a": 1, "b": 2}
+ override = {"b": 3, "c": 4}
+ result = deep_merge(base, override)
+ assert result == {"a": 1, "b": 3, "c": 4}
+
+ def test_nested_merge(self):
+ """Nested dicts should be merged recursively."""
+ base = {"level1": {"a": 1, "b": 2}}
+ override = {"level1": {"b": 3, "c": 4}}
+ result = deep_merge(base, override)
+ assert result == {"level1": {"a": 1, "b": 3, "c": 4}}
+
+ def test_deep_nested_merge(self):
+ """Multiple levels of nesting should all merge."""
+ base = {"l1": {"l2": {"l3": {"a": 1, "b": 2}}}}
+ override = {"l1": {"l2": {"l3": {"b": 3}}}}
+ result = deep_merge(base, override)
+ assert result == {"l1": {"l2": {"l3": {"a": 1, "b": 3}}}}
+
+ def test_override_dict_with_scalar(self):
+ """When override provides a scalar for a dict key, scalar wins."""
+ base = {"a": {"nested": True}}
+ override = {"a": "flat_value"}
+ result = deep_merge(base, override)
+ assert result == {"a": "flat_value"}
+
+ def test_override_scalar_with_dict(self):
+ """When override provides a dict for a scalar key, dict wins."""
+ base = {"a": "flat_value"}
+ override = {"a": {"nested": True}}
+ result = deep_merge(base, override)
+ assert result == {"a": {"nested": True}}
+
+ def test_empty_override(self):
+ """Empty override should return base unchanged."""
+ base = {"a": 1, "b": {"c": 2}}
+ result = deep_merge(base, {})
+ assert result == base
+
+ def test_empty_base(self):
+ """Empty base should return override."""
+ override = {"a": 1, "b": {"c": 2}}
+ result = deep_merge({}, override)
+ assert result == override
+
+ def test_both_empty(self):
+ result = deep_merge({}, {})
+ assert result == {}
+
+ def test_does_not_mutate_base(self):
+ """deep_merge should not mutate the base dict."""
+ base = {"a": 1, "b": {"c": 2}}
+ original_base = {"a": 1, "b": {"c": 2}}
+ deep_merge(base, {"a": 99})
+ assert base == original_base
+
+ def test_lists_are_replaced_not_merged(self):
+ """Lists should be replaced entirely, not concatenated."""
+ base = {"items": [1, 2, 3]}
+ override = {"items": [4, 5]}
+ result = deep_merge(base, override)
+ assert result == {"items": [4, 5]}
+
+ def test_none_override(self):
+ """None in override should replace base value."""
+ base = {"a": "something"}
+ override = {"a": None}
+ result = deep_merge(base, override)
+ assert result == {"a": None}
+
+
+class TestGetDefaultConfig:
+ def test_has_required_keys(self):
+ config = get_default_config()
+ assert "memory" in config
+ assert "planSync" in config
+ assert "review" in config
+ assert "scouts" in config
+ assert "stack" in config
+
+ def test_memory_enabled_by_default(self):
+ config = get_default_config()
+ assert config["memory"]["enabled"] is True
+
+
+class TestLoadFlowConfig:
+ def test_returns_defaults_when_no_config(self, flow_dir, monkeypatch):
+ """When config.json is missing, should return defaults."""
+ # Remove the config file that flow_dir fixture created
+ config_path = flow_dir / "config.json"
+ config_path.unlink()
+
+ config = load_flow_config()
+ defaults = get_default_config()
+ assert config == defaults
+
+ def test_merges_with_defaults(self, flow_dir, monkeypatch):
+ """Partial config should be merged with defaults."""
+ config_path = flow_dir / "config.json"
+ partial = {"memory": {"enabled": False}}
+ config_path.write_text(json.dumps(partial) + "\n", encoding="utf-8")
+
+ config = load_flow_config()
+ # Overridden value
+ assert config["memory"]["enabled"] is False
+ # Default values still present
+ assert "planSync" in config
+ assert "review" in config
+
+ def test_handles_invalid_json(self, flow_dir, monkeypatch):
+ """Invalid JSON should return defaults."""
+ config_path = flow_dir / "config.json"
+ config_path.write_text("not json{{{", encoding="utf-8")
+
+ config = load_flow_config()
+ defaults = get_default_config()
+ assert config == defaults
diff --git a/scripts/flowctl/tests/test_guard.py b/scripts/flowctl/tests/test_guard.py
new file mode 100644
index 00000000..8b5e701b
--- /dev/null
+++ b/scripts/flowctl/tests/test_guard.py
@@ -0,0 +1,699 @@
+"""Tests for ralph-guard.py hook logic with fixture-based JSON payloads.
+
+ralph-guard.py is a standalone Python script (not a package module), so we
+import it via importlib. Each test function builds a fixture dict that mimics
+the JSON Claude Code sends to hooks, then calls the handler directly.
+
+Because handle_pre_tool_use / handle_post_tool_use call sys.exit(), we catch
+SystemExit to inspect the exit code (0 = allow, 2 = block).
+"""
+
+import importlib.util
+import json
+import os
+import sys
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+# ---------------------------------------------------------------------------
+# Import ralph-guard.py as a module (filename contains a hyphen)
+# ---------------------------------------------------------------------------
+_GUARD_PATH = (
+ Path(__file__).resolve().parents[2] / "hooks" / "ralph-guard.py"
+)
+
+
+def _load_guard():
+ """Load ralph-guard.py as a Python module."""
+ spec = importlib.util.spec_from_file_location("ralph_guard", _GUARD_PATH)
+ mod = importlib.util.module_from_spec(spec)
+ # Patch FLOW_RALPH so module-level code doesn't exit during import
+ with patch.dict(os.environ, {"FLOW_RALPH": "1"}):
+ spec.loader.exec_module(mod)
+ return mod
+
+
+guard = _load_guard()
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _pre_payload(command: str, *, session_id: str = "test-session") -> dict:
+ """Build a PreToolUse fixture for a Bash command."""
+ return {
+ "hook_event_name": "PreToolUse",
+ "tool_name": "Bash",
+ "tool_input": {"command": command},
+ "session_id": session_id,
+ }
+
+
+def _post_payload(
+ command: str,
+ response: str = "",
+ *,
+ session_id: str = "test-session",
+) -> dict:
+ """Build a PostToolUse fixture for a Bash command."""
+ return {
+ "hook_event_name": "PostToolUse",
+ "tool_name": "Bash",
+ "tool_input": {"command": command},
+ "tool_response": {"stdout": response},
+ "session_id": session_id,
+ }
+
+
+def _stop_payload(*, session_id: str = "test-session", stop_hook_active: bool = False) -> dict:
+ """Build a Stop event fixture."""
+ return {
+ "hook_event_name": "Stop",
+ "session_id": session_id,
+ "stop_hook_active": stop_hook_active,
+ }
+
+
+def _edit_payload(file_path: str, *, session_id: str = "test-session") -> dict:
+ """Build a PreToolUse fixture for an Edit tool (protected file check)."""
+ return {
+ "hook_event_name": "PreToolUse",
+ "tool_name": "Edit",
+ "tool_input": {"file_path": file_path},
+ "session_id": session_id,
+ }
+
+
+def _reset_state(session_id: str = "test-session") -> None:
+ """Remove any leftover state file for the session."""
+ state_file = guard.get_state_file(session_id)
+ if state_file.exists():
+ state_file.unlink()
+
+
+@pytest.fixture(autouse=True)
+def _clean_state():
+ """Ensure clean state before and after each test."""
+ _reset_state()
+ yield
+ _reset_state()
+
+
+# ===================================================================
+# PreToolUse — blocked command patterns (acceptance: >= 5 cases)
+# ===================================================================
+
+
+class TestPreToolUseBlocked:
+ """Commands that ralph-guard MUST block (exit code 2)."""
+
+ def test_chat_send_json_flag(self):
+ """--json on chat-send suppresses review text."""
+ data = _pre_payload('rp chat-send --json --message "review"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_chat_send_new_chat_on_re_review(self):
+ """--new-chat on re-reviews loses reviewer context."""
+ # Simulate a first chat already sent
+ state = guard.load_state("test-session")
+ state["chats_sent"] = 1
+ guard.save_state("test-session", state)
+
+ data = _pre_payload('rp chat-send --new-chat --message "re-review"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_direct_codex_exec(self):
+ """Direct 'codex exec' must be blocked — use flowctl wrappers."""
+ data = _pre_payload('codex exec "review this code"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_direct_codex_review(self):
+ """Direct 'codex review' must be blocked."""
+ data = _pre_payload("codex review --diff main..HEAD")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_codex_last_flag(self):
+ """--last flag on codex (even through wrapper) breaks session continuity."""
+ data = _pre_payload("flowctl codex impl-review --last")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_setup_review_missing_repo_root(self):
+ """setup-review without --repo-root must be blocked."""
+ data = _pre_payload('setup-review --summary "test"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_setup_review_missing_summary(self):
+ """setup-review without --summary must be blocked."""
+ data = _pre_payload("setup-review --repo-root /tmp/repo")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_select_add_missing_window(self):
+ """select-add without --window must be blocked."""
+ data = _pre_payload("select-add src/main.py")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_flowctl_done_missing_evidence(self):
+ """flowctl done without --evidence-json must be blocked."""
+ data = _pre_payload(
+ 'flowctl done fn-1.1 --summary-file /tmp/s.md'
+ )
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_flowctl_done_missing_summary(self):
+ """flowctl done without --summary-file must be blocked."""
+ data = _pre_payload(
+ "flowctl done fn-1.1 --evidence-json /tmp/e.json"
+ )
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_receipt_write_before_review(self, monkeypatch):
+ """Cannot write receipt before chat-send or codex review succeeds."""
+ monkeypatch.setenv("REVIEW_RECEIPT_PATH", "/tmp/receipts/impl-fn-1.1.json")
+ data = _pre_payload(
+ 'cat > "/tmp/receipts/impl-fn-1.1.json" << \'EOF\'\n'
+ '{"type":"impl_review","id":"fn-1.1","verdict":"SHIP"}\n'
+ "EOF"
+ )
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+
+# ===================================================================
+# PreToolUse — allowed command patterns (acceptance: >= 5 cases)
+# ===================================================================
+
+
+class TestPreToolUseAllowed:
+ """Commands that ralph-guard must allow (exit code 0)."""
+
+ def test_flowctl_codex_impl_review(self):
+ """flowctl codex wrapper calls are allowed."""
+ data = _pre_payload("flowctl codex impl-review fn-1.1 --base abc123")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_flowctl_codex_plan_review(self):
+ """flowctl codex plan-review wrapper is allowed."""
+ data = _pre_payload("flowctl codex plan-review fn-1")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_chat_send_no_json(self):
+ """chat-send without --json is fine."""
+ data = _pre_payload('rp chat-send --message "review this"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_chat_send_new_chat_first_review(self):
+ """--new-chat on FIRST review is allowed (chats_sent == 0)."""
+ data = _pre_payload('rp chat-send --new-chat --message "first review"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_flowctl_done_with_all_flags(self):
+ """flowctl done with both required flags is allowed."""
+ data = _pre_payload(
+ "flowctl done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json"
+ )
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_plain_git_command(self):
+ """Regular git commands should pass through."""
+ data = _pre_payload("git status")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_flowctl_show(self):
+ """Non-done flowctl commands pass through."""
+ data = _pre_payload("flowctl show fn-1.1 --json")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_setup_review_complete(self):
+ """setup-review with all required flags is allowed."""
+ data = _pre_payload(
+ 'setup-review --repo-root /tmp/repo --summary "review this"'
+ )
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_select_add_with_window(self):
+ """select-add with --window is allowed."""
+ data = _pre_payload('select-add --window "1" --tab "abc" src/main.py')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_FLOWCTL_variable_done(self):
+ """$FLOWCTL done with both flags is allowed."""
+ data = _pre_payload(
+ '"$FLOWCTL" done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json'
+ )
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_flowctl_done_help(self):
+ """flowctl done --help should not be blocked."""
+ data = _pre_payload("flowctl done --help")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+
+# ===================================================================
+# PostToolUse — state transitions (acceptance criterion)
+# ===================================================================
+
+
+class TestPostToolUseStateTransitions:
+ """Verify state tracking across PostToolUse events."""
+
+ def test_chat_send_success_sets_state(self):
+ """Successful chat-send sets chat_send_succeeded and increments chats_sent."""
+ data = _post_payload(
+ 'rp chat-send --message "review"',
+ "Chat Send completed\nNEEDS_WORK\nFix the tests.",
+ )
+ # handle_post_tool_use calls sys.exit(0) at the end
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data)
+
+ state = guard.load_state("test-session")
+ assert state["chat_send_succeeded"] is True
+ assert state["chats_sent"] == 1
+
+ def test_chat_send_null_clears_state(self):
+ """chat-send returning {"chat": null} clears chat_send_succeeded."""
+ # First, simulate a previous success
+ initial = guard.load_state("test-session")
+ initial["chat_send_succeeded"] = True
+ guard.save_state("test-session", initial)
+
+ data = _post_payload(
+ 'rp chat-send --json --message "review"',
+ '{"chat": null}',
+ )
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data)
+
+ state = guard.load_state("test-session")
+ assert state["chat_send_succeeded"] is False
+
+ def test_flowctl_done_tracks_task_id(self):
+ """flowctl done adds task to flowctl_done_called set."""
+ data = _post_payload(
+ "flowctl done fn-1.2 --summary-file /tmp/s.md --evidence-json /tmp/e.json",
+ '{"status": "done", "task": "fn-1.2"}',
+ )
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data)
+
+ state = guard.load_state("test-session")
+ assert "fn-1.2" in state["flowctl_done_called"]
+
+ def test_flowctl_done_accumulates(self):
+ """Multiple flowctl done calls accumulate task IDs."""
+ # First task
+ data1 = _post_payload(
+ "flowctl done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json",
+ '{"status": "done", "task": "fn-1.1"}',
+ )
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data1)
+
+ # Second task
+ data2 = _post_payload(
+ "flowctl done fn-1.2 --summary-file /tmp/s.md --evidence-json /tmp/e.json",
+ '{"status": "done", "task": "fn-1.2"}',
+ )
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data2)
+
+ state = guard.load_state("test-session")
+ assert "fn-1.1" in state["flowctl_done_called"]
+ assert "fn-1.2" in state["flowctl_done_called"]
+
+ def test_codex_review_success(self):
+ """Codex review with verdict sets codex_review_succeeded."""
+ data = _post_payload(
+ "flowctl codex impl-review fn-1.1 --base abc123",
+ "Review output...\nSHIP\nAll good.",
+ )
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data)
+
+ state = guard.load_state("test-session")
+ assert state["codex_review_succeeded"] is True
+ assert state["last_verdict"] == "SHIP"
+
+ def test_codex_needs_work_verdict(self):
+ """Codex review with NEEDS_WORK verdict."""
+ data = _post_payload(
+ "flowctl codex impl-review fn-1.1 --base abc123",
+ "Review output...\nNEEDS_WORK\nFix errors.",
+ )
+ with pytest.raises(SystemExit):
+ guard.handle_post_tool_use(data)
+
+ state = guard.load_state("test-session")
+ assert state["codex_review_succeeded"] is True
+ assert state["last_verdict"] == "NEEDS_WORK"
+
+ def test_receipt_write_resets_review_state(self, monkeypatch):
+ """Writing a receipt resets chat_send_succeeded and codex_review_succeeded."""
+ monkeypatch.setenv("REVIEW_RECEIPT_PATH", "/tmp/receipts/impl-fn-1.1.json")
+
+ # Set up state as if review succeeded
+ initial = guard.load_state("test-session")
+ initial["chat_send_succeeded"] = True
+ initial["codex_review_succeeded"] = True
+ guard.save_state("test-session", initial)
+
+        data = _post_payload(
+            'cat > "/tmp/receipts/impl-fn-1.1.json" << \'EOF\'\n'
+            '{"type":"impl_review","id":"fn-1.1","verdict":"SHIP"}\n'
+            "EOF",
+        )
+        with pytest.raises(SystemExit):
+            guard.handle_post_tool_use(data)
+
+        state = guard.load_state("test-session")
+        assert state["chat_send_succeeded"] is False
+        assert state["codex_review_succeeded"] is False
+
+    def test_major_rethink_verdict(self):
+        """Codex review with MAJOR_RETHINK verdict is tracked in state."""
+        data = _post_payload(
+            "flowctl codex impl-review fn-1.1 --base abc123",
+            "Review output...\nMAJOR_RETHINK\nSee the MAJOR_RETHINK tag.",
+        )
+        with pytest.raises(SystemExit):
+            guard.handle_post_tool_use(data)
+
+        state = guard.load_state("test-session")
+        assert state["last_verdict"] == "MAJOR_RETHINK"
+
+
+# ===================================================================
+# Edge cases (acceptance criterion)
+# ===================================================================
+
+
+class TestEdgeCases:
+ """Edge cases: done in comments, quoted strings, multi-line commands."""
+
+ def test_done_in_comment_not_blocked(self):
+ """'done' in a shell comment should not trigger flowctl done checks."""
+ # This command has "done" but NOT as "flowctl done" or "FLOWCTL done"
+ data = _pre_payload("echo 'task is done' # just a comment")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_done_in_echo_not_blocked(self):
+ """'done' as part of an echo statement (no flowctl context) is fine."""
+ data = _pre_payload('echo "all tasks done successfully"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+ def test_codex_in_string_blocked(self):
+ """'codex' word boundary match catches standalone usage."""
+ # The guard uses \\bcodex\\b, so 'codex' as a standalone word matches
+ data = _pre_payload('codex exec "check code"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_flowctl_done_in_multiline(self):
+ """flowctl done in a multi-line command is still checked."""
+ cmd = (
+ "# First line\n"
+ "flowctl done fn-1.1 --summary-file /tmp/s.md --evidence-json /tmp/e.json\n"
+ "# Last line"
+ )
+ data = _pre_payload(cmd)
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0 # Allowed because all flags present
+
+ def test_flowctl_done_multiline_missing_evidence(self):
+ """flowctl done in multi-line without evidence is blocked."""
+ cmd = (
+ "# First line\n"
+ "flowctl done fn-1.1 --summary-file /tmp/s.md\n"
+ "# Last line"
+ )
+ data = _pre_payload(cmd)
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_FLOWCTL_env_var_done(self):
+ """$FLOWCTL done is still validated (environment variable invocation)."""
+ data = _pre_payload('"$FLOWCTL" done fn-1.1 --summary-file /tmp/s.md')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2 # Missing --evidence-json
+
+ def test_chat_send_json_in_message_body(self):
+ """--json flag after chat-send is blocked even with surrounding text."""
+ data = _pre_payload('rp chat-send --mode review --json --message "test"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 2
+
+ def test_new_chat_allowed_when_zero_chats(self):
+ """--new-chat is only blocked on re-reviews (chats_sent > 0)."""
+ # chats_sent defaults to 0
+ data = _pre_payload('rp chat-send --new-chat --message "first"')
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_pre_tool_use(data)
+ assert exc.value.code == 0
+
+
+# ===================================================================
+# Protected file checks
+# ===================================================================
+
+
+class TestProtectedFiles:
+ """Block Edit/Write to protected workflow files."""
+
+ def test_edit_ralph_guard_blocked(self):
+ """Cannot edit ralph-guard.py."""
+ data = _edit_payload("/path/to/scripts/hooks/ralph-guard.py")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_protected_file_check(data)
+ assert exc.value.code == 2
+
+ def test_edit_flowctl_py_blocked(self):
+ """Cannot edit flowctl.py."""
+ data = _edit_payload("/path/to/scripts/flowctl.py")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_protected_file_check(data)
+ assert exc.value.code == 2
+
+ def test_edit_hooks_json_blocked(self):
+ """Cannot edit hooks/hooks.json."""
+ data = _edit_payload("/path/to/hooks/hooks.json")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_protected_file_check(data)
+ assert exc.value.code == 2
+
+ def test_edit_flowctl_dir_blocked(self):
+ """Cannot edit the flowctl directory itself (endswith /flowctl/)."""
+ # The pattern "/flowctl/" matches paths ending with "/flowctl/"
+ # (e.g., a directory path), not files inside it
+ data = _edit_payload("/path/to/scripts/flowctl/")
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_protected_file_check(data)
+ assert exc.value.code == 2
+
+ def test_edit_normal_file_allowed(self):
+ """Editing a normal file is not blocked (no exit)."""
+ data = _edit_payload("/path/to/src/main.py")
+ # Should return None (no exit) since file is not protected
+ result = guard.handle_protected_file_check(data)
+ assert result is None
+
+
+# ===================================================================
+# parse_receipt_path
+# ===================================================================
+
+
+class TestParseReceiptPath:
+ """Test receipt path parsing for type/id extraction."""
+
+ def test_plan_receipt_legacy(self):
+ """plan-fn-1.json -> (plan_review, fn-1)"""
+ assert guard.parse_receipt_path("plan-fn-1.json") == ("plan_review", "fn-1")
+
+ def test_impl_receipt_legacy(self):
+ """impl-fn-1.2.json -> (impl_review, fn-1.2)"""
+ assert guard.parse_receipt_path("impl-fn-1.2.json") == ("impl_review", "fn-1.2")
+
+ def test_completion_receipt_legacy(self):
+ """completion-fn-1.json -> (completion_review, fn-1)"""
+ assert guard.parse_receipt_path("completion-fn-1.json") == (
+ "completion_review",
+ "fn-1",
+ )
+
+ def test_impl_receipt_with_slug(self):
+ """impl-fn-4-flowctl-comprehensive-optimization-and.3.json -> (impl_review, fn-4-..-.3)"""
+ rtype, rid = guard.parse_receipt_path(
+ "impl-fn-4-flowctl-comprehensive-optimization-and.3.json"
+ )
+ assert rtype == "impl_review"
+ assert rid.startswith("fn-4")
+ assert rid.endswith(".3")
+
+ def test_plan_receipt_with_slug(self):
+ """plan-fn-4-flowctl-comprehensive.json"""
+ rtype, rid = guard.parse_receipt_path(
+ "plan-fn-4-flowctl-comprehensive.json"
+ )
+ assert rtype == "plan_review"
+ assert "fn-4" in rid
+
+ def test_unknown_format_fallback(self):
+ """Unknown filename pattern returns fallback."""
+ assert guard.parse_receipt_path("random-file.json") == ("impl_review", "UNKNOWN")
+
+
+# ===================================================================
+# State persistence
+# ===================================================================
+
+
+class TestStatePersistence:
+ """Test state load/save round-trip and defaults."""
+
+ def test_fresh_state_defaults(self):
+ """Fresh state has expected defaults."""
+ state = guard.load_state("fresh-session")
+ assert state["chats_sent"] == 0
+ assert state["last_verdict"] is None
+ assert state["chat_send_succeeded"] is False
+ assert state["codex_review_succeeded"] is False
+ assert isinstance(state["flowctl_done_called"], set)
+ assert len(state["flowctl_done_called"]) == 0
+ # Clean up
+ _reset_state("fresh-session")
+
+ def test_round_trip_with_set(self):
+ """State with set survives JSON round-trip."""
+ state = guard.load_state("rt-session")
+ state["flowctl_done_called"] = {"fn-1.1", "fn-1.2"}
+ state["chat_send_succeeded"] = True
+ guard.save_state("rt-session", state)
+
+ loaded = guard.load_state("rt-session")
+ assert loaded["chat_send_succeeded"] is True
+ assert "fn-1.1" in loaded["flowctl_done_called"]
+ assert "fn-1.2" in loaded["flowctl_done_called"]
+ # Clean up
+ sf = guard.get_state_file("rt-session")
+ if sf.exists():
+ sf.unlink()
+
+ def test_corrupt_state_returns_defaults(self):
+ """Corrupt state file returns default state."""
+ state_file = guard.get_state_file("corrupt-session")
+ state_file.write_text("not valid json{{{")
+ state = guard.load_state("corrupt-session")
+ assert state["chats_sent"] == 0
+ assert state["chat_send_succeeded"] is False
+ # Clean up
+ if state_file.exists():
+ state_file.unlink()
+
+
+# ===================================================================
+# Stop event
+# ===================================================================
+
+
+class TestStopEvent:
+ """Test Stop handler behavior."""
+
+ def test_stop_with_no_receipt_path(self):
+ """Stop with no REVIEW_RECEIPT_PATH set exits cleanly."""
+ data = _stop_payload()
+ with patch.dict(os.environ, {}, clear=False):
+ os.environ.pop("REVIEW_RECEIPT_PATH", None)
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_stop(data)
+ assert exc.value.code == 0
+
+ def test_stop_hook_active_prevents_loop(self):
+ """stop_hook_active=True exits immediately (infinite loop guard)."""
+ data = _stop_payload(stop_hook_active=True)
+ with pytest.raises(SystemExit) as exc:
+ guard.handle_stop(data)
+ assert exc.value.code == 0
+
+ def test_stop_cleans_up_state_file(self):
+ """Stop cleans up the session state file."""
+ # Create state
+ state = guard.load_state("test-session")
+ state["chats_sent"] = 5
+ guard.save_state("test-session", state)
+ assert guard.get_state_file("test-session").exists()
+
+ data = _stop_payload()
+ with patch.dict(os.environ, {}, clear=False):
+ os.environ.pop("REVIEW_RECEIPT_PATH", None)
+ with pytest.raises(SystemExit):
+ guard.handle_stop(data)
+
+ assert not guard.get_state_file("test-session").exists()
diff --git a/scripts/flowctl/tests/test_ids.py b/scripts/flowctl/tests/test_ids.py
new file mode 100644
index 00000000..96a6a1c6
--- /dev/null
+++ b/scripts/flowctl/tests/test_ids.py
@@ -0,0 +1,272 @@
+"""Tests for flowctl.core.ids — ID parsing, generation, and validation."""
+
+import pytest
+
+from flowctl.core.ids import (
+ epic_id_from_task,
+ generate_epic_suffix,
+ is_epic_id,
+ is_task_id,
+ normalize_epic,
+ normalize_task,
+ parse_id,
+ slugify,
+ task_priority,
+)
+
+
+# --- parse_id ---
+
+
+class TestParseId:
+ """Tests for parse_id() — the highest-risk function, used everywhere."""
+
+ def test_legacy_epic(self):
+ """fn-1 -> (1, None)"""
+ assert parse_id("fn-1") == (1, None)
+
+ def test_legacy_task(self):
+ """fn-1.2 -> (1, 2)"""
+ assert parse_id("fn-1.2") == (1, 2)
+
+ def test_legacy_large_numbers(self):
+ """fn-99.100 -> (99, 100)"""
+ assert parse_id("fn-99.100") == (99, 100)
+
+ def test_short_suffix_epic(self):
+ """fn-5-x7k -> (5, None)"""
+ assert parse_id("fn-5-x7k") == (5, None)
+
+ def test_short_suffix_task(self):
+ """fn-5-x7k.3 -> (5, 3)"""
+ assert parse_id("fn-5-x7k.3") == (5, 3)
+
+ def test_slug_suffix_epic(self):
+ """fn-4-flowctl-comprehensive-optimization-and -> (4, None)"""
+ assert parse_id("fn-4-flowctl-comprehensive-optimization-and") == (4, None)
+
+ def test_slug_suffix_task(self):
+ """fn-4-flowctl-comprehensive-optimization-and.1 -> (4, 1)"""
+ assert parse_id("fn-4-flowctl-comprehensive-optimization-and.1") == (4, 1)
+
+ def test_two_char_suffix(self):
+ """fn-3-ab -> (3, None)"""
+ assert parse_id("fn-3-ab") == (3, None)
+
+ def test_single_char_suffix(self):
+ """fn-3-a -> (3, None)"""
+ assert parse_id("fn-3-a") == (3, None)
+
+ def test_invalid_empty(self):
+ assert parse_id("") == (None, None)
+
+ def test_invalid_no_prefix(self):
+ assert parse_id("task-1") == (None, None)
+
+ def test_invalid_no_number(self):
+ assert parse_id("fn-") == (None, None)
+
+ def test_invalid_no_fn_prefix(self):
+ assert parse_id("1.2") == (None, None)
+
+ def test_invalid_uppercase(self):
+ """Suffixes must be lowercase."""
+ assert parse_id("fn-1-ABC") == (None, None)
+
+ def test_invalid_special_chars(self):
+ assert parse_id("fn-1-a_b") == (None, None)
+
+ def test_invalid_double_dot(self):
+ assert parse_id("fn-1.2.3") == (None, None)
+
+ def test_zero_epic(self):
+ """fn-0 should be valid (0 is a valid integer)."""
+ assert parse_id("fn-0") == (0, None)
+
+ def test_zero_task(self):
+ """fn-0.0 should be valid."""
+ assert parse_id("fn-0.0") == (0, 0)
+
+ def test_suffix_with_digits(self):
+ """fn-2-a3b -> (2, None)"""
+ assert parse_id("fn-2-a3b") == (2, None)
+
+ def test_long_slug_with_many_segments(self):
+ """Multi-segment slug suffix with task number."""
+ assert parse_id("fn-10-my-long-slug-name.5") == (10, 5)
+
+
+# --- is_epic_id / is_task_id ---
+
+
+class TestIsEpicId:
+ def test_epic_id(self):
+ assert is_epic_id("fn-1") is True
+
+ def test_task_id_not_epic(self):
+ assert is_epic_id("fn-1.2") is False
+
+ def test_invalid_not_epic(self):
+ assert is_epic_id("garbage") is False
+
+ def test_slug_epic(self):
+ assert is_epic_id("fn-4-flowctl-opt") is True
+
+
+class TestIsTaskId:
+ def test_task_id(self):
+ assert is_task_id("fn-1.2") is True
+
+ def test_epic_id_not_task(self):
+ assert is_task_id("fn-1") is False
+
+ def test_invalid_not_task(self):
+ assert is_task_id("garbage") is False
+
+ def test_slug_task(self):
+ assert is_task_id("fn-4-opt.1") is True
+
+
+# --- epic_id_from_task ---
+
+
+class TestEpicIdFromTask:
+ def test_legacy(self):
+ assert epic_id_from_task("fn-1.2") == "fn-1"
+
+ def test_with_suffix(self):
+ assert epic_id_from_task("fn-5-x7k.3") == "fn-5-x7k"
+
+ def test_with_slug(self):
+ assert epic_id_from_task("fn-4-flowctl-opt.1") == "fn-4-flowctl-opt"
+
+ def test_invalid_raises(self):
+ with pytest.raises(ValueError):
+ epic_id_from_task("fn-1") # epic ID, not task
+
+ def test_garbage_raises(self):
+ with pytest.raises(ValueError):
+ epic_id_from_task("garbage")
+
+
+# --- slugify ---
+
+
+class TestSlugify:
+ def test_basic(self):
+ assert slugify("Hello World") == "hello-world"
+
+ def test_special_chars(self):
+ assert slugify("Hello! @World#") == "hello-world"
+
+ def test_underscores(self):
+ assert slugify("my_cool_thing") == "my-cool-thing"
+
+ def test_unicode(self):
+ result = slugify("café résumé")
+ assert result == "cafe-resume"
+
+ def test_max_length(self):
+ result = slugify("a" * 100, max_length=10)
+ assert len(result) <= 10
+
+ def test_max_length_word_boundary(self):
+ result = slugify("hello-world-foo-bar", max_length=12)
+ # Should truncate at word boundary
+ assert len(result) <= 12
+ assert not result.endswith("-")
+
+ def test_empty_returns_none(self):
+ assert slugify("!!!") is None
+
+ def test_all_whitespace_returns_none(self):
+ # After processing, should produce empty string
+ assert slugify(" ") is None
+
+
+# --- generate_epic_suffix ---
+
+
+class TestGenerateEpicSuffix:
+ def test_default_length(self):
+ suffix = generate_epic_suffix()
+ assert len(suffix) == 3
+
+ def test_custom_length(self):
+ suffix = generate_epic_suffix(length=5)
+ assert len(suffix) == 5
+
+ def test_valid_chars(self):
+ """Suffix should only contain lowercase letters and digits."""
+ import string
+
+ valid = set(string.ascii_lowercase + string.digits)
+ for _ in range(20): # run multiple times for randomness
+ suffix = generate_epic_suffix()
+ assert all(c in valid for c in suffix)
+
+
+# --- normalize_epic / normalize_task ---
+
+
+class TestNormalizeEpic:
+ def test_adds_missing_fields(self):
+ data = {}
+ result = normalize_epic(data)
+ assert result["plan_review_status"] == "unknown"
+ assert result["plan_reviewed_at"] is None
+ assert result["completion_review_status"] == "unknown"
+ assert result["completion_reviewed_at"] is None
+ assert result["branch_name"] is None
+ assert result["depends_on_epics"] == []
+ assert result["default_impl"] is None
+ assert result["default_review"] is None
+ assert result["default_sync"] is None
+ assert result["gaps"] == []
+
+ def test_preserves_existing(self):
+ data = {"plan_review_status": "ship", "branch_name": "my-branch"}
+ result = normalize_epic(data)
+ assert result["plan_review_status"] == "ship"
+ assert result["branch_name"] == "my-branch"
+
+
+class TestNormalizeTask:
+ def test_adds_missing_fields(self):
+ data = {}
+ result = normalize_task(data)
+ assert result["priority"] is None
+ assert result["depends_on"] == []
+ assert result["impl"] is None
+ assert result["review"] is None
+ assert result["sync"] is None
+
+ def test_migrates_legacy_deps(self):
+ data = {"deps": ["fn-1.1", "fn-1.2"]}
+ result = normalize_task(data)
+ assert result["depends_on"] == ["fn-1.1", "fn-1.2"]
+
+ def test_depends_on_preferred_over_deps(self):
+ data = {"depends_on": ["fn-1.3"], "deps": ["fn-1.1"]}
+ result = normalize_task(data)
+ assert result["depends_on"] == ["fn-1.3"]
+
+
+# --- task_priority ---
+
+
+class TestTaskPriority:
+ def test_none_priority(self):
+ assert task_priority({"priority": None}) == 999
+
+ def test_missing_priority(self):
+ assert task_priority({}) == 999
+
+ def test_numeric_priority(self):
+ assert task_priority({"priority": 1}) == 1
+
+ def test_string_priority(self):
+ assert task_priority({"priority": "5"}) == 5
+
+ def test_invalid_priority(self):
+ assert task_priority({"priority": "not-a-number"}) == 999
diff --git a/scripts/flowctl/tests/test_io.py b/scripts/flowctl/tests/test_io.py
new file mode 100644
index 00000000..264e0e71
--- /dev/null
+++ b/scripts/flowctl/tests/test_io.py
@@ -0,0 +1,155 @@
+"""Tests for flowctl.core.io — atomic writes, JSON loading, output formatting."""
+
+import json
+import os
+
+import pytest
+
+from flowctl.core.io import (
+ atomic_write,
+ atomic_write_json,
+ is_supported_schema,
+ load_json,
+ now_iso,
+)
+
+
+class TestAtomicWrite:
+ """Tests for atomic_write() — temp + rename pattern."""
+
+ def test_creates_file(self, tmp_path):
+ """atomic_write should create a new file with correct content."""
+ target = tmp_path / "output.txt"
+ atomic_write(target, "hello world\n")
+ assert target.exists()
+ assert target.read_text(encoding="utf-8") == "hello world\n"
+
+ def test_overwrites_existing(self, tmp_path):
+ """atomic_write should overwrite existing file."""
+ target = tmp_path / "output.txt"
+ target.write_text("old content", encoding="utf-8")
+ atomic_write(target, "new content")
+ assert target.read_text(encoding="utf-8") == "new content"
+
+ def test_creates_parent_dirs(self, tmp_path):
+ """atomic_write should create parent directories if missing."""
+ target = tmp_path / "a" / "b" / "c" / "output.txt"
+ atomic_write(target, "deep file")
+ assert target.read_text(encoding="utf-8") == "deep file"
+
+ def test_cleans_up_on_error(self, tmp_path):
+ """atomic_write should remove temp file if write fails."""
+ target = tmp_path / "output.txt"
+
+ class BadStr:
+            """Non-str object: file.write() rejects it with TypeError."""
+
+ def __str__(self):
+ raise RuntimeError("boom")
+
+ # This should raise because we can't write a non-string
+ with pytest.raises(TypeError):
+ atomic_write(target, BadStr()) # type: ignore
+
+ # No temp files should remain
+ remaining = list(tmp_path.glob("*.tmp"))
+ assert remaining == [], f"Temp files not cleaned up: {remaining}"
+
+ def test_atomic_write_unicode(self, tmp_path):
+ """atomic_write should handle unicode content."""
+ target = tmp_path / "unicode.txt"
+ text = "Hello \u00e9\u00e8\u00ea \u2603 \U0001f600"
+ atomic_write(target, text)
+ content = target.read_text(encoding="utf-8")
+ assert "\u00e9" in content
+ assert "\u2603" in content
+ assert "\U0001f600" in content
+
+
+class TestAtomicWriteJson:
+ """Tests for atomic_write_json() — sorted keys, trailing newline."""
+
+ def test_roundtrip(self, tmp_path):
+ """Write JSON and load it back."""
+ target = tmp_path / "data.json"
+ data = {"b": 2, "a": 1, "nested": {"x": True}}
+ atomic_write_json(target, data)
+
+ loaded = load_json(target)
+ assert loaded == data
+
+ def test_sorted_keys(self, tmp_path):
+ """Keys should be sorted in output."""
+ target = tmp_path / "data.json"
+ atomic_write_json(target, {"z": 1, "a": 2, "m": 3})
+ raw = target.read_text(encoding="utf-8")
+ # 'a' should appear before 'm' which should appear before 'z'
+ assert raw.index('"a"') < raw.index('"m"') < raw.index('"z"')
+
+ def test_trailing_newline(self, tmp_path):
+ """JSON output should end with newline."""
+ target = tmp_path / "data.json"
+ atomic_write_json(target, {"key": "value"})
+ raw = target.read_text(encoding="utf-8")
+ assert raw.endswith("\n")
+
+
+class TestLoadJson:
+ """Tests for load_json() — basic JSON file loading."""
+
+ def test_load_valid(self, tmp_path):
+ target = tmp_path / "valid.json"
+ target.write_text('{"key": "value"}', encoding="utf-8")
+ result = load_json(target)
+ assert result == {"key": "value"}
+
+ def test_load_invalid_raises(self, tmp_path):
+ target = tmp_path / "invalid.json"
+ target.write_text("not json", encoding="utf-8")
+ with pytest.raises(json.JSONDecodeError):
+ load_json(target)
+
+ def test_load_missing_raises(self, tmp_path):
+ target = tmp_path / "missing.json"
+ with pytest.raises(FileNotFoundError):
+ load_json(target)
+
+
+class TestNowIso:
+ """Tests for now_iso() — ISO timestamp generation."""
+
+ def test_format(self):
+ result = now_iso()
+ assert result.endswith("Z")
+ # Should be parseable as ISO format
+ assert "T" in result
+
+ def test_contains_date_parts(self):
+ result = now_iso()
+ # Should contain year-month-day
+ parts = result.split("T")
+ assert len(parts) == 2
+ date_parts = parts[0].split("-")
+ assert len(date_parts) == 3 # year, month, day
+
+
+class TestIsSupportedSchema:
+ """Tests for is_supported_schema() — version checking."""
+
+ def test_version_1(self):
+ assert is_supported_schema(1) is True
+
+ def test_version_2(self):
+ assert is_supported_schema(2) is True
+
+ def test_version_string(self):
+ """String versions should also work via int conversion."""
+ assert is_supported_schema("1") is True
+ assert is_supported_schema("2") is True
+
+ def test_unsupported_version(self):
+ assert is_supported_schema(99) is False
+
+ def test_invalid_type(self):
+ assert is_supported_schema("abc") is False
+ assert is_supported_schema(None) is False
diff --git a/scripts/flowctl/tests/test_review.py b/scripts/flowctl/tests/test_review.py
new file mode 100644
index 00000000..7568cca4
--- /dev/null
+++ b/scripts/flowctl/tests/test_review.py
@@ -0,0 +1,195 @@
+"""Tests for flowctl.commands.review.codex_utils — pure function tests."""
+
+import os
+
+import pytest
+
+from flowctl.commands.review.codex_utils import (
+ is_sandbox_failure,
+ parse_codex_verdict,
+ resolve_codex_sandbox,
+)
+
+
+# --- parse_codex_verdict ---
+
+
+class TestParseCodexVerdict:
+ """Tests for parse_codex_verdict()."""
+
+ def test_ship_verdict(self):
+        output = "Review looks good.\n<verdict>SHIP</verdict>\n"
+ assert parse_codex_verdict(output) == "SHIP"
+
+ def test_needs_work_verdict(self):
+        output = "Found issues.\n<verdict>NEEDS_WORK</verdict>\n"
+ assert parse_codex_verdict(output) == "NEEDS_WORK"
+
+ def test_major_rethink_verdict(self):
+        output = "Fundamental problems.\n<verdict>MAJOR_RETHINK</verdict>\n"
+ assert parse_codex_verdict(output) == "MAJOR_RETHINK"
+
+ def test_no_verdict_returns_none(self):
+ output = "Some review output without a verdict tag."
+ assert parse_codex_verdict(output) is None
+
+ def test_empty_output_returns_none(self):
+ assert parse_codex_verdict("") is None
+
+ def test_malformed_verdict_tag(self):
+ """Verdict tag with wrong content should return None."""
+        output = "<verdict>APPROVE</verdict>"
+ assert parse_codex_verdict(output) is None
+
+ def test_verdict_in_middle_of_text(self):
+ """Verdict embedded in longer output."""
+ output = (
+ "## Review\n\nLooks great.\n\n"
+ "### Summary\nAll checks pass.\n\n"
+            "<verdict>SHIP</verdict>\n\n"
+ "End of review."
+ )
+ assert parse_codex_verdict(output) == "SHIP"
+
+ def test_multiple_verdicts_returns_first(self):
+ """If multiple verdict tags exist, return the first one."""
+        output = "<verdict>NEEDS_WORK</verdict>\nAfter fixes:\n<verdict>SHIP</verdict>"
+ # re.search returns the first match
+ assert parse_codex_verdict(output) == "NEEDS_WORK"
+
+ def test_verdict_case_sensitive(self):
+ """Verdict values are case-sensitive — lowercase should not match."""
+        output = "<verdict>ship</verdict>"
+ assert parse_codex_verdict(output) is None
+
+ def test_partial_verdict_tag(self):
+ """Incomplete verdict tag should not match."""
+        output = "<verdict>SHIP"
+ assert parse_codex_verdict(output) is None
+
+
+# --- resolve_codex_sandbox ---
+
+
+class TestResolveCodexSandbox:
+ """Tests for resolve_codex_sandbox()."""
+
+ def test_explicit_read_only(self):
+ assert resolve_codex_sandbox("read-only") == "read-only"
+
+ def test_explicit_danger_full_access(self):
+ assert resolve_codex_sandbox("danger-full-access") == "danger-full-access"
+
+ def test_explicit_workspace_write(self):
+ assert resolve_codex_sandbox("workspace-write") == "workspace-write"
+
+ def test_invalid_mode_raises(self):
+ with pytest.raises(ValueError, match="Invalid sandbox value"):
+ resolve_codex_sandbox("invalid-mode")
+
+ def test_auto_resolves_on_unix(self, monkeypatch):
+ """On unix (os.name != 'nt'), auto resolves to read-only."""
+ monkeypatch.setattr(os, "name", "posix")
+ monkeypatch.delenv("CODEX_SANDBOX", raising=False)
+ assert resolve_codex_sandbox("auto") == "read-only"
+
+ def test_auto_resolves_on_windows(self, monkeypatch):
+ """On Windows (os.name == 'nt'), auto resolves to danger-full-access."""
+ monkeypatch.setattr(os, "name", "nt")
+ monkeypatch.delenv("CODEX_SANDBOX", raising=False)
+ assert resolve_codex_sandbox("auto") == "danger-full-access"
+
+ def test_env_var_overrides_auto(self, monkeypatch):
+ """CODEX_SANDBOX env var should override auto resolution."""
+ monkeypatch.setenv("CODEX_SANDBOX", "workspace-write")
+ assert resolve_codex_sandbox("auto") == "workspace-write"
+
+ def test_explicit_overrides_env(self, monkeypatch):
+ """Explicit CLI value should override env var."""
+ monkeypatch.setenv("CODEX_SANDBOX", "workspace-write")
+ assert resolve_codex_sandbox("read-only") == "read-only"
+
+ def test_invalid_env_var_raises(self, monkeypatch):
+ """Invalid CODEX_SANDBOX env value should raise ValueError."""
+ monkeypatch.setenv("CODEX_SANDBOX", "bad-value")
+ with pytest.raises(ValueError, match="Invalid CODEX_SANDBOX value"):
+ resolve_codex_sandbox("auto")
+
+ def test_empty_string_treated_as_auto(self, monkeypatch):
+ """Empty string should resolve like auto."""
+ monkeypatch.setattr(os, "name", "posix")
+ monkeypatch.delenv("CODEX_SANDBOX", raising=False)
+ assert resolve_codex_sandbox("") == "read-only"
+
+ def test_whitespace_stripped(self, monkeypatch):
+ """Leading/trailing whitespace should be stripped."""
+ assert resolve_codex_sandbox(" read-only ") == "read-only"
+
+
+# --- is_sandbox_failure ---
+
+
+class TestIsSandboxFailure:
+ """Tests for is_sandbox_failure()."""
+
+ def test_success_exit_code_never_sandbox_failure(self):
+ """Exit code 0 should never be detected as sandbox failure."""
+ assert is_sandbox_failure(0, "", "blocked by policy") is False
+
+ def test_blocked_by_policy_in_stderr(self):
+ assert is_sandbox_failure(1, "", "Error: blocked by policy") is True
+
+ def test_rejected_by_policy_in_stderr(self):
+ assert is_sandbox_failure(1, "", "rejected by policy: read") is True
+
+ def test_filesystem_read_blocked_in_stderr(self):
+ assert is_sandbox_failure(1, "", "filesystem read is blocked") is True
+
+ def test_filesystem_write_blocked_in_stderr(self):
+ assert is_sandbox_failure(1, "", "filesystem write is blocked") is True
+
+ def test_shell_command_blocked_in_stderr(self):
+ assert is_sandbox_failure(1, "", "shell command was blocked") is True
+
+ def test_appcontainer_in_stderr(self):
+ assert is_sandbox_failure(1, "", "AppContainer restriction") is True
+
+ def test_unrelated_error_not_sandbox(self):
+ """Non-sandbox errors should return False."""
+ assert is_sandbox_failure(1, "", "connection refused") is False
+
+ def test_empty_stderr_not_sandbox(self):
+ assert is_sandbox_failure(1, "", "") is False
+
+ def test_json_stdout_failed_item_with_policy(self):
+ """Failed item in JSON stdout with rejection message."""
+ import json
+
+ item = {
+ "type": "item.completed",
+ "item": {
+ "status": "failed",
+ "aggregated_output": "rejected by policy: filesystem read",
+ },
+ }
+ stdout = json.dumps(item)
+ assert is_sandbox_failure(1, stdout, "") is True
+
+ def test_json_stdout_success_item_not_sandbox(self):
+ """Successful item in JSON stdout should not be sandbox failure."""
+ import json
+
+ item = {
+ "type": "item.completed",
+ "item": {
+ "status": "completed",
+ "aggregated_output": "all good",
+ },
+ }
+ stdout = json.dumps(item)
+ assert is_sandbox_failure(1, stdout, "") is False
+
+ def test_case_insensitive_pattern_matching(self):
+ """Patterns should match case-insensitively."""
+ assert is_sandbox_failure(1, "", "BLOCKED BY POLICY") is True
+ assert is_sandbox_failure(1, "", "Blocked By Policy") is True
diff --git a/scripts/flowctl/tests/test_state.py b/scripts/flowctl/tests/test_state.py
new file mode 100644
index 00000000..891dcc68
--- /dev/null
+++ b/scripts/flowctl/tests/test_state.py
@@ -0,0 +1,178 @@
+"""Tests for flowctl.core.state — StateStore and task loading with state merge."""
+
+import json
+import os
+
+import pytest
+
+from flowctl.core.state import LocalFileStateStore, load_task_with_state
+
+
+class TestLocalFileStateStore:
+ """Tests for LocalFileStateStore — file-based state with fcntl locking."""
+
+ def test_save_and_load_runtime(self, tmp_path):
+ """save_runtime creates file, load_runtime reads it back."""
+ store = LocalFileStateStore(tmp_path)
+ store.save_runtime("fn-1.1", {"status": "in_progress", "assignee": "alice"})
+
+ result = store.load_runtime("fn-1.1")
+ assert result is not None
+ assert result["status"] == "in_progress"
+ assert result["assignee"] == "alice"
+
+ def test_load_missing_returns_none(self, tmp_path):
+ """load_runtime returns None for non-existent task."""
+ store = LocalFileStateStore(tmp_path)
+ assert store.load_runtime("fn-999.1") is None
+
+ def test_load_corrupt_returns_none(self, tmp_path):
+ """load_runtime returns None for corrupt JSON."""
+ store = LocalFileStateStore(tmp_path)
+ tasks_dir = tmp_path / "tasks"
+ tasks_dir.mkdir(parents=True)
+ corrupt_path = tasks_dir / "fn-1.1.state.json"
+ corrupt_path.write_text("not valid json{{{", encoding="utf-8")
+
+ assert store.load_runtime("fn-1.1") is None
+
+ def test_save_overwrites(self, tmp_path):
+ """save_runtime overwrites existing state."""
+ store = LocalFileStateStore(tmp_path)
+ store.save_runtime("fn-1.1", {"status": "todo"})
+ store.save_runtime("fn-1.1", {"status": "done"})
+
+ result = store.load_runtime("fn-1.1")
+ assert result["status"] == "done"
+
+ def test_list_runtime_files_empty(self, tmp_path):
+ """list_runtime_files returns empty list when no state files."""
+ store = LocalFileStateStore(tmp_path)
+ assert store.list_runtime_files() == []
+
+ def test_list_runtime_files(self, tmp_path):
+ """list_runtime_files returns task IDs that have state."""
+ store = LocalFileStateStore(tmp_path)
+ store.save_runtime("fn-1.1", {"status": "todo"})
+ store.save_runtime("fn-1.2", {"status": "done"})
+
+ ids = store.list_runtime_files()
+ assert sorted(ids) == ["fn-1.1", "fn-1.2"]
+
+ def test_lock_task_context_manager(self, tmp_path):
+ """lock_task should work as a context manager without errors."""
+ store = LocalFileStateStore(tmp_path)
+ with store.lock_task("fn-1.1"):
+ # Should be able to do operations inside the lock
+ store.save_runtime("fn-1.1", {"status": "in_progress"})
+
+ result = store.load_runtime("fn-1.1")
+ assert result["status"] == "in_progress"
+
+ def test_state_path(self, tmp_path):
+ """Internal _state_path should use correct naming convention."""
+ store = LocalFileStateStore(tmp_path)
+ path = store._state_path("fn-1.1")
+ assert path.name == "fn-1.1.state.json"
+ assert path.parent.name == "tasks"
+
+
+class TestLoadTaskWithState:
+ """Tests for load_task_with_state — merges definition with runtime state."""
+
+ def _setup_task(self, git_repo, task_id, definition, runtime=None):
+ """Helper to create task definition and optional runtime state."""
+ flow_dir = git_repo / ".flow"
+ tasks_dir = flow_dir / "tasks"
+ tasks_dir.mkdir(parents=True, exist_ok=True)
+
+ # Write definition file
+ def_path = tasks_dir / f"{task_id}.json"
+ def_path.write_text(
+ json.dumps(definition, indent=2) + "\n", encoding="utf-8"
+ )
+
+ if runtime is not None:
+ # Write runtime state to git state dir
+ # Use FLOW_STATE_DIR env var to point to a known location
+ state_dir = git_repo / ".git" / "flow-state"
+ state_tasks = state_dir / "tasks"
+ state_tasks.mkdir(parents=True, exist_ok=True)
+ state_path = state_tasks / f"{task_id}.state.json"
+ state_path.write_text(
+ json.dumps(runtime, indent=2) + "\n", encoding="utf-8"
+ )
+ os.environ["FLOW_STATE_DIR"] = str(state_dir)
+
+ def test_merges_correctly(self, git_repo, monkeypatch):
+ """Runtime state should override definition for runtime fields."""
+ definition = {
+ "id": "fn-1.1",
+ "title": "Test task",
+ "epic": "fn-1",
+ "status": "todo",
+ }
+ runtime = {"status": "in_progress", "assignee": "bob"}
+ self._setup_task(git_repo, "fn-1.1", definition, runtime)
+
+ result = load_task_with_state("fn-1.1")
+ assert result["status"] == "in_progress"
+ assert result["assignee"] == "bob"
+ assert result["title"] == "Test task"
+
+ def test_handles_missing_state(self, git_repo, monkeypatch):
+ """When no runtime state exists, should use definition fields or defaults."""
+ definition = {
+ "id": "fn-1.1",
+ "title": "Test task",
+ "epic": "fn-1",
+ "status": "todo",
+ }
+ self._setup_task(git_repo, "fn-1.1", definition)
+
+ # Point FLOW_STATE_DIR to an empty directory
+ empty_state = git_repo / "empty-state"
+ empty_state.mkdir()
+        monkeypatch.setenv("FLOW_STATE_DIR", str(empty_state))
+
+ result = load_task_with_state("fn-1.1")
+ # Should fall back to definition's status
+ assert result["status"] == "todo"
+ assert result["title"] == "Test task"
+
+ def test_normalizes_task(self, git_repo, monkeypatch):
+ """Result should have all normalized fields (priority, depends_on, etc.)."""
+ definition = {
+ "id": "fn-1.1",
+ "title": "Test task",
+ "epic": "fn-1",
+ }
+ self._setup_task(git_repo, "fn-1.1", definition)
+
+ empty_state = git_repo / "empty-state"
+ empty_state.mkdir(exist_ok=True)
+        monkeypatch.setenv("FLOW_STATE_DIR", str(empty_state))
+
+ result = load_task_with_state("fn-1.1")
+ # normalize_task should have added defaults
+ assert result["priority"] is None
+ assert result["depends_on"] == []
+ assert result["impl"] is None
+ assert result["review"] is None
+ assert result["sync"] is None
+
+ def test_definition_without_status(self, git_repo, monkeypatch):
+ """When definition has no status and no runtime state, defaults to 'todo'."""
+ definition = {
+ "id": "fn-1.1",
+ "title": "No status task",
+ "epic": "fn-1",
+ }
+ self._setup_task(git_repo, "fn-1.1", definition)
+
+ empty_state = git_repo / "empty-state"
+ empty_state.mkdir(exist_ok=True)
+        monkeypatch.setenv("FLOW_STATE_DIR", str(empty_state))
+
+ result = load_task_with_state("fn-1.1")
+ assert result["status"] == "todo"
diff --git a/scripts/flowctl/tests/test_task.py b/scripts/flowctl/tests/test_task.py
new file mode 100644
index 00000000..3c1e9056
--- /dev/null
+++ b/scripts/flowctl/tests/test_task.py
@@ -0,0 +1,114 @@
+"""Tests for flowctl.commands.task — patch_task_section."""
+
+import pytest
+
+from flowctl.commands.task import patch_task_section
+
+
+SAMPLE_SPEC = """\
+# fn-1.1 Some task
+
+## Description
+Old description content.
+
+## Acceptance
+- [ ] Criterion A
+
+## Done summary
+TBD
+
+## Evidence
+- Commits:
+- Tests:
+- PRs:"""
+
+
+class TestPatchTaskSection:
+ """Tests for patch_task_section()."""
+
+ def test_replace_description(self):
+ """Replacing ## Description content should preserve other sections."""
+ result = patch_task_section(SAMPLE_SPEC, "## Description", "New description.")
+ assert "New description." in result
+ assert "Old description content." not in result
+ # Other sections preserved
+ assert "## Acceptance" in result
+ assert "Criterion A" in result
+ assert "## Done summary" in result
+ assert "## Evidence" in result
+
+ def test_replace_acceptance(self):
+ """Replacing ## Acceptance content."""
+ result = patch_task_section(
+ SAMPLE_SPEC, "## Acceptance", "- [ ] New criterion"
+ )
+ assert "- [ ] New criterion" in result
+ assert "Criterion A" not in result
+
+ def test_replace_done_summary(self):
+ """Replacing ## Done summary content."""
+ result = patch_task_section(
+ SAMPLE_SPEC, "## Done summary", "Implemented the feature."
+ )
+ assert "Implemented the feature." in result
+ assert "\nTBD\n" not in result
+
+ def test_add_missing_section(self):
+ """Patching a section that doesn't exist should auto-append it."""
+ content = "# Title\n\n## Description\nSome text.\n"
+ result = patch_task_section(content, "## New Section", "New content here.")
+ assert "## New Section" in result
+ assert "New content here." in result
+ # Original content preserved
+ assert "## Description" in result
+ assert "Some text." in result
+
+ def test_duplicate_heading_raises(self):
+ """Patching when target heading appears multiple times should raise ValueError."""
+ content = SAMPLE_SPEC + "\n\n## Description\nDuplicate."
+ with pytest.raises(ValueError, match="duplicate heading"):
+ patch_task_section(content, "## Description", "New text")
+
+ def test_strips_heading_from_new_content(self):
+ """If new_content starts with the section heading, it should be stripped."""
+ result = patch_task_section(
+ SAMPLE_SPEC, "## Description", "## Description\nContent after heading."
+ )
+ # Should not have double heading
+ lines = result.split("\n")
+ desc_count = sum(1 for l in lines if l.strip() == "## Description")
+ assert desc_count == 1
+ assert "Content after heading." in result
+
+ def test_multiline_replacement(self):
+ """Multi-line content should be preserved correctly."""
+ new_content = "Line 1\nLine 2\nLine 3"
+ result = patch_task_section(SAMPLE_SPEC, "## Description", new_content)
+ assert "Line 1" in result
+ assert "Line 2" in result
+ assert "Line 3" in result
+
+ def test_empty_replacement(self):
+ """Replacing with empty content should clear the section body."""
+ result = patch_task_section(SAMPLE_SPEC, "## Description", "")
+ # Section heading still present, but body cleared
+ assert "## Description" in result
+ # Next section should follow
+ assert "## Acceptance" in result
+
+ def test_preserves_title_line(self):
+ """The title line (# fn-1.1 ...) should be preserved."""
+ result = patch_task_section(SAMPLE_SPEC, "## Description", "New desc.")
+ assert result.startswith("# fn-1.1 Some task")
+
+ def test_trailing_whitespace_stripped(self):
+ """New content trailing whitespace should be stripped."""
+ result = patch_task_section(
+ SAMPLE_SPEC, "## Description", "Content with trailing \n\n\n"
+ )
+ # Check that trailing newlines from new_content are stripped
+ desc_idx = result.index("## Description")
+ acc_idx = result.index("## Acceptance")
+ between = result[desc_idx:acc_idx]
+ # Should not have excessive trailing newlines
+ assert not between.endswith("\n\n\n\n")
diff --git a/scripts/hooks/ralph-guard.py b/scripts/hooks/ralph-guard.py
index 526b0e40..d2f647d7 100755
--- a/scripts/hooks/ralph-guard.py
+++ b/scripts/hooks/ralph-guard.py
@@ -378,7 +378,8 @@ def handle_post_tool_use(data: dict) -> None:
# - "$FLOWCTL" done
if " done " in command and ("flowctl" in command or "FLOWCTL" in command):
# Debug logging
- with Path("/tmp/ralph-guard-debug.log").open("a") as f:
+ debug_file = _debug_log_path()
+ with debug_file.open("a") as f:
f.write(f" -> flowctl done detected in: {command[:100]}...\n")
# Extract task ID from command - look for "done" followed by task ID
@@ -386,7 +387,7 @@ def handle_post_tool_use(data: dict) -> None:
done_match = re.search(r"\bdone\s+([a-zA-Z0-9][a-zA-Z0-9._-]*)", command)
if done_match:
task_id = done_match.group(1)
- with Path("/tmp/ralph-guard-debug.log").open("a") as f:
+ with debug_file.open("a") as f:
f.write(
f" -> Extracted task_id: {task_id}, response has 'status': {'status' in response_text.lower()}\n"
)
@@ -405,7 +406,7 @@ def handle_post_tool_use(data: dict) -> None:
done_set.add(task_id)
state["flowctl_done_called"] = done_set
save_state(session_id, state)
- with Path("/tmp/ralph-guard-debug.log").open("a") as f:
+ with debug_file.open("a") as f:
f.write(
f" -> Added {task_id} to flowctl_done_called: {done_set}\n"
)
@@ -582,9 +583,17 @@ def handle_subagent_stop(data: dict) -> None:
handle_stop(data)
+def _debug_log_path() -> Path:
+ """Get debug log path: $RUN_DIR/guard-debug.log if set, else /tmp fallback."""
+ run_dir = os.environ.get("RUN_DIR")
+ if run_dir:
+ return Path(run_dir) / "guard-debug.log"
+ return Path("/tmp/ralph-guard-debug.log")
+
+
def main():
# Debug logging - always write to see if hook is being called
- debug_file = Path("/tmp/ralph-guard-debug.log")
+ debug_file = _debug_log_path()
with debug_file.open("a") as f:
f.write(f"[{os.environ.get('FLOW_RALPH', 'unset')}] Hook called\n")
diff --git a/skills/flow-code-ralph-init/templates/ralph.sh b/skills/flow-code-ralph-init/templates/ralph.sh
index bf28b938..ae26bafd 100644
--- a/skills/flow-code-ralph-init/templates/ralph.sh
+++ b/skills/flow-code-ralph-init/templates/ralph.sh
@@ -440,6 +440,9 @@ MEMORY_AUTO_SAVE="${MEMORY_AUTO_SAVE:-0}"
TDD_MODE="${TDD_MODE:-0}"
export CODEX_SANDBOX # Ensure available to Claude worker for flowctl codex commands
+# Dry-run mode: run selector loop only, no Claude invocation
+DRY_RUN="${DRY_RUN:-0}"
+
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case "$1" in
@@ -456,6 +459,10 @@ while [[ $# -gt 0 ]]; do
# Already processed in pre-scan; just consume args
shift
;;
+ --dry-run)
+ DRY_RUN=1
+ shift
+ ;;
--help|-h)
echo "Usage: ralph.sh [options]"
echo ""
@@ -463,6 +470,7 @@ while [[ $# -gt 0 ]]; do
echo " --config Use alternate config file (default: config.env)"
echo " --watch Show tool calls in real-time"
echo " --watch verbose Show tool calls + model responses"
+ echo " --dry-run Run selector loop only; no Claude invocation or state changes"
echo " --help, -h Show this help"
echo ""
echo "Environment variables:"
@@ -604,6 +612,43 @@ RUN_ID="$(date -u +%Y%m%d-%H%M%S)-$(rand4)"
RUN_ID_FULL="$(date -u +%Y%m%dT%H%M%SZ)-$(hostname -s 2>/dev/null || hostname)-$(sanitize_id "$(get_actor)")-$$-$(rand4)"
RUN_DIR="$SCRIPT_DIR/runs/$RUN_ID"
mkdir -p "$RUN_DIR"
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Concurrent run lock (Python fcntl.flock — works on macOS/Linux)
+# Prevents multiple ralph.sh instances running against the same repo.
+# Bash opens fd 200 on the lock file; Python acquires LOCK_EX|LOCK_NB on it.
+# Lock is held for shell process lifetime (fd 200 stays open until exit).
+# Windows: fcntl unavailable — lock is advisory-only (allows the run).
+# ─────────────────────────────────────────────────────────────────────────────
+RALPH_LOCK_FILE="$SCRIPT_DIR/.ralph.lock"
+
+# Open fd 200 on the lock file (bash holds this fd for its lifetime)
+exec 200>>"$RALPH_LOCK_FILE"  # append mode: don't truncate a live holder's PID on contention
+
+# Use Python to acquire non-blocking exclusive lock on fd 200
+_lock_result="$("$PYTHON_BIN" - <<'PY'
+import sys, os
+try:
+ import fcntl
+ try:
+ fcntl.flock(200, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ # Write PID for diagnostics
+ os.ftruncate(200, 0)
+ os.lseek(200, 0, os.SEEK_SET)
+ os.write(200, f"{os.getppid()}\n".encode())
+ print("OK")
+ except (IOError, OSError):
+ print("LOCKED")
+except ImportError:
+ # Windows: no fcntl — advisory only, allow the run
+ print("OK")
+PY
+)"
+if [[ "$_lock_result" == "LOCKED" ]]; then
+ fail "another ralph instance is already running in this directory (lock: $RALPH_LOCK_FILE). Remove the lock file to force-clear if the previous run crashed."
+fi
+unset _lock_result
+
ATTEMPTS_FILE="$RUN_DIR/attempts.json"
ensure_attempts_file "$ATTEMPTS_FILE"
BRANCHES_FILE="$RUN_DIR/branches.json"
@@ -910,38 +955,54 @@ maybe_close_epics() {
done
}
-verify_receipt() {
+# Read and verify receipt in a single atomic operation (no TOCTOU gap).
+# Returns JSON: {"valid": bool, "verdict": str, "error": str}
+# Caller checks validity via json_get on the output, avoiding separate
+# file-exists check + read that could race with concurrent writers.
+read_and_verify_receipt() {
local path="$1"
local kind="$2"
local id="$3"
- [[ -f "$path" ]] || return 1
"$PYTHON_BIN" - "$path" "$kind" "$id" <<'PY'
import json, sys
path, kind, rid = sys.argv[1], sys.argv[2], sys.argv[3]
+result = {"valid": False, "verdict": "", "error": ""}
try:
- data = json.load(open(path, encoding="utf-8"))
-except Exception:
- sys.exit(1)
+ with open(path, encoding="utf-8") as f:
+ data = json.load(f)
+except FileNotFoundError:
+ result["error"] = "file_not_found"
+ print(json.dumps(result))
+ sys.exit(0)
+except Exception as e:
+ result["error"] = f"parse_error: {e}"
+ print(json.dumps(result))
+ sys.exit(0)
if data.get("type") != kind:
- sys.exit(1)
+ result["error"] = f"type_mismatch: expected={kind} got={data.get('type')}"
+ print(json.dumps(result))
+ sys.exit(0)
if data.get("id") != rid:
- sys.exit(1)
-sys.exit(0)
+ result["error"] = f"id_mismatch: expected={rid} got={data.get('id')}"
+ print(json.dumps(result))
+ sys.exit(0)
+result["valid"] = True
+result["verdict"] = data.get("verdict", "")
+print(json.dumps(result))
PY
}
-# Read verdict field from receipt file (returns empty string if not found/error)
-read_receipt_verdict() {
+# Backward-compat wrapper: returns 0 if receipt is valid, 1 otherwise
+verify_receipt() {
local path="$1"
- [[ -f "$path" ]] || return 0
- "$PYTHON_BIN" - "$path" <<'PY'
-import json, sys
-try:
- data = json.load(open(sys.argv[1], encoding="utf-8"))
- print(data.get("verdict", ""))
-except Exception:
- pass
-PY
+ local kind="$2"
+ local id="$3"
+ local result
+ result="$(read_and_verify_receipt "$path" "$kind" "$id")"
+ local valid
+ valid="$(json_get valid "$result")"
+ [[ "$valid" == "1" ]] && return 0
+ return 1
}
# Create/switch to run branch (once at start, all epics work here)
@@ -1313,7 +1374,8 @@ ui_config
ui_version_check
# Create run branch once at start (all epics work on same branch)
-ensure_run_branch
+# Skip in dry-run mode — no branches, no state changes
+[[ "$DRY_RUN" != "1" ]] && ensure_run_branch
# Freeze scope snapshot (opt-in via FREEZE_SCOPE=1)
freeze_scope
@@ -1427,6 +1489,13 @@ while (( iter <= MAX_ITERATIONS )); do
fail "invalid selector status: $status"
fi
+ # Dry-run mode: print iteration info and skip Claude invocation entirely
+ if [[ "$DRY_RUN" == "1" ]]; then
+ echo "iter=$iter status=$status epic=${epic_id:-} task=${task_id:-}"
+ iter=$((iter + 1))
+ continue
+ fi
+
export REVIEW_MODE
export FLOW_RALPH="1"
claude_args=(-p)
@@ -1558,16 +1627,20 @@ Violations break automation and leave the user with incomplete work. Be precise,
fi
receipt_verdict=""
if [[ "$status" == "work" && ( "$WORK_REVIEW" == "rp" || "$WORK_REVIEW" == "codex" ) ]]; then
- if ! verify_receipt "$REVIEW_RECEIPT_PATH" "impl_review" "$task_id"; then
- echo "ralph: missing impl review receipt; forcing retry" >> "$iter_log"
- log "missing impl receipt; forcing retry"
+ # Single atomic read+verify (no TOCTOU gap between verify and verdict read)
+ _receipt_result="$(read_and_verify_receipt "$REVIEW_RECEIPT_PATH" "impl_review" "$task_id")"
+ _receipt_valid="$(json_get valid "$_receipt_result")"
+ if [[ "$_receipt_valid" != "1" ]]; then
+ _receipt_err="$(json_get error "$_receipt_result")"
+ echo "ralph: invalid impl review receipt ($REVIEW_RECEIPT_PATH): $_receipt_err; forcing retry" >> "$iter_log"
+ log "invalid impl receipt: $_receipt_err; forcing retry"
impl_receipt_ok="0"
# Delete corrupted/partial receipt so next attempt starts clean
rm -f "$REVIEW_RECEIPT_PATH" 2>/dev/null || true
force_retry=1
else
- # Receipt is valid - read the verdict field
- receipt_verdict="$(read_receipt_verdict "$REVIEW_RECEIPT_PATH")"
+ # Receipt is valid - verdict was read in the same pass (no TOCTOU)
+ receipt_verdict="$(json_get verdict "$_receipt_result")"
fi
fi
diff --git a/skills/flow-code-ralph-init/templates/watch-filter.py b/skills/flow-code-ralph-init/templates/watch-filter.py
index c703e18d..3a93eefc 100755
--- a/skills/flow-code-ralph-init/templates/watch-filter.py
+++ b/skills/flow-code-ralph-init/templates/watch-filter.py
@@ -32,6 +32,15 @@
# TUI indentation (3 spaces to match ralph.sh)
INDENT = " "
+# Context prefix from environment (iteration + task)
+_ITER = os.environ.get("RALPH_ITERATION", "")
+_TASK = os.environ.get("RALPH_TASK_ID", "")
+_PREFIX = ""
+if _ITER:
+ _PREFIX += f"[iter {_ITER}] "
+if _TASK:
+ _PREFIX += f"[{_TASK}] "
+
# Tool icons
ICONS = {
"Bash": "🔧",
@@ -165,7 +174,7 @@ def process_event(event: dict, verbose: bool) -> None:
tool_name = block.get("name", "")
tool_input = block.get("input", {})
formatted = format_tool_use(tool_name, tool_input)
- safe_print(f"{INDENT}{C_DIM}{formatted}{C_RESET}")
+ safe_print(f"{INDENT}{C_DIM}{_PREFIX}{formatted}{C_RESET}")
elif verbose and block_type == "text":
text = block.get("text", "")