diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index 76df207f7..2c9130ec7 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -577,6 +577,10 @@ export interface BrowserSection { close_policy: "close_browser" | "close_tabs" | "detach"; } +export interface ChannelSection { + listen_only_mode: boolean; +} + export interface SandboxSection { mode: "enabled" | "disabled"; writable_paths: string[]; @@ -595,6 +599,7 @@ export interface AgentConfigResponse { coalesce: CoalesceSection; memory_persistence: MemoryPersistenceSection; browser: BrowserSection; + channel: ChannelSection; discord: DiscordSection; sandbox: SandboxSection; } @@ -661,6 +666,10 @@ export interface BrowserUpdate { close_policy?: "close_browser" | "close_tabs" | "detach"; } +export interface ChannelUpdate { + listen_only_mode?: boolean; +} + export interface SandboxUpdate { mode?: "enabled" | "disabled"; writable_paths?: string[]; @@ -679,6 +688,7 @@ export interface AgentConfigUpdateRequest { coalesce?: CoalesceUpdate; memory_persistence?: MemoryPersistenceUpdate; browser?: BrowserUpdate; + channel?: ChannelUpdate; discord?: DiscordUpdate; sandbox?: SandboxUpdate; } diff --git a/interface/src/routes/AgentConfig.tsx b/interface/src/routes/AgentConfig.tsx index d9a2ecded..363f915c5 100644 --- a/interface/src/routes/AgentConfig.tsx +++ b/interface/src/routes/AgentConfig.tsx @@ -15,7 +15,7 @@ function supportsAdaptiveThinking(modelId: string): boolean { || id.includes("sonnet-4-6") || id.includes("sonnet-4.6"); } -type SectionId = "soul" | "identity" | "user" | "routing" | "tuning" | "compaction" | "cortex" | "coalesce" | "memory" | "browser" | "sandbox"; +type SectionId = "soul" | "identity" | "user" | "routing" | "tuning" | "compaction" | "cortex" | "coalesce" | "memory" | "browser" | "channel" | "sandbox"; const SECTIONS: { id: SectionId; @@ -34,6 +34,7 @@ const SECTIONS: { { id: "coalesce", label: "Coalesce", group: "config", description: "Message batching", detail: "When multiple messages arrive in quick succession, coalescing batches them into a single LLM turn. This prevents the agent from responding to each message individually in fast-moving conversations." }, { id: "memory", label: "Memory Persistence", group: "config", description: "Auto-save interval", detail: "Spawns a silent background branch at regular intervals to recall existing memories and save new ones from the recent conversation. Runs without blocking the channel." }, { id: "browser", label: "Browser", group: "config", description: "Chrome automation", detail: "Controls browser automation tools available to workers. When enabled, workers can navigate web pages, take screenshots, and interact with sites. JavaScript evaluation is a separate permission." }, + { id: "channel", label: "Channel Behavior", group: "config", description: "Reply behavior", detail: "Listen-only mode suppresses unsolicited replies in busy channels. The agent still responds to slash commands, @mentions, and replies to its own messages." }, { id: "sandbox", label: "Sandbox", group: "config", description: "Process containment", detail: "OS-level filesystem containment for shell and exec tool subprocesses. When enabled, worker processes run inside a kernel-enforced sandbox (bubblewrap on Linux, sandbox-exec on macOS) with an allowlist-only filesystem — only system paths, the workspace, and explicitly configured extra paths are accessible." }, ]; @@ -64,7 +65,7 @@ export function AgentConfig({ agentId }: AgentConfigProps) { // Sync activeSection with URL search param useEffect(() => { if (search.tab) { - const validSections: SectionId[] = ["soul", "identity", "user", "routing", "tuning", "compaction", "cortex", "coalesce", "memory", "browser", "sandbox"]; + const validSections = SECTIONS.map((section) => section.id); if (validSections.includes(search.tab as SectionId)) { setActiveSection(search.tab as SectionId); } @@ -414,6 +415,7 @@ const SANDBOX_DEFAULTS = { mode: "enabled" as const, writable_paths: [] as strin function ConfigSectionEditor({ sectionId, label, description, detail, config, onDirtyChange, saveHandlerRef, onSave }: ConfigSectionEditorProps) { type ConfigValues = Record; const sandbox = config.sandbox ?? SANDBOX_DEFAULTS; + const channel = config.channel ?? { listen_only_mode: false }; const [localValues, setLocalValues] = useState(() => { // Initialize from config based on section switch (sectionId) { @@ -431,6 +433,8 @@ function ConfigSectionEditor({ sectionId, label, description, detail, config, on return { ...config.memory_persistence } as ConfigValues; case "browser": return { ...config.browser } as ConfigValues; + case "channel": + return { ...channel } as ConfigValues; case "sandbox": return { mode: sandbox.mode, writable_paths: sandbox.writable_paths } as ConfigValues; default: @@ -469,6 +473,9 @@ function ConfigSectionEditor({ sectionId, label, description, detail, config, on case "browser": setLocalValues({ ...config.browser }); break; + case "channel": + setLocalValues({ ...channel }); + break; case "sandbox": setLocalValues({ mode: sandbox.mode, writable_paths: sandbox.writable_paths }); break; @@ -509,6 +516,9 @@ function ConfigSectionEditor({ sectionId, label, description, detail, config, on case "browser": setLocalValues({ ...config.browser }); break; + case "channel": + setLocalValues({ ...channel }); + break; case "sandbox": setLocalValues({ mode: sandbox.mode, writable_paths: sandbox.writable_paths }); break; @@ -875,6 +885,17 @@ function ConfigSectionEditor({ sectionId, label, description, detail, config, on ); + case "channel": + return ( +
+ handleChange("listen_only_mode", v)} + /> +
+ ); default: return null; } diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 238911b76..e62dad550 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -174,6 +174,11 @@ pub struct Channel { pending_results: Vec, /// Optional send_agent_message tool (only when agent has active links). send_agent_message_tool: Option, + /// Channel-local reply mode toggle. + /// When true, suppress unsolicited replies unless explicitly invoked. + listen_only_mode: bool, + /// Session-scoped override used when persistence is unavailable/failed. + listen_only_session_override: Option, } impl Channel { @@ -249,6 +254,7 @@ impl Channel { }; let self_tx = message_tx.clone(); + let resolved_listen_only_mode = deps.runtime_config.channel_config.load().listen_only_mode; let channel = Self { id: id.clone(), title: None, @@ -275,6 +281,8 @@ impl Channel { retrigger_deadline: None, pending_results: Vec::new(), send_agent_message_tool, + listen_only_mode: resolved_listen_only_mode, + listen_only_session_override: None, }; (channel, message_tx) @@ -300,10 +308,339 @@ impl Channel { .filter(|adapter| !adapter.is_empty()) } + fn sync_listen_only_mode_from_runtime(&mut self) { + if let Some(override_mode) = self.listen_only_session_override { + self.listen_only_mode = override_mode; + return; + } + let runtime_default = self + .deps + .runtime_config + .channel_config + .load() + .listen_only_mode; + let explicit_listen_only = **self.deps.runtime_config.channel_listen_only_explicit.load(); + let settings_store = self + .deps + .runtime_config + .settings + .load() + .as_ref() + .as_ref() + .cloned(); + self.listen_only_mode = if explicit_listen_only.is_some() { + runtime_default + } else if let Some(settings_store) = settings_store { + match settings_store.channel_listen_only_mode_for(self.id.as_ref()) { + Ok(Some(enabled)) => enabled, + Ok(None) => runtime_default, + Err(error) => { + tracing::warn!( + %error, + channel_id = %self.id, + "failed to sync channel-scoped listen_only_mode setting" + ); + runtime_default + } + } + } else { + runtime_default + }; + } + + fn set_listen_only_mode(&mut self, enabled: bool) -> bool { + let mut persisted = false; + let settings_store = self + .deps + .runtime_config + .settings + .load() + .as_ref() + .as_ref() + .cloned(); + if let Some(settings_store) = settings_store { + match settings_store.set_channel_listen_only_mode_for(self.id.as_ref(), enabled) { + Ok(()) => persisted = true, + Err(error) => { + tracing::warn!( + %error, + channel_id = %self.id, + listen_only_mode = enabled, + "failed to persist listen_only_mode setting" + ); + } + } + } else { + tracing::warn!( + channel_id = %self.id, + listen_only_mode = enabled, + "settings store unavailable; listen_only_mode is session-scoped" + ); + } + + self.listen_only_mode = enabled; + self.listen_only_session_override = if persisted { None } else { Some(enabled) }; + persisted + } + + fn persist_inbound_user_message(&self, message: &InboundMessage, raw_text: &str) { + if message.source == "system" { + return; + } + let sender_name = message + .metadata + .get("sender_display_name") + .and_then(|v| v.as_str()) + .unwrap_or(&message.sender_id); + self.state.conversation_logger.log_user_message( + &self.state.channel_id, + sender_name, + &message.sender_id, + raw_text, + &message.metadata, + ); + self.state + .channel_store + .upsert(&message.conversation_id, &message.metadata); + } + fn suppress_plaintext_fallback(&self) -> bool { matches!(self.current_adapter(), Some("email")) } + fn rewrite_tool_routed_command_prompt(&self, raw_text: &str) -> Option { + match raw_text.trim() { + "/tasks" => Some( + "use channel tools to fetch my ready tasks (limit 10) and reply exactly with:\n\ + - header: tasks (ready):\n\ + - each line: - # [] \n\ + if no tasks are ready, reply exactly: tasks (ready): none" + .to_string(), + ), + "/today" => Some( + "use channel tools to build a local tasks snapshot and reply exactly in this format:\n\ + - first line: today (local tasks snapshot):\n\ + - section 1: in-progress tasks (up to 5), each line: #<task_number> [<priority>] <title>\n\ + - section 2: up next ready tasks (up to 5), each line: #<task_number> [<priority>] <title>\n\ + if a section is empty use:\n\ + - in progress: none\n\ + - up next (ready): none" + .to_string(), + ), + "/digest" => Some( + "using available tools and channel context, generate a concise day digest from local 00:00 to now with exactly this order:\n\ + 1) top decisions\n\ + 2) key convo themes\n\ + 3) open loops\n\ + keep it practical and concise; if there are no meaningful updates, reply exactly: no material updates today." + .to_string(), + ), + _ => None, + } + } + + fn compute_listen_mode_invocation( + &self, + message: &InboundMessage, + raw_text: &str, + ) -> (bool, bool, bool) { + let text = raw_text.trim(); + let invoked_by_command = text.starts_with('/'); + let invoked_by_mention = match message.source.as_str() { + "telegram" => { + let text_lower = text.to_lowercase(); + message + .metadata + .get("telegram_bot_username") + .and_then(|v| v.as_str()) + .map(|username| { + let mention = format!("@{}", username.to_lowercase()); + text_lower.match_indices(&mention).any(|(start, _)| { + let end = start + mention.len(); + let before_ok = start == 0 + || text_lower[..start].chars().next_back().is_none_or( + |character| { + !(character.is_ascii_alphanumeric() || character == '_') + }, + ); + let after_ok = end == text_lower.len() + || text_lower[end..].chars().next().is_none_or(|character| { + !(character.is_ascii_alphanumeric() || character == '_') + }); + before_ok && after_ok + }) + }) + .unwrap_or(false) + } + "discord" => message + .metadata + .get("discord_mentioned_bot") + .and_then(|v| v.as_bool()) + .unwrap_or(false), + "slack" => message + .metadata + .get("slack_mentions_or_replies_to_bot") + .and_then(|v| v.as_bool()) + .unwrap_or(false), + "twitch" => message + .metadata + .get("twitch_mentions_or_replies_to_bot") + .and_then(|v| v.as_bool()) + .unwrap_or(false), + _ => false, + }; + let invoked_by_reply = match message.source.as_str() { + // Use bot-specific reply metadata; generic reply_to_is_bot can + // match unrelated bots and cause false invokes. + "discord" => message + .metadata + .get("discord_reply_to_bot") + .and_then(|v| v.as_bool()) + .unwrap_or(false), + "telegram" => { + let reply_to_is_bot = message + .metadata + .get("reply_to_is_bot") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let bot_username = message + .metadata + .get("telegram_bot_username") + .and_then(|v| v.as_str()) + .map(str::to_lowercase); + let reply_username = message + .metadata + .get("reply_to_username") + .and_then(|v| v.as_str()) + .map(str::to_lowercase); + reply_to_is_bot + && reply_username + .zip(bot_username) + .is_some_and(|(reply, bot)| bot == reply) + } + _ => message + .metadata + .get("reply_to_is_bot") + .and_then(|v| v.as_bool()) + .unwrap_or(false), + }; + + (invoked_by_command, invoked_by_mention, invoked_by_reply) + } + + async fn send_builtin_text(&mut self, text: String, log_label: &str) { + if let Err(error) = self + .response_tx + .send(OutboundResponse::Text(text.clone())) + .await + { + tracing::error!(%error, channel_id = %self.id, %log_label, "failed to send built-in reply"); + return; + } + self.state.conversation_logger.log_bot_message_with_name( + &self.state.channel_id, + &text, + Some(self.agent_display_name()), + ); + } + + async fn try_handle_builtin_ops_commands( + &mut self, + raw_text: &str, + message: &InboundMessage, + ) -> Result<bool> { + if message.source == "system" { + return Ok(false); + } + let supported_source = matches!( + message.source.as_str(), + "telegram" | "discord" | "slack" | "twitch" + ); + if !supported_source { + return Ok(false); + } + + let text = raw_text.trim(); + if !text.starts_with('/') { + return Ok(false); + } + + let temporal_context = TemporalContext::from_runtime(self.deps.runtime_config.as_ref()); + let now_line = temporal_context.current_time_line(); + + match text { + "/status" => { + let routing = self.deps.runtime_config.routing.load(); + let channel_model = routing.resolve(ProcessType::Channel, None).to_string(); + let branch_model = routing.resolve(ProcessType::Branch, None).to_string(); + let mode = if self.listen_only_mode { + "quiet" + } else { + "active" + }; + let adapter = self.current_adapter().unwrap_or("unknown"); + let body = format!( + "status\n\ + - agent: {}\n\ + - channel: {}\n\ + - adapter: {}\n\ + - mode: {} (quiet => only command/@mention/reply-to-bot)\n\ + - channel model: {}\n\ + - branch model: {}\n\ + - time: {}", + self.deps.agent_id, + self.id, + adapter, + mode, + channel_model, + branch_model, + now_line + ); + self.send_builtin_text(body, "status").await; + return Ok(true); + } + "/quiet" => { + let persisted = self.set_listen_only_mode(true); + let body = if persisted { + "quiet mode enabled. i'll only reply to commands, @mentions, or replies to my message." + .to_string() + } else { + "quiet mode enabled for this session, but persistence failed; it may revert after restart.".to_string() + }; + self.send_builtin_text(body, "quiet").await; + return Ok(true); + } + "/active" => { + let persisted = self.set_listen_only_mode(false); + let body = if persisted { + "active mode enabled. i'll respond normally in this chat.".to_string() + } else { + "active mode enabled for this session, but persistence failed; it may revert after restart.".to_string() + }; + self.send_builtin_text(body, "active").await; + return Ok(true); + } + "/help" => { + let lines = [ + "commands:".to_string(), + "- /status: current mode, models, binding snapshot".to_string(), + "- /today: in-progress + ready task snapshot".to_string(), + "- /tasks: ready task list".to_string(), + "- /digest: one-shot day digest (00:00 -> now)".to_string(), + "- /quiet: listen-only mode".to_string(), + "- /active: normal reply mode".to_string(), + "- /agent-id: runtime agent id".to_string(), + ]; + let body = lines.join("\n"); + self.send_builtin_text(body, "help").await; + return Ok(true); + } + _ => {} + } + + Ok(false) + } + /// Run the channel event loop. pub async fn run(mut self) -> Result<()> { tracing::info!(channel_id = %self.id, "channel started"); @@ -413,6 +750,17 @@ impl Channel { if config.multi_user_only && self.is_dm() { return false; } + // Built-in slash commands should execute immediately and never be batched. + let looks_like_command = match &message.content { + crate::MessageContent::Text(text) => text.trim_start().starts_with('/'), + crate::MessageContent::Media { text, .. } => text + .as_deref() + .is_some_and(|value| value.trim_start().starts_with('/')), + crate::MessageContent::Interaction { .. } => false, + }; + if looks_like_command { + return false; + } true } @@ -497,6 +845,9 @@ impl Channel { /// with a coalesce hint telling the LLM this is a fast-moving conversation. #[tracing::instrument(skip(self, messages), fields(channel_id = %self.id, agent_id = %self.deps.agent_id, message_count = messages.len()))] async fn handle_message_batch(&mut self, messages: Vec<InboundMessage>) -> Result<()> { + // Apply runtime-config updates immediately without requiring a restart. + self.sync_listen_only_mode_from_runtime(); + let message_count = messages.len(); let batch_start_timestamp = messages .iter() @@ -558,9 +909,10 @@ impl Channel { } // Persist each message to conversation log (individual audit trail) - let mut user_contents: Vec<UserContent> = Vec::new(); + let mut pending_batch_entries: Vec<(String, Vec<_>)> = Vec::new(); let mut conversation_id = String::new(); let temporal_context = TemporalContext::from_runtime(self.deps.runtime_config.as_ref()); + let mut batch_has_invoke = false; for message in &messages { if message.source != "system" { @@ -581,6 +933,13 @@ impl Channel { } }; + if self.listen_only_mode { + let (invoked_by_command, invoked_by_mention, invoked_by_reply) = + self.compute_listen_mode_invocation(message, &raw_text); + batch_has_invoke |= + invoked_by_command || invoked_by_mention || invoked_by_reply; + } + self.state.conversation_logger.log_user_message( &self.state.channel_id, sender_name, @@ -617,17 +976,33 @@ impl Channel { &raw_text, ); - // Download attachments for this message - if !attachments.is_empty() { - let attachment_content = download_attachments(&self.deps, &attachments).await; - for content in attachment_content { - user_contents.push(content); - } - } + pending_batch_entries.push((formatted_text, attachments)); + } + } + + if self.listen_only_mode && !batch_has_invoke { + tracing::debug!( + channel_id = %self.id, + message_count, + "listen-first mode: suppressing unsolicited coalesced batch" + ); + // Keep passive memory capture behavior aligned with single-message flow. + self.message_count += message_count; + self.check_memory_persistence().await; + return Ok(()); + } - user_contents.push(UserContent::text(formatted_text)); + let mut user_contents: Vec<UserContent> = Vec::new(); + for (formatted_text, attachments) in pending_batch_entries { + if !attachments.is_empty() { + let attachment_content = download_attachments(&self.deps, &attachments).await; + for content in attachment_content { + user_contents.push(content); + } } + user_contents.push(UserContent::text(formatted_text)); } + // Separate text and non-text (image/audio) content let mut text_parts = Vec::new(); let mut attachment_parts = Vec::new(); @@ -749,6 +1124,9 @@ impl Channel { /// memory_save. The tools act on the channel's shared state directly. #[tracing::instrument(skip(self, message), fields(channel_id = %self.id, agent_id = %self.deps.agent_id, message_id = %message.id))] async fn handle_message(&mut self, message: InboundMessage) -> Result<()> { + // Apply runtime-config updates immediately without requiring a restart. + self.sync_listen_only_mode_from_runtime(); + tracing::info!( channel_id = %self.id, message_id = %message.id, @@ -773,33 +1151,52 @@ impl Channel { crate::MessageContent::Interaction { .. } => (message.content.to_string(), Vec::new()), }; - let temporal_context = TemporalContext::from_runtime(self.deps.runtime_config.as_ref()); - let message_timestamp = temporal_context.format_timestamp(message.timestamp); - let user_text = format_user_message(&raw_text, &message, &message_timestamp); + self.persist_inbound_user_message(&message, &raw_text); - let attachment_content = if !attachments.is_empty() { - download_attachments(&self.deps, &attachments).await - } else { - Vec::new() - }; + // Deterministic built-in command: bypass model output drift for agent identity checks. + if message.source != "system" && raw_text.trim() == "/agent-id" { + self.send_builtin_text(self.deps.agent_id.to_string(), "agent-id") + .await; + return Ok(()); + } - // Persist user messages (skip system re-triggers) - if message.source != "system" { - let sender_name = message - .metadata - .get("sender_display_name") - .and_then(|v| v.as_str()) - .unwrap_or(&message.sender_id); - self.state.conversation_logger.log_user_message( - &self.state.channel_id, - sender_name, - &message.sender_id, - &raw_text, - &message.metadata, - ); - self.state - .channel_store - .upsert(&message.conversation_id, &message.metadata); + // Deterministic liveness ping for Telegram mentions. + // This avoids model/provider flakiness for simple "you there?" style checks. + if message.source == "telegram" { + let text = raw_text.trim().to_lowercase(); + let (_, has_mention, _) = self.compute_listen_mode_invocation(&message, &raw_text); + let looks_like_ping = text.contains("you here") + || text.contains("ping") + || text.ends_with(" yo") + || text == "yo" + || text.contains("alive") + || text.contains("there?"); + + if has_mention && looks_like_ping { + self.send_builtin_text("yeah i'm here".to_string(), "telegram-ping") + .await; + return Ok(()); + } + } + + // Deterministic ping ack for Discord quiet-mode mentions/replies to avoid + // flaky model behavior (e.g. skipping or over-formatting simple liveness checks). + if message.source == "discord" && self.listen_only_mode { + let text = raw_text.trim().to_lowercase(); + let (_, invoked_by_mention, invoked_by_reply) = + self.compute_listen_mode_invocation(&message, &raw_text); + let directed = invoked_by_mention || invoked_by_reply; + let looks_like_ping = text.contains("you here") + || text.contains("ping") + || text.ends_with(" yo") + || text == "yo" + || text.contains("alive") + || text.contains("there?"); + if directed && looks_like_ping { + self.send_builtin_text("yeah i'm here".to_string(), "discord-ping") + .await; + return Ok(()); + } } // Capture conversation context from the first message (platform, channel, server) @@ -820,6 +1217,49 @@ impl Channel { )?); } + if self + .try_handle_builtin_ops_commands(&raw_text, &message) + .await? + { + return Ok(()); + } + + let rewritten_text = if message.source == "system" { + raw_text.clone() + } else { + self.rewrite_tool_routed_command_prompt(&raw_text) + .unwrap_or_else(|| raw_text.clone()) + }; + + let temporal_context = TemporalContext::from_runtime(self.deps.runtime_config.as_ref()); + let message_timestamp = temporal_context.format_timestamp(message.timestamp); + let user_text = format_user_message(&rewritten_text, &message, &message_timestamp); + + let mut invoked_by_command = false; + let mut invoked_by_mention = false; + let mut invoked_by_reply = false; + + // Listen-first guardrail: + // ingest all messages, but only reply when explicitly invoked. + if self.listen_only_mode && message.source != "system" { + (invoked_by_command, invoked_by_mention, invoked_by_reply) = + self.compute_listen_mode_invocation(&message, &raw_text); + + if !invoked_by_command && !invoked_by_mention && !invoked_by_reply { + tracing::debug!( + channel_id = %self.id, + source = %message.source, + "listen-first mode: suppressing unsolicited reply" + ); + // In quiet/listen-first mode we still want passive memory capture. + // Count suppressed user messages so auto memory persistence branches + // continue to run on interval without requiring explicit invokes. + self.message_count += 1; + self.check_memory_persistence().await; + return Ok(()); + } + } + let system_prompt = self.build_system_prompt().await?; { @@ -828,6 +1268,11 @@ impl Channel { } let is_retrigger = message.source == "system"; + let attachment_content = if !attachments.is_empty() { + download_attachments(&self.deps, &attachments).await + } else { + Vec::new() + }; let (result, skip_flag, replied_flag, retrigger_reply_preserved) = self .run_agent_turn( @@ -842,6 +1287,25 @@ impl Channel { self.handle_agent_result(result, &skip_flag, &replied_flag, is_retrigger) .await; + // Safety-net: in quiet mode, explicit mention/reply should never be dropped silently. + if self.listen_only_mode + && !is_retrigger + && !invoked_by_command + && (invoked_by_mention || invoked_by_reply) + && skip_flag.load(std::sync::atomic::Ordering::Relaxed) + && !replied_flag.load(std::sync::atomic::Ordering::Relaxed) + && matches!( + message.source.as_str(), + "discord" | "telegram" | "slack" | "twitch" + ) + { + self.send_builtin_text( + "yeah i'm here — tell me what you need.".to_string(), + "quiet-mode-fallback", + ) + .await; + } + // After retrigger turns, persist a fallback summary only when we don't // already have the LLM's actual relay text in history. // @@ -1450,6 +1914,9 @@ impl Channel { /// Handle a process event (branch results, worker completions, status updates). async fn handle_event(&mut self, event: ProcessEvent) -> Result<()> { + // Keep mode aligned with live settings updates while this worker runs. + self.sync_listen_only_mode_from_runtime(); + // Only process events targeted at this channel if !event_is_for_channel(&event, &self.id) { return Ok(()); diff --git a/src/api/agents.rs b/src/api/agents.rs index d96addb2a..86a6e179e 100644 --- a/src/api/agents.rs +++ b/src/api/agents.rs @@ -562,6 +562,7 @@ pub(super) async fn create_agent( cortex: None, warmup: None, browser: None, + channel: None, mcp: None, brave_search_key: None, cron_timezone: None, @@ -676,7 +677,8 @@ pub(super) async fn create_agent( identity, skills, )); - runtime_config.set_settings(settings_store.clone()); + let explicit_listen_only = raw_config.channel.map(|channel| channel.listen_only_mode); + runtime_config.set_settings(settings_store.clone(), explicit_listen_only); let llm_manager = { let guard = state.llm_manager.read().await; diff --git a/src/api/config.rs b/src/api/config.rs index 471a34a32..28f508f1a 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -78,6 +78,11 @@ pub(super) struct BrowserSection { close_policy: String, } +#[derive(Serialize, Debug)] +pub(super) struct ChannelSection { + listen_only_mode: bool, +} + #[derive(Serialize, Debug)] pub(super) struct SandboxSection { mode: String, @@ -101,6 +106,7 @@ pub(super) struct AgentConfigResponse { coalesce: CoalesceSection, memory_persistence: MemoryPersistenceSection, browser: BrowserSection, + channel: ChannelSection, sandbox: SandboxSection, discord: DiscordSection, } @@ -130,6 +136,8 @@ pub(super) struct AgentConfigUpdateRequest { #[serde(default)] browser: Option<BrowserUpdate>, #[serde(default)] + channel: Option<ChannelUpdate>, + #[serde(default)] sandbox: Option<SandboxUpdate>, #[serde(default)] discord: Option<DiscordUpdate>, @@ -206,6 +214,11 @@ pub(super) struct BrowserUpdate { close_policy: Option<ClosePolicy>, } +#[derive(Deserialize, Debug)] +pub(super) struct ChannelUpdate { + listen_only_mode: Option<bool>, +} + #[derive(Deserialize, Debug)] pub(super) struct SandboxUpdate { mode: Option<String>, @@ -236,6 +249,7 @@ pub(super) async fn get_agent_config( let coalesce = rc.coalesce.load(); let memory_persistence = rc.memory_persistence.load(); let browser = rc.browser_config.load(); + let channel = rc.channel_config.load(); let sandbox = rc.sandbox.load(); let response = AgentConfigResponse { @@ -294,6 +308,9 @@ pub(super) async fn get_agent_config( persist_session: browser.persist_session, close_policy: browser.close_policy.as_str().to_string(), }, + channel: ChannelSection { + listen_only_mode: channel.listen_only_mode, + }, sandbox: SandboxSection { mode: match sandbox.mode { crate::sandbox::SandboxMode::Enabled => "enabled".to_string(), @@ -379,6 +396,9 @@ pub(super) async fn update_agent_config( if let Some(browser) = &request.browser { update_browser_table(&mut doc, agent_idx, browser)?; } + if let Some(channel) = &request.channel { + update_channel_table(&mut doc, agent_idx, channel)?; + } if let Some(sandbox) = &request.sandbox { update_sandbox_table(&mut doc, agent_idx, sandbox)?; } @@ -688,6 +708,19 @@ fn update_browser_table( Ok(()) } +fn update_channel_table( + doc: &mut toml_edit::DocumentMut, + agent_idx: usize, + channel: &ChannelUpdate, +) -> Result<(), StatusCode> { + let agent = get_agent_table_mut(doc, agent_idx)?; + let table = get_or_create_subtable(agent, "channel")?; + if let Some(v) = channel.listen_only_mode { + table["listen_only_mode"] = toml_edit::value(v); + } + Ok(()) +} + fn update_sandbox_table( doc: &mut toml_edit::DocumentMut, agent_idx: usize, @@ -834,4 +867,51 @@ id = "main" let result = update_warmup_table(&mut doc, agent_idx, &update); assert_eq!(result, Err(StatusCode::BAD_REQUEST)); } + + #[test] + fn test_update_channel_table_writes_listen_only_mode() { + let mut doc: toml_edit::DocumentMut = r#" +[[agents]] +id = "main" +"# + .parse() + .expect("failed to parse test TOML"); + + let agent_idx = + find_or_create_agent_table(&mut doc, "main").expect("failed to find/create agent"); + + let enable_update = ChannelUpdate { + listen_only_mode: Some(true), + }; + update_channel_table(&mut doc, agent_idx, &enable_update) + .expect("failed to update channel table with true"); + + let agent = doc + .get("agents") + .and_then(|item| item.as_array_of_tables()) + .and_then(|agents| agents.get(agent_idx)) + .expect("missing agent table"); + let channel = agent + .get("channel") + .and_then(|item| item.as_table()) + .expect("missing channel table"); + assert_eq!(channel["listen_only_mode"].as_bool(), Some(true)); + + let disable_update = ChannelUpdate { + listen_only_mode: Some(false), + }; + update_channel_table(&mut doc, agent_idx, &disable_update) + .expect("failed to update channel table with false"); + + let agent = doc + .get("agents") + .and_then(|item| item.as_array_of_tables()) + .and_then(|agents| agents.get(agent_idx)) + .expect("missing agent table"); + let channel = agent + .get("channel") + .and_then(|item| item.as_table()) + .expect("missing channel table"); + assert_eq!(channel["listen_only_mode"].as_bool(), Some(false)); + } } diff --git a/src/config/load.rs b/src/config/load.rs index e5f0a2d99..85a62c22e 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -10,8 +10,8 @@ use super::providers::{ }; use super::toml_schema::*; use super::{ - AgentConfig, ApiConfig, ApiType, Binding, BrowserConfig, ClosePolicy, CoalesceConfig, - CompactionConfig, Config, CortexConfig, CronDef, DefaultsConfig, DiscordConfig, + AgentConfig, ApiConfig, ApiType, Binding, BrowserConfig, ChannelConfig, ClosePolicy, + CoalesceConfig, CompactionConfig, Config, CortexConfig, CronDef, DefaultsConfig, DiscordConfig, DiscordInstanceConfig, EmailConfig, EmailInstanceConfig, GroupDef, HumanDef, IngestionConfig, LinkDef, LlmConfig, McpServerConfig, McpTransport, MemoryPersistenceConfig, MessagingConfig, MetricsConfig, OpenCodeConfig, ProviderConfig, SlackCommandConfig, SlackConfig, @@ -746,6 +746,7 @@ impl Config { cortex: None, warmup: None, browser: None, + channel: None, mcp: None, brave_search_key: None, cron_timezone: None, @@ -1420,6 +1421,15 @@ impl Config { ..base_defaults.browser.clone() }) }, + channel: toml + .defaults + .channel + .map(|channel_config| ChannelConfig { + listen_only_mode: channel_config + .listen_only_mode + .unwrap_or(base_defaults.channel.listen_only_mode), + }) + .unwrap_or(base_defaults.channel), mcp: default_mcp, brave_search_key: toml .defaults @@ -1616,6 +1626,11 @@ impl Config { .unwrap_or(defaults.browser.close_policy), chrome_cache_dir: defaults.browser.chrome_cache_dir.clone(), }), + channel: a.channel.and_then(|channel_config| { + channel_config + .listen_only_mode + .map(|listen_only_mode| ChannelConfig { listen_only_mode }) + }), mcp: match a.mcp { Some(mcp_servers) => Some( mcp_servers @@ -1654,6 +1669,7 @@ impl Config { cortex: None, warmup: None, browser: None, + channel: None, mcp: None, brave_search_key: None, cron_timezone: None, diff --git a/src/config/runtime.rs b/src/config/runtime.rs index 6700f82c1..47dfb70df 100644 --- a/src/config/runtime.rs +++ b/src/config/runtime.rs @@ -4,9 +4,9 @@ use std::sync::Arc; use arc_swap::ArcSwap; use super::{ - BrowserConfig, CoalesceConfig, CompactionConfig, Config, CortexConfig, DefaultsConfig, - IngestionConfig, McpServerConfig, MemoryPersistenceConfig, OpenCodeConfig, ResolvedAgentConfig, - WarmupConfig, WarmupStatus, WorkReadiness, evaluate_work_readiness, + BrowserConfig, ChannelConfig, CoalesceConfig, CompactionConfig, Config, CortexConfig, + DefaultsConfig, IngestionConfig, McpServerConfig, MemoryPersistenceConfig, OpenCodeConfig, + ResolvedAgentConfig, WarmupConfig, WarmupStatus, WorkReadiness, evaluate_work_readiness, }; use crate::llm::routing::RoutingConfig; use crate::tools::browser::SharedBrowserHandle; @@ -26,6 +26,7 @@ pub struct RuntimeConfig { pub memory_persistence: ArcSwap<MemoryPersistenceConfig>, pub coalesce: ArcSwap<CoalesceConfig>, pub ingestion: ArcSwap<IngestionConfig>, + pub channel_config: ArcSwap<ChannelConfig>, pub max_turns: ArcSwap<usize>, pub branch_max_turns: ArcSwap<usize>, pub context_window: ArcSwap<usize>, @@ -58,6 +59,9 @@ pub struct RuntimeConfig { pub cron_scheduler: ArcSwap<Option<Arc<crate::cron::Scheduler>>>, /// Settings store for agent-specific configuration. pub settings: ArcSwap<Option<Arc<crate::settings::SettingsStore>>>, + /// Tracks whether listen_only_mode is explicitly configured via agent/env. + /// When set, channel-local persisted values must not override it. + pub channel_listen_only_explicit: ArcSwap<Option<bool>>, /// Secrets store for encrypted credential storage. pub secrets: ArcSwap<Option<Arc<crate::secrets::store::SecretsStore>>>, /// Sandbox configuration for process containment. @@ -98,6 +102,7 @@ impl RuntimeConfig { memory_persistence: ArcSwap::from_pointee(agent_config.memory_persistence), coalesce: ArcSwap::from_pointee(agent_config.coalesce), ingestion: ArcSwap::from_pointee(agent_config.ingestion), + channel_config: ArcSwap::from_pointee(agent_config.channel), max_turns: ArcSwap::from_pointee(agent_config.max_turns), branch_max_turns: ArcSwap::from_pointee(agent_config.branch_max_turns), context_window: ArcSwap::from_pointee(agent_config.context_window), @@ -122,6 +127,7 @@ impl RuntimeConfig { cron_store: ArcSwap::from_pointee(None), cron_scheduler: ArcSwap::from_pointee(None), settings: ArcSwap::from_pointee(None), + channel_listen_only_explicit: ArcSwap::from_pointee(None), secrets: ArcSwap::from_pointee(None), sandbox: Arc::new(ArcSwap::from_pointee(agent_config.sandbox.clone())), shared_browser: if agent_config.browser.persist_session { @@ -143,8 +149,29 @@ impl RuntimeConfig { } /// Set the settings store after initialization. - pub fn set_settings(&self, settings: Arc<crate::settings::SettingsStore>) { - self.settings.store(Arc::new(Some(settings))); + pub fn set_settings( + &self, + settings: Arc<crate::settings::SettingsStore>, + explicit_listen_only: Option<bool>, + ) { + self.settings.store(Arc::new(Some(settings.clone()))); + self.channel_listen_only_explicit + .store(Arc::new(explicit_listen_only)); + if explicit_listen_only.is_none() { + match settings.channel_listen_only_mode() { + Ok(Some(enabled)) => { + self.channel_config.rcu(move |current| { + let mut next = **current; + next.listen_only_mode = enabled; + Arc::new(next) + }); + } + Ok(None) => {} + Err(error) => { + tracing::warn!(%error, "failed to load persisted channel listen_only_mode"); + } + } + } } /// Set the secrets store after initialization. @@ -192,6 +219,29 @@ impl RuntimeConfig { .store(Arc::new(resolved.memory_persistence)); self.coalesce.store(Arc::new(resolved.coalesce)); self.ingestion.store(Arc::new(resolved.ingestion)); + let resolved_channel = resolved.channel; + let configured_listen_only = agent.channel.map(|channel| channel.listen_only_mode); + self.channel_listen_only_explicit + .store(Arc::new(configured_listen_only)); + let persisted_listen_only = self.settings.load().as_ref().as_ref().and_then(|settings| { + match settings.channel_listen_only_mode() { + Ok(value) => value, + Err(error) => { + tracing::warn!( + %error, + "failed to load persisted channel listen_only_mode during reload" + ); + None + } + } + }); + self.channel_config.rcu(move |current| { + let mut next = resolved_channel; + next.listen_only_mode = configured_listen_only + .or(persisted_listen_only) + .unwrap_or(current.as_ref().listen_only_mode); + Arc::new(next) + }); self.max_turns.store(Arc::new(resolved.max_turns)); self.branch_max_turns .store(Arc::new(resolved.branch_max_turns)); diff --git a/src/config/toml_schema.rs b/src/config/toml_schema.rs index 3dd3b898d..055d78281 100644 --- a/src/config/toml_schema.rs +++ b/src/config/toml_schema.rs @@ -277,6 +277,7 @@ pub(super) struct TomlDefaultsConfig { pub(super) cortex: Option<TomlCortexConfig>, pub(super) warmup: Option<TomlWarmupConfig>, pub(super) browser: Option<TomlBrowserConfig>, + pub(super) channel: Option<TomlChannelConfig>, #[serde(default)] pub(super) mcp: Vec<TomlMcpServerConfig>, pub(super) brave_search_key: Option<String>, @@ -368,6 +369,11 @@ pub(super) struct TomlBrowserConfig { pub(super) close_policy: Option<String>, } +#[derive(Deserialize)] +pub(super) struct TomlChannelConfig { + pub(super) listen_only_mode: Option<bool>, +} + #[derive(Deserialize)] pub(super) struct TomlOpenCodeConfig { pub(super) enabled: Option<bool>, @@ -426,6 +432,7 @@ pub(super) struct TomlAgentConfig { pub(super) cortex: Option<TomlCortexConfig>, pub(super) warmup: Option<TomlWarmupConfig>, pub(super) browser: Option<TomlBrowserConfig>, + pub(super) channel: Option<TomlChannelConfig>, pub(super) mcp: Option<Vec<TomlMcpServerConfig>>, pub(super) brave_search_key: Option<String>, pub(super) cron_timezone: Option<String>, diff --git a/src/config/types.rs b/src/config/types.rs index 874db5361..8bf2d6b11 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -511,6 +511,7 @@ pub struct DefaultsConfig { pub cortex: CortexConfig, pub warmup: WarmupConfig, pub browser: BrowserConfig, + pub channel: ChannelConfig, pub mcp: Vec<McpServerConfig>, /// Brave Search API key for web search tool. Supports "env:VAR_NAME" references. pub brave_search_key: Option<String>, @@ -541,6 +542,7 @@ impl std::fmt::Debug for DefaultsConfig { .field("cortex", &self.cortex) .field("warmup", &self.warmup) .field("browser", &self.browser) + .field("channel", &self.channel) .field("mcp", &self.mcp) .field( "brave_search_key", @@ -766,6 +768,13 @@ impl Default for BrowserConfig { } } +/// Channel behavior configuration. +#[derive(Debug, Clone, Copy, Default)] +pub struct ChannelConfig { + /// When true, unsolicited chat messages are ignored unless command/mention/reply. + pub listen_only_mode: bool, +} + /// OpenCode subprocess worker configuration. #[derive(Debug, Clone, PartialEq, Eq)] pub struct OpenCodeConfig { @@ -989,6 +998,7 @@ pub struct AgentConfig { pub cortex: Option<CortexConfig>, pub warmup: Option<WarmupConfig>, pub browser: Option<BrowserConfig>, + pub channel: Option<ChannelConfig>, pub mcp: Option<Vec<McpServerConfig>>, /// Per-agent Brave Search API key override. None inherits from defaults. pub brave_search_key: Option<String>, @@ -1044,6 +1054,7 @@ pub struct ResolvedAgentConfig { pub cortex: CortexConfig, pub warmup: WarmupConfig, pub browser: BrowserConfig, + pub channel: ChannelConfig, pub mcp: Vec<McpServerConfig>, pub brave_search_key: Option<String>, pub cron_timezone: Option<String>, @@ -1071,6 +1082,7 @@ impl Default for DefaultsConfig { cortex: CortexConfig::default(), warmup: WarmupConfig::default(), browser: BrowserConfig::default(), + channel: ChannelConfig::default(), mcp: Vec::new(), brave_search_key: None, cron_timezone: None, @@ -1134,6 +1146,7 @@ impl AgentConfig { .browser .clone() .unwrap_or_else(|| defaults.browser.clone()), + channel: self.channel.unwrap_or(defaults.channel), mcp: resolve_mcp_configs(&defaults.mcp, self.mcp.as_deref()), brave_search_key: self .brave_search_key diff --git a/src/lib.rs b/src/lib.rs index 330bebcea..53e130036 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -475,6 +475,62 @@ pub enum OutboundResponse { Status(StatusUpdate), } +impl OutboundResponse { + /// Ensure `RichMessage` variants have a non-empty `text` fallback. + /// + /// Some LLMs emit card-only payloads with empty content. This derives a + /// readable plaintext fallback from cards so adapters that don't support + /// rich formatting (or use `text` for notifications) always have content. + pub fn ensure_text_fallback(&mut self) { + if let OutboundResponse::RichMessage { text, cards, .. } = self + && text.trim().is_empty() + { + let derived = Self::text_from_cards(cards); + if !derived.trim().is_empty() { + *text = derived; + } + } + } + + /// Derive a plaintext representation from a slice of [`Card`]s. + /// + /// Used as a fallback when the LLM provides cards but no text content. + /// Adapters can call this directly when they destructure `RichMessage` + /// and need a text fallback without reconstructing the enum. + pub fn text_from_cards(cards: &[Card]) -> String { + let mut sections = Vec::new(); + for card in cards { + let mut lines = Vec::new(); + if let Some(title) = &card.title + && !title.trim().is_empty() + { + lines.push(title.trim().to_string()); + } + if let Some(description) = &card.description + && !description.trim().is_empty() + { + lines.push(description.trim().to_string()); + } + for field in &card.fields { + let name = field.name.trim(); + let value = field.value.trim(); + if !name.is_empty() || !value.is_empty() { + lines.push(format!("{name}\n{value}").trim().to_string()); + } + } + if let Some(footer) = &card.footer + && !footer.trim().is_empty() + { + lines.push(footer.trim().to_string()); + } + if !lines.is_empty() { + sections.push(lines.join("\n\n")); + } + } + sections.join("\n\n") + } +} + /// A generic rich-formatted card (maps to Embeds in Discord). #[derive(Debug, Clone, Serialize, Deserialize, Default, schemars::JsonSchema)] pub struct Card { diff --git a/src/main.rs b/src/main.rs index 995d94e52..3cd123da3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2162,7 +2162,12 @@ async fn initialize_agents( )); // Set the settings store in RuntimeConfig and apply config-driven defaults - runtime_config.set_settings(settings_store.clone()); + let explicit_listen_only = config + .agents + .iter() + .find(|agent| agent.id == agent_config.id) + .and_then(|agent| agent.channel.map(|channel| channel.listen_only_mode)); + runtime_config.set_settings(settings_store.clone(), explicit_listen_only); if let Err(error) = settings_store.set_worker_log_mode(config.defaults.worker_log_mode) { tracing::warn!(%error, agent = %agent_config.id, "failed to set worker_log_mode from config"); } diff --git a/src/messaging/discord.rs b/src/messaging/discord.rs index ef692e301..afd8c1c87 100644 --- a/src/messaging/discord.rs +++ b/src/messaging/discord.rs @@ -164,7 +164,7 @@ impl Messaging for DiscordAdapter { } } OutboundResponse::RichMessage { - text, + mut text, cards, interactive_elements, poll, @@ -173,6 +173,35 @@ impl Messaging for DiscordAdapter { self.stop_typing(message).await; let reply_to = Self::extract_reply_message_id(message); + // Derive a plaintext fallback from cards when text is empty so + // the message is never blank (notifications, logging, etc.). + if text.trim().is_empty() { + let derived = crate::OutboundResponse::text_from_cards(&cards); + if !derived.trim().is_empty() { + text = derived; + } + } + + // Enforce Discord API limits: max 10 embeds, 5 action rows. + let cards = if cards.len() > 10 { + tracing::warn!( + count = cards.len(), + "truncating cards to Discord embed limit (10)" + ); + &cards[..10] + } else { + &cards + }; + let interactive_elements = if interactive_elements.len() > 5 { + tracing::warn!( + count = interactive_elements.len(), + "truncating interactive elements to Discord action row limit (5)" + ); + &interactive_elements[..5] + } else { + &interactive_elements + }; + let chunks = split_message(&text, 2000); for (i, chunk) in chunks.iter().enumerate() { let is_last = i == chunks.len() - 1; @@ -183,16 +212,13 @@ impl Messaging for DiscordAdapter { // Attach rich content only to the final chunk if is_last { - let embeds: Vec<_> = cards.iter().take(10).map(build_embed).collect(); + let embeds: Vec<_> = cards.iter().map(build_embed).collect(); if !embeds.is_empty() { msg = msg.embeds(embeds); } - let components: Vec<_> = interactive_elements - .iter() - .take(5) - .map(build_action_row) - .collect(); + let components: Vec<_> = + interactive_elements.iter().map(build_action_row).collect(); if !components.is_empty() { msg = msg.components(components); } @@ -422,13 +448,41 @@ impl Messaging for DiscordAdapter { .context("failed to broadcast discord message")?; } } else if let OutboundResponse::RichMessage { - text, + mut text, cards, interactive_elements, poll, .. } = response { + // Derive a plaintext fallback from cards when text is empty. + if text.trim().is_empty() { + let derived = crate::OutboundResponse::text_from_cards(&cards); + if !derived.trim().is_empty() { + text = derived; + } + } + + // Enforce Discord API limits: max 10 embeds, 5 action rows. + let cards = if cards.len() > 10 { + tracing::warn!( + count = cards.len(), + "truncating cards to Discord embed limit (10)" + ); + &cards[..10] + } else { + &cards + }; + let interactive_elements = if interactive_elements.len() > 5 { + tracing::warn!( + count = interactive_elements.len(), + "truncating interactive elements to Discord action row limit (5)" + ); + &interactive_elements[..5] + } else { + &interactive_elements + }; + let chunks = split_message(&text, 2000); for (i, chunk) in chunks.iter().enumerate() { let is_last = i == chunks.len() - 1; @@ -439,16 +493,13 @@ impl Messaging for DiscordAdapter { // Attach rich content only to the final chunk if is_last { - let embeds: Vec<_> = cards.iter().take(10).map(build_embed).collect(); + let embeds: Vec<_> = cards.iter().map(build_embed).collect(); if !embeds.is_empty() { msg = msg.embeds(embeds); } - let components: Vec<_> = interactive_elements - .iter() - .take(5) - .map(build_action_row) - .collect(); + let components: Vec<_> = + interactive_elements.iter().map(build_action_row).collect(); if !components.is_empty() { msg = msg.components(components); } @@ -722,9 +773,13 @@ impl EventHandler for Handler { "discord_message_id".into(), serde_json::Value::Number(component.message.id.get().into()), ); + let discord_mentioned_bot = false; + let discord_reply_to_bot = true; + metadata.insert("discord_mentioned_bot".into(), discord_mentioned_bot.into()); + metadata.insert("discord_reply_to_bot".into(), discord_reply_to_bot.into()); metadata.insert( "discord_mentions_or_replies_to_bot".into(), - serde_json::Value::Bool(true), + (discord_mentioned_bot || discord_reply_to_bot).into(), ); if let Some(guild_id) = component.guild_id { metadata.insert( @@ -766,14 +821,21 @@ impl EventHandler for Handler { } fn is_mention_or_reply_to_bot(message: &Message, bot_user_id: Option<UserId>) -> bool { + is_mention_to_bot(message, bot_user_id) || is_reply_to_bot(message, bot_user_id) +} + +fn is_mention_to_bot(message: &Message, bot_user_id: Option<UserId>) -> bool { let Some(bot_id) = bot_user_id else { return false; }; - let directly_mentioned = message.mentions.iter().any(|user| user.id == bot_id); - if directly_mentioned { - return true; - } + message.mentions.iter().any(|user| user.id == bot_id) +} + +fn is_reply_to_bot(message: &Message, bot_user_id: Option<UserId>) -> bool { + let Some(bot_id) = bot_user_id else { + return false; + }; message .referenced_message @@ -945,6 +1007,14 @@ async fn build_metadata( "discord_mentions_or_replies_to_bot".into(), is_mention_or_reply_to_bot(message, bot_user_id).into(), ); + metadata.insert( + "discord_mentioned_bot".into(), + is_mention_to_bot(message, bot_user_id).into(), + ); + metadata.insert( + "discord_reply_to_bot".into(), + is_reply_to_bot(message, bot_user_id).into(), + ); (metadata, formatted_author) } diff --git a/src/messaging/slack.rs b/src/messaging/slack.rs index 49dc17287..44e67997a 100644 --- a/src/messaging/slack.rs +++ b/src/messaging/slack.rs @@ -32,6 +32,7 @@ use slack_morphism::prelude::*; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{RwLock, mpsc}; +use tokio::time::{Duration, timeout}; /// State shared with socket mode callbacks via `SlackClientEventsUserState`. struct SlackAdapterState { @@ -260,6 +261,58 @@ async fn handle_message_event( &adapter_state.channel_name_cache, ) .await; + let mut metadata = metadata; + let bot_mention = format!("<@{}>", adapter_state.bot_user_id); + let mentioned_bot = msg_event + .content + .as_ref() + .and_then(|content| content.text.as_ref()) + .map(|text| text.contains(&bot_mention)) + .unwrap_or(false); + let token = SlackApiToken::new(SlackApiTokenValue(adapter_state.bot_token.clone())); + let session = client.open_session(&token); + let replied_to_bot = if let Some(thread_ts) = msg_event.origin.thread_ts.as_ref() { + // For threaded replies, treat as explicit invoke only when the thread + // root message belongs to this bot. + if thread_ts.0 != ts { + let thread_replies_request = SlackApiConversationsRepliesRequest::new( + SlackChannelId(channel_id.clone()), + thread_ts.clone(), + ) + .with_limit(1); + match timeout( + Duration::from_secs(2), + session.conversations_replies(&thread_replies_request), + ) + .await + { + Ok(Ok(response)) => response + .messages + .first() + .and_then(|message| message.sender.user.as_ref()) + .is_some_and(|user| user.0 == adapter_state.bot_user_id), + Ok(Err(error)) => { + tracing::debug!(%error, "failed to resolve slack thread parent for reply invoke"); + false + } + Err(error) => { + tracing::debug!( + %error, + "timed out resolving slack thread parent for reply invoke" + ); + false + } + } + } else { + false + } + } else { + false + }; + metadata.insert( + "slack_mentions_or_replies_to_bot".into(), + serde_json::Value::Bool(mentioned_bot || replied_to_bot), + ); send_inbound( &adapter_state.inbound_tx, @@ -350,6 +403,11 @@ async fn handle_app_mention_event( &adapter_state.channel_name_cache, ) .await; + let mut metadata = metadata; + metadata.insert( + "slack_mentions_or_replies_to_bot".into(), + serde_json::Value::Bool(true), + ); send_inbound( &adapter_state.inbound_tx, diff --git a/src/messaging/telegram.rs b/src/messaging/telegram.rs index c6060b03e..f806e7537 100644 --- a/src/messaging/telegram.rs +++ b/src/messaging/telegram.rs @@ -874,6 +874,17 @@ fn build_metadata( } if let Some(from) = &reply.from { metadata.insert("reply_to_author".into(), build_display_name(from).into()); + metadata.insert( + "reply_to_user_id".into(), + serde_json::Value::Number(from.id.0.into()), + ); + metadata.insert( + "reply_to_is_bot".into(), + serde_json::Value::Bool(from.is_bot), + ); + if let Some(username) = &from.username { + metadata.insert("reply_to_username".into(), username.clone().into()); + } } } diff --git a/src/messaging/twitch.rs b/src/messaging/twitch.rs index 2f824eac2..0974a1326 100644 --- a/src/messaging/twitch.rs +++ b/src/messaging/twitch.rs @@ -288,10 +288,39 @@ impl Messaging for TwitchAdapter { "twitch_user_login".into(), serde_json::Value::String(privmsg.sender.login.clone()), ); + metadata.insert( + "twitch_bot_login".into(), + serde_json::Value::String(bot_username.clone()), + ); metadata.insert( "sender_display_name".into(), serde_json::Value::String(privmsg.sender.name.clone()), ); + let message_lower = privmsg.message_text.to_lowercase(); + let mention = format!("@{bot_username}"); + let is_login_char = |character: char| { + character.is_ascii_lowercase() + || character.is_ascii_digit() + || character == '_' + }; + let mentions_bot = message_lower.match_indices(&mention).any(|(start, _)| { + let before = message_lower[..start].chars().next_back(); + let after = message_lower[start + mention.len()..].chars().next(); + let before_ok = before.map(|character| !is_login_char(character)).unwrap_or(true); + let after_ok = after.map(|character| !is_login_char(character)).unwrap_or(true); + before_ok && after_ok + }); + let replies_to_bot = privmsg + .source + .tags + .0 + .get("reply-parent-user-login") + .and_then(|value| value.as_ref()) + .is_some_and(|login| login.eq_ignore_ascii_case(&bot_username)); + metadata.insert( + "twitch_mentions_or_replies_to_bot".into(), + serde_json::Value::Bool(mentions_bot || replies_to_bot), + ); let formatted_author = format!( "{} ({})", diff --git a/src/settings.rs b/src/settings.rs index 8c6d73490..29acb2be2 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -2,4 +2,4 @@ pub mod store; -pub use store::{SettingsStore, WORKER_LOG_MODE_KEY, WorkerLogMode}; +pub use store::{CHANNEL_LISTEN_ONLY_MODE_KEY, SettingsStore, WORKER_LOG_MODE_KEY, WorkerLogMode}; diff --git a/src/settings/store.rs b/src/settings/store.rs index 387ed6b69..668236c6f 100644 --- a/src/settings/store.rs +++ b/src/settings/store.rs @@ -11,6 +11,9 @@ const SETTINGS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("settin /// Default key for worker log mode setting. pub const WORKER_LOG_MODE_KEY: &str = "worker_log_mode"; +/// Key for channel listen-only mode setting. +pub const CHANNEL_LISTEN_ONLY_MODE_KEY: &str = "channel_listen_only_mode"; +const CHANNEL_LISTEN_ONLY_MODE_PREFIX: &str = "channel_listen_only_mode:"; /// How worker execution logs are stored. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] @@ -54,6 +57,9 @@ pub struct SettingsStore { } impl SettingsStore { + fn channel_listen_only_mode_key(channel_id: &str) -> String { + format!("{CHANNEL_LISTEN_ONLY_MODE_PREFIX}{channel_id}") + } /// Create a new settings store at the given path. /// The database will be created if it doesn't exist. pub fn new(path: &Path) -> Result<Self> { @@ -160,6 +166,57 @@ impl SettingsStore { pub fn set_worker_log_mode(&self, mode: WorkerLogMode) -> Result<()> { self.set_raw(WORKER_LOG_MODE_KEY, &mode.to_string()) } + + /// Get the channel listen-only mode, if explicitly persisted. + pub fn channel_listen_only_mode(&self) -> Result<Option<bool>> { + match self.get_raw(CHANNEL_LISTEN_ONLY_MODE_KEY) { + Ok(raw) => raw.parse::<bool>().map(Some).map_err(|error| { + SettingsError::ReadFailed { + key: CHANNEL_LISTEN_ONLY_MODE_KEY.to_string(), + details: format!("invalid boolean value '{raw}': {error}"), + } + .into() + }), + Err(crate::error::Error::Settings(settings_error)) => match *settings_error { + SettingsError::NotFound { .. } => Ok(None), + other => Err(other.into()), + }, + Err(other) => Err(other), + } + } + + /// Persist channel listen-only mode. + pub fn set_channel_listen_only_mode(&self, enabled: bool) -> Result<()> { + self.set_raw( + CHANNEL_LISTEN_ONLY_MODE_KEY, + if enabled { "true" } else { "false" }, + ) + } + + /// Get the listen-only mode for a specific channel, if explicitly persisted. + pub fn channel_listen_only_mode_for(&self, channel_id: &str) -> Result<Option<bool>> { + let key = Self::channel_listen_only_mode_key(channel_id); + match self.get_raw(&key) { + Ok(raw) => raw.parse::<bool>().map(Some).map_err(|error| { + SettingsError::ReadFailed { + key: key.clone(), + details: format!("invalid boolean value '{raw}': {error}"), + } + .into() + }), + Err(crate::error::Error::Settings(settings_error)) => match *settings_error { + SettingsError::NotFound { .. } => Ok(None), + other => Err(other.into()), + }, + Err(other) => Err(other), + } + } + + /// Persist listen-only mode for a specific channel. + pub fn set_channel_listen_only_mode_for(&self, channel_id: &str, enabled: bool) -> Result<()> { + let key = Self::channel_listen_only_mode_key(channel_id); + self.set_raw(&key, if enabled { "true" } else { "false" }) + } } impl std::fmt::Debug for SettingsStore { diff --git a/src/tools/reply.rs b/src/tools/reply.rs index ca7256239..91c68b2e9 100644 --- a/src/tools/reply.rs +++ b/src/tools/reply.rs @@ -372,6 +372,12 @@ impl Tool for ReplyTool { )); } + let thread_name = args + .thread_name + .as_ref() + .map(|name| name.trim()) + .filter(|name| !name.is_empty()); + if let Some(leak) = crate::secrets::scrub::scan_for_leaks(&converted_content) { tracing::error!( conversation_id = %self.conversation_id, @@ -383,18 +389,12 @@ impl Tool for ReplyTool { )); } - self.conversation_logger.log_bot_message_with_name( - &self.channel_id, - &converted_content, - Some(&self.agent_display_name), - ); - - let response = if let Some(ref name) = args.thread_name { + let response = if let Some(name) = thread_name { // Cap thread names at 100 characters (Discord limit) let thread_name = if name.len() > 100 { name[..name.floor_char_boundary(100)].to_string() } else { - name.clone() + name.to_string() }; OutboundResponse::ThreadReply { thread_name, @@ -404,7 +404,7 @@ impl Tool for ReplyTool { { OutboundResponse::RichMessage { text: converted_content.clone(), - blocks: vec![], // No block generation for now; Slack adapters will fall back to text + blocks: vec![], cards: args.cards.unwrap_or_default(), interactive_elements: args.interactive_elements.unwrap_or_default(), poll: args.poll, @@ -418,6 +418,12 @@ impl Tool for ReplyTool { .await .map_err(|e| ReplyError(format!("failed to send reply: {e}")))?; + self.conversation_logger.log_bot_message_with_name( + &self.channel_id, + &converted_content, + Some(&self.agent_display_name), + ); + // Mark the turn as handled so handle_agent_result skips the fallback send. self.replied_flag.store(true, Ordering::Relaxed);