refactor(flowctl): merge 4 crates into 2 (core + cli)#24
Conversation
Delete flowctl-db and flowctl-service crates entirely: - FlowStore was a thin wrapper around json_store — callers use json_store directly now - lifecycle.rs, ChangesApplier, FileApprovalStore, OutputsStore moved into flowctl-core - ServiceError merged into flowctl-core::error Result: cli → core (zero intermediate crates) 284 tests pass, clean build. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR collapses the previous cli → service → db → core layering into cli → core by removing the flowctl-db and flowctl-service crates and moving their business logic / persistence helpers into flowctl-core.
Changes:
- Deleted
flowctl-dbandflowctl-servicecrates and removed them from the workspace/dependency graph. - Moved service-layer functionality into
flowctl-core(lifecycle operations, approvals store, outputs store, andChangesApplier). - Updated
flowctl-clicommands to callflowctl_core::{lifecycle, json_store, ...}directly.
Reviewed changes
Copilot reviewed 37 out of 38 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| flowctl/crates/flowctl-service/src/outputs.rs | Removed outputs store implementation (moved to core). |
| flowctl/crates/flowctl-service/src/lib.rs | Deleted service crate root module. |
| flowctl/crates/flowctl-service/src/error.rs | Deleted service error types (replaced in core). |
| flowctl/crates/flowctl-service/src/changes.rs | Removed service-side ChangesApplier (moved to core). |
| flowctl/crates/flowctl-service/src/approvals.rs | Removed file approval store (moved to core). |
| flowctl/crates/flowctl-service/Cargo.toml | Removed service crate manifest. |
| flowctl/crates/flowctl-db/src/store.rs | Removed FlowStore entrypoint. |
| flowctl/crates/flowctl-db/src/pipeline.rs | Removed pipeline store wrapper. |
| flowctl/crates/flowctl-db/src/phases.rs | Removed phase store wrapper. |
| flowctl/crates/flowctl-db/src/memory.rs | Removed memory store wrapper. |
| flowctl/crates/flowctl-db/src/locks.rs | Removed locks store wrapper. |
| flowctl/crates/flowctl-db/src/lib.rs | Removed db crate root module and re-exports. |
| flowctl/crates/flowctl-db/src/gaps.rs | Removed gaps store wrapper. |
| flowctl/crates/flowctl-db/src/events.rs | Removed events store wrapper. |
| flowctl/crates/flowctl-db/src/error.rs | Removed db error type. |
| flowctl/crates/flowctl-db/src/approvals.rs | Removed approvals store wrapper. |
| flowctl/crates/flowctl-db/Cargo.toml | Removed db crate manifest. |
| flowctl/crates/flowctl-core/src/outputs.rs | Added OutputsStore implementation to core. |
| flowctl/crates/flowctl-core/src/lifecycle.rs | Switched lifecycle to use core json_store directly (no db/service). |
| flowctl/crates/flowctl-core/src/lib.rs | Re-exported moved APIs (lifecycle, OutputsStore, ChangesApplier, FileApprovalStore, service errors). |
| flowctl/crates/flowctl-core/src/error.rs | Added ServiceError/ServiceResult to core. |
| flowctl/crates/flowctl-core/src/changes.rs | Added ChangesApplier/ApplyResult to core. |
| flowctl/crates/flowctl-core/src/approvals.rs | Added FileApprovalStore to core. |
| flowctl/crates/flowctl-cli/src/commands/workflow/pipeline_phase.rs | Replaced FlowStore usage with json_store helpers. |
| flowctl/crates/flowctl-cli/src/commands/workflow/phase.rs | Replaced phase store wrapper usage with json_store helpers. |
| flowctl/crates/flowctl-cli/src/commands/workflow/lifecycle.rs | Pointed lifecycle commands at flowctl_core::lifecycle. |
| flowctl/crates/flowctl-cli/src/commands/stats.rs | Replaced events store wrapper usage with json_store helpers. |
| flowctl/crates/flowctl-cli/src/commands/query.rs | Replaced lock store wrapper usage with direct json_store locking. |
| flowctl/crates/flowctl-cli/src/commands/outputs.rs | Updated outputs command to use core OutputsStore. |
| flowctl/crates/flowctl-cli/src/commands/log.rs | Replaced event appends/reads with json_store helpers. |
| flowctl/crates/flowctl-cli/src/commands/helpers.rs | Updated apply_changes to use core ChangesApplier. |
| flowctl/crates/flowctl-cli/src/commands/gap.rs | Replaced gaps store wrapper usage with json_store helpers. |
| flowctl/crates/flowctl-cli/src/commands/approval.rs | Updated to use core FileApprovalStore. |
| flowctl/crates/flowctl-cli/src/commands/admin/status.rs | Replaced lock listing with json_store::locks_read. |
| flowctl/crates/flowctl-cli/src/commands/admin/init.rs | Replaced ensure_dirs call with json_store::ensure_dirs. |
| flowctl/crates/flowctl-cli/Cargo.toml | Removed dependencies on deleted crates. |
| flowctl/Cargo.toml | Removed deleted crates from workspace members/deps. |
| flowctl/Cargo.lock | Removed deleted packages from lockfile. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| /// Convert a `json_store::StoreError` into a `ServiceError`. | ||
| fn store_err(e: json_store::StoreError) -> ServiceError { | ||
| ServiceError::IoError(std::io::Error::other(e.to_string())) |
There was a problem hiding this comment.
store_err converts json_store::StoreError into ServiceError::IoError, even though ServiceError has a dedicated StoreError(#[from] StoreError) variant. This loses the original error kind/context (e.g., NotFound vs Io) and makes diagnostics less accurate; prefer propagating the StoreError directly (e.g., map_err(ServiceError::from) or ServiceError::StoreError(e)).
| ServiceError::IoError(std::io::Error::other(e.to_string())) | |
| ServiceError::from(e) |
| /// Apply all mutations in order. Stops on first error. | ||
| pub fn apply(&self, changes: &Changes) -> ServiceResult<ApplyResult> { | ||
| let mut applied = 0; | ||
|
|
||
| for mutation in &changes.mutations { | ||
| self.apply_one(mutation)?; | ||
| self.log_event(mutation); | ||
| applied += 1; |
There was a problem hiding this comment.
ChangesApplier introduces side-effecting behavior (writing JSON entities and appending to the event log), but there are no tests covering apply/apply_one behavior. Adding at least a round-trip test (apply create/update/remove and assert files + event log entries) would help prevent regressions during future refactors.
| fn load_all(&self) -> ServiceResult<Vec<Approval>> { | ||
| let raw = crate::json_store::approvals_read(&self.flow_dir) | ||
| .map_err(|e| ServiceError::IoError(std::io::Error::other(e.to_string())))?; | ||
| let mut out = Vec::new(); |
There was a problem hiding this comment.
load_all maps json_store::approvals_read failures into ServiceError::IoError with a stringified message. Since ServiceError already supports StoreError(#[from] StoreError), prefer preserving the structured store error instead of flattening it into an opaque I/O error.
| .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}"))) | ||
| } | ||
|
|
||
| pub fn approve(&self, id: &str, resolver: Option<String>) -> ServiceResult<Approval> { | ||
| let mut all = self.load_all()?; | ||
| let approval = all.iter_mut() | ||
| .find(|a| a.id == id) | ||
| .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}")))?; |
There was a problem hiding this comment.
get/approve/reject return ServiceError::TaskNotFound when an approval ID is missing, which produces misleading errors like "task not found: approval not found: ...". Consider introducing an ApprovalNotFound variant or reusing a more accurate variant (e.g., ValidationError/StoreError::NotFound) so the message matches the failing entity type.
| .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}"))) | |
| } | |
| pub fn approve(&self, id: &str, resolver: Option<String>) -> ServiceResult<Approval> { | |
| let mut all = self.load_all()?; | |
| let approval = all.iter_mut() | |
| .find(|a| a.id == id) | |
| .ok_or_else(|| ServiceError::TaskNotFound(format!("approval not found: {id}")))?; | |
| .ok_or_else(|| ServiceError::ValidationError(format!("approval not found: {id}"))) | |
| } | |
| pub fn approve(&self, id: &str, resolver: Option<String>) -> ServiceResult<Approval> { | |
| let mut all = self.load_all()?; | |
| let approval = all.iter_mut() | |
| .find(|a| a.id == id) | |
| .ok_or_else(|| ServiceError::ValidationError(format!("approval not found: {id}")))?; |
| pub fn create(&self, req: CreateApprovalRequest) -> ServiceResult<Approval> { | ||
| // Validate task exists | ||
| if crate::json_store::task_read(&self.flow_dir, &req.task_id).is_err() { | ||
| return Err(ServiceError::ValidationError(format!( | ||
| "task {} does not exist", | ||
| req.task_id | ||
| ))); | ||
| } | ||
|
|
||
| let id = Self::new_id(); | ||
| let now = Utc::now().timestamp(); | ||
|
|
||
| let approval = Approval { | ||
| id: id.clone(), | ||
| task_id: req.task_id, | ||
| kind: req.kind, | ||
| payload: req.payload, | ||
| status: ApprovalStatus::Pending, | ||
| created_at: now, | ||
| resolved_at: None, | ||
| resolver: None, | ||
| reason: None, | ||
| }; | ||
|
|
||
| let mut all = self.load_all()?; | ||
| all.push(approval.clone()); | ||
| self.save_all(&all)?; | ||
|
|
||
| Ok(approval) | ||
| } |
There was a problem hiding this comment.
FileApprovalStore persistence behavior (create/list/get/approve/reject and serialization to .state/approvals.json) is untested in this module. Adding a tempfile-backed test that exercises the full lifecycle would help validate the refactor that moved this store into flowctl-core.
| for file in &file_list { | ||
| match lock_store.acquire(file, &task, &mode) { | ||
| Ok(()) => locked.push(file.to_string()), | ||
| Err(flowctl_db::DbError::Constraint(msg)) => { | ||
| let holder = lock_store.check(file).ok().flatten().unwrap_or_default(); | ||
| already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder)], "detail": msg})); | ||
| } | ||
| Err(e) => { | ||
| error_exit(&format!("Failed to lock {}: {}", file, e)); | ||
| // Check for conflict: another task holding the file. | ||
| let locks = flowctl_core::json_store::locks_read(&flow_dir).unwrap_or_default(); | ||
| let conflict = locks.iter().find(|l| l.file_path == *file && l.task_id != task); | ||
| if let Some(holder) = conflict { | ||
| already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder.task_id)], "detail": format!("file '{}' already locked by task '{}'", file, holder.task_id)})); | ||
| } else { | ||
| match flowctl_core::json_store::lock_acquire(&flow_dir, file, &task, &mode) { | ||
| Ok(()) => locked.push(file.to_string()), |
There was a problem hiding this comment.
cmd_lock re-reads locks.json from disk for every file in file_list. This adds unnecessary I/O for multi-file lock operations; consider reading once before the loop and updating the in-memory set as locks are acquired (or batching conflict detection).
| let locks = flowctl_core::json_store::locks_read(&flow_dir).unwrap_or_default(); | ||
| let conflict = locks.iter().find(|l| l.file_path == *file && l.task_id != task); | ||
| if let Some(holder) = conflict { | ||
| already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder.task_id)], "detail": format!("file '{}' already locked by task '{}'", file, holder.task_id)})); |
There was a problem hiding this comment.
When reporting an already-locked conflict, the code formats the owner as holder.task_id with the requested {mode} rather than the actual mode stored in the existing lock entry. This can misreport the current lock state; use holder.mode (and/or include it in the detail string) instead of the input mode.
| already_locked.push(json!({"file": file, "owners": [format!("{}({mode})", holder.task_id)], "detail": format!("file '{}' already locked by task '{}'", file, holder.task_id)})); | |
| already_locked.push(json!({ | |
| "file": file, | |
| "owners": [format!("{}({})", holder.task_id, holder.mode)], | |
| "detail": format!( | |
| "file '{}' already locked by task '{}' (mode: {})", | |
| file, holder.task_id, holder.mode | |
| ) | |
| })); |
| let holder = locks.iter().find(|l| l.file_path == f).map(|l| l.task_id.clone()); | ||
| if let Some(task_id) = holder { | ||
| if json { | ||
| json_output(json!({ | ||
| "file": f, | ||
| "locked": true, | ||
| "locks": [{"task_id": task_id, "mode": "write"}], |
There was a problem hiding this comment.
In cmd_lock_check(Some(file)), JSON output hardcodes the lock mode to "write" even though locks store a mode field. This can produce incorrect output for read locks; emit the actual mode from the matched lock entry.
| let holder = locks.iter().find(|l| l.file_path == f).map(|l| l.task_id.clone()); | |
| if let Some(task_id) = holder { | |
| if json { | |
| json_output(json!({ | |
| "file": f, | |
| "locked": true, | |
| "locks": [{"task_id": task_id, "mode": "write"}], | |
| let holder = locks.iter().find(|l| l.file_path == f); | |
| if let Some(lock) = holder { | |
| let task_id = lock.task_id.clone(); | |
| if json { | |
| json_output(json!({ | |
| "file": f, | |
| "locked": true, | |
| "locks": [{"task_id": task_id, "mode": lock.mode.clone()}], |
Summary
flowctl-dbcrate (FlowStore was just a json_store wrapper)flowctl-servicecrate (lifecycle/changes/approvals moved to core)cli → service → db → core→cli → core🤖 Generated with Claude Code