From 7d9fe9ccb6bd23a22c64f15de8f6c1462f8b98b2 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 16:23:41 -0400 Subject: [PATCH 01/15] Consolidate kind constants, add helpers, remove dead code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Centralize KIND_AUTH (22242), KIND_CANVAS (40100), EPHEMERAL_KIND_MIN/MAX into sprout-core/src/kind.rs; remove 3 duplicate local definitions - Add helpers: is_ephemeral(), is_workflow_execution_kind(), event_kind_u32(), event_kind_i32() — eliminates 9 cast chains - Add compile-time value assertions (kind constants fit in u16) - Remove dead Redis subscriber spawn (zero consumers) - Remove dead cron scheduler stub spawn (no-op loop) - Wire TYPESENSE_COLLECTION env var in relay main.rs - Document sprout-mcp re-export dependency for sprout-test-client - Add TYPESENSE_COLLECTION to README config table Zero behavior change. All unit, E2E, and integration tests pass. --- Cargo.lock | 1 + README.md | 1 + crates/sprout-audit/src/service.rs | 3 +- crates/sprout-core/src/kind.rs | 39 +++++++++++++++++++++++ crates/sprout-db/src/event.rs | 10 ++---- crates/sprout-db/src/feed.rs | 3 +- crates/sprout-mcp/Cargo.toml | 3 ++ crates/sprout-mcp/src/lib.rs | 4 +++ crates/sprout-mcp/src/server.rs | 8 +++-- crates/sprout-relay/src/api/feed.rs | 6 ++-- crates/sprout-relay/src/handlers/event.rs | 16 ++++------ crates/sprout-relay/src/main.rs | 14 ++++---- crates/sprout-search/src/index.rs | 3 +- crates/sprout-workflow/src/lib.rs | 9 +++--- 14 files changed, 85 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 921f686d9..83d6202b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2723,6 +2723,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "sprout-core", "thiserror", "tokio", "tokio-tungstenite 0.26.2", diff --git a/README.md b/README.md index 5da7c74a0..dc22c97bb 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,7 @@ Copy `.env.example` to `.env`. All defaults work with `docker compose up` out of | `REDIS_URL` | `redis://localhost:6379` | Redis connection string | | `TYPESENSE_URL` | `http://localhost:8108` | Typesense base URL | | `TYPESENSE_API_KEY` | `sprout_dev_key` | Typesense API key | +| `TYPESENSE_COLLECTION` | `events` | Typesense collection name | | `SPROUT_BIND_ADDR` | `0.0.0.0:3000` | Relay bind address (host:port) | | `RELAY_URL` | `ws://localhost:3000` | Public URL (used in NIP-42 challenges) | | `SPROUT_REQUIRE_AUTH_TOKEN` | `false` | Require bearer token for auth (set `true` in production) | diff --git a/crates/sprout-audit/src/service.rs b/crates/sprout-audit/src/service.rs index 508d11314..0a09bf641 100644 --- a/crates/sprout-audit/src/service.rs +++ b/crates/sprout-audit/src/service.rs @@ -3,6 +3,8 @@ use futures_util::FutureExt as _; use sqlx::{Acquire, MySqlPool, Row}; use tracing::{debug, instrument, warn}; +use sprout_core::kind::KIND_AUTH; + use crate::{ action::AuditAction, entry::{AuditEntry, NewAuditEntry}, @@ -11,7 +13,6 @@ use crate::{ schema::AUDIT_SCHEMA_SQL, }; -const KIND_AUTH: u32 = 22242; const AUDIT_LOCK_NAME: &str = "sprout_audit"; const AUDIT_LOCK_TIMEOUT_SECS: i64 = 10; diff --git a/crates/sprout-core/src/kind.rs b/crates/sprout-core/src/kind.rs index 15e25865e..931269e79 100644 --- a/crates/sprout-core/src/kind.rs +++ b/crates/sprout-core/src/kind.rs @@ -17,6 +17,8 @@ pub const KIND_REACTION: u32 = 7; pub const KIND_GIFT_WRAP: u32 = 1059; /// NIP-94: File metadata attachment. pub const KIND_FILE_METADATA: u32 = 1063; +/// NIP-42 auth event — never stored (carries bearer tokens). +pub const KIND_AUTH: u32 = 22242; // NIP-29 group admin events /// NIP-29: Add a user to a group. @@ -56,6 +58,11 @@ pub const KIND_NIP29_GROUP_MEMBERS: u32 = 39002; /// NIP-29: Addressable group roles definition. pub const KIND_NIP29_GROUP_ROLES: u32 = 39003; +/// Lower bound of the ephemeral event range (20000–29999). Never stored. +pub const EPHEMERAL_KIND_MIN: u32 = 20000; +/// Upper bound of the ephemeral event range (20000–29999). Never stored. +pub const EPHEMERAL_KIND_MAX: u32 = 29999; + // Ephemeral events (20000–29999) — Redis pub/sub only, never stored. /// Ephemeral: user presence update (online/away/offline). pub const KIND_PRESENCE_UPDATE: u32 = 20001; @@ -77,6 +84,8 @@ pub const KIND_STREAM_MESSAGE_BOOKMARKED: u32 = 40005; pub const KIND_STREAM_MESSAGE_SCHEDULED: u32 = 40006; /// A reminder attached to a stream message or time. pub const KIND_STREAM_REMINDER: u32 = 40007; +/// Canvas (shared document) for a channel. +pub const KIND_CANVAS: u32 = 40100; // Direct messages (41000–41999) /// A new direct-message conversation was created. @@ -215,6 +224,7 @@ pub const ALL_KINDS: &[u32] = &[ KIND_STREAM_MESSAGE_BOOKMARKED, KIND_STREAM_MESSAGE_SCHEDULED, KIND_STREAM_REMINDER, + KIND_CANVAS, KIND_DM_CREATED, KIND_DM_MEMBER_ADDED, KIND_DM_MEMBER_REMOVED, @@ -260,6 +270,35 @@ pub const ALL_KINDS: &[u32] = &[ KIND_HUDDLE_RECORDING_AVAILABLE, ]; +/// Returns `true` if `kind` is in the ephemeral range (20000–29999). +pub const fn is_ephemeral(kind: u32) -> bool { + kind >= EPHEMERAL_KIND_MIN && kind <= EPHEMERAL_KIND_MAX +} + +/// Returns `true` if `kind` is a workflow execution event (46001–46012). +/// These must not trigger workflows (prevents infinite loops). +pub const fn is_workflow_execution_kind(kind: u32) -> bool { + kind >= KIND_WORKFLOW_TRIGGERED && kind <= KIND_WORKFLOW_APPROVAL_DENIED +} + +/// Extract the kind from a nostr Event as u32. +/// NIP-01 specifies kind as an unsigned integer; u32 covers the full range. +pub fn event_kind_u32(event: &nostr::Event) -> u32 { + event.kind.as_u16() as u32 +} + +/// Extract the kind from a nostr Event as i32 (for MySQL INT columns). +/// Safe: all Sprout kinds fit in i32 (max 65535 < i32::MAX). +pub fn event_kind_i32(event: &nostr::Event) -> i32 { + event.kind.as_u16() as i32 +} + +// Compile-time: all Sprout kind constants fit in nostr's u16-backed Kind. +const _: () = assert!(KIND_AUTH <= u16::MAX as u32); +const _: () = assert!(KIND_CANVAS <= u16::MAX as u32); +const _: () = assert!(KIND_HUDDLE_RECORDING_AVAILABLE <= u16::MAX as u32); +const _: () = assert!(EPHEMERAL_KIND_MIN < EPHEMERAL_KIND_MAX); + #[cfg(test)] mod tests { use super::*; diff --git a/crates/sprout-db/src/event.rs b/crates/sprout-db/src/event.rs index 5fe01d1e4..5136c2ee2 100644 --- a/crates/sprout-db/src/event.rs +++ b/crates/sprout-db/src/event.rs @@ -9,15 +9,11 @@ use nostr::Event; use sqlx::{MySqlPool, QueryBuilder, Row}; use uuid::Uuid; +use sprout_core::kind::{KIND_AUTH, is_ephemeral, event_kind_i32}; use sprout_core::StoredEvent; use crate::error::{DbError, Result}; -/// NIP-42 auth event kind — never stored (carries bearer tokens). -const KIND_AUTH: u32 = 22242; -const EPHEMERAL_KIND_MIN: u32 = 20000; -const EPHEMERAL_KIND_MAX: u32 = 29999; - /// Optional filters for [`query_events`]. #[derive(Debug, Default, Clone)] pub struct EventQuery { @@ -51,7 +47,7 @@ pub async fn insert_event( if kind_u32 == KIND_AUTH { return Err(DbError::AuthEventRejected); } - if (EPHEMERAL_KIND_MIN..=EPHEMERAL_KIND_MAX).contains(&kind_u32) { + if is_ephemeral(kind_u32) { return Err(DbError::EphemeralEventRejected(kind_u16)); } @@ -60,7 +56,7 @@ pub async fn insert_event( let sig_bytes = event.sig.serialize(); let tags_json = serde_json::to_value(&event.tags)?; // Cast chain: nostr Kind (u16) → i32 (MySQL INT column). Safe: all Sprout kinds fit in i32. - let kind_i32 = event.kind.as_u16() as i32; + let kind_i32 = event_kind_i32(event); let created_at_secs = event.created_at.as_u64() as i64; let created_at = DateTime::from_timestamp(created_at_secs, 0) .ok_or(DbError::InvalidTimestamp(created_at_secs))?; diff --git a/crates/sprout-db/src/feed.rs b/crates/sprout-db/src/feed.rs index 8eabc1c7b..38965ffe0 100644 --- a/crates/sprout-db/src/feed.rs +++ b/crates/sprout-db/src/feed.rs @@ -407,7 +407,8 @@ mod tests { KIND_JOB_RESULT, ]; - for kind in 46001u32..=46012 { + use sprout_core::kind::{KIND_WORKFLOW_TRIGGERED, KIND_WORKFLOW_APPROVAL_DENIED}; + for kind in KIND_WORKFLOW_TRIGGERED..=KIND_WORKFLOW_APPROVAL_DENIED { assert!( !activity_kinds.contains(&kind), "workflow execution kind {kind} must NOT be in activity" diff --git a/crates/sprout-mcp/Cargo.toml b/crates/sprout-mcp/Cargo.toml index da873d6d8..1f27ccd3b 100644 --- a/crates/sprout-mcp/Cargo.toml +++ b/crates/sprout-mcp/Cargo.toml @@ -12,6 +12,9 @@ name = "sprout-mcp-server" path = "src/main.rs" [dependencies] +# Sprout core types +sprout-core = { workspace = true } + # MCP SDK rmcp = { workspace = true } schemars = { workspace = true } diff --git a/crates/sprout-mcp/src/lib.rs b/crates/sprout-mcp/src/lib.rs index 03d272960..3740cdc97 100644 --- a/crates/sprout-mcp/src/lib.rs +++ b/crates/sprout-mcp/src/lib.rs @@ -94,6 +94,10 @@ //! [Sprout]: https://github.com/sprout-rs/sprout //! [NIP-42]: https://github.com/nostr-protocol/nips/blob/master/42.md +// NOTE: `parse_relay_message`, `OkResponse`, and `RelayMessage` from `relay_client` +// are re-exported by `sprout-test-client`. Changes to these types are a breaking +// change for the test harness. + /// WebSocket client for the Sprout relay (NIP-42 auth, subscriptions, reconnect). pub mod relay_client; /// MCP tool implementations backed by the relay client. diff --git a/crates/sprout-mcp/src/server.rs b/crates/sprout-mcp/src/server.rs index 114080931..6bf8adfcf 100644 --- a/crates/sprout-mcp/src/server.rs +++ b/crates/sprout-mcp/src/server.rs @@ -1,3 +1,5 @@ +use sprout_core::kind::{KIND_CANVAS, event_kind_u32}; + use rmcp::{ handler::server::{router::tool::ToolRouter, wrapper::Parameters}, model::{ServerCapabilities, ServerInfo}, @@ -310,7 +312,7 @@ impl SproutMcpServer { "id": event.id.to_hex(), "pubkey": event.pubkey.to_hex(), "content": event.content, - "kind": event.kind.as_u16() as u32, + "kind": event_kind_u32(event), "created_at": event.created_at.as_u64(), }) }) @@ -381,7 +383,7 @@ impl SproutMcpServer { nostr::SingleLetterTag::lowercase(nostr::Alphabet::E), [p.channel_id.as_str()], ) - .kind(nostr::Kind::Custom(40100)) + .kind(nostr::Kind::Custom(KIND_CANVAS as u16)) .limit(1); let sub_id = format!("canvas-{}", uuid::Uuid::new_v4()); @@ -415,7 +417,7 @@ impl SproutMcpServer { Err(e) => return format!("Error building tag: {e}"), }; - let event = match nostr::EventBuilder::new(nostr::Kind::Custom(40100), &p.content, [e_tag]) + let event = match nostr::EventBuilder::new(nostr::Kind::Custom(KIND_CANVAS as u16), &p.content, [e_tag]) .sign_with_keys(&keys) { Ok(e) => e, diff --git a/crates/sprout-relay/src/api/feed.rs b/crates/sprout-relay/src/api/feed.rs index 0e7dc01ea..c2ed7e5c7 100644 --- a/crates/sprout-relay/src/api/feed.rs +++ b/crates/sprout-relay/src/api/feed.rs @@ -17,7 +17,7 @@ use axum::{ use chrono::{DateTime, Duration, Utc}; use serde::Deserialize; -use sprout_core::kind; +use sprout_core::kind::{self, event_kind_u32}; use crate::state::AppState; @@ -114,7 +114,7 @@ pub async fn feed_handler( // 3. Partition activity into agent activity vs channel activity. let (agent_activity, channel_activity): (Vec<_>, Vec<_>) = activity_all .into_iter() - .partition(|e| AGENT_KINDS.contains(&(e.event.kind.as_u16() as u32))); + .partition(|e| AGENT_KINDS.contains(&event_kind_u32(&e.event))); // 4. Enrich events with channel names (batch lookup). let all_channels = state.db.list_channels(None).await.unwrap_or_else(|e| { @@ -144,7 +144,7 @@ pub async fn feed_handler( serde_json::json!({ "id": event.event.id.to_hex(), - "kind": event.event.kind.as_u16() as u32, + "kind": event_kind_u32(&event.event), "pubkey": event.event.pubkey.to_hex(), "content": event.event.content, "created_at": event.event.created_at.as_u64(), diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index 31eff02d0..7dc5cb16e 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -7,7 +7,9 @@ use tracing::{debug, error, info, warn}; use nostr::Event; use sprout_audit::{AuditAction, NewAuditEntry}; use sprout_core::event::StoredEvent; -use sprout_core::kind::KIND_PRESENCE_UPDATE; +use sprout_core::kind::{ + event_kind_u32, is_ephemeral, is_workflow_execution_kind, KIND_AUTH, KIND_PRESENCE_UPDATE, +}; use sprout_core::verification::verify_event; use sprout_auth::Scope; @@ -16,14 +18,10 @@ use crate::connection::{AuthState, ConnectionState}; use crate::protocol::RelayMessage; use crate::state::AppState; -const KIND_AUTH: u32 = 22242; -const EPHEMERAL_MIN: u32 = 20000; -const EPHEMERAL_MAX: u32 = 29999; - /// Handle an EVENT message: authenticate, verify, store, fan-out, index, and audit the event. pub async fn handle_event(event: Event, conn: Arc, state: Arc) { let event_id_hex = event.id.to_hex(); - let kind_u32 = event.kind.as_u16() as u32; + let kind_u32 = event_kind_u32(&event); debug!(event_id = %event_id_hex, kind = kind_u32, "EVENT"); let (conn_id, pubkey_hex, pubkey_bytes, auth_pubkey) = { @@ -76,7 +74,7 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc, state: Arc 128 { let mut end = 128; diff --git a/crates/sprout-relay/src/main.rs b/crates/sprout-relay/src/main.rs index 6bd3633f5..944ab4c30 100644 --- a/crates/sprout-relay/src/main.rs +++ b/crates/sprout-relay/src/main.rs @@ -68,15 +68,17 @@ async fn main() -> anyhow::Result<()> { ); info!("Redis pub/sub connected"); - let pubsub_clone = Arc::clone(&pubsub); - tokio::spawn(async move { pubsub_clone.run_subscriber().await }); + // TODO: spawn pubsub.run_subscriber() for multi-node fan-out. + // Currently no consumer calls subscribe_local(), so the subscriber + // would process Redis messages into a broadcast channel with zero receivers. let auth = AuthService::new(config.auth.clone()); let search_config = SearchConfig { url: config.typesense_url.clone(), api_key: config.typesense_key.clone(), - collection: "events".to_string(), + collection: std::env::var("TYPESENSE_COLLECTION") + .unwrap_or_else(|_| "events".to_string()), }; let search = SearchService::new(search_config); if let Err(e) = search.ensure_collection().await { @@ -86,9 +88,9 @@ async fn main() -> anyhow::Result<()> { let workflow_config = sprout_workflow::WorkflowConfig::default(); let workflow_engine = Arc::new(WorkflowEngine::new(db.clone(), workflow_config)); - // Spawn cron scheduler background task - let wf_clone = Arc::clone(&workflow_engine); - tokio::spawn(async move { wf_clone.run().await }); + // TODO: spawn workflow_engine cron scheduler when cron trigger + // matching is implemented (WF-07). Currently WorkflowEngine::run() + // is a stub that sleeps and logs. let state = Arc::new(AppState::new( config.clone(), diff --git a/crates/sprout-search/src/index.rs b/crates/sprout-search/src/index.rs index f5b4dc63b..6eba8884a 100644 --- a/crates/sprout-search/src/index.rs +++ b/crates/sprout-search/src/index.rs @@ -4,6 +4,7 @@ use serde_json::{json, Value}; use tracing::{debug, warn}; use sprout_core::event::StoredEvent; +use sprout_core::kind::event_kind_i32; use crate::error::SearchError; @@ -34,7 +35,7 @@ pub fn event_to_document(event: &StoredEvent) -> Result { "id": nostr_event.id.to_string(), "content": nostr_event.content.as_str(), // Cast to i32 for Typesense schema (int32 field). nostr Kind is u16; all Sprout kinds fit in i32. - "kind": nostr_event.kind.as_u16() as i32, + "kind": event_kind_i32(nostr_event), "pubkey": nostr_event.pubkey.to_string(), "channel_id": channel_id, "created_at": nostr_event.created_at.as_u64() as i64, diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 01da68780..eb8204704 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -37,6 +37,7 @@ pub use schema::{ActionDef, Step, TriggerDef, WorkflowDef}; use std::sync::Arc; +use sprout_core::kind::{is_workflow_execution_kind, event_kind_u32}; use sprout_db::Db; use tokio::sync::Semaphore; @@ -101,11 +102,11 @@ impl WorkflowEngine { return Ok(()); }; - let kind_u32 = event.event.kind.as_u16() as u32; + let kind_u32 = event_kind_u32(&event.event); // Exclude workflow execution events to prevent infinite loops. // See Decision 10 in PLANS/SPROUT_WORKFLOWS.md. - if (46001..=46012).contains(&kind_u32) { + if is_workflow_execution_kind(kind_u32) { return Ok(()); } @@ -257,7 +258,7 @@ steps: &trigger, sprout_core::kind::KIND_REACTION )); - assert!(!trigger_matches_event(&trigger, 46001)); + assert!(!trigger_matches_event(&trigger, sprout_core::kind::KIND_WORKFLOW_TRIGGERED)); } #[test] @@ -326,7 +327,7 @@ steps: let msg_trigger = TriggerDef::MessagePosted { filter: None }; let react_trigger = TriggerDef::ReactionAdded { emoji: None }; - for kind in 46001u32..=46012 { + for kind in sprout_core::kind::KIND_WORKFLOW_TRIGGERED..=sprout_core::kind::KIND_WORKFLOW_APPROVAL_DENIED { assert!( !trigger_matches_event(&msg_trigger, kind), "message_posted should not match workflow execution kind {kind}" From 4e5d0045718f8214dcf770e4910796e62ae04393 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 17:04:47 -0400 Subject: [PATCH 02/15] =?UTF-8?q?Wire=20workflow=20execution=20pipeline:?= =?UTF-8?q?=20on=5Fevent()=20=E2=86=92=20run=20creation=20=E2=86=92=20asyn?= =?UTF-8?q?c=20execution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - on_event() fully wired: trigger matching → filter evaluation → TriggerContext build → DB run creation → async executor spawn - Changed on_event() signature to self: &Arc for spawn compatibility - Extracted build_trigger_context() as pub helper (StoredEvent → TriggerContext) - Re-enabled cron spawn in relay main.rs (stub loop, WF-09) - Added 6 unit tests for build_trigger_context - Added 2 E2E tests: event-driven workflow execution + filter evaluation Crossfire fixes (GPT-5-4 + Claude): - Fix reaction message_id: extract target from NIP-25 'e' tag, not reaction event ID - Fix pending-stuck runs: mark Failed if Running transition fails - Wire run_semaphore: acquire permit before spawning execution - Warn on trace serialization failure instead of silent fallback All 280+ unit tests pass, all 6 E2E workflow tests pass, clippy clean. --- Cargo.lock | 1 + crates/sprout-relay/src/main.rs | 6 +- .../sprout-test-client/tests/e2e_workflows.rs | 209 +++++++++ crates/sprout-workflow/Cargo.toml | 3 + crates/sprout-workflow/src/lib.rs | 402 +++++++++++++++++- 5 files changed, 603 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83d6202b1..5eeeb97a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2848,6 +2848,7 @@ dependencies = [ "chrono", "cron", "evalexpr", + "nostr", "reqwest", "serde", "serde_json", diff --git a/crates/sprout-relay/src/main.rs b/crates/sprout-relay/src/main.rs index 944ab4c30..96f78b12b 100644 --- a/crates/sprout-relay/src/main.rs +++ b/crates/sprout-relay/src/main.rs @@ -88,9 +88,9 @@ async fn main() -> anyhow::Result<()> { let workflow_config = sprout_workflow::WorkflowConfig::default(); let workflow_engine = Arc::new(WorkflowEngine::new(db.clone(), workflow_config)); - // TODO: spawn workflow_engine cron scheduler when cron trigger - // matching is implemented (WF-07). Currently WorkflowEngine::run() - // is a stub that sleeps and logs. + // Spawn cron scheduler background task. + let wf_cron = Arc::clone(&workflow_engine); + tokio::spawn(async move { wf_cron.run().await }); let state = Arc::new(AppState::new( config.clone(), diff --git a/crates/sprout-test-client/tests/e2e_workflows.rs b/crates/sprout-test-client/tests/e2e_workflows.rs index c348438b6..d8445e7b9 100644 --- a/crates/sprout-test-client/tests/e2e_workflows.rs +++ b/crates/sprout-test-client/tests/e2e_workflows.rs @@ -317,6 +317,215 @@ async fn test_trigger_workflow_and_check_run() { assert_eq!(del_status, 204, "cleanup DELETE should return 204"); } +// ── Test 5: Event-driven workflow execution ─────────────────────────────────── + +/// Send a kind:40001 message to a channel that has a `message_posted` workflow. +/// Verify that the workflow engine creates a run record. +/// +/// NOTE: Uses `SEEDED_PUBKEY` for workflow ownership due to the FK constraint +/// on `workflows.owner_pubkey`. The WebSocket sender uses fresh keys. +#[tokio::test] +#[ignore = "requires running relay"] +async fn test_event_driven_workflow_execution() { + use nostr::{Kind, Tag}; + use sprout_test_client::SproutTestClient; + + let client = http_client(); + let pubkey_hex: &str = SEEDED_PUBKEY; + let base = relay_http_url(); + + // ── Step 1: Create a message_posted workflow in the general channel ─────── + let workflow_yaml = r#"name: event-driven-e2e-test +description: E2E test for message_posted trigger +trigger: + on: message_posted +steps: + - id: step1 + name: Acknowledge + action: send_message + text: "Workflow fired by event" +"#; + let created = create_workflow(&client, &base, pubkey_hex, CHANNEL_GENERAL, workflow_yaml).await; + let workflow_id = created["id"] + .as_str() + .expect("created workflow must have 'id'") + .to_string(); + + // ── Step 2: Connect via WebSocket and send a kind:40001 message ─────────── + // Use fresh keys for the sender (channel is open, no auth required to post). + let sender_keys = Keys::generate(); + let mut ws_client = SproutTestClient::connect(&relay_ws_url(), &sender_keys) + .await + .expect("ws connect failed"); + + let e_tag = Tag::parse(&["e", CHANNEL_GENERAL]).expect("tag parse failed"); + let event = nostr::EventBuilder::new( + Kind::Custom(40001), + "trigger this workflow please", + [e_tag], + ) + .sign_with_keys(&sender_keys) + .expect("sign event"); + + ws_client.send_event(event).await.expect("send event failed"); + + // ── Step 3: Wait for the workflow engine to process the event ───────────── + tokio::time::sleep(Duration::from_secs(3)).await; + + // ── Step 4: Check that a run was created ────────────────────────────────── + let runs_url = format!("{base}/api/workflows/{workflow_id}/runs"); + let runs_resp = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs failed"); + assert_eq!(runs_resp.status(), 200, "GET runs must return 200"); + + let runs: Vec = runs_resp.json().await.expect("runs must be JSON array"); + assert!( + !runs.is_empty(), + "expected at least one workflow run after sending kind:40001 event" + ); + + let run = &runs[0]; + assert!(run.get("id").is_some(), "run missing 'id'"); + assert!(run.get("workflow_id").is_some(), "run missing 'workflow_id'"); + assert!(run.get("status").is_some(), "run missing 'status'"); + + let status = run["status"].as_str().unwrap_or(""); + assert!( + matches!(status, "pending" | "running" | "completed" | "failed"), + "run status '{status}' is not a recognized value" + ); + + // Clean up. + let _ = ws_client.disconnect().await; + let del_status = delete_workflow(&client, &base, pubkey_hex, &workflow_id).await; + assert_eq!(del_status, 204, "cleanup DELETE should return 204"); +} + +// ── Test 6: Event-driven workflow with filter ───────────────────────────────── + +/// Verify that a `message_posted` workflow with a filter expression only fires +/// when the filter matches. +/// +/// 1. Create a workflow with `filter: "str_contains(trigger_text, \"P1\")"`. +/// 2. Send a message that does NOT contain "P1" — expect zero runs. +/// 3. Send a message that DOES contain "P1" — expect one run. +/// +/// NOTE: Filter evaluation is wired in WF-07. Until then, all matched-kind +/// events fire the workflow regardless of filter. This test documents the +/// intended behaviour so it can be un-skipped once WF-07 lands. +#[tokio::test] +#[ignore = "requires running relay with WF-07 filter evaluation"] +async fn test_event_driven_workflow_with_filter() { + use nostr::{Kind, Tag}; + use sprout_test_client::SproutTestClient; + + let client = http_client(); + let pubkey_hex: &str = SEEDED_PUBKEY; + let base = relay_http_url(); + + // ── Step 1: Create a filtered message_posted workflow ───────────────────── + let workflow_yaml = r#"name: filtered-event-e2e-test +description: E2E test for message_posted trigger with filter +trigger: + on: message_posted + filter: "str_contains(trigger_text, \"P1\")" +steps: + - id: step1 + name: Notify + action: send_message + text: "P1 incident detected" +"#; + let created = create_workflow(&client, &base, pubkey_hex, CHANNEL_GENERAL, workflow_yaml).await; + let workflow_id = created["id"] + .as_str() + .expect("created workflow must have 'id'") + .to_string(); + + let sender_keys = Keys::generate(); + let mut ws_client = SproutTestClient::connect(&relay_ws_url(), &sender_keys) + .await + .expect("ws connect failed"); + + // ── Step 2: Send a message that does NOT match the filter ───────────────── + let e_tag = Tag::parse(&["e", CHANNEL_GENERAL]).expect("tag parse failed"); + let non_matching = nostr::EventBuilder::new( + Kind::Custom(40001), + "this is a routine update, nothing urgent", + [e_tag.clone()], + ) + .sign_with_keys(&sender_keys) + .expect("sign event"); + + ws_client + .send_event(non_matching) + .await + .expect("send non-matching event failed"); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let runs_url = format!("{base}/api/workflows/{workflow_id}/runs"); + let runs_resp = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs (non-matching) failed"); + assert_eq!(runs_resp.status(), 200); + let runs_after_non_match: Vec = + runs_resp.json().await.expect("runs must be JSON array"); + assert!( + runs_after_non_match.is_empty(), + "non-matching message must NOT trigger a workflow run, but got {} run(s)", + runs_after_non_match.len() + ); + + // ── Step 3: Send a message that DOES match the filter ───────────────────── + let matching = nostr::EventBuilder::new( + Kind::Custom(40001), + "P1 alert: database is down", + [e_tag], + ) + .sign_with_keys(&sender_keys) + .expect("sign event"); + + ws_client + .send_event(matching) + .await + .expect("send matching event failed"); + + tokio::time::sleep(Duration::from_secs(3)).await; + + let runs_resp2 = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs (matching) failed"); + assert_eq!(runs_resp2.status(), 200); + let runs_after_match: Vec = + runs_resp2.json().await.expect("runs must be JSON array"); + assert!( + !runs_after_match.is_empty(), + "matching message must trigger a workflow run" + ); + + let run = &runs_after_match[0]; + let status = run["status"].as_str().unwrap_or(""); + assert!( + matches!(status, "pending" | "running" | "completed" | "failed"), + "run status '{status}' is not a recognized value" + ); + + // Clean up. + let _ = ws_client.disconnect().await; + let del_status = delete_workflow(&client, &base, pubkey_hex, &workflow_id).await; + assert_eq!(del_status, 204, "cleanup DELETE should return 204"); +} + // ── Test 4: Workflow CRUD (update + delete) ─────────────────────────────────── /// Full CRUD lifecycle: diff --git a/crates/sprout-workflow/Cargo.toml b/crates/sprout-workflow/Cargo.toml index f8cd14a42..8ab919528 100644 --- a/crates/sprout-workflow/Cargo.toml +++ b/crates/sprout-workflow/Cargo.toml @@ -22,5 +22,8 @@ tracing = { workspace = true } thiserror = { workspace = true } reqwest = { workspace = true, optional = true } +[dev-dependencies] +nostr = { workspace = true } + [features] reqwest = ["dep:reqwest"] diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index eb8204704..9d9d9550f 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -15,7 +15,7 @@ //! ## Usage //! //! ```rust,ignore -//! let engine = WorkflowEngine::new(db, WorkflowConfig::default()); +//! let engine = Arc::new(WorkflowEngine::new(db, WorkflowConfig::default())); //! //! // Parse and validate a YAML definition. //! let (def, json) = WorkflowEngine::parse_yaml(yaml_str)?; @@ -35,9 +35,11 @@ pub use error::WorkflowError; pub use executor::ExecutionResult; pub use schema::{ActionDef, Step, TriggerDef, WorkflowDef}; +use std::collections::HashMap; use std::sync::Arc; -use sprout_core::kind::{is_workflow_execution_kind, event_kind_u32}; +use sprout_core::kind::{is_workflow_execution_kind, event_kind_u32, KIND_REACTION}; +use sprout_db::workflow::RunStatus; use sprout_db::Db; use tokio::sync::Semaphore; @@ -96,8 +98,18 @@ impl WorkflowEngine { /// Checks whether any workflow in the event's channel has a matching trigger. /// Workflow execution events (kinds 46001–46012) are excluded to prevent loops. /// - /// Full trigger matching and execution spawning is wired in WF-07/08. - pub async fn on_event(&self, event: &sprout_core::StoredEvent) -> Result<(), WorkflowError> { + /// For each matching workflow: + /// 1. Evaluates the trigger filter expression (if present). + /// 2. Builds a [`executor::TriggerContext`] from the event. + /// 3. Creates a `workflow_run` row in the DB (status: `pending`). + /// 4. Spawns an async task to execute the run via [`executor::execute_run`]. + /// + /// The method takes `self: &Arc` so that the spawned task can hold a + /// clone of the `Arc` without requiring `'static` on `&self`. + pub async fn on_event( + self: &Arc, + event: &sprout_core::StoredEvent, + ) -> Result<(), WorkflowError> { let Some(channel_id) = event.channel_id else { return Ok(()); }; @@ -121,6 +133,10 @@ impl WorkflowEngine { return Ok(()); } + // Build TriggerContext once — all matching workflows in this channel + // share the same triggering event. + let trigger_ctx = build_trigger_context(event); + for workflow in &workflows { // Parse the stored JSON definition. let def: WorkflowDef = match serde_json::from_value(workflow.definition.clone()) { @@ -143,13 +159,198 @@ impl WorkflowEngine { continue; } - // TODO (WF-07): evaluate trigger filter expression, create workflow_run - // in DB, build TriggerContext from event, spawn execute_run(). + // Evaluate the trigger filter expression (MessagePosted only). + // A filter that evaluates to false skips this workflow entirely. + if let TriggerDef::MessagePosted { filter: Some(ref expr) } = def.trigger { + match executor::evaluate_condition(expr, &trigger_ctx, &HashMap::new()).await { + Ok(true) => {} + Ok(false) => { + tracing::debug!( + workflow_id = %workflow.id, + "Trigger filter evaluated false — skipping workflow" + ); + continue; + } + Err(e) => { + tracing::warn!( + workflow_id = %workflow.id, + "Trigger filter error: {e} — skipping workflow" + ); + continue; + } + } + } + + // Serialize TriggerContext for DB storage. + let trigger_ctx_json = match serde_json::to_value(&trigger_ctx) { + Ok(v) => v, + Err(e) => { + tracing::warn!( + workflow_id = %workflow.id, + "Failed to serialize TriggerContext: {e}" + ); + continue; + } + }; + + // Create the workflow_run row (status: pending). + let trigger_event_id_bytes = event.event.id.as_bytes().to_vec(); + let run_id = match self + .db + .create_workflow_run( + workflow.id, + Some(&trigger_event_id_bytes), + Some(&trigger_ctx_json), + ) + .await + { + Ok(id) => id, + Err(e) => { + tracing::error!( + workflow_id = %workflow.id, + "Failed to create workflow_run: {e}" + ); + continue; + } + }; + tracing::debug!( workflow_id = %workflow.id, - event_kind = kind_u32, - "Workflow trigger matched — execution wired in WF-07" + run_id = %run_id, + "Workflow triggered — spawning execution" ); + + // Spawn execution. Clone Arc so the task owns its references. + let engine = Arc::clone(self); + let def_clone = def.clone(); + let ctx_clone = trigger_ctx.clone(); + + tokio::spawn(async move { + // Acquire a concurrency permit before executing. + let _permit = match engine.run_semaphore.acquire().await { + Ok(p) => p, + Err(_) => { + tracing::error!(run_id = %run_id, "Semaphore closed — aborting run"); + let _ = engine + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + 0, + &serde_json::json!([]), + Some("Semaphore closed"), + ) + .await; + return; + } + }; + + // Transition to Running. + if let Err(e) = engine + .db + .update_workflow_run( + run_id, + RunStatus::Running, + 0, + &serde_json::json!([]), + None, + ) + .await + { + tracing::error!(run_id = %run_id, "Failed to set run to Running: {e}"); + // Mark as Failed so the run doesn't stay in `pending` forever. + let _ = engine + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + 0, + &serde_json::json!([]), + Some(&format!("Failed to transition to Running: {e}")), + ) + .await; + return; + } + + match executor::execute_run(&engine, run_id, &def_clone, &ctx_clone).await { + Ok(result) => { + let trace_json = match serde_json::to_value(&result.trace) { + Ok(v) => v, + Err(e) => { + tracing::warn!(run_id = %run_id, "Failed to serialize trace: {e}"); + serde_json::json!([]) + } + }; + let step_count = result.step_index as i32; + + if let Some(token) = result.approval_token { + // Run suspended awaiting human approval. + tracing::info!( + run_id = %run_id, + step_index = result.step_index, + "Workflow suspended — awaiting approval (token: )" + ); + // Store the approval record and update run status. + // Full approval wiring (emit kind:46010) is deferred to WF-08. + let _ = token; // token used by WF-08 approval handler + if let Err(e) = engine + .db + .update_workflow_run( + run_id, + RunStatus::WaitingApproval, + step_count, + &trace_json, + None, + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to WaitingApproval: {e}" + ); + } + } else { + // Normal completion. + tracing::info!(run_id = %run_id, "Workflow run completed"); + if let Err(e) = engine + .db + .update_workflow_run( + run_id, + RunStatus::Completed, + step_count, + &trace_json, + None, + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Completed: {e}" + ); + } + } + } + Err(e) => { + tracing::error!(run_id = %run_id, "Workflow run failed: {e}"); + if let Err(db_err) = engine + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + 0, + &serde_json::json!([]), + Some(&e.to_string()), + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Failed: {db_err}" + ); + } + } + } + }); } Ok(()) @@ -157,20 +358,86 @@ impl WorkflowEngine { /// Background task for scheduled (cron) triggers. /// - /// Runs indefinitely. Checks cron schedules every minute and fires - /// matching workflows. - /// - /// TODO (WF-07): implement cron schedule matching and execution. + /// Runs indefinitely. Cron trigger matching requires a cross-channel + /// workflow query (`list_all_enabled_workflows`) that doesn't exist yet. + /// Interval triggers need last-run tracking. Both are deferred to WF-09. pub async fn run(&self) { loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; - // TODO (WF-07): load schedule-triggered workflows, check cron expressions, - // spawn executions for any that are due. - tracing::debug!("WorkflowEngine::run tick — cron check (not yet implemented)"); + // Cron trigger matching requires a cross-channel workflow query + // (list_all_enabled_workflows) that doesn't exist yet. Interval + // triggers need last-run tracking. Both are deferred to WF-09. + tracing::trace!("WorkflowEngine::run tick — cron/interval triggers not yet wired"); } } } +// ── Trigger context builder ─────────────────────────────────────────────────── + +/// Build a [`executor::TriggerContext`] from a [`sprout_core::StoredEvent`]. +/// +/// - `text` — event content (message body or reaction emoji character) +/// - `author` — pubkey hex string +/// - `channel_id` — channel UUID as string (empty if no channel scope) +/// - `timestamp` — Unix timestamp as string +/// - `emoji` — for `KIND_REACTION` events, the content is the emoji; otherwise empty +/// - `message_id` — for reactions, the target message's event ID (from `e` tag); +/// for all other events, the event's own ID +pub fn build_trigger_context(event: &sprout_core::StoredEvent) -> executor::TriggerContext { + let kind_u32 = event_kind_u32(&event.event); + let content = event.event.content.clone(); + + // For reaction events (NIP-25), the content field holds the emoji character + // or shortcode (e.g. "👍", "+", "-"). Expose it as `emoji`. + let emoji = if kind_u32 == KIND_REACTION { + content.clone() + } else { + String::new() + }; + + // For reactions (NIP-25), `message_id` should be the target message, not + // the reaction event itself. NIP-25 stores the target in an `e` tag whose + // value is a 64-char hex event ID (not a UUID channel reference). + let message_id = if kind_u32 == KIND_REACTION { + event + .event + .tags + .iter() + .find_map(|tag| { + let key = tag.kind().to_string(); + if key == "e" { + tag.content().and_then(|v| { + // Distinguish hex event IDs (64 chars) from UUID channel refs. + if v.len() == 64 && v.chars().all(|c| c.is_ascii_hexdigit()) { + Some(v.to_string()) + } else { + None + } + }) + } else { + None + } + }) + // Fallback to the reaction event's own ID if no valid `e` tag found. + .unwrap_or_else(|| event.event.id.to_hex()) + } else { + event.event.id.to_hex() + }; + + executor::TriggerContext { + text: content, + author: event.event.pubkey.to_hex(), + channel_id: event + .channel_id + .map(|id| id.to_string()) + .unwrap_or_default(), + timestamp: event.event.created_at.as_u64().to_string(), + emoji, + message_id, + webhook_fields: HashMap::new(), + } +} + // ── Trigger matching ────────────────────────────────────────────────────────── /// Returns `true` if the trigger type matches the given event kind. @@ -365,4 +632,109 @@ steps: assert_eq!(cfg.max_concurrent, 50); assert_eq!(cfg.default_timeout_secs, 600); } + + // ── build_trigger_context ───────────────────────────────────────────────── + + fn make_message_event() -> sprout_core::StoredEvent { + use nostr::{EventBuilder, Keys, Kind}; + use uuid::Uuid; + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::Custom(40001), "hello world", []) + .sign_with_keys(&keys) + .expect("sign"); + sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())) + } + + /// Create a reaction event with an `e` tag pointing to a target message. + fn make_reaction_event() -> (sprout_core::StoredEvent, String) { + use nostr::{EventBuilder, Keys, Kind, Tag}; + use uuid::Uuid; + let keys = Keys::generate(); + // Create a dummy target message ID (64-char hex). + let target_keys = Keys::generate(); + let target_event = EventBuilder::new(Kind::Custom(40001), "target msg", []) + .sign_with_keys(&target_keys) + .expect("sign target"); + let target_id_hex = target_event.id.to_hex(); + // NIP-25: reaction references the target via an `e` tag. + let e_tag = Tag::parse(&["e", &target_id_hex]).expect("tag parse"); + let event = EventBuilder::new(Kind::Reaction, "👍", [e_tag]) + .sign_with_keys(&keys) + .expect("sign"); + (sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())), target_id_hex) + } + + #[test] + fn build_trigger_context_message_event() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + + assert_eq!(ctx.text, "hello world"); + assert_eq!(ctx.author, stored.event.pubkey.to_hex()); + assert_eq!( + ctx.channel_id, + stored.channel_id.unwrap().to_string() + ); + assert_eq!(ctx.timestamp, stored.event.created_at.as_u64().to_string()); + assert_eq!(ctx.message_id, stored.event.id.to_hex()); + // Non-reaction events have empty emoji. + assert_eq!(ctx.emoji, ""); + assert!(ctx.webhook_fields.is_empty()); + } + + #[test] + fn build_trigger_context_reaction_event() { + let (stored, target_id_hex) = make_reaction_event(); + let ctx = build_trigger_context(&stored); + + // For reactions, content IS the emoji. + assert_eq!(ctx.text, "👍"); + assert_eq!(ctx.emoji, "👍"); + assert_eq!(ctx.author, stored.event.pubkey.to_hex()); + // message_id should be the TARGET message, not the reaction event itself. + assert_eq!(ctx.message_id, target_id_hex); + assert_ne!(ctx.message_id, stored.event.id.to_hex()); + assert!(ctx.webhook_fields.is_empty()); + } + + #[test] + fn build_trigger_context_no_channel_id() { + use nostr::{EventBuilder, Keys, Kind}; + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::Custom(40001), "msg", []) + .sign_with_keys(&keys) + .expect("sign"); + // channel_id = None (global/DM event) + let stored = sprout_core::StoredEvent::new(event, None); + let ctx = build_trigger_context(&stored); + + assert_eq!(ctx.channel_id, ""); + assert_eq!(ctx.text, "msg"); + } + + #[test] + fn build_trigger_context_author_is_hex_pubkey() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + // Pubkey hex is 64 lowercase hex characters. + assert_eq!(ctx.author.len(), 64); + assert!(ctx.author.chars().all(|c| c.is_ascii_hexdigit())); + } + + #[test] + fn build_trigger_context_message_id_is_hex() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + // Event ID hex is 64 lowercase hex characters. + assert_eq!(ctx.message_id.len(), 64); + assert!(ctx.message_id.chars().all(|c| c.is_ascii_hexdigit())); + } + + #[test] + fn build_trigger_context_timestamp_is_numeric_string() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + // Timestamp must parse as a u64. + ctx.timestamp.parse::().expect("timestamp should be a u64 string"); + } } From 4a5d8b73cb58972d7ea8d1078cd0f014c0903547 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 18:01:56 -0400 Subject: [PATCH 03/15] Round 2 crossfire fixes: semaphore, emoji filter, duration overflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix double semaphore: remove from on_event() spawned task, keep single acquire in execute_run() (now uses .acquire().await for queuing) - Enforce reaction emoji filter in on_event(): skip workflows whose configured emoji doesn't match the event content (NIP-25) - Duration overflow: use checked_mul for hours*3600 and mins*60 - Verified: evalexpr conditions use context variables, NOT template substitution — no injection vector Crossfire scores: GPT-5-4 6.5/10, Claude 6/10, Codex 4/10. Deferred to issues: trace durability, unbounded fan-out, SSRF IPv6, approval persistence (WF-08), stale run recovery, webhook HTTPS. All 280+ unit tests pass, clippy clean. --- crates/sprout-workflow/src/executor.rs | 16 ++++++++---- crates/sprout-workflow/src/lib.rs | 35 ++++++++++++-------------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index c52d5fb27..7e64ce82d 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -640,13 +640,17 @@ pub(crate) fn parse_duration_secs(duration: &str) -> Result let hours: u64 = n.trim().parse().map_err(|_| { WorkflowError::InvalidDefinition(format!("invalid duration: {duration}")) })?; - return Ok(hours * 3600); + return hours.checked_mul(3600).ok_or_else(|| { + WorkflowError::InvalidDefinition(format!("duration overflow: {duration}")) + }); } if let Some(n) = duration.strip_suffix('m') { let mins: u64 = n.trim().parse().map_err(|_| { WorkflowError::InvalidDefinition(format!("invalid duration: {duration}")) })?; - return Ok(mins * 60); + return mins.checked_mul(60).ok_or_else(|| { + WorkflowError::InvalidDefinition(format!("duration overflow: {duration}")) + }); } if let Some(n) = duration.strip_suffix('s') { let secs: u64 = n.trim().parse().map_err(|_| { @@ -844,11 +848,13 @@ pub async fn execute_run( def: &WorkflowDef, trigger_ctx: &TriggerContext, ) -> Result { - // Acquire a concurrency permit. `try_acquire` is non-blocking — if all - // permits are in use we return CapacityExceeded rather than queuing. + // Acquire a concurrency permit. Waits if all permits are in use rather + // than failing immediately — event-driven and webhook triggers queue up + // during load spikes instead of returning errors. let _permit = engine .run_semaphore - .try_acquire() + .acquire() + .await .map_err(|_| WorkflowError::CapacityExceeded)?; execute_from_step(engine, run_id, def, trigger_ctx, 0, None).await diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 9d9d9550f..8a7950011 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -159,6 +159,22 @@ impl WorkflowEngine { continue; } + // Enforce reaction emoji filter: if the workflow specifies a specific + // emoji, skip events whose content doesn't match. NIP-25 stores the + // emoji character (or shortcode) in the event content field. + if let TriggerDef::ReactionAdded { emoji: Some(ref expected) } = def.trigger { + let actual = &trigger_ctx.emoji; + if actual != expected { + tracing::debug!( + workflow_id = %workflow.id, + expected_emoji = %expected, + actual_emoji = %actual, + "Reaction emoji mismatch — skipping workflow" + ); + continue; + } + } + // Evaluate the trigger filter expression (MessagePosted only). // A filter that evaluates to false skips this workflow entirely. if let TriggerDef::MessagePosted { filter: Some(ref expr) } = def.trigger { @@ -226,25 +242,6 @@ impl WorkflowEngine { let ctx_clone = trigger_ctx.clone(); tokio::spawn(async move { - // Acquire a concurrency permit before executing. - let _permit = match engine.run_semaphore.acquire().await { - Ok(p) => p, - Err(_) => { - tracing::error!(run_id = %run_id, "Semaphore closed — aborting run"); - let _ = engine - .db - .update_workflow_run( - run_id, - RunStatus::Failed, - 0, - &serde_json::json!([]), - Some("Semaphore closed"), - ) - .await; - return; - } - }; - // Transition to Running. if let Err(e) = engine .db From 5ca53dfd4ac3ab75277db44199be066077617a63 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 18:47:28 -0400 Subject: [PATCH 04/15] Fix CI: cargo fmt formatting --- crates/sprout-db/src/event.rs | 2 +- crates/sprout-db/src/feed.rs | 2 +- crates/sprout-mcp/src/server.rs | 10 ++++-- crates/sprout-relay/src/main.rs | 3 +- .../sprout-test-client/tests/e2e_workflows.rs | 32 ++++++++--------- crates/sprout-workflow/src/lib.rs | 35 +++++++++++++------ 6 files changed, 50 insertions(+), 34 deletions(-) diff --git a/crates/sprout-db/src/event.rs b/crates/sprout-db/src/event.rs index 5136c2ee2..b5594fdf8 100644 --- a/crates/sprout-db/src/event.rs +++ b/crates/sprout-db/src/event.rs @@ -9,7 +9,7 @@ use nostr::Event; use sqlx::{MySqlPool, QueryBuilder, Row}; use uuid::Uuid; -use sprout_core::kind::{KIND_AUTH, is_ephemeral, event_kind_i32}; +use sprout_core::kind::{event_kind_i32, is_ephemeral, KIND_AUTH}; use sprout_core::StoredEvent; use crate::error::{DbError, Result}; diff --git a/crates/sprout-db/src/feed.rs b/crates/sprout-db/src/feed.rs index 38965ffe0..b760e0bbc 100644 --- a/crates/sprout-db/src/feed.rs +++ b/crates/sprout-db/src/feed.rs @@ -407,7 +407,7 @@ mod tests { KIND_JOB_RESULT, ]; - use sprout_core::kind::{KIND_WORKFLOW_TRIGGERED, KIND_WORKFLOW_APPROVAL_DENIED}; + use sprout_core::kind::{KIND_WORKFLOW_APPROVAL_DENIED, KIND_WORKFLOW_TRIGGERED}; for kind in KIND_WORKFLOW_TRIGGERED..=KIND_WORKFLOW_APPROVAL_DENIED { assert!( !activity_kinds.contains(&kind), diff --git a/crates/sprout-mcp/src/server.rs b/crates/sprout-mcp/src/server.rs index 6bf8adfcf..2a31fecb1 100644 --- a/crates/sprout-mcp/src/server.rs +++ b/crates/sprout-mcp/src/server.rs @@ -1,4 +1,4 @@ -use sprout_core::kind::{KIND_CANVAS, event_kind_u32}; +use sprout_core::kind::{event_kind_u32, KIND_CANVAS}; use rmcp::{ handler::server::{router::tool::ToolRouter, wrapper::Parameters}, @@ -417,8 +417,12 @@ impl SproutMcpServer { Err(e) => return format!("Error building tag: {e}"), }; - let event = match nostr::EventBuilder::new(nostr::Kind::Custom(KIND_CANVAS as u16), &p.content, [e_tag]) - .sign_with_keys(&keys) + let event = match nostr::EventBuilder::new( + nostr::Kind::Custom(KIND_CANVAS as u16), + &p.content, + [e_tag], + ) + .sign_with_keys(&keys) { Ok(e) => e, Err(e) => return format!("Error signing event: {e}"), diff --git a/crates/sprout-relay/src/main.rs b/crates/sprout-relay/src/main.rs index 96f78b12b..854a0e16c 100644 --- a/crates/sprout-relay/src/main.rs +++ b/crates/sprout-relay/src/main.rs @@ -77,8 +77,7 @@ async fn main() -> anyhow::Result<()> { let search_config = SearchConfig { url: config.typesense_url.clone(), api_key: config.typesense_key.clone(), - collection: std::env::var("TYPESENSE_COLLECTION") - .unwrap_or_else(|_| "events".to_string()), + collection: std::env::var("TYPESENSE_COLLECTION").unwrap_or_else(|_| "events".to_string()), }; let search = SearchService::new(search_config); if let Err(e) = search.ensure_collection().await { diff --git a/crates/sprout-test-client/tests/e2e_workflows.rs b/crates/sprout-test-client/tests/e2e_workflows.rs index d8445e7b9..fecc0e22d 100644 --- a/crates/sprout-test-client/tests/e2e_workflows.rs +++ b/crates/sprout-test-client/tests/e2e_workflows.rs @@ -359,15 +359,15 @@ steps: .expect("ws connect failed"); let e_tag = Tag::parse(&["e", CHANNEL_GENERAL]).expect("tag parse failed"); - let event = nostr::EventBuilder::new( - Kind::Custom(40001), - "trigger this workflow please", - [e_tag], - ) - .sign_with_keys(&sender_keys) - .expect("sign event"); + let event = + nostr::EventBuilder::new(Kind::Custom(40001), "trigger this workflow please", [e_tag]) + .sign_with_keys(&sender_keys) + .expect("sign event"); - ws_client.send_event(event).await.expect("send event failed"); + ws_client + .send_event(event) + .await + .expect("send event failed"); // ── Step 3: Wait for the workflow engine to process the event ───────────── tokio::time::sleep(Duration::from_secs(3)).await; @@ -390,7 +390,10 @@ steps: let run = &runs[0]; assert!(run.get("id").is_some(), "run missing 'id'"); - assert!(run.get("workflow_id").is_some(), "run missing 'workflow_id'"); + assert!( + run.get("workflow_id").is_some(), + "run missing 'workflow_id'" + ); assert!(run.get("status").is_some(), "run missing 'status'"); let status = run["status"].as_str().unwrap_or(""); @@ -484,13 +487,10 @@ steps: ); // ── Step 3: Send a message that DOES match the filter ───────────────────── - let matching = nostr::EventBuilder::new( - Kind::Custom(40001), - "P1 alert: database is down", - [e_tag], - ) - .sign_with_keys(&sender_keys) - .expect("sign event"); + let matching = + nostr::EventBuilder::new(Kind::Custom(40001), "P1 alert: database is down", [e_tag]) + .sign_with_keys(&sender_keys) + .expect("sign event"); ws_client .send_event(matching) diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 8a7950011..05f4361cb 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -38,7 +38,7 @@ pub use schema::{ActionDef, Step, TriggerDef, WorkflowDef}; use std::collections::HashMap; use std::sync::Arc; -use sprout_core::kind::{is_workflow_execution_kind, event_kind_u32, KIND_REACTION}; +use sprout_core::kind::{event_kind_u32, is_workflow_execution_kind, KIND_REACTION}; use sprout_db::workflow::RunStatus; use sprout_db::Db; use tokio::sync::Semaphore; @@ -162,7 +162,10 @@ impl WorkflowEngine { // Enforce reaction emoji filter: if the workflow specifies a specific // emoji, skip events whose content doesn't match. NIP-25 stores the // emoji character (or shortcode) in the event content field. - if let TriggerDef::ReactionAdded { emoji: Some(ref expected) } = def.trigger { + if let TriggerDef::ReactionAdded { + emoji: Some(ref expected), + } = def.trigger + { let actual = &trigger_ctx.emoji; if actual != expected { tracing::debug!( @@ -177,7 +180,10 @@ impl WorkflowEngine { // Evaluate the trigger filter expression (MessagePosted only). // A filter that evaluates to false skips this workflow entirely. - if let TriggerDef::MessagePosted { filter: Some(ref expr) } = def.trigger { + if let TriggerDef::MessagePosted { + filter: Some(ref expr), + } = def.trigger + { match executor::evaluate_condition(expr, &trigger_ctx, &HashMap::new()).await { Ok(true) => {} Ok(false) => { @@ -522,7 +528,10 @@ steps: &trigger, sprout_core::kind::KIND_REACTION )); - assert!(!trigger_matches_event(&trigger, sprout_core::kind::KIND_WORKFLOW_TRIGGERED)); + assert!(!trigger_matches_event( + &trigger, + sprout_core::kind::KIND_WORKFLOW_TRIGGERED + )); } #[test] @@ -591,7 +600,9 @@ steps: let msg_trigger = TriggerDef::MessagePosted { filter: None }; let react_trigger = TriggerDef::ReactionAdded { emoji: None }; - for kind in sprout_core::kind::KIND_WORKFLOW_TRIGGERED..=sprout_core::kind::KIND_WORKFLOW_APPROVAL_DENIED { + for kind in sprout_core::kind::KIND_WORKFLOW_TRIGGERED + ..=sprout_core::kind::KIND_WORKFLOW_APPROVAL_DENIED + { assert!( !trigger_matches_event(&msg_trigger, kind), "message_posted should not match workflow execution kind {kind}" @@ -658,7 +669,10 @@ steps: let event = EventBuilder::new(Kind::Reaction, "👍", [e_tag]) .sign_with_keys(&keys) .expect("sign"); - (sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())), target_id_hex) + ( + sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())), + target_id_hex, + ) } #[test] @@ -668,10 +682,7 @@ steps: assert_eq!(ctx.text, "hello world"); assert_eq!(ctx.author, stored.event.pubkey.to_hex()); - assert_eq!( - ctx.channel_id, - stored.channel_id.unwrap().to_string() - ); + assert_eq!(ctx.channel_id, stored.channel_id.unwrap().to_string()); assert_eq!(ctx.timestamp, stored.event.created_at.as_u64().to_string()); assert_eq!(ctx.message_id, stored.event.id.to_hex()); // Non-reaction events have empty emoji. @@ -732,6 +743,8 @@ steps: let stored = make_message_event(); let ctx = build_trigger_context(&stored); // Timestamp must parse as a u64. - ctx.timestamp.parse::().expect("timestamp should be a u64 string"); + ctx.timestamp + .parse::() + .expect("timestamp should be a u64 string"); } } From db4a4cb7d5b443eeec089acb1b2d168ca6c57941 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 19:37:59 -0400 Subject: [PATCH 05/15] Address crossfire findings: semaphore backpressure, approval gates, reaction scoping, schedule rejection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Revert semaphore from acquire().await to try_acquire() for fail-fast backpressure - Move Running status transition into execute_run() after permit acquired - Replace zombie WaitingApproval with explicit Failed (approval gates deferred to WF-08) - Add derive_reaction_channel() for NIP-25 reactions — DB lookup of target event's channel - Reject schedule/interval triggers at create/update until cron scheduler is implemented - Add debug logging for events skipped due to missing channel_id --- crates/sprout-relay/src/api/workflows.rs | 18 ++++++ crates/sprout-relay/src/handlers/event.rs | 75 +++++++++++++++++++++++ crates/sprout-workflow/src/executor.rs | 21 +++++-- crates/sprout-workflow/src/lib.rs | 58 +++++------------- 4 files changed, 123 insertions(+), 49 deletions(-) diff --git a/crates/sprout-relay/src/api/workflows.rs b/crates/sprout-relay/src/api/workflows.rs index 4dffa8497..2813ddaf7 100644 --- a/crates/sprout-relay/src/api/workflows.rs +++ b/crates/sprout-relay/src/api/workflows.rs @@ -88,6 +88,15 @@ pub async fn create_workflow( ) })?; + // Reject schedule/interval triggers until the cron scheduler is implemented. + // Accepting them would create workflows that silently never execute. + if matches!(def.trigger, sprout_workflow::TriggerDef::Schedule { .. }) { + return Err(api_error( + StatusCode::BAD_REQUEST, + "schedule and interval triggers are not yet supported — use message_posted, reaction_added, or webhook triggers", + )); + } + validate_webhook_urls(&def) .await .map_err(|e| api_error(StatusCode::BAD_REQUEST, &e))?; @@ -201,6 +210,15 @@ pub async fn update_workflow( ) })?; + // Reject schedule/interval triggers until the cron scheduler is implemented. + // Accepting them would create workflows that silently never execute. + if matches!(def.trigger, sprout_workflow::TriggerDef::Schedule { .. }) { + return Err(api_error( + StatusCode::BAD_REQUEST, + "schedule and interval triggers are not yet supported — use message_posted, reaction_added, or webhook triggers", + )); + } + validate_webhook_urls(&def) .await .map_err(|e| api_error(StatusCode::BAD_REQUEST, &e))?; diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index 7dc5cb16e..4863323b9 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use hex; use tracing::{debug, error, info, warn}; use nostr::Event; @@ -115,6 +116,14 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc Option { + // Find the target event ID from NIP-25 `e` tags. + // Per NIP-25, the last `e` tag is the target (in case of threading). + let target_hex = event.tags.iter().rev().find_map(|tag| { + let key = tag.kind().to_string(); + if key == "e" { + tag.content().map(|s| s.to_string()) + } else { + None + } + })?; + + // Must be a 64-char hex string (event ID), not a UUID + if target_hex.len() != 64 { + return None; + } + + // Decode hex to bytes for DB lookup + let id_bytes = match hex::decode(&target_hex) { + Ok(b) if b.len() == 32 => b, + _ => return None, + }; + + // Look up the target event to get its channel_id + match db.get_event_by_id(&id_bytes).await { + Ok(Some(target_event)) => { + if let Some(ch_id) = target_event.channel_id { + tracing::debug!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + channel_id = %ch_id, + "Derived reaction channel from target event" + ); + Some(ch_id) + } else { + tracing::debug!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + "Target event has no channel — reaction will be global" + ); + None + } + } + Ok(None) => { + tracing::debug!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + "Target event not found — reaction will be global" + ); + None + } + Err(e) => { + tracing::warn!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + "Failed to look up target event: {e}" + ); + None + } + } +} + /// Extract a channel UUID from event tags. /// /// Checks both `"channel"` custom tags and `"e"` reference tags (clients use diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index 7e64ce82d..f24eebd21 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -842,21 +842,32 @@ pub struct ExecutionResult { /// /// Enforces `engine.config.max_concurrent` via a semaphore — returns /// [`WorkflowError::CapacityExceeded`] immediately if all permits are taken. +/// Transitions the run to `Running` after acquiring a permit. pub async fn execute_run( engine: &WorkflowEngine, run_id: Uuid, def: &WorkflowDef, trigger_ctx: &TriggerContext, ) -> Result { - // Acquire a concurrency permit. Waits if all permits are in use rather - // than failing immediately — event-driven and webhook triggers queue up - // during load spikes instead of returning errors. + // Fail fast if all concurrency permits are in use — no queuing. let _permit = engine .run_semaphore - .acquire() - .await + .try_acquire() .map_err(|_| WorkflowError::CapacityExceeded)?; + // Mark run as Running now that we have a permit. + engine + .db + .update_workflow_run( + run_id, + sprout_db::workflow::RunStatus::Running, + 0, + &serde_json::json!([]), + None, + ) + .await + .map_err(WorkflowError::from)?; + execute_from_step(engine, run_id, def, trigger_ctx, 0, None).await } diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 05f4361cb..99e2e7997 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -111,6 +111,11 @@ impl WorkflowEngine { event: &sprout_core::StoredEvent, ) -> Result<(), WorkflowError> { let Some(channel_id) = event.channel_id else { + tracing::debug!( + event_id = %event.event.id.to_hex(), + kind = event_kind_u32(&event.event), + "Skipping workflow trigger — event has no channel_id" + ); return Ok(()); }; @@ -248,33 +253,6 @@ impl WorkflowEngine { let ctx_clone = trigger_ctx.clone(); tokio::spawn(async move { - // Transition to Running. - if let Err(e) = engine - .db - .update_workflow_run( - run_id, - RunStatus::Running, - 0, - &serde_json::json!([]), - None, - ) - .await - { - tracing::error!(run_id = %run_id, "Failed to set run to Running: {e}"); - // Mark as Failed so the run doesn't stay in `pending` forever. - let _ = engine - .db - .update_workflow_run( - run_id, - RunStatus::Failed, - 0, - &serde_json::json!([]), - Some(&format!("Failed to transition to Running: {e}")), - ) - .await; - return; - } - match executor::execute_run(&engine, run_id, &def_clone, &ctx_clone).await { Ok(result) => { let trace_json = match serde_json::to_value(&result.trace) { @@ -286,32 +264,24 @@ impl WorkflowEngine { }; let step_count = result.step_index as i32; - if let Some(token) = result.approval_token { - // Run suspended awaiting human approval. - tracing::info!( + if result.approval_token.is_some() { + // Approval gates are not yet implemented (WF-08). + // Fail explicitly rather than creating unreachable WaitingApproval rows. + tracing::warn!( run_id = %run_id, step_index = result.step_index, - "Workflow suspended — awaiting approval (token: )" + "Workflow hit approval gate — not yet implemented, marking as failed" ); - // Store the approval record and update run status. - // Full approval wiring (emit kind:46010) is deferred to WF-08. - let _ = token; // token used by WF-08 approval handler - if let Err(e) = engine + let _ = engine .db .update_workflow_run( run_id, - RunStatus::WaitingApproval, + RunStatus::Failed, step_count, &trace_json, - None, + Some("approval gates not yet implemented — see WF-08"), ) - .await - { - tracing::error!( - run_id = %run_id, - "Failed to update run to WaitingApproval: {e}" - ); - } + .await; } else { // Normal completion. tracing::info!(run_id = %run_id, "Workflow run completed"); From 8d1f621913c8b85f8a99ce44660891b9a7137ffe Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 19:58:25 -0400 Subject: [PATCH 06/15] Round 2 crossfire fixes: unified semaphore, consistent approval handling, NIP-25 e-tag ordering - Remove Running pre-set from spawn_workflow_execution and approval resume (executor owns it) - Add semaphore to execute_from_step via private execute_steps helper - Fail approval gates consistently across all trigger paths (deferred to WF-08) - Fix build_trigger_context to use last e-tag per NIP-25 threading convention - Add regression test for multi-e-tag reaction scenarios --- crates/sprout-relay/src/api/approvals.rs | 67 ++----------------- .../sprout-relay/src/api/workflow_helpers.rs | 37 +++++----- crates/sprout-workflow/src/executor.rs | 36 +++++++++- crates/sprout-workflow/src/lib.rs | 30 +++++++++ 4 files changed, 92 insertions(+), 78 deletions(-) diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index d7204e87d..bd45f8a99 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -142,20 +142,6 @@ async fn resume_workflow_after_approval( .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default(); - // Mark the run as Running again before resuming. - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Running, - resume_index as i32, - &run.execution_trace, - None, - ) - .await - { - tracing::error!("grant_approval: failed to set Running status for run {run_id}: {e}"); - } - match sprout_workflow::executor::execute_from_step( &engine, run_id, @@ -186,65 +172,26 @@ async fn resume_workflow_after_approval( } } Ok(result) => { - // Suspended again at another approval gate. - let next_token = match result.approval_token { - Some(t) => t, - None => { - tracing::error!( - "grant_approval: expected approval_token but got None for run {run_id}" - ); - return; - } - }; - let suspended_step_index = result.step_index; + // Chained approval gates are not yet fully implemented (see WF-08). + // Mark the run as Failed rather than silently creating a new approval + // record that nothing will ever resolve. let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); full_trace.extend(result.trace); let trace_json = serde_json::Value::Array(full_trace); - if let Err(e) = db .update_workflow_run( run_id, - sprout_db::workflow::RunStatus::WaitingApproval, - suspended_step_index as i32, + sprout_db::workflow::RunStatus::Failed, + result.step_index as i32, &trace_json, - None, + Some("approval gates not yet fully implemented — see WF-08"), ) .await { tracing::error!( - "grant_approval: failed to set WaitingApproval status for run {run_id}: {e}" + "grant_approval: failed to set Failed status for run {run_id}: {e}" ); } - - if let Some(suspended_step) = def.steps.get(suspended_step_index) { - let approver_spec = match &suspended_step.action { - sprout_workflow::ActionDef::RequestApproval { from, .. } => from.clone(), - _ => "any".to_string(), - }; - let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); - if let Err(e) = db - .create_approval(sprout_db::workflow::CreateApprovalParams { - token: &next_token, - workflow_id, - run_id, - step_id: &suspended_step.id, - step_index: suspended_step_index as i32, - approver_spec: &approver_spec, - expires_at, - }) - .await - { - tracing::error!( - "grant_approval: failed to create approval record for run {run_id}: {e}" - ); - } - } - - tracing::info!( - "workflow run {} suspended again at step {} (token: )", - run_id, - suspended_step_index, - ); } Err(e) => { tracing::error!("workflow run {run_id} failed after approval resume: {e}"); diff --git a/crates/sprout-relay/src/api/workflow_helpers.rs b/crates/sprout-relay/src/api/workflow_helpers.rs index e92ec9232..7f691f0a2 100644 --- a/crates/sprout-relay/src/api/workflow_helpers.rs +++ b/crates/sprout-relay/src/api/workflow_helpers.rs @@ -163,26 +163,12 @@ pub(crate) fn definition_hash(json_str: &str) -> Vec { pub(crate) fn spawn_workflow_execution( engine: Arc, db: sprout_db::Db, - workflow_id: uuid::Uuid, + _workflow_id: uuid::Uuid, run_id: uuid::Uuid, workflow_def_value: serde_json::Value, trigger_ctx: sprout_workflow::executor::TriggerContext, ) { tokio::spawn(async move { - // Transition to Running first — stamps started_at. - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Running, - 0, - &serde_json::Value::Array(vec![]), - None, - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Running status: {e}"); - } - let def: sprout_workflow::WorkflowDef = match serde_json::from_value(workflow_def_value) { Ok(d) => d, Err(e) => { @@ -220,7 +206,24 @@ pub(crate) fn spawn_workflow_execution( } } Ok(result) => { - handle_approval_suspension(&db, &def, workflow_id, run_id, result).await; + // Approval gates are not yet fully implemented (WF-08). + // Fail explicitly rather than creating potentially orphaned WaitingApproval rows. + tracing::warn!( + "workflow run {run_id}: hit approval gate — not yet implemented, marking as failed" + ); + let trace_json = serde_json::Value::Array(result.trace); + if let Err(e) = db + .update_workflow_run( + run_id, + sprout_db::workflow::RunStatus::Failed, + result.step_index as i32, + &trace_json, + Some("approval gates not yet implemented — see WF-08"), + ) + .await + { + tracing::error!("workflow run {run_id}: failed to set Failed status: {e}"); + } } Err(e) => { tracing::error!("workflow run {run_id} failed: {e}"); @@ -242,6 +245,8 @@ pub(crate) fn spawn_workflow_execution( } /// Persist approval-gate suspension state and create the approval record. +/// Not called from the execution path yet — will be wired up when WF-08 is implemented. +#[allow(dead_code)] pub(crate) async fn handle_approval_suspension( db: &sprout_db::Db, def: &sprout_workflow::WorkflowDef, diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index f24eebd21..820bf0614 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -868,10 +868,14 @@ pub async fn execute_run( .await .map_err(WorkflowError::from)?; - execute_from_step(engine, run_id, def, trigger_ctx, 0, None).await + execute_steps(engine, run_id, def, trigger_ctx, 0, None).await } -/// Execute starting from a specific step index (used for approval resume). +/// Resume execution from a specific step index (used for approval resume). +/// +/// Acquires a concurrency permit from `engine.run_semaphore` before executing — +/// returns [`WorkflowError::CapacityExceeded`] immediately if all permits are +/// taken. /// /// `initial_outputs` should be reconstructed from the execution trace before /// calling this function on resume, so that steps after the resume point can @@ -883,6 +887,34 @@ pub async fn execute_from_step( trigger_ctx: &TriggerContext, start_index: usize, initial_outputs: Option>, +) -> Result { + // Fail fast if all concurrency permits are in use — no queuing. + let _permit = engine + .run_semaphore + .try_acquire() + .map_err(|_| WorkflowError::CapacityExceeded)?; + + execute_steps( + engine, + run_id, + def, + trigger_ctx, + start_index, + initial_outputs, + ) + .await +} + +/// Internal: execute workflow steps starting from `start_index`, without +/// acquiring the semaphore. Called by both [`execute_run`] and +/// [`execute_from_step`] after they have already acquired a permit. +async fn execute_steps( + engine: &WorkflowEngine, + run_id: Uuid, + def: &WorkflowDef, + trigger_ctx: &TriggerContext, + start_index: usize, + initial_outputs: Option>, ) -> Result { let mut step_outputs: HashMap = initial_outputs.unwrap_or_default(); let mut trace: Vec = Vec::new(); diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 99e2e7997..8af7537a9 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -371,11 +371,13 @@ pub fn build_trigger_context(event: &sprout_core::StoredEvent) -> executor::Trig // For reactions (NIP-25), `message_id` should be the target message, not // the reaction event itself. NIP-25 stores the target in an `e` tag whose // value is a 64-char hex event ID (not a UUID channel reference). + // Per NIP-25, the last `e` tag is the direct target (earlier ones may be thread roots). let message_id = if kind_u32 == KIND_REACTION { event .event .tags .iter() + .rev() .find_map(|tag| { let key = tag.kind().to_string(); if key == "e" { @@ -717,4 +719,32 @@ steps: .parse::() .expect("timestamp should be a u64 string"); } + + #[test] + fn test_build_trigger_context_reaction_multiple_e_tags() { + // NIP-25: last e tag is the direct target, first may be thread root + use nostr::{EventBuilder, EventId, Keys, Kind, Tag}; + use uuid::Uuid; + + let keys = Keys::generate(); + let thread_root_id = EventId::all_zeros(); + let direct_target_id = EventId::from_byte_array([0x42; 32]); + + let event = EventBuilder::new( + Kind::Reaction, + "👍", + [ + Tag::parse(&["e", &thread_root_id.to_hex()]).unwrap(), + Tag::parse(&["e", &direct_target_id.to_hex()]).unwrap(), + ], + ) + .sign_with_keys(&keys) + .expect("sign"); + + let stored = sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())); + let ctx = build_trigger_context(&stored); + + // Should pick the LAST e tag (direct target), not the first (thread root) + assert_eq!(ctx.message_id, direct_target_id.to_hex()); + } } From 70731313ef44ef9ac9cc82fabb48847ef3a838a1 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 20:09:34 -0400 Subject: [PATCH 07/15] Round 3 crossfire fixes: reaction channel security, resume Running status, partial trace preservation - Always derive reaction channel from target event DB lookup (prevent channel tag spoofing) - Set RunStatus::Running in execute_from_step after semaphore (approval resume path) - Preserve resume_index and existing trace on resume failure instead of resetting to 0 - Update lifecycle comment in workflow_helpers.rs to reflect current behavior --- crates/sprout-relay/src/api/approvals.rs | 11 ++++++++--- crates/sprout-relay/src/api/workflow_helpers.rs | 3 ++- crates/sprout-relay/src/handlers/event.rs | 10 ++++------ crates/sprout-workflow/src/executor.rs | 16 ++++++++++++++++ 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index bd45f8a99..cd546d569 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -194,14 +194,19 @@ async fn resume_workflow_after_approval( } } Err(e) => { - tracing::error!("workflow run {run_id} failed after approval resume: {e}"); + tracing::error!( + "grant_approval: resume of run {run_id} failed at step >= {resume_index}: {e}" + ); + // Note: partial trace from steps executed after resume is lost on error. + // The executor error type does not carry partial results. + // TODO(WF-08): Consider returning partial trace in WorkflowError. if let Err(db_err) = db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Failed, resume_index as i32, - &run.execution_trace, - Some(&e.to_string()), + &run.execution_trace, // preserve existing trace + Some(&format!("execution failed after approval resume: {e}")), ) .await { diff --git a/crates/sprout-relay/src/api/workflow_helpers.rs b/crates/sprout-relay/src/api/workflow_helpers.rs index 7f691f0a2..ecf20f0a3 100644 --- a/crates/sprout-relay/src/api/workflow_helpers.rs +++ b/crates/sprout-relay/src/api/workflow_helpers.rs @@ -158,7 +158,8 @@ pub(crate) fn definition_hash(json_str: &str) -> Vec { /// Spawn an async workflow execution task. /// -/// Handles the full lifecycle: Running → Completed / WaitingApproval / Failed. +/// Handles the full lifecycle: Pending → (executor sets Running) → Completed / Failed. +/// Approval gates are not yet implemented (WF-08) — runs hitting approval steps are marked Failed. /// Used by trigger and webhook paths to avoid code duplication. pub(crate) fn spawn_workflow_execution( engine: Arc, diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index 4863323b9..ee2b607eb 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -114,14 +114,12 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc Date: Mon, 9 Mar 2026 20:13:05 -0400 Subject: [PATCH 08/15] Round 4 crossfire fixes: preserve trace on resume, fail-closed reaction scoping - Preserve existing execution trace when marking resumed run as Running - Reject reactions when target event cannot be resolved (fail closed, not open) - Prevents authorization downgrade on DB lookup errors for reactions --- crates/sprout-relay/src/handlers/event.rs | 17 ++++++++++++++++- crates/sprout-workflow/src/executor.rs | 9 ++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index ee2b607eb..715483879 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -117,7 +117,22 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc Some(ch_id), + None => { + warn!( + event_id = %event_id_hex, + "Rejecting reaction: target event not found or has no channel" + ); + conn.send(RelayMessage::ok( + &event_id_hex, + false, + "invalid: reaction target event not found or not in a channel", + )); + return; + } + } } else { extract_channel_id(&event) }; diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index cecb17277..715640606 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -898,13 +898,20 @@ pub async fn execute_from_step( .map_err(|_| WorkflowError::CapacityExceeded)?; // Mark run as Running now that we have a permit (resume from approval). + // Preserve the existing execution trace from pre-approval steps. + let existing_trace = engine + .db + .get_workflow_run(run_id) + .await + .map(|r| r.execution_trace) + .unwrap_or_else(|_| serde_json::json!([])); engine .db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Running, start_index as i32, - &serde_json::json!([]), + &existing_trace, None, ) .await From 1e0d532c5fed9df1e78e395b63b2f85868097431 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 20:15:43 -0400 Subject: [PATCH 09/15] Preserve partial execution progress on run failures - Read existing trace/step from DB before marking run as Failed - Prevents loss of partial progress when executor errors mid-run - Applied to both event-triggered (lib.rs) and API-triggered (workflow_helpers.rs) paths --- crates/sprout-relay/src/api/workflow_helpers.rs | 9 +++++++-- crates/sprout-workflow/src/lib.rs | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/sprout-relay/src/api/workflow_helpers.rs b/crates/sprout-relay/src/api/workflow_helpers.rs index ecf20f0a3..3f3739f74 100644 --- a/crates/sprout-relay/src/api/workflow_helpers.rs +++ b/crates/sprout-relay/src/api/workflow_helpers.rs @@ -228,12 +228,17 @@ pub(crate) fn spawn_workflow_execution( } Err(e) => { tracing::error!("workflow run {run_id} failed: {e}"); + // Preserve any partial progress (trace/step) already written by the executor. + let (step, trace) = match db.get_workflow_run(run_id).await { + Ok(r) => (r.current_step, r.execution_trace), + Err(_) => (0, serde_json::json!([])), + }; if let Err(db_err) = db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Failed, - 0, - &serde_json::Value::Null, + step, + &trace, Some(&e.to_string()), ) .await diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 8af7537a9..3923e7d4c 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -305,13 +305,18 @@ impl WorkflowEngine { } Err(e) => { tracing::error!(run_id = %run_id, "Workflow run failed: {e}"); + // Preserve any partial progress (trace/step) already written by the executor. + let (step, trace) = match engine.db.get_workflow_run(run_id).await { + Ok(r) => (r.current_step, r.execution_trace), + Err(_) => (0, serde_json::json!([])), + }; if let Err(db_err) = engine .db .update_workflow_run( run_id, RunStatus::Failed, - 0, - &serde_json::json!([]), + step, + &trace, Some(&e.to_string()), ) .await From 3be9f7cec6d7010f21fc156f19feca4f61dc39b1 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 21:08:30 -0400 Subject: [PATCH 10/15] Fix E2E test channel UUIDs to match UUID5-derived DB values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All 3 E2E test files used stale placeholder UUIDs (aaaaaaaa-...) that don't exist in the database. The actual channels use UUID5-derived IDs seeded at startup. Since the channel didn't exist, get_channel() returned None, the open-visibility check defaulted to false, and the API returned 403. Changes: - CHANNEL_GENERAL: aaaaaaaa-...-aaa1 → 9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50 - CHANNEL_PROJECTS → CHANNEL_ENGINEERING: 1c7e1c02-87bb-5e88-b2da-5a7a9432d0c9 - Updated doc comments and assertion messages All 31 E2E tests pass (6 workflow + 18 REST API + 7 MCP). --- crates/sprout-test-client/tests/e2e_mcp.rs | 10 +++++----- crates/sprout-test-client/tests/e2e_rest_api.rs | 14 +++++++------- crates/sprout-test-client/tests/e2e_workflows.rs | 6 +++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/sprout-test-client/tests/e2e_mcp.rs b/crates/sprout-test-client/tests/e2e_mcp.rs index 4c2cbb100..29470b92c 100644 --- a/crates/sprout-test-client/tests/e2e_mcp.rs +++ b/crates/sprout-test-client/tests/e2e_mcp.rs @@ -29,10 +29,10 @@ use std::time::Duration; use serde_json::{json, Value}; -// ── Seeded channel IDs (stable across relay restarts) ───────────────────────── +// ── Seeded channel IDs (UUID5-derived, stable across relay restarts) ────────── -const CHANNEL_GENERAL: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"; -const CHANNEL_PROJECTS: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa2"; +const CHANNEL_GENERAL: &str = "9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50"; +const CHANNEL_ENGINEERING: &str = "1c7e1c02-87bb-5e88-b2da-5a7a9432d0c9"; // ── Helpers ─────────────────────────────────────────────────────────────────── @@ -546,7 +546,7 @@ async fn test_mcp_create_and_trigger_workflow() { let create_resp = session.call_tool( "create_workflow", json!({ - "channel_id": CHANNEL_PROJECTS, + "channel_id": CHANNEL_ENGINEERING, "yaml_definition": yaml_definition, }), ); @@ -588,7 +588,7 @@ async fn test_mcp_create_and_trigger_workflow() { let list_resp = session.call_tool( "list_workflows", json!({ - "channel_id": CHANNEL_PROJECTS, + "channel_id": CHANNEL_ENGINEERING, }), ); diff --git a/crates/sprout-test-client/tests/e2e_rest_api.rs b/crates/sprout-test-client/tests/e2e_rest_api.rs index 5c36e0a59..0f1430591 100644 --- a/crates/sprout-test-client/tests/e2e_rest_api.rs +++ b/crates/sprout-test-client/tests/e2e_rest_api.rs @@ -22,7 +22,7 @@ //! //! The relay does not expose a REST endpoint to create channels — channels are //! created via the DB (seeded at startup). Tests use the pre-seeded open -//! channels (`general`, `agents`, `projects`, etc.) for read operations and +//! channels (`general`, `agents`, `engineering`, etc.) for read operations and //! send messages via WebSocket to set up search / feed data. use std::time::Duration; @@ -65,10 +65,10 @@ async fn authed_get(client: &Client, url: &str, pubkey_hex: &str) -> reqwest::Re /// Known open channel IDs seeded in the dev database. /// -/// These are stable across relay restarts because they are inserted with -/// explicit UUIDs in the seed migration. -const CHANNEL_GENERAL: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"; -const CHANNEL_PROJECTS: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa2"; +/// These are UUID5-derived from the channel name and are stable across relay +/// restarts as long as the seed data uses the same namespace + name inputs. +const CHANNEL_GENERAL: &str = "9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50"; +const CHANNEL_ENGINEERING: &str = "1c7e1c02-87bb-5e88-b2da-5a7a9432d0c9"; // ── Channel tests ───────────────────────────────────────────────────────────── @@ -152,8 +152,8 @@ async fn test_channel_visibility_open_channels_visible_to_all() { "expected seeded 'general' channel (id={CHANNEL_GENERAL})" ); assert!( - ids_a.contains(CHANNEL_PROJECTS), - "expected seeded 'projects' channel (id={CHANNEL_PROJECTS})" + ids_a.contains(CHANNEL_ENGINEERING), + "expected seeded 'engineering' channel (id={CHANNEL_ENGINEERING})" ); } diff --git a/crates/sprout-test-client/tests/e2e_workflows.rs b/crates/sprout-test-client/tests/e2e_workflows.rs index fecc0e22d..8553b91fd 100644 --- a/crates/sprout-test-client/tests/e2e_workflows.rs +++ b/crates/sprout-test-client/tests/e2e_workflows.rs @@ -47,9 +47,9 @@ fn http_client() -> Client { /// Known open channel IDs seeded in the dev database. /// -/// These are stable across relay restarts because they are inserted with -/// explicit UUIDs in the seed migration. -const CHANNEL_GENERAL: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"; +/// These are UUID5-derived from the channel name and are stable across relay +/// restarts as long as the seed data uses the same namespace + name inputs. +const CHANNEL_GENERAL: &str = "9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50"; /// A seeded user pubkey that exists in the `users` table. /// From c42a134d5f127f728395a53d67ec58f1b0a6bdc1 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 21:41:43 -0400 Subject: [PATCH 11/15] Crossfire round 6: partial trace preservation, reaction channel semantics, error logging Fixes all critical and important issues from the 3-model crossfire review: 1. Partial trace preservation (Codex + Gemini): - Added PartialProgress struct to carry step_index + trace in error path - execute_steps() now returns (WorkflowError, PartialProgress) on failure - All callers (on_event, spawn_workflow_execution, approval resume) use the executor's partial trace instead of re-reading stale DB state - Trace is never lost on mid-execution failures 2. Reaction channel semantics (Gemini + Opus): - derive_reaction_channel() now returns ReactionChannelResult enum with 5 variants: Channel, NoChannel, NotFound, NoTarget, DbError - Caller distinguishes 'target exists but is global' (allow) from 'target not found' (reject) and 'DB error' (reject with server error) - Reactions to global/DM messages are no longer incorrectly rejected 3. Silent error drop (Opus): - Replaced let _ = on approval gate DB update with if let Err(e) logging - Consistent with all other DB update error handling paths 4. Null trace consistency (Codex): - Changed Value::Null to json!([]) in definition-parse failure path - All trace fields now consistently use JSON arrays --- crates/sprout-relay/src/api/approvals.rs | 16 ++-- .../sprout-relay/src/api/workflow_helpers.rs | 16 ++-- crates/sprout-relay/src/handlers/event.rs | 87 ++++++++++++++---- crates/sprout-workflow/src/error.rs | 22 +++++ crates/sprout-workflow/src/executor.rs | 92 ++++++++++++++----- crates/sprout-workflow/src/lib.rs | 34 ++++--- 6 files changed, 196 insertions(+), 71 deletions(-) diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index cd546d569..bb3ad4c98 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -193,19 +193,21 @@ async fn resume_workflow_after_approval( ); } } - Err(e) => { + Err((e, progress)) => { tracing::error!( - "grant_approval: resume of run {run_id} failed at step >= {resume_index}: {e}" + "grant_approval: resume of run {run_id} failed at step {}: {e}", + progress.step_index ); - // Note: partial trace from steps executed after resume is lost on error. - // The executor error type does not carry partial results. - // TODO(WF-08): Consider returning partial trace in WorkflowError. + // Merge pre-approval trace with partial post-approval trace from the executor. + let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); + full_trace.extend(progress.trace); + let trace_json = serde_json::Value::Array(full_trace); if let Err(db_err) = db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Failed, - resume_index as i32, - &run.execution_trace, // preserve existing trace + progress.step_index as i32, + &trace_json, Some(&format!("execution failed after approval resume: {e}")), ) .await diff --git a/crates/sprout-relay/src/api/workflow_helpers.rs b/crates/sprout-relay/src/api/workflow_helpers.rs index 3f3739f74..3a43b7b41 100644 --- a/crates/sprout-relay/src/api/workflow_helpers.rs +++ b/crates/sprout-relay/src/api/workflow_helpers.rs @@ -179,7 +179,7 @@ pub(crate) fn spawn_workflow_execution( run_id, sprout_db::workflow::RunStatus::Failed, 0, - &serde_json::Value::Null, + &serde_json::json!([]), Some(&format!("definition parse error: {e}")), ) .await @@ -226,19 +226,17 @@ pub(crate) fn spawn_workflow_execution( tracing::error!("workflow run {run_id}: failed to set Failed status: {e}"); } } - Err(e) => { + Err((e, progress)) => { tracing::error!("workflow run {run_id} failed: {e}"); - // Preserve any partial progress (trace/step) already written by the executor. - let (step, trace) = match db.get_workflow_run(run_id).await { - Ok(r) => (r.current_step, r.execution_trace), - Err(_) => (0, serde_json::json!([])), - }; + // Use partial trace from the executor — contains steps + // completed/skipped before the failure. + let trace_json = serde_json::Value::Array(progress.trace); if let Err(db_err) = db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Failed, - step, - &trace, + progress.step_index as i32, + &trace_json, Some(&e.to_string()), ) .await diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index 715483879..bdfca5c49 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -117,18 +117,50 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc Some(ch_id), - None => { + ReactionChannelResult::Channel(ch_id) => Some(ch_id), + ReactionChannelResult::NoChannel => { + // Target event exists but has no channel (global/DM message). + // Allow the reaction to proceed without channel scoping. + None + } + ReactionChannelResult::NotFound => { + // Fail closed: reject reactions to events we don't know about. warn!( event_id = %event_id_hex, - "Rejecting reaction: target event not found or has no channel" + "Rejecting reaction: target event not found in DB" + ); + conn.send(RelayMessage::ok( + &event_id_hex, + false, + "invalid: reaction target event not found", + )); + return; + } + ReactionChannelResult::NoTarget => { + // Malformed reaction: no valid `e` tag. + warn!( + event_id = %event_id_hex, + "Rejecting reaction: no valid e tag referencing target event" + ); + conn.send(RelayMessage::ok( + &event_id_hex, + false, + "invalid: reaction must reference a target event via e tag", + )); + return; + } + ReactionChannelResult::DbError(ref err) => { + // Fail closed on transient DB errors — don't allow reactions + // through when we can't verify the target. + error!( + event_id = %event_id_hex, + "Rejecting reaction: database error looking up target: {err}" ); conn.send(RelayMessage::ok( &event_id_hex, false, - "invalid: reaction target event not found or not in a channel", + "error: internal error looking up reaction target", )); return; } @@ -370,31 +402,54 @@ async fn check_channel_membership( } } +/// Result of resolving a reaction's target channel. +enum ReactionChannelResult { + /// Target event found and has a channel_id. + Channel(uuid::Uuid), + /// Target event found but has no channel (global/DM message) — allow as global. + NoChannel, + /// Target event not found in DB — reject (fail closed). + NotFound, + /// No valid `e` tag on the reaction — reject (malformed). + NoTarget, + /// DB error during lookup — reject (fail closed on transient errors). + DbError(String), +} + /// For NIP-25 reactions, derive the channel_id from the target event. /// /// Reactions reference their target via an `e` tag containing a 64-hex event ID. /// We look up that event in the DB to find its channel_id. -async fn derive_reaction_channel(db: &sprout_db::Db, event: &nostr::Event) -> Option { +/// +/// Returns a [`ReactionChannelResult`] so the caller can distinguish between +/// "target exists but is global" (allow) and "target not found" (reject). +async fn derive_reaction_channel( + db: &sprout_db::Db, + event: &nostr::Event, +) -> ReactionChannelResult { // Find the target event ID from NIP-25 `e` tags. // Per NIP-25, the last `e` tag is the target (in case of threading). - let target_hex = event.tags.iter().rev().find_map(|tag| { + let target_hex = match event.tags.iter().rev().find_map(|tag| { let key = tag.kind().to_string(); if key == "e" { tag.content().map(|s| s.to_string()) } else { None } - })?; + }) { + Some(h) => h, + None => return ReactionChannelResult::NoTarget, + }; // Must be a 64-char hex string (event ID), not a UUID if target_hex.len() != 64 { - return None; + return ReactionChannelResult::NoTarget; } // Decode hex to bytes for DB lookup let id_bytes = match hex::decode(&target_hex) { Ok(b) if b.len() == 32 => b, - _ => return None, + _ => return ReactionChannelResult::NoTarget, }; // Look up the target event to get its channel_id @@ -407,23 +462,23 @@ async fn derive_reaction_channel(db: &sprout_db::Db, event: &nostr::Event) -> Op channel_id = %ch_id, "Derived reaction channel from target event" ); - Some(ch_id) + ReactionChannelResult::Channel(ch_id) } else { tracing::debug!( reaction_id = %event.id.to_hex(), target_id = %target_hex, - "Target event has no channel — reaction will be global" + "Target event has no channel — allowing as global reaction" ); - None + ReactionChannelResult::NoChannel } } Ok(None) => { tracing::debug!( reaction_id = %event.id.to_hex(), target_id = %target_hex, - "Target event not found — reaction will be global" + "Target event not found in DB" ); - None + ReactionChannelResult::NotFound } Err(e) => { tracing::warn!( @@ -431,7 +486,7 @@ async fn derive_reaction_channel(db: &sprout_db::Db, event: &nostr::Event) -> Op target_id = %target_hex, "Failed to look up target event: {e}" ); - None + ReactionChannelResult::DbError(e.to_string()) } } } diff --git a/crates/sprout-workflow/src/error.rs b/crates/sprout-workflow/src/error.rs index 0e56e76a0..f00aa0d9c 100644 --- a/crates/sprout-workflow/src/error.rs +++ b/crates/sprout-workflow/src/error.rs @@ -2,6 +2,18 @@ use thiserror::Error; +/// Partial execution progress captured when a workflow step fails mid-run. +/// +/// This allows callers to persist whatever trace was accumulated before the +/// error, rather than losing it when the in-memory `Vec` is dropped. +#[derive(Debug, Default)] +pub struct PartialProgress { + /// Index of the step that failed (0-based). + pub step_index: usize, + /// Trace entries for steps completed/skipped before the failure. + pub trace: Vec, +} + /// Errors produced by the workflow engine. #[derive(Debug, Error)] pub enum WorkflowError { @@ -43,6 +55,16 @@ pub enum WorkflowError { Database(String), } +impl WorkflowError { + /// Attach partial execution progress to this error. + /// + /// Returns a `(WorkflowError, PartialProgress)` tuple that callers can + /// destructure to persist the trace before marking the run as failed. + pub fn with_progress(self, progress: PartialProgress) -> (Self, PartialProgress) { + (self, progress) + } +} + impl From for WorkflowError { fn from(e: sprout_db::error::DbError) -> Self { WorkflowError::Database(e.to_string()) diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index 715640606..61efa629b 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -848,12 +848,14 @@ pub async fn execute_run( run_id: Uuid, def: &WorkflowDef, trigger_ctx: &TriggerContext, -) -> Result { +) -> Result { // Fail fast if all concurrency permits are in use — no queuing. - let _permit = engine - .run_semaphore - .try_acquire() - .map_err(|_| WorkflowError::CapacityExceeded)?; + let _permit = engine.run_semaphore.try_acquire().map_err(|_| { + ( + WorkflowError::CapacityExceeded, + crate::error::PartialProgress::default(), + ) + })?; // Mark run as Running now that we have a permit. engine @@ -866,7 +868,12 @@ pub async fn execute_run( None, ) .await - .map_err(WorkflowError::from)?; + .map_err(|e| { + ( + WorkflowError::from(e), + crate::error::PartialProgress::default(), + ) + })?; execute_steps(engine, run_id, def, trigger_ctx, 0, None).await } @@ -890,12 +897,14 @@ pub async fn execute_from_step( trigger_ctx: &TriggerContext, start_index: usize, initial_outputs: Option>, -) -> Result { +) -> Result { // Fail fast if all concurrency permits are in use — no queuing. - let _permit = engine - .run_semaphore - .try_acquire() - .map_err(|_| WorkflowError::CapacityExceeded)?; + let _permit = engine.run_semaphore.try_acquire().map_err(|_| { + ( + WorkflowError::CapacityExceeded, + crate::error::PartialProgress::default(), + ) + })?; // Mark run as Running now that we have a permit (resume from approval). // Preserve the existing execution trace from pre-approval steps. @@ -915,7 +924,12 @@ pub async fn execute_from_step( None, ) .await - .map_err(WorkflowError::from)?; + .map_err(|e| { + ( + WorkflowError::from(e), + crate::error::PartialProgress::default(), + ) + })?; execute_steps( engine, @@ -931,6 +945,9 @@ pub async fn execute_from_step( /// Internal: execute workflow steps starting from `start_index`, without /// acquiring the semaphore. Called by both [`execute_run`] and /// [`execute_from_step`] after they have already acquired a permit. +/// +/// On error, returns `(WorkflowError, PartialProgress)` so callers can persist +/// the trace of steps completed before the failure. async fn execute_steps( engine: &WorkflowEngine, run_id: Uuid, @@ -938,7 +955,7 @@ async fn execute_steps( trigger_ctx: &TriggerContext, start_index: usize, initial_outputs: Option>, -) -> Result { +) -> Result { let mut step_outputs: HashMap = initial_outputs.unwrap_or_default(); let mut trace: Vec = Vec::new(); @@ -964,27 +981,60 @@ async fn execute_steps( } Err(e) => { warn!(run_id = %run_id, step = %step.id, "Condition error: {e}"); - return Err(e); + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err((e, progress)); } } } // 2. Resolve template variables. - let resolved_action = resolve_step_templates(step, trigger_ctx, &step_outputs)?; + let resolved_action = match resolve_step_templates(step, trigger_ctx, &step_outputs) { + Ok(a) => a, + Err(e) => { + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err((e, progress)); + } + }; // 3. Dispatch action (with per-step timeout). let timeout_secs = step .timeout_secs .unwrap_or(engine.config.default_timeout_secs); - let result = tokio::time::timeout( + let dispatch_result = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), dispatch_action(&step.id, &resolved_action, engine, run_id), ) - .await - .map_err(|_| WorkflowError::StepTimeout { - step_id: step.id.clone(), - timeout_secs, - })??; + .await; + + let result = match dispatch_result { + Ok(Ok(r)) => r, + Ok(Err(e)) => { + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err((e, progress)); + } + Err(_timeout) => { + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err(( + WorkflowError::StepTimeout { + step_id: step.id.clone(), + timeout_secs, + }, + progress, + )); + } + }; match result { StepResult::Completed(output) => { diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 3923e7d4c..6dcbc1daa 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -31,7 +31,7 @@ pub mod error; pub mod executor; pub mod schema; -pub use error::WorkflowError; +pub use error::{PartialProgress, WorkflowError}; pub use executor::ExecutionResult; pub use schema::{ActionDef, Step, TriggerDef, WorkflowDef}; @@ -255,13 +255,7 @@ impl WorkflowEngine { tokio::spawn(async move { match executor::execute_run(&engine, run_id, &def_clone, &ctx_clone).await { Ok(result) => { - let trace_json = match serde_json::to_value(&result.trace) { - Ok(v) => v, - Err(e) => { - tracing::warn!(run_id = %run_id, "Failed to serialize trace: {e}"); - serde_json::json!([]) - } - }; + let trace_json = serde_json::Value::Array(result.trace); let step_count = result.step_index as i32; if result.approval_token.is_some() { @@ -272,7 +266,7 @@ impl WorkflowEngine { step_index = result.step_index, "Workflow hit approval gate — not yet implemented, marking as failed" ); - let _ = engine + if let Err(e) = engine .db .update_workflow_run( run_id, @@ -281,7 +275,13 @@ impl WorkflowEngine { &trace_json, Some("approval gates not yet implemented — see WF-08"), ) - .await; + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Failed (approval gate): {e}" + ); + } } else { // Normal completion. tracing::info!(run_id = %run_id, "Workflow run completed"); @@ -303,20 +303,18 @@ impl WorkflowEngine { } } } - Err(e) => { + Err((e, progress)) => { tracing::error!(run_id = %run_id, "Workflow run failed: {e}"); - // Preserve any partial progress (trace/step) already written by the executor. - let (step, trace) = match engine.db.get_workflow_run(run_id).await { - Ok(r) => (r.current_step, r.execution_trace), - Err(_) => (0, serde_json::json!([])), - }; + // Use partial trace from the executor — contains steps + // completed/skipped before the failure. + let trace_json = serde_json::Value::Array(progress.trace); if let Err(db_err) = engine .db .update_workflow_run( run_id, RunStatus::Failed, - step, - &trace, + progress.step_index as i32, + &trace_json, Some(&e.to_string()), ) .await From 375a3ee8f5c469c336fc3a030ab669b1734ac1f8 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 21:50:56 -0400 Subject: [PATCH 12/15] Crossfire round 7: approval status guard, tag filtering alignment, DB error logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes from round 2 crossfire (Codex 6/10, Opus 8/10): 1. Approval resume status guard (Codex): - Validate run.status == WaitingApproval before resuming - Prevents stale approval tokens from resurrecting cancelled/failed/completed runs 2. Reaction tag filtering alignment (Opus): - derive_reaction_channel now filters for 64-char hex inside find_map - Consistent with build_trigger_context's approach — skips UUID channel refs - Fixes edge case where last e-tag is a channel UUID 3. Silent DB error in execute_from_step (Opus): - Log warning when get_workflow_run fails during resume - Pre-approval trace loss is now visible in logs 4. Dead code removal (Opus): - Removed unused with_progress() helper on WorkflowError --- crates/sprout-relay/src/api/approvals.rs | 10 ++++++++++ crates/sprout-relay/src/handlers/event.rs | 17 ++++++++++------- crates/sprout-workflow/src/error.rs | 10 ---------- crates/sprout-workflow/src/executor.rs | 16 ++++++++++------ 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index bb3ad4c98..3564a90b0 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -87,6 +87,16 @@ async fn resume_workflow_after_approval( } }; + // Guard: only resume runs that are actually waiting for approval. + // A stale approval token could otherwise resurrect a cancelled/failed/completed run. + if run.status != sprout_db::workflow::RunStatus::WaitingApproval { + tracing::warn!( + "grant_approval: run {run_id} has status '{}', expected 'waiting_approval' — ignoring stale approval", + run.status + ); + return; + } + let workflow = match db.get_workflow(workflow_id).await { Ok(w) => w, Err(e) => { diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index bdfca5c49..be306fc96 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -429,10 +429,18 @@ async fn derive_reaction_channel( ) -> ReactionChannelResult { // Find the target event ID from NIP-25 `e` tags. // Per NIP-25, the last `e` tag is the target (in case of threading). + // Filter for 64-char hex event IDs inside find_map to skip UUID channel refs, + // consistent with build_trigger_context() in sprout-workflow/src/lib.rs. let target_hex = match event.tags.iter().rev().find_map(|tag| { let key = tag.kind().to_string(); if key == "e" { - tag.content().map(|s| s.to_string()) + tag.content().and_then(|v| { + if v.len() == 64 && v.chars().all(|c| c.is_ascii_hexdigit()) { + Some(v.to_string()) + } else { + None + } + }) } else { None } @@ -441,12 +449,7 @@ async fn derive_reaction_channel( None => return ReactionChannelResult::NoTarget, }; - // Must be a 64-char hex string (event ID), not a UUID - if target_hex.len() != 64 { - return ReactionChannelResult::NoTarget; - } - - // Decode hex to bytes for DB lookup + // Decode hex to bytes for DB lookup (already validated as 64-char hex above) let id_bytes = match hex::decode(&target_hex) { Ok(b) if b.len() == 32 => b, _ => return ReactionChannelResult::NoTarget, diff --git a/crates/sprout-workflow/src/error.rs b/crates/sprout-workflow/src/error.rs index f00aa0d9c..5d671fa95 100644 --- a/crates/sprout-workflow/src/error.rs +++ b/crates/sprout-workflow/src/error.rs @@ -55,16 +55,6 @@ pub enum WorkflowError { Database(String), } -impl WorkflowError { - /// Attach partial execution progress to this error. - /// - /// Returns a `(WorkflowError, PartialProgress)` tuple that callers can - /// destructure to persist the trace before marking the run as failed. - pub fn with_progress(self, progress: PartialProgress) -> (Self, PartialProgress) { - (self, progress) - } -} - impl From for WorkflowError { fn from(e: sprout_db::error::DbError) -> Self { WorkflowError::Database(e.to_string()) diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index 61efa629b..1c03944a6 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -908,12 +908,16 @@ pub async fn execute_from_step( // Mark run as Running now that we have a permit (resume from approval). // Preserve the existing execution trace from pre-approval steps. - let existing_trace = engine - .db - .get_workflow_run(run_id) - .await - .map(|r| r.execution_trace) - .unwrap_or_else(|_| serde_json::json!([])); + let existing_trace = match engine.db.get_workflow_run(run_id).await { + Ok(r) => r.execution_trace, + Err(e) => { + warn!( + run_id = %run_id, + "Failed to read existing trace for resume — pre-approval trace will be lost: {e}" + ); + serde_json::json!([]) + } + }; engine .db .update_workflow_run( From c39d4526bcc5a4af2e07a899d4794c84b90b4dc1 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 21:55:52 -0400 Subject: [PATCH 13/15] Crossfire round 8: deny_approval WaitingApproval guard Add run.status == WaitingApproval guard to deny_approval path, consistent with the grant_approval path added in round 7. Without this guard, a stale deny token could overwrite a terminal run status (Failed/Completed/Cancelled) with Cancelled, corrupting state. Now both grant and deny paths validate the run is still waiting before mutating it. --- crates/sprout-relay/src/api/approvals.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index 3564a90b0..a2459e780 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -348,25 +348,36 @@ pub async fn deny_approval( return Err(api_error(StatusCode::CONFLICT, "approval already acted on")); } - // Mark the workflow run as Cancelled. + // Mark the workflow run as Cancelled — only if it's still WaitingApproval. + // A run that has already transitioned to Failed/Completed/Cancelled through + // another path (e.g., timeout, manual cancel) must not be overwritten. let run_id = approval.run_id; let pubkey_for_msg = pubkey.to_hex(); let db = state.db.clone(); tokio::spawn(async move { - let (current_step, trace) = match db.get_workflow_run(run_id).await { - Ok(r) => (r.current_step, r.execution_trace), + let run = match db.get_workflow_run(run_id).await { + Ok(r) => r, Err(e) => { tracing::error!("deny_approval: failed to fetch run {run_id}: {e}"); - (0, serde_json::Value::Array(vec![])) + return; } }; + + if run.status != sprout_db::workflow::RunStatus::WaitingApproval { + tracing::warn!( + "deny_approval: run {run_id} has status '{}', expected 'waiting_approval' — skipping cancellation", + run.status + ); + return; + } + let cancel_msg = format!("workflow cancelled: approval denied by {pubkey_for_msg}"); if let Err(e) = db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Cancelled, - current_step, - &trace, + run.current_step, + &run.execution_trace, Some(&cancel_msg), ) .await From 43ac32671cf57ad5a38922912d0dc347490a36d6 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Mon, 9 Mar 2026 22:22:28 -0400 Subject: [PATCH 14/15] Extract finalize_run, decompose on_event, remove dead code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Structural refactor addressing valid review feedback: 1. Extract WorkflowEngine::finalize_run() — single place for the 3-way executor result → DB status mapping. Eliminates ~170 lines of near-identical code duplicated across on_event, spawn_workflow_execution, and resume_workflow_after_approval. 2. Extract should_fire_workflow() — emoji filter and trigger filter expression evaluation pulled out of on_event's per-workflow loop body. on_event is now ~90 lines (was ~225). 3. Remove _workflow_id parameter from spawn_workflow_execution — was prefixed with underscore since approval gate removal, now gone entirely. 4. Remove handle_approval_suspension dead code — was #[allow(dead_code)] since round 6. The lifecycle logic now lives in finalize_run; WF-08 will re-implement suspension there when needed. All 280+ unit tests pass, all 44 E2E tests pass, zero clippy warnings. --- crates/sprout-relay/src/api/approvals.rs | 75 +---- .../sprout-relay/src/api/workflow_helpers.rs | 123 +------- crates/sprout-relay/src/api/workflows.rs | 2 - crates/sprout-workflow/src/lib.rs | 298 +++++++++--------- 4 files changed, 164 insertions(+), 334 deletions(-) diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index a2459e780..b969a364e 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -152,7 +152,10 @@ async fn resume_workflow_after_approval( .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default(); - match sprout_workflow::executor::execute_from_step( + // Execute remaining steps and finalize the run. + // Pass existing trace so finalize_run merges pre-approval + post-approval entries. + let existing_trace = run.execution_trace.as_array().cloned(); + let result = sprout_workflow::executor::execute_from_step( &engine, run_id, &def, @@ -160,74 +163,8 @@ async fn resume_workflow_after_approval( resume_index, Some(initial_outputs), ) - .await - { - Ok(result) if result.approval_token.is_none() => { - let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); - full_trace.extend(result.trace); - let trace_json = serde_json::Value::Array(full_trace); - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Completed, - result.step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!( - "grant_approval: failed to set Completed status for run {run_id}: {e}" - ); - } - } - Ok(result) => { - // Chained approval gates are not yet fully implemented (see WF-08). - // Mark the run as Failed rather than silently creating a new approval - // record that nothing will ever resolve. - let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); - full_trace.extend(result.trace); - let trace_json = serde_json::Value::Array(full_trace); - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Failed, - result.step_index as i32, - &trace_json, - Some("approval gates not yet fully implemented — see WF-08"), - ) - .await - { - tracing::error!( - "grant_approval: failed to set Failed status for run {run_id}: {e}" - ); - } - } - Err((e, progress)) => { - tracing::error!( - "grant_approval: resume of run {run_id} failed at step {}: {e}", - progress.step_index - ); - // Merge pre-approval trace with partial post-approval trace from the executor. - let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); - full_trace.extend(progress.trace); - let trace_json = serde_json::Value::Array(full_trace); - if let Err(db_err) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Failed, - progress.step_index as i32, - &trace_json, - Some(&format!("execution failed after approval resume: {e}")), - ) - .await - { - tracing::error!( - "grant_approval: failed to set Failed status for run {run_id}: {db_err}" - ); - } - } - } + .await; + engine.finalize_run(run_id, result, existing_trace).await; } // ── POST /api/approvals/:token/grant ───────────────────────────────────────── diff --git a/crates/sprout-relay/src/api/workflow_helpers.rs b/crates/sprout-relay/src/api/workflow_helpers.rs index 3a43b7b41..ac40cccc1 100644 --- a/crates/sprout-relay/src/api/workflow_helpers.rs +++ b/crates/sprout-relay/src/api/workflow_helpers.rs @@ -159,12 +159,11 @@ pub(crate) fn definition_hash(json_str: &str) -> Vec { /// Spawn an async workflow execution task. /// /// Handles the full lifecycle: Pending → (executor sets Running) → Completed / Failed. -/// Approval gates are not yet implemented (WF-08) — runs hitting approval steps are marked Failed. +/// Uses [`WorkflowEngine::finalize_run`] for the result→DB-status mapping. /// Used by trigger and webhook paths to avoid code duplication. pub(crate) fn spawn_workflow_execution( engine: Arc, db: sprout_db::Db, - _workflow_id: uuid::Uuid, run_id: uuid::Uuid, workflow_def_value: serde_json::Value, trigger_ctx: sprout_workflow::executor::TriggerContext, @@ -190,126 +189,12 @@ pub(crate) fn spawn_workflow_execution( } }; - match sprout_workflow::executor::execute_run(&engine, run_id, &def, &trigger_ctx).await { - Ok(result) if result.approval_token.is_none() => { - let trace_json = serde_json::Value::Array(result.trace); - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Completed, - result.step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Completed status: {e}"); - } - } - Ok(result) => { - // Approval gates are not yet fully implemented (WF-08). - // Fail explicitly rather than creating potentially orphaned WaitingApproval rows. - tracing::warn!( - "workflow run {run_id}: hit approval gate — not yet implemented, marking as failed" - ); - let trace_json = serde_json::Value::Array(result.trace); - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Failed, - result.step_index as i32, - &trace_json, - Some("approval gates not yet implemented — see WF-08"), - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Failed status: {e}"); - } - } - Err((e, progress)) => { - tracing::error!("workflow run {run_id} failed: {e}"); - // Use partial trace from the executor — contains steps - // completed/skipped before the failure. - let trace_json = serde_json::Value::Array(progress.trace); - if let Err(db_err) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Failed, - progress.step_index as i32, - &trace_json, - Some(&e.to_string()), - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Failed status: {db_err}"); - } - } - } + let result = + sprout_workflow::executor::execute_run(&engine, run_id, &def, &trigger_ctx).await; + engine.finalize_run(run_id, result, None).await; }); } -/// Persist approval-gate suspension state and create the approval record. -/// Not called from the execution path yet — will be wired up when WF-08 is implemented. -#[allow(dead_code)] -pub(crate) async fn handle_approval_suspension( - db: &sprout_db::Db, - def: &sprout_workflow::WorkflowDef, - workflow_id: uuid::Uuid, - run_id: uuid::Uuid, - result: sprout_workflow::executor::ExecutionResult, -) { - let approval_token = match result.approval_token { - Some(token) => token, - None => { - tracing::error!("workflow run {run_id}: handle_approval_suspension called but approval_token is None"); - return; - } - }; - let suspended_step_index = result.step_index; - let trace_json = serde_json::Value::Array(result.trace); - - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::WaitingApproval, - suspended_step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set WaitingApproval status: {e}"); - } - - if let Some(suspended_step) = def.steps.get(suspended_step_index) { - let approver_spec = match &suspended_step.action { - sprout_workflow::ActionDef::RequestApproval { from, .. } => from.clone(), - _ => "any".to_string(), - }; - let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); - if let Err(e) = db - .create_approval(sprout_db::workflow::CreateApprovalParams { - token: &approval_token, - workflow_id, - run_id, - step_id: &suspended_step.id, - step_index: suspended_step_index as i32, - approver_spec: &approver_spec, - expires_at, - }) - .await - { - tracing::error!("workflow run {run_id}: failed to create approval record: {e}"); - } - } - - tracing::info!( - "workflow run {} suspended for approval at step {} (token: )", - run_id, - suspended_step_index, - ); -} - // ── Tests ───────────────────────────────────────────────────────────────────── #[cfg(test)] diff --git a/crates/sprout-relay/src/api/workflows.rs b/crates/sprout-relay/src/api/workflows.rs index 2813ddaf7..9116671f5 100644 --- a/crates/sprout-relay/src/api/workflows.rs +++ b/crates/sprout-relay/src/api/workflows.rs @@ -382,7 +382,6 @@ pub async fn trigger_workflow( spawn_workflow_execution( Arc::clone(&state.workflow_engine), state.db.clone(), - id, run_id, workflow.definition.clone(), trigger_ctx, @@ -495,7 +494,6 @@ pub async fn workflow_webhook( spawn_workflow_execution( Arc::clone(&state.workflow_engine), state.db.clone(), - id, run_id, workflow.definition.clone(), trigger_ctx, diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 6dcbc1daa..0976787e0 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -93,17 +93,103 @@ impl WorkflowEngine { schema::parse_yaml(yaml) } + /// Finalize a workflow run after execution completes or fails. + /// + /// This is the **single** place that maps an executor result to a DB status + /// update. All execution paths (event-triggered, manual trigger/webhook, + /// approval resume) call this instead of duplicating the 3-way match. + /// + /// `existing_trace` is prepended to the executor's trace — used by the + /// approval-resume path where pre-approval steps already have trace entries. + pub async fn finalize_run( + &self, + run_id: uuid::Uuid, + result: Result, + existing_trace: Option>, + ) { + let prefix = existing_trace.unwrap_or_default(); + + match result { + Ok(result) => { + let mut full_trace = prefix; + full_trace.extend(result.trace); + let trace_json = serde_json::Value::Array(full_trace); + let step_count = result.step_index as i32; + + if result.approval_token.is_some() { + // Approval gates are not yet implemented (WF-08). + // Fail explicitly rather than creating unreachable WaitingApproval rows. + tracing::warn!( + run_id = %run_id, + step_index = result.step_index, + "Workflow hit approval gate — not yet implemented, marking as failed" + ); + if let Err(e) = self + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + step_count, + &trace_json, + Some("approval gates not yet implemented — see WF-08"), + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Failed (approval gate): {e}" + ); + } + } else { + tracing::info!(run_id = %run_id, "Workflow run completed"); + if let Err(e) = self + .db + .update_workflow_run( + run_id, + RunStatus::Completed, + step_count, + &trace_json, + None, + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Completed: {e}" + ); + } + } + } + Err((e, progress)) => { + tracing::error!(run_id = %run_id, "Workflow run failed: {e}"); + let mut full_trace = prefix; + full_trace.extend(progress.trace); + let trace_json = serde_json::Value::Array(full_trace); + if let Err(db_err) = self + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + progress.step_index as i32, + &trace_json, + Some(&e.to_string()), + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Failed: {db_err}" + ); + } + } + } + } + /// Called from the event handler post-store hook for every stored event. /// /// Checks whether any workflow in the event's channel has a matching trigger. /// Workflow execution events (kinds 46001–46012) are excluded to prevent loops. /// - /// For each matching workflow: - /// 1. Evaluates the trigger filter expression (if present). - /// 2. Builds a [`executor::TriggerContext`] from the event. - /// 3. Creates a `workflow_run` row in the DB (status: `pending`). - /// 4. Spawns an async task to execute the run via [`executor::execute_run`]. - /// /// The method takes `self: &Arc` so that the spawned task can hold a /// clone of the `Arc` without requiring `'static` on `&self`. pub async fn on_event( @@ -122,12 +208,10 @@ impl WorkflowEngine { let kind_u32 = event_kind_u32(&event.event); // Exclude workflow execution events to prevent infinite loops. - // See Decision 10 in PLANS/SPROUT_WORKFLOWS.md. if is_workflow_execution_kind(kind_u32) { return Ok(()); } - // Load enabled workflows for this channel. let workflows = self .db .list_enabled_channel_workflows(channel_id) @@ -138,84 +222,30 @@ impl WorkflowEngine { return Ok(()); } - // Build TriggerContext once — all matching workflows in this channel - // share the same triggering event. let trigger_ctx = build_trigger_context(event); for workflow in &workflows { - // Parse the stored JSON definition. let def: WorkflowDef = match serde_json::from_value(workflow.definition.clone()) { Ok(d) => d, Err(e) => { - tracing::warn!( - workflow_id = %workflow.id, - "Failed to parse workflow definition: {e}" - ); + tracing::warn!(workflow_id = %workflow.id, "Failed to parse definition: {e}"); continue; } }; - if !def.enabled { + if !def.enabled || !trigger_matches_event(&def.trigger, kind_u32) { continue; } - // Check if the trigger type matches the event kind. - if !trigger_matches_event(&def.trigger, kind_u32) { + if !should_fire_workflow(&def, &trigger_ctx, workflow.id).await { continue; } - // Enforce reaction emoji filter: if the workflow specifies a specific - // emoji, skip events whose content doesn't match. NIP-25 stores the - // emoji character (or shortcode) in the event content field. - if let TriggerDef::ReactionAdded { - emoji: Some(ref expected), - } = def.trigger - { - let actual = &trigger_ctx.emoji; - if actual != expected { - tracing::debug!( - workflow_id = %workflow.id, - expected_emoji = %expected, - actual_emoji = %actual, - "Reaction emoji mismatch — skipping workflow" - ); - continue; - } - } - - // Evaluate the trigger filter expression (MessagePosted only). - // A filter that evaluates to false skips this workflow entirely. - if let TriggerDef::MessagePosted { - filter: Some(ref expr), - } = def.trigger - { - match executor::evaluate_condition(expr, &trigger_ctx, &HashMap::new()).await { - Ok(true) => {} - Ok(false) => { - tracing::debug!( - workflow_id = %workflow.id, - "Trigger filter evaluated false — skipping workflow" - ); - continue; - } - Err(e) => { - tracing::warn!( - workflow_id = %workflow.id, - "Trigger filter error: {e} — skipping workflow" - ); - continue; - } - } - } - // Serialize TriggerContext for DB storage. let trigger_ctx_json = match serde_json::to_value(&trigger_ctx) { Ok(v) => v, Err(e) => { - tracing::warn!( - workflow_id = %workflow.id, - "Failed to serialize TriggerContext: {e}" - ); + tracing::warn!(workflow_id = %workflow.id, "Failed to serialize ctx: {e}"); continue; } }; @@ -233,10 +263,7 @@ impl WorkflowEngine { { Ok(id) => id, Err(e) => { - tracing::error!( - workflow_id = %workflow.id, - "Failed to create workflow_run: {e}" - ); + tracing::error!(workflow_id = %workflow.id, "Failed to create run: {e}"); continue; } }; @@ -247,85 +274,13 @@ impl WorkflowEngine { "Workflow triggered — spawning execution" ); - // Spawn execution. Clone Arc so the task owns its references. let engine = Arc::clone(self); let def_clone = def.clone(); let ctx_clone = trigger_ctx.clone(); tokio::spawn(async move { - match executor::execute_run(&engine, run_id, &def_clone, &ctx_clone).await { - Ok(result) => { - let trace_json = serde_json::Value::Array(result.trace); - let step_count = result.step_index as i32; - - if result.approval_token.is_some() { - // Approval gates are not yet implemented (WF-08). - // Fail explicitly rather than creating unreachable WaitingApproval rows. - tracing::warn!( - run_id = %run_id, - step_index = result.step_index, - "Workflow hit approval gate — not yet implemented, marking as failed" - ); - if let Err(e) = engine - .db - .update_workflow_run( - run_id, - RunStatus::Failed, - step_count, - &trace_json, - Some("approval gates not yet implemented — see WF-08"), - ) - .await - { - tracing::error!( - run_id = %run_id, - "Failed to update run to Failed (approval gate): {e}" - ); - } - } else { - // Normal completion. - tracing::info!(run_id = %run_id, "Workflow run completed"); - if let Err(e) = engine - .db - .update_workflow_run( - run_id, - RunStatus::Completed, - step_count, - &trace_json, - None, - ) - .await - { - tracing::error!( - run_id = %run_id, - "Failed to update run to Completed: {e}" - ); - } - } - } - Err((e, progress)) => { - tracing::error!(run_id = %run_id, "Workflow run failed: {e}"); - // Use partial trace from the executor — contains steps - // completed/skipped before the failure. - let trace_json = serde_json::Value::Array(progress.trace); - if let Err(db_err) = engine - .db - .update_workflow_run( - run_id, - RunStatus::Failed, - progress.step_index as i32, - &trace_json, - Some(&e.to_string()), - ) - .await - { - tracing::error!( - run_id = %run_id, - "Failed to update run to Failed: {db_err}" - ); - } - } - } + let result = executor::execute_run(&engine, run_id, &def_clone, &ctx_clone).await; + engine.finalize_run(run_id, result, None).await; }); } @@ -348,6 +303,61 @@ impl WorkflowEngine { } } +// ── Pre-trigger filtering ───────────────────────────────────────────────────── + +/// Check emoji and filter-expression conditions that determine whether a +/// matched workflow should actually fire. Extracted from `on_event` to keep +/// the per-workflow loop body small. +/// +/// Returns `true` if the workflow should fire, `false` to skip. +async fn should_fire_workflow( + def: &WorkflowDef, + trigger_ctx: &executor::TriggerContext, + workflow_id: uuid::Uuid, +) -> bool { + // Enforce reaction emoji filter. + if let TriggerDef::ReactionAdded { + emoji: Some(ref expected), + } = def.trigger + { + if &trigger_ctx.emoji != expected { + tracing::debug!( + workflow_id = %workflow_id, + expected_emoji = %expected, + actual_emoji = %trigger_ctx.emoji, + "Reaction emoji mismatch — skipping workflow" + ); + return false; + } + } + + // Evaluate trigger filter expression (MessagePosted only). + if let TriggerDef::MessagePosted { + filter: Some(ref expr), + } = def.trigger + { + match executor::evaluate_condition(expr, trigger_ctx, &HashMap::new()).await { + Ok(true) => {} + Ok(false) => { + tracing::debug!( + workflow_id = %workflow_id, + "Trigger filter evaluated false — skipping workflow" + ); + return false; + } + Err(e) => { + tracing::warn!( + workflow_id = %workflow_id, + "Trigger filter error: {e} — skipping workflow" + ); + return false; + } + } + } + + true +} + // ── Trigger context builder ─────────────────────────────────────────────────── /// Build a [`executor::TriggerContext`] from a [`sprout_core::StoredEvent`]. From d24c5806cf29ae34606f28bf8c49ef943fabb2a1 Mon Sep 17 00:00:00 2001 From: Tyler Longwell Date: Tue, 10 Mar 2026 10:35:45 -0400 Subject: [PATCH 15/15] Address review findings: delay/timeout race, auth gap, 202 approvals, E2E coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix delay/timeout race: MAX_DELAY_SECS 300→270 (must be < default_timeout_secs) - Close auth gap on global workflows: get/update now check owner_pubkey when channel_id is NULL - Return 202 Accepted from grant/deny approval endpoints (async work follows response) - Normalize check_approver_spec to case-insensitive hex comparison - Serialize trigger_ctx_json once before workflow loop (was redundant per-workflow) - Fix ARCHITECTURE.md: only kind 40001 triggers workflows, not 40002 - Fix misleading KIND_JOB comments in feed.rs - Add E2E test for approval gate stub (WF-08) --- ARCHITECTURE.md | 2 +- crates/sprout-db/src/feed.rs | 2 +- crates/sprout-relay/src/api/approvals.rs | 47 ++++---- crates/sprout-relay/src/api/workflows.rs | 4 + .../sprout-test-client/tests/e2e_workflows.rs | 106 ++++++++++++++++++ crates/sprout-workflow/src/executor.rs | 6 +- crates/sprout-workflow/src/lib.rs | 17 ++- 7 files changed, 148 insertions(+), 36 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 324f19186..1faadeed5 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -222,7 +222,7 @@ Steps 10–12 are fire-and-forget: they are spawned as independent async tasks. Step 9 (fan-out) also checks global subscriptions (no `channel_id` constraint) — broad subscriptions receive channel-scoped events if their filters match. -Workflow loop prevention: kinds 46001–46012 (workflow execution events) are excluded from triggering workflows. Exception: stream message kinds (40001, 40002) always trigger regardless of other exclusion rules. +Workflow loop prevention: kinds 46001–46012 (workflow execution events) are excluded from triggering workflows. Exception: stream message kind 40001 (`KIND_STREAM_MESSAGE`) always triggers regardless of other exclusion rules. Kind 40002 (`KIND_STREAM_MESSAGE_V2`) does not trigger workflows. ### Ephemeral Sub-Pipeline (kinds 20000–29999) diff --git a/crates/sprout-db/src/feed.rs b/crates/sprout-db/src/feed.rs index b760e0bbc..a756346d1 100644 --- a/crates/sprout-db/src/feed.rs +++ b/crates/sprout-db/src/feed.rs @@ -184,7 +184,7 @@ pub async fn query_activity( ); // Stream messages, forum posts, agent job events. - // KIND_JOB_REQUEST = agent job created, KIND_JOB_PROGRESS = agent job completed, KIND_JOB_RESULT = agent job failed. + // KIND_JOB_REQUEST = agent job requested, KIND_JOB_PROGRESS = in-flight progress update, KIND_JOB_RESULT = completed result. qb.push(format!( " AND kind IN ({KIND_STREAM_MESSAGE}, {KIND_STREAM_MESSAGE_V2}, {KIND_FORUM_POST}, {KIND_JOB_REQUEST}, {KIND_JOB_PROGRESS}, {KIND_JOB_RESULT})" )); diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index b969a364e..b7a2f6ea4 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -49,9 +49,9 @@ fn check_approver_spec( return Ok(()); } - // Exact pubkey match (64-char lowercase hex). + // Exact pubkey match (64-char hex, case-insensitive). if spec.len() == 64 && spec.chars().all(|c| c.is_ascii_hexdigit()) { - if requester_hex == spec { + if requester_hex.to_lowercase() == spec.to_lowercase() { return Ok(()); } return Err(forbidden( @@ -177,7 +177,7 @@ pub async fn grant_approval( headers: HeaderMap, Path(token): Path, body: Option>, -) -> Result, (StatusCode, Json)> { +) -> Result<(StatusCode, Json), (StatusCode, Json)> { let (pubkey, pubkey_bytes) = extract_auth_pubkey(&headers, &state).await?; let approval = state @@ -228,12 +228,15 @@ pub async fn grant_approval( resume_workflow_after_approval(engine, db, run_id, workflow_id, resume_index).await; }); - Ok(Json(serde_json::json!({ - "token": token, - "status": "granted", - "run_id": approval.run_id.to_string(), - "workflow_id": approval.workflow_id.to_string(), - }))) + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "token": token, + "status": "granted", + "run_id": approval.run_id.to_string(), + "workflow_id": approval.workflow_id.to_string(), + })), + )) } // ── POST /api/approvals/:token/deny ────────────────────────────────────────── @@ -246,7 +249,7 @@ pub async fn deny_approval( headers: HeaderMap, Path(token): Path, body: Option>, -) -> Result, (StatusCode, Json)> { +) -> Result<(StatusCode, Json), (StatusCode, Json)> { let (pubkey, pubkey_bytes) = extract_auth_pubkey(&headers, &state).await?; let approval = state @@ -323,12 +326,15 @@ pub async fn deny_approval( } }); - Ok(Json(serde_json::json!({ - "token": token, - "status": "denied", - "run_id": approval.run_id.to_string(), - "workflow_id": approval.workflow_id.to_string(), - }))) + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "token": token, + "status": "denied", + "run_id": approval.run_id.to_string(), + "workflow_id": approval.workflow_id.to_string(), + })), + )) } // ── Tests ───────────────────────────────────────────────────────────────────── @@ -412,13 +418,10 @@ mod tests { } #[test] - fn uppercase_hex_spec_is_rejected_as_unrecognised() { - // Spec must be lowercase hex — uppercase fails the `is_ascii_hexdigit` path length check - // (it IS hex digits, but the spec says 64-char lowercase; uppercase passes hexdigit but - // won't match a lowercase requester_hex, so it falls through to the role branch). + fn uppercase_hex_spec_is_accepted_case_insensitive() { + // Uppercase hex spec should now succeed — comparison is case-insensitive. let upper = ALICE_HEX.to_uppercase(); let result = check_approver_spec(&upper, &upper.to_lowercase()); - // Either forbidden (no match) or forbidden (unrecognised spec) — both are errors. - assert!(result.is_err()); + assert!(result.is_ok()); } } diff --git a/crates/sprout-relay/src/api/workflows.rs b/crates/sprout-relay/src/api/workflows.rs index 9116671f5..f45651c06 100644 --- a/crates/sprout-relay/src/api/workflows.rs +++ b/crates/sprout-relay/src/api/workflows.rs @@ -163,6 +163,8 @@ pub async fn get_workflow( if let Some(channel_id) = workflow.channel_id { check_channel_access(&state, channel_id, &pubkey_bytes).await?; + } else if workflow.owner_pubkey != pubkey_bytes { + return Err(forbidden("not authorized to access this workflow")); } Ok(Json(workflow_record_to_json(&workflow))) @@ -200,6 +202,8 @@ pub async fn update_workflow( if let Some(channel_id) = existing.channel_id { check_channel_access(&state, channel_id, &pubkey_bytes).await?; + } else if existing.owner_pubkey != pubkey_bytes { + return Err(forbidden("not authorized to access this workflow")); } let (def, definition_json_str) = diff --git a/crates/sprout-test-client/tests/e2e_workflows.rs b/crates/sprout-test-client/tests/e2e_workflows.rs index 8553b91fd..5d61eb956 100644 --- a/crates/sprout-test-client/tests/e2e_workflows.rs +++ b/crates/sprout-test-client/tests/e2e_workflows.rs @@ -626,3 +626,109 @@ async fn test_workflow_update_and_delete() { "GET after DELETE must return 404" ); } + +// ── Test 7: Approval gate (WF-08 stub) ──────────────────────────────────────── + +/// Create a workflow with a `request_approval` step, trigger it, and verify +/// the run fails with the "approval gates not yet implemented" message. +/// +/// This test documents the current stub behavior. When WF-08 is implemented, +/// this test should be updated to verify the full approval round-trip: +/// create → trigger → poll for waiting_approval → grant → verify completed. +#[tokio::test] +#[ignore] +async fn test_approval_gate_stub_fails_gracefully() { + let client = http_client(); + let pubkey_hex: &str = SEEDED_PUBKEY; + let base = relay_http_url(); + + // ── Step 1: Create a workflow with a request_approval step ──────────────── + let workflow_yaml = r#"name: approval-test +description: Test approval gate +trigger: + on: webhook +steps: + - id: step1 + name: Pre-approval step + action: send_message + text: "Before approval" + - id: approve + action: request_approval + from: "any" + message: "Please approve this workflow" + - id: step3 + name: Post-approval step + action: send_message + text: "After approval" +"#; + let created = create_workflow(&client, &base, pubkey_hex, CHANNEL_GENERAL, workflow_yaml).await; + let workflow_id = created["id"] + .as_str() + .expect("created workflow must have 'id'") + .to_string(); + + // ── Step 2: Trigger the workflow ────────────────────────────────────────── + let trigger_url = format!("{base}/api/workflows/{workflow_id}/trigger"); + let trigger_resp = client + .post(&trigger_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .unwrap_or_else(|e| panic!("POST {trigger_url} failed: {e}")); + + assert_eq!( + trigger_resp.status(), + 202, + "trigger endpoint must return 202 Accepted" + ); + + let trigger_body: serde_json::Value = trigger_resp + .json() + .await + .expect("trigger response must be JSON"); + let run_id = trigger_body["run_id"] + .as_str() + .expect("trigger response must include 'run_id'") + .to_string(); + + // ── Step 3: Poll until the run reaches a terminal status ────────────────── + let runs_url = format!("{base}/api/workflows/{workflow_id}/runs"); + let mut final_run: Option = None; + for _ in 0..10 { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let runs_resp = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs failed"); + assert_eq!(runs_resp.status(), 200, "GET runs must return 200"); + let runs: Vec = runs_resp.json().await.expect("runs must be JSON array"); + if let Some(run) = runs.iter().find(|r| r["id"].as_str() == Some(&run_id)) { + let status = run["status"].as_str().unwrap_or(""); + if matches!(status, "completed" | "failed" | "cancelled") { + final_run = Some(run.clone()); + break; + } + } + } + + // ── Step 4: Assert the run failed with the expected stub error ──────────── + let run = final_run.expect("run must reach a terminal status within 1 second"); + + assert_eq!( + run["status"].as_str().unwrap_or(""), + "failed", + "approval gate stub must cause the run to fail" + ); + + let error_msg = run["error"].as_str().unwrap_or(""); + assert!( + error_msg.contains("approval gates not yet implemented"), + "run error must contain 'approval gates not yet implemented', got: {error_msg:?}" + ); + + // ── Step 5: Clean up ────────────────────────────────────────────────────── + let del_status = delete_workflow(&client, &base, pubkey_hex, &workflow_id).await; + assert_eq!(del_status, 204, "cleanup DELETE should return 204"); +} diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index 1c03944a6..720b03d37 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -602,10 +602,10 @@ pub async fn dispatch_action( Delay { duration } => { let secs = parse_duration_secs(duration)?; - // Cap delay at 300 seconds (5 minutes) to prevent tasks from holding - // a Tokio worker thread for extended periods. Long delays (hours/days) + // Cap delay at 270 seconds (4.5 minutes) — must be less than default_timeout_secs (300s) + // to avoid non-deterministic StepTimeout. Long delays (hours/days) // should use the scheduled resume pattern (future work: WF-09). - const MAX_DELAY_SECS: u64 = 300; + const MAX_DELAY_SECS: u64 = 270; if secs > MAX_DELAY_SECS { return Err(WorkflowError::InvalidDefinition(format!( "delay exceeds maximum of {MAX_DELAY_SECS} seconds (got {secs}s); \ diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 0976787e0..2192a919b 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -224,6 +224,14 @@ impl WorkflowEngine { let trigger_ctx = build_trigger_context(event); + let trigger_ctx_json: serde_json::Value = match serde_json::to_value(&trigger_ctx) { + Ok(v) => v, + Err(e) => { + tracing::error!("Failed to serialize trigger context: {e}"); + return Ok(()); + } + }; + for workflow in &workflows { let def: WorkflowDef = match serde_json::from_value(workflow.definition.clone()) { Ok(d) => d, @@ -241,15 +249,6 @@ impl WorkflowEngine { continue; } - // Serialize TriggerContext for DB storage. - let trigger_ctx_json = match serde_json::to_value(&trigger_ctx) { - Ok(v) => v, - Err(e) => { - tracing::warn!(workflow_id = %workflow.id, "Failed to serialize ctx: {e}"); - continue; - } - }; - // Create the workflow_run row (status: pending). let trigger_event_id_bytes = event.event.id.as_bytes().to_vec(); let run_id = match self