Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions flowctl/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions flowctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
resolver = "2"
members = [
"crates/flowctl-core",
"crates/flowctl-db",
"crates/flowctl-service",
"crates/flowctl-cli",
]

Expand Down Expand Up @@ -56,8 +54,6 @@ trycmd = "0.15"

# Internal crate references
flowctl-core = { path = "crates/flowctl-core" }
flowctl-db = { path = "crates/flowctl-db" }
flowctl-service = { path = "crates/flowctl-service" }

# ── Shared lint configuration ────────────────────────────────────────
[workspace.lints.clippy]
Expand Down
4 changes: 0 additions & 4 deletions flowctl/crates/flowctl-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ path = "src/main.rs"

[dependencies]
flowctl-core = { workspace = true }
flowctl-db = { workspace = true }
flowctl-service = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
Expand All @@ -34,5 +32,3 @@ trycmd = { workspace = true }
tempfile = "3"
serde_json = { workspace = true }
flowctl-core = { workspace = true }
flowctl-db = { workspace = true }
flowctl-service = { workspace = true }
3 changes: 1 addition & 2 deletions flowctl/crates/flowctl-cli/src/commands/admin/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ pub fn cmd_init(json: bool) {
}

// Ensure FlowStore dirs are ready
let store = flowctl_db::FlowStore::new(flow_dir.clone());
if let Err(e) = store.ensure_dirs() {
if let Err(e) = flowctl_core::json_store::ensure_dirs(&flow_dir) {
eprintln!("warning: failed to ensure store dirs: {e}");
}

Expand Down
3 changes: 1 addition & 2 deletions flowctl/crates/flowctl-cli/src/commands/admin/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,7 @@ pub fn cmd_doctor(json_mode: bool, workflow: bool) {

// Check 7: stale file locks
{
let store = flowctl_db::FlowStore::new(flow_dir.clone());
match store.locks().list() {
match flowctl_core::json_store::locks_read(&flow_dir) {
Ok(locks) if !locks.is_empty() => {
checks.push(json!({"name": "stale_locks", "status": "warn", "message": format!("{} file lock(s) active — verify with 'flowctl lock-check'", locks.len())}));
}
Expand Down
5 changes: 2 additions & 3 deletions flowctl/crates/flowctl-cli/src/commands/approval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use clap::Subcommand;
use serde_json::Value;

use flowctl_core::approvals::{ApprovalKind, ApprovalStatus, CreateApprovalRequest};
use flowctl_service::approvals::FileApprovalStore;
use flowctl_core::approvals::FileApprovalStore;

use crate::output::{error_exit, json_output};

Expand Down Expand Up @@ -80,8 +80,7 @@ pub fn dispatch(cmd: &ApprovalCmd, json: bool) {

fn open_local_store() -> FileApprovalStore {
let flow_dir = get_flow_dir();
let store = flowctl_db::FlowStore::new(flow_dir);
FileApprovalStore::new(store)
FileApprovalStore::new(flow_dir)
}

// ── Payload parsing ─────────────────────────────────────────────────
Expand Down
30 changes: 14 additions & 16 deletions flowctl/crates/flowctl-cli/src/commands/gap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde_json::json;
use crate::output::{error_exit, json_output, pretty_output};

use flowctl_core::id::is_epic_id;
use flowctl_db::{FlowStore, GapEntry};
use flowctl_core::json_store::{self, GapEntry};

use super::helpers::get_flow_dir;

Expand Down Expand Up @@ -112,8 +112,8 @@ fn validate_epic(_json: bool, epic_id: &str) {
error_exit(&format!("Epic not found: {}", epic_id));
}

fn gap_store() -> FlowStore {
FlowStore::new(get_flow_dir())
fn gap_flow_dir() -> std::path::PathBuf {
get_flow_dir()
}

// ── Commands ───────────────────────────────────────────────────────
Expand All @@ -126,10 +126,9 @@ fn cmd_gap_add(
source: &str,
) {
validate_epic(json_mode, epic_id);
let store = gap_store();
let gap_store = store.gaps();
let flow_dir = gap_flow_dir();

let mut gaps = gap_store.read(epic_id).unwrap_or_default();
let mut gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default();

// Check for existing gap with same capability (idempotent)
let cap_lower = capability.trim().to_lowercase();
Expand Down Expand Up @@ -166,7 +165,7 @@ fn cmd_gap_add(
resolved: false,
});

if let Err(e) = gap_store.write(epic_id, &gaps) {
if let Err(e) = json_store::gaps_write(&flow_dir, epic_id, &gaps) {
error_exit(&format!("Failed to add gap: {e}"));
}

Expand All @@ -190,8 +189,8 @@ fn cmd_gap_add(

fn cmd_gap_list(json_mode: bool, epic_id: &str, status_filter: Option<&str>) {
validate_epic(json_mode, epic_id);
let store = gap_store();
let gaps = store.gaps().read(epic_id).unwrap_or_default();
let flow_dir = gap_flow_dir();
let gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default();

let filtered: Vec<&GapEntry> = gaps.iter().filter(|g| {
match status_filter {
Expand Down Expand Up @@ -244,9 +243,8 @@ fn cmd_gap_resolve(
_evidence: &str,
) {
validate_epic(json_mode, epic_id);
let store = gap_store();
let gap_st = store.gaps();
let mut gaps = gap_st.read(epic_id).unwrap_or_default();
let flow_dir = gap_flow_dir();
let mut gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default();

if let Some(direct_id) = gap_id_direct {
let gap_id: u32 = direct_id
Expand All @@ -259,7 +257,7 @@ fn cmd_gap_resolve(
error_exit(&format!("Gap {} not found", gap_id));
}

gap_st.write(epic_id, &gaps).unwrap_or_else(|e| {
json_store::gaps_write(&flow_dir, epic_id, &gaps).unwrap_or_else(|e| {
error_exit(&format!("Failed to resolve gap: {e}"));
});

Expand All @@ -281,7 +279,7 @@ fn cmd_gap_resolve(
error_exit(&format!("Gap for capability '{}' not found", cap));
}

gap_st.write(epic_id, &gaps).unwrap_or_else(|e| {
json_store::gaps_write(&flow_dir, epic_id, &gaps).unwrap_or_else(|e| {
error_exit(&format!("Failed to resolve gap: {e}"));
});

Expand All @@ -301,8 +299,8 @@ fn cmd_gap_resolve(

fn cmd_gap_check(json_mode: bool, epic_id: &str) {
validate_epic(json_mode, epic_id);
let store = gap_store();
let all_gaps = store.gaps().read(epic_id).unwrap_or_default();
let flow_dir = gap_flow_dir();
let all_gaps = json_store::gaps_read(&flow_dir, epic_id).unwrap_or_default();

let open_blocking: Vec<&GapEntry> = all_gaps
.iter()
Expand Down
2 changes: 1 addition & 1 deletion flowctl/crates/flowctl-cli/src/commands/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<(), std::io::Error> {
/// Returns the number of mutations applied. Calls `error_exit` on failure.
pub fn apply_changes(flow_dir: &Path, changes: &flowctl_core::changes::Changes) -> usize {
use crate::output::error_exit;
use flowctl_service::changes::ChangesApplier;
use flowctl_core::changes::ChangesApplier;

if changes.is_empty() {
return 0;
Expand Down
8 changes: 3 additions & 5 deletions flowctl/crates/flowctl-cli/src/commands/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde_json::json;
use crate::output::{error_exit, json_output, pretty_output};
use super::helpers::get_flow_dir;

use flowctl_db::FlowStore;
use flowctl_core::json_store;

#[derive(Subcommand, Debug)]
pub enum LogCmd {
Expand Down Expand Up @@ -65,7 +65,6 @@ fn cmd_log_decision(
task_id: Option<&str>,
) {
let flow_dir = get_flow_dir();
let store = FlowStore::new(flow_dir);

let epic = epic_id.unwrap_or("_global");

Expand All @@ -80,7 +79,7 @@ fn cmd_log_decision(
"timestamp": chrono::Utc::now().to_rfc3339(),
});

if let Err(e) = store.events().append(&event.to_string()) {
if let Err(e) = json_store::events_append(&flow_dir, &event.to_string()) {
error_exit(&format!("Failed to log decision: {e}"));
}

Expand All @@ -100,9 +99,8 @@ fn cmd_log_decision(

fn cmd_log_decisions(json_mode: bool, epic_id: Option<&str>, limit: usize) {
let flow_dir = get_flow_dir();
let store = FlowStore::new(flow_dir);

let all_lines = store.events().read_all().unwrap_or_else(|e| {
let all_lines = json_store::events_read_all(&flow_dir).unwrap_or_else(|e| {
error_exit(&format!("Failed to query decisions: {e}"));
});

Expand Down
4 changes: 2 additions & 2 deletions flowctl/crates/flowctl-cli/src/commands/outputs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Outputs commands: write, list, show.
//!
//! Thin CLI wrapper over `flowctl_service::outputs::OutputsStore`. Provides
//! Thin CLI wrapper over `flowctl_core::outputs::OutputsStore`. Provides
//! a lightweight narrative handoff layer at `.flow/outputs/<task-id>.md` that
//! workers populate in Phase 9 and read during Phase 2 re-anchor.

Expand All @@ -11,7 +11,7 @@ use clap::Subcommand;
use serde_json::json;

use flowctl_core::id::is_task_id;
use flowctl_service::outputs::OutputsStore;
use flowctl_core::outputs::OutputsStore;

use crate::output::{error_exit, json_output};

Expand Down
75 changes: 32 additions & 43 deletions flowctl/crates/flowctl-cli/src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,21 +599,21 @@ pub fn cmd_lock(json: bool, task: String, files: String, mode: String) {
error_exit("No files specified for locking.");
}

let store = flowctl_db::FlowStore::new(flow_dir);
let lock_store = store.locks();

let mut locked = Vec::new();
let mut already_locked = Vec::new();

for file in &file_list {
match lock_store.acquire(file, &task, &mode) {
Ok(()) => locked.push(file.to_string()),
Err(flowctl_db::DbError::Constraint(msg)) => {
let holder = lock_store.check(file).ok().flatten().unwrap_or_default();
already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder)], "detail": msg}));
}
Err(e) => {
error_exit(&format!("Failed to lock {}: {}", file, e));
// Check for conflict: another task holding the file.
let locks = flowctl_core::json_store::locks_read(&flow_dir).unwrap_or_default();
let conflict = locks.iter().find(|l| l.file_path == *file && l.task_id != task);
if let Some(holder) = conflict {
already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder.task_id)], "detail": format!("file '{}' already locked by task '{}'", file, holder.task_id)}));
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When reporting an already-locked conflict, the code formats the owner as holder.task_id with the requested {mode} rather than the actual mode stored in the existing lock entry. This can misreport the current lock state; use holder.mode (and/or include it in the detail string) instead of the input mode.

Suggested change
already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder.task_id)], "detail": format!("file '{}' already locked by task '{}'", file, holder.task_id)}));
already_locked.push(json!({
"file": file,
"owners": [format!("{}({})", holder.task_id, holder.mode)],
"detail": format!(
"file '{}' already locked by task '{}' (mode: {})",
file, holder.task_id, holder.mode
)
}));

Copilot uses AI. Check for mistakes.
} else {
match flowctl_core::json_store::lock_acquire(&flow_dir, file, &task, &mode) {
Ok(()) => locked.push(file.to_string()),
Comment on lines 605 to +613
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cmd_lock re-reads locks.json from disk for every file in file_list. This adds unnecessary I/O for multi-file lock operations; consider reading once before the loop and updating the in-memory set as locks are acquired (or batching conflict detection).

Copilot uses AI. Check for mistakes.
Err(e) => {
error_exit(&format!("Failed to lock {}: {}", file, e));
}
}
}
}
Expand Down Expand Up @@ -641,11 +641,8 @@ pub fn cmd_lock(json: bool, task: String, files: String, mode: String) {

pub fn cmd_unlock(json: bool, task: Option<String>, _files: Option<String>, all: bool) {
let flow_dir = ensure_flow_exists();
let store = flowctl_db::FlowStore::new(flow_dir);
let lock_store = store.locks();

if all {
match lock_store.release_all() {
match flowctl_core::json_store::locks_clear(&flow_dir) {
Ok(count) => {
if json {
json_output(json!({
Expand All @@ -668,7 +665,7 @@ pub fn cmd_unlock(json: bool, task: Option<String>, _files: Option<String>, all:
}
};

match lock_store.release_for_task(&task_id) {
match flowctl_core::json_store::lock_release_task(&flow_dir, &task_id) {
Ok(count) => {
if json {
json_output(json!({
Expand All @@ -686,39 +683,31 @@ pub fn cmd_unlock(json: bool, task: Option<String>, _files: Option<String>, all:

pub fn cmd_lock_check(json: bool, file: Option<String>) {
let flow_dir = ensure_flow_exists();
let store = flowctl_db::FlowStore::new(flow_dir);
let lock_store = store.locks();

match file {
Some(f) => {
match lock_store.check(&f) {
Ok(Some(task_id)) => {
if json {
json_output(json!({
"file": f,
"locked": true,
"locks": [{"task_id": task_id, "mode": "write"}],
}));
} else {
println!("{}: locked by {}", f, task_id);
}
}
Ok(None) => {
if json {
json_output(json!({
"file": f,
"locked": false,
}));
} else {
println!("{}: not locked", f);
}
let locks = flowctl_core::json_store::locks_read(&flow_dir).unwrap_or_default();
let holder = locks.iter().find(|l| l.file_path == f).map(|l| l.task_id.clone());
if let Some(task_id) = holder {
if json {
json_output(json!({
"file": f,
"locked": true,
"locks": [{"task_id": task_id, "mode": "write"}],
Comment on lines +689 to +695
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cmd_lock_check(Some(file)), JSON output hardcodes the lock mode to "write" even though locks store a mode field. This can produce incorrect output for read locks; emit the actual mode from the matched lock entry.

Suggested change
let holder = locks.iter().find(|l| l.file_path == f).map(|l| l.task_id.clone());
if let Some(task_id) = holder {
if json {
json_output(json!({
"file": f,
"locked": true,
"locks": [{"task_id": task_id, "mode": "write"}],
let holder = locks.iter().find(|l| l.file_path == f);
if let Some(lock) = holder {
let task_id = lock.task_id.clone();
if json {
json_output(json!({
"file": f,
"locked": true,
"locks": [{"task_id": task_id, "mode": lock.mode.clone()}],

Copilot uses AI. Check for mistakes.
}));
} else {
println!("{}: locked by {}", f, task_id);
}
Err(e) => error_exit(&format!("Failed to check lock: {}", e)),
} else if json {
json_output(json!({
"file": f,
"locked": false,
}));
} else {
println!("{}: not locked", f);
}
}
None => {
let entries = lock_store
.list()
let entries = flowctl_core::json_store::locks_read(&flow_dir)
.unwrap_or_else(|e| { error_exit(&format!("Query failed: {}", e)); });
let locks: Vec<serde_json::Value> = entries
.into_iter()
Expand Down
3 changes: 1 addition & 2 deletions flowctl/crates/flowctl-cli/src/commands/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ fn cmd_summary(json_flag: bool) {
}
}

let store = flowctl_db::FlowStore::new(flow_dir);
let total_events = store.events().read_all().map(|v| v.len() as i64).unwrap_or(0);
let total_events = flowctl_core::json_store::events_read_all(&flow_dir).map(|v| v.len() as i64).unwrap_or(0);

if should_json(json_flag) {
json_output(json!({
Expand Down
Loading
Loading