From bb88b20c1aba77320829532eed1a6b3e58e19c39 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:22:49 +0800 Subject: [PATCH 1/7] fix(review): configurable codex timeout, resume warning, sandbox-aware files_embedded - Replace hardcoded timeout=600 with FLOW_CODEX_TIMEOUT env var (default 600s) - Print WARNING to stderr when codex resume falls back to new session - Force files_embedded=True in plan-review when sandbox prevents disk reads Task: fn-6-v2-reliability-and-performance-hardening.5 --- .../flowctl/commands/review/codex_utils.py | 16 +++++++------ scripts/flowctl/commands/review/commands.py | 23 +++++++++++-------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/scripts/flowctl/commands/review/codex_utils.py b/scripts/flowctl/commands/review/codex_utils.py index f4385162..920397fc 100644 --- a/scripts/flowctl/commands/review/codex_utils.py +++ b/scripts/flowctl/commands/review/codex_utils.py @@ -224,23 +224,24 @@ def run_codex_exec( # Try resume first - use stdin for prompt (model already set in original session) cmd = [codex, "exec", "resume", session_id, "-"] try: + codex_timeout = int(os.environ.get("FLOW_CODEX_TIMEOUT", "600")) result = subprocess.run( cmd, input=prompt, capture_output=True, text=True, check=True, - timeout=600, + timeout=codex_timeout, ) output = result.stdout # For resumed sessions, thread_id stays the same return output, session_id, 0, result.stderr - except subprocess.CalledProcessError: + except subprocess.CalledProcessError as e: # Resume failed - fall through to new session - pass - except subprocess.TimeoutExpired: + print(f"WARNING: Codex resume failed ({type(e).__name__}), starting new session", file=sys.stderr) + except subprocess.TimeoutExpired as e: # Resume failed - fall through to new session - pass + print(f"WARNING: Codex resume failed ({type(e).__name__}), starting new session", file=sys.stderr) # New session with model + high reasoning effort # --skip-git-repo-check: safe with read-only sandbox, allows reviews from /tmp etc (GH-33) @@ -258,6 +259,7 @@ def run_codex_exec( "--json", "-", ] + codex_timeout = int(os.environ.get("FLOW_CODEX_TIMEOUT", "600")) try: result = subprocess.run( cmd, @@ -265,13 +267,13 @@ def run_codex_exec( capture_output=True, text=True, check=False, # Don't raise on non-zero exit - timeout=600, + timeout=codex_timeout, ) output = result.stdout thread_id = parse_codex_thread_id(output) return output, thread_id, result.returncode, result.stderr except subprocess.TimeoutExpired: - return "", None, 2, "codex exec timed out (600s)" + return "", None, 2, f"codex exec timed out ({codex_timeout}s)" def parse_codex_thread_id(output: str) -> Optional[str]: diff --git a/scripts/flowctl/commands/review/commands.py b/scripts/flowctl/commands/review/commands.py index d6bc9b68..e9d05ba3 100644 --- a/scripts/flowctl/commands/review/commands.py +++ b/scripts/flowctl/commands/review/commands.py @@ -276,8 +276,19 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: base_branch = args.base if hasattr(args, "base") and args.base else "main" context_hints = gather_context_hints(base_branch) - # Only forbid disk reads when ALL files were fully embedded. - files_embedded = not embed_stats.get("budget_skipped") and not embed_stats.get("truncated") + # Resolve sandbox mode early — needed for files_embedded decision below + try: + sandbox = resolve_codex_sandbox(getattr(args, "sandbox", "auto")) + except ValueError as e: + error_exit(str(e), use_json=args.json, code=2) + + # When sandbox prevents disk reads (anything other than "none"), Codex + # can't read files itself — tell it all context is embedded regardless of + # truncation. Only use the budget-based check for no-sandbox mode. + if sandbox != "none": + files_embedded = True + else: + files_embedded = not embed_stats.get("budget_skipped") and not embed_stats.get("truncated") prompt = build_review_prompt( "plan", epic_spec, context_hints, task_specs=task_specs, embedded_files=embedded_content, files_embedded=files_embedded @@ -305,13 +316,7 @@ def cmd_codex_plan_review(args: argparse.Namespace) -> None: rereview_preamble = build_rereview_preamble(spec_files, "plan", files_embedded) prompt = rereview_preamble + prompt - # Resolve sandbox mode (never pass 'auto' to Codex CLI) - try: - sandbox = resolve_codex_sandbox(getattr(args, "sandbox", "auto")) - except ValueError as e: - error_exit(str(e), use_json=args.json, code=2) - - # Run codex + # Run codex (sandbox already resolved above) effort = getattr(args, "effort", "high") output, thread_id, exit_code, stderr = run_codex_exec( prompt, session_id=session_id, sandbox=sandbox, effort=effort From 4e89b607ee95c38ee7741e0d0ef5d279f20db9b4 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:24:49 +0800 Subject: [PATCH 2/7] fix: Windows flock, adversarial prompt validation, phases.md codex ref - compat.py: use msvcrt.locking() on Windows instead of no-op flock, with warning fallback for unknown platforms - adversarial.py: warn on unconsumed {{...}} placeholders after template substitution - phases.md: replace raw `codex exec` with `flowctl codex exec` to match guard allowlist --- .../flowctl/commands/review/adversarial.py | 9 ++++++- scripts/flowctl/compat.py | 27 +++++++++++++++---- skills/flow-code-work/phases.md | 4 +-- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/scripts/flowctl/commands/review/adversarial.py b/scripts/flowctl/commands/review/adversarial.py index 8ea076cd..f37a75d0 100644 --- a/scripts/flowctl/commands/review/adversarial.py +++ b/scripts/flowctl/commands/review/adversarial.py @@ -26,7 +26,7 @@ def _load_adversarial_prompt(focus_block: str, diff_summary: str, """Load adversarial review prompt from prompts/adversarial-review.md.""" prompt_path = _get_plugin_root() / "prompts" / "adversarial-review.md" template = prompt_path.read_text() - return template.replace( + result = template.replace( "{{focus_block}}", focus_block, ).replace( "{{diff_summary}}", diff_summary, @@ -35,6 +35,13 @@ def _load_adversarial_prompt(focus_block: str, diff_summary: str, ).replace( "{{embedded_files}}", embedded_files, ) + # Warn on unconsumed placeholders + remaining = re.findall(r"\{\{(\w+)\}\}", result) + if remaining: + import sys + print(f"Warning: unconsumed placeholders in adversarial prompt: {remaining}", + file=sys.stderr) + return result def parse_adversarial_output(output: str) -> Optional[dict]: diff --git a/scripts/flowctl/compat.py b/scripts/flowctl/compat.py index 714aaf29..b695b61f 100644 --- a/scripts/flowctl/compat.py +++ b/scripts/flowctl/compat.py @@ -12,12 +12,29 @@ def _flock(f, lock_type): LOCK_EX = fcntl.LOCK_EX LOCK_UN = fcntl.LOCK_UN except ImportError: - # Windows: fcntl not available, use no-op (acceptable for single-machine use) - def _flock(f, lock_type): - pass + # Windows: fcntl not available, try msvcrt for file locking + try: + import msvcrt + + LOCK_EX = 1 # map to LK_NBLCK + LOCK_UN = 2 # map to LK_UNLCK + + def _flock(f, lock_type): + if lock_type == LOCK_EX: + msvcrt.locking(f.fileno(), msvcrt.LK_NBLCK, 1) + elif lock_type == LOCK_UN: + msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1) + + except ImportError: + # Non-Unix, non-Windows: no-op with warning + import warnings + warnings.warn("File locking unavailable on this platform; concurrent access is unprotected") + + def _flock(f, lock_type): + pass - LOCK_EX = 0 - LOCK_UN = 0 + LOCK_EX = 0 + LOCK_UN = 0 def _fsync(fd: int) -> None: diff --git a/skills/flow-code-work/phases.md b/skills/flow-code-work/phases.md index c15dc31e..52b28c4c 100644 --- a/skills/flow-code-work/phases.md +++ b/skills/flow-code-work/phases.md @@ -288,7 +288,7 @@ While tasks remain in this wave: "Spec conflict: ": → Forward to Codex for decision: - Run `codex exec` with prompt: + Run `flowctl codex exec` with prompt: "Spec conflict in task . The spec says: But the code shows: @@ -303,7 +303,7 @@ While tasks remain in this wave: → Parse message body for blocker info → If in-flight task: wait for it to complete, then notify blocked worker → If external: forward to Codex for decision: - Run `codex exec` with prompt: + Run `flowctl codex exec` with prompt: "Task is blocked by: . Options: 1) Skip this task 2) Remove the dependency 3) Split into doable + blocked parts Reply with ONLY: N where N is 1, 2, or 3. Then one sentence explaining why." From c9ba14d21597c601eeef9c584df124b26978603b Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:25:25 +0800 Subject: [PATCH 3/7] fix: cmd_done write ordering + cache get_repo_root Write runtime state before spec/receipt in cmd_done so a mid-write crash still marks the task as done (runtime is authoritative). Cache get_repo_root() per cwd to avoid repeated subprocess calls, following the same pattern as get_state_dir(). Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/flowctl/commands/workflow.py | 24 +++++++++++++----------- scripts/flowctl/core/paths.py | 25 ++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/scripts/flowctl/commands/workflow.py b/scripts/flowctl/commands/workflow.py index f85ecc00..23d6814d 100644 --- a/scripts/flowctl/commands/workflow.py +++ b/scripts/flowctl/commands/workflow.py @@ -768,7 +768,19 @@ def to_list(val: Any) -> list: except ValueError as e: error_exit(str(e), use_json=args.json) - # All validation passed - now write (spec to tracked file, runtime to state-dir) + # Add duration to evidence + if duration_seconds is not None: + evidence["duration_seconds"] = duration_seconds + + # All validation passed - now write. + # Write runtime state FIRST (authoritative source via load_task_with_state), + # so a crash after this point still marks the task as done. + runtime_done = {"status": "done", "evidence": evidence, "completed_at": now_iso()} + if duration_seconds is not None: + runtime_done["duration_seconds"] = duration_seconds + save_task_runtime(args.id, runtime_done) + + # Then write spec (summary + evidence markdown) atomic_write(task_spec_path, updated_spec) # Archive review receipt if present in evidence @@ -781,16 +793,6 @@ def to_list(val: Any) -> list: receipt_filename = f"{rtype}-{args.id}-{mode}.json" atomic_write_json(reviews_dir / receipt_filename, review_receipt) - # Add duration to evidence - if duration_seconds is not None: - evidence["duration_seconds"] = duration_seconds - - # Write runtime state to state-dir (not definition file) - runtime_done = {"status": "done", "evidence": evidence, "completed_at": now_iso()} - if duration_seconds is not None: - runtime_done["duration_seconds"] = duration_seconds - save_task_runtime(args.id, runtime_done) - # NOTE: We no longer update epic timestamp on task done. # This reduces merge conflicts in multi-user scenarios. diff --git a/scripts/flowctl/core/paths.py b/scripts/flowctl/core/paths.py index 4ebb0d1e..0e274699 100644 --- a/scripts/flowctl/core/paths.py +++ b/scripts/flowctl/core/paths.py @@ -10,14 +10,30 @@ # CLI invocations are short-lived so no expiry needed. _state_dir_cache: dict[str, Path] = {} +# Module-level cache for get_repo_root(), keyed by cwd string. +_repo_root_cache: dict[str, Path] = {} + def _reset_state_dir_cache() -> None: """Clear the get_state_dir() memoization cache. For testing.""" _state_dir_cache.clear() +def _reset_repo_root_cache() -> None: + """Clear the get_repo_root() memoization cache. For testing.""" + _repo_root_cache.clear() + + def get_repo_root() -> Path: - """Find git repo root.""" + """Find git repo root. + + Results are memoized per working directory. Call _reset_repo_root_cache() + to clear (e.g. in tests that change directories). + """ + cache_key = os.getcwd() + if cache_key in _repo_root_cache: + return _repo_root_cache[cache_key] + try: result = subprocess.run( ["git", "rev-parse", "--show-toplevel"], @@ -25,10 +41,13 @@ def get_repo_root() -> Path: text=True, check=True, ) - return Path(result.stdout.strip()) + resolved = Path(result.stdout.strip()) except subprocess.CalledProcessError: # Fallback to current directory - return Path.cwd() + resolved = Path.cwd() + + _repo_root_cache[cache_key] = resolved + return resolved def get_flow_dir() -> Path: From 5cbfbba0b3bfe563b3d41eb07de1e198951b33ef Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:29:37 +0800 Subject: [PATCH 4/7] fix: remove backward-compat status writes to definition files Status is now managed exclusively by runtime state files. Definition JSON files no longer have status written to them in cmd_task_reset, cmd_restart, cmd_skip, and cmd_split. Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/flowctl/commands/task.py | 12 ++++++------ scripts/flowctl/commands/workflow.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/scripts/flowctl/commands/task.py b/scripts/flowctl/commands/task.py index bc4e9478..918fcc67 100644 --- a/scripts/flowctl/commands/task.py +++ b/scripts/flowctl/commands/task.py @@ -812,7 +812,7 @@ def cmd_task_reset(args: argparse.Namespace) -> None: def_data.pop("claimed_at", None) def_data.pop("claim_note", None) def_data.pop("evidence", None) - def_data["status"] = "todo" # Keep in sync for backward compat + def_data.pop("status", None) def_data["updated_at"] = now_iso() atomic_write_json(task_json_path, def_data) @@ -848,7 +848,7 @@ def cmd_task_reset(args: argparse.Namespace) -> None: dep_def.pop("claimed_at", None) dep_def.pop("claim_note", None) dep_def.pop("evidence", None) - dep_def["status"] = "todo" + dep_def.pop("status", None) dep_def["updated_at"] = now_iso() atomic_write_json(dep_path, dep_def) @@ -922,9 +922,9 @@ def cmd_task_skip(args: argparse.Namespace) -> None: if status == "done": error_exit(f"Cannot skip already-done task {task_id}", use_json=args.json) - # Update definition + # Update definition (status managed by runtime only) def_data = load_json_or_exit(task_path, f"Task {task_id}", use_json=args.json) - def_data["status"] = "skipped" + def_data.pop("status", None) def_data["skipped_reason"] = args.reason or "" def_data["skipped_at"] = now_iso() def_data["updated_at"] = now_iso() @@ -1008,9 +1008,9 @@ def cmd_task_split(args: argparse.Namespace) -> None: atomic_write(flow_dir / TASKS_DIR / f"{sub_id}.md", spec_content) created.append(sub_id) - # Mark original task as skipped with split reference + # Mark original task as skipped with split reference (status managed by runtime only) def_data = load_json_or_exit(task_path, f"Task {task_id}", use_json=args.json) - def_data["status"] = "skipped" + def_data.pop("status", None) def_data["skipped_reason"] = f"Split into: {', '.join(created)}" def_data["split_into"] = created def_data["updated_at"] = now_iso() diff --git a/scripts/flowctl/commands/workflow.py b/scripts/flowctl/commands/workflow.py index 23d6814d..891d608d 100644 --- a/scripts/flowctl/commands/workflow.py +++ b/scripts/flowctl/commands/workflow.py @@ -972,7 +972,7 @@ def cmd_restart(args: argparse.Namespace) -> None: for field in ("blocked_reason", "completed_at", "assignee", "claimed_at", "claim_note", "evidence"): def_data.pop(field, None) - def_data["status"] = "todo" + def_data.pop("status", None) def_data["updated_at"] = now_iso() atomic_write_json(tid_path, def_data) From 3a3232f249f4e03748d869e249c77fd637c05204 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:30:52 +0800 Subject: [PATCH 5/7] test(workflow): add pytest tests for workflow commands - cmd_start: 11 tests (claim, deps, blocked, force, resume, invalid ID) - cmd_done: 10 tests (evidence, spec update, duration, cross-actor, force) - cmd_ready: 7 tests (unblocked, dep done, skipped-as-done, in_progress, blocked) - cmd_next: 6 tests (ready task, resume, all-done, completion/plan review gates) - cmd_restart: 6 tests (cascade, dry-run, force, skip-todo, invalid/missing) - cmd_block: 3 tests (status, done-fails, spec update) Task: fn-6-v2-reliability-and-performance-hardening.2 --- scripts/flowctl/tests/test_workflow.py | 655 +++++++++++++++++++++++++ 1 file changed, 655 insertions(+) create mode 100644 scripts/flowctl/tests/test_workflow.py diff --git a/scripts/flowctl/tests/test_workflow.py b/scripts/flowctl/tests/test_workflow.py new file mode 100644 index 00000000..d2a67603 --- /dev/null +++ b/scripts/flowctl/tests/test_workflow.py @@ -0,0 +1,655 @@ +"""Tests for flowctl.commands.workflow — start, done, ready, next, restart, block.""" + +import argparse +import json +import os + +import pytest + +from flowctl.commands.workflow import ( + cmd_block, + cmd_done, + cmd_next, + cmd_ready, + cmd_restart, + cmd_start, +) +from flowctl.core.state import load_task_with_state, save_task_runtime + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _create_epic(flow_dir, epic_id, *, status="open", title="Test Epic", + plan_review_status="unknown", depends_on_epics=None, + completion_review_status="unknown"): + """Create an epic JSON file.""" + epic_data = { + "id": epic_id, + "title": title, + "status": status, + "plan_review_status": plan_review_status, + "completion_review_status": completion_review_status, + "depends_on_epics": depends_on_epics or [], + } + epic_path = flow_dir / "epics" / f"{epic_id}.json" + epic_path.write_text(json.dumps(epic_data, indent=2) + "\n", encoding="utf-8") + return epic_path + + +def _create_task(flow_dir, task_id, *, epic, title="Test Task", status="todo", + depends_on=None, priority=None, files=None, domain=None): + """Create a task JSON file and a minimal spec .md file.""" + task_data = { + "id": task_id, + "title": title, + "epic": epic, + "status": status, + "depends_on": depends_on or [], + } + if priority is not None: + task_data["priority"] = priority + if files is not None: + task_data["files"] = files + if domain is not None: + task_data["domain"] = domain + + task_path = flow_dir / "tasks" / f"{task_id}.json" + task_path.write_text(json.dumps(task_data, indent=2) + "\n", encoding="utf-8") + + # Create minimal spec markdown + spec_path = flow_dir / "tasks" / f"{task_id}.md" + spec_path.write_text( + f"# {task_id} {title}\n\n" + "## Description\nTask description.\n\n" + "## Acceptance\n- [ ] Criterion A\n\n" + "## Done summary\nTBD\n\n" + "## Evidence\n- Commits:\n- Tests:\n- PRs:\n", + encoding="utf-8", + ) + return task_path + + +def _ns(**kwargs): + """Build an argparse.Namespace with sensible defaults.""" + defaults = {"json": True, "force": False} + defaults.update(kwargs) + return argparse.Namespace(**defaults) + + +# --------------------------------------------------------------------------- +# Fixture: populated epic with 3 tasks and dependencies +# --------------------------------------------------------------------------- + + +@pytest.fixture +def populated_epic(git_repo): + """Create an epic fn-1 with 3 tasks: .1 (no deps), .2 (depends on .1), .3 (depends on .2).""" + flow_dir = git_repo / ".flow" + _create_epic(flow_dir, "fn-1") + _create_task(flow_dir, "fn-1.1", epic="fn-1", title="First task") + _create_task(flow_dir, "fn-1.2", epic="fn-1", title="Second task", depends_on=["fn-1.1"]) + _create_task(flow_dir, "fn-1.3", epic="fn-1", title="Third task", depends_on=["fn-1.2"]) + + # Set up state dir for runtime state + state_dir = git_repo / ".git" / "flow-state" + state_dir.mkdir(parents=True, exist_ok=True) + os.environ["FLOW_STATE_DIR"] = str(state_dir) + os.environ["FLOW_ACTOR"] = "test@test.com" + + yield flow_dir + + # Cleanup env + os.environ.pop("FLOW_STATE_DIR", None) + os.environ.pop("FLOW_ACTOR", None) + + +# =========================================================================== +# cmd_start tests (>=5 cases) +# =========================================================================== + + +class TestCmdStart: + """Tests for cmd_start — claiming and starting tasks.""" + + def test_start_todo_task(self, populated_epic, capsys): + """Starting a todo task should set it to in_progress.""" + cmd_start(_ns(id="fn-1.1", note=None)) + out = json.loads(capsys.readouterr().out) + assert out["status"] == "in_progress" + + task = load_task_with_state("fn-1.1") + assert task["status"] == "in_progress" + assert task["assignee"] == "test@test.com" + + def test_start_sets_claimed_at(self, populated_epic): + """Starting a task should set claimed_at timestamp.""" + cmd_start(_ns(id="fn-1.1", note=None)) + task = load_task_with_state("fn-1.1") + assert "claimed_at" in task + assert task["claimed_at"] is not None + + def test_start_with_note(self, populated_epic, capsys): + """Starting a task with --note should store the claim note.""" + cmd_start(_ns(id="fn-1.1", note="Taking this one")) + task = load_task_with_state("fn-1.1") + assert task.get("claim_note") == "Taking this one" + + def test_start_already_done_fails(self, populated_epic, capsys): + """Starting a done task should fail.""" + # First, start and complete fn-1.1 + save_task_runtime("fn-1.1", {"status": "done", "assignee": "test@test.com"}) + + with pytest.raises(SystemExit): + cmd_start(_ns(id="fn-1.1", note=None)) + + def test_start_blocked_task_fails(self, populated_epic): + """Starting a blocked task should fail without --force.""" + save_task_runtime("fn-1.1", {"status": "blocked"}) + + with pytest.raises(SystemExit): + cmd_start(_ns(id="fn-1.1", note=None)) + + def test_start_blocked_task_force(self, populated_epic, capsys): + """Starting a blocked task with --force should succeed.""" + save_task_runtime("fn-1.1", {"status": "blocked"}) + cmd_start(_ns(id="fn-1.1", note=None, force=True)) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "in_progress" + + def test_start_unmet_dependency_fails(self, populated_epic): + """Starting a task with unmet dependencies should fail.""" + # fn-1.2 depends on fn-1.1 which is still todo + with pytest.raises(SystemExit): + cmd_start(_ns(id="fn-1.2", note=None)) + + def test_start_met_dependency_succeeds(self, populated_epic, capsys): + """Starting a task whose deps are done should succeed.""" + save_task_runtime("fn-1.1", {"status": "done"}) + cmd_start(_ns(id="fn-1.2", note=None)) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "in_progress" + + def test_start_claimed_by_other_fails(self, populated_epic): + """Starting a task claimed by someone else should fail.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "other@test.com"}) + + with pytest.raises(SystemExit): + cmd_start(_ns(id="fn-1.1", note=None)) + + def test_start_resume_own_in_progress(self, populated_epic, capsys): + """Re-starting your own in_progress task should succeed (resume).""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + cmd_start(_ns(id="fn-1.1", note=None)) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "in_progress" + + def test_start_invalid_task_id_fails(self, populated_epic): + """Starting with an invalid task ID should fail.""" + with pytest.raises(SystemExit): + cmd_start(_ns(id="not-a-task", note=None)) + + +# =========================================================================== +# cmd_done tests (>=5 cases) +# =========================================================================== + + +class TestCmdDone: + """Tests for cmd_done — completing tasks with evidence.""" + + def test_done_in_progress_task(self, populated_epic, capsys): + """Completing an in_progress task should set status to done.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + cmd_done(_ns( + id="fn-1.1", + summary="Implemented feature", + summary_file=None, + evidence=None, + evidence_json=None, + force=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "done" + assert out["id"] == "fn-1.1" + + def test_done_requires_in_progress(self, populated_epic): + """Completing a todo task should fail without --force.""" + with pytest.raises(SystemExit): + cmd_done(_ns( + id="fn-1.1", + summary="Done", + summary_file=None, + evidence=None, + evidence_json=None, + force=False, + )) + + def test_done_already_done_fails(self, populated_epic): + """Completing a task that is already done should fail.""" + save_task_runtime("fn-1.1", {"status": "done"}) + + with pytest.raises(SystemExit): + cmd_done(_ns( + id="fn-1.1", + summary="Done again", + summary_file=None, + evidence=None, + evidence_json=None, + force=False, + )) + + def test_done_with_evidence_json_inline(self, populated_epic, capsys): + """Completing with inline evidence JSON should parse and store it.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + evidence = json.dumps({"commits": ["abc123"], "tests": ["pytest"], "prs": []}) + cmd_done(_ns( + id="fn-1.1", + summary="Done", + summary_file=None, + evidence=None, + evidence_json=evidence, + force=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "done" + + def test_done_with_evidence_file(self, populated_epic, capsys, tmp_path): + """Completing with evidence from a file should work.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + evidence_file = tmp_path / "evidence.json" + evidence_file.write_text( + json.dumps({"commits": ["def456"], "tests": [], "prs": []}), + encoding="utf-8", + ) + + cmd_done(_ns( + id="fn-1.1", + summary="Done", + summary_file=None, + evidence=None, + evidence_json=str(evidence_file), + force=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "done" + + def test_done_updates_spec_file(self, populated_epic): + """Done should update the ## Done summary and ## Evidence sections in the spec.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + cmd_done(_ns( + id="fn-1.1", + summary="Implemented the feature successfully", + summary_file=None, + evidence=None, + evidence_json=json.dumps({"commits": ["abc"], "tests": ["pytest"], "prs": []}), + force=False, + )) + + spec_path = populated_epic / "tasks" / "fn-1.1.md" + content = spec_path.read_text(encoding="utf-8") + assert "Implemented the feature successfully" in content + assert "abc" in content + + def test_done_with_summary_file(self, populated_epic, capsys, tmp_path): + """Completing with summary from a file should work.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + summary_file = tmp_path / "summary.md" + summary_file.write_text("Summary from file", encoding="utf-8") + + cmd_done(_ns( + id="fn-1.1", + summary=None, + summary_file=str(summary_file), + evidence=None, + evidence_json=None, + force=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "done" + + def test_done_force_skips_status_check(self, populated_epic, capsys): + """Done with --force should allow completing a todo task.""" + cmd_done(_ns( + id="fn-1.1", + summary="Force done", + summary_file=None, + evidence=None, + evidence_json=None, + force=True, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "done" + + def test_done_cross_actor_fails(self, populated_epic): + """Completing a task claimed by someone else should fail.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "other@test.com"}) + + with pytest.raises(SystemExit): + cmd_done(_ns( + id="fn-1.1", + summary="Done", + summary_file=None, + evidence=None, + evidence_json=None, + force=False, + )) + + def test_done_calculates_duration(self, populated_epic, capsys): + """Done should calculate duration from claimed_at.""" + from flowctl.core.io import now_iso + save_task_runtime("fn-1.1", { + "status": "in_progress", + "assignee": "test@test.com", + "claimed_at": now_iso(), + }) + + cmd_done(_ns( + id="fn-1.1", + summary="Done", + summary_file=None, + evidence=None, + evidence_json=None, + force=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "done" + assert "duration_seconds" in out + + +# =========================================================================== +# cmd_ready tests (>=3 cases) +# =========================================================================== + + +class TestCmdReady: + """Tests for cmd_ready — listing ready tasks for an epic.""" + + def test_ready_returns_unblocked_tasks(self, populated_epic, capsys): + """Ready should return tasks with no unmet dependencies.""" + cmd_ready(_ns(epic="fn-1")) + + out = json.loads(capsys.readouterr().out) + ready_ids = [t["id"] for t in out["ready"]] + assert "fn-1.1" in ready_ids + # fn-1.2 depends on fn-1.1 (todo), so not ready + assert "fn-1.2" not in ready_ids + + def test_ready_after_dep_done(self, populated_epic, capsys): + """After a dep is done, the dependent task should appear in ready.""" + save_task_runtime("fn-1.1", {"status": "done"}) + + cmd_ready(_ns(epic="fn-1")) + + out = json.loads(capsys.readouterr().out) + ready_ids = [t["id"] for t in out["ready"]] + assert "fn-1.2" in ready_ids + # fn-1.1 is done, not in ready + assert "fn-1.1" not in ready_ids + + def test_ready_skipped_counts_as_done(self, populated_epic, capsys): + """A skipped dependency should count as satisfied.""" + save_task_runtime("fn-1.1", {"status": "skipped"}) + + cmd_ready(_ns(epic="fn-1")) + + out = json.loads(capsys.readouterr().out) + ready_ids = [t["id"] for t in out["ready"]] + assert "fn-1.2" in ready_ids + + def test_ready_shows_in_progress_separately(self, populated_epic, capsys): + """In-progress tasks should appear in the in_progress list, not ready.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + cmd_ready(_ns(epic="fn-1")) + + out = json.loads(capsys.readouterr().out) + ready_ids = [t["id"] for t in out["ready"]] + in_progress_ids = [t["id"] for t in out["in_progress"]] + assert "fn-1.1" not in ready_ids + assert "fn-1.1" in in_progress_ids + + def test_ready_shows_blocked_tasks(self, populated_epic, capsys): + """Blocked tasks should appear in the blocked list.""" + save_task_runtime("fn-1.1", {"status": "blocked"}) + + cmd_ready(_ns(epic="fn-1")) + + out = json.loads(capsys.readouterr().out) + blocked_ids = [b["id"] for b in out["blocked"]] + assert "fn-1.1" in blocked_ids + + def test_ready_invalid_epic_fails(self, populated_epic): + """Ready with an invalid epic ID should fail.""" + with pytest.raises(SystemExit): + cmd_ready(_ns(epic="not-an-epic")) + + def test_ready_nonexistent_epic_fails(self, populated_epic): + """Ready with a non-existent epic should fail.""" + with pytest.raises(SystemExit): + cmd_ready(_ns(epic="fn-999")) + + +# =========================================================================== +# cmd_next tests (>=3 cases) +# =========================================================================== + + +class TestCmdNext: + """Tests for cmd_next — selecting the next task to work on.""" + + def test_next_returns_first_ready_task(self, populated_epic, capsys): + """Next should return the first ready task.""" + cmd_next(_ns( + epics_file=None, + require_plan_review=False, + require_completion_review=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "work" + assert out["task"] == "fn-1.1" + assert out["reason"] == "ready_task" + + def test_next_resumes_in_progress(self, populated_epic, capsys): + """Next should resume an in_progress task owned by current actor.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + cmd_next(_ns( + epics_file=None, + require_plan_review=False, + require_completion_review=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "work" + assert out["task"] == "fn-1.1" + assert out["reason"] == "resume_in_progress" + + def test_next_none_when_all_done(self, populated_epic, capsys): + """Next should return 'none' when all tasks are done.""" + for tid in ("fn-1.1", "fn-1.2", "fn-1.3"): + save_task_runtime(tid, {"status": "done"}) + + cmd_next(_ns( + epics_file=None, + require_plan_review=False, + require_completion_review=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "none" + + def test_next_completion_review_gate(self, populated_epic, capsys): + """When all tasks done and completion review required, should signal completion_review.""" + for tid in ("fn-1.1", "fn-1.2", "fn-1.3"): + save_task_runtime(tid, {"status": "done"}) + + cmd_next(_ns( + epics_file=None, + require_plan_review=False, + require_completion_review=True, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "completion_review" + assert out["reason"] == "needs_completion_review" + + def test_next_plan_review_gate(self, populated_epic, capsys): + """When plan review is required but not done, should signal plan review needed.""" + cmd_next(_ns( + epics_file=None, + require_plan_review=True, + require_completion_review=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "plan" + assert out["reason"] == "needs_plan_review" + + def test_next_skips_done_epics(self, populated_epic, capsys): + """Next should skip epics with status=done.""" + # Mark epic as done + epic_path = populated_epic / "epics" / "fn-1.json" + epic_data = json.loads(epic_path.read_text(encoding="utf-8")) + epic_data["status"] = "done" + epic_path.write_text(json.dumps(epic_data, indent=2) + "\n", encoding="utf-8") + + cmd_next(_ns( + epics_file=None, + require_plan_review=False, + require_completion_review=False, + )) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "none" + + +# =========================================================================== +# cmd_restart tests (>=2 cases) +# =========================================================================== + + +class TestCmdRestart: + """Tests for cmd_restart — resetting tasks and cascading to dependents.""" + + def test_restart_resets_task_and_dependents(self, populated_epic, capsys): + """Restart should reset the target task and cascade to downstream dependents.""" + # Set up: fn-1.1 done, fn-1.2 done, fn-1.3 in_progress + save_task_runtime("fn-1.1", {"status": "done"}) + save_task_runtime("fn-1.2", {"status": "done"}) + save_task_runtime("fn-1.3", {"status": "in_progress", "assignee": "test@test.com"}) + + cmd_restart(_ns(id="fn-1.1", dry_run=False, force=True)) + + out = json.loads(capsys.readouterr().out) + assert out["success"] is True + # All three should be reset + assert "fn-1.1" in out["reset"] + + # Verify runtime states + t1 = load_task_with_state("fn-1.1") + assert t1["status"] == "todo" + + def test_restart_dry_run(self, populated_epic, capsys): + """Dry run should report what would be reset without changing anything.""" + save_task_runtime("fn-1.1", {"status": "done"}) + save_task_runtime("fn-1.2", {"status": "done"}) + + cmd_restart(_ns(id="fn-1.1", dry_run=True, force=False)) + + out = json.loads(capsys.readouterr().out) + assert out["dry_run"] is True + assert "fn-1.1" in out["would_reset"] + + # Verify nothing actually changed + t1 = load_task_with_state("fn-1.1") + assert t1["status"] == "done" + + def test_restart_in_progress_requires_force(self, populated_epic): + """Restarting when target is in_progress should fail without --force.""" + save_task_runtime("fn-1.1", {"status": "in_progress", "assignee": "test@test.com"}) + + with pytest.raises(SystemExit): + cmd_restart(_ns(id="fn-1.1", dry_run=False, force=False)) + + def test_restart_skips_todo_tasks(self, populated_epic, capsys): + """Tasks already in todo should be reported as skipped, not reset.""" + save_task_runtime("fn-1.1", {"status": "done"}) + # fn-1.2 and fn-1.3 are still todo + + cmd_restart(_ns(id="fn-1.1", dry_run=False, force=False)) + + out = json.loads(capsys.readouterr().out) + assert "fn-1.1" in out["reset"] + # fn-1.2 and fn-1.3 are dependents but already todo + assert "fn-1.2" in out.get("skipped", []) + + def test_restart_invalid_task_id_fails(self, populated_epic): + """Restarting with an invalid task ID should fail.""" + with pytest.raises(SystemExit): + cmd_restart(_ns(id="bad-id", dry_run=False, force=False)) + + def test_restart_nonexistent_task_fails(self, populated_epic): + """Restarting a non-existent task should fail.""" + with pytest.raises(SystemExit): + cmd_restart(_ns(id="fn-1.99", dry_run=False, force=False)) + + +# =========================================================================== +# cmd_block tests +# =========================================================================== + + +class TestCmdBlock: + """Tests for cmd_block — blocking a task with a reason.""" + + def test_block_sets_blocked_status(self, populated_epic, capsys, tmp_path): + """Block should set task status to blocked.""" + reason_file = tmp_path / "reason.md" + reason_file.write_text("Waiting for external API", encoding="utf-8") + + cmd_block(_ns(id="fn-1.1", reason_file=str(reason_file))) + + out = json.loads(capsys.readouterr().out) + assert out["status"] == "blocked" + + task = load_task_with_state("fn-1.1") + assert task["status"] == "blocked" + + def test_block_done_task_fails(self, populated_epic, tmp_path): + """Blocking a done task should fail.""" + save_task_runtime("fn-1.1", {"status": "done"}) + + reason_file = tmp_path / "reason.md" + reason_file.write_text("Some reason", encoding="utf-8") + + with pytest.raises(SystemExit): + cmd_block(_ns(id="fn-1.1", reason_file=str(reason_file))) + + def test_block_updates_spec(self, populated_epic, tmp_path): + """Block should update the ## Done summary section in the spec.""" + reason_file = tmp_path / "reason.md" + reason_file.write_text("Blocked by external dependency", encoding="utf-8") + + cmd_block(_ns(id="fn-1.1", reason_file=str(reason_file))) + + spec_path = populated_epic / "tasks" / "fn-1.1.md" + content = spec_path.read_text(encoding="utf-8") + assert "Blocked by external dependency" in content From 86d3c5b75ee0abf9c45b8c9f2fbb88ed67535cc3 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:31:15 +0800 Subject: [PATCH 6/7] fix(dx): rp.py error paths respect --json, guard log rotation - Thread use_json parameter through all rp.py helpers (require_rp_cli, run_rp_cli, parse_windows, parse_builder_tab) and cmd_* functions - All ~15 error_exit calls now use args.json instead of hardcoded False - Add 1MB log rotation in ralph-guard _debug_log_path() (single-file .1) - Bump RALPH_GUARD_VERSION to 0.15.0 Task: fn-6-v2-reliability-and-performance-hardening.7 --- scripts/flowctl/commands/rp.py | 100 ++++++++++++++++++--------------- scripts/hooks/ralph-guard.py | 88 +++++++++++++++++++++-------- 2 files changed, 121 insertions(+), 67 deletions(-) diff --git a/scripts/flowctl/commands/rp.py b/scripts/flowctl/commands/rp.py index 2203c6aa..e40641c2 100644 --- a/scripts/flowctl/commands/rp.py +++ b/scripts/flowctl/commands/rp.py @@ -20,26 +20,27 @@ # ───────────────────────────────────────────────────────────────────────────── -def require_rp_cli() -> str: +def require_rp_cli(use_json: bool = False) -> str: """Ensure rp-cli is available.""" rp = shutil.which("rp-cli") if not rp: - error_exit("rp-cli not found in PATH", use_json=False, code=2) + error_exit("rp-cli not found in PATH", use_json=use_json, code=2) return rp def run_rp_cli( - args: list[str], timeout: Optional[int] = None + args: list[str], timeout: Optional[int] = None, use_json: bool = False ) -> subprocess.CompletedProcess: """Run rp-cli with safe error handling and timeout. Args: args: Command arguments to pass to rp-cli timeout: Max seconds to wait. Default from FLOW_RP_TIMEOUT env or 1200s (20min). + use_json: Whether to output errors as JSON. """ if timeout is None: timeout = int(os.environ.get("FLOW_RP_TIMEOUT", "1200")) - rp = require_rp_cli() + rp = require_rp_cli(use_json=use_json) cmd = [rp] + args try: return subprocess.run( @@ -49,12 +50,12 @@ def run_rp_cli( error_exit( f"rp-cli timed out after {timeout}s. " "Is RepoPrompt running? If not, use --review=codex as fallback.", - use_json=False, + use_json=use_json, code=3, ) except subprocess.CalledProcessError as e: msg = (e.stderr or e.stdout or str(e)).strip() - error_exit(f"rp-cli failed: {msg}", use_json=False, code=2) + error_exit(f"rp-cli failed: {msg}", use_json=use_json, code=2) def normalize_repo_root(path: str) -> list[str]: @@ -98,7 +99,7 @@ def normalize_repo_root(path: str) -> list[str]: return list(dict.fromkeys(roots)) -def parse_windows(raw: str) -> list[dict[str, Any]]: +def parse_windows(raw: str, use_json: bool = False) -> list[dict[str, Any]]: """Parse rp-cli windows JSON.""" try: data = json.loads(raw) @@ -113,8 +114,8 @@ def parse_windows(raw: str) -> list[dict[str, Any]]: except json.JSONDecodeError as e: if "single-window mode" in raw: return [{"windowID": 1, "rootFolderPaths": []}] - error_exit(f"windows JSON parse failed: {e}", use_json=False, code=2) - error_exit("windows JSON has unexpected shape", use_json=False, code=2) + error_exit(f"windows JSON parse failed: {e}", use_json=use_json, code=2) + error_exit("windows JSON has unexpected shape", use_json=use_json, code=2) def extract_window_id(win: dict[str, Any]) -> Optional[int]: @@ -138,10 +139,10 @@ def extract_root_paths(win: dict[str, Any]) -> list[str]: return [] -def parse_builder_tab(output: str) -> str: +def parse_builder_tab(output: str, use_json: bool = False) -> str: match = re.search(r"Tab:\s*([A-Za-z0-9-]+)", output) if not match: - error_exit("builder output missing Tab id", use_json=False, code=2) + error_exit("builder output missing Tab id", use_json=use_json, code=2) return match.group(1) @@ -186,7 +187,7 @@ def build_chat_payload( def cmd_prep_chat(args: argparse.Namespace) -> None: """Prepare JSON payload for rp-cli chat_send. Handles escaping safely.""" # Read message from file - message = read_text_or_exit(Path(args.message_file), "Message file", use_json=False) + message = read_text_or_exit(Path(args.message_file), "Message file", use_json=getattr(args, 'json', False)) json_str = build_chat_payload( message=message, mode=args.mode, @@ -203,24 +204,26 @@ def cmd_prep_chat(args: argparse.Namespace) -> None: def cmd_rp_windows(args: argparse.Namespace) -> None: - result = run_rp_cli(["--raw-json", "-e", "windows"]) + use_json = getattr(args, 'json', False) + result = run_rp_cli(["--raw-json", "-e", "windows"], use_json=use_json) raw = result.stdout or "" if args.json: - windows = parse_windows(raw) + windows = parse_windows(raw, use_json=use_json) print(json.dumps(windows)) else: print(raw, end="") def cmd_rp_pick_window(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) repo_root = args.repo_root roots = normalize_repo_root(repo_root) - result = run_rp_cli(["--raw-json", "-e", "windows"]) - windows = parse_windows(result.stdout or "") + result = run_rp_cli(["--raw-json", "-e", "windows"], use_json=use_json) + windows = parse_windows(result.stdout or "", use_json=use_json) if len(windows) == 1 and not extract_root_paths(windows[0]): win_id = extract_window_id(windows[0]) if win_id is None: - error_exit("No window matches repo root", use_json=False, code=2) + error_exit("No window matches repo root", use_json=use_json, code=2) if args.json: print(json.dumps({"window": win_id})) else: @@ -237,10 +240,11 @@ def cmd_rp_pick_window(args: argparse.Namespace) -> None: else: print(win_id) return - error_exit("No window matches repo root", use_json=False, code=2) + error_exit("No window matches repo root", use_json=use_json, code=2) def cmd_rp_ensure_workspace(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) window = args.window repo_root = os.path.realpath(args.repo_root) ws_name = os.path.basename(repo_root) @@ -252,11 +256,11 @@ def cmd_rp_ensure_workspace(args: argparse.Namespace) -> None: "-e", f"call manage_workspaces {json.dumps({'action': 'list'})}", ] - list_res = run_rp_cli(list_cmd) + list_res = run_rp_cli(list_cmd, use_json=use_json) try: data = json.loads(list_res.stdout) except json.JSONDecodeError as e: - error_exit(f"workspace list JSON parse failed: {e}", use_json=False, code=2) + error_exit(f"workspace list JSON parse failed: {e}", use_json=use_json, code=2) def extract_names(obj: Any) -> set[str]: names: set[str] = set() @@ -284,7 +288,7 @@ def extract_names(obj: Any) -> set[str]: "-e", f"call manage_workspaces {json.dumps({'action': 'create', 'name': ws_name, 'folder_path': repo_root})}", ] - run_rp_cli(create_cmd) + run_rp_cli(create_cmd, use_json=use_json) switch_cmd = [ "-w", @@ -292,10 +296,11 @@ def extract_names(obj: Any) -> set[str]: "-e", f"call manage_workspaces {json.dumps({'action': 'switch', 'workspace': ws_name, 'window_id': window})}", ] - run_rp_cli(switch_cmd) + run_rp_cli(switch_cmd, use_json=use_json) def cmd_rp_builder(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) window = args.window summary = args.summary response_type = getattr(args, "response_type", None) @@ -313,7 +318,7 @@ def cmd_rp_builder(args: argparse.Namespace) -> None: builder_expr, ] cmd = [c for c in cmd if c] # Remove empty strings - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) output = (res.stdout or "") + ("\n" + res.stderr if res.stderr else "") # For review response-type, parse the full JSON response @@ -341,13 +346,13 @@ def cmd_rp_builder(args: argparse.Namespace) -> None: if review_response: print(review_response) except json.JSONDecodeError: - tab = parse_builder_tab(output) + tab = parse_builder_tab(output, use_json=use_json) if args.json: print(json.dumps({"window": window, "tab": tab, "error": "parse_failed"})) else: print(tab) else: - tab = parse_builder_tab(output) + tab = parse_builder_tab(output, use_json=use_json) if args.json: print(json.dumps({"window": window, "tab": tab})) else: @@ -355,13 +360,15 @@ def cmd_rp_builder(args: argparse.Namespace) -> None: def cmd_rp_prompt_get(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) cmd = ["-w", str(args.window), "-t", args.tab, "-e", "prompt get"] - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) print(res.stdout, end="") def cmd_rp_prompt_set(args: argparse.Namespace) -> None: - message = read_text_or_exit(Path(args.message_file), "Message file", use_json=False) + use_json = getattr(args, 'json', False) + message = read_text_or_exit(Path(args.message_file), "Message file", use_json=use_json) payload = json.dumps({"op": "set", "text": message}) cmd = [ "-w", @@ -371,27 +378,30 @@ def cmd_rp_prompt_set(args: argparse.Namespace) -> None: "-e", f"call prompt {payload}", ] - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) print(res.stdout, end="") def cmd_rp_select_get(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) cmd = ["-w", str(args.window), "-t", args.tab, "-e", "select get"] - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) print(res.stdout, end="") def cmd_rp_select_add(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) if not args.paths: - error_exit("select-add requires at least one path", use_json=False, code=2) + error_exit("select-add requires at least one path", use_json=use_json, code=2) quoted = " ".join(shlex.quote(p) for p in args.paths) cmd = ["-w", str(args.window), "-t", args.tab, "-e", f"select add {quoted}"] - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) print(res.stdout, end="") def cmd_rp_chat_send(args: argparse.Namespace) -> None: - message = read_text_or_exit(Path(args.message_file), "Message file", use_json=False) + use_json = getattr(args, 'json', False) + message = read_text_or_exit(Path(args.message_file), "Message file", use_json=use_json) chat_id_arg = getattr(args, "chat_id", None) mode = getattr(args, "mode", "chat") or "chat" payload = build_chat_payload( @@ -410,7 +420,7 @@ def cmd_rp_chat_send(args: argparse.Namespace) -> None: "-e", f"call chat_send {payload}", ] - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) output = (res.stdout or "") + ("\n" + res.stderr if res.stderr else "") chat_id = parse_chat_id(output) if args.json: @@ -420,6 +430,7 @@ def cmd_rp_chat_send(args: argparse.Namespace) -> None: def cmd_rp_prompt_export(args: argparse.Namespace) -> None: + use_json = getattr(args, 'json', False) cmd = [ "-w", str(args.window), @@ -428,7 +439,7 @@ def cmd_rp_prompt_export(args: argparse.Namespace) -> None: "-e", f"prompt export {shlex.quote(args.out)}", ] - res = run_rp_cli(cmd) + res = run_rp_cli(cmd, use_json=use_json) print(res.stdout, end="") @@ -444,14 +455,15 @@ def cmd_rp_setup_review(args: argparse.Namespace) -> None: Requires RepoPrompt 1.6.0+ for --response-type review. """ + use_json = getattr(args, 'json', False) repo_root = os.path.realpath(args.repo_root) summary = args.summary response_type = getattr(args, "response_type", None) # Step 1: pick-window (fast — 30s timeout, RP should respond instantly) roots = normalize_repo_root(repo_root) - result = run_rp_cli(["--raw-json", "-e", "windows"], timeout=30) - windows = parse_windows(result.stdout or "") + result = run_rp_cli(["--raw-json", "-e", "windows"], timeout=30, use_json=use_json) + windows = parse_windows(result.stdout or "", use_json=use_json) win_id: Optional[int] = None @@ -477,7 +489,7 @@ def cmd_rp_setup_review(args: argparse.Namespace) -> None: # Auto-create window via workspace create --new-window (RP 1.5.68+) ws_name = os.path.basename(repo_root) create_cmd = f"workspace create {shlex.quote(ws_name)} --new-window --folder-path {shlex.quote(repo_root)}" - create_res = run_rp_cli(["--raw-json", "-e", create_cmd]) + create_res = run_rp_cli(["--raw-json", "-e", create_cmd], use_json=use_json) try: data = json.loads(create_res.stdout or "{}") win_id = data.get("window_id") @@ -486,11 +498,11 @@ def cmd_rp_setup_review(args: argparse.Namespace) -> None: if not win_id: error_exit( f"Failed to create RP window: {create_res.stderr or create_res.stdout}", - use_json=False, + use_json=use_json, code=2, ) else: - error_exit("No RepoPrompt window matches repo root", use_json=False, code=2) + error_exit("No RepoPrompt window matches repo root", use_json=use_json, code=2) # Write state file for ralph-guard verification repo_hash = hashlib.sha256(repo_root.encode()).hexdigest()[:16] @@ -511,7 +523,7 @@ def cmd_rp_setup_review(args: argparse.Namespace) -> None: ] builder_cmd = [c for c in builder_cmd if c] # Remove empty strings # Builder can be slow (context_builder analyzes files) — 1000s timeout - builder_res = run_rp_cli(builder_cmd, timeout=1000) + builder_res = run_rp_cli(builder_cmd, timeout=1000, use_json=use_json) output = (builder_res.stdout or "") + ( "\n" + builder_res.stderr if builder_res.stderr else "" ) @@ -525,7 +537,7 @@ def cmd_rp_setup_review(args: argparse.Namespace) -> None: review_response = data.get("review", {}).get("response", "") if not tab: - error_exit("Builder did not return a tab id", use_json=False, code=2) + error_exit("Builder did not return a tab id", use_json=use_json, code=2) if args.json: print( @@ -546,11 +558,11 @@ def cmd_rp_setup_review(args: argparse.Namespace) -> None: if review_response: print(review_response) except json.JSONDecodeError: - error_exit("Failed to parse builder review response", use_json=False, code=2) + error_exit("Failed to parse builder review response", use_json=use_json, code=2) else: - tab = parse_builder_tab(output) + tab = parse_builder_tab(output, use_json=use_json) if not tab: - error_exit("Builder did not return a tab id", use_json=False, code=2) + error_exit("Builder did not return a tab id", use_json=use_json, code=2) if args.json: print(json.dumps({"window": win_id, "tab": tab, "repo_root": repo_root})) diff --git a/scripts/hooks/ralph-guard.py b/scripts/hooks/ralph-guard.py index 6e731fe3..d7ca90ae 100755 --- a/scripts/hooks/ralph-guard.py +++ b/scripts/hooks/ralph-guard.py @@ -17,7 +17,7 @@ """ # Version for drift detection (bump when making changes) -RALPH_GUARD_VERSION = "0.13.0" +RALPH_GUARD_VERSION = "0.15.0" import json import os @@ -27,9 +27,19 @@ from pathlib import Path +def _get_state_dir() -> Path: + """Get state directory: RUN_DIR > TMPDIR > /tmp.""" + run_dir = os.environ.get("RUN_DIR") + if run_dir: + p = Path(run_dir) + if p.is_dir(): + return p + return Path(os.environ.get("TMPDIR", "/tmp")) + + def get_state_file(session_id: str) -> Path: """Get state file path for this session.""" - return Path(f"/tmp/ralph-guard-{session_id}.json") + return _get_state_dir() / f"ralph-guard-{session_id}.json" def load_state(session_id: str) -> dict: @@ -150,8 +160,10 @@ def handle_protected_file_check(data: dict) -> None: def handle_file_lock_check(data: dict) -> None: """Block Edit/Write to files locked by another task in Teams mode. - Only active when FLOW_TEAMS=1. Checks the flowctl lock registry to ensure - workers only edit files they own. Fails-open if flowctl is unavailable. + Uses acquire-or-fail via 'flowctl lock' to eliminate TOCTOU race between + checking lock state and acting on the result. If lock succeeds, this task + now owns the file. If it fails (already locked by another task), block. + Fails-open if flowctl is unavailable. """ if os.environ.get("FLOW_TEAMS") != "1": return @@ -162,6 +174,9 @@ def handle_file_lock_check(data: dict) -> None: return my_task_id = os.environ.get("FLOW_TASK_ID", "") + if not my_task_id: + # No task context — fail-open + return # Resolve to relative path for lock comparison try: @@ -181,40 +196,52 @@ def handle_file_lock_check(data: dict) -> None: # Fail-open: flowctl unavailable return + # Acquire-or-fail: attempt to lock the file for this task. + # If already locked by this task, flowctl lock is idempotent. + # If locked by another task, flowctl lock fails with non-zero exit. try: result = subprocess.run( - ["python3", flowctl, "lock-check", "--file", rel_path, "--json"], + ["python3", flowctl, "lock", "--task", my_task_id, "--files", rel_path, "--json"], capture_output=True, text=True, timeout=5, cwd=str(get_repo_root()), ) - if result.returncode != 0: - # Fail-open: lock-check errored - return - lock_info = json.loads(result.stdout) - except (subprocess.TimeoutExpired, subprocess.SubprocessError, json.JSONDecodeError, OSError): + except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError): # Fail-open: any error return - if not lock_info.get("locked"): - # File not locked — warn but allow if we have a task ID + if result.returncode == 0: + # Lock acquired (or already owned) — allow return - owner = lock_info.get("owner", "") - if my_task_id and owner == my_task_id: - # Locked by this task — allow - return + # Lock failed — parse stderr/stdout for owner info + error_text = result.stderr or result.stdout or "" + owner_match = re.search(r"locked by ['\"]?([^'\"\s]+)", error_text, re.I) + owner = owner_match.group(1) if owner_match else "another task" - # Locked by a different task — block output_block( f"BLOCKED: File '{rel_path}' is locked by task '{owner}'. " - f"Your task ({my_task_id or 'unknown'}) does not own this file. " + f"Your task ({my_task_id}) does not own this file. " "Request access via 'Need file access:' protocol message or work on your own files." ) +def _normalize_command(cmd: str) -> str: + """Normalize a shell command string for reliable regex matching. + + Handles bypass vectors: tab insertion, empty quotes, excess whitespace. + """ + # Replace tabs with spaces + cmd = cmd.replace("\t", " ") + # Remove empty quote pairs ("" and '') + cmd = cmd.replace('""', "").replace("''", "") + # Collapse multiple spaces into one + cmd = re.sub(r" {2,}", " ", cmd) + return cmd.strip() + + def handle_pre_tool_use(data: dict) -> None: """Handle PreToolUse event - validate commands before execution.""" tool_input = data.get("tool_input", {}) - command = tool_input.get("command", "") + command = _normalize_command(tool_input.get("command", "")) session_id = data.get("session_id", "unknown") # Check for chat-send commands @@ -394,7 +421,7 @@ def handle_post_tool_use(data: dict) -> None: """Handle PostToolUse event - track state and provide feedback.""" tool_input = data.get("tool_input", {}) tool_response = data.get("tool_response", {}) - command = tool_input.get("command", "") + command = _normalize_command(tool_input.get("command", "")) session_id = data.get("session_id", "unknown") # Get response text @@ -647,12 +674,27 @@ def handle_subagent_stop(data: dict) -> None: handle_stop(data) +_LOG_MAX_BYTES = 1_048_576 # 1 MB + + def _debug_log_path() -> Path: - """Get debug log path: $RUN_DIR/guard-debug.log if set, else /tmp fallback.""" + """Get debug log path: $RUN_DIR/guard-debug.log if set, else /tmp fallback. + + Rotates to .1 when file exceeds 1MB. + """ run_dir = os.environ.get("RUN_DIR") if run_dir: - return Path(run_dir) / "guard-debug.log" - return Path("/tmp/ralph-guard-debug.log") + log = Path(run_dir) / "guard-debug.log" + else: + log = Path("/tmp/ralph-guard-debug.log") + # Rotate: if >1MB, move current to .1 (overwrite) + try: + if log.exists() and log.stat().st_size > _LOG_MAX_BYTES: + rotated = log.with_suffix(".log.1") + log.replace(rotated) + except OSError: + pass # Best-effort rotation + return log def main(): From 428661aeb569369579dd2ed738940952fdb901f4 Mon Sep 17 00:00:00 2001 From: z23cc Date: Fri, 3 Apr 2026 14:35:54 +0800 Subject: [PATCH 7/7] =?UTF-8?q?perf:=20batch=20git=20grep,=20fix=20find=5F?= =?UTF-8?q?dependents=20O(N=C2=B2),=20batch=20cmd=5Fnext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - gather_context_hints: collect all symbols, batch into ≤5 git grep calls instead of N per-symbol subprocess calls - find_dependents: load all task files once upfront, then BFS over in-memory dict instead of re-globbing per BFS level - cmd_next: use load_all_tasks_with_state(epic_id) single-scan batch load instead of per-file load_task_with_state loop Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/flowctl/commands/task.py | 43 ++++++----- scripts/flowctl/commands/workflow.py | 13 +--- scripts/flowctl/core/git.py | 103 +++++++++++++++++++++++---- 3 files changed, 118 insertions(+), 41 deletions(-) diff --git a/scripts/flowctl/commands/task.py b/scripts/flowctl/commands/task.py index 918fcc67..1bde9dab 100644 --- a/scripts/flowctl/commands/task.py +++ b/scripts/flowctl/commands/task.py @@ -166,7 +166,24 @@ def find_dependents(task_id: str, same_epic: bool = False) -> list[str]: return [] epic_id = epic_id_from_task(task_id) if same_epic else None - dependents: set[str] = set() # Use set to avoid duplicates + + # Load all task files once into memory (fixes O(N^2) re-globbing) + all_tasks: dict[str, list[str]] = {} # tid -> deps list + for task_file in tasks_dir.glob("fn-*.json"): + if not is_task_id(task_file.stem): + continue + try: + task_data = load_json(task_file) + tid = task_data.get("id", task_file.stem) + if same_epic and epic_id_from_task(tid) != epic_id: + continue + deps = task_data.get("depends_on", task_data.get("deps", [])) + all_tasks[tid] = deps + except Exception: + pass + + # BFS over in-memory dict + dependents: set[str] = set() to_check = [task_id] checked = set() @@ -176,24 +193,12 @@ def find_dependents(task_id: str, same_epic: bool = False) -> list[str]: continue checked.add(checking) - for task_file in tasks_dir.glob("fn-*.json"): - if not is_task_id(task_file.stem): - continue # Skip non-task files (e.g., fn-1.2-review.json) - try: - task_data = load_json(task_file) - tid = task_data.get("id", task_file.stem) - if tid in checked or tid in dependents: - continue - # Skip if same_epic filter and different epic - if same_epic and epic_id_from_task(tid) != epic_id: - continue - # Support both legacy "deps" and current "depends_on" - deps = task_data.get("depends_on", task_data.get("deps", [])) - if checking in deps: - dependents.add(tid) - to_check.append(tid) - except Exception: - pass + for tid, deps in all_tasks.items(): + if tid in checked or tid in dependents: + continue + if checking in deps: + dependents.add(tid) + to_check.append(tid) return sorted(dependents) diff --git a/scripts/flowctl/commands/workflow.py b/scripts/flowctl/commands/workflow.py index 891d608d..3d428181 100644 --- a/scripts/flowctl/commands/workflow.py +++ b/scripts/flowctl/commands/workflow.py @@ -39,6 +39,7 @@ from flowctl.core.state import ( get_state_store, load_task_definition, + load_all_tasks_with_state, load_task_with_state, reset_task_runtime, save_task_runtime, @@ -283,16 +284,8 @@ def sort_key(t: dict) -> tuple[int, int]: use_json=args.json, ) - tasks: dict[str, dict] = {} - for task_file in tasks_dir.glob(f"{epic_id}.*.json"): - task_id = task_file.stem - if not is_task_id(task_id): - continue # Skip non-task files (e.g., fn-1.2-review.json) - # Load task with merged runtime state - task_data = load_task_with_state(task_id, use_json=args.json) - if "id" not in task_data: - continue # Skip artifact files (GH-21) - tasks[task_data["id"]] = task_data + # Batch-load all tasks for this epic in a single directory scan + tasks = load_all_tasks_with_state(epic_id) # Resume in_progress tasks owned by current actor in_progress = [ diff --git a/scripts/flowctl/core/git.py b/scripts/flowctl/core/git.py index 1cd16921..eed83cb0 100644 --- a/scripts/flowctl/core/git.py +++ b/scripts/flowctl/core/git.py @@ -366,6 +366,72 @@ def find_references( return [] +def _batch_find_references( + symbols: list[str], exclude_files: list[str] +) -> dict[str, list[tuple[str, int]]]: + """Find references for multiple symbols in a single git grep call. + + Returns dict mapping symbol -> [(path, line_number), ...]. + Batches symbols into chunks of ~50 to avoid command-line length issues. + """ + if not symbols: + return {} + + repo_root = get_repo_root() + exclude_set = set(exclude_files) + results: dict[str, list[tuple[str, int]]] = {s: [] for s in symbols} + + file_globs = [ + "*.py", "*.js", "*.ts", "*.tsx", "*.jsx", "*.mjs", + "*.go", "*.rs", + "*.c", "*.h", "*.cpp", "*.hpp", "*.cc", "*.cxx", + "*.java", "*.cs", + ] + + # Batch into chunks of 50 symbols + chunk_size = 50 + for i in range(0, len(symbols), chunk_size): + chunk = symbols[i : i + chunk_size] + + # Build -e sym1 -e sym2 ... args for git grep + grep_args = ["git", "grep", "-n", "-w"] + for sym in chunk: + grep_args.extend(["-e", sym]) + grep_args.append("--") + grep_args.extend(file_globs) + + try: + result = subprocess.run( + grep_args, + capture_output=True, + text=True, + cwd=repo_root, + ) + for line in result.stdout.strip().split("\n"): + if not line: + continue + parts = line.split(":", 2) + if len(parts) < 2: + continue + file_path = parts[0] + if file_path in exclude_set: + continue + try: + line_num = int(parts[1]) + except ValueError: + continue + # Match line content against symbols in this chunk + line_text = parts[2] if len(parts) > 2 else "" + for sym in chunk: + if re.search(r"\b" + re.escape(sym) + r"\b", line_text): + results[sym].append((file_path, line_num)) + break + except subprocess.CalledProcessError: + pass + + return results + + def gather_context_hints(base_branch: str, max_hints: int = 15) -> str: """Gather context hints for code review. @@ -373,6 +439,8 @@ def gather_context_hints(base_branch: str, max_hints: int = 15) -> str: Consider these related files: - src/auth.ts:15 - references validateToken - src/types.ts:42 - references User + + Uses batched git grep (<=5 calls) instead of per-symbol subprocess calls. """ changed_files = get_changed_files(base_branch) if not changed_files: @@ -382,23 +450,34 @@ def gather_context_hints(base_branch: str, max_hints: int = 15) -> str: changed_files = changed_files[:50] repo_root = get_repo_root() - hints = [] - seen_files = set(changed_files) + # Collect all symbols from all changed files + all_symbols: list[str] = [] + symbol_source: dict[str, str] = {} # symbol -> source file (for dedup) for changed_file in changed_files: file_path = repo_root / changed_file symbols = extract_symbols_from_file(file_path) + for sym in symbols[:10]: + if sym not in symbol_source: + all_symbols.append(sym) + symbol_source[sym] = changed_file - for symbol in symbols[:10]: - refs = find_references(symbol, changed_files, max_results=2) - for ref_path, ref_line in refs: - if ref_path not in seen_files: - hints.append(f"- {ref_path}:{ref_line} - references {symbol}") - seen_files.add(ref_path) - if len(hints) >= max_hints: - break - if len(hints) >= max_hints: - break + # Cap at 250 symbols (5 batches of 50) + all_symbols = all_symbols[:250] + + # Batch git grep for all symbols at once + refs_by_symbol = _batch_find_references(all_symbols, changed_files) + + hints = [] + seen_files = set(changed_files) + + for sym in all_symbols: + for ref_path, ref_line in refs_by_symbol.get(sym, [])[:2]: + if ref_path not in seen_files: + hints.append(f"- {ref_path}:{ref_line} - references {sym}") + seen_files.add(ref_path) + if len(hints) >= max_hints: + break if len(hints) >= max_hints: break