Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions scripts/flowctl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from flowctl.commands.admin import (
cmd_init,
cmd_detect,
cmd_doctor,
cmd_status,
cmd_ralph_pause,
cmd_ralph_resume,
Expand Down Expand Up @@ -45,6 +46,7 @@
cmd_epic_rm_dep,
cmd_epic_set_backend,
cmd_epic_close,
cmd_epic_reopen,
cmd_epic_archive,
cmd_epic_clean,
)
Expand Down Expand Up @@ -336,6 +338,11 @@ def main() -> None:
p_epic_close.add_argument("--json", action="store_true", help="JSON output")
p_epic_close.set_defaults(func=cmd_epic_close)

p_epic_reopen = epic_sub.add_parser("reopen", help="Reopen a closed epic")
p_epic_reopen.add_argument("id", help="Epic ID (e.g., fn-1, fn-1-add-auth)")
p_epic_reopen.add_argument("--json", action="store_true", help="JSON output")
p_epic_reopen.set_defaults(func=cmd_epic_reopen)

p_epic_archive = epic_sub.add_parser(
"archive", help="Archive closed epic to .flow/.archive/"
)
Expand Down Expand Up @@ -700,6 +707,13 @@ def main() -> None:
p_validate.add_argument("--json", action="store_true", help="JSON output")
p_validate.set_defaults(func=cmd_validate)

# doctor
p_doctor = subparsers.add_parser(
"doctor", help="Run comprehensive state health diagnostics"
)
p_doctor.add_argument("--json", action="store_true", help="JSON output")
p_doctor.set_defaults(func=cmd_doctor)

# checkpoint
p_checkpoint = subparsers.add_parser("checkpoint", help="Checkpoint commands")
checkpoint_sub = p_checkpoint.add_subparsers(dest="checkpoint_cmd", required=True)
Expand Down
244 changes: 239 additions & 5 deletions scripts/flowctl/commands/admin.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""Admin commands: init, detect, status, ralph control, config, review-backend, validate."""
"""Admin commands: init, detect, status, ralph control, config, review-backend, validate, doctor."""

import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional

Expand All @@ -25,6 +28,7 @@
deep_merge,
get_config,
get_default_config,
load_flow_config,
set_config,
)
from flowctl.core.ids import is_epic_id, is_task_id, normalize_epic
Expand All @@ -36,8 +40,8 @@
load_json,
load_json_or_exit,
)
from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root
from flowctl.core.state import load_task_with_state
from flowctl.core.paths import ensure_flow_exists, get_flow_dir, get_repo_root, get_state_dir
from flowctl.core.state import get_state_store, load_task_with_state
from flowctl.commands.stack import detect_stack


Expand Down Expand Up @@ -128,13 +132,19 @@ def find_active_run(
# --- Validation helpers ---


def _strip_fenced_blocks(content: str) -> str:
"""Remove fenced code blocks (``` ... ```) so headings inside them are ignored."""
return re.sub(r"^```.*?^```", "", content, flags=re.MULTILINE | re.DOTALL)


def validate_task_spec_headings(content: str) -> list[str]:
"""Validate task spec has required headings exactly once. Returns errors."""
# Strip fenced code blocks so ## headings inside examples aren't counted
stripped = _strip_fenced_blocks(content)
errors = []
for heading in TASK_SPEC_HEADINGS:
# Use regex anchored to line start to avoid matching inside code blocks
pattern = rf"^{re.escape(heading)}\s*$"
count = len(re.findall(pattern, content, flags=re.MULTILINE))
count = len(re.findall(pattern, stripped, flags=re.MULTILINE))
if count == 0:
errors.append(f"Missing required heading: {heading}")
elif count > 1:
Expand Down Expand Up @@ -949,3 +959,227 @@ def cmd_validate(args: argparse.Namespace) -> None:
# Exit with non-zero if validation failed
if not valid:
sys.exit(1)


# --- Doctor command ---


def _doctor_check_validate() -> tuple[str, str]:
    """Run cmd_validate over all epics internally; return (status, message)."""
    import io
    import contextlib

    fake_args = argparse.Namespace(epic=None, all=True, json=True)
    validate_output = io.StringIO()
    try:
        # Capture the JSON that cmd_validate prints so we can summarize it.
        with contextlib.redirect_stdout(validate_output):
            cmd_validate(fake_args)
    except SystemExit as e:
        if e.code != 0:
            # Parse the validate output for an error count, if possible.
            try:
                vdata = json.loads(validate_output.getvalue())
                err_count = vdata.get("total_errors", 0)
                return (
                    "fail",
                    f"Validation found {err_count} error(s). Run 'flowctl validate --all' for details",
                )
            except (json.JSONDecodeError, ValueError):
                return ("fail", "Validation failed (could not parse output)")
    return ("pass", "All epics and tasks validated successfully")


def _doctor_check_state_dir() -> tuple[str, str]:
    """Verify the state directory exists and is writable via a probe file."""
    try:
        state_dir = get_state_dir()
        state_dir.mkdir(parents=True, exist_ok=True)
        probe = state_dir / ".doctor-probe"
        probe.write_text("probe", encoding="utf-8")
        probe.unlink()
        return ("pass", f"State dir accessible: {state_dir}")
    except (OSError, PermissionError) as e:
        return ("fail", f"State dir not accessible: {e}")


def _doctor_check_orphaned_state(flow_dir: Path) -> tuple[str, str]:
    """Find runtime state files whose task definition no longer exists."""
    try:
        store = get_state_store()
        tasks_dir = flow_dir / TASKS_DIR
        orphaned = [
            rid
            for rid in store.list_runtime_files()
            if not (tasks_dir / f"{rid}.json").exists()
        ]
        if not orphaned:
            return ("pass", "No orphaned state files")
        overflow = f" (+{len(orphaned) - 5} more)" if len(orphaned) > 5 else ""
        return (
            "warn",
            f"{len(orphaned)} orphaned state file(s): {', '.join(orphaned[:5])}"
            + overflow,
        )
    except Exception as e:
        return ("warn", f"Could not check orphaned state: {e}")


def _doctor_check_stale_tasks(flow_dir: Path) -> tuple[str, str]:
    """Flag tasks stuck in_progress for more than 7 days."""
    from datetime import timezone

    try:
        stale = []
        tasks_dir = flow_dir / TASKS_DIR
        if tasks_dir.exists():
            # Compare in aware-UTC throughout; datetime.utcnow() is deprecated
            # and mixing naive/aware values miscomputes ages for non-UTC offsets.
            now = datetime.now(timezone.utc)
            for task_file in tasks_dir.glob("fn-*.json"):
                task_id = task_file.stem
                if not is_task_id(task_id):
                    continue
                try:
                    task_data = load_task_with_state(task_id, use_json=True)
                except SystemExit:
                    # Unreadable task: other checks report it; skip here.
                    continue
                if task_data.get("status") != "in_progress":
                    continue
                updated = task_data.get("updated_at") or task_data.get("claimed_at")
                if not updated:
                    continue
                try:
                    task_time = datetime.fromisoformat(updated.replace("Z", "+00:00"))
                    if task_time.tzinfo is None:
                        # Naive timestamps are assumed UTC — TODO confirm the
                        # convention used by the code that writes these fields.
                        task_time = task_time.replace(tzinfo=timezone.utc)
                    age_days = (now - task_time).days
                    if age_days > 7:
                        stale.append(f"{task_id} ({age_days}d)")
                except (ValueError, TypeError):
                    pass
        if not stale:
            return ("pass", "No stale in_progress tasks")
        overflow = f" (+{len(stale) - 5} more)" if len(stale) > 5 else ""
        return (
            "warn",
            f"{len(stale)} task(s) in_progress for >7 days: {', '.join(stale[:5])}"
            + overflow,
        )
    except Exception as e:
        return ("warn", f"Could not check stale tasks: {e}")


def _doctor_check_lock_files() -> tuple[str, str]:
    """Warn when lock files accumulate in the state dir (>50)."""
    try:
        locks_dir = get_state_dir() / "locks"
        lock_count = 0
        if locks_dir.exists():
            lock_count = sum(1 for _ in locks_dir.glob("*.lock"))
        if lock_count > 50:
            return ("warn", f"{lock_count} lock files in state dir (consider cleanup)")
        return ("pass", f"{lock_count} lock file(s) in state dir")
    except Exception as e:
        return ("warn", f"Could not check lock files: {e}")


def _doctor_check_config(flow_dir: Path) -> tuple[str, str]:
    """Validate config.json: parseable, a JSON object, only known top-level keys."""
    try:
        config_path = flow_dir / CONFIG_FILE
        if not config_path.exists():
            return ("warn", "config.json missing (run 'flowctl init')")
        parsed = json.loads(config_path.read_text(encoding="utf-8"))
        if not isinstance(parsed, dict):
            return ("fail", "config.json is not a JSON object")
        unknown = set(parsed.keys()) - set(get_default_config().keys())
        if unknown:
            return ("warn", f"Unknown config keys: {', '.join(sorted(unknown))}")
        return ("pass", "config.json valid with known keys")
    except json.JSONDecodeError as e:
        return ("fail", f"config.json invalid JSON: {e}")
    except Exception as e:
        return ("warn", f"Could not check config: {e}")


def _doctor_check_git_common_dir() -> tuple[str, str]:
    """Check that git's common dir is resolvable and present on disk."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--git-common-dir", "--path-format=absolute"],
            capture_output=True, text=True, check=True,
        )
        common_dir = Path(result.stdout.strip())
        if common_dir.exists():
            return ("pass", f"git common-dir reachable: {common_dir}")
        return ("warn", f"git common-dir path does not exist: {common_dir}")
    except subprocess.CalledProcessError:
        return ("warn", "Not in a git repository (git common-dir unavailable)")
    except FileNotFoundError:
        return ("warn", "git not found on PATH")


def cmd_doctor(args: argparse.Namespace) -> None:
    """Run comprehensive state health diagnostics (superset of validate --all).

    Exits non-zero if any check reports "fail"; "warn" checks do not affect
    the exit code. With --json, emits {"checks", "summary", "healthy"}.
    """
    if not ensure_flow_exists():
        error_exit(
            ".flow/ does not exist. Run 'flowctl init' first.", use_json=args.json
        )

    flow_dir = get_flow_dir()

    # Each check is independent; run all of them and collect results.
    named_checks = [
        ("validate", _doctor_check_validate()),
        ("state_dir_access", _doctor_check_state_dir()),
        ("orphaned_state", _doctor_check_orphaned_state(flow_dir)),
        ("stale_tasks", _doctor_check_stale_tasks(flow_dir)),
        ("lock_files", _doctor_check_lock_files()),
        ("config", _doctor_check_config(flow_dir)),
        ("git_common_dir", _doctor_check_git_common_dir()),
    ]
    checks: list[dict] = [
        {"name": name, "status": status, "message": message}
        for name, (status, message) in named_checks
    ]

    summary = {"pass": 0, "warn": 0, "fail": 0}
    for c in checks:
        summary[c["status"]] += 1

    # Warnings are advisory; only hard failures make the run unhealthy.
    overall_healthy = summary["fail"] == 0

    if args.json:
        json_output(
            {
                "checks": checks,
                "summary": summary,
                "healthy": overall_healthy,
            },
            success=overall_healthy,
        )
    else:
        print("Doctor diagnostics:")
        for c in checks:
            icon = {"pass": "OK", "warn": "WARN", "fail": "FAIL"}[c["status"]]
            print(f"  [{icon}] {c['name']}: {c['message']}")
        print()
        print(
            f"Summary: {summary['pass']} pass, "
            f"{summary['warn']} warn, {summary['fail']} fail"
        )
        if not overall_healthy:
            print("Health check FAILED — resolve fail items above.")

    if not overall_healthy:
        sys.exit(1)
Loading