Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bash scripts/ralph_e2e_short_rp_test.sh

All tests create temp directories and clean up after themselves. They must NOT be run from the plugin repo root (safety check enforced).

**Storage runtime**: flowctl is libSQL-only (async, native vector search via `F32_BLOB(384)`). The `flowctl-db` rusqlite crate was deleted in fn-19 — `flowctl-db-lsql` is the sole storage crate. First build downloads the fastembed ONNX model (~130MB) to `.fastembed_cache/` for semantic memory search; subsequent builds/tests reuse the cache.
**Storage runtime**: flowctl is libSQL-only (async, native vector search via `F32_BLOB(384)`). The `flowctl-db` crate was rewritten from rusqlite to libsql in fn-19 and is the sole storage crate. First build downloads the fastembed ONNX model (~130MB) to `.fastembed_cache/` for semantic memory search; subsequent builds/tests reuse the cache.

## Code Quality

Expand Down
8 changes: 4 additions & 4 deletions flowctl/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flowctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
resolver = "2"
members = [
"crates/flowctl-core",
"crates/flowctl-db-lsql",
"crates/flowctl-db",
"crates/flowctl-scheduler",
"crates/flowctl-service",
"crates/flowctl-cli",
Expand Down Expand Up @@ -74,7 +74,7 @@ trycmd = "0.15"

# Internal crate references
flowctl-core = { path = "crates/flowctl-core" }
flowctl-db-lsql = { path = "crates/flowctl-db-lsql" }
flowctl-db = { path = "crates/flowctl-db" }
flowctl-scheduler = { path = "crates/flowctl-scheduler" }
flowctl-service = { path = "crates/flowctl-service" }

Expand Down
4 changes: 2 additions & 2 deletions flowctl/crates/flowctl-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ daemon = ["dep:flowctl-daemon", "dep:axum", "dep:tower-http"]

[dependencies]
flowctl-core = { workspace = true }
flowctl-db-lsql = { workspace = true }
flowctl-db = { workspace = true }
flowctl-service = { workspace = true }
libsql = { workspace = true }
flowctl-scheduler = { workspace = true }
Expand All @@ -42,5 +42,5 @@ trycmd = { workspace = true }
tempfile = "3"
serde_json = { workspace = true }
flowctl-core = { workspace = true }
flowctl-db-lsql = { workspace = true }
flowctl-db = { workspace = true }
flowctl-service = { workspace = true }
78 changes: 39 additions & 39 deletions flowctl/crates/flowctl-cli/src/commands/db_shim.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Sync shim over `flowctl-db-lsql` (async libSQL) providing the same API
//! Sync shim over `flowctl-db` (async libSQL) providing the same API
//! surface as the deprecated `flowctl-db` (rusqlite) crate.
//!
//! Every sync method spins up a per-call `tokio::runtime::Builder::
Expand All @@ -14,8 +14,8 @@

use std::path::{Path, PathBuf};

pub use flowctl_db_lsql::{DbError, ReindexResult};
pub use flowctl_db_lsql::metrics::{
pub use flowctl_db::{DbError, ReindexResult};
pub use flowctl_db::metrics::{
Bottleneck, DoraMetrics, EpicStats, Summary, TokenBreakdown, WeeklyTrend,
};

Expand Down Expand Up @@ -43,16 +43,16 @@ fn block_on<F: std::future::Future>(fut: F) -> F::Output {
// ── Pool functions ──────────────────────────────────────────────────

pub fn resolve_state_dir(working_dir: &Path) -> Result<PathBuf, DbError> {
flowctl_db_lsql::resolve_state_dir(working_dir)
flowctl_db::resolve_state_dir(working_dir)
}

pub fn resolve_db_path(working_dir: &Path) -> Result<PathBuf, DbError> {
flowctl_db_lsql::resolve_db_path(working_dir)
flowctl_db::resolve_db_path(working_dir)
}

pub fn open(working_dir: &Path) -> Result<Connection, DbError> {
block_on(async {
let db = flowctl_db_lsql::open_async(working_dir).await?;
let db = flowctl_db::open_async(working_dir).await?;
let conn = db.connect()?;
// Leak the Database handle to keep it alive for the process lifetime.
// (libsql Database drop closes the file.)
Expand All @@ -62,15 +62,15 @@ pub fn open(working_dir: &Path) -> Result<Connection, DbError> {
}

pub fn cleanup(conn: &Connection) -> Result<u64, DbError> {
block_on(flowctl_db_lsql::cleanup(&conn.inner()))
block_on(flowctl_db::cleanup(&conn.inner()))
}

pub fn reindex(
conn: &Connection,
flow_dir: &Path,
state_dir: Option<&Path>,
) -> Result<ReindexResult, DbError> {
block_on(flowctl_db_lsql::reindex(&conn.inner(), flow_dir, state_dir))
block_on(flowctl_db::reindex(&conn.inner(), flow_dir, state_dir))
}

// ── Epic repository ────────────────────────────────────────────────
Expand All @@ -83,41 +83,41 @@ impl EpicRepo {
}

pub fn get(&self, id: &str) -> Result<flowctl_core::types::Epic, DbError> {
block_on(flowctl_db_lsql::EpicRepo::new(self.0.clone()).get(id))
block_on(flowctl_db::EpicRepo::new(self.0.clone()).get(id))
}

pub fn get_with_body(
&self,
id: &str,
) -> Result<(flowctl_core::types::Epic, String), DbError> {
block_on(flowctl_db_lsql::EpicRepo::new(self.0.clone()).get_with_body(id))
block_on(flowctl_db::EpicRepo::new(self.0.clone()).get_with_body(id))
}

pub fn list(
&self,
status: Option<&str>,
) -> Result<Vec<flowctl_core::types::Epic>, DbError> {
block_on(flowctl_db_lsql::EpicRepo::new(self.0.clone()).list(status))
block_on(flowctl_db::EpicRepo::new(self.0.clone()).list(status))
}

pub fn upsert(&self, epic: &flowctl_core::types::Epic) -> Result<(), DbError> {
block_on(flowctl_db_lsql::EpicRepo::new(self.0.clone()).upsert(epic))
block_on(flowctl_db::EpicRepo::new(self.0.clone()).upsert(epic))
}

pub fn upsert_with_body(
&self,
epic: &flowctl_core::types::Epic,
body: &str,
) -> Result<(), DbError> {
block_on(flowctl_db_lsql::EpicRepo::new(self.0.clone()).upsert_with_body(epic, body))
block_on(flowctl_db::EpicRepo::new(self.0.clone()).upsert_with_body(epic, body))
}

pub fn update_status(
&self,
id: &str,
status: flowctl_core::types::EpicStatus,
) -> Result<(), DbError> {
block_on(flowctl_db_lsql::EpicRepo::new(self.0.clone()).update_status(id, status))
block_on(flowctl_db::EpicRepo::new(self.0.clone()).update_status(id, status))
}
}

Expand All @@ -131,49 +131,49 @@ impl TaskRepo {
}

pub fn get(&self, id: &str) -> Result<flowctl_core::types::Task, DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).get(id))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).get(id))
}

pub fn get_with_body(
&self,
id: &str,
) -> Result<(flowctl_core::types::Task, String), DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).get_with_body(id))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).get_with_body(id))
}

pub fn list_by_epic(
&self,
epic_id: &str,
) -> Result<Vec<flowctl_core::types::Task>, DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).list_by_epic(epic_id))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).list_by_epic(epic_id))
}

pub fn list_all(
&self,
status: Option<&str>,
domain: Option<&str>,
) -> Result<Vec<flowctl_core::types::Task>, DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).list_all(status, domain))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).list_all(status, domain))
}

pub fn upsert(&self, task: &flowctl_core::types::Task) -> Result<(), DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).upsert(task))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).upsert(task))
}

pub fn upsert_with_body(
&self,
task: &flowctl_core::types::Task,
body: &str,
) -> Result<(), DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).upsert_with_body(task, body))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).upsert_with_body(task, body))
}

pub fn update_status(
&self,
id: &str,
status: flowctl_core::state_machine::Status,
) -> Result<(), DbError> {
block_on(flowctl_db_lsql::TaskRepo::new(self.0.clone()).update_status(id, status))
block_on(flowctl_db::TaskRepo::new(self.0.clone()).update_status(id, status))
}
}

Expand All @@ -188,18 +188,18 @@ impl DepRepo {

pub fn add_task_dep(&self, task_id: &str, depends_on: &str) -> Result<(), DbError> {
block_on(
flowctl_db_lsql::DepRepo::new(self.0.clone()).add_task_dep(task_id, depends_on),
flowctl_db::DepRepo::new(self.0.clone()).add_task_dep(task_id, depends_on),
)
}

pub fn remove_task_dep(&self, task_id: &str, depends_on: &str) -> Result<(), DbError> {
block_on(
flowctl_db_lsql::DepRepo::new(self.0.clone()).remove_task_dep(task_id, depends_on),
flowctl_db::DepRepo::new(self.0.clone()).remove_task_dep(task_id, depends_on),
)
}

pub fn list_task_deps(&self, task_id: &str) -> Result<Vec<String>, DbError> {
block_on(flowctl_db_lsql::DepRepo::new(self.0.clone()).list_task_deps(task_id))
block_on(flowctl_db::DepRepo::new(self.0.clone()).list_task_deps(task_id))
}

/// Replace all deps for a task (delete-all + insert each).
Expand Down Expand Up @@ -238,14 +238,14 @@ impl RuntimeRepo {
&self,
task_id: &str,
) -> Result<Option<flowctl_core::types::RuntimeState>, DbError> {
block_on(flowctl_db_lsql::RuntimeRepo::new(self.0.clone()).get(task_id))
block_on(flowctl_db::RuntimeRepo::new(self.0.clone()).get(task_id))
}

pub fn upsert(
&self,
state: &flowctl_core::types::RuntimeState,
) -> Result<(), DbError> {
block_on(flowctl_db_lsql::RuntimeRepo::new(self.0.clone()).upsert(state))
block_on(flowctl_db::RuntimeRepo::new(self.0.clone()).upsert(state))
}
}

Expand All @@ -260,20 +260,20 @@ impl FileLockRepo {

pub fn acquire(&self, file_path: &str, task_id: &str) -> Result<(), DbError> {
block_on(
flowctl_db_lsql::FileLockRepo::new(self.0.clone()).acquire(file_path, task_id),
flowctl_db::FileLockRepo::new(self.0.clone()).acquire(file_path, task_id),
)
}

pub fn release_for_task(&self, task_id: &str) -> Result<u64, DbError> {
block_on(flowctl_db_lsql::FileLockRepo::new(self.0.clone()).release_for_task(task_id))
block_on(flowctl_db::FileLockRepo::new(self.0.clone()).release_for_task(task_id))
}

pub fn release_all(&self) -> Result<u64, DbError> {
block_on(flowctl_db_lsql::FileLockRepo::new(self.0.clone()).release_all())
block_on(flowctl_db::FileLockRepo::new(self.0.clone()).release_all())
}

pub fn check(&self, file_path: &str) -> Result<Option<String>, DbError> {
block_on(flowctl_db_lsql::FileLockRepo::new(self.0.clone()).check(file_path))
block_on(flowctl_db::FileLockRepo::new(self.0.clone()).check(file_path))
}

/// List all active locks: (file_path, task_id, locked_at).
Expand Down Expand Up @@ -310,13 +310,13 @@ impl PhaseProgressRepo {

pub fn get_completed(&self, task_id: &str) -> Result<Vec<String>, DbError> {
block_on(
flowctl_db_lsql::PhaseProgressRepo::new(self.0.clone()).get_completed(task_id),
flowctl_db::PhaseProgressRepo::new(self.0.clone()).get_completed(task_id),
)
}

pub fn mark_done(&self, task_id: &str, phase: &str) -> Result<(), DbError> {
block_on(
flowctl_db_lsql::PhaseProgressRepo::new(self.0.clone()).mark_done(task_id, phase),
flowctl_db::PhaseProgressRepo::new(self.0.clone()).mark_done(task_id, phase),
)
}
}
Expand All @@ -331,33 +331,33 @@ impl StatsQuery {
}

pub fn summary(&self) -> Result<Summary, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).summary())
block_on(flowctl_db::StatsQuery::new(self.0.clone()).summary())
}

pub fn per_epic(&self, epic_id: Option<&str>) -> Result<Vec<EpicStats>, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).epic_stats(epic_id))
block_on(flowctl_db::StatsQuery::new(self.0.clone()).epic_stats(epic_id))
}

pub fn weekly_trends(&self, weeks: u32) -> Result<Vec<WeeklyTrend>, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).weekly_trends(weeks))
block_on(flowctl_db::StatsQuery::new(self.0.clone()).weekly_trends(weeks))
}

pub fn token_breakdown(
&self,
epic_id: Option<&str>,
) -> Result<Vec<TokenBreakdown>, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).token_breakdown(epic_id))
block_on(flowctl_db::StatsQuery::new(self.0.clone()).token_breakdown(epic_id))
}

pub fn bottlenecks(&self, limit: usize) -> Result<Vec<Bottleneck>, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).bottlenecks(limit))
block_on(flowctl_db::StatsQuery::new(self.0.clone()).bottlenecks(limit))
}

pub fn dora_metrics(&self) -> Result<DoraMetrics, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).dora_metrics())
block_on(flowctl_db::StatsQuery::new(self.0.clone()).dora_metrics())
}

pub fn generate_monthly_rollups(&self) -> Result<u64, DbError> {
block_on(flowctl_db_lsql::StatsQuery::new(self.0.clone()).generate_monthly_rollups())
block_on(flowctl_db::StatsQuery::new(self.0.clone()).generate_monthly_rollups())
}
}
2 changes: 1 addition & 1 deletion flowctl/crates/flowctl-cli/src/commands/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ fn mcp_context() -> Result<(PathBuf, Option<libsql::Connection>), String> {
.build()
.map_err(|e| format!("runtime: {e}"))?;
let conn = rt.block_on(async {
let db = flowctl_db_lsql::open_async(&cwd).await.ok()?;
let db = flowctl_db::open_async(&cwd).await.ok()?;
db.connect().ok()
});
Ok((flow_dir, conn))
Expand Down
2 changes: 1 addition & 1 deletion flowctl/crates/flowctl-cli/src/commands/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub(crate) fn try_open_lsql_conn() -> Option<libsql::Connection> {
.build()
.ok()?;
rt.block_on(async {
let db = flowctl_db_lsql::open_async(&cwd).await.ok()?;
let db = flowctl_db::open_async(&cwd).await.ok()?;
db.connect().ok()
})
}
Expand Down
Loading
Loading