Conversation
…es + state machine - Create Cargo workspace under flowctl/ with [workspace.dependencies] and [workspace.package] patterns (nushell convention) - 6 crates: flowctl-core, flowctl-db, flowctl-scheduler, flowctl-cli, flowctl-daemon, flowctl-tui with correct dependency DAG - Implement core types: Task, Epic, Phase, Evidence, TaskId, EpicId with serde derives and comprehensive field coverage - Implement state machine with all 8 states: todo, in_progress, done, blocked, skipped, failed, up_for_retry, upstream_failed - Port ID parsing regex from Python ids.py:49-66 using std::sync::LazyLock - Port slugify() from Python ids.py:16-46 with Unicode NFKD decomposition - Use thiserror for library errors in core crate - Release profile: opt-level="z", lto="fat", codegen-units=1, strip=true - 48 unit tests + 2 doc-tests, all passing - cargo clippy --workspace passes with zero warnings Task: fn-2-flowctl-rust-platform-rewrite.1
6-crate Cargo workspace (core, db, scheduler, cli, daemon, tui) with: - 65+ CLI commands ported from Python with identical JSON output - Markdown canonical + SQLite cache data architecture - YAML frontmatter parser (serde-saphyr) with round-trip safety - petgraph DAG with Kahn's algorithm, cycle detection, critical path - Daemon with PID lock, Unix socket, graceful shutdown (CancellationToken) - TUI dashboard (4 tabs: Tasks, DAG, Logs, Stats) with ratatui - Event bus (broadcast + mpsc), HTTP API (axum), WebSocket streaming - DAG scheduler with bounded parallelism (Semaphore), heartbeat watchdog - File watcher (notify) with debouncing, circuit breaker - Stats dashboard with DORA metrics, token tracking, auto-cleanup - upstream_failed propagation, retry logic, shell completions - miette diagnostics for rich error display - Python shim dispatch (FLOWCTL_RUST=1) - Integration tests (Rust vs Python parity), 224 unit tests - cargo-dist config, install.sh, release workflow Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Implements a full Rust rewrite of flowctl as a multi-crate workspace (core types/frontmatter, SQLite DB, scheduler, CLI, daemon, TUI), adds parity/contract testing, and introduces CI/release automation for building and shipping binaries.
Changes:
- Added
flowctl-corefoundational types (IDs, state machine, frontmatter parsing) with unit tests. - Added
flowctl-clicommand surface + trycmd + Python parity integration tests, plus several command implementations. - Added
flowctl-daemonHTTP/WS handler scaffolding and GitHub CI/release workflows.
Reviewed changes
Copilot reviewed 37 out of 119 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| flowctl/crates/flowctl-daemon/src/lib.rs | Establishes daemon crate surface + feature-gated modules. |
| flowctl/crates/flowctl-daemon/src/handlers.rs | Adds HTTP endpoints and WS event streaming handlers. |
| flowctl/crates/flowctl-daemon/Cargo.toml | Defines daemon crate deps and daemon feature gating. |
| flowctl/crates/flowctl-core/src/types.rs | Introduces core domain types/constants and serialization defaults. |
| flowctl/crates/flowctl-core/src/state_machine.rs | Adds task status state machine + transition validation + tests. |
| flowctl/crates/flowctl-core/src/lib.rs | Exposes core modules and re-exports for downstream crates. |
| flowctl/crates/flowctl-core/src/id.rs | Implements ID parsing/slugify/suffix generation + parity tests. |
| flowctl/crates/flowctl-core/src/frontmatter.rs | Implements YAML frontmatter parse/write + tests. |
| flowctl/crates/flowctl-core/src/error.rs | Defines CoreError for library-style errors. |
| flowctl/crates/flowctl-core/Cargo.toml | Declares core crate dependencies (serde, regex, petgraph, etc.). |
| flowctl/crates/flowctl-cli/tests/parity_test.rs | Adds integration parity tests running Python and Rust CLIs. |
| flowctl/crates/flowctl-cli/tests/cli_tests.rs | Adds trycmd-driven CLI contract regression tests. |
| flowctl/crates/flowctl-cli/src/output.rs | Adds JSON output helpers and stub/not-implemented responses. |
| flowctl/crates/flowctl-cli/src/main.rs | Defines clap CLI surface and dispatches to command modules. |
| flowctl/crates/flowctl-cli/src/diagnostics.rs | Adds miette-based diagnostics for frontmatter parsing errors. |
| flowctl/crates/flowctl-cli/src/commands/stats.rs | Implements flowctl stats querying SQLite and formatting output. |
| flowctl/crates/flowctl-cli/src/commands/stack.rs | Implements stack detection/config and invariants commands. |
| flowctl/crates/flowctl-cli/src/commands/rp.rs | Wraps rp-cli for RepoPrompt window/workspace/chat operations. |
| flowctl/crates/flowctl-cli/src/commands/ralph.rs | Implements ralph run-control via sentinel files and progress parsing. |
| flowctl/crates/flowctl-cli/src/commands/mod.rs | Registers CLI command modules. |
| flowctl/crates/flowctl-cli/src/commands/gap.rs | Implements gap registry via .gaps.json sidecar with gating. |
| flowctl/crates/flowctl-cli/src/commands/dep.rs | Implements dependency add/remove updating Markdown + SQLite cache. |
| flowctl/crates/flowctl-cli/src/commands/codex.rs | Wraps codex CLI for reviews and receipt management. |
| flowctl/crates/flowctl-cli/src/commands/checkpoint.rs | Implements DB checkpoint save/restore/delete utilities. |
| flowctl/crates/flowctl-cli/Cargo.toml | Defines CLI crate deps and tui feature. |
| flowctl/README.md | Adds Rust rewrite documentation, install steps, and quickstart. |
| flowctl/Cargo.toml | Adds workspace members, shared deps, and release profile settings. |
| flowctl/.gitignore | Ignores Cargo build output. |
| .github/workflows/release.yml | Adds multi-target release build + packaging + checksums. |
| .github/workflows/ci.yml | Adds Rust CI for fmt/clippy/build/test on PRs and main. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Ok(epics) => (StatusCode::OK, Json(serde_json::to_value(&epics).unwrap())), | ||
| Err(e) => ( | ||
| StatusCode::INTERNAL_SERVER_ERROR, | ||
| Json(serde_json::json!({"error": e.to_string()})), | ||
| ), |
There was a problem hiding this comment.
serde_json::to_value(...).unwrap() can panic the daemon process on serialization failure. This should be converted into an error response (500) instead of unwinding (e.g., map the serialization error into {"error": ...} and return StatusCode::INTERNAL_SERVER_ERROR).
| pub async fn epics_handler(State(state): State<AppState>) -> impl IntoResponse { | ||
| let db_path = state | ||
| .runtime | ||
| .paths | ||
| .state_dir | ||
| .parent() | ||
| .map(|flow_dir| flow_dir.join("flowctl.db")); |
There was a problem hiding this comment.
flowctl_db::open and the subsequent rusqlite queries are synchronous and will run on the async executor thread, potentially blocking the daemon under load (especially if multiple requests arrive concurrently). Consider moving DB open/query work to tokio::task::spawn_blocking (or maintaining a shared connection/pool in state) so the HTTP server remains responsive.
| match flowctl_db::open(&db_path) { | ||
| Ok(conn) => { | ||
| let repo = flowctl_db::EpicRepo::new(&conn); | ||
| match repo.list(None) { |
There was a problem hiding this comment.
flowctl_db::open and the subsequent rusqlite queries are synchronous and will run on the async executor thread, potentially blocking the daemon under load (especially if multiple requests arrive concurrently). Consider moving DB open/query work to tokio::task::spawn_blocking (or maintaining a shared connection/pool in state) so the HTTP server remains responsive.
| pub fn parse<T: DeserializeOwned>(input: &str) -> Result<Document<T>, CoreError> { | ||
| let trimmed = input.trim_start(); | ||
|
|
||
| if !trimmed.starts_with("---") { | ||
| return Err(CoreError::FrontmatterParse( | ||
| "document does not start with ---".to_string(), | ||
| )); | ||
| } | ||
|
|
||
| // Skip the opening "---" line. | ||
| let after_open = match trimmed.strip_prefix("---") { | ||
| Some(rest) => { | ||
| // Consume the newline (or the rest if it's just "---" at EOF). | ||
| rest.strip_prefix('\n').unwrap_or(rest) | ||
| } | ||
| None => unreachable!(), | ||
| }; |
There was a problem hiding this comment.
This only consumes \n after the opening ---, so files with Windows line endings (---\r\n) will leave a leading \r in the YAML string and may prevent finding the closing delimiter (find_closing_delimiter searches for \n---). To be platform-robust, handle \r\n explicitly for both opening and closing delimiter parsing (e.g., strip an optional \r before \n, and/or normalize line endings up front).
| matches!(self, Status::Done | Status::Skipped) | ||
| } | ||
|
|
||
| /// Whether this status represents a terminal state (no further transitions). |
There was a problem hiding this comment.
The doc comment says “no further transitions,” but the state machine explicitly allows * -> UpstreamFailed, and tests expect Done to have a valid target UpstreamFailed. Either the comment should be tightened (e.g., “no further user-initiated transitions” / “terminal for normal workflow”), or is_terminal should reflect the actual transition semantics you want.
| /// Whether this status represents a terminal state (no further transitions). | |
| /// Whether this status is terminal for the normal task workflow. | |
| /// | |
| /// `done` and `skipped` represent completed outcomes for user-driven task | |
| /// execution, even though the broader state machine may still allow special | |
| /// transitions such as `-> upstream_failed`. |
| pub fn stub(command_name: &str, json_mode: bool) { | ||
| if json_mode { | ||
| json_output(json!({ | ||
| "status": "not_implemented", | ||
| "command": command_name, | ||
| "message": format!("{} is not yet implemented in Rust flowctl", command_name), | ||
| })); | ||
| } else { |
There was a problem hiding this comment.
In JSON mode, stub() reports "success": true (because json_output always forces it) and exits with code 0 (no exit(1) on the JSON branch). That makes “not implemented” indistinguishable from success for automation and contradicts the non-JSON branch behavior. Suggest making stub responses success=false and using a non-zero exit code in both modes (or introduce a separate json_error_output helper).
| flowctl task create -e ep-1 "Design token schema" --domain backend | ||
| flowctl task create -e ep-1 "Implement JWT middleware" --domain backend | ||
| flowctl dep add tsk-2 tsk-1 | ||
|
|
||
| # Start work | ||
| flowctl start tsk-1 | ||
|
|
||
| # Complete with evidence | ||
| flowctl done tsk-1 --summary-file summary.md --evidence-json evidence.json | ||
|
|
||
| # Check status | ||
| flowctl status | ||
| flowctl tasks -e ep-1 |
There was a problem hiding this comment.
The quickstart uses ID formats like ep-1 / tsk-1, but the core ID format implemented elsewhere in this PR is fn-N and fn-N.M (e.g., fn-1-add-auth.3). This mismatch will mislead new users—recommend updating the README examples to match the actual ID scheme and CLI flags (e.g., --epic fn-1-...).
| flowctl task create -e ep-1 "Design token schema" --domain backend | |
| flowctl task create -e ep-1 "Implement JWT middleware" --domain backend | |
| flowctl dep add tsk-2 tsk-1 | |
| # Start work | |
| flowctl start tsk-1 | |
| # Complete with evidence | |
| flowctl done tsk-1 --summary-file summary.md --evidence-json evidence.json | |
| # Check status | |
| flowctl status | |
| flowctl tasks -e ep-1 | |
| flowctl task create --epic fn-1-build-auth-system "Design token schema" --domain backend | |
| flowctl task create --epic fn-1-build-auth-system "Implement JWT middleware" --domain backend | |
| flowctl dep add fn-1-build-auth-system.2 fn-1-build-auth-system.1 | |
| # Start work | |
| flowctl start fn-1-build-auth-system.1 | |
| # Complete with evidence | |
| flowctl done fn-1-build-auth-system.1 --summary-file summary.md --evidence-json evidence.json | |
| # Check status | |
| flowctl status | |
| flowctl tasks --epic fn-1-build-auth-system |
| /// Run Python flowctl: `python3 flowctl.py <cmd...> --json` | ||
| fn run_python(work_dir: &Path, args: &[&str]) -> (String, i32) { | ||
| let py = python_flowctl(); | ||
| let mut cmd_args: Vec<&str> = args.to_vec(); | ||
| cmd_args.push("--json"); | ||
|
|
||
| let output = Command::new("python3") | ||
| .arg(&py) | ||
| .args(&cmd_args) | ||
| .current_dir(work_dir) | ||
| .output() | ||
| .expect("Failed to run python3"); | ||
|
|
||
| let stdout = String::from_utf8_lossy(&output.stdout).to_string(); | ||
| let stderr = String::from_utf8_lossy(&output.stderr).to_string(); |
There was a problem hiding this comment.
These integration tests hard-require python3 and the Python script to be present; expect("Failed to run python3") will fail the entire test suite on systems without Python (or on Windows where python3 isn’t typical). To keep cargo test usable for more environments, consider gating parity tests behind an env var (e.g., FLOWCTL_RUN_PARITY=1) or skipping when python3 (or the script path) isn’t available.
| let _timeout = timeout_secs.unwrap_or_else(|| { | ||
| env::var("FLOW_RP_TIMEOUT") | ||
| .ok() | ||
| .and_then(|v| v.parse().ok()) | ||
| .unwrap_or(1200) | ||
| }); | ||
|
|
||
| let result = Command::new(&rp) | ||
| .args(args) | ||
| .output(); | ||
|
|
||
| match result { | ||
| Ok(output) => { | ||
| let stdout = String::from_utf8_lossy(&output.stdout).to_string(); | ||
| let stderr = String::from_utf8_lossy(&output.stderr).to_string(); | ||
| if !output.status.success() { | ||
| let msg = if !stderr.is_empty() { &stderr } else if !stdout.is_empty() { &stdout } else { "unknown error" }; | ||
| error_exit(&format!("rp-cli failed: {}", msg.trim())); | ||
| } | ||
| (stdout, stderr) | ||
| } | ||
| Err(e) => { | ||
| error_exit(&format!("rp-cli failed: {e}")); | ||
| } |
There was a problem hiding this comment.
The timeout_secs parameter is computed but never enforced (only stored in _timeout). As written, rp-cli subprocesses can hang indefinitely, which is risky for automation and interactive UX. Either implement an actual timeout (kill the process if elapsed) or remove the parameter/env var to avoid a false sense of safety.
| let _timeout = timeout_secs.unwrap_or_else(|| { | |
| env::var("FLOW_RP_TIMEOUT") | |
| .ok() | |
| .and_then(|v| v.parse().ok()) | |
| .unwrap_or(1200) | |
| }); | |
| let result = Command::new(&rp) | |
| .args(args) | |
| .output(); | |
| match result { | |
| Ok(output) => { | |
| let stdout = String::from_utf8_lossy(&output.stdout).to_string(); | |
| let stderr = String::from_utf8_lossy(&output.stderr).to_string(); | |
| if !output.status.success() { | |
| let msg = if !stderr.is_empty() { &stderr } else if !stdout.is_empty() { &stdout } else { "unknown error" }; | |
| error_exit(&format!("rp-cli failed: {}", msg.trim())); | |
| } | |
| (stdout, stderr) | |
| } | |
| Err(e) => { | |
| error_exit(&format!("rp-cli failed: {e}")); | |
| } | |
| let timeout = timeout_secs.unwrap_or_else(|| { | |
| env::var("FLOW_RP_TIMEOUT") | |
| .ok() | |
| .and_then(|v| v.parse().ok()) | |
| .unwrap_or(1200) | |
| }); | |
| let mut child = match Command::new(&rp) | |
| .args(args) | |
| .stdout(std::process::Stdio::piped()) | |
| .stderr(std::process::Stdio::piped()) | |
| .spawn() | |
| { | |
| Ok(child) => child, | |
| Err(e) => { | |
| error_exit(&format!("rp-cli failed: {e}")); | |
| } | |
| }; | |
| let start = std::time::Instant::now(); | |
| let poll_interval = std::time::Duration::from_millis(100); | |
| loop { | |
| match child.try_wait() { | |
| Ok(Some(_status)) => { | |
| let output = match child.wait_with_output() { | |
| Ok(output) => output, | |
| Err(e) => error_exit(&format!("rp-cli failed: {e}")), | |
| }; | |
| let stdout = String::from_utf8_lossy(&output.stdout).to_string(); | |
| let stderr = String::from_utf8_lossy(&output.stderr).to_string(); | |
| if !output.status.success() { | |
| let msg = if !stderr.is_empty() { | |
| &stderr | |
| } else if !stdout.is_empty() { | |
| &stdout | |
| } else { | |
| "unknown error" | |
| }; | |
| error_exit(&format!("rp-cli failed: {}", msg.trim())); | |
| } | |
| return (stdout, stderr); | |
| } | |
| Ok(None) => { | |
| if start.elapsed() >= std::time::Duration::from_secs(timeout) { | |
| let _ = child.kill(); | |
| let output = child.wait_with_output().ok(); | |
| let msg = output | |
| .as_ref() | |
| .map(|output| { | |
| let stdout = String::from_utf8_lossy(&output.stdout).to_string(); | |
| let stderr = String::from_utf8_lossy(&output.stderr).to_string(); | |
| if !stderr.trim().is_empty() { | |
| stderr.trim().to_string() | |
| } else if !stdout.trim().is_empty() { | |
| stdout.trim().to_string() | |
| } else { | |
| format!("timed out after {} seconds", timeout) | |
| } | |
| }) | |
| .unwrap_or_else(|| format!("timed out after {} seconds", timeout)); | |
| error_exit(&format!("rp-cli timed out after {} seconds: {}", timeout, msg)); | |
| } | |
| std::thread::sleep(poll_interval); | |
| } | |
| Err(e) => { | |
| let _ = child.kill(); | |
| error_exit(&format!("rp-cli failed: {e}")); | |
| } | |
| } |
| // Wait with timeout | ||
| let _timeout = std::time::Duration::from_secs(timeout_secs); | ||
| match child.wait_with_output() { | ||
| Ok(output) => { | ||
| let stdout = String::from_utf8_lossy(&output.stdout).to_string(); |
There was a problem hiding this comment.
Similar to rp.rs, the code comments and _timeout variable imply a timeout, but wait_with_output() is unconditional and can block indefinitely if codex hangs. This can wedge CI or local runs. Consider adding real timeout handling (and ensuring the child process is terminated on timeout).
P0 fixes (state loss — root cause of 5 issues): - get_flow_dir() now walks up directory tree (FLOW_STATE_DIR env → walk-up → CWD) Fixes: #1 state loss, #3 state not persistent, #5 worker parallel fail, #9 .flow symlink issues. Same pattern as git finding .git. - flowctl recover --epic <id> [--dry-run]: rebuilds task completion status from git log. Fixes #11 no recovery after state loss. P1 fixes (guard + review): - Guard graceful fallback: missing tools → "skipped" (not "failed"). Only actual failures block pipeline. Fixes #8. - Review-backend availability check: if rp-cli/codex not in PATH, auto-fallback to "none" with warning. Fixes #7. P2 fixes (UX): - Slug max length 40→20 chars. "Django+React platform with account management" → "fn-3-django-react-plat" not 40-char monster. Fixes #2 #12. - Brainstorm auto-skip: trivial tasks (≤10 words, contains "fix"/"typo"/etc) skip brainstorm entirely. Fixes #6. - --interactive flag: pause at key decisions. Fixes #10. 370 tests pass. Zero new dependencies. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Complete Rust rewrite of flowctl from Python (~15,000 lines) to Rust (~26,000 lines) as a platform-grade development orchestration engine.
What's included
FLOWCTL_RUST=1env var for gradual opt-inDesign spec
docs/superpowers/specs/2026-04-04-flowctl-rust-platform-design.mdTest plan
cargo build --workspace— cleancargo test --workspace— 224 tests passcargo clippy --workspace— no warningsFLOWCTL_RUST=1 flowctl status --jsoncargo run --features tui -- tuicargo run --features daemon -- daemon start🤖 Generated with Claude Code