diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 324f19186..1faadeed5 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -222,7 +222,7 @@ Steps 10–12 are fire-and-forget: they are spawned as independent async tasks. Step 9 (fan-out) also checks global subscriptions (no `channel_id` constraint) — broad subscriptions receive channel-scoped events if their filters match. -Workflow loop prevention: kinds 46001–46012 (workflow execution events) are excluded from triggering workflows. Exception: stream message kinds (40001, 40002) always trigger regardless of other exclusion rules. +Workflow loop prevention: kinds 46001–46012 (workflow execution events) are excluded from triggering workflows. Exception: stream message kind 40001 (`KIND_STREAM_MESSAGE`) always triggers regardless of other exclusion rules. Kind 40002 (`KIND_STREAM_MESSAGE_V2`) does not trigger workflows. ### Ephemeral Sub-Pipeline (kinds 20000–29999) diff --git a/Cargo.lock b/Cargo.lock index 921f686d9..5eeeb97a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2723,6 +2723,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "sprout-core", "thiserror", "tokio", "tokio-tungstenite 0.26.2", @@ -2847,6 +2848,7 @@ dependencies = [ "chrono", "cron", "evalexpr", + "nostr", "reqwest", "serde", "serde_json", diff --git a/README.md b/README.md index 0ff0cb749..a6a025b59 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,7 @@ Copy `.env.example` to `.env`. All defaults work with `docker compose up` out of | `REDIS_URL` | `redis://localhost:6379` | Redis connection string | | `TYPESENSE_URL` | `http://localhost:8108` | Typesense base URL | | `TYPESENSE_API_KEY` | `sprout_dev_key` | Typesense API key | +| `TYPESENSE_COLLECTION` | `events` | Typesense collection name | | `SPROUT_BIND_ADDR` | `0.0.0.0:3000` | Relay bind address (host:port) | | `RELAY_URL` | `ws://localhost:3000` | Public URL (used in NIP-42 challenges) | | `SPROUT_REQUIRE_AUTH_TOKEN` | `false` | Require bearer token for auth (set `true` in production) | diff --git a/crates/sprout-audit/src/service.rs b/crates/sprout-audit/src/service.rs index 508d11314..0a09bf641 100644 --- a/crates/sprout-audit/src/service.rs +++ b/crates/sprout-audit/src/service.rs @@ -3,6 +3,8 @@ use futures_util::FutureExt as _; use sqlx::{Acquire, MySqlPool, Row}; use tracing::{debug, instrument, warn}; +use sprout_core::kind::KIND_AUTH; + use crate::{ action::AuditAction, entry::{AuditEntry, NewAuditEntry}, @@ -11,7 +13,6 @@ use crate::{ schema::AUDIT_SCHEMA_SQL, }; -const KIND_AUTH: u32 = 22242; const AUDIT_LOCK_NAME: &str = "sprout_audit"; const AUDIT_LOCK_TIMEOUT_SECS: i64 = 10; diff --git a/crates/sprout-core/src/kind.rs b/crates/sprout-core/src/kind.rs index 15e25865e..931269e79 100644 --- a/crates/sprout-core/src/kind.rs +++ b/crates/sprout-core/src/kind.rs @@ -17,6 +17,8 @@ pub const KIND_REACTION: u32 = 7; pub const KIND_GIFT_WRAP: u32 = 1059; /// NIP-94: File metadata attachment. pub const KIND_FILE_METADATA: u32 = 1063; +/// NIP-42 auth event — never stored (carries bearer tokens). +pub const KIND_AUTH: u32 = 22242; // NIP-29 group admin events /// NIP-29: Add a user to a group. @@ -56,6 +58,11 @@ pub const KIND_NIP29_GROUP_MEMBERS: u32 = 39002; /// NIP-29: Addressable group roles definition. pub const KIND_NIP29_GROUP_ROLES: u32 = 39003; +/// Lower bound of the ephemeral event range (20000–29999). Never stored. +pub const EPHEMERAL_KIND_MIN: u32 = 20000; +/// Upper bound of the ephemeral event range (20000–29999). Never stored. +pub const EPHEMERAL_KIND_MAX: u32 = 29999; + // Ephemeral events (20000–29999) — Redis pub/sub only, never stored. /// Ephemeral: user presence update (online/away/offline). pub const KIND_PRESENCE_UPDATE: u32 = 20001; @@ -77,6 +84,8 @@ pub const KIND_STREAM_MESSAGE_BOOKMARKED: u32 = 40005; pub const KIND_STREAM_MESSAGE_SCHEDULED: u32 = 40006; /// A reminder attached to a stream message or time. pub const KIND_STREAM_REMINDER: u32 = 40007; +/// Canvas (shared document) for a channel. +pub const KIND_CANVAS: u32 = 40100; // Direct messages (41000–41999) /// A new direct-message conversation was created. @@ -215,6 +224,7 @@ pub const ALL_KINDS: &[u32] = &[ KIND_STREAM_MESSAGE_BOOKMARKED, KIND_STREAM_MESSAGE_SCHEDULED, KIND_STREAM_REMINDER, + KIND_CANVAS, KIND_DM_CREATED, KIND_DM_MEMBER_ADDED, KIND_DM_MEMBER_REMOVED, @@ -260,6 +270,35 @@ pub const ALL_KINDS: &[u32] = &[ KIND_HUDDLE_RECORDING_AVAILABLE, ]; +/// Returns `true` if `kind` is in the ephemeral range (20000–29999). +pub const fn is_ephemeral(kind: u32) -> bool { + kind >= EPHEMERAL_KIND_MIN && kind <= EPHEMERAL_KIND_MAX +} + +/// Returns `true` if `kind` is a workflow execution event (46001–46012). +/// These must not trigger workflows (prevents infinite loops). +pub const fn is_workflow_execution_kind(kind: u32) -> bool { + kind >= KIND_WORKFLOW_TRIGGERED && kind <= KIND_WORKFLOW_APPROVAL_DENIED +} + +/// Extract the kind from a nostr Event as u32. +/// NIP-01 specifies kind as an unsigned integer; u32 covers the full range. +pub fn event_kind_u32(event: &nostr::Event) -> u32 { + event.kind.as_u16() as u32 +} + +/// Extract the kind from a nostr Event as i32 (for MySQL INT columns). +/// Safe: all Sprout kinds fit in i32 (max 65535 < i32::MAX). +pub fn event_kind_i32(event: &nostr::Event) -> i32 { + event.kind.as_u16() as i32 +} + +// Compile-time: all Sprout kind constants fit in nostr's u16-backed Kind. +const _: () = assert!(KIND_AUTH <= u16::MAX as u32); +const _: () = assert!(KIND_CANVAS <= u16::MAX as u32); +const _: () = assert!(KIND_HUDDLE_RECORDING_AVAILABLE <= u16::MAX as u32); +const _: () = assert!(EPHEMERAL_KIND_MIN < EPHEMERAL_KIND_MAX); + #[cfg(test)] mod tests { use super::*; diff --git a/crates/sprout-db/src/event.rs b/crates/sprout-db/src/event.rs index 5fe01d1e4..b5594fdf8 100644 --- a/crates/sprout-db/src/event.rs +++ b/crates/sprout-db/src/event.rs @@ -9,15 +9,11 @@ use nostr::Event; use sqlx::{MySqlPool, QueryBuilder, Row}; use uuid::Uuid; +use sprout_core::kind::{event_kind_i32, is_ephemeral, KIND_AUTH}; use sprout_core::StoredEvent; use crate::error::{DbError, Result}; -/// NIP-42 auth event kind — never stored (carries bearer tokens). -const KIND_AUTH: u32 = 22242; -const EPHEMERAL_KIND_MIN: u32 = 20000; -const EPHEMERAL_KIND_MAX: u32 = 29999; - /// Optional filters for [`query_events`]. #[derive(Debug, Default, Clone)] pub struct EventQuery { @@ -51,7 +47,7 @@ pub async fn insert_event( if kind_u32 == KIND_AUTH { return Err(DbError::AuthEventRejected); } - if (EPHEMERAL_KIND_MIN..=EPHEMERAL_KIND_MAX).contains(&kind_u32) { + if is_ephemeral(kind_u32) { return Err(DbError::EphemeralEventRejected(kind_u16)); } @@ -60,7 +56,7 @@ pub async fn insert_event( let sig_bytes = event.sig.serialize(); let tags_json = serde_json::to_value(&event.tags)?; // Cast chain: nostr Kind (u16) → i32 (MySQL INT column). Safe: all Sprout kinds fit in i32. - let kind_i32 = event.kind.as_u16() as i32; + let kind_i32 = event_kind_i32(event); let created_at_secs = event.created_at.as_u64() as i64; let created_at = DateTime::from_timestamp(created_at_secs, 0) .ok_or(DbError::InvalidTimestamp(created_at_secs))?; diff --git a/crates/sprout-db/src/feed.rs b/crates/sprout-db/src/feed.rs index 8eabc1c7b..a756346d1 100644 --- a/crates/sprout-db/src/feed.rs +++ b/crates/sprout-db/src/feed.rs @@ -184,7 +184,7 @@ pub async fn query_activity( ); // Stream messages, forum posts, agent job events. - // KIND_JOB_REQUEST = agent job created, KIND_JOB_PROGRESS = agent job completed, KIND_JOB_RESULT = agent job failed. + // KIND_JOB_REQUEST = agent job requested, KIND_JOB_PROGRESS = in-flight progress update, KIND_JOB_RESULT = completed result. qb.push(format!( " AND kind IN ({KIND_STREAM_MESSAGE}, {KIND_STREAM_MESSAGE_V2}, {KIND_FORUM_POST}, {KIND_JOB_REQUEST}, {KIND_JOB_PROGRESS}, {KIND_JOB_RESULT})" )); @@ -407,7 +407,8 @@ mod tests { KIND_JOB_RESULT, ]; - for kind in 46001u32..=46012 { + use sprout_core::kind::{KIND_WORKFLOW_APPROVAL_DENIED, KIND_WORKFLOW_TRIGGERED}; + for kind in KIND_WORKFLOW_TRIGGERED..=KIND_WORKFLOW_APPROVAL_DENIED { assert!( !activity_kinds.contains(&kind), "workflow execution kind {kind} must NOT be in activity" diff --git a/crates/sprout-mcp/Cargo.toml b/crates/sprout-mcp/Cargo.toml index da873d6d8..1f27ccd3b 100644 --- a/crates/sprout-mcp/Cargo.toml +++ b/crates/sprout-mcp/Cargo.toml @@ -12,6 +12,9 @@ name = "sprout-mcp-server" path = "src/main.rs" [dependencies] +# Sprout core types +sprout-core = { workspace = true } + # MCP SDK rmcp = { workspace = true } schemars = { workspace = true } diff --git a/crates/sprout-mcp/src/lib.rs b/crates/sprout-mcp/src/lib.rs index 03d272960..3740cdc97 100644 --- a/crates/sprout-mcp/src/lib.rs +++ b/crates/sprout-mcp/src/lib.rs @@ -94,6 +94,10 @@ //! [Sprout]: https://github.com/sprout-rs/sprout //! [NIP-42]: https://github.com/nostr-protocol/nips/blob/master/42.md +// NOTE: `parse_relay_message`, `OkResponse`, and `RelayMessage` from `relay_client` +// are re-exported by `sprout-test-client`. Changes to these types are a breaking +// change for the test harness. + /// WebSocket client for the Sprout relay (NIP-42 auth, subscriptions, reconnect). pub mod relay_client; /// MCP tool implementations backed by the relay client. diff --git a/crates/sprout-mcp/src/server.rs b/crates/sprout-mcp/src/server.rs index 114080931..2a31fecb1 100644 --- a/crates/sprout-mcp/src/server.rs +++ b/crates/sprout-mcp/src/server.rs @@ -1,3 +1,5 @@ +use sprout_core::kind::{event_kind_u32, KIND_CANVAS}; + use rmcp::{ handler::server::{router::tool::ToolRouter, wrapper::Parameters}, model::{ServerCapabilities, ServerInfo}, @@ -310,7 +312,7 @@ impl SproutMcpServer { "id": event.id.to_hex(), "pubkey": event.pubkey.to_hex(), "content": event.content, - "kind": event.kind.as_u16() as u32, + "kind": event_kind_u32(event), "created_at": event.created_at.as_u64(), }) }) @@ -381,7 +383,7 @@ impl SproutMcpServer { nostr::SingleLetterTag::lowercase(nostr::Alphabet::E), [p.channel_id.as_str()], ) - .kind(nostr::Kind::Custom(40100)) + .kind(nostr::Kind::Custom(KIND_CANVAS as u16)) .limit(1); let sub_id = format!("canvas-{}", uuid::Uuid::new_v4()); @@ -415,8 +417,12 @@ impl SproutMcpServer { Err(e) => return format!("Error building tag: {e}"), }; - let event = match nostr::EventBuilder::new(nostr::Kind::Custom(40100), &p.content, [e_tag]) - .sign_with_keys(&keys) + let event = match nostr::EventBuilder::new( + nostr::Kind::Custom(KIND_CANVAS as u16), + &p.content, + [e_tag], + ) + .sign_with_keys(&keys) { Ok(e) => e, Err(e) => return format!("Error signing event: {e}"), diff --git a/crates/sprout-relay/src/api/approvals.rs b/crates/sprout-relay/src/api/approvals.rs index d7204e87d..b7a2f6ea4 100644 --- a/crates/sprout-relay/src/api/approvals.rs +++ b/crates/sprout-relay/src/api/approvals.rs @@ -49,9 +49,9 @@ fn check_approver_spec( return Ok(()); } - // Exact pubkey match (64-char lowercase hex). + // Exact pubkey match (64-char hex, case-insensitive). if spec.len() == 64 && spec.chars().all(|c| c.is_ascii_hexdigit()) { - if requester_hex == spec { + if requester_hex.to_lowercase() == spec.to_lowercase() { return Ok(()); } return Err(forbidden( @@ -87,6 +87,16 @@ async fn resume_workflow_after_approval( } }; + // Guard: only resume runs that are actually waiting for approval. + // A stale approval token could otherwise resurrect a cancelled/failed/completed run. + if run.status != sprout_db::workflow::RunStatus::WaitingApproval { + tracing::warn!( + "grant_approval: run {run_id} has status '{}', expected 'waiting_approval' — ignoring stale approval", + run.status + ); + return; + } + let workflow = match db.get_workflow(workflow_id).await { Ok(w) => w, Err(e) => { @@ -142,21 +152,10 @@ async fn resume_workflow_after_approval( .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default(); - // Mark the run as Running again before resuming. - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Running, - resume_index as i32, - &run.execution_trace, - None, - ) - .await - { - tracing::error!("grant_approval: failed to set Running status for run {run_id}: {e}"); - } - - match sprout_workflow::executor::execute_from_step( + // Execute remaining steps and finalize the run. + // Pass existing trace so finalize_run merges pre-approval + post-approval entries. + let existing_trace = run.execution_trace.as_array().cloned(); + let result = sprout_workflow::executor::execute_from_step( &engine, run_id, &def, @@ -164,106 +163,8 @@ async fn resume_workflow_after_approval( resume_index, Some(initial_outputs), ) - .await - { - Ok(result) if result.approval_token.is_none() => { - let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); - full_trace.extend(result.trace); - let trace_json = serde_json::Value::Array(full_trace); - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Completed, - result.step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!( - "grant_approval: failed to set Completed status for run {run_id}: {e}" - ); - } - } - Ok(result) => { - // Suspended again at another approval gate. - let next_token = match result.approval_token { - Some(t) => t, - None => { - tracing::error!( - "grant_approval: expected approval_token but got None for run {run_id}" - ); - return; - } - }; - let suspended_step_index = result.step_index; - let mut full_trace = run.execution_trace.as_array().cloned().unwrap_or_default(); - full_trace.extend(result.trace); - let trace_json = serde_json::Value::Array(full_trace); - - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::WaitingApproval, - suspended_step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!( - "grant_approval: failed to set WaitingApproval status for run {run_id}: {e}" - ); - } - - if let Some(suspended_step) = def.steps.get(suspended_step_index) { - let approver_spec = match &suspended_step.action { - sprout_workflow::ActionDef::RequestApproval { from, .. } => from.clone(), - _ => "any".to_string(), - }; - let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); - if let Err(e) = db - .create_approval(sprout_db::workflow::CreateApprovalParams { - token: &next_token, - workflow_id, - run_id, - step_id: &suspended_step.id, - step_index: suspended_step_index as i32, - approver_spec: &approver_spec, - expires_at, - }) - .await - { - tracing::error!( - "grant_approval: failed to create approval record for run {run_id}: {e}" - ); - } - } - - tracing::info!( - "workflow run {} suspended again at step {} (token: )", - run_id, - suspended_step_index, - ); - } - Err(e) => { - tracing::error!("workflow run {run_id} failed after approval resume: {e}"); - if let Err(db_err) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Failed, - resume_index as i32, - &run.execution_trace, - Some(&e.to_string()), - ) - .await - { - tracing::error!( - "grant_approval: failed to set Failed status for run {run_id}: {db_err}" - ); - } - } - } + .await; + engine.finalize_run(run_id, result, existing_trace).await; } // ── POST /api/approvals/:token/grant ───────────────────────────────────────── @@ -276,7 +177,7 @@ pub async fn grant_approval( headers: HeaderMap, Path(token): Path, body: Option>, -) -> Result, (StatusCode, Json)> { +) -> Result<(StatusCode, Json), (StatusCode, Json)> { let (pubkey, pubkey_bytes) = extract_auth_pubkey(&headers, &state).await?; let approval = state @@ -327,12 +228,15 @@ pub async fn grant_approval( resume_workflow_after_approval(engine, db, run_id, workflow_id, resume_index).await; }); - Ok(Json(serde_json::json!({ - "token": token, - "status": "granted", - "run_id": approval.run_id.to_string(), - "workflow_id": approval.workflow_id.to_string(), - }))) + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "token": token, + "status": "granted", + "run_id": approval.run_id.to_string(), + "workflow_id": approval.workflow_id.to_string(), + })), + )) } // ── POST /api/approvals/:token/deny ────────────────────────────────────────── @@ -345,7 +249,7 @@ pub async fn deny_approval( headers: HeaderMap, Path(token): Path, body: Option>, -) -> Result, (StatusCode, Json)> { +) -> Result<(StatusCode, Json), (StatusCode, Json)> { let (pubkey, pubkey_bytes) = extract_auth_pubkey(&headers, &state).await?; let approval = state @@ -384,25 +288,36 @@ pub async fn deny_approval( return Err(api_error(StatusCode::CONFLICT, "approval already acted on")); } - // Mark the workflow run as Cancelled. + // Mark the workflow run as Cancelled — only if it's still WaitingApproval. + // A run that has already transitioned to Failed/Completed/Cancelled through + // another path (e.g., timeout, manual cancel) must not be overwritten. let run_id = approval.run_id; let pubkey_for_msg = pubkey.to_hex(); let db = state.db.clone(); tokio::spawn(async move { - let (current_step, trace) = match db.get_workflow_run(run_id).await { - Ok(r) => (r.current_step, r.execution_trace), + let run = match db.get_workflow_run(run_id).await { + Ok(r) => r, Err(e) => { tracing::error!("deny_approval: failed to fetch run {run_id}: {e}"); - (0, serde_json::Value::Array(vec![])) + return; } }; + + if run.status != sprout_db::workflow::RunStatus::WaitingApproval { + tracing::warn!( + "deny_approval: run {run_id} has status '{}', expected 'waiting_approval' — skipping cancellation", + run.status + ); + return; + } + let cancel_msg = format!("workflow cancelled: approval denied by {pubkey_for_msg}"); if let Err(e) = db .update_workflow_run( run_id, sprout_db::workflow::RunStatus::Cancelled, - current_step, - &trace, + run.current_step, + &run.execution_trace, Some(&cancel_msg), ) .await @@ -411,12 +326,15 @@ pub async fn deny_approval( } }); - Ok(Json(serde_json::json!({ - "token": token, - "status": "denied", - "run_id": approval.run_id.to_string(), - "workflow_id": approval.workflow_id.to_string(), - }))) + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "token": token, + "status": "denied", + "run_id": approval.run_id.to_string(), + "workflow_id": approval.workflow_id.to_string(), + })), + )) } // ── Tests ───────────────────────────────────────────────────────────────────── @@ -500,13 +418,10 @@ mod tests { } #[test] - fn uppercase_hex_spec_is_rejected_as_unrecognised() { - // Spec must be lowercase hex — uppercase fails the `is_ascii_hexdigit` path length check - // (it IS hex digits, but the spec says 64-char lowercase; uppercase passes hexdigit but - // won't match a lowercase requester_hex, so it falls through to the role branch). + fn uppercase_hex_spec_is_accepted_case_insensitive() { + // Uppercase hex spec should now succeed — comparison is case-insensitive. let upper = ALICE_HEX.to_uppercase(); let result = check_approver_spec(&upper, &upper.to_lowercase()); - // Either forbidden (no match) or forbidden (unrecognised spec) — both are errors. - assert!(result.is_err()); + assert!(result.is_ok()); } } diff --git a/crates/sprout-relay/src/api/feed.rs b/crates/sprout-relay/src/api/feed.rs index 0e7dc01ea..c2ed7e5c7 100644 --- a/crates/sprout-relay/src/api/feed.rs +++ b/crates/sprout-relay/src/api/feed.rs @@ -17,7 +17,7 @@ use axum::{ use chrono::{DateTime, Duration, Utc}; use serde::Deserialize; -use sprout_core::kind; +use sprout_core::kind::{self, event_kind_u32}; use crate::state::AppState; @@ -114,7 +114,7 @@ pub async fn feed_handler( // 3. Partition activity into agent activity vs channel activity. let (agent_activity, channel_activity): (Vec<_>, Vec<_>) = activity_all .into_iter() - .partition(|e| AGENT_KINDS.contains(&(e.event.kind.as_u16() as u32))); + .partition(|e| AGENT_KINDS.contains(&event_kind_u32(&e.event))); // 4. Enrich events with channel names (batch lookup). let all_channels = state.db.list_channels(None).await.unwrap_or_else(|e| { @@ -144,7 +144,7 @@ pub async fn feed_handler( serde_json::json!({ "id": event.event.id.to_hex(), - "kind": event.event.kind.as_u16() as u32, + "kind": event_kind_u32(&event.event), "pubkey": event.event.pubkey.to_hex(), "content": event.event.content, "created_at": event.event.created_at.as_u64(), diff --git a/crates/sprout-relay/src/api/workflow_helpers.rs b/crates/sprout-relay/src/api/workflow_helpers.rs index e92ec9232..ac40cccc1 100644 --- a/crates/sprout-relay/src/api/workflow_helpers.rs +++ b/crates/sprout-relay/src/api/workflow_helpers.rs @@ -158,31 +158,17 @@ pub(crate) fn definition_hash(json_str: &str) -> Vec { /// Spawn an async workflow execution task. /// -/// Handles the full lifecycle: Running → Completed / WaitingApproval / Failed. +/// Handles the full lifecycle: Pending → (executor sets Running) → Completed / Failed. +/// Uses [`WorkflowEngine::finalize_run`] for the result→DB-status mapping. /// Used by trigger and webhook paths to avoid code duplication. pub(crate) fn spawn_workflow_execution( engine: Arc, db: sprout_db::Db, - workflow_id: uuid::Uuid, run_id: uuid::Uuid, workflow_def_value: serde_json::Value, trigger_ctx: sprout_workflow::executor::TriggerContext, ) { tokio::spawn(async move { - // Transition to Running first — stamps started_at. - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Running, - 0, - &serde_json::Value::Array(vec![]), - None, - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Running status: {e}"); - } - let def: sprout_workflow::WorkflowDef = match serde_json::from_value(workflow_def_value) { Ok(d) => d, Err(e) => { @@ -192,7 +178,7 @@ pub(crate) fn spawn_workflow_execution( run_id, sprout_db::workflow::RunStatus::Failed, 0, - &serde_json::Value::Null, + &serde_json::json!([]), Some(&format!("definition parse error: {e}")), ) .await @@ -203,104 +189,12 @@ pub(crate) fn spawn_workflow_execution( } }; - match sprout_workflow::executor::execute_run(&engine, run_id, &def, &trigger_ctx).await { - Ok(result) if result.approval_token.is_none() => { - let trace_json = serde_json::Value::Array(result.trace); - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Completed, - result.step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Completed status: {e}"); - } - } - Ok(result) => { - handle_approval_suspension(&db, &def, workflow_id, run_id, result).await; - } - Err(e) => { - tracing::error!("workflow run {run_id} failed: {e}"); - if let Err(db_err) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::Failed, - 0, - &serde_json::Value::Null, - Some(&e.to_string()), - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set Failed status: {db_err}"); - } - } - } + let result = + sprout_workflow::executor::execute_run(&engine, run_id, &def, &trigger_ctx).await; + engine.finalize_run(run_id, result, None).await; }); } -/// Persist approval-gate suspension state and create the approval record. -pub(crate) async fn handle_approval_suspension( - db: &sprout_db::Db, - def: &sprout_workflow::WorkflowDef, - workflow_id: uuid::Uuid, - run_id: uuid::Uuid, - result: sprout_workflow::executor::ExecutionResult, -) { - let approval_token = match result.approval_token { - Some(token) => token, - None => { - tracing::error!("workflow run {run_id}: handle_approval_suspension called but approval_token is None"); - return; - } - }; - let suspended_step_index = result.step_index; - let trace_json = serde_json::Value::Array(result.trace); - - if let Err(e) = db - .update_workflow_run( - run_id, - sprout_db::workflow::RunStatus::WaitingApproval, - suspended_step_index as i32, - &trace_json, - None, - ) - .await - { - tracing::error!("workflow run {run_id}: failed to set WaitingApproval status: {e}"); - } - - if let Some(suspended_step) = def.steps.get(suspended_step_index) { - let approver_spec = match &suspended_step.action { - sprout_workflow::ActionDef::RequestApproval { from, .. } => from.clone(), - _ => "any".to_string(), - }; - let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); - if let Err(e) = db - .create_approval(sprout_db::workflow::CreateApprovalParams { - token: &approval_token, - workflow_id, - run_id, - step_id: &suspended_step.id, - step_index: suspended_step_index as i32, - approver_spec: &approver_spec, - expires_at, - }) - .await - { - tracing::error!("workflow run {run_id}: failed to create approval record: {e}"); - } - } - - tracing::info!( - "workflow run {} suspended for approval at step {} (token: )", - run_id, - suspended_step_index, - ); -} - // ── Tests ───────────────────────────────────────────────────────────────────── #[cfg(test)] diff --git a/crates/sprout-relay/src/api/workflows.rs b/crates/sprout-relay/src/api/workflows.rs index 4dffa8497..f45651c06 100644 --- a/crates/sprout-relay/src/api/workflows.rs +++ b/crates/sprout-relay/src/api/workflows.rs @@ -88,6 +88,15 @@ pub async fn create_workflow( ) })?; + // Reject schedule/interval triggers until the cron scheduler is implemented. + // Accepting them would create workflows that silently never execute. + if matches!(def.trigger, sprout_workflow::TriggerDef::Schedule { .. }) { + return Err(api_error( + StatusCode::BAD_REQUEST, + "schedule and interval triggers are not yet supported — use message_posted, reaction_added, or webhook triggers", + )); + } + validate_webhook_urls(&def) .await .map_err(|e| api_error(StatusCode::BAD_REQUEST, &e))?; @@ -154,6 +163,8 @@ pub async fn get_workflow( if let Some(channel_id) = workflow.channel_id { check_channel_access(&state, channel_id, &pubkey_bytes).await?; + } else if workflow.owner_pubkey != pubkey_bytes { + return Err(forbidden("not authorized to access this workflow")); } Ok(Json(workflow_record_to_json(&workflow))) @@ -191,6 +202,8 @@ pub async fn update_workflow( if let Some(channel_id) = existing.channel_id { check_channel_access(&state, channel_id, &pubkey_bytes).await?; + } else if existing.owner_pubkey != pubkey_bytes { + return Err(forbidden("not authorized to access this workflow")); } let (def, definition_json_str) = @@ -201,6 +214,15 @@ pub async fn update_workflow( ) })?; + // Reject schedule/interval triggers until the cron scheduler is implemented. + // Accepting them would create workflows that silently never execute. + if matches!(def.trigger, sprout_workflow::TriggerDef::Schedule { .. }) { + return Err(api_error( + StatusCode::BAD_REQUEST, + "schedule and interval triggers are not yet supported — use message_posted, reaction_added, or webhook triggers", + )); + } + validate_webhook_urls(&def) .await .map_err(|e| api_error(StatusCode::BAD_REQUEST, &e))?; @@ -364,7 +386,6 @@ pub async fn trigger_workflow( spawn_workflow_execution( Arc::clone(&state.workflow_engine), state.db.clone(), - id, run_id, workflow.definition.clone(), trigger_ctx, @@ -477,7 +498,6 @@ pub async fn workflow_webhook( spawn_workflow_execution( Arc::clone(&state.workflow_engine), state.db.clone(), - id, run_id, workflow.definition.clone(), trigger_ctx, diff --git a/crates/sprout-relay/src/handlers/event.rs b/crates/sprout-relay/src/handlers/event.rs index 31eff02d0..be306fc96 100644 --- a/crates/sprout-relay/src/handlers/event.rs +++ b/crates/sprout-relay/src/handlers/event.rs @@ -2,12 +2,15 @@ use std::sync::Arc; +use hex; use tracing::{debug, error, info, warn}; use nostr::Event; use sprout_audit::{AuditAction, NewAuditEntry}; use sprout_core::event::StoredEvent; -use sprout_core::kind::KIND_PRESENCE_UPDATE; +use sprout_core::kind::{ + event_kind_u32, is_ephemeral, is_workflow_execution_kind, KIND_AUTH, KIND_PRESENCE_UPDATE, +}; use sprout_core::verification::verify_event; use sprout_auth::Scope; @@ -16,14 +19,10 @@ use crate::connection::{AuthState, ConnectionState}; use crate::protocol::RelayMessage; use crate::state::AppState; -const KIND_AUTH: u32 = 22242; -const EPHEMERAL_MIN: u32 = 20000; -const EPHEMERAL_MAX: u32 = 29999; - /// Handle an EVENT message: authenticate, verify, store, fan-out, index, and audit the event. pub async fn handle_event(event: Event, conn: Arc, state: Arc) { let event_id_hex = event.id.to_hex(); - let kind_u32 = event.kind.as_u16() as u32; + let kind_u32 = event_kind_u32(&event); debug!(event_id = %event_id_hex, kind = kind_u32, "EVENT"); let (conn_id, pubkey_hex, pubkey_bytes, auth_pubkey) = { @@ -76,7 +75,7 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc, state: Arc Some(ch_id), + ReactionChannelResult::NoChannel => { + // Target event exists but has no channel (global/DM message). + // Allow the reaction to proceed without channel scoping. + None + } + ReactionChannelResult::NotFound => { + // Fail closed: reject reactions to events we don't know about. + warn!( + event_id = %event_id_hex, + "Rejecting reaction: target event not found in DB" + ); + conn.send(RelayMessage::ok( + &event_id_hex, + false, + "invalid: reaction target event not found", + )); + return; + } + ReactionChannelResult::NoTarget => { + // Malformed reaction: no valid `e` tag. + warn!( + event_id = %event_id_hex, + "Rejecting reaction: no valid e tag referencing target event" + ); + conn.send(RelayMessage::ok( + &event_id_hex, + false, + "invalid: reaction must reference a target event via e tag", + )); + return; + } + ReactionChannelResult::DbError(ref err) => { + // Fail closed on transient DB errors — don't allow reactions + // through when we can't verify the target. + error!( + event_id = %event_id_hex, + "Rejecting reaction: database error looking up target: {err}" + ); + conn.send(RelayMessage::ok( + &event_id_hex, + false, + "error: internal error looking up reaction target", + )); + return; + } + } + } else { + extract_channel_id(&event) + }; if let Some(ch_id) = channel_id { if let Err(msg) = @@ -198,7 +250,7 @@ pub async fn handle_event(event: Event, conn: Arc, state: Arc 128 { let mut end = 128; @@ -350,6 +402,98 @@ async fn check_channel_membership( } } +/// Result of resolving a reaction's target channel. +enum ReactionChannelResult { + /// Target event found and has a channel_id. + Channel(uuid::Uuid), + /// Target event found but has no channel (global/DM message) — allow as global. + NoChannel, + /// Target event not found in DB — reject (fail closed). + NotFound, + /// No valid `e` tag on the reaction — reject (malformed). + NoTarget, + /// DB error during lookup — reject (fail closed on transient errors). + DbError(String), +} + +/// For NIP-25 reactions, derive the channel_id from the target event. +/// +/// Reactions reference their target via an `e` tag containing a 64-hex event ID. +/// We look up that event in the DB to find its channel_id. +/// +/// Returns a [`ReactionChannelResult`] so the caller can distinguish between +/// "target exists but is global" (allow) and "target not found" (reject). +async fn derive_reaction_channel( + db: &sprout_db::Db, + event: &nostr::Event, +) -> ReactionChannelResult { + // Find the target event ID from NIP-25 `e` tags. + // Per NIP-25, the last `e` tag is the target (in case of threading). + // Filter for 64-char hex event IDs inside find_map to skip UUID channel refs, + // consistent with build_trigger_context() in sprout-workflow/src/lib.rs. + let target_hex = match event.tags.iter().rev().find_map(|tag| { + let key = tag.kind().to_string(); + if key == "e" { + tag.content().and_then(|v| { + if v.len() == 64 && v.chars().all(|c| c.is_ascii_hexdigit()) { + Some(v.to_string()) + } else { + None + } + }) + } else { + None + } + }) { + Some(h) => h, + None => return ReactionChannelResult::NoTarget, + }; + + // Decode hex to bytes for DB lookup (already validated as 64-char hex above) + let id_bytes = match hex::decode(&target_hex) { + Ok(b) if b.len() == 32 => b, + _ => return ReactionChannelResult::NoTarget, + }; + + // Look up the target event to get its channel_id + match db.get_event_by_id(&id_bytes).await { + Ok(Some(target_event)) => { + if let Some(ch_id) = target_event.channel_id { + tracing::debug!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + channel_id = %ch_id, + "Derived reaction channel from target event" + ); + ReactionChannelResult::Channel(ch_id) + } else { + tracing::debug!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + "Target event has no channel — allowing as global reaction" + ); + ReactionChannelResult::NoChannel + } + } + Ok(None) => { + tracing::debug!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + "Target event not found in DB" + ); + ReactionChannelResult::NotFound + } + Err(e) => { + tracing::warn!( + reaction_id = %event.id.to_hex(), + target_id = %target_hex, + "Failed to look up target event: {e}" + ); + ReactionChannelResult::DbError(e.to_string()) + } + } +} + /// Extract a channel UUID from event tags. /// /// Checks both `"channel"` custom tags and `"e"` reference tags (clients use diff --git a/crates/sprout-relay/src/main.rs b/crates/sprout-relay/src/main.rs index 6bd3633f5..854a0e16c 100644 --- a/crates/sprout-relay/src/main.rs +++ b/crates/sprout-relay/src/main.rs @@ -68,15 +68,16 @@ async fn main() -> anyhow::Result<()> { ); info!("Redis pub/sub connected"); - let pubsub_clone = Arc::clone(&pubsub); - tokio::spawn(async move { pubsub_clone.run_subscriber().await }); + // TODO: spawn pubsub.run_subscriber() for multi-node fan-out. + // Currently no consumer calls subscribe_local(), so the subscriber + // would process Redis messages into a broadcast channel with zero receivers. let auth = AuthService::new(config.auth.clone()); let search_config = SearchConfig { url: config.typesense_url.clone(), api_key: config.typesense_key.clone(), - collection: "events".to_string(), + collection: std::env::var("TYPESENSE_COLLECTION").unwrap_or_else(|_| "events".to_string()), }; let search = SearchService::new(search_config); if let Err(e) = search.ensure_collection().await { @@ -86,9 +87,9 @@ async fn main() -> anyhow::Result<()> { let workflow_config = sprout_workflow::WorkflowConfig::default(); let workflow_engine = Arc::new(WorkflowEngine::new(db.clone(), workflow_config)); - // Spawn cron scheduler background task - let wf_clone = Arc::clone(&workflow_engine); - tokio::spawn(async move { wf_clone.run().await }); + // Spawn cron scheduler background task. + let wf_cron = Arc::clone(&workflow_engine); + tokio::spawn(async move { wf_cron.run().await }); let state = Arc::new(AppState::new( config.clone(), diff --git a/crates/sprout-search/src/index.rs b/crates/sprout-search/src/index.rs index f5b4dc63b..6eba8884a 100644 --- a/crates/sprout-search/src/index.rs +++ b/crates/sprout-search/src/index.rs @@ -4,6 +4,7 @@ use serde_json::{json, Value}; use tracing::{debug, warn}; use sprout_core::event::StoredEvent; +use sprout_core::kind::event_kind_i32; use crate::error::SearchError; @@ -34,7 +35,7 @@ pub fn event_to_document(event: &StoredEvent) -> Result { "id": nostr_event.id.to_string(), "content": nostr_event.content.as_str(), // Cast to i32 for Typesense schema (int32 field). nostr Kind is u16; all Sprout kinds fit in i32. - "kind": nostr_event.kind.as_u16() as i32, + "kind": event_kind_i32(nostr_event), "pubkey": nostr_event.pubkey.to_string(), "channel_id": channel_id, "created_at": nostr_event.created_at.as_u64() as i64, diff --git a/crates/sprout-test-client/tests/e2e_mcp.rs b/crates/sprout-test-client/tests/e2e_mcp.rs index 4c2cbb100..29470b92c 100644 --- a/crates/sprout-test-client/tests/e2e_mcp.rs +++ b/crates/sprout-test-client/tests/e2e_mcp.rs @@ -29,10 +29,10 @@ use std::time::Duration; use serde_json::{json, Value}; -// ── Seeded channel IDs (stable across relay restarts) ───────────────────────── +// ── Seeded channel IDs (UUID5-derived, stable across relay restarts) ────────── -const CHANNEL_GENERAL: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"; -const CHANNEL_PROJECTS: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa2"; +const CHANNEL_GENERAL: &str = "9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50"; +const CHANNEL_ENGINEERING: &str = "1c7e1c02-87bb-5e88-b2da-5a7a9432d0c9"; // ── Helpers ─────────────────────────────────────────────────────────────────── @@ -546,7 +546,7 @@ async fn test_mcp_create_and_trigger_workflow() { let create_resp = session.call_tool( "create_workflow", json!({ - "channel_id": CHANNEL_PROJECTS, + "channel_id": CHANNEL_ENGINEERING, "yaml_definition": yaml_definition, }), ); @@ -588,7 +588,7 @@ async fn test_mcp_create_and_trigger_workflow() { let list_resp = session.call_tool( "list_workflows", json!({ - "channel_id": CHANNEL_PROJECTS, + "channel_id": CHANNEL_ENGINEERING, }), ); diff --git a/crates/sprout-test-client/tests/e2e_rest_api.rs b/crates/sprout-test-client/tests/e2e_rest_api.rs index 57aae6351..6b1e3a995 100644 --- a/crates/sprout-test-client/tests/e2e_rest_api.rs +++ b/crates/sprout-test-client/tests/e2e_rest_api.rs @@ -21,8 +21,9 @@ //! # Channel setup //! //! The relay exposes REST endpoints to list and create channels. Tests use the -//! pre-seeded open channels (`general`, `agents`, `projects`, etc.) for read +//! pre-seeded open channels (`general`, `agents`, `engineering`, etc.) for read //! operations and create temporary channels for write coverage when needed. +//! Some tests also send messages via WebSocket to set up search / feed data. use std::time::Duration; @@ -80,10 +81,10 @@ async fn authed_post_json( /// Known open channel IDs seeded in the dev database. /// -/// These are stable across relay restarts because they are inserted with -/// explicit UUIDs in the seed migration. -const CHANNEL_GENERAL: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"; -const CHANNEL_PROJECTS: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa2"; +/// These are UUID5-derived from the channel name and are stable across relay +/// restarts as long as the seed data uses the same namespace + name inputs. +const CHANNEL_GENERAL: &str = "9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50"; +const CHANNEL_ENGINEERING: &str = "1c7e1c02-87bb-5e88-b2da-5a7a9432d0c9"; // ── Channel tests ───────────────────────────────────────────────────────────── @@ -220,8 +221,8 @@ async fn test_channel_visibility_open_channels_visible_to_all() { "expected seeded 'general' channel (id={CHANNEL_GENERAL})" ); assert!( - ids_a.contains(CHANNEL_PROJECTS), - "expected seeded 'projects' channel (id={CHANNEL_PROJECTS})" + ids_a.contains(CHANNEL_ENGINEERING), + "expected seeded 'engineering' channel (id={CHANNEL_ENGINEERING})" ); } diff --git a/crates/sprout-test-client/tests/e2e_workflows.rs b/crates/sprout-test-client/tests/e2e_workflows.rs index c348438b6..5d61eb956 100644 --- a/crates/sprout-test-client/tests/e2e_workflows.rs +++ b/crates/sprout-test-client/tests/e2e_workflows.rs @@ -47,9 +47,9 @@ fn http_client() -> Client { /// Known open channel IDs seeded in the dev database. /// -/// These are stable across relay restarts because they are inserted with -/// explicit UUIDs in the seed migration. -const CHANNEL_GENERAL: &str = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaa1"; +/// These are UUID5-derived from the channel name and are stable across relay +/// restarts as long as the seed data uses the same namespace + name inputs. +const CHANNEL_GENERAL: &str = "9a1657ac-f7aa-5db0-b632-d8bbeb6dfb50"; /// A seeded user pubkey that exists in the `users` table. /// @@ -317,6 +317,215 @@ async fn test_trigger_workflow_and_check_run() { assert_eq!(del_status, 204, "cleanup DELETE should return 204"); } +// ── Test 5: Event-driven workflow execution ─────────────────────────────────── + +/// Send a kind:40001 message to a channel that has a `message_posted` workflow. +/// Verify that the workflow engine creates a run record. +/// +/// NOTE: Uses `SEEDED_PUBKEY` for workflow ownership due to the FK constraint +/// on `workflows.owner_pubkey`. The WebSocket sender uses fresh keys. +#[tokio::test] +#[ignore = "requires running relay"] +async fn test_event_driven_workflow_execution() { + use nostr::{Kind, Tag}; + use sprout_test_client::SproutTestClient; + + let client = http_client(); + let pubkey_hex: &str = SEEDED_PUBKEY; + let base = relay_http_url(); + + // ── Step 1: Create a message_posted workflow in the general channel ─────── + let workflow_yaml = r#"name: event-driven-e2e-test +description: E2E test for message_posted trigger +trigger: + on: message_posted +steps: + - id: step1 + name: Acknowledge + action: send_message + text: "Workflow fired by event" +"#; + let created = create_workflow(&client, &base, pubkey_hex, CHANNEL_GENERAL, workflow_yaml).await; + let workflow_id = created["id"] + .as_str() + .expect("created workflow must have 'id'") + .to_string(); + + // ── Step 2: Connect via WebSocket and send a kind:40001 message ─────────── + // Use fresh keys for the sender (channel is open, no auth required to post). + let sender_keys = Keys::generate(); + let mut ws_client = SproutTestClient::connect(&relay_ws_url(), &sender_keys) + .await + .expect("ws connect failed"); + + let e_tag = Tag::parse(&["e", CHANNEL_GENERAL]).expect("tag parse failed"); + let event = + nostr::EventBuilder::new(Kind::Custom(40001), "trigger this workflow please", [e_tag]) + .sign_with_keys(&sender_keys) + .expect("sign event"); + + ws_client + .send_event(event) + .await + .expect("send event failed"); + + // ── Step 3: Wait for the workflow engine to process the event ───────────── + tokio::time::sleep(Duration::from_secs(3)).await; + + // ── Step 4: Check that a run was created ────────────────────────────────── + let runs_url = format!("{base}/api/workflows/{workflow_id}/runs"); + let runs_resp = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs failed"); + assert_eq!(runs_resp.status(), 200, "GET runs must return 200"); + + let runs: Vec = runs_resp.json().await.expect("runs must be JSON array"); + assert!( + !runs.is_empty(), + "expected at least one workflow run after sending kind:40001 event" + ); + + let run = &runs[0]; + assert!(run.get("id").is_some(), "run missing 'id'"); + assert!( + run.get("workflow_id").is_some(), + "run missing 'workflow_id'" + ); + assert!(run.get("status").is_some(), "run missing 'status'"); + + let status = run["status"].as_str().unwrap_or(""); + assert!( + matches!(status, "pending" | "running" | "completed" | "failed"), + "run status '{status}' is not a recognized value" + ); + + // Clean up. + let _ = ws_client.disconnect().await; + let del_status = delete_workflow(&client, &base, pubkey_hex, &workflow_id).await; + assert_eq!(del_status, 204, "cleanup DELETE should return 204"); +} + +// ── Test 6: Event-driven workflow with filter ───────────────────────────────── + +/// Verify that a `message_posted` workflow with a filter expression only fires +/// when the filter matches. +/// +/// 1. Create a workflow with `filter: "str_contains(trigger_text, \"P1\")"`. +/// 2. Send a message that does NOT contain "P1" — expect zero runs. +/// 3. Send a message that DOES contain "P1" — expect one run. +/// +/// NOTE: Filter evaluation is wired in WF-07. Until then, all matched-kind +/// events fire the workflow regardless of filter. This test documents the +/// intended behaviour so it can be un-skipped once WF-07 lands. +#[tokio::test] +#[ignore = "requires running relay with WF-07 filter evaluation"] +async fn test_event_driven_workflow_with_filter() { + use nostr::{Kind, Tag}; + use sprout_test_client::SproutTestClient; + + let client = http_client(); + let pubkey_hex: &str = SEEDED_PUBKEY; + let base = relay_http_url(); + + // ── Step 1: Create a filtered message_posted workflow ───────────────────── + let workflow_yaml = r#"name: filtered-event-e2e-test +description: E2E test for message_posted trigger with filter +trigger: + on: message_posted + filter: "str_contains(trigger_text, \"P1\")" +steps: + - id: step1 + name: Notify + action: send_message + text: "P1 incident detected" +"#; + let created = create_workflow(&client, &base, pubkey_hex, CHANNEL_GENERAL, workflow_yaml).await; + let workflow_id = created["id"] + .as_str() + .expect("created workflow must have 'id'") + .to_string(); + + let sender_keys = Keys::generate(); + let mut ws_client = SproutTestClient::connect(&relay_ws_url(), &sender_keys) + .await + .expect("ws connect failed"); + + // ── Step 2: Send a message that does NOT match the filter ───────────────── + let e_tag = Tag::parse(&["e", CHANNEL_GENERAL]).expect("tag parse failed"); + let non_matching = nostr::EventBuilder::new( + Kind::Custom(40001), + "this is a routine update, nothing urgent", + [e_tag.clone()], + ) + .sign_with_keys(&sender_keys) + .expect("sign event"); + + ws_client + .send_event(non_matching) + .await + .expect("send non-matching event failed"); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let runs_url = format!("{base}/api/workflows/{workflow_id}/runs"); + let runs_resp = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs (non-matching) failed"); + assert_eq!(runs_resp.status(), 200); + let runs_after_non_match: Vec = + runs_resp.json().await.expect("runs must be JSON array"); + assert!( + runs_after_non_match.is_empty(), + "non-matching message must NOT trigger a workflow run, but got {} run(s)", + runs_after_non_match.len() + ); + + // ── Step 3: Send a message that DOES match the filter ───────────────────── + let matching = + nostr::EventBuilder::new(Kind::Custom(40001), "P1 alert: database is down", [e_tag]) + .sign_with_keys(&sender_keys) + .expect("sign event"); + + ws_client + .send_event(matching) + .await + .expect("send matching event failed"); + + tokio::time::sleep(Duration::from_secs(3)).await; + + let runs_resp2 = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs (matching) failed"); + assert_eq!(runs_resp2.status(), 200); + let runs_after_match: Vec = + runs_resp2.json().await.expect("runs must be JSON array"); + assert!( + !runs_after_match.is_empty(), + "matching message must trigger a workflow run" + ); + + let run = &runs_after_match[0]; + let status = run["status"].as_str().unwrap_or(""); + assert!( + matches!(status, "pending" | "running" | "completed" | "failed"), + "run status '{status}' is not a recognized value" + ); + + // Clean up. + let _ = ws_client.disconnect().await; + let del_status = delete_workflow(&client, &base, pubkey_hex, &workflow_id).await; + assert_eq!(del_status, 204, "cleanup DELETE should return 204"); +} + // ── Test 4: Workflow CRUD (update + delete) ─────────────────────────────────── /// Full CRUD lifecycle: @@ -417,3 +626,109 @@ async fn test_workflow_update_and_delete() { "GET after DELETE must return 404" ); } + +// ── Test 7: Approval gate (WF-08 stub) ──────────────────────────────────────── + +/// Create a workflow with a `request_approval` step, trigger it, and verify +/// the run fails with the "approval gates not yet implemented" message. +/// +/// This test documents the current stub behavior. When WF-08 is implemented, +/// this test should be updated to verify the full approval round-trip: +/// create → trigger → poll for waiting_approval → grant → verify completed. +#[tokio::test] +#[ignore] +async fn test_approval_gate_stub_fails_gracefully() { + let client = http_client(); + let pubkey_hex: &str = SEEDED_PUBKEY; + let base = relay_http_url(); + + // ── Step 1: Create a workflow with a request_approval step ──────────────── + let workflow_yaml = r#"name: approval-test +description: Test approval gate +trigger: + on: webhook +steps: + - id: step1 + name: Pre-approval step + action: send_message + text: "Before approval" + - id: approve + action: request_approval + from: "any" + message: "Please approve this workflow" + - id: step3 + name: Post-approval step + action: send_message + text: "After approval" +"#; + let created = create_workflow(&client, &base, pubkey_hex, CHANNEL_GENERAL, workflow_yaml).await; + let workflow_id = created["id"] + .as_str() + .expect("created workflow must have 'id'") + .to_string(); + + // ── Step 2: Trigger the workflow ────────────────────────────────────────── + let trigger_url = format!("{base}/api/workflows/{workflow_id}/trigger"); + let trigger_resp = client + .post(&trigger_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .unwrap_or_else(|e| panic!("POST {trigger_url} failed: {e}")); + + assert_eq!( + trigger_resp.status(), + 202, + "trigger endpoint must return 202 Accepted" + ); + + let trigger_body: serde_json::Value = trigger_resp + .json() + .await + .expect("trigger response must be JSON"); + let run_id = trigger_body["run_id"] + .as_str() + .expect("trigger response must include 'run_id'") + .to_string(); + + // ── Step 3: Poll until the run reaches a terminal status ────────────────── + let runs_url = format!("{base}/api/workflows/{workflow_id}/runs"); + let mut final_run: Option = None; + for _ in 0..10 { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let runs_resp = client + .get(&runs_url) + .header("X-Pubkey", pubkey_hex) + .send() + .await + .expect("GET runs failed"); + assert_eq!(runs_resp.status(), 200, "GET runs must return 200"); + let runs: Vec = runs_resp.json().await.expect("runs must be JSON array"); + if let Some(run) = runs.iter().find(|r| r["id"].as_str() == Some(&run_id)) { + let status = run["status"].as_str().unwrap_or(""); + if matches!(status, "completed" | "failed" | "cancelled") { + final_run = Some(run.clone()); + break; + } + } + } + + // ── Step 4: Assert the run failed with the expected stub error ──────────── + let run = final_run.expect("run must reach a terminal status within 1 second"); + + assert_eq!( + run["status"].as_str().unwrap_or(""), + "failed", + "approval gate stub must cause the run to fail" + ); + + let error_msg = run["error"].as_str().unwrap_or(""); + assert!( + error_msg.contains("approval gates not yet implemented"), + "run error must contain 'approval gates not yet implemented', got: {error_msg:?}" + ); + + // ── Step 5: Clean up ────────────────────────────────────────────────────── + let del_status = delete_workflow(&client, &base, pubkey_hex, &workflow_id).await; + assert_eq!(del_status, 204, "cleanup DELETE should return 204"); +} diff --git a/crates/sprout-workflow/Cargo.toml b/crates/sprout-workflow/Cargo.toml index f8cd14a42..8ab919528 100644 --- a/crates/sprout-workflow/Cargo.toml +++ b/crates/sprout-workflow/Cargo.toml @@ -22,5 +22,8 @@ tracing = { workspace = true } thiserror = { workspace = true } reqwest = { workspace = true, optional = true } +[dev-dependencies] +nostr = { workspace = true } + [features] reqwest = ["dep:reqwest"] diff --git a/crates/sprout-workflow/src/error.rs b/crates/sprout-workflow/src/error.rs index 0e56e76a0..5d671fa95 100644 --- a/crates/sprout-workflow/src/error.rs +++ b/crates/sprout-workflow/src/error.rs @@ -2,6 +2,18 @@ use thiserror::Error; +/// Partial execution progress captured when a workflow step fails mid-run. +/// +/// This allows callers to persist whatever trace was accumulated before the +/// error, rather than losing it when the in-memory `Vec` is dropped. +#[derive(Debug, Default)] +pub struct PartialProgress { + /// Index of the step that failed (0-based). + pub step_index: usize, + /// Trace entries for steps completed/skipped before the failure. + pub trace: Vec, +} + /// Errors produced by the workflow engine. #[derive(Debug, Error)] pub enum WorkflowError { diff --git a/crates/sprout-workflow/src/executor.rs b/crates/sprout-workflow/src/executor.rs index c52d5fb27..720b03d37 100644 --- a/crates/sprout-workflow/src/executor.rs +++ b/crates/sprout-workflow/src/executor.rs @@ -602,10 +602,10 @@ pub async fn dispatch_action( Delay { duration } => { let secs = parse_duration_secs(duration)?; - // Cap delay at 300 seconds (5 minutes) to prevent tasks from holding - // a Tokio worker thread for extended periods. Long delays (hours/days) + // Cap delay at 270 seconds (4.5 minutes) — must be less than default_timeout_secs (300s) + // to avoid non-deterministic StepTimeout. Long delays (hours/days) // should use the scheduled resume pattern (future work: WF-09). - const MAX_DELAY_SECS: u64 = 300; + const MAX_DELAY_SECS: u64 = 270; if secs > MAX_DELAY_SECS { return Err(WorkflowError::InvalidDefinition(format!( "delay exceeds maximum of {MAX_DELAY_SECS} seconds (got {secs}s); \ @@ -640,13 +640,17 @@ pub(crate) fn parse_duration_secs(duration: &str) -> Result let hours: u64 = n.trim().parse().map_err(|_| { WorkflowError::InvalidDefinition(format!("invalid duration: {duration}")) })?; - return Ok(hours * 3600); + return hours.checked_mul(3600).ok_or_else(|| { + WorkflowError::InvalidDefinition(format!("duration overflow: {duration}")) + }); } if let Some(n) = duration.strip_suffix('m') { let mins: u64 = n.trim().parse().map_err(|_| { WorkflowError::InvalidDefinition(format!("invalid duration: {duration}")) })?; - return Ok(mins * 60); + return mins.checked_mul(60).ok_or_else(|| { + WorkflowError::InvalidDefinition(format!("duration overflow: {duration}")) + }); } if let Some(n) = duration.strip_suffix('s') { let secs: u64 = n.trim().parse().map_err(|_| { @@ -838,23 +842,50 @@ pub struct ExecutionResult { /// /// Enforces `engine.config.max_concurrent` via a semaphore — returns /// [`WorkflowError::CapacityExceeded`] immediately if all permits are taken. +/// Transitions the run to `Running` after acquiring a permit. pub async fn execute_run( engine: &WorkflowEngine, run_id: Uuid, def: &WorkflowDef, trigger_ctx: &TriggerContext, -) -> Result { - // Acquire a concurrency permit. `try_acquire` is non-blocking — if all - // permits are in use we return CapacityExceeded rather than queuing. - let _permit = engine - .run_semaphore - .try_acquire() - .map_err(|_| WorkflowError::CapacityExceeded)?; - - execute_from_step(engine, run_id, def, trigger_ctx, 0, None).await +) -> Result { + // Fail fast if all concurrency permits are in use — no queuing. + let _permit = engine.run_semaphore.try_acquire().map_err(|_| { + ( + WorkflowError::CapacityExceeded, + crate::error::PartialProgress::default(), + ) + })?; + + // Mark run as Running now that we have a permit. + engine + .db + .update_workflow_run( + run_id, + sprout_db::workflow::RunStatus::Running, + 0, + &serde_json::json!([]), + None, + ) + .await + .map_err(|e| { + ( + WorkflowError::from(e), + crate::error::PartialProgress::default(), + ) + })?; + + execute_steps(engine, run_id, def, trigger_ctx, 0, None).await } -/// Execute starting from a specific step index (used for approval resume). +/// Resume execution from a specific step index (used for approval resume). +/// +/// Acquires a concurrency permit from `engine.run_semaphore` before executing — +/// returns [`WorkflowError::CapacityExceeded`] immediately if all permits are +/// taken. +/// +/// Transitions the run to `Running` after acquiring a permit, so that +/// approval-resumed runs correctly reflect their active state. /// /// `initial_outputs` should be reconstructed from the execution trace before /// calling this function on resume, so that steps after the resume point can @@ -866,7 +897,69 @@ pub async fn execute_from_step( trigger_ctx: &TriggerContext, start_index: usize, initial_outputs: Option>, -) -> Result { +) -> Result { + // Fail fast if all concurrency permits are in use — no queuing. + let _permit = engine.run_semaphore.try_acquire().map_err(|_| { + ( + WorkflowError::CapacityExceeded, + crate::error::PartialProgress::default(), + ) + })?; + + // Mark run as Running now that we have a permit (resume from approval). + // Preserve the existing execution trace from pre-approval steps. + let existing_trace = match engine.db.get_workflow_run(run_id).await { + Ok(r) => r.execution_trace, + Err(e) => { + warn!( + run_id = %run_id, + "Failed to read existing trace for resume — pre-approval trace will be lost: {e}" + ); + serde_json::json!([]) + } + }; + engine + .db + .update_workflow_run( + run_id, + sprout_db::workflow::RunStatus::Running, + start_index as i32, + &existing_trace, + None, + ) + .await + .map_err(|e| { + ( + WorkflowError::from(e), + crate::error::PartialProgress::default(), + ) + })?; + + execute_steps( + engine, + run_id, + def, + trigger_ctx, + start_index, + initial_outputs, + ) + .await +} + +/// Internal: execute workflow steps starting from `start_index`, without +/// acquiring the semaphore. Called by both [`execute_run`] and +/// [`execute_from_step`] after they have already acquired a permit. +/// +/// On error, returns `(WorkflowError, PartialProgress)` so callers can persist +/// the trace of steps completed before the failure. +async fn execute_steps( + engine: &WorkflowEngine, + run_id: Uuid, + def: &WorkflowDef, + trigger_ctx: &TriggerContext, + start_index: usize, + initial_outputs: Option>, +) -> Result { let mut step_outputs: HashMap = initial_outputs.unwrap_or_default(); let mut trace: Vec = Vec::new(); @@ -892,27 +985,60 @@ pub async fn execute_from_step( } Err(e) => { warn!(run_id = %run_id, step = %step.id, "Condition error: {e}"); - return Err(e); + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err((e, progress)); } } } // 2. Resolve template variables. - let resolved_action = resolve_step_templates(step, trigger_ctx, &step_outputs)?; + let resolved_action = match resolve_step_templates(step, trigger_ctx, &step_outputs) { + Ok(a) => a, + Err(e) => { + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err((e, progress)); + } + }; // 3. Dispatch action (with per-step timeout). let timeout_secs = step .timeout_secs .unwrap_or(engine.config.default_timeout_secs); - let result = tokio::time::timeout( + let dispatch_result = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), dispatch_action(&step.id, &resolved_action, engine, run_id), ) - .await - .map_err(|_| WorkflowError::StepTimeout { - step_id: step.id.clone(), - timeout_secs, - })??; + .await; + + let result = match dispatch_result { + Ok(Ok(r)) => r, + Ok(Err(e)) => { + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err((e, progress)); + } + Err(_timeout) => { + let progress = crate::error::PartialProgress { + step_index: i, + trace, + }; + return Err(( + WorkflowError::StepTimeout { + step_id: step.id.clone(), + timeout_secs, + }, + progress, + )); + } + }; match result { StepResult::Completed(output) => { diff --git a/crates/sprout-workflow/src/lib.rs b/crates/sprout-workflow/src/lib.rs index 01da68780..2192a919b 100644 --- a/crates/sprout-workflow/src/lib.rs +++ b/crates/sprout-workflow/src/lib.rs @@ -15,7 +15,7 @@ //! ## Usage //! //! ```rust,ignore -//! let engine = WorkflowEngine::new(db, WorkflowConfig::default()); +//! let engine = Arc::new(WorkflowEngine::new(db, WorkflowConfig::default())); //! //! // Parse and validate a YAML definition. //! let (def, json) = WorkflowEngine::parse_yaml(yaml_str)?; @@ -31,12 +31,15 @@ pub mod error; pub mod executor; pub mod schema; -pub use error::WorkflowError; +pub use error::{PartialProgress, WorkflowError}; pub use executor::ExecutionResult; pub use schema::{ActionDef, Step, TriggerDef, WorkflowDef}; +use std::collections::HashMap; use std::sync::Arc; +use sprout_core::kind::{event_kind_u32, is_workflow_execution_kind, KIND_REACTION}; +use sprout_db::workflow::RunStatus; use sprout_db::Db; use tokio::sync::Semaphore; @@ -90,26 +93,125 @@ impl WorkflowEngine { schema::parse_yaml(yaml) } + /// Finalize a workflow run after execution completes or fails. + /// + /// This is the **single** place that maps an executor result to a DB status + /// update. All execution paths (event-triggered, manual trigger/webhook, + /// approval resume) call this instead of duplicating the 3-way match. + /// + /// `existing_trace` is prepended to the executor's trace — used by the + /// approval-resume path where pre-approval steps already have trace entries. + pub async fn finalize_run( + &self, + run_id: uuid::Uuid, + result: Result, + existing_trace: Option>, + ) { + let prefix = existing_trace.unwrap_or_default(); + + match result { + Ok(result) => { + let mut full_trace = prefix; + full_trace.extend(result.trace); + let trace_json = serde_json::Value::Array(full_trace); + let step_count = result.step_index as i32; + + if result.approval_token.is_some() { + // Approval gates are not yet implemented (WF-08). + // Fail explicitly rather than creating unreachable WaitingApproval rows. + tracing::warn!( + run_id = %run_id, + step_index = result.step_index, + "Workflow hit approval gate — not yet implemented, marking as failed" + ); + if let Err(e) = self + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + step_count, + &trace_json, + Some("approval gates not yet implemented — see WF-08"), + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Failed (approval gate): {e}" + ); + } + } else { + tracing::info!(run_id = %run_id, "Workflow run completed"); + if let Err(e) = self + .db + .update_workflow_run( + run_id, + RunStatus::Completed, + step_count, + &trace_json, + None, + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Completed: {e}" + ); + } + } + } + Err((e, progress)) => { + tracing::error!(run_id = %run_id, "Workflow run failed: {e}"); + let mut full_trace = prefix; + full_trace.extend(progress.trace); + let trace_json = serde_json::Value::Array(full_trace); + if let Err(db_err) = self + .db + .update_workflow_run( + run_id, + RunStatus::Failed, + progress.step_index as i32, + &trace_json, + Some(&e.to_string()), + ) + .await + { + tracing::error!( + run_id = %run_id, + "Failed to update run to Failed: {db_err}" + ); + } + } + } + } + /// Called from the event handler post-store hook for every stored event. /// /// Checks whether any workflow in the event's channel has a matching trigger. /// Workflow execution events (kinds 46001–46012) are excluded to prevent loops. /// - /// Full trigger matching and execution spawning is wired in WF-07/08. - pub async fn on_event(&self, event: &sprout_core::StoredEvent) -> Result<(), WorkflowError> { + /// The method takes `self: &Arc` so that the spawned task can hold a + /// clone of the `Arc` without requiring `'static` on `&self`. + pub async fn on_event( + self: &Arc, + event: &sprout_core::StoredEvent, + ) -> Result<(), WorkflowError> { let Some(channel_id) = event.channel_id else { + tracing::debug!( + event_id = %event.event.id.to_hex(), + kind = event_kind_u32(&event.event), + "Skipping workflow trigger — event has no channel_id" + ); return Ok(()); }; - let kind_u32 = event.event.kind.as_u16() as u32; + let kind_u32 = event_kind_u32(&event.event); // Exclude workflow execution events to prevent infinite loops. - // See Decision 10 in PLANS/SPROUT_WORKFLOWS.md. - if (46001..=46012).contains(&kind_u32) { + if is_workflow_execution_kind(kind_u32) { return Ok(()); } - // Load enabled workflows for this channel. let workflows = self .db .list_enabled_channel_workflows(channel_id) @@ -120,35 +222,65 @@ impl WorkflowEngine { return Ok(()); } + let trigger_ctx = build_trigger_context(event); + + let trigger_ctx_json: serde_json::Value = match serde_json::to_value(&trigger_ctx) { + Ok(v) => v, + Err(e) => { + tracing::error!("Failed to serialize trigger context: {e}"); + return Ok(()); + } + }; + for workflow in &workflows { - // Parse the stored JSON definition. let def: WorkflowDef = match serde_json::from_value(workflow.definition.clone()) { Ok(d) => d, Err(e) => { - tracing::warn!( - workflow_id = %workflow.id, - "Failed to parse workflow definition: {e}" - ); + tracing::warn!(workflow_id = %workflow.id, "Failed to parse definition: {e}"); continue; } }; - if !def.enabled { + if !def.enabled || !trigger_matches_event(&def.trigger, kind_u32) { continue; } - // Check if the trigger type matches the event kind. - if !trigger_matches_event(&def.trigger, kind_u32) { + if !should_fire_workflow(&def, &trigger_ctx, workflow.id).await { continue; } - // TODO (WF-07): evaluate trigger filter expression, create workflow_run - // in DB, build TriggerContext from event, spawn execute_run(). + // Create the workflow_run row (status: pending). + let trigger_event_id_bytes = event.event.id.as_bytes().to_vec(); + let run_id = match self + .db + .create_workflow_run( + workflow.id, + Some(&trigger_event_id_bytes), + Some(&trigger_ctx_json), + ) + .await + { + Ok(id) => id, + Err(e) => { + tracing::error!(workflow_id = %workflow.id, "Failed to create run: {e}"); + continue; + } + }; + tracing::debug!( workflow_id = %workflow.id, - event_kind = kind_u32, - "Workflow trigger matched — execution wired in WF-07" + run_id = %run_id, + "Workflow triggered — spawning execution" ); + + let engine = Arc::clone(self); + let def_clone = def.clone(); + let ctx_clone = trigger_ctx.clone(); + + tokio::spawn(async move { + let result = executor::execute_run(&engine, run_id, &def_clone, &ctx_clone).await; + engine.finalize_run(run_id, result, None).await; + }); } Ok(()) @@ -156,20 +288,143 @@ impl WorkflowEngine { /// Background task for scheduled (cron) triggers. /// - /// Runs indefinitely. Checks cron schedules every minute and fires - /// matching workflows. - /// - /// TODO (WF-07): implement cron schedule matching and execution. + /// Runs indefinitely. Cron trigger matching requires a cross-channel + /// workflow query (`list_all_enabled_workflows`) that doesn't exist yet. + /// Interval triggers need last-run tracking. Both are deferred to WF-09. pub async fn run(&self) { loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; - // TODO (WF-07): load schedule-triggered workflows, check cron expressions, - // spawn executions for any that are due. - tracing::debug!("WorkflowEngine::run tick — cron check (not yet implemented)"); + // Cron trigger matching requires a cross-channel workflow query + // (list_all_enabled_workflows) that doesn't exist yet. Interval + // triggers need last-run tracking. Both are deferred to WF-09. + tracing::trace!("WorkflowEngine::run tick — cron/interval triggers not yet wired"); } } } +// ── Pre-trigger filtering ───────────────────────────────────────────────────── + +/// Check emoji and filter-expression conditions that determine whether a +/// matched workflow should actually fire. Extracted from `on_event` to keep +/// the per-workflow loop body small. +/// +/// Returns `true` if the workflow should fire, `false` to skip. +async fn should_fire_workflow( + def: &WorkflowDef, + trigger_ctx: &executor::TriggerContext, + workflow_id: uuid::Uuid, +) -> bool { + // Enforce reaction emoji filter. + if let TriggerDef::ReactionAdded { + emoji: Some(ref expected), + } = def.trigger + { + if &trigger_ctx.emoji != expected { + tracing::debug!( + workflow_id = %workflow_id, + expected_emoji = %expected, + actual_emoji = %trigger_ctx.emoji, + "Reaction emoji mismatch — skipping workflow" + ); + return false; + } + } + + // Evaluate trigger filter expression (MessagePosted only). + if let TriggerDef::MessagePosted { + filter: Some(ref expr), + } = def.trigger + { + match executor::evaluate_condition(expr, trigger_ctx, &HashMap::new()).await { + Ok(true) => {} + Ok(false) => { + tracing::debug!( + workflow_id = %workflow_id, + "Trigger filter evaluated false — skipping workflow" + ); + return false; + } + Err(e) => { + tracing::warn!( + workflow_id = %workflow_id, + "Trigger filter error: {e} — skipping workflow" + ); + return false; + } + } + } + + true +} + +// ── Trigger context builder ─────────────────────────────────────────────────── + +/// Build a [`executor::TriggerContext`] from a [`sprout_core::StoredEvent`]. +/// +/// - `text` — event content (message body or reaction emoji character) +/// - `author` — pubkey hex string +/// - `channel_id` — channel UUID as string (empty if no channel scope) +/// - `timestamp` — Unix timestamp as string +/// - `emoji` — for `KIND_REACTION` events, the content is the emoji; otherwise empty +/// - `message_id` — for reactions, the target message's event ID (from `e` tag); +/// for all other events, the event's own ID +pub fn build_trigger_context(event: &sprout_core::StoredEvent) -> executor::TriggerContext { + let kind_u32 = event_kind_u32(&event.event); + let content = event.event.content.clone(); + + // For reaction events (NIP-25), the content field holds the emoji character + // or shortcode (e.g. "👍", "+", "-"). Expose it as `emoji`. + let emoji = if kind_u32 == KIND_REACTION { + content.clone() + } else { + String::new() + }; + + // For reactions (NIP-25), `message_id` should be the target message, not + // the reaction event itself. NIP-25 stores the target in an `e` tag whose + // value is a 64-char hex event ID (not a UUID channel reference). + // Per NIP-25, the last `e` tag is the direct target (earlier ones may be thread roots). + let message_id = if kind_u32 == KIND_REACTION { + event + .event + .tags + .iter() + .rev() + .find_map(|tag| { + let key = tag.kind().to_string(); + if key == "e" { + tag.content().and_then(|v| { + // Distinguish hex event IDs (64 chars) from UUID channel refs. + if v.len() == 64 && v.chars().all(|c| c.is_ascii_hexdigit()) { + Some(v.to_string()) + } else { + None + } + }) + } else { + None + } + }) + // Fallback to the reaction event's own ID if no valid `e` tag found. + .unwrap_or_else(|| event.event.id.to_hex()) + } else { + event.event.id.to_hex() + }; + + executor::TriggerContext { + text: content, + author: event.event.pubkey.to_hex(), + channel_id: event + .channel_id + .map(|id| id.to_string()) + .unwrap_or_default(), + timestamp: event.event.created_at.as_u64().to_string(), + emoji, + message_id, + webhook_fields: HashMap::new(), + } +} + // ── Trigger matching ────────────────────────────────────────────────────────── /// Returns `true` if the trigger type matches the given event kind. @@ -257,7 +512,10 @@ steps: &trigger, sprout_core::kind::KIND_REACTION )); - assert!(!trigger_matches_event(&trigger, 46001)); + assert!(!trigger_matches_event( + &trigger, + sprout_core::kind::KIND_WORKFLOW_TRIGGERED + )); } #[test] @@ -326,7 +584,9 @@ steps: let msg_trigger = TriggerDef::MessagePosted { filter: None }; let react_trigger = TriggerDef::ReactionAdded { emoji: None }; - for kind in 46001u32..=46012 { + for kind in sprout_core::kind::KIND_WORKFLOW_TRIGGERED + ..=sprout_core::kind::KIND_WORKFLOW_APPROVAL_DENIED + { assert!( !trigger_matches_event(&msg_trigger, kind), "message_posted should not match workflow execution kind {kind}" @@ -364,4 +624,139 @@ steps: assert_eq!(cfg.max_concurrent, 50); assert_eq!(cfg.default_timeout_secs, 600); } + + // ── build_trigger_context ───────────────────────────────────────────────── + + fn make_message_event() -> sprout_core::StoredEvent { + use nostr::{EventBuilder, Keys, Kind}; + use uuid::Uuid; + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::Custom(40001), "hello world", []) + .sign_with_keys(&keys) + .expect("sign"); + sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())) + } + + /// Create a reaction event with an `e` tag pointing to a target message. + fn make_reaction_event() -> (sprout_core::StoredEvent, String) { + use nostr::{EventBuilder, Keys, Kind, Tag}; + use uuid::Uuid; + let keys = Keys::generate(); + // Create a dummy target message ID (64-char hex). + let target_keys = Keys::generate(); + let target_event = EventBuilder::new(Kind::Custom(40001), "target msg", []) + .sign_with_keys(&target_keys) + .expect("sign target"); + let target_id_hex = target_event.id.to_hex(); + // NIP-25: reaction references the target via an `e` tag. + let e_tag = Tag::parse(&["e", &target_id_hex]).expect("tag parse"); + let event = EventBuilder::new(Kind::Reaction, "👍", [e_tag]) + .sign_with_keys(&keys) + .expect("sign"); + ( + sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())), + target_id_hex, + ) + } + + #[test] + fn build_trigger_context_message_event() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + + assert_eq!(ctx.text, "hello world"); + assert_eq!(ctx.author, stored.event.pubkey.to_hex()); + assert_eq!(ctx.channel_id, stored.channel_id.unwrap().to_string()); + assert_eq!(ctx.timestamp, stored.event.created_at.as_u64().to_string()); + assert_eq!(ctx.message_id, stored.event.id.to_hex()); + // Non-reaction events have empty emoji. + assert_eq!(ctx.emoji, ""); + assert!(ctx.webhook_fields.is_empty()); + } + + #[test] + fn build_trigger_context_reaction_event() { + let (stored, target_id_hex) = make_reaction_event(); + let ctx = build_trigger_context(&stored); + + // For reactions, content IS the emoji. + assert_eq!(ctx.text, "👍"); + assert_eq!(ctx.emoji, "👍"); + assert_eq!(ctx.author, stored.event.pubkey.to_hex()); + // message_id should be the TARGET message, not the reaction event itself. + assert_eq!(ctx.message_id, target_id_hex); + assert_ne!(ctx.message_id, stored.event.id.to_hex()); + assert!(ctx.webhook_fields.is_empty()); + } + + #[test] + fn build_trigger_context_no_channel_id() { + use nostr::{EventBuilder, Keys, Kind}; + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::Custom(40001), "msg", []) + .sign_with_keys(&keys) + .expect("sign"); + // channel_id = None (global/DM event) + let stored = sprout_core::StoredEvent::new(event, None); + let ctx = build_trigger_context(&stored); + + assert_eq!(ctx.channel_id, ""); + assert_eq!(ctx.text, "msg"); + } + + #[test] + fn build_trigger_context_author_is_hex_pubkey() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + // Pubkey hex is 64 lowercase hex characters. + assert_eq!(ctx.author.len(), 64); + assert!(ctx.author.chars().all(|c| c.is_ascii_hexdigit())); + } + + #[test] + fn build_trigger_context_message_id_is_hex() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + // Event ID hex is 64 lowercase hex characters. + assert_eq!(ctx.message_id.len(), 64); + assert!(ctx.message_id.chars().all(|c| c.is_ascii_hexdigit())); + } + + #[test] + fn build_trigger_context_timestamp_is_numeric_string() { + let stored = make_message_event(); + let ctx = build_trigger_context(&stored); + // Timestamp must parse as a u64. + ctx.timestamp + .parse::() + .expect("timestamp should be a u64 string"); + } + + #[test] + fn test_build_trigger_context_reaction_multiple_e_tags() { + // NIP-25: last e tag is the direct target, first may be thread root + use nostr::{EventBuilder, EventId, Keys, Kind, Tag}; + use uuid::Uuid; + + let keys = Keys::generate(); + let thread_root_id = EventId::all_zeros(); + let direct_target_id = EventId::from_byte_array([0x42; 32]); + + let event = EventBuilder::new( + Kind::Reaction, + "👍", + [ + Tag::parse(&["e", &thread_root_id.to_hex()]).unwrap(), + Tag::parse(&["e", &direct_target_id.to_hex()]).unwrap(), + ], + ) + .sign_with_keys(&keys) + .expect("sign"); + + let stored = sprout_core::StoredEvent::new(event, Some(Uuid::new_v4())); + let ctx = build_trigger_context(&stored); + + // Should pick the LAST e tag (direct target), not the first (thread root) + assert_eq!(ctx.message_id, direct_target_id.to_hex()); + } }