From 00f927800abfb9c93141b56c574332d9a38b4e47 Mon Sep 17 00:00:00 2001 From: z23cc Date: Tue, 7 Apr 2026 23:46:25 +0800 Subject: [PATCH] refactor(flowctl): merge 4 crates into 2 (core + cli) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Delete flowctl-db and flowctl-service crates entirely: - FlowStore was a thin wrapper around json_store — callers use json_store directly now - lifecycle.rs, ChangesApplier, FileApprovalStore, OutputsStore moved into flowctl-core - ServiceError merged into flowctl-core::error Result: cli → core (zero intermediate crates) 284 tests pass, clean build. Co-Authored-By: Claude Opus 4.6 (1M context) --- flowctl/Cargo.lock | 27 --- flowctl/Cargo.toml | 4 - flowctl/crates/flowctl-cli/Cargo.toml | 4 - .../flowctl-cli/src/commands/admin/init.rs | 3 +- .../flowctl-cli/src/commands/admin/status.rs | 3 +- .../flowctl-cli/src/commands/approval.rs | 5 +- .../crates/flowctl-cli/src/commands/gap.rs | 30 ++- .../flowctl-cli/src/commands/helpers.rs | 2 +- .../crates/flowctl-cli/src/commands/log.rs | 8 +- .../flowctl-cli/src/commands/outputs.rs | 4 +- .../crates/flowctl-cli/src/commands/query.rs | 75 +++---- .../crates/flowctl-cli/src/commands/stats.rs | 3 +- .../src/commands/workflow/lifecycle.rs | 18 +- .../src/commands/workflow/phase.rs | 6 +- .../src/commands/workflow/pipeline_phase.rs | 12 +- flowctl/crates/flowctl-core/src/approvals.rs | 150 ++++++++++++++ flowctl/crates/flowctl-core/src/changes.rs | 141 +++++++++++++ flowctl/crates/flowctl-core/src/error.rs | 45 +++++ flowctl/crates/flowctl-core/src/lib.rs | 8 +- .../src/lifecycle.rs | 62 +++--- flowctl/crates/flowctl-core/src/outputs.rs | 179 ++++++++++++++++- flowctl/crates/flowctl-db/Cargo.toml | 19 -- flowctl/crates/flowctl-db/src/approvals.rs | 52 ----- flowctl/crates/flowctl-db/src/error.rs | 51 ----- flowctl/crates/flowctl-db/src/events.rs | 63 ------ flowctl/crates/flowctl-db/src/gaps.rs | 68 ------- flowctl/crates/flowctl-db/src/lib.rs | 35 
---- flowctl/crates/flowctl-db/src/locks.rs | 122 ------------ flowctl/crates/flowctl-db/src/memory.rs | 68 ------- flowctl/crates/flowctl-db/src/phases.rs | 70 ------- flowctl/crates/flowctl-db/src/pipeline.rs | 52 ----- flowctl/crates/flowctl-db/src/store.rs | 104 ---------- flowctl/crates/flowctl-service/Cargo.toml | 22 --- .../crates/flowctl-service/src/approvals.rs | 153 --------------- flowctl/crates/flowctl-service/src/changes.rs | 148 -------------- flowctl/crates/flowctl-service/src/error.rs | 50 ----- flowctl/crates/flowctl-service/src/lib.rs | 25 --- flowctl/crates/flowctl-service/src/outputs.rs | 185 ------------------ 38 files changed, 619 insertions(+), 1457 deletions(-) rename flowctl/crates/{flowctl-service => flowctl-core}/src/lifecycle.rs (94%) delete mode 100644 flowctl/crates/flowctl-db/Cargo.toml delete mode 100644 flowctl/crates/flowctl-db/src/approvals.rs delete mode 100644 flowctl/crates/flowctl-db/src/error.rs delete mode 100644 flowctl/crates/flowctl-db/src/events.rs delete mode 100644 flowctl/crates/flowctl-db/src/gaps.rs delete mode 100644 flowctl/crates/flowctl-db/src/lib.rs delete mode 100644 flowctl/crates/flowctl-db/src/locks.rs delete mode 100644 flowctl/crates/flowctl-db/src/memory.rs delete mode 100644 flowctl/crates/flowctl-db/src/phases.rs delete mode 100644 flowctl/crates/flowctl-db/src/pipeline.rs delete mode 100644 flowctl/crates/flowctl-db/src/store.rs delete mode 100644 flowctl/crates/flowctl-service/Cargo.toml delete mode 100644 flowctl/crates/flowctl-service/src/approvals.rs delete mode 100644 flowctl/crates/flowctl-service/src/changes.rs delete mode 100644 flowctl/crates/flowctl-service/src/error.rs delete mode 100644 flowctl/crates/flowctl-service/src/lib.rs delete mode 100644 flowctl/crates/flowctl-service/src/outputs.rs diff --git a/flowctl/Cargo.lock b/flowctl/Cargo.lock index 2c710c0d..b011fded 100644 --- a/flowctl/Cargo.lock +++ b/flowctl/Cargo.lock @@ -450,8 +450,6 @@ dependencies = [ "clap", "clap_complete", 
"flowctl-core", - "flowctl-db", - "flowctl-service", "miette", "regex", "serde", @@ -478,31 +476,6 @@ dependencies = [ "toml", ] -[[package]] -name = "flowctl-db" -version = "0.1.0" -dependencies = [ - "chrono", - "flowctl-core", - "serde", - "serde_json", - "tempfile", -] - -[[package]] -name = "flowctl-service" -version = "0.1.0" -dependencies = [ - "chrono", - "flowctl-core", - "flowctl-db", - "serde", - "serde_json", - "tempfile", - "thiserror", - "tracing", -] - [[package]] name = "foldhash" version = "0.1.5" diff --git a/flowctl/Cargo.toml b/flowctl/Cargo.toml index 751f8b40..83ce1946 100644 --- a/flowctl/Cargo.toml +++ b/flowctl/Cargo.toml @@ -2,8 +2,6 @@ resolver = "2" members = [ "crates/flowctl-core", - "crates/flowctl-db", - "crates/flowctl-service", "crates/flowctl-cli", ] @@ -56,8 +54,6 @@ trycmd = "0.15" # Internal crate references flowctl-core = { path = "crates/flowctl-core" } -flowctl-db = { path = "crates/flowctl-db" } -flowctl-service = { path = "crates/flowctl-service" } # ── Shared lint configuration ──────────────────────────────────────── [workspace.lints.clippy] diff --git a/flowctl/crates/flowctl-cli/Cargo.toml b/flowctl/crates/flowctl-cli/Cargo.toml index f5f24607..ef738c34 100644 --- a/flowctl/crates/flowctl-cli/Cargo.toml +++ b/flowctl/crates/flowctl-cli/Cargo.toml @@ -12,8 +12,6 @@ path = "src/main.rs" [dependencies] flowctl-core = { workspace = true } -flowctl-db = { workspace = true } -flowctl-service = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } anyhow = { workspace = true } @@ -34,5 +32,3 @@ trycmd = { workspace = true } tempfile = "3" serde_json = { workspace = true } flowctl-core = { workspace = true } -flowctl-db = { workspace = true } -flowctl-service = { workspace = true } diff --git a/flowctl/crates/flowctl-cli/src/commands/admin/init.rs b/flowctl/crates/flowctl-cli/src/commands/admin/init.rs index 4c5465f1..5f301c01 100644 --- a/flowctl/crates/flowctl-cli/src/commands/admin/init.rs +++ 
b/flowctl/crates/flowctl-cli/src/commands/admin/init.rs @@ -85,8 +85,7 @@ pub fn cmd_init(json: bool) { } // Ensure FlowStore dirs are ready - let store = flowctl_db::FlowStore::new(flow_dir.clone()); - if let Err(e) = store.ensure_dirs() { + if let Err(e) = flowctl_core::json_store::ensure_dirs(&flow_dir) { eprintln!("warning: failed to ensure store dirs: {e}"); } diff --git a/flowctl/crates/flowctl-cli/src/commands/admin/status.rs b/flowctl/crates/flowctl-cli/src/commands/admin/status.rs index 1d0c09c0..da16da75 100644 --- a/flowctl/crates/flowctl-cli/src/commands/admin/status.rs +++ b/flowctl/crates/flowctl-cli/src/commands/admin/status.rs @@ -726,8 +726,7 @@ pub fn cmd_doctor(json_mode: bool, workflow: bool) { // Check 7: stale file locks { - let store = flowctl_db::FlowStore::new(flow_dir.clone()); - match store.locks().list() { + match flowctl_core::json_store::locks_read(&flow_dir) { Ok(locks) if !locks.is_empty() => { checks.push(json!({"name": "stale_locks", "status": "warn", "message": format!("{} file lock(s) active — verify with 'flowctl lock-check'", locks.len())})); } diff --git a/flowctl/crates/flowctl-cli/src/commands/approval.rs b/flowctl/crates/flowctl-cli/src/commands/approval.rs index fe5fec13..005a2f8e 100644 --- a/flowctl/crates/flowctl-cli/src/commands/approval.rs +++ b/flowctl/crates/flowctl-cli/src/commands/approval.rs @@ -8,7 +8,7 @@ use clap::Subcommand; use serde_json::Value; use flowctl_core::approvals::{ApprovalKind, ApprovalStatus, CreateApprovalRequest}; -use flowctl_service::approvals::FileApprovalStore; +use flowctl_core::approvals::FileApprovalStore; use crate::output::{error_exit, json_output}; @@ -80,8 +80,7 @@ pub fn dispatch(cmd: &ApprovalCmd, json: bool) { fn open_local_store() -> FileApprovalStore { let flow_dir = get_flow_dir(); - let store = flowctl_db::FlowStore::new(flow_dir); - FileApprovalStore::new(store) + FileApprovalStore::new(flow_dir) } // ── Payload parsing ───────────────────────────────────────────────── diff 
--git a/flowctl/crates/flowctl-cli/src/commands/gap.rs b/flowctl/crates/flowctl-cli/src/commands/gap.rs index 9d7d7a57..e2084823 100644 --- a/flowctl/crates/flowctl-cli/src/commands/gap.rs +++ b/flowctl/crates/flowctl-cli/src/commands/gap.rs @@ -10,7 +10,7 @@ use serde_json::json; use crate::output::{error_exit, json_output, pretty_output}; use flowctl_core::id::is_epic_id; -use flowctl_db::{FlowStore, GapEntry}; +use flowctl_core::json_store::{self, GapEntry}; use super::helpers::get_flow_dir; @@ -112,8 +112,8 @@ fn validate_epic(_json: bool, epic_id: &str) { error_exit(&format!("Epic not found: {}", epic_id)); } -fn gap_store() -> FlowStore { - FlowStore::new(get_flow_dir()) +fn gap_flow_dir() -> std::path::PathBuf { + get_flow_dir() } // ── Commands ─────────────────────────────────────────────────────── @@ -126,10 +126,9 @@ fn cmd_gap_add( source: &str, ) { validate_epic(json_mode, epic_id); - let store = gap_store(); - let gap_store = store.gaps(); + let flow_dir = gap_flow_dir(); - let mut gaps = gap_store.read(epic_id).unwrap_or_default(); + let mut gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default(); // Check for existing gap with same capability (idempotent) let cap_lower = capability.trim().to_lowercase(); @@ -166,7 +165,7 @@ fn cmd_gap_add( resolved: false, }); - if let Err(e) = gap_store.write(epic_id, &gaps) { + if let Err(e) = json_store::gaps_write(&flow_dir, epic_id, &gaps) { error_exit(&format!("Failed to add gap: {e}")); } @@ -190,8 +189,8 @@ fn cmd_gap_add( fn cmd_gap_list(json_mode: bool, epic_id: &str, status_filter: Option<&str>) { validate_epic(json_mode, epic_id); - let store = gap_store(); - let gaps = store.gaps().read(epic_id).unwrap_or_default(); + let flow_dir = gap_flow_dir(); + let gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default(); let filtered: Vec<&GapEntry> = gaps.iter().filter(|g| { match status_filter { @@ -244,9 +243,8 @@ fn cmd_gap_resolve( _evidence: &str, ) { validate_epic(json_mode, 
epic_id); - let store = gap_store(); - let gap_st = store.gaps(); - let mut gaps = gap_st.read(epic_id).unwrap_or_default(); + let flow_dir = gap_flow_dir(); + let mut gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default(); if let Some(direct_id) = gap_id_direct { let gap_id: u32 = direct_id @@ -259,7 +257,7 @@ fn cmd_gap_resolve( error_exit(&format!("Gap {} not found", gap_id)); } - gap_st.write(epic_id, &gaps).unwrap_or_else(|e| { + json_store::gaps_write(&flow_dir, epic_id, &gaps).unwrap_or_else(|e| { error_exit(&format!("Failed to resolve gap: {e}")); }); @@ -281,7 +279,7 @@ fn cmd_gap_resolve( error_exit(&format!("Gap for capability '{}' not found", cap)); } - gap_st.write(epic_id, &gaps).unwrap_or_else(|e| { + json_store::gaps_write(&flow_dir, epic_id, &gaps).unwrap_or_else(|e| { error_exit(&format!("Failed to resolve gap: {e}")); }); @@ -301,8 +299,8 @@ fn cmd_gap_resolve( fn cmd_gap_check(json_mode: bool, epic_id: &str) { validate_epic(json_mode, epic_id); - let store = gap_store(); - let all_gaps = store.gaps().read(epic_id).unwrap_or_default(); + let flow_dir = gap_flow_dir(); + let all_gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default(); let open_blocking: Vec<&GapEntry> = all_gaps .iter() diff --git a/flowctl/crates/flowctl-cli/src/commands/helpers.rs b/flowctl/crates/flowctl-cli/src/commands/helpers.rs index 26d9b063..237e3d75 100644 --- a/flowctl/crates/flowctl-cli/src/commands/helpers.rs +++ b/flowctl/crates/flowctl-cli/src/commands/helpers.rs @@ -149,7 +149,7 @@ fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<(), std::io::Error> { /// Returns the number of mutations applied. Calls `error_exit` on failure. 
pub fn apply_changes(flow_dir: &Path, changes: &flowctl_core::changes::Changes) -> usize { use crate::output::error_exit; - use flowctl_service::changes::ChangesApplier; + use flowctl_core::changes::ChangesApplier; if changes.is_empty() { return 0; diff --git a/flowctl/crates/flowctl-cli/src/commands/log.rs b/flowctl/crates/flowctl-cli/src/commands/log.rs index 9d270628..71a6c88f 100644 --- a/flowctl/crates/flowctl-cli/src/commands/log.rs +++ b/flowctl/crates/flowctl-cli/src/commands/log.rs @@ -10,7 +10,7 @@ use serde_json::json; use crate::output::{error_exit, json_output, pretty_output}; use super::helpers::get_flow_dir; -use flowctl_db::FlowStore; +use flowctl_core::json_store; #[derive(Subcommand, Debug)] pub enum LogCmd { @@ -65,7 +65,6 @@ fn cmd_log_decision( task_id: Option<&str>, ) { let flow_dir = get_flow_dir(); - let store = FlowStore::new(flow_dir); let epic = epic_id.unwrap_or("_global"); @@ -80,7 +79,7 @@ fn cmd_log_decision( "timestamp": chrono::Utc::now().to_rfc3339(), }); - if let Err(e) = store.events().append(&event.to_string()) { + if let Err(e) = json_store::events_append(&flow_dir, &event.to_string()) { error_exit(&format!("Failed to log decision: {e}")); } @@ -100,9 +99,8 @@ fn cmd_log_decision( fn cmd_log_decisions(json_mode: bool, epic_id: Option<&str>, limit: usize) { let flow_dir = get_flow_dir(); - let store = FlowStore::new(flow_dir); - let all_lines = store.events().read_all().unwrap_or_else(|e| { + let all_lines = json_store::events_read_all(&flow_dir).unwrap_or_else(|e| { error_exit(&format!("Failed to query decisions: {e}")); }); diff --git a/flowctl/crates/flowctl-cli/src/commands/outputs.rs b/flowctl/crates/flowctl-cli/src/commands/outputs.rs index a759489d..0f7f5940 100644 --- a/flowctl/crates/flowctl-cli/src/commands/outputs.rs +++ b/flowctl/crates/flowctl-cli/src/commands/outputs.rs @@ -1,6 +1,6 @@ //! Outputs commands: write, list, show. //! -//! Thin CLI wrapper over `flowctl_service::outputs::OutputsStore`. Provides +//! 
Thin CLI wrapper over `flowctl_core::outputs::OutputsStore`. Provides //! a lightweight narrative handoff layer at `.flow/outputs/.md` that //! workers populate in Phase 9 and read during Phase 2 re-anchor. @@ -11,7 +11,7 @@ use clap::Subcommand; use serde_json::json; use flowctl_core::id::is_task_id; -use flowctl_service::outputs::OutputsStore; +use flowctl_core::outputs::OutputsStore; use crate::output::{error_exit, json_output}; diff --git a/flowctl/crates/flowctl-cli/src/commands/query.rs b/flowctl/crates/flowctl-cli/src/commands/query.rs index 5b78c571..85a1791f 100644 --- a/flowctl/crates/flowctl-cli/src/commands/query.rs +++ b/flowctl/crates/flowctl-cli/src/commands/query.rs @@ -599,21 +599,21 @@ pub fn cmd_lock(json: bool, task: String, files: String, mode: String) { error_exit("No files specified for locking."); } - let store = flowctl_db::FlowStore::new(flow_dir); - let lock_store = store.locks(); - let mut locked = Vec::new(); let mut already_locked = Vec::new(); for file in &file_list { - match lock_store.acquire(file, &task, &mode) { - Ok(()) => locked.push(file.to_string()), - Err(flowctl_db::DbError::Constraint(msg)) => { - let holder = lock_store.check(file).ok().flatten().unwrap_or_default(); - already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder)], "detail": msg})); - } - Err(e) => { - error_exit(&format!("Failed to lock {}: {}", file, e)); + // Check for conflict: another task holding the file. 
+ let locks = flowctl_core::json_store::locks_read(&flow_dir).unwrap_or_default(); + let conflict = locks.iter().find(|l| l.file_path == *file && l.task_id != task); + if let Some(holder) = conflict { + already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder.task_id)], "detail": format!("file '{}' already locked by task '{}'", file, holder.task_id)})); + } else { + match flowctl_core::json_store::lock_acquire(&flow_dir, file, &task, &mode) { + Ok(()) => locked.push(file.to_string()), + Err(e) => { + error_exit(&format!("Failed to lock {}: {}", file, e)); + } } } } @@ -641,11 +641,8 @@ pub fn cmd_lock(json: bool, task: String, files: String, mode: String) { pub fn cmd_unlock(json: bool, task: Option, _files: Option, all: bool) { let flow_dir = ensure_flow_exists(); - let store = flowctl_db::FlowStore::new(flow_dir); - let lock_store = store.locks(); - if all { - match lock_store.release_all() { + match flowctl_core::json_store::locks_clear(&flow_dir) { Ok(count) => { if json { json_output(json!({ @@ -668,7 +665,7 @@ pub fn cmd_unlock(json: bool, task: Option, _files: Option, all: } }; - match lock_store.release_for_task(&task_id) { + match flowctl_core::json_store::lock_release_task(&flow_dir, &task_id) { Ok(count) => { if json { json_output(json!({ @@ -686,39 +683,31 @@ pub fn cmd_unlock(json: bool, task: Option, _files: Option, all: pub fn cmd_lock_check(json: bool, file: Option) { let flow_dir = ensure_flow_exists(); - let store = flowctl_db::FlowStore::new(flow_dir); - let lock_store = store.locks(); - match file { Some(f) => { - match lock_store.check(&f) { - Ok(Some(task_id)) => { - if json { - json_output(json!({ - "file": f, - "locked": true, - "locks": [{"task_id": task_id, "mode": "write"}], - })); - } else { - println!("{}: locked by {}", f, task_id); - } - } - Ok(None) => { - if json { - json_output(json!({ - "file": f, - "locked": false, - })); - } else { - println!("{}: not locked", f); - } + let locks = 
flowctl_core::json_store::locks_read(&flow_dir).unwrap_or_default(); + let holder = locks.iter().find(|l| l.file_path == f).map(|l| l.task_id.clone()); + if let Some(task_id) = holder { + if json { + json_output(json!({ + "file": f, + "locked": true, + "locks": [{"task_id": task_id, "mode": "write"}], + })); + } else { + println!("{}: locked by {}", f, task_id); } - Err(e) => error_exit(&format!("Failed to check lock: {}", e)), + } else if json { + json_output(json!({ + "file": f, + "locked": false, + })); + } else { + println!("{}: not locked", f); } } None => { - let entries = lock_store - .list() + let entries = flowctl_core::json_store::locks_read(&flow_dir) .unwrap_or_else(|e| { error_exit(&format!("Query failed: {}", e)); }); let locks: Vec = entries .into_iter() diff --git a/flowctl/crates/flowctl-cli/src/commands/stats.rs b/flowctl/crates/flowctl-cli/src/commands/stats.rs index 7ad74dd1..9486d51f 100644 --- a/flowctl/crates/flowctl-cli/src/commands/stats.rs +++ b/flowctl/crates/flowctl-cli/src/commands/stats.rs @@ -89,8 +89,7 @@ fn cmd_summary(json_flag: bool) { } } - let store = flowctl_db::FlowStore::new(flow_dir); - let total_events = store.events().read_all().map(|v| v.len() as i64).unwrap_or(0); + let total_events = flowctl_core::json_store::events_read_all(&flow_dir).map(|v| v.len() as i64).unwrap_or(0); if should_json(json_flag) { json_output(json!({ diff --git a/flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs b/flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs index 2d89b7b8..7678e518 100644 --- a/flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs +++ b/flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs @@ -7,8 +7,7 @@ use serde_json::json; use crate::output::{error_exit, json_output}; use flowctl_core::state_machine::Status; -use flowctl_db::FlowStore; -use flowctl_service::lifecycle::{ +use flowctl_core::lifecycle::{ BlockTaskRequest, DoneTaskRequest, FailTaskRequest, RestartTaskRequest, 
StartTaskRequest, }; @@ -24,7 +23,7 @@ pub fn cmd_start(json_mode: bool, id: String, force: bool, _note: Option actor, }; - match flowctl_service::lifecycle::start_task(&flow_dir, req) { + match flowctl_core::lifecycle::start_task(&flow_dir, req) { Ok(resp) => { if json_mode { json_output(json!({ @@ -62,7 +61,7 @@ pub fn cmd_done( actor, }; - match flowctl_service::lifecycle::done_task(&flow_dir, req) { + match flowctl_core::lifecycle::done_task(&flow_dir, req) { Ok(resp) => { if json_mode { let mut result = json!({ @@ -105,7 +104,7 @@ pub fn cmd_block(json_mode: bool, id: String, reason: String) { reason, }; - match flowctl_service::lifecycle::block_task(&flow_dir, req) { + match flowctl_core::lifecycle::block_task(&flow_dir, req) { Ok(resp) => { if json_mode { json_output(json!({ @@ -130,7 +129,7 @@ pub fn cmd_fail(json_mode: bool, id: String, reason: Option, force: bool force, }; - match flowctl_service::lifecycle::fail_task(&flow_dir, req) { + match flowctl_core::lifecycle::fail_task(&flow_dir, req) { Ok(resp) => { if json_mode { let mut result = json!({ @@ -174,7 +173,7 @@ pub fn cmd_restart(json_mode: bool, id: String, dry_run: bool, force: bool) { force, }; - match flowctl_service::lifecycle::restart_task(&flow_dir, req) { + match flowctl_core::lifecycle::restart_task(&flow_dir, req) { Ok(resp) => { if dry_run { if json_mode { @@ -232,11 +231,8 @@ pub fn cmd_restart(json_mode: bool, id: String, dry_run: bool, force: bool) { pub fn cmd_events(json_mode: bool, epic_id: String) { let flow_dir = ensure_flow_exists(); - let store = FlowStore::new(flow_dir.to_path_buf()); - let events_store = store.events(); - // Read all events and filter by epic prefix - match events_store.read_all() { + match flowctl_core::json_store::events_read_all(&flow_dir) { Ok(lines) => { // Parse events and filter by epic let mut matching: Vec = Vec::new(); diff --git a/flowctl/crates/flowctl-cli/src/commands/workflow/phase.rs b/flowctl/crates/flowctl-cli/src/commands/workflow/phase.rs 
index cb6e42a7..90def2ba 100644 --- a/flowctl/crates/flowctl-cli/src/commands/workflow/phase.rs +++ b/flowctl/crates/flowctl-cli/src/commands/workflow/phase.rs @@ -167,8 +167,7 @@ fn migrate_phase_id(id: &str) -> String { /// Load completed phases from file store, migrating legacy IDs. fn load_completed_phases(flow_dir: &std::path::Path, task_id: &str) -> Vec { - let store = flowctl_db::FlowStore::new(flow_dir.to_path_buf()); - store.phases().get_completed(task_id) + flowctl_core::json_store::phases_completed(flow_dir, task_id) .unwrap_or_default() .into_iter() .map(|id| migrate_phase_id(&id)) @@ -177,8 +176,7 @@ fn load_completed_phases(flow_dir: &std::path::Path, task_id: &str) -> Vec PipelinePhase { - let store = FlowStore::new(flow_dir.to_path_buf()); - match store.pipeline().read(epic_id) { + match json_store::pipeline_read(flow_dir, epic_id) { Ok(Some(phase_str)) => PipelinePhase::parse(&phase_str).unwrap_or(PipelinePhase::Plan), _ => { // No entry — initialize with Plan phase. - let _ = store.pipeline().write(epic_id, "plan"); + let _ = json_store::pipeline_write(flow_dir, epic_id, "plan"); PipelinePhase::Plan } } @@ -56,8 +55,7 @@ fn get_or_init_phase(flow_dir: &std::path::Path, epic_id: &str) -> PipelinePhase /// Update pipeline phase in file store. 
fn update_phase(flow_dir: &std::path::Path, epic_id: &str, new_phase: &PipelinePhase) { - let store = FlowStore::new(flow_dir.to_path_buf()); - if let Err(e) = store.pipeline().write(epic_id, new_phase.as_str()) { + if let Err(e) = json_store::pipeline_write(flow_dir, epic_id, new_phase.as_str()) { error_exit(&format!("Failed to update pipeline phase: {e}")); } } @@ -90,7 +88,7 @@ fn cmd_phase_done(json: bool, epic_id: &str, phase_name: &str) { let requested = match PipelinePhase::parse(phase_name) { Some(p) => p, None => { - let valid: Vec<&str> = PipelinePhase::all().iter().map(|p| p.as_str()).collect(); + let valid: Vec<&str> = PipelinePhase::all().iter().map(PipelinePhase::as_str).collect(); error_exit(&format!( "Unknown phase '{}'. Valid phases: {}", phase_name, diff --git a/flowctl/crates/flowctl-core/src/approvals.rs b/flowctl/crates/flowctl-core/src/approvals.rs index 2463bc43..1c6ec98a 100644 --- a/flowctl/crates/flowctl-core/src/approvals.rs +++ b/flowctl/crates/flowctl-core/src/approvals.rs @@ -103,6 +103,156 @@ pub struct ResolveRequest { pub reason: Option, } +// ── FileApprovalStore ──────────────────────────────────────────────── + +use std::path::{Path, PathBuf}; +use chrono::Utc; +use crate::error::{ServiceError, ServiceResult}; + +/// File-backed approval store. +pub struct FileApprovalStore { + flow_dir: PathBuf, +} + +impl FileApprovalStore { + pub fn new(flow_dir: PathBuf) -> Self { + Self { flow_dir } + } + + /// Return the flow directory path. 
+ pub fn flow_dir(&self) -> &Path { + &self.flow_dir + } + + fn new_id() -> String { + let now = Utc::now(); + let millis = now.timestamp_millis(); + let nanos = now.timestamp_subsec_nanos(); + format!("apv-{millis:x}-{nanos:x}") + } + + fn load_all(&self) -> ServiceResult> { + let raw = crate::json_store::approvals_read(&self.flow_dir) + .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; + let mut out = Vec::new(); + for val in raw { + if let Ok(a) = serde_json::from_value::(val) { + out.push(a); + } + } + Ok(out) + } + + fn save_all(&self, approvals: &[Approval]) -> ServiceResult<()> { + let vals: Vec = approvals + .iter() + .filter_map(|a| serde_json::to_value(a).ok()) + .collect(); + crate::json_store::approvals_write(&self.flow_dir, &vals) + .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; + Ok(()) + } + + pub fn create(&self, req: CreateApprovalRequest) -> ServiceResult { + // Validate task exists + if crate::json_store::task_read(&self.flow_dir, &req.task_id).is_err() { + return Err(ServiceError::ValidationError(format!( + "task {} does not exist", + req.task_id + ))); + } + + let id = Self::new_id(); + let now = Utc::now().timestamp(); + + let approval = Approval { + id: id.clone(), + task_id: req.task_id, + kind: req.kind, + payload: req.payload, + status: ApprovalStatus::Pending, + created_at: now, + resolved_at: None, + resolver: None, + reason: None, + }; + + let mut all = self.load_all()?; + all.push(approval.clone()); + self.save_all(&all)?; + + Ok(approval) + } + + pub fn list(&self, status_filter: Option) -> ServiceResult> { + let all = self.load_all()?; + let mut filtered: Vec = if let Some(s) = status_filter { + all.into_iter().filter(|a| a.status == s).collect() + } else { + all + }; + filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at)); + Ok(filtered) + } + + pub fn get(&self, id: &str) -> ServiceResult { + let all = self.load_all()?; + all.into_iter() + .find(|a| a.id == id) + 
.ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}"))) + } + + pub fn approve(&self, id: &str, resolver: Option) -> ServiceResult { + let mut all = self.load_all()?; + let approval = all.iter_mut() + .find(|a| a.id == id) + .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}")))?; + + if approval.status != ApprovalStatus::Pending { + return Err(ServiceError::InvalidTransition(format!( + "approval {id} is already {:?}", + approval.status + ))); + } + + approval.status = ApprovalStatus::Approved; + approval.resolved_at = Some(Utc::now().timestamp()); + approval.resolver = resolver; + let result = approval.clone(); + + self.save_all(&all)?; + Ok(result) + } + + pub fn reject( + &self, + id: &str, + resolver: Option, + reason: Option, + ) -> ServiceResult { + let mut all = self.load_all()?; + let approval = all.iter_mut() + .find(|a| a.id == id) + .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}")))?; + + if approval.status != ApprovalStatus::Pending { + return Err(ServiceError::InvalidTransition(format!( + "approval {id} is already {:?}", + approval.status + ))); + } + + approval.status = ApprovalStatus::Rejected; + approval.resolved_at = Some(Utc::now().timestamp()); + approval.resolver = resolver; + approval.reason = reason; + let result = approval.clone(); + + self.save_all(&all)?; + Ok(result) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/flowctl/crates/flowctl-core/src/changes.rs b/flowctl/crates/flowctl-core/src/changes.rs index 93d30aac..7683c47d 100644 --- a/flowctl/crates/flowctl-core/src/changes.rs +++ b/flowctl/crates/flowctl-core/src/changes.rs @@ -159,6 +159,147 @@ impl Changes { } } +// ── ChangesApplier ────────────────────────────────────────────────── + +use std::path::Path; + +use crate::error::{ServiceError, ServiceResult}; +use crate::json_store; + +/// Convert a `json_store::StoreError` into a `ServiceError`. 
+fn store_err(e: json_store::StoreError) -> ServiceError { + ServiceError::IoError(std::io::Error::other(e.to_string())) +} + +/// Result of applying a `Changes` batch. +#[derive(Debug)] +pub struct ApplyResult { + /// Number of mutations successfully applied. + pub applied: usize, +} + +/// Executes a `Changes` batch against JSON file storage and the event log. +pub struct ChangesApplier<'a> { + flow_dir: &'a Path, + actor: Option<&'a str>, + session_id: Option<&'a str>, +} + +impl<'a> ChangesApplier<'a> { + pub fn new(flow_dir: &'a Path) -> Self { + Self { + flow_dir, + actor: None, + session_id: None, + } + } + + /// Set the actor (who is applying the changes) for event logging. + pub fn with_actor(mut self, actor: &'a str) -> Self { + self.actor = Some(actor); + self + } + + /// Set the session ID for event logging. + pub fn with_session(mut self, session_id: &'a str) -> Self { + self.session_id = Some(session_id); + self + } + + /// Apply all mutations in order. Stops on first error. + pub fn apply(&self, changes: &Changes) -> ServiceResult { + let mut applied = 0; + + for mutation in &changes.mutations { + self.apply_one(mutation)?; + self.log_event(mutation); + applied += 1; + } + + Ok(ApplyResult { applied }) + } + + /// Apply a single mutation to the JSON file store. 
+ fn apply_one(&self, mutation: &Mutation) -> ServiceResult<()> { + match mutation { + Mutation::CreateEpic { epic } | Mutation::UpdateEpic { epic } => { + json_store::epic_write(self.flow_dir, epic).map_err(store_err)?; + } + Mutation::RemoveEpic { id } => { + json_store::epic_delete(self.flow_dir, id).map_err(store_err)?; + } + Mutation::CreateTask { task } | Mutation::UpdateTask { task } => { + json_store::task_write_definition(self.flow_dir, task).map_err(store_err)?; + } + Mutation::RemoveTask { id } => { + json_store::task_delete(self.flow_dir, id).map_err(store_err)?; + } + Mutation::SetTaskState { task_id, state } => { + json_store::state_write(self.flow_dir, task_id, state).map_err(store_err)?; + } + Mutation::RemoveTaskState { task_id } => { + let path = self.flow_dir.join(".state").join("tasks").join(format!("{task_id}.state.json")); + if path.exists() { + std::fs::remove_file(&path)?; + } + } + Mutation::SetEpicSpec { epic_id, content } => { + json_store::epic_spec_write(self.flow_dir, epic_id, content).map_err(store_err)?; + } + Mutation::RemoveEpicSpec { epic_id } => { + let path = self.flow_dir.join("specs").join(format!("{epic_id}.md")); + if path.exists() { + std::fs::remove_file(&path)?; + } + } + Mutation::SetTaskSpec { task_id, content } => { + json_store::task_spec_write(self.flow_dir, task_id, content).map_err(store_err)?; + } + Mutation::RemoveTaskSpec { task_id } => { + let path = self.flow_dir.join("tasks").join(format!("{task_id}.md")); + if path.exists() { + std::fs::remove_file(&path)?; + } + } + } + Ok(()) + } + + /// Log a mutation to the JSONL event log. Best-effort: failures are ignored. 
+ fn log_event(&self, mutation: &Mutation) { + let event_type = mutation.event_type(); + let entity_id = mutation.entity_id(); + + let epic_id = mutation + .epic_id() + .unwrap_or(entity_id); + let task_id = match mutation { + Mutation::CreateTask { task } | Mutation::UpdateTask { task } => Some(task.id.as_str()), + Mutation::RemoveTask { id } => Some(id.as_str()), + Mutation::SetTaskState { task_id, .. } | Mutation::RemoveTaskState { task_id } => { + Some(task_id.as_str()) + } + Mutation::SetTaskSpec { task_id, .. } | Mutation::RemoveTaskSpec { task_id } => { + Some(task_id.as_str()) + } + _ => None, + }; + + let event = serde_json::json!({ + "stream_id": format!("mutation:{epic_id}"), + "type": event_type, + "entity_id": entity_id, + "epic_id": epic_id, + "task_id": task_id, + "actor": self.actor.unwrap_or("system"), + "session_id": self.session_id.unwrap_or(""), + "timestamp": chrono::Utc::now().to_rfc3339(), + }); + + let _ = json_store::events_append(self.flow_dir, &event.to_string()); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/flowctl/crates/flowctl-core/src/error.rs b/flowctl/crates/flowctl-core/src/error.rs index 09e45b62..682afccb 100644 --- a/flowctl/crates/flowctl-core/src/error.rs +++ b/flowctl/crates/flowctl-core/src/error.rs @@ -54,3 +54,48 @@ pub enum CoreError { #[error("duplicate task ID: {0}")] DuplicateTask(String), } + +// ── ServiceError ──────────────────────────────────────────────────── + +/// Top-level error type for service/lifecycle operations. +#[derive(Debug, Error)] +pub enum ServiceError { + /// Task not found. + #[error("task not found: {0}")] + TaskNotFound(String), + + /// Epic not found. + #[error("epic not found: {0}")] + EpicNotFound(String), + + /// Invalid state transition (e.g., done -> in_progress without restart). + #[error("invalid transition: {0}")] + InvalidTransition(String), + + /// A dependency is not satisfied (blocking task not done/skipped). 
+ #[error("dependency unsatisfied: task {task} blocked by {dependency}")] + DependencyUnsatisfied { task: String, dependency: String }, + + /// Cross-actor violation (e.g., modifying another agent's locked task). + #[error("cross-actor violation: {0}")] + CrossActorViolation(String), + + /// Underlying store error. + #[error("store error: {0}")] + StoreError(#[from] crate::json_store::StoreError), + + /// I/O error (file reads, state directory operations). + #[error("io error: {0}")] + IoError(#[from] std::io::Error), + + /// Validation error (bad input, missing fields, constraint checks). + #[error("validation error: {0}")] + ValidationError(String), + + /// Core-layer error (ID parsing, DAG operations). + #[error("core error: {0}")] + CoreError(#[from] CoreError), +} + +/// Convenience alias used throughout the service layer. +pub type ServiceResult = Result; diff --git a/flowctl/crates/flowctl-core/src/lib.rs b/flowctl/crates/flowctl-core/src/lib.rs index b631e070..6e3c35a7 100644 --- a/flowctl/crates/flowctl-core/src/lib.rs +++ b/flowctl/crates/flowctl-core/src/lib.rs @@ -15,6 +15,7 @@ pub mod error; pub mod frontmatter; pub mod id; pub mod json_store; +pub mod lifecycle; pub mod outputs; pub mod pipeline; pub mod review_protocol; @@ -22,11 +23,12 @@ pub mod state_machine; pub mod types; // Re-export commonly used items at crate root. 
-pub use changes::{Changes, Mutation}; +pub use changes::{Changes, ChangesApplier, ApplyResult, Mutation}; pub use dag::TaskDag; -pub use error::CoreError; +pub use error::{CoreError, ServiceError, ServiceResult}; pub use id::{parse_id, slugify, EpicId, ParsedId, TaskId}; -pub use outputs::OutputEntry; +pub use outputs::{OutputEntry, OutputsStore}; pub use pipeline::PipelinePhase; pub use state_machine::{Status, Transition, TransitionError}; pub use types::{Epic, Evidence, Phase, Task, TaskSize}; +pub use approvals::FileApprovalStore; diff --git a/flowctl/crates/flowctl-service/src/lifecycle.rs b/flowctl/crates/flowctl-core/src/lifecycle.rs similarity index 94% rename from flowctl/crates/flowctl-service/src/lifecycle.rs rename to flowctl/crates/flowctl-core/src/lifecycle.rs index ee3ccde2..bb73720b 100644 --- a/flowctl/crates/flowctl-service/src/lifecycle.rs +++ b/flowctl/crates/flowctl-core/src/lifecycle.rs @@ -9,14 +9,12 @@ use std::path::Path; use chrono::Utc; -use flowctl_core::id::{epic_id_from_task, is_task_id}; -use flowctl_core::state_machine::{Status, Transition}; -use flowctl_core::types::{ +use crate::id::{epic_id_from_task, is_task_id}; +use crate::state_machine::{Status, Transition}; +use crate::types::{ Epic, EpicStatus, Evidence, RuntimeState, Task, REVIEWS_DIR, }; -use flowctl_db::FlowStore; - use crate::error::{ServiceError, ServiceResult}; // ── Request / Response types ─────────────────────────────────────── @@ -112,15 +110,15 @@ fn validate_task_id(id: &str) -> ServiceResult<()> { /// Load a task from JSON files. 
fn load_task(flow_dir: &Path, task_id: &str) -> Option { - flowctl_core::json_store::task_read(flow_dir, task_id).ok() + crate::json_store::task_read(flow_dir, task_id).ok() } fn load_epic(flow_dir: &Path, epic_id: &str) -> Option { - flowctl_core::json_store::epic_read(flow_dir, epic_id).ok() + crate::json_store::epic_read(flow_dir, epic_id).ok() } fn get_runtime(flow_dir: &Path, task_id: &str) -> Option { - let state = flowctl_core::json_store::state_read(flow_dir, task_id).ok()?; + let state = crate::json_store::state_read(flow_dir, task_id).ok()?; Some(RuntimeState { task_id: task_id.to_string(), assignee: state.assignee, @@ -141,7 +139,7 @@ fn load_tasks_for_epic( ) -> std::collections::HashMap { use std::collections::HashMap; - if let Ok(tasks) = flowctl_core::json_store::task_list_by_epic(flow_dir, epic_id) { + if let Ok(tasks) = crate::json_store::task_list_by_epic(flow_dir, epic_id) { let mut map = HashMap::new(); for task in tasks { map.insert(task.id.clone(), task); @@ -213,7 +211,7 @@ fn propagate_upstream_failure( let tasks = load_tasks_for_epic(flow_dir, &epic_id); let task_list: Vec = tasks.values().cloned().collect(); - let dag = match flowctl_core::TaskDag::from_tasks(&task_list) { + let dag = match crate::TaskDag::from_tasks(&task_list) { Ok(d) => { if d.detect_cycles().is_some() { return Vec::new(); @@ -236,18 +234,18 @@ fn propagate_upstream_failure( continue; } - if let Ok(mut state) = flowctl_core::json_store::state_read(flow_dir, tid) { + if let Ok(mut state) = crate::json_store::state_read(flow_dir, tid) { state.status = Status::UpstreamFailed; state.updated_at = Utc::now(); - if let Err(e) = flowctl_core::json_store::state_write(flow_dir, tid, &state) { + if let Err(e) = crate::json_store::state_write(flow_dir, tid, &state) { eprintln!("warning: failed to write upstream_failed state for {tid}: {e}"); } } else { - let state = flowctl_core::json_store::TaskState { + let state = crate::json_store::TaskState { status: Status::UpstreamFailed, 
..Default::default() }; - if let Err(e) = flowctl_core::json_store::state_write(flow_dir, tid, &state) { + if let Err(e) = crate::json_store::state_write(flow_dir, tid, &state) { eprintln!("warning: failed to write upstream_failed state for {tid}: {e}"); } } @@ -271,7 +269,7 @@ fn handle_task_failure( if max_retries > 0 && current_retry_count < max_retries { let new_retry_count = current_retry_count + 1; - let task_state = flowctl_core::json_store::TaskState { + let task_state = crate::json_store::TaskState { status: Status::UpForRetry, assignee: runtime.as_ref().and_then(|r| r.assignee.clone()), claimed_at: None, @@ -284,18 +282,18 @@ fn handle_task_failure( retry_count: new_retry_count, updated_at: Utc::now(), }; - flowctl_core::json_store::state_write(flow_dir, task_id, &task_state) + crate::json_store::state_write(flow_dir, task_id, &task_state) .map_err(|e| std::io::Error::other(format!("failed to write retry state for {task_id}: {e}")))?; log_audit_event(flow_dir, task_id, "task_failed"); Ok((Status::UpForRetry, Vec::new())) } else { - let task_state = flowctl_core::json_store::TaskState { + let task_state = crate::json_store::TaskState { status: Status::Failed, ..Default::default() }; - flowctl_core::json_store::state_write(flow_dir, task_id, &task_state) + crate::json_store::state_write(flow_dir, task_id, &task_state) .map_err(|e| std::io::Error::other(format!("failed to write failed state for {task_id}: {e}")))?; log_audit_event(flow_dir, task_id, "task_failed"); @@ -423,7 +421,6 @@ fn log_audit_event( event_type: &str, ) { let epic_id = epic_id_from_task(task_id).unwrap_or_default(); - let store = FlowStore::new(flow_dir.to_path_buf()); let event = serde_json::json!({ "stream_id": format!("task:{task_id}"), "type": event_type, @@ -431,7 +428,7 @@ fn log_audit_event( "task_id": task_id, "timestamp": chrono::Utc::now().to_rfc3339(), }); - let _ = store.events().append(&event.to_string()); + let _ = crate::json_store::events_append(flow_dir, 
&event.to_string()); } /// Emit a task event to the event store. Failures are silently ignored. @@ -441,7 +438,6 @@ fn emit_task_event( event_type: &str, source_cmd: &str, ) { - let store = FlowStore::new(flow_dir.to_path_buf()); let stream_id = format!("task:{task_id}"); let event = serde_json::json!({ "stream_id": stream_id, @@ -450,7 +446,7 @@ fn emit_task_event( "actor": "lifecycle", "timestamp": chrono::Utc::now().to_rfc3339(), }); - let _ = store.events().append(&event.to_string()); + let _ = crate::json_store::events_append(flow_dir, &event.to_string()); } // ── Service functions ────────────────────────────────────────────── @@ -548,7 +544,7 @@ pub fn start_task( Some(now) }; - let task_state = flowctl_core::json_store::TaskState { + let task_state = crate::json_store::TaskState { status: Status::InProgress, assignee: Some(new_assignee), claimed_at, @@ -567,7 +563,7 @@ pub fn start_task( updated_at: Utc::now(), }; - flowctl_core::json_store::state_write(flow_dir, &req.task_id, &task_state) + crate::json_store::state_write(flow_dir, &req.task_id, &task_state) .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; log_audit_event(flow_dir, &req.task_id, "task_started"); @@ -636,7 +632,7 @@ pub fn done_task( prs: prs.clone(), ..Evidence::default() }; - let task_state = flowctl_core::json_store::TaskState { + let task_state = crate::json_store::TaskState { status: Status::Done, assignee: runtime.as_ref().and_then(|r| r.assignee.clone()), claimed_at: runtime.as_ref().and_then(|r| r.claimed_at), @@ -649,7 +645,7 @@ pub fn done_task( retry_count: runtime.as_ref().map(|r| r.retry_count).unwrap_or(0), updated_at: now, }; - flowctl_core::json_store::state_write(flow_dir, &req.task_id, &task_state) + crate::json_store::state_write(flow_dir, &req.task_id, &task_state) .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; // 7. 
Archive review receipt @@ -723,8 +719,8 @@ pub fn block_task( // Write to JSON state file { - let existing = flowctl_core::json_store::state_read(flow_dir, &req.task_id).ok(); - let task_state = flowctl_core::json_store::TaskState { + let existing = crate::json_store::state_read(flow_dir, &req.task_id).ok(); + let task_state = crate::json_store::TaskState { status: Status::Blocked, assignee: existing.as_ref().and_then(|r| r.assignee.clone()), claimed_at: existing.as_ref().and_then(|r| r.claimed_at), @@ -737,7 +733,7 @@ pub fn block_task( retry_count: existing.as_ref().map(|r| r.retry_count).unwrap_or(0), updated_at: Utc::now(), }; - flowctl_core::json_store::state_write(flow_dir, &req.task_id, &task_state) + crate::json_store::state_write(flow_dir, &req.task_id, &task_state) .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; } @@ -880,8 +876,8 @@ pub fn restart_task( // Execute reset let mut reset_ids = Vec::new(); for tid in &to_reset { - let blank = flowctl_core::json_store::TaskState::default(); - flowctl_core::json_store::state_write(flow_dir, tid, &blank) + let blank = crate::json_store::TaskState::default(); + crate::json_store::state_write(flow_dir, tid, &blank) .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; reset_ids.push(tid.clone()); @@ -904,8 +900,8 @@ pub fn restart_task( #[cfg(test)] mod tests { use super::*; - use flowctl_core::state_machine::Status; - use flowctl_core::types::{Domain, RuntimeState, Task}; + use crate::state_machine::Status; + use crate::types::{Domain, RuntimeState, Task}; fn make_task(id: &str, status: Status) -> Task { Task { diff --git a/flowctl/crates/flowctl-core/src/outputs.rs b/flowctl/crates/flowctl-core/src/outputs.rs index 48f4308d..49e86428 100644 --- a/flowctl/crates/flowctl-core/src/outputs.rs +++ b/flowctl/crates/flowctl-core/src/outputs.rs @@ -5,7 +5,7 @@ //! verified memory system — outputs is a lightweight, file-native narrative //! 
layer gated on its own `outputs.enabled` config key.
 
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 
 use serde::{Deserialize, Serialize};
 
@@ -22,3 +22,180 @@ pub struct OutputEntry {
     /// File mtime as seconds since UNIX epoch.
     pub mtime: u64,
 }
+
+// ── OutputsStore ────────────────────────────────────────────────────
+
+use std::fs;
+use std::time::UNIX_EPOCH;
+
+use crate::id::epic_id_from_task;
+use crate::error::ServiceResult;
+
+/// File-system store for `.flow/outputs/*.md` entries.
+///
+/// Rooted at `<flow_dir>/outputs/`. Callers construct with a `.flow/` path.
+pub struct OutputsStore {
+    root: PathBuf,
+}
+
+impl OutputsStore {
+    /// Build a store rooted under `<flow_dir>/outputs/`. Creates the dir if missing.
+    pub fn new(flow_dir: &Path) -> ServiceResult<Self> {
+        let root = flow_dir.join("outputs");
+        fs::create_dir_all(&root)?;
+        Ok(Self { root })
+    }
+
+    /// Return the path `<root>/<task_id>.md`.
+    pub fn path_for(&self, task_id: &str) -> PathBuf {
+        self.root.join(format!("{task_id}.md"))
+    }
+
+    /// Write `content` to `<root>/<task_id>.md`, overwriting any existing file.
+    ///
+    /// Returns the path that was written.
+    pub fn write(&self, task_id: &str, content: &str) -> ServiceResult<PathBuf> {
+        let path = self.path_for(task_id);
+        fs::write(&path, content)?;
+        Ok(path)
+    }
+
+    /// Read the full content of `<root>/<task_id>.md`.
+    pub fn read(&self, task_id: &str) -> ServiceResult<String> {
+        let path = self.path_for(task_id);
+        let content = fs::read_to_string(&path)?;
+        Ok(content)
+    }
+
+    /// List outputs for an epic, newest-first, optionally capped at `limit`.
+    ///
+    /// Matches any `<task_id>.md` whose epic-id prefix equals `epic_id`.
+    /// Invalid task IDs are silently skipped. Files with unreadable mtime
+    /// fall back to mtime=0.
+    pub fn list_for_epic(&self, epic_id: &str, limit: Option<usize>) -> ServiceResult<Vec<OutputEntry>> {
+        let mut entries: Vec<OutputEntry> = Vec::new();
+        let read_dir = match fs::read_dir(&self.root) {
+            Ok(rd) => rd,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(entries),
+            Err(e) => return Err(e.into()),
+        };
+
+        for entry in read_dir.flatten() {
+            let path = entry.path();
+            if path.extension().and_then(|e| e.to_str()) != Some("md") {
+                continue;
+            }
+            let Some(task_id) = path.file_stem().and_then(|s| s.to_str()) else {
+                continue;
+            };
+            // Derive epic and skip if mismatch.
+            let Ok(ep) = epic_id_from_task(task_id) else {
+                continue;
+            };
+            if ep != epic_id {
+                continue;
+            }
+            let mtime = entry
+                .metadata()
+                .ok()
+                .and_then(|m| m.modified().ok())
+                .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
+                .map(|d| d.as_secs())
+                .unwrap_or(0);
+            entries.push(OutputEntry {
+                task_id: task_id.to_string(),
+                path: path.clone(),
+                mtime,
+            });
+        }
+
+        // Newest-first.
+        entries.sort_by(|a, b| b.mtime.cmp(&a.mtime));
+        if let Some(n) = limit {
+            entries.truncate(n);
+        }
+        Ok(entries)
+    }
+}
+
+// ── Tests ──────────────────────────────────────────────────────────
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::thread::sleep;
+    use std::time::Duration;
+
+    fn tmp_flow() -> tempfile::TempDir {
+        tempfile::tempdir().unwrap()
+    }
+
+    #[test]
+    fn write_read_roundtrip() {
+        let tmp = tmp_flow();
+        let store = OutputsStore::new(tmp.path()).unwrap();
+        let content = "## Summary\n\nTest summary.\n\n## Surprises\n\nNone.\n";
+        let path = store.write("fn-1.1", content).unwrap();
+        assert!(path.exists());
+        let got = store.read("fn-1.1").unwrap();
+        assert_eq!(got, content);
+    }
+
+    #[test]
+    fn list_for_epic_filters_by_prefix() {
+        let tmp = tmp_flow();
+        let store = OutputsStore::new(tmp.path()).unwrap();
+        store.write("fn-1.1", "a").unwrap();
+        store.write("fn-1.2", "b").unwrap();
+        store.write("fn-2-other.1", "c").unwrap();
+
+        let listed = store.list_for_epic("fn-1", None).unwrap();
+ assert_eq!(listed.len(), 2); + for e in &listed { + assert!(e.task_id.starts_with("fn-1.")); + } + + let other = store.list_for_epic("fn-2-other", None).unwrap(); + assert_eq!(other.len(), 1); + assert_eq!(other[0].task_id, "fn-2-other.1"); + } + + #[test] + fn list_newest_first_and_limit() { + let tmp = tmp_flow(); + let store = OutputsStore::new(tmp.path()).unwrap(); + store.write("fn-1.1", "first").unwrap(); + sleep(Duration::from_millis(1100)); + store.write("fn-1.2", "second").unwrap(); + sleep(Duration::from_millis(1100)); + store.write("fn-1.3", "third").unwrap(); + + let listed = store.list_for_epic("fn-1", Some(2)).unwrap(); + assert_eq!(listed.len(), 2); + assert_eq!(listed[0].task_id, "fn-1.3"); + assert_eq!(listed[1].task_id, "fn-1.2"); + } + + #[test] + fn list_empty_dir_returns_empty() { + let tmp = tmp_flow(); + let store = OutputsStore::new(tmp.path()).unwrap(); + let listed = store.list_for_epic("fn-1", Some(3)).unwrap(); + assert!(listed.is_empty()); + } + + #[test] + fn skips_non_md_and_invalid_ids() { + let tmp = tmp_flow(); + let store = OutputsStore::new(tmp.path()).unwrap(); + store.write("fn-1.1", "ok").unwrap(); + // Drop a non-md file. + std::fs::write(tmp.path().join("outputs").join("junk.txt"), "x").unwrap(); + // Drop an invalid task-id md. 
+ std::fs::write(tmp.path().join("outputs").join("not-a-task.md"), "x").unwrap(); + + let listed = store.list_for_epic("fn-1", None).unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].task_id, "fn-1.1"); + } +} diff --git a/flowctl/crates/flowctl-db/Cargo.toml b/flowctl/crates/flowctl-db/Cargo.toml deleted file mode 100644 index b2e6f8ad..00000000 --- a/flowctl/crates/flowctl-db/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "flowctl-db" -version = "0.1.0" -description = "Sync file-based storage layer for flowctl" -edition.workspace = true -rust-version.workspace = true -license.workspace = true - -[dependencies] -flowctl-core = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -chrono = { workspace = true } - -[lints] -workspace = true - -[dev-dependencies] -tempfile = "3" diff --git a/flowctl/crates/flowctl-db/src/approvals.rs b/flowctl/crates/flowctl-db/src/approvals.rs deleted file mode 100644 index 8d4741fe..00000000 --- a/flowctl/crates/flowctl-db/src/approvals.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! Approval store — delegates to `json_store::approvals_*`. - -use std::path::Path; - -use crate::error::DbError; - -/// Sync approval store backed by `.state/approvals.json`. -pub struct ApprovalStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> ApprovalStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Read all approval records. - pub fn read(&self) -> Result, DbError> { - let approvals = flowctl_core::json_store::approvals_read(self.flow_dir)?; - Ok(approvals) - } - - /// Write approval records (atomic replacement). 
- pub fn write(&self, approvals: &[serde_json::Value]) -> Result<(), DbError> { - flowctl_core::json_store::approvals_write(self.flow_dir, approvals)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn approvals_round_trip() { - let tmp = TempDir::new().unwrap(); - let store = ApprovalStore::new(tmp.path()); - - assert!(store.read().unwrap().is_empty()); - - let approvals = vec![ - serde_json::json!({"reviewer": "alice", "status": "approved"}), - serde_json::json!({"reviewer": "bob", "status": "needs_work"}), - ]; - store.write(&approvals).unwrap(); - - let read_back = store.read().unwrap(); - assert_eq!(read_back.len(), 2); - assert_eq!(read_back[0]["reviewer"], "alice"); - } -} diff --git a/flowctl/crates/flowctl-db/src/error.rs b/flowctl/crates/flowctl-db/src/error.rs deleted file mode 100644 index b47b8c64..00000000 --- a/flowctl/crates/flowctl-db/src/error.rs +++ /dev/null @@ -1,51 +0,0 @@ -//! Error types for the file-based storage layer. - -use std::fmt; - -/// Unified error type for flowctl-db operations. -#[derive(Debug)] -pub enum DbError { - /// Wraps a `json_store::StoreError`. - Store(flowctl_core::json_store::StoreError), - - /// Serialization / deserialization error. - Serialize(serde_json::Error), - - /// Entity not found. - NotFound(String), - - /// Constraint violation (e.g. file lock conflict). - Constraint(String), - - /// Invalid input. 
- InvalidInput(String), -} - -impl fmt::Display for DbError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Store(e) => write!(f, "store error: {e}"), - Self::Serialize(e) => write!(f, "serialization error: {e}"), - Self::NotFound(msg) => write!(f, "not found: {msg}"), - Self::Constraint(msg) => write!(f, "constraint violation: {msg}"), - Self::InvalidInput(msg) => write!(f, "invalid input: {msg}"), - } - } -} - -impl std::error::Error for DbError {} - -impl From for DbError { - fn from(e: flowctl_core::json_store::StoreError) -> Self { - match e { - flowctl_core::json_store::StoreError::NotFound(msg) => Self::NotFound(msg), - other => Self::Store(other), - } - } -} - -impl From for DbError { - fn from(e: serde_json::Error) -> Self { - Self::Serialize(e) - } -} diff --git a/flowctl/crates/flowctl-db/src/events.rs b/flowctl/crates/flowctl-db/src/events.rs deleted file mode 100644 index 428e7c9b..00000000 --- a/flowctl/crates/flowctl-db/src/events.rs +++ /dev/null @@ -1,63 +0,0 @@ -//! Event store — delegates to `json_store::events_*`. - -use std::path::Path; - -use crate::error::DbError; - -/// Sync event store backed by `.state/events.jsonl`. -pub struct EventStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> EventStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Append a JSON event line to the event log. - pub fn append(&self, event_json: &str) -> Result<(), DbError> { - flowctl_core::json_store::events_append(self.flow_dir, event_json)?; - Ok(()) - } - - /// Read all event lines from the log. - pub fn read_all(&self) -> Result, DbError> { - let lines = flowctl_core::json_store::events_read_all(self.flow_dir)?; - Ok(lines) - } - - /// Read events filtered by stream_id. 
- pub fn read_by_stream(&self, stream_id: &str) -> Result, DbError> { - let lines = flowctl_core::json_store::events_read_by_stream(self.flow_dir, stream_id)?; - Ok(lines) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn append_and_read() { - let tmp = TempDir::new().unwrap(); - let store = EventStore::new(tmp.path()); - - store.append(r#"{"stream_id":"s1","type":"created"}"#).unwrap(); - store.append(r#"{"stream_id":"s2","type":"updated"}"#).unwrap(); - store.append(r#"{"stream_id":"s1","type":"done"}"#).unwrap(); - - let all = store.read_all().unwrap(); - assert_eq!(all.len(), 3); - - let s1 = store.read_by_stream("s1").unwrap(); - assert_eq!(s1.len(), 2); - } - - #[test] - fn empty_returns_empty() { - let tmp = TempDir::new().unwrap(); - let store = EventStore::new(tmp.path()); - assert!(store.read_all().unwrap().is_empty()); - } -} diff --git a/flowctl/crates/flowctl-db/src/gaps.rs b/flowctl/crates/flowctl-db/src/gaps.rs deleted file mode 100644 index bf87d1f3..00000000 --- a/flowctl/crates/flowctl-db/src/gaps.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! Gap store — delegates to `json_store::gaps_*`. - -use std::path::Path; - -use crate::error::DbError; - -// Re-export the GapEntry type from json_store. -pub use flowctl_core::json_store::GapEntry; - -/// Sync gap store backed by `gaps/.json`. -pub struct GapStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> GapStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Read gaps for an epic. - pub fn read(&self, epic_id: &str) -> Result, DbError> { - let gaps = flowctl_core::json_store::gaps_read(self.flow_dir, epic_id)?; - Ok(gaps) - } - - /// Write gaps for an epic (atomic replacement). 
- pub fn write(&self, epic_id: &str, gaps: &[GapEntry]) -> Result<(), DbError> { - flowctl_core::json_store::gaps_write(self.flow_dir, epic_id, gaps)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn gaps_round_trip() { - let tmp = TempDir::new().unwrap(); - let store = GapStore::new(tmp.path()); - - assert!(store.read("fn-1").unwrap().is_empty()); - - let gaps = vec![ - GapEntry { - id: 1, - capability: "auth".into(), - priority: "required".into(), - source: "test".into(), - resolved: false, - }, - GapEntry { - id: 2, - capability: "logging".into(), - priority: "nice-to-have".into(), - source: "test".into(), - resolved: true, - }, - ]; - store.write("fn-1", &gaps).unwrap(); - - let read_back = store.read("fn-1").unwrap(); - assert_eq!(read_back.len(), 2); - assert_eq!(read_back[0].capability, "auth"); - assert!(read_back[1].resolved); - } -} diff --git a/flowctl/crates/flowctl-db/src/lib.rs b/flowctl/crates/flowctl-db/src/lib.rs deleted file mode 100644 index a5d38164..00000000 --- a/flowctl/crates/flowctl-db/src/lib.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! flowctl-db: Sync file-based storage layer for flowctl. -//! -//! All I/O is synchronous, delegating to `flowctl_core::json_store`. -//! No async runtime required — pure synchronous file I/O. -//! -//! # Architecture -//! -//! - `FlowStore` is the main entry point, wrapping a `.flow/` directory path. -//! - Sub-stores (`EventStore`, `PipelineStore`, etc.) are accessed via methods. -//! - All data lives as JSON files in the `.flow/` directory tree. - -pub mod approvals; -pub mod error; -pub mod events; -pub mod gaps; -pub mod locks; -pub mod memory; -pub mod phases; -pub mod pipeline; -pub mod store; - -pub use error::DbError; -pub use store::FlowStore; - -// Re-export sub-store types for convenience. 
-pub use approvals::ApprovalStore; -pub use events::EventStore; -pub use gaps::{GapEntry, GapStore}; -pub use locks::{LockEntry, LockStore}; -pub use memory::MemoryStore; -pub use phases::PhaseStore; -pub use pipeline::PipelineStore; - -// Re-export json_store types that callers may need. -pub use flowctl_core::json_store::TaskState; diff --git a/flowctl/crates/flowctl-db/src/locks.rs b/flowctl/crates/flowctl-db/src/locks.rs deleted file mode 100644 index a827f2ad..00000000 --- a/flowctl/crates/flowctl-db/src/locks.rs +++ /dev/null @@ -1,122 +0,0 @@ -//! File lock store — delegates to `json_store::lock_*` / `locks_*`. - -use std::path::Path; - -use crate::error::DbError; - -// Re-export the LockEntry type from json_store. -pub use flowctl_core::json_store::LockEntry; - -/// Sync lock store backed by `.state/locks.json`. -pub struct LockStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> LockStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Acquire a lock on a file for a task. - /// - /// If another task already holds a lock on the file, returns - /// `DbError::Constraint`. - pub fn acquire(&self, file_path: &str, task_id: &str, mode: &str) -> Result<(), DbError> { - // Check for conflict: another task holding the file. - let locks = flowctl_core::json_store::locks_read(self.flow_dir)?; - for lock in &locks { - if lock.file_path == file_path && lock.task_id != task_id { - return Err(DbError::Constraint(format!( - "file '{}' already locked by task '{}'", - file_path, lock.task_id - ))); - } - } - flowctl_core::json_store::lock_acquire(self.flow_dir, file_path, task_id, mode)?; - Ok(()) - } - - /// Check which task holds a lock on a file. - pub fn check(&self, file_path: &str) -> Result, DbError> { - let locks = flowctl_core::json_store::locks_read(self.flow_dir)?; - for lock in &locks { - if lock.file_path == file_path { - return Ok(Some(lock.task_id.clone())); - } - } - Ok(None) - } - - /// Release all locks held by a task. 
Returns number released. - pub fn release_for_task(&self, task_id: &str) -> Result { - let n = flowctl_core::json_store::lock_release_task(self.flow_dir, task_id)?; - Ok(n) - } - - /// Release all locks. Returns number released. - pub fn release_all(&self) -> Result { - let n = flowctl_core::json_store::locks_clear(self.flow_dir)?; - Ok(n) - } - - /// List all current locks. - pub fn list(&self) -> Result, DbError> { - let locks = flowctl_core::json_store::locks_read(self.flow_dir)?; - Ok(locks) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn acquire_and_check() { - let tmp = TempDir::new().unwrap(); - let store = LockStore::new(tmp.path()); - - store.acquire("src/a.rs", "t1", "write").unwrap(); - assert_eq!(store.check("src/a.rs").unwrap().as_deref(), Some("t1")); - assert!(store.check("src/missing.rs").unwrap().is_none()); - } - - #[test] - fn acquire_conflict() { - let tmp = TempDir::new().unwrap(); - let store = LockStore::new(tmp.path()); - - store.acquire("src/a.rs", "t1", "write").unwrap(); - let err = store.acquire("src/a.rs", "t2", "write").unwrap_err(); - assert!(matches!(err, DbError::Constraint(_))); - } - - #[test] - fn acquire_idempotent() { - let tmp = TempDir::new().unwrap(); - let store = LockStore::new(tmp.path()); - - store.acquire("src/a.rs", "t1", "write").unwrap(); - store.acquire("src/a.rs", "t1", "write").unwrap(); - assert_eq!(store.check("src/a.rs").unwrap().as_deref(), Some("t1")); - } - - #[test] - fn release_for_task_and_all() { - let tmp = TempDir::new().unwrap(); - let store = LockStore::new(tmp.path()); - - store.acquire("src/a.rs", "t1", "write").unwrap(); - store.acquire("src/b.rs", "t1", "write").unwrap(); - store.acquire("src/c.rs", "t2", "write").unwrap(); - - let n = store.release_for_task("t1").unwrap(); - assert_eq!(n, 2); - assert!(store.check("src/a.rs").unwrap().is_none()); - assert_eq!(store.check("src/c.rs").unwrap().as_deref(), Some("t2")); - - let n2 = 
store.release_all().unwrap(); - assert_eq!(n2, 1); - assert!(store.check("src/c.rs").unwrap().is_none()); - } -} diff --git a/flowctl/crates/flowctl-db/src/memory.rs b/flowctl/crates/flowctl-db/src/memory.rs deleted file mode 100644 index 9355ef97..00000000 --- a/flowctl/crates/flowctl-db/src/memory.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! Memory store — delegates to `json_store::memory_*`. - -use std::path::Path; - -use crate::error::DbError; - -/// Sync memory store backed by `memory/entries.jsonl`. -pub struct MemoryStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> MemoryStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Append a JSON memory entry. - pub fn append(&self, entry_json: &str) -> Result<(), DbError> { - flowctl_core::json_store::memory_append(self.flow_dir, entry_json)?; - Ok(()) - } - - /// Read all memory entries. - pub fn read_all(&self) -> Result, DbError> { - let entries = flowctl_core::json_store::memory_read_all(self.flow_dir)?; - Ok(entries) - } - - /// Search memory entries by case-insensitive substring match. 
- pub fn search(&self, query: &str) -> Result, DbError> { - let results = flowctl_core::json_store::memory_search_text(self.flow_dir, query)?; - Ok(results) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn append_and_read() { - let tmp = TempDir::new().unwrap(); - let store = MemoryStore::new(tmp.path()); - - store.append(r#"{"text":"Rust is great"}"#).unwrap(); - store.append(r#"{"text":"Python is also nice"}"#).unwrap(); - - let all = store.read_all().unwrap(); - assert_eq!(all.len(), 2); - } - - #[test] - fn search_text() { - let tmp = TempDir::new().unwrap(); - let store = MemoryStore::new(tmp.path()); - - store.append(r#"{"text":"Rust is great"}"#).unwrap(); - store.append(r#"{"text":"Python is also nice"}"#).unwrap(); - store.append(r#"{"text":"rust patterns"}"#).unwrap(); - - let found = store.search("rust").unwrap(); - assert_eq!(found.len(), 2); - - let none = store.search("javascript").unwrap(); - assert!(none.is_empty()); - } -} diff --git a/flowctl/crates/flowctl-db/src/phases.rs b/flowctl/crates/flowctl-db/src/phases.rs deleted file mode 100644 index 7efe4a80..00000000 --- a/flowctl/crates/flowctl-db/src/phases.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! Phase progress store — delegates to `json_store::phase_*` / `phases_*`. - -use std::path::Path; - -use crate::error::DbError; - -/// Sync phase progress store backed by `.state/phases.json`. -pub struct PhaseStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> PhaseStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Mark a phase as completed for a task. - pub fn mark_done(&self, task_id: &str, phase: &str) -> Result<(), DbError> { - flowctl_core::json_store::phase_mark_done(self.flow_dir, task_id, phase)?; - Ok(()) - } - - /// Get all completed phases for a task. 
- pub fn get_completed(&self, task_id: &str) -> Result, DbError> { - let phases = flowctl_core::json_store::phases_completed(self.flow_dir, task_id)?; - Ok(phases) - } - - /// Reset all phase progress for a task. Returns the number cleared. - pub fn reset(&self, task_id: &str) -> Result<(), DbError> { - flowctl_core::json_store::phases_reset(self.flow_dir, task_id)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn mark_done_and_get() { - let tmp = TempDir::new().unwrap(); - let store = PhaseStore::new(tmp.path()); - - store.mark_done("t1", "plan").unwrap(); - store.mark_done("t1", "implement").unwrap(); - - let phases = store.get_completed("t1").unwrap(); - assert_eq!(phases, vec!["plan", "implement"]); - - // Idempotent re-mark. - store.mark_done("t1", "plan").unwrap(); - assert_eq!(store.get_completed("t1").unwrap().len(), 2); - } - - #[test] - fn reset_clears_phases() { - let tmp = TempDir::new().unwrap(); - let store = PhaseStore::new(tmp.path()); - - store.mark_done("t1", "1").unwrap(); - store.mark_done("t1", "5").unwrap(); - store.mark_done("t2", "1").unwrap(); - - store.reset("t1").unwrap(); - assert!(store.get_completed("t1").unwrap().is_empty()); - assert_eq!(store.get_completed("t2").unwrap(), vec!["1"]); - } -} diff --git a/flowctl/crates/flowctl-db/src/pipeline.rs b/flowctl/crates/flowctl-db/src/pipeline.rs deleted file mode 100644 index 919a41bd..00000000 --- a/flowctl/crates/flowctl-db/src/pipeline.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! Pipeline progress store — delegates to `json_store::pipeline_*`. - -use std::path::Path; - -use crate::error::DbError; - -/// Sync pipeline store backed by `.state/pipeline.json`. -pub struct PipelineStore<'a> { - flow_dir: &'a Path, -} - -impl<'a> PipelineStore<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { flow_dir } - } - - /// Read the current pipeline phase for an epic. 
- pub fn read(&self, epic_id: &str) -> Result, DbError> { - let phase = flowctl_core::json_store::pipeline_read(self.flow_dir, epic_id)?; - Ok(phase) - } - - /// Set the pipeline phase for an epic. - pub fn write(&self, epic_id: &str, phase: &str) -> Result<(), DbError> { - flowctl_core::json_store::pipeline_write(self.flow_dir, epic_id, phase)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn pipeline_read_write() { - let tmp = TempDir::new().unwrap(); - let store = PipelineStore::new(tmp.path()); - - assert_eq!(store.read("fn-1").unwrap(), None); - - store.write("fn-1", "plan").unwrap(); - assert_eq!(store.read("fn-1").unwrap().as_deref(), Some("plan")); - - store.write("fn-1", "work").unwrap(); - assert_eq!(store.read("fn-1").unwrap().as_deref(), Some("work")); - - store.write("fn-2", "plan").unwrap(); - assert_eq!(store.read("fn-2").unwrap().as_deref(), Some("plan")); - assert_eq!(store.read("fn-1").unwrap().as_deref(), Some("work")); - } -} diff --git a/flowctl/crates/flowctl-db/src/store.rs b/flowctl/crates/flowctl-db/src/store.rs deleted file mode 100644 index 3b683197..00000000 --- a/flowctl/crates/flowctl-db/src/store.rs +++ /dev/null @@ -1,104 +0,0 @@ -//! FlowStore — the main entry point for file-based storage. -//! -//! Wraps a `.flow/` directory path and provides access to sub-stores -//! for epics, tasks, events, pipeline, phases, locks, memory, approvals, and gaps. - -use std::path::{Path, PathBuf}; - -use crate::approvals::ApprovalStore; -use crate::events::EventStore; -use crate::gaps::GapStore; -use crate::locks::LockStore; -use crate::memory::MemoryStore; -use crate::phases::PhaseStore; -use crate::pipeline::PipelineStore; - -/// Top-level store backed by a `.flow/` directory. -pub struct FlowStore { - flow_dir: PathBuf, -} - -impl FlowStore { - /// Create a new store rooted at the given `.flow/` directory. 
- pub fn new(flow_dir: PathBuf) -> Self { - Self { flow_dir } - } - - /// Ensure all required subdirectories exist. - pub fn ensure_dirs(&self) -> Result<(), crate::error::DbError> { - flowctl_core::json_store::ensure_dirs(&self.flow_dir)?; - Ok(()) - } - - /// Return the flow directory path. - pub fn flow_dir(&self) -> &Path { - &self.flow_dir - } - - /// Access the event store. - pub fn events(&self) -> EventStore<'_> { - EventStore::new(&self.flow_dir) - } - - /// Access the pipeline store. - pub fn pipeline(&self) -> PipelineStore<'_> { - PipelineStore::new(&self.flow_dir) - } - - /// Access the phase store. - pub fn phases(&self) -> PhaseStore<'_> { - PhaseStore::new(&self.flow_dir) - } - - /// Access the lock store. - pub fn locks(&self) -> LockStore<'_> { - LockStore::new(&self.flow_dir) - } - - /// Access the memory store. - pub fn memory(&self) -> MemoryStore<'_> { - MemoryStore::new(&self.flow_dir) - } - - /// Access the approval store. - pub fn approvals(&self) -> ApprovalStore<'_> { - ApprovalStore::new(&self.flow_dir) - } - - /// Access the gap store. - pub fn gaps(&self) -> GapStore<'_> { - GapStore::new(&self.flow_dir) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn store_ensure_dirs() { - let tmp = TempDir::new().unwrap(); - let store = FlowStore::new(tmp.path().to_path_buf()); - store.ensure_dirs().unwrap(); - assert!(tmp.path().join("epics").exists()); - assert!(tmp.path().join("tasks").exists()); - assert!(tmp.path().join("specs").exists()); - assert!(tmp.path().join(".state").exists()); - assert!(tmp.path().join("memory").exists()); - } - - #[test] - fn store_accessors_return_sub_stores() { - let tmp = TempDir::new().unwrap(); - let store = FlowStore::new(tmp.path().to_path_buf()); - // Just verify the accessors compile and return the right types. 
- let _ = store.events(); - let _ = store.pipeline(); - let _ = store.phases(); - let _ = store.locks(); - let _ = store.memory(); - let _ = store.approvals(); - let _ = store.gaps(); - } -} diff --git a/flowctl/crates/flowctl-service/Cargo.toml b/flowctl/crates/flowctl-service/Cargo.toml deleted file mode 100644 index b5652d83..00000000 --- a/flowctl/crates/flowctl-service/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "flowctl-service" -version = "0.1.0" -description = "Business logic service layer for flowctl — unifies CLI, daemon, and MCP execution paths" -edition.workspace = true -rust-version.workspace = true -license.workspace = true - -[dependencies] -flowctl-core = { workspace = true } -flowctl-db = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -chrono = { workspace = true } -thiserror = { workspace = true } -tracing = { workspace = true } - -[lints] -workspace = true - -[dev-dependencies] -tempfile = "3" diff --git a/flowctl/crates/flowctl-service/src/approvals.rs b/flowctl/crates/flowctl-service/src/approvals.rs deleted file mode 100644 index 7db239aa..00000000 --- a/flowctl/crates/flowctl-service/src/approvals.rs +++ /dev/null @@ -1,153 +0,0 @@ -//! Approval store: file-based CRUD over `.state/approvals.json`. -//! -//! Wraps `flowctl_core::approvals::Approval` protocol types with persistence. -//! Used by the CLI and MCP to keep approval state consistent. - -use chrono::Utc; - -use flowctl_core::approvals::{ - Approval, ApprovalStatus, CreateApprovalRequest, -}; - -use flowctl_db::FlowStore; - -use crate::error::{ServiceError, ServiceResult}; - -/// File-backed approval store. 
-pub struct FileApprovalStore { - store: FlowStore, -} - -impl FileApprovalStore { - pub fn new(store: FlowStore) -> Self { - Self { store } - } - - fn new_id() -> String { - let now = Utc::now(); - let millis = now.timestamp_millis(); - let nanos = now.timestamp_subsec_nanos(); - format!("apv-{millis:x}-{nanos:x}") - } - - fn load_all(&self) -> ServiceResult> { - let raw = self.store.approvals().read() - .map_err(ServiceError::DbError)?; - let mut out = Vec::new(); - for val in raw { - if let Ok(a) = serde_json::from_value::(val) { - out.push(a); - } - } - Ok(out) - } - - fn save_all(&self, approvals: &[Approval]) -> ServiceResult<()> { - let vals: Vec = approvals - .iter() - .filter_map(|a| serde_json::to_value(a).ok()) - .collect(); - self.store.approvals().write(&vals) - .map_err(ServiceError::DbError)?; - Ok(()) - } - - pub fn create(&self, req: CreateApprovalRequest) -> ServiceResult { - // Validate task exists - if flowctl_core::json_store::task_read(self.store.flow_dir(), &req.task_id).is_err() { - return Err(ServiceError::ValidationError(format!( - "task {} does not exist", - req.task_id - ))); - } - - let id = Self::new_id(); - let now = Utc::now().timestamp(); - - let approval = Approval { - id: id.clone(), - task_id: req.task_id, - kind: req.kind, - payload: req.payload, - status: ApprovalStatus::Pending, - created_at: now, - resolved_at: None, - resolver: None, - reason: None, - }; - - let mut all = self.load_all()?; - all.push(approval.clone()); - self.save_all(&all)?; - - Ok(approval) - } - - pub fn list(&self, status_filter: Option) -> ServiceResult> { - let all = self.load_all()?; - let mut filtered: Vec = if let Some(s) = status_filter { - all.into_iter().filter(|a| a.status == s).collect() - } else { - all - }; - filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at)); - Ok(filtered) - } - - pub fn get(&self, id: &str) -> ServiceResult { - let all = self.load_all()?; - all.into_iter() - .find(|a| a.id == id) - .ok_or_else(|| 
ServiceError::TaskNotFound(format!("approval not found: {id}"))) - } - - pub fn approve(&self, id: &str, resolver: Option) -> ServiceResult { - let mut all = self.load_all()?; - let approval = all.iter_mut() - .find(|a| a.id == id) - .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}")))?; - - if approval.status != ApprovalStatus::Pending { - return Err(ServiceError::InvalidTransition(format!( - "approval {id} is already {:?}", - approval.status - ))); - } - - approval.status = ApprovalStatus::Approved; - approval.resolved_at = Some(Utc::now().timestamp()); - approval.resolver = resolver; - let result = approval.clone(); - - self.save_all(&all)?; - Ok(result) - } - - pub fn reject( - &self, - id: &str, - resolver: Option, - reason: Option, - ) -> ServiceResult { - let mut all = self.load_all()?; - let approval = all.iter_mut() - .find(|a| a.id == id) - .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}")))?; - - if approval.status != ApprovalStatus::Pending { - return Err(ServiceError::InvalidTransition(format!( - "approval {id} is already {:?}", - approval.status - ))); - } - - approval.status = ApprovalStatus::Rejected; - approval.resolved_at = Some(Utc::now().timestamp()); - approval.resolver = resolver; - approval.reason = reason; - let result = approval.clone(); - - self.save_all(&all)?; - Ok(result) - } -} diff --git a/flowctl/crates/flowctl-service/src/changes.rs b/flowctl/crates/flowctl-service/src/changes.rs deleted file mode 100644 index 28320ce9..00000000 --- a/flowctl/crates/flowctl-service/src/changes.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! Applies a `Changes` batch against JSON files and the JSONL event log. -//! -//! `ChangesApplier` is the single execution point for declarative mutations. -//! It iterates each `Mutation` in order, writes to the `.flow/` JSON store, -//! and auto-logs an event to the JSONL log for auditability. 
- -use std::path::Path; - -use flowctl_core::changes::{Changes, Mutation}; -use flowctl_core::json_store; -use flowctl_db::FlowStore; - -use crate::error::{ServiceError, ServiceResult}; - -/// Convert a `json_store::StoreError` into a `ServiceError`. -fn store_err(e: json_store::StoreError) -> ServiceError { - ServiceError::IoError(std::io::Error::other(e.to_string())) -} - -/// Result of applying a `Changes` batch. -#[derive(Debug)] -pub struct ApplyResult { - /// Number of mutations successfully applied. - pub applied: usize, -} - -/// Executes a `Changes` batch against JSON file storage and the event log. -pub struct ChangesApplier<'a> { - flow_dir: &'a Path, - actor: Option<&'a str>, - session_id: Option<&'a str>, -} - -impl<'a> ChangesApplier<'a> { - pub fn new(flow_dir: &'a Path) -> Self { - Self { - flow_dir, - actor: None, - session_id: None, - } - } - - /// Set the actor (who is applying the changes) for event logging. - pub fn with_actor(mut self, actor: &'a str) -> Self { - self.actor = Some(actor); - self - } - - /// Set the session ID for event logging. - pub fn with_session(mut self, session_id: &'a str) -> Self { - self.session_id = Some(session_id); - self - } - - /// Apply all mutations in order. Stops on first error. - pub fn apply(&self, changes: &Changes) -> ServiceResult { - let mut applied = 0; - - for mutation in &changes.mutations { - self.apply_one(mutation)?; - self.log_event(mutation); - applied += 1; - } - - Ok(ApplyResult { applied }) - } - - /// Apply a single mutation to the JSON file store. 
- fn apply_one(&self, mutation: &Mutation) -> ServiceResult<()> { - match mutation { - Mutation::CreateEpic { epic } | Mutation::UpdateEpic { epic } => { - json_store::epic_write(self.flow_dir, epic).map_err(store_err)?; - } - Mutation::RemoveEpic { id } => { - json_store::epic_delete(self.flow_dir, id).map_err(store_err)?; - } - Mutation::CreateTask { task } | Mutation::UpdateTask { task } => { - json_store::task_write_definition(self.flow_dir, task).map_err(store_err)?; - } - Mutation::RemoveTask { id } => { - json_store::task_delete(self.flow_dir, id).map_err(store_err)?; - } - Mutation::SetTaskState { task_id, state } => { - json_store::state_write(self.flow_dir, task_id, state).map_err(store_err)?; - } - Mutation::RemoveTaskState { task_id } => { - let path = self.flow_dir.join(".state").join("tasks").join(format!("{task_id}.state.json")); - if path.exists() { - std::fs::remove_file(&path)?; - } - } - Mutation::SetEpicSpec { epic_id, content } => { - json_store::epic_spec_write(self.flow_dir, epic_id, content).map_err(store_err)?; - } - Mutation::RemoveEpicSpec { epic_id } => { - let path = self.flow_dir.join("specs").join(format!("{epic_id}.md")); - if path.exists() { - std::fs::remove_file(&path)?; - } - } - Mutation::SetTaskSpec { task_id, content } => { - json_store::task_spec_write(self.flow_dir, task_id, content).map_err(store_err)?; - } - Mutation::RemoveTaskSpec { task_id } => { - let path = self.flow_dir.join("tasks").join(format!("{task_id}.md")); - if path.exists() { - std::fs::remove_file(&path)?; - } - } - } - Ok(()) - } - - /// Log a mutation to the JSONL event log. Best-effort: failures are ignored. 
- fn log_event(&self, mutation: &Mutation) { - let store = FlowStore::new(self.flow_dir.to_path_buf()); - let event_type = mutation.event_type(); - let entity_id = mutation.entity_id(); - - let epic_id = mutation - .epic_id() - .unwrap_or(entity_id); - let task_id = match mutation { - Mutation::CreateTask { task } | Mutation::UpdateTask { task } => Some(task.id.as_str()), - Mutation::RemoveTask { id } => Some(id.as_str()), - Mutation::SetTaskState { task_id, .. } | Mutation::RemoveTaskState { task_id } => { - Some(task_id.as_str()) - } - Mutation::SetTaskSpec { task_id, .. } | Mutation::RemoveTaskSpec { task_id } => { - Some(task_id.as_str()) - } - _ => None, - }; - - let event = serde_json::json!({ - "stream_id": format!("mutation:{epic_id}"), - "type": event_type, - "entity_id": entity_id, - "epic_id": epic_id, - "task_id": task_id, - "actor": self.actor.unwrap_or("system"), - "session_id": self.session_id.unwrap_or(""), - "timestamp": chrono::Utc::now().to_rfc3339(), - }); - - let _ = store.events().append(&event.to_string()); - } -} diff --git a/flowctl/crates/flowctl-service/src/error.rs b/flowctl/crates/flowctl-service/src/error.rs deleted file mode 100644 index dce48414..00000000 --- a/flowctl/crates/flowctl-service/src/error.rs +++ /dev/null @@ -1,50 +0,0 @@ -//! Service-layer error types. -//! -//! `ServiceError` is the canonical error type for all business logic -//! operations. It wraps lower-level errors from `flowctl-core` and -//! `flowctl-db` and adds service-specific variants. - -use thiserror::Error; - -/// Top-level error type for service operations. -#[derive(Debug, Error)] -pub enum ServiceError { - /// Task not found in the database. - #[error("task not found: {0}")] - TaskNotFound(String), - - /// Epic not found in the database. - #[error("epic not found: {0}")] - EpicNotFound(String), - - /// Invalid state transition (e.g., done → in_progress without restart). 
- #[error("invalid transition: {0}")] - InvalidTransition(String), - - /// A dependency is not satisfied (blocking task not done/skipped). - #[error("dependency unsatisfied: task {task} blocked by {dependency}")] - DependencyUnsatisfied { task: String, dependency: String }, - - /// Cross-actor violation (e.g., modifying another agent's locked task). - #[error("cross-actor violation: {0}")] - CrossActorViolation(String), - - /// Underlying database error. - #[error("database error: {0}")] - DbError(#[from] flowctl_db::DbError), - - /// I/O error (file reads, state directory operations). - #[error("io error: {0}")] - IoError(#[from] std::io::Error), - - /// Validation error (bad input, missing fields, constraint checks). - #[error("validation error: {0}")] - ValidationError(String), - - /// Core-layer error (ID parsing, DAG operations). - #[error("core error: {0}")] - CoreError(#[from] flowctl_core::CoreError), -} - -/// Convenience alias used throughout the service layer. -pub type ServiceResult = Result; diff --git a/flowctl/crates/flowctl-service/src/lib.rs b/flowctl/crates/flowctl-service/src/lib.rs deleted file mode 100644 index 895d33d0..00000000 --- a/flowctl/crates/flowctl-service/src/lib.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! flowctl-service: Business logic service layer for flowctl. -//! -//! This crate provides the canonical business logic that is shared across -//! CLI and MCP execution paths. It sits between the transport layer -//! (CLI commands, MCP protocol) and the storage layer (flowctl-db). -//! -//! # Architecture -//! -//! ```text -//! CLI commands ─┐ -//! MCP server ───┴─► flowctl-service ──► flowctl-db ──► JSON files -//! │ -//! flowctl-core (types, DAG, state machine) -//! ``` -//! -//! All operations are synchronous, using file-based storage. - -pub mod approvals; -pub mod changes; -pub mod error; -pub mod lifecycle; -pub mod outputs; - -// Re-export key types at crate root. 
-pub use error::{ServiceError, ServiceResult}; diff --git a/flowctl/crates/flowctl-service/src/outputs.rs b/flowctl/crates/flowctl-service/src/outputs.rs deleted file mode 100644 index 80aa5e98..00000000 --- a/flowctl/crates/flowctl-service/src/outputs.rs +++ /dev/null @@ -1,185 +0,0 @@ -//! Outputs store: file-system native, one `.md` file per task. -//! -//! Lives at `.flow/outputs/.md`. Worker writes in Phase 9; the -//! next worker reads the last N during Phase 2 re-anchor. -//! -//! No database table — outputs are narrative handoff artifacts, not verified -//! state. Listing is done by directory scan + epic-prefix filtering. - -use std::fs; -use std::path::{Path, PathBuf}; -use std::time::UNIX_EPOCH; - -use flowctl_core::id::epic_id_from_task; -use flowctl_core::outputs::OutputEntry; - -use crate::error::ServiceResult; - -/// File-system store for `.flow/outputs/*.md` entries. -/// -/// Rooted at `/outputs/`. Callers construct with a `.flow/` path. -pub struct OutputsStore { - root: PathBuf, -} - -impl OutputsStore { - /// Build a store rooted under `/outputs/`. Creates the dir if missing. - pub fn new(flow_dir: &Path) -> ServiceResult { - let root = flow_dir.join("outputs"); - fs::create_dir_all(&root)?; - Ok(Self { root }) - } - - /// Return the path `/.md`. - pub fn path_for(&self, task_id: &str) -> PathBuf { - self.root.join(format!("{task_id}.md")) - } - - /// Write `content` to `/.md`, overwriting any existing file. - /// - /// Returns the absolute path written. - pub fn write(&self, task_id: &str, content: &str) -> ServiceResult { - let path = self.path_for(task_id); - fs::write(&path, content)?; - Ok(path) - } - - /// Read the full content of `/.md`. - pub fn read(&self, task_id: &str) -> ServiceResult { - let path = self.path_for(task_id); - let content = fs::read_to_string(&path)?; - Ok(content) - } - - /// List outputs for an epic, newest-first, optionally capped at `limit`. - /// - /// Matches any `.md` whose epic-id prefix equals `epic_id`. 
- /// Invalid task IDs are silently skipped. Files with unreadable mtime - /// fall back to mtime=0. - pub fn list_for_epic(&self, epic_id: &str, limit: Option) -> ServiceResult> { - let mut entries: Vec = Vec::new(); - let read_dir = match fs::read_dir(&self.root) { - Ok(rd) => rd, - Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(entries), - Err(e) => return Err(e.into()), - }; - - for entry in read_dir.flatten() { - let path = entry.path(); - if path.extension().and_then(|e| e.to_str()) != Some("md") { - continue; - } - let Some(task_id) = path.file_stem().and_then(|s| s.to_str()) else { - continue; - }; - // Derive epic and skip if mismatch. - let Ok(ep) = epic_id_from_task(task_id) else { - continue; - }; - if ep != epic_id { - continue; - } - let mtime = entry - .metadata() - .ok() - .and_then(|m| m.modified().ok()) - .and_then(|t| t.duration_since(UNIX_EPOCH).ok()) - .map(|d| d.as_secs()) - .unwrap_or(0); - entries.push(OutputEntry { - task_id: task_id.to_string(), - path: path.clone(), - mtime, - }); - } - - // Newest-first. 
- entries.sort_by(|a, b| b.mtime.cmp(&a.mtime)); - if let Some(n) = limit { - entries.truncate(n); - } - Ok(entries) - } -} - -// ── Tests ────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use std::thread::sleep; - use std::time::Duration; - - fn tmp_flow() -> tempfile::TempDir { - tempfile::tempdir().unwrap() - } - - #[test] - fn write_read_roundtrip() { - let tmp = tmp_flow(); - let store = OutputsStore::new(tmp.path()).unwrap(); - let content = "## Summary\n\nTest summary.\n\n## Surprises\n\nNone.\n"; - let path = store.write("fn-1.1", content).unwrap(); - assert!(path.exists()); - let got = store.read("fn-1.1").unwrap(); - assert_eq!(got, content); - } - - #[test] - fn list_for_epic_filters_by_prefix() { - let tmp = tmp_flow(); - let store = OutputsStore::new(tmp.path()).unwrap(); - store.write("fn-1.1", "a").unwrap(); - store.write("fn-1.2", "b").unwrap(); - store.write("fn-2-other.1", "c").unwrap(); - - let listed = store.list_for_epic("fn-1", None).unwrap(); - assert_eq!(listed.len(), 2); - for e in &listed { - assert!(e.task_id.starts_with("fn-1.")); - } - - let other = store.list_for_epic("fn-2-other", None).unwrap(); - assert_eq!(other.len(), 1); - assert_eq!(other[0].task_id, "fn-2-other.1"); - } - - #[test] - fn list_newest_first_and_limit() { - let tmp = tmp_flow(); - let store = OutputsStore::new(tmp.path()).unwrap(); - store.write("fn-1.1", "first").unwrap(); - sleep(Duration::from_millis(1100)); - store.write("fn-1.2", "second").unwrap(); - sleep(Duration::from_millis(1100)); - store.write("fn-1.3", "third").unwrap(); - - let listed = store.list_for_epic("fn-1", Some(2)).unwrap(); - assert_eq!(listed.len(), 2); - assert_eq!(listed[0].task_id, "fn-1.3"); - assert_eq!(listed[1].task_id, "fn-1.2"); - } - - #[test] - fn list_empty_dir_returns_empty() { - let tmp = tmp_flow(); - let store = OutputsStore::new(tmp.path()).unwrap(); - let listed = store.list_for_epic("fn-1", Some(3)).unwrap(); - 
assert!(listed.is_empty()); - } - - #[test] - fn skips_non_md_and_invalid_ids() { - let tmp = tmp_flow(); - let store = OutputsStore::new(tmp.path()).unwrap(); - store.write("fn-1.1", "ok").unwrap(); - // Drop a non-md file. - std::fs::write(tmp.path().join("outputs").join("junk.txt"), "x").unwrap(); - // Drop an invalid task-id md. - std::fs::write(tmp.path().join("outputs").join("not-a-task.md"), "x").unwrap(); - - let listed = store.list_for_epic("fn-1", None).unwrap(); - assert_eq!(listed.len(), 1); - assert_eq!(listed[0].task_id, "fn-1.1"); - } -}