feat(flowctl): event-sourced pipeline-first architecture#19

Merged
z23cc merged 7 commits into main from fn-2-event-sourced-pipeline-first on Apr 7, 2026
Conversation

@z23cc (Owner) commented Apr 7, 2026

Summary

  • Event Sourcing: FlowEvent enums (EpicEvent + TaskEvent) with typed payloads, event_store table with (stream_id, version) unique index, EventStoreRepo with append/query/rebuild, criterion benchmarks
  • Pipeline Phase System: PipelinePhase state machine (plan → plan-review → work → impl-review → close), flowctl phase next/done CLI commands, pipeline_progress table
  • Unified Entry Skill: flow-code-run SKILL.md with phase loop, 5 existing skills marked deprecated

Architecture reduced from 5 layers (command → skill → agent → flowctl → .flow) to 2.5 layers (Claude → flowctl pipeline → event store).
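The linear phase machine described above can be sketched as follows. This is a minimal illustration, assuming names modeled on the PR's PipelinePhase (plan → plan_review → work → impl_review → close); the real flowctl-core implementation may differ in detail.

```rust
// Sketch of a linear pipeline state machine. Variant and label names
// are assumptions based on the PR description, not the actual source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PipelinePhase {
    Plan,
    PlanReview,
    Work,
    ImplReview,
    Close,
}

impl PipelinePhase {
    /// The next phase, or None once the terminal phase is reached.
    fn next(self) -> Option<PipelinePhase> {
        use PipelinePhase::*;
        match self {
            Plan => Some(PlanReview),
            PlanReview => Some(Work),
            Work => Some(ImplReview),
            ImplReview => Some(Close),
            Close => None,
        }
    }

    fn is_terminal(self) -> bool {
        self.next().is_none()
    }

    fn as_str(self) -> &'static str {
        match self {
            PipelinePhase::Plan => "plan",
            PipelinePhase::PlanReview => "plan_review",
            PipelinePhase::Work => "work",
            PipelinePhase::ImplReview => "impl_review",
            PipelinePhase::Close => "close",
        }
    }
}

fn main() {
    // Walk the pipeline from the initial phase to the terminal one.
    let mut phase = PipelinePhase::Plan;
    let mut order = vec![phase.as_str()];
    while let Some(next) = phase.next() {
        phase = next;
        order.push(phase.as_str());
    }
    println!("{}", order.join(" -> "));
    println!("terminal: {}", phase.is_terminal());
}
```

Because the machine is strictly linear, `flowctl phase next/done` only ever needs the current row in pipeline_progress to decide what to do.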

Test plan

  • cargo build succeeds
  • cargo test --all passes (350+ tests, pre-existing trycmd fixture failure excluded)
  • Serde round-trip tests for all event variants
  • Tolerant reader: unknown events deserialize to Unknown
  • Event store: append, query_stream, rebuild_stream verified
  • Pipeline phases: sequence enforcement, invalid transition rejection
  • Migration v5 runs cleanly on existing databases
  • Plan reviewed via RP (SHIP after 2 iterations)
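The tolerant-reader behavior in the test plan (unknown events deserialize to Unknown) can be sketched as follows. The real flowctl events go through serde payloads; this label-based parser, and the event labels in it, are illustrative assumptions.

```rust
// Illustrative tolerant reader: an event type the current binary does
// not know decodes to an Unknown variant instead of failing the read.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskEvent {
    Started,
    Done,
    Blocked,
    /// Forward-compatibility escape hatch: preserves the raw label so
    /// streams written by newer binaries are not lost on replay.
    Unknown(String),
}

fn parse_task_event(label: &str) -> TaskEvent {
    match label {
        "task.started" => TaskEvent::Started,
        "task.done" => TaskEvent::Done,
        "task.blocked" => TaskEvent::Blocked,
        other => TaskEvent::Unknown(other.to_string()),
    }
}

fn main() {
    assert_eq!(parse_task_event("task.done"), TaskEvent::Done);
    // An event type added by a future release still round-trips.
    assert_eq!(
        parse_task_event("task.paused"),
        TaskEvent::Unknown("task.paused".to_string())
    );
    println!("tolerant reader ok");
}
```

The point of the Unknown arm is that rebuild_stream can skip over events it cannot interpret rather than aborting the whole replay.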

🤖 Generated with Claude Code

z23cc and others added 7 commits April 7, 2026 17:17
…ks script [fn-140.2]

Add CI pipeline hardening:
- shellcheck step for scripts/*.sh, scripts/hooks/*.sh, flowctl/install.sh
- cargo-audit step for dependency vulnerability scanning
- JSON validation for hooks/hooks.json and .claude-plugin/plugin.json
- .shellcheckrc with shell=bash default
- scripts/setup-hooks.sh for idempotent pre-commit hook symlink setup
- Expanded CI path triggers to include scripts/ and hooks/ directories

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- CLAUDE.md: update crate count to 4 (add flowctl-service), fix skill count to 8+22
- README.md: bump version badge 0.1.27 → 0.1.31, fix broken CHANGELOG.md links → releases page
- docs/skills.md: add 6 missing extension skills, update count 16 → 22
- flowctl/README.md: add flowctl-service crate to architecture section
- Delete dead flowctl/tests/integration/compare_outputs.sh (Python flowctl removed in 577e9c7)
- Rename parity_test.rs → integration_test.rs (no longer parity tests)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…es [fn-140.4]

Split the ~2000-line smoke_test.sh into 12 focused test files under
scripts/tests/, each independently runnable with its own isolated
temp directory. All test assertions preserved exactly.

Files created:
- common.sh: shared setup (python detection, flowctl binary, colors, counters)
- run_all.sh: sequential test runner with per-file pass/fail tracking
- test_init.sh: idempotent init, config set/get, planSync config
- test_scheduling.sh: next plan/work/none, priority, artifact file resilience
- test_lifecycle.sh: plan_review_status, branch, set-title, block/validate,
  duration tracking, workspace_changes
- test_gaps.sh: gap add/resolve/check, idempotency, priority filtering
- test_memory.sh: memory init/add/list, verify, staleness, retro suggestion
- test_files.sh: file ownership map, conflict detection
- test_review.sh: parse_receipt_path, review-backend compare, archival,
  parse-findings
- test_domain.sh: domain tagging, epic archive/clean
- test_worker.sh: context hints, build_review_prompt, worker-prompt,
  worker-phase lifecycle
- test_misc.sh: schema validate, codex commands, depends_on_epics, stdin,
  set-spec, checkpoint, sync command files
- test_restart.sh: restart command, status --interrupted, auto-execute
- test_codex_e2e.sh: codex plan-review/impl-review e2e

smoke_test.sh is now a thin wrapper that delegates to run_all.sh.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ge [fn-140.5]

Add parallel `integration` CI job that builds flowctl and runs
scripts/tests/run_all.sh with git configured for test isolation.

Add four new Rust integration test files covering approval, log,
outputs, and doctor workflows — following existing temp-dir + JSON
assertion patterns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…0.3]

Add documentation for ~30 previously undocumented commands: doctor,
review-backend, dag, estimate, replay, diff, plan-depth, approval
(create/list/show/approve/reject), log (decision/decisions), outputs
(write/list/show), ralph (pause/resume/stop/status), scout-cache
(get/set/clear), skill (register/match), hook (7 subcommands), stats
(summary/epic/weekly/tokens/bottlenecks/dora/rollup/cleanup), files,
lock, unlock, heartbeat, lock-check, queue, fail, restart, dep rm,
task skip, task split, epic reopen/title/archive/clean/audit/add-dep/
rm-dep/auto-exec, export, import, completions.

Fix stale command names: epic set-plan → epic plan, epic
set-plan-review-status → epic review, epic set-completion-review-status
→ epic completion, epic set-branch → epic branch, task
set-description/set-acceptance/set-spec → task spec (unified).

Remove stale prep-chat entry (command removed from CLI).
Update Available Commands summary at top of file.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…[fn-140.6]

The rtk_rewrite hook previously ran `command -v rtk` on every Bash tool
call. For the 99% of users without rtk installed, this was a wasted
subprocess fork. Cache the probe result in $TMPDIR/flowctl-rtk-probe
with a 1-hour TTL so subsequent calls skip the probe entirely.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Three pillars implemented:
1. Event Sourcing — FlowEvent enums (EpicEvent + TaskEvent) with typed
   payloads, event_store table with (stream_id, version) unique index,
   EventStoreRepo with append/query/rebuild, criterion benchmarks
2. Pipeline Phase System — PipelinePhase state machine (plan → plan-review
   → work → impl-review → close), `flowctl phase next/done` CLI commands,
   pipeline_progress table for per-epic phase tracking
3. Unified Entry Skill — flow-code-run SKILL.md with phase loop pattern,
   5 existing orchestration skills marked deprecated

Event emission is additive: lifecycle methods (start, done, block, fail,
restart) and ChangesApplier now emit typed events alongside existing
JSON/DB state mutations. Event store is authoritative; existing tables
serve as materialized read model.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 7, 2026 13:37
@z23cc z23cc merged commit b01918c into main Apr 7, 2026
1 of 3 checks passed
@z23cc z23cc deleted the fn-2-event-sourced-pipeline-first branch April 7, 2026 13:37

Copilot AI left a comment


Pull request overview

This PR introduces an event-sourced, pipeline-first architecture to flowctl, adds an epic-level pipeline phase state machine with corresponding CLI commands, and consolidates the Claude Code workflow into a single /flow-code:run skill while deprecating older phase-specific skills. It also adds a new split shell-based integration test suite and expands CI linting/testing coverage.

Changes:

  • Add event sourcing primitives (FlowEvent), a persisted event_store with append/query/replay repo, and wire event emission into lifecycle and changes application paths.
  • Add an epic-level pipeline phase state machine (PipelinePhase) and new CLI commands (flowctl phase next/done, plus flowctl events).
  • Introduce /flow-code:run as the unified entry skill; mark legacy /flow-code:* phase skills deprecated; add shell smoke tests and CI steps.

Reviewed changes

Copilot reviewed 55 out of 57 changed files in this pull request and generated 11 comments.

Summary per file:
skills/flow-code-work/SKILL.md Marks legacy work skill deprecated in favor of /flow-code:run.
skills/flow-code-run/SKILL.md Adds unified pipeline entry skill docs and phase-loop procedure.
skills/flow-code-plan/SKILL.md Marks legacy plan skill deprecated in favor of /flow-code:run.
skills/flow-code-plan-review/SKILL.md Marks legacy plan-review skill deprecated in favor of /flow-code:run.
skills/flow-code-impl-review/SKILL.md Marks legacy impl-review skill deprecated in favor of /flow-code:run.
skills/flow-code-epic-review/SKILL.md Marks legacy epic-review skill deprecated in favor of /flow-code:run.
scripts/tests/common.sh Shared setup harness for new split shell smoke tests.
scripts/tests/run_all.sh Runs all scripts/tests/test_*.sh sequentially and reports overall status.
scripts/tests/test_worker.sh Adds shell tests for context hints, worker prompt, and worker-phase behavior.
scripts/tests/test_scheduling.sh Adds shell tests for next scheduling, priority, and artifact file resilience.
scripts/tests/test_review.sh Adds shell tests for review receipts, compare mode, and findings parsing/registration.
scripts/tests/test_restart.sh Adds shell tests for restart, interrupted status, and auto-execute marker.
scripts/tests/test_misc.sh Adds shell tests for misc commands (schema validate, stdin, set-spec, checkpoint, sync).
scripts/tests/test_memory.sh Adds shell tests for memory init/add/list/verify and epic close retro suggestion.
scripts/tests/test_lifecycle.sh Adds shell tests for lifecycle and metadata fields (branch, review status, duration, evidence).
scripts/tests/test_init.sh Adds shell tests for init idempotency and config upgrade/set/get behavior.
scripts/tests/test_gaps.sh Adds shell tests for gap add/list/check/resolve idempotency and gating.
scripts/tests/test_files.sh Adds shell tests for file ownership mapping and conflict detection.
scripts/tests/test_domain.sh Adds shell tests for task domain tagging and epic archive/clean.
scripts/tests/test_codex_e2e.sh Adds optional Codex end-to-end tests when Codex CLI is available.
scripts/setup-hooks.sh Adds a helper to install a pre-commit hook via symlink.
README.md Updates badges/links (version badge now points to releases).
flowctl/tests/integration/compare_outputs.sh Removes legacy parity integration script (Python vs Rust).
flowctl/tests/cmd/validate_json.toml Updates trycmd expectations for validate.
flowctl/tests/cmd/next_json.toml Updates trycmd expectation for next output.
flowctl/README.md Updates architecture documentation to reflect new flowctl-service crate.
flowctl/crates/flowctl-service/src/lifecycle.rs Emits task lifecycle domain events into the event store (best-effort).
flowctl/crates/flowctl-service/src/changes.rs Adds optional event store emission for create mutations via ChangesApplier.
flowctl/crates/flowctl-db/src/schema.sql Adds event_store and pipeline_progress tables + stream/version unique index.
flowctl/crates/flowctl-db/src/repo/mod.rs Registers new EventStoreRepo module exports.
flowctl/crates/flowctl-db/src/repo/event_store.rs Implements async event store append/query/rebuild and prefix queries.
flowctl/crates/flowctl-db/src/pool.rs Updates schema table list assertions to include new tables.
flowctl/crates/flowctl-db/src/migration.rs Adds migration v5 to create new event store and pipeline progress tables.
flowctl/crates/flowctl-db/src/lib.rs Re-exports EventStoreRepo and StoredEvent from the db crate.
flowctl/crates/flowctl-db/Cargo.toml Adds criterion bench config for the new event store.
flowctl/crates/flowctl-db/benches/event_store.rs Adds criterion benchmarks for append/query performance.
flowctl/crates/flowctl-core/src/pipeline.rs Introduces PipelinePhase linear state machine + tests.
flowctl/crates/flowctl-core/src/lib.rs Exposes new events and pipeline modules and re-exports PipelinePhase.
flowctl/crates/flowctl-core/src/events.rs Adds EpicEvent/TaskEvent + FlowEvent wrapper and metadata + helpers.
flowctl/crates/flowctl-cli/tests/outputs_test.rs Adds CLI integration tests for outputs write/list/show workflows.
flowctl/crates/flowctl-cli/tests/log_test.rs Adds CLI integration tests for decision log commands.
flowctl/crates/flowctl-cli/tests/integration_test.rs Adds/updates Rust-only integration tests (replacing parity tests).
flowctl/crates/flowctl-cli/tests/doctor_test.rs Adds CLI integration tests for doctor command behavior.
flowctl/crates/flowctl-cli/tests/approval_test.rs Adds CLI integration tests for approval workflows.
flowctl/crates/flowctl-cli/src/main.rs Adds phase and events top-level commands wiring.
flowctl/crates/flowctl-cli/src/commands/workflow/pipeline_phase.rs Implements flowctl phase next/done backed by pipeline_progress.
flowctl/crates/flowctl-cli/src/commands/workflow/mod.rs Re-exports pipeline phase dispatch and adds cmd_events export.
flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs Adds flowctl events --epic command to query event store history.
flowctl/crates/flowctl-cli/src/commands/hook/rtk_rewrite.rs Adds RTK availability caching for the RTK rewrite hook.
flowctl/Cargo.lock Updates lockfile for new deps (criterion and transitive crates).
docs/skills.md Updates skill catalog counts and adds new extension skills.
docs/flowctl.md Updates CLI reference command list and revises command docs.
commands/flow-code/run.md Adds new /flow-code:run command stub that delegates to skill.
CLAUDE.md Documents unified workflow entry, updated crate structure, and deprecation notes.
.shellcheckrc Configures shellcheck to treat scripts as bash.
.github/workflows/ci.yml Expands CI scope (scripts/hooks), adds shellcheck + JSON validation, adds shell integration job.
Comments suppressed due to low confidence (1)

flowctl/crates/flowctl-service/src/changes.rs:82

  • ChangesApplier::apply emits the domain event before apply_one. If applying the mutation fails (I/O error, partial batch, etc.), the event store can record a Created event for an epic/task that was never actually written. Emitting after the mutation succeeds (or within the same transaction boundary if possible) would keep the event stream consistent with materialized state.
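The ordering the comment recommends can be sketched as below: persist the mutation first, and emit the event only once the write succeeded, so the stream never records entities that were never materialized. The names here (apply_one, emit_event, Mutation) are placeholders, not flowctl's real API.

```rust
// Demonstrates emit-after-success ordering with in-memory stand-ins
// for the materialized state and the event store.
#[derive(Debug)]
struct Mutation {
    id: &'static str,
}

// Stand-in for the real mutation apply; `fail` simulates an I/O error.
fn apply_one(m: &Mutation, store: &mut Vec<String>, fail: bool) -> Result<(), String> {
    if fail {
        return Err(format!("I/O error applying {}", m.id));
    }
    store.push(m.id.to_string());
    Ok(())
}

fn emit_event(m: &Mutation, events: &mut Vec<String>) {
    events.push(format!("Created({})", m.id));
}

fn apply_with_event(
    m: &Mutation,
    store: &mut Vec<String>,
    events: &mut Vec<String>,
    fail: bool,
) -> Result<(), String> {
    // Mutation first; the `?` ensures no event is emitted on failure.
    apply_one(m, store, fail)?;
    emit_event(m, events);
    Ok(())
}

fn main() {
    let (mut store, mut events) = (Vec::new(), Vec::new());
    let ok = Mutation { id: "epic-1" };
    let bad = Mutation { id: "epic-2" };

    apply_with_event(&ok, &mut store, &mut events, false).unwrap();
    let _ = apply_with_event(&bad, &mut store, &mut events, true);

    // The failed mutation left no phantom Created event behind.
    assert_eq!(store, vec!["epic-1".to_string()]);
    assert_eq!(events, vec!["Created(epic-1)".to_string()]);
    println!("events stay consistent with state");
}
```

Wrapping both writes in one transaction, where the storage layer allows it, gives the same guarantee in the other direction as well (no applied mutation without its event).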


Comment on lines +104 to +114
/// `flowctl phase next --epic <id> --json`
fn cmd_phase_next(json: bool, epic_id: &str) {
    let current = get_or_init_phase(epic_id);
    let all_done = current.is_terminal();

    if json {
        json_output(json!({
            "phase": current.as_str(),
            "prompt": current.prompt_template(),
            "all_done": all_done,
        }));

Copilot AI Apr 7, 2026


cmd_phase_next sets all_done based on current.is_terminal(). Since Close is modeled as terminal, this makes phase next report all_done=true as soon as the pipeline reaches close, which causes callers to stop before running the close actions. Consider reporting all_done only after close has been completed (e.g., add an explicit completed state/flag, or allow phase done on close and set all_done=true afterwards).

Comment on lines +148 to +160
    if current.is_terminal() {
        error_exit("Pipeline is already at the terminal phase (close). No further advancement.");
    }

    let next_phase = current.next().expect("non-terminal phase has a next");
    update_phase(epic_id, &next_phase);

    if json {
        json_output(json!({
            "previous_phase": current.as_str(),
            "phase": next_phase.as_str(),
            "prompt": next_phase.prompt_template(),
            "all_done": next_phase.is_terminal(),

Copilot AI Apr 7, 2026


cmd_phase_done rejects current.is_terminal() (close), so there is no way to mark the close phase done / advance to a completed state. This conflicts with the phase loop contract and makes it impossible to record evidence for close. Consider allowing done for close (and persisting a completed marker), or adding a final completed phase after close.

Comment on lines +43 to +78
/// Read current pipeline phase from DB. If no row exists, initialize to Plan.
fn get_or_init_phase(epic_id: &str) -> PipelinePhase {
    let conn = require_db();
    let raw = conn.inner_conn();
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("tokio runtime");

    rt.block_on(async {
        let mut rows = raw
            .query(
                "SELECT phase FROM pipeline_progress WHERE epic_id = ?1",
                libsql::params![epic_id],
            )
            .await
            .unwrap_or_else(|e| {
                error_exit(&format!("DB query failed: {e}"));
            });

        if let Some(row) = rows.next().await.unwrap_or(None) {
            let phase_str: String = row.get(0).unwrap_or_else(|_| "plan".to_string());
            PipelinePhase::parse(&phase_str).unwrap_or(PipelinePhase::Plan)
        } else {
            // No row — initialize with Plan phase.
            let now = chrono::Utc::now().to_rfc3339();
            raw.execute(
                "INSERT INTO pipeline_progress (epic_id, phase, started_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
                libsql::params![epic_id, "plan", now.clone(), now],
            )
            .await
            .unwrap_or_else(|e| {
                error_exit(&format!("DB insert failed: {e}"));
            });
            PipelinePhase::Plan
        }

Copilot AI Apr 7, 2026


get_or_init_phase will insert a pipeline_progress row for any arbitrary --epic string, even if that epic doesn't exist. This can create orphan progress rows and hide user typos. Suggest validating the epic exists before initializing pipeline progress (or returning a clear error if it doesn't).

Comment on lines +915 to +917
// Event store — emit Started for the restarted task
emit_task_event(conn, &req.task_id, TaskEvent::Started, "flowctl restart").await;


Copilot AI Apr 7, 2026


restart_task emits TaskEvent::Started even though the task is being reset back to todo (and downstream tasks are reset too). This makes the event stream misleading for consumers (it looks like a start transition happened). Consider introducing a dedicated restart/reset event (e.g., TaskEvent::Restarted/Reset) and/or emitting events for all reset task IDs.

Comment on lines +15 to +51
/// Returns the path for the RTK probe cache file.
fn rtk_probe_cache_path() -> PathBuf {
    let tmp = std::env::var("TMPDIR").unwrap_or_else(|_| "/tmp".into());
    PathBuf::from(tmp).join("flowctl-rtk-probe")
}

/// Check if rtk is available, using a file-based cache to avoid repeated `command -v rtk` calls.
/// Returns true if rtk is installed and available.
fn is_rtk_available() -> bool {
    let cache_path = rtk_probe_cache_path();

    // Check cache: if file exists and is fresh, use cached result
    if let Ok(metadata) = fs::metadata(&cache_path) {
        let is_fresh = metadata
            .modified()
            .ok()
            .and_then(|mtime| SystemTime::now().duration_since(mtime).ok())
            .map(|age| age.as_secs() < RTK_PROBE_CACHE_TTL_SECS)
            .unwrap_or(false);

        if is_fresh {
            if let Ok(content) = fs::read_to_string(&cache_path) {
                return content.trim() == "found";
            }
        }
    }

    // Cache miss or stale — probe for rtk
    let available = Command::new("sh")
        .args(["-c", "command -v rtk"])
        .output()
        .map(|o| o.status.success())
        .unwrap_or(false);

    // Write result to cache (best-effort)
    let _ = fs::write(&cache_path, if available { "found" } else { "not-found" });

Copilot AI Apr 7, 2026


The RTK probe cache uses a predictable filename in $TMPDIR (defaulting to /tmp) and writes with fs::write. On multi-user systems this is susceptible to symlink attacks (an attacker can pre-create flowctl-rtk-probe as a symlink and trick the hook into overwriting arbitrary files) and to cache poisoning across users. Prefer a user-specific cache directory (e.g., XDG cache) and write via an atomic create-new temp file + rename, or use a secure tempfile API that avoids following symlinks.
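One way to implement the create-new-then-rename pattern the comment suggests is sketched below, using only the standard library. The function and file names are illustrative, not flowctl's actual code.

```rust
// Symlink-resistant, atomic cache write: create_new refuses to open an
// existing path (file or symlink), and rename atomically replaces the
// destination entry, so readers see either the old or the new content.
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::Path;
use std::process;

fn write_cache_atomically(cache_path: &Path, contents: &str) -> std::io::Result<()> {
    let dir = cache_path.parent().unwrap_or_else(|| Path::new("."));
    // Unique-per-process temp name; create_new fails if anything
    // already sits at this path instead of following it.
    let tmp = dir.join(format!(".rtk-probe.{}.tmp", process::id()));
    let mut f = OpenOptions::new().write(true).create_new(true).open(&tmp)?;
    f.write_all(contents.as_bytes())?;
    f.sync_all()?;
    // rename is atomic within a filesystem and replaces the directory
    // entry itself, so a pre-planted symlink at cache_path is unlinked
    // rather than followed.
    fs::rename(&tmp, cache_path)
}

fn main() -> std::io::Result<()> {
    let cache = std::env::temp_dir().join("flowctl-rtk-probe-demo");
    write_cache_atomically(&cache, "found")?;
    assert_eq!(fs::read_to_string(&cache)?, "found");
    fs::remove_file(&cache)?;
    println!("atomic cache write ok");
    Ok(())
}
```

Pairing this with a per-user directory (such as the XDG cache dir) removes the cross-user poisoning vector as well, since other users cannot write there at all.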

Comment on lines 1 to 5
bin.name = "flowctl"
args = ["--json", "next"]
stdout = """
{"epic":"fn-22-improve-flowctl-code-quality","reason":"ready_task","status":"work","task":"fn-22-improve-flowctl-code-quality.1"}
{"epic":"fn-2-event-sourced-pipeline-first","reason":"resume_in_progress","status":"work","task":"fn-2-event-sourced-pipeline-first.4"}
"""

Copilot AI Apr 7, 2026


next_json.toml asserts a specific epic/task ID and reason, which makes this trycmd test dependent on the repository's .flow state and likely brittle across environments. Consider setting up the required .flow state within the test case (trycmd fs fixture) and/or loosening the assertion to stdout = "..." / key-based matching.
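A hedged sketch of the decoupled fixture, assuming trycmd's `.in/` sandbox convention and its `[..]` inline wildcards (verify both against the trycmd documentation for the pinned version):

```toml
bin.name = "flowctl"
args = ["--json", "next"]
# Pair this file with a next_json.in/ directory containing a minimal
# .flow fixture so the assertion no longer depends on the repository's
# live state; [..] wildcards keep the check key-based rather than
# pinned to specific epic/task IDs.
stdout = """
{"epic":"[..]","reason":"[..]","status":"work","task":"[..]"}
"""
```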

Comment on lines +157 to +170
# Verify suggested command is included
int_suggested="$(echo "$int_json" | "$PYTHON_BIN" -c '
import json, sys
data = json.load(sys.stdin)
epics = data.get("interrupted", [])
matching = [e for e in epics if e["id"] == "'"$EPIC_INT"'"]
print(matching[0].get("suggested", "") if matching else "")
')"
if [[ "$int_suggested" == "/flow-code:work $EPIC_INT" ]]; then
  echo -e "${GREEN}✓${NC} status --interrupted includes suggested resume command"
  PASS=$((PASS + 1))
else
  echo -e "${RED}✗${NC} status --interrupted wrong suggested (got: $int_suggested)"
  FAIL=$((FAIL + 1))

Copilot AI Apr 7, 2026


This test asserts the suggested resume command is /flow-code:work ..., but flow-code-work is now marked user-invocable: false and deprecated in this PR. If the CLI still suggests /flow-code:work, users won't be able to invoke it. Suggest updating the suggestion (and this assertion) to /flow-code:run <epic> or another user-invocable resume command.

Comment on lines +34 to +90
/// Append an event to a stream. Auto-increments the version via
/// `SELECT MAX(version)+1`. Returns the assigned version number.
///
/// Uses `INSERT OR FAIL` so a concurrent append that races on the same
/// version will fail with a constraint error rather than silently
/// overwriting.
pub async fn append(
    &self,
    stream_id: &str,
    event: &FlowEvent,
    metadata: &EventMetadata,
) -> Result<u64, DbError> {
    // Determine the next version for this stream.
    let mut rows = self
        .conn
        .query(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE stream_id = ?1",
            params![stream_id.to_string()],
        )
        .await?;
    let next_version: i64 = match rows.next().await? {
        Some(row) => row.get::<i64>(0)? + 1,
        None => 1,
    };

    let event_type = event_type_label(event);
    let payload_json = serde_json::to_string(event)?;
    let metadata_json = serde_json::to_string(metadata)?;

    let result = self
        .conn
        .execute(
            "INSERT OR FAIL INTO event_store (stream_id, version, event_type, payload, metadata)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                stream_id.to_string(),
                next_version,
                event_type,
                payload_json,
                metadata_json
            ],
        )
        .await;

    match result {
        Ok(_) => Ok(next_version as u64),
        Err(e) => {
            let msg = e.to_string();
            if msg.contains("UNIQUE constraint failed") || msg.contains("constraint") {
                Err(DbError::Constraint(format!(
                    "concurrency conflict: version {next_version} already exists for stream {stream_id}"
                )))
            } else {
                Err(DbError::LibSql(e))
            }
        }
    }
}

Copilot AI Apr 7, 2026


EventStoreRepo::append computes next_version via SELECT MAX(version)+1 and then inserts. Without a transaction/retry loop, concurrent appends to the same stream will frequently race and drop events (especially since many callers ignore append errors). Consider an atomic INSERT pattern (e.g., INSERT ... SELECT COALESCE(MAX(version),0)+1 ... inside a transaction) and/or a small retry-on-conflict loop to improve reliability.

Suggested change

Before:

/// Append an event to a stream. Auto-increments the version via
/// `SELECT MAX(version)+1`. Returns the assigned version number.
///
/// Uses `INSERT OR FAIL` so a concurrent append that races on the same
/// version will fail with a constraint error rather than silently
/// overwriting.
pub async fn append(
    &self,
    stream_id: &str,
    event: &FlowEvent,
    metadata: &EventMetadata,
) -> Result<u64, DbError> {
    // Determine the next version for this stream.
    let mut rows = self
        .conn
        .query(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE stream_id = ?1",
            params![stream_id.to_string()],
        )
        .await?;
    let next_version: i64 = match rows.next().await? {
        Some(row) => row.get::<i64>(0)? + 1,
        None => 1,
    };
    let event_type = event_type_label(event);
    let payload_json = serde_json::to_string(event)?;
    let metadata_json = serde_json::to_string(metadata)?;
    let result = self
        .conn
        .execute(
            "INSERT OR FAIL INTO event_store (stream_id, version, event_type, payload, metadata)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                stream_id.to_string(),
                next_version,
                event_type,
                payload_json,
                metadata_json
            ],
        )
        .await;
    match result {
        Ok(_) => Ok(next_version as u64),
        Err(e) => {
            let msg = e.to_string();
            if msg.contains("UNIQUE constraint failed") || msg.contains("constraint") {
                Err(DbError::Constraint(format!(
                    "concurrency conflict: version {next_version} already exists for stream {stream_id}"
                )))
            } else {
                Err(DbError::LibSql(e))
            }
        }
    }
}

After:

/// Append an event to a stream and return the assigned version number.
///
/// Version assignment is derived from the current max stream version and
/// inserted with `INSERT OR FAIL`. To make concurrent appends more
/// reliable, constraint conflicts are retried a small number of times so
/// a racing writer can re-read the latest version and append at the next
/// slot instead of immediately failing.
pub async fn append(
    &self,
    stream_id: &str,
    event: &FlowEvent,
    metadata: &EventMetadata,
) -> Result<u64, DbError> {
    const MAX_APPEND_RETRIES: usize = 3;

    let event_type = event_type_label(event);
    let payload_json = serde_json::to_string(event)?;
    let metadata_json = serde_json::to_string(metadata)?;

    for attempt in 0..=MAX_APPEND_RETRIES {
        // Determine the next version for this stream.
        let mut rows = self
            .conn
            .query(
                "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE stream_id = ?1",
                params![stream_id.to_string()],
            )
            .await?;
        let next_version: i64 = match rows.next().await? {
            Some(row) => row.get::<i64>(0)? + 1,
            None => 1,
        };

        let result = self
            .conn
            .execute(
                "INSERT OR FAIL INTO event_store (stream_id, version, event_type, payload, metadata)
                 VALUES (?1, ?2, ?3, ?4, ?5)",
                params![
                    stream_id.to_string(),
                    next_version,
                    event_type,
                    payload_json,
                    metadata_json
                ],
            )
            .await;

        match result {
            Ok(_) => return Ok(next_version as u64),
            Err(e) => {
                let msg = e.to_string();
                let is_constraint =
                    msg.contains("UNIQUE constraint failed") || msg.contains("constraint");
                if is_constraint && attempt < MAX_APPEND_RETRIES {
                    continue;
                }
                if is_constraint {
                    return Err(DbError::Constraint(format!(
                        "concurrency conflict after {} retries while appending to stream {}",
                        MAX_APPEND_RETRIES, stream_id
                    )));
                } else {
                    return Err(DbError::LibSql(e));
                }
            }
        }
    }

    Err(DbError::Constraint(format!(
        "concurrency conflict after {} retries while appending to stream {}",
        MAX_APPEND_RETRIES, stream_id
    )))

Comment on lines +12 to +13
config, epic, task, dep, approval, gap, log, memory, outputs, checkpoint,
stack, invariants, ralph, scout-cache, skill, rp, codex, hook, stats, worker-phase,

Copilot AI Apr 7, 2026


The "Available Commands" list doesn't include the newly added phase and events commands, so the reference is now incomplete. Consider adding them to this list (and keeping ordering consistent with flowctl --help).

Suggested change

Before:

config, epic, task, dep, approval, gap, log, memory, outputs, checkpoint,
stack, invariants, ralph, scout-cache, skill, rp, codex, hook, stats, worker-phase,

After:

config, epic, events, task, dep, approval, gap, log, memory, outputs, checkpoint,
stack, invariants, ralph, scout-cache, skill, rp, codex, hook, stats, phase, worker-phase,

Comment on lines +2 to +6
name: flow-code-run
description: Unified entry point for plan-first development. Manages the entire pipeline (plan, plan-review, work, impl-review, close) via flowctl phase commands.
user-invocable: true
---


Copilot AI Apr 7, 2026


This skill describes the pipeline phases as plan-review / impl-review in some places, but the actual flowctl phase command uses snake_case phase names (plan_review, impl_review). To avoid confusion (and mismatched --phase values if a user follows the prose), consider standardizing all references to the exact snake_case identifiers returned by flowctl phase next.
