From 2405da2ac1a217b525420d97690ef8d480ea2ea9 Mon Sep 17 00:00:00 2001 From: James Pine Date: Thu, 26 Feb 2026 16:48:58 -0800 Subject: [PATCH 1/2] feat: add IMAP email_search tool for branch read-back --- docs/content/docs/(features)/tools.mdx | 16 +- docs/content/docs/(messaging)/email-setup.mdx | 15 +- docs/content/docs/(messaging)/messaging.mdx | 2 +- .../en/tools/email_search_description.md.j2 | 3 + src/agent/channel.rs | 1 + src/agent/ingestion.rs | 1 + src/messaging/email.rs | 256 +++++++++++++++- src/prompts/engine.rs | 14 +- src/prompts/text.rs | 3 + src/tools.rs | 5 + src/tools/email_search.rs | 274 ++++++++++++++++++ tests/context_dump.rs | 2 + 12 files changed, 579 insertions(+), 13 deletions(-) create mode 100644 prompts/en/tools/email_search_description.md.j2 create mode 100644 src/tools/email_search.rs diff --git a/docs/content/docs/(features)/tools.mdx b/docs/content/docs/(features)/tools.mdx index f1def1fc5..7bd2fc706 100644 --- a/docs/content/docs/(features)/tools.mdx +++ b/docs/content/docs/(features)/tools.mdx @@ -1,6 +1,6 @@ --- title: Tools -description: All 16 tools that give LLM processes the ability to act. +description: Tools that give LLM processes the ability to act. --- # Tools @@ -11,7 +11,7 @@ How Spacebot gives LLM processes the ability to act. Every tool implements Rig's `Tool` trait and lives in `src/tools/`. Tools are organized by function, not by consumer. Which process gets which tools is configured via ToolServer factory functions in `src/tools.rs`. -All 16 tools: +Core tools include: | Tool | Purpose | Consumers | |------|---------|-----------| @@ -25,6 +25,7 @@ All 16 tools: | `memory_save` | Write a memory to the store | Branch, Cortex, Compactor | | `memory_recall` | Search memories via hybrid search | Branch | | `channel_recall` | Retrieve transcript from another channel | Branch | +| `email_search` | Search IMAP mailbox content directly | Branch | | `set_status` | Report worker progress to the channel | Worker | | `shell` | Execute shell commands | Worker | | `file` | Read, write, and list files | Worker | @@ -71,10 +72,11 @@ Each branch gets its own isolated ToolServer, created at spawn time via `create_ │ memory_save (Arc) │ │ memory_recall (Arc) │ │ channel_recall (ConversationLogger) │ +│ email_search (IMAP mailbox search) │ └──────────────────────────────────────────────┘ ``` -Branch isolation ensures `memory_recall` and `channel_recall` are never visible to the channel. All tools are registered at creation and live for the lifetime of the branch. +Branch isolation ensures `memory_recall`, `channel_recall`, and `email_search` are never visible to the channel. All tools are registered at creation and live for the lifetime of the branch. ### Worker ToolServer (per-worker) @@ -136,7 +138,7 @@ create_cortex_tool_server(memory_search) -> ToolServerHandle ### Static tools (registered at creation) -`memory_save`, `memory_recall`, `channel_recall` on branch ToolServers. `shell`, `file`, `exec` on worker ToolServers. `memory_save` on cortex and compactor ToolServers. These are registered before `.run()` via the builder pattern and live for the lifetime of the ToolServer. +`memory_save`, `memory_recall`, `channel_recall`, `email_search` on branch ToolServers. `shell`, `file`, `exec` on worker ToolServers. `memory_save` on cortex and compactor ToolServers. These are registered before `.run()` via the builder pattern and live for the lifetime of the ToolServer. ### Dynamic tools (added/removed at runtime) @@ -153,7 +155,7 @@ create_cortex_tool_server(memory_search) -> ToolServerHandle ### Per-process tools (created and destroyed with the process) -Branch and worker ToolServers are created when the process spawns and dropped when it finishes. Each branch gets `memory_save` + `memory_recall` + `channel_recall`. Each worker gets `shell`, `file`, `exec`, `set_status` (bound to that worker's ID), and optionally `browser`. +Branch and worker ToolServers are created when the process spawns and dropped when it finishes. Each branch gets `memory_save` + `memory_recall` + `channel_recall` + `email_search`. Each worker gets `shell`, `file`, `exec`, `set_status` (bound to that worker's ID), and optionally `browser`. ## Tool Design Patterns @@ -228,6 +230,10 @@ If the name doesn't match any channel, falls back to list mode so the LLM can se Channel names are resolved from the `discord_channel_name` field stored in message metadata. The tool queries `conversation_messages` in SQLite directly — it reads persisted messages, not in-memory Rig history. +### email_search + +Searches the configured email mailbox directly over IMAP with filters like sender (`from`), subject, text query, unread-only, and time window (`since_days`). Returns message metadata plus a body snippet for precise read-back in email workflows. + ### set_status Reports the worker's current progress. The status string appears in the channel's status block so the user-facing process knows what's happening without polling. diff --git a/docs/content/docs/(messaging)/email-setup.mdx b/docs/content/docs/(messaging)/email-setup.mdx index 22c142839..1e3dc6769 100644 --- a/docs/content/docs/(messaging)/email-setup.mdx +++ b/docs/content/docs/(messaging)/email-setup.mdx @@ -99,6 +99,18 @@ When the Email adapter is configured, you can intentionally send email from a no This keeps email channels inbound-only while still allowing deliberate outbound send workflows. +## Search mailbox content (`email_search`) + +Branches can use `email_search` to query IMAP directly when you ask for exact read-back details. + +Examples: + +- "Find emails from Alice about renewal in the last 14 days" +- "Search unread emails mentioning invoice" +- "Look for subject containing Q1 roadmap" + +Use specific filters (`from`, `subject`, `query`, `since_days`) to avoid broad mailbox scans. + ## Thread behavior Spacebot keeps one conversation per email thread. It uses `References`, `In-Reply-To`, and `Message-ID` headers to map replies back to the correct conversation. @@ -134,7 +146,8 @@ Use a longer interval if your provider rate limits IMAP polling. 1. Send an email to the configured mailbox from an allowed sender. 2. Confirm a new channel appears in Spacebot with a subject-based name. -3. Reply in the same thread and confirm Spacebot replies in-thread. +3. Reply in the same thread and confirm the message is ingested into the same conversation thread (no automated reply by default). +4. If direct email replies are explicitly enabled in your setup, verify outbound replies preserve threading headers. ## Troubleshooting diff --git a/docs/content/docs/(messaging)/messaging.mdx b/docs/content/docs/(messaging)/messaging.mdx index 11d3087be..4804d3148 100644 --- a/docs/content/docs/(messaging)/messaging.mdx +++ b/docs/content/docs/(messaging)/messaging.mdx @@ -144,4 +144,4 @@ curl -X POST http://localhost:18789/webhook \ ## Hot Reloading -Changes to bindings and permissions (channel filters, DM allowed users) take effect within a couple seconds — no restart needed. Token and credential changes are applied by reconnecting the adapter. +Changes to bindings and permissions (channel filters, DM allowed users) take effect within a couple of seconds — no restart needed. Token and credential changes are applied by reconnecting the adapter. diff --git a/prompts/en/tools/email_search_description.md.j2 b/prompts/en/tools/email_search_description.md.j2 new file mode 100644 index 000000000..d54c57d22 --- /dev/null +++ b/prompts/en/tools/email_search_description.md.j2 @@ -0,0 +1,3 @@ +Search the configured IMAP mailbox for specific emails and return matching results with metadata and body snippets. Use this when you need to read back what someone said in email, verify details from a previous message, or find messages by sender/subject/topic. + +Prefer specific filters (`from`, `subject`, `query`, `since_days`) instead of broad searches. Keep `limit` small unless the user asks for a wider sweep. diff --git a/src/agent/channel.rs b/src/agent/channel.rs index d514cfc00..1a3d7c155 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1854,6 +1854,7 @@ async fn spawn_branch( state.deps.agent_id.clone(), state.deps.task_store.clone(), state.deps.memory_search.clone(), + state.deps.runtime_config.clone(), state.conversation_logger.clone(), state.channel_store.clone(), crate::conversation::ProcessRunLogger::new(state.deps.sqlite_pool.clone()), diff --git a/src/agent/ingestion.rs b/src/agent/ingestion.rs index cc407d5f5..fa122340c 100644 --- a/src/agent/ingestion.rs +++ b/src/agent/ingestion.rs @@ -476,6 +476,7 @@ async fn process_chunk( deps.agent_id.clone(), deps.task_store.clone(), deps.memory_search.clone(), + deps.runtime_config.clone(), conversation_logger, channel_store, crate::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()), diff --git a/src/messaging/email.rs b/src/messaging/email.rs index a1a274901..3642fe242 100644 --- a/src/messaging/email.rs +++ b/src/messaging/email.rs @@ -5,7 +5,7 @@ use crate::messaging::traits::{HistoryMessage, InboundStream, Messaging}; use crate::{InboundMessage, MessageContent, OutboundResponse}; use anyhow::Context as _; -use chrono::{TimeZone as _, Utc}; +use chrono::{Duration as ChronoDuration, TimeZone as _, Utc}; use lettre::message::header::ContentType; use lettre::message::{Attachment as EmailAttachment, Mailbox, MultiPart, SinglePart}; use lettre::transport::smtp::authentication::Credentials; @@ -43,6 +43,31 @@ struct HistoryEntry { message: HistoryMessage, } +/// Query filters for direct IMAP mailbox search. +#[derive(Debug, Clone, Default)] +pub struct EmailSearchQuery { + pub text: Option, + pub from: Option, + pub subject: Option, + pub unread_only: bool, + pub since_days: Option, + pub folders: Vec, + pub limit: usize, +} + +/// A single match returned by `search_mailbox`. +#[derive(Debug, Clone)] +pub struct EmailSearchHit { + pub folder: String, + pub uid: u32, + pub from: String, + pub subject: String, + pub date: Option, + pub message_id: Option, + pub body: String, + pub attachment_names: Vec, +} + /// Email adapter state. pub struct EmailAdapter { imap_host: String, @@ -987,6 +1012,202 @@ fn fetch_history_from_imap( Ok(entries.into_iter().map(|entry| entry.message).collect()) } +/// Search the configured mailbox directly via IMAP. +/// +/// Results are returned newest-first across searched folders. +pub fn search_mailbox( + config: &EmailConfig, + query: EmailSearchQuery, +) -> crate::Result> { + let mut session = open_imap_session(&EmailPollConfig { + imap_host: config.imap_host.clone(), + imap_port: config.imap_port, + imap_username: config.imap_username.clone(), + imap_password: config.imap_password.clone(), + imap_use_tls: config.imap_use_tls, + from_address: config.from_address.clone(), + smtp_username: config.smtp_username.clone(), + folders: config.folders.clone(), + poll_interval: Duration::from_secs(config.poll_interval_secs.max(5)), + allowed_senders: config.allowed_senders.clone(), + max_body_bytes: config.max_body_bytes.max(1024), + })?; + + let limit = query.limit.clamp(1, 50); + let criterion = build_imap_search_criterion(&query); + let folders = normalize_search_folders(&query.folders, &config.folders); + let max_body_bytes = config.max_body_bytes.max(1024); + let mut seen_message_ids = HashSet::new(); + let mut results = Vec::new(); + + for folder in folders { + if results.len() >= limit { + break; + } + + if let Err(error) = session.select(folder.as_str()) { + tracing::warn!(folder, %error, "failed to select IMAP folder for search"); + continue; + } + + let mut message_uids: Vec = match session.uid_search(&criterion) { + Ok(uids) => uids.into_iter().collect(), + Err(error) => { + tracing::warn!(folder, criterion, %error, "failed IMAP mailbox search"); + continue; + } + }; + + message_uids.sort_unstable_by(|left, right| right.cmp(left)); + + for uid in message_uids { + if results.len() >= limit { + break; + } + + let fetches = match session.uid_fetch(uid.to_string(), "(UID RFC822)") { + Ok(fetches) => fetches, + Err(error) => { + tracing::warn!(folder, uid, %error, "failed IMAP mailbox fetch"); + continue; + } + }; + + for fetch in &fetches { + if results.len() >= limit { + break; + } + + let current_uid = fetch.uid.unwrap_or(uid); + let Some(raw_email) = fetch.body() else { + continue; + }; + + let parsed = match mailparse::parse_mail(raw_email) { + Ok(parsed) => parsed, + Err(error) => { + tracing::warn!(folder, uid = current_uid, %error, "failed to parse searched email MIME"); + continue; + } + }; + + let headers = parsed.headers.as_slice(); + let message_id = headers + .get_first_value("Message-ID") + .map(|value| normalize_message_id(&value)) + .filter(|value| !value.is_empty()); + + if let Some(message_id) = &message_id + && !seen_message_ids.insert(message_id.clone()) + { + continue; + } + + let from = headers.get_first_value("From").unwrap_or_default(); + let subject = headers + .get_first_value("Subject") + .unwrap_or_else(|| "(No subject)".to_string()); + let date = headers.get_first_value("Date"); + let (body, attachment_names) = + extract_text_and_attachments(&parsed, max_body_bytes); + + results.push(EmailSearchHit { + folder: folder.clone(), + uid: current_uid, + from, + subject, + date, + message_id, + body, + attachment_names, + }); + } + } + } + + if let Err(error) = session.logout() { + tracing::debug!(%error, "IMAP logout failed after mailbox search"); + } + + Ok(results) +} + +fn normalize_search_folders(requested: &[String], fallback: &[String]) -> Vec { + let mut folders = requested + .iter() + .map(|folder| folder.trim().to_string()) + .filter(|folder| !folder.is_empty()) + .collect::>(); + + if folders.is_empty() { + folders = fallback + .iter() + .map(|folder| folder.trim().to_string()) + .filter(|folder| !folder.is_empty()) + .collect::>(); + } + + if folders.is_empty() { + folders.push("INBOX".to_string()); + } + + folders.sort(); + folders.dedup(); + folders +} + +fn build_imap_search_criterion(query: &EmailSearchQuery) -> String { + let mut clauses = Vec::new(); + + if query.unread_only { + clauses.push("UNSEEN".to_string()); + } + + if let Some(from) = sanitize_imap_search_value(query.from.as_deref()) { + clauses.push(format!("FROM {}", quote_imap_search_value(&from))); + } + + if let Some(subject) = sanitize_imap_search_value(query.subject.as_deref()) { + clauses.push(format!("SUBJECT {}", quote_imap_search_value(&subject))); + } + + if let Some(text) = sanitize_imap_search_value(query.text.as_deref()) { + clauses.push(format!("TEXT {}", quote_imap_search_value(&text))); + } + + if let Some(since_days) = query.since_days.filter(|days| *days > 0) { + let since_date = (Utc::now() - ChronoDuration::days(since_days as i64)) + .format("%d-%b-%Y") + .to_string(); + clauses.push(format!("SINCE {since_date}")); + } + + if clauses.is_empty() { + "ALL".to_string() + } else { + clauses.join(" ") + } +} + +fn sanitize_imap_search_value(value: Option<&str>) -> Option { + let value = value?.trim(); + if value.is_empty() { + return None; + } + + let normalized = value.replace(['\r', '\n'], " ").trim().to_string(); + if normalized.is_empty() { + None + } else { + Some(normalized) + } +} + +fn quote_imap_search_value(value: &str) -> String { + let escaped = value.replace('\\', "\\\\").replace('"', "\\\""); + format!("\"{escaped}\"") +} + fn is_auto_generated_email(headers: &[mailparse::MailHeader<'_>]) -> bool { let auto_submitted = headers .get_first_value("Auto-Submitted") @@ -1329,7 +1550,8 @@ struct EmailReplyContext { #[cfg(test)] mod tests { use super::{ - derive_thread_key, extract_message_ids, normalize_email_target, normalize_reply_subject, + EmailSearchQuery, build_imap_search_criterion, derive_thread_key, extract_message_ids, + normalize_email_target, normalize_reply_subject, normalize_search_folders, parse_primary_mailbox, }; @@ -1393,4 +1615,34 @@ mod tests { assert_eq!(from_references, from_root_only); } + + #[test] + fn build_imap_search_criterion_defaults_to_all() { + let criterion = build_imap_search_criterion(&EmailSearchQuery::default()); + assert_eq!(criterion, "ALL"); + } + + #[test] + fn build_imap_search_criterion_escapes_values() { + let criterion = build_imap_search_criterion(&EmailSearchQuery { + text: Some("release \\\"candidate\\\"".to_string()), + from: Some("Alice ".to_string()), + subject: Some("Q1 update".to_string()), + unread_only: true, + since_days: None, + folders: Vec::new(), + limit: 10, + }); + + assert!(criterion.contains("UNSEEN")); + assert!(criterion.contains("FROM \"Alice \"")); + assert!(criterion.contains("SUBJECT \"Q1 update\"")); + assert!(criterion.contains("TEXT \"release \\\\\\\"candidate\\\\\\\"\"")); + } + + #[test] + fn normalize_search_folders_falls_back_to_inbox() { + let folders = normalize_search_folders(&[], &[]); + assert_eq!(folders, vec!["INBOX".to_string()]); + } } diff --git a/src/prompts/engine.rs b/src/prompts/engine.rs index 361164846..ef618f389 100644 --- a/src/prompts/engine.rs +++ b/src/prompts/engine.rs @@ -445,10 +445,16 @@ impl PromptEngine { _ => return None, }; - self.render_static(template_name) - .ok() - .map(|value| value.trim().to_string()) - .filter(|value| !value.is_empty()) + match self.render_static(template_name) { + Ok(value) => { + let value = value.trim().to_string(); + if value.is_empty() { None } else { Some(value) } + } + Err(error) => { + tracing::error!(template_name, %error, "failed to render adapter prompt template"); + None + } + } } /// Render the cortex chat system prompt with optional channel context. diff --git a/src/prompts/text.rs b/src/prompts/text.rs index e7fded748..2e504db03 100644 --- a/src/prompts/text.rs +++ b/src/prompts/text.rs @@ -158,6 +158,9 @@ fn lookup(lang: &str, key: &str) -> &'static str { ("en", "tools/channel_recall") => { include_str!("../../prompts/en/tools/channel_recall_description.md.j2") } + ("en", "tools/email_search") => { + include_str!("../../prompts/en/tools/email_search_description.md.j2") + } ("en", "tools/worker_inspect") => { include_str!("../../prompts/en/tools/worker_inspect_description.md.j2") } diff --git a/src/tools.rs b/src/tools.rs index 9ecd5c3e1..cc5908a2d 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -29,6 +29,7 @@ pub mod browser; pub mod cancel; pub mod channel_recall; pub mod cron; +pub mod email_search; pub mod exec; pub mod file; pub mod mcp; @@ -62,6 +63,7 @@ pub use channel_recall::{ ChannelRecallArgs, ChannelRecallError, ChannelRecallOutput, ChannelRecallTool, }; pub use cron::{CronArgs, CronError, CronOutput, CronTool}; +pub use email_search::{EmailSearchArgs, EmailSearchError, EmailSearchOutput, EmailSearchTool}; pub use exec::{EnvVar, ExecArgs, ExecError, ExecOutput, ExecResult, ExecTool}; pub use file::{FileArgs, FileEntry, FileEntryOutput, FileError, FileOutput, FileTool, FileType}; pub use mcp::{McpToolAdapter, McpToolError, McpToolOutput}; @@ -333,11 +335,13 @@ pub async fn remove_channel_tools( /// Each branch gets its own isolated ToolServer so `memory_recall` is never /// visible to the channel. Both `memory_save` and `memory_recall` are /// registered at creation. +#[allow(clippy::too_many_arguments)] pub fn create_branch_tool_server( state: Option, agent_id: AgentId, task_store: Arc, memory_search: Arc, + runtime_config: Arc, conversation_logger: crate::conversation::history::ConversationLogger, channel_store: crate::conversation::ChannelStore, run_logger: crate::conversation::history::ProcessRunLogger, @@ -347,6 +351,7 @@ pub fn create_branch_tool_server( .tool(MemoryRecallTool::new(memory_search.clone())) .tool(MemoryDeleteTool::new(memory_search)) .tool(ChannelRecallTool::new(conversation_logger, channel_store)) + .tool(EmailSearchTool::new(runtime_config)) .tool(WorkerInspectTool::new(run_logger, agent_id.to_string())) .tool(TaskCreateTool::new( task_store.clone(), diff --git a/src/tools/email_search.rs b/src/tools/email_search.rs new file mode 100644 index 000000000..d28339cf1 --- /dev/null +++ b/src/tools/email_search.rs @@ -0,0 +1,274 @@ +//! Search email directly from IMAP for read-back and retrieval. + +use crate::config::{Config, EmailConfig, RuntimeConfig}; +use crate::messaging::email::EmailSearchQuery; +use rig::completion::ToolDefinition; +use rig::tool::Tool; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +/// Tool for searching mailbox content through IMAP. +#[derive(Debug, Clone)] +pub struct EmailSearchTool { + runtime_config: Arc, +} + +impl EmailSearchTool { + pub fn new(runtime_config: Arc) -> Self { + Self { runtime_config } + } +} + +/// Error type for email_search tool. +#[derive(Debug, thiserror::Error)] +#[error("email_search failed: {0}")] +pub struct EmailSearchError(String); + +/// Arguments for email_search. +#[derive(Debug, Deserialize, JsonSchema)] +pub struct EmailSearchArgs { + /// Full-text query against message body and headers. + #[serde(default)] + pub query: Option, + /// Sender filter (email or display-name fragment). + #[serde(default)] + pub from: Option, + /// Subject filter. + #[serde(default)] + pub subject: Option, + /// Optional folder list. Defaults to configured email folders. + #[serde(default)] + pub folders: Vec, + /// When true, restricts to unread messages. + #[serde(default)] + pub unread_only: bool, + /// Search lookback window in days. Defaults to 30. + #[serde(default)] + pub since_days: Option, + /// Maximum results to return (1..50). Defaults to 10. + #[serde(default)] + pub limit: Option, +} + +/// A single email search result. +#[derive(Debug, Serialize)] +pub struct EmailSearchResult { + pub folder: String, + pub uid: u32, + pub from: String, + pub subject: String, + pub date: Option, + pub message_id: Option, + pub body_snippet: String, + pub attachment_names: Vec, +} + +/// Output for email_search. +#[derive(Debug, Serialize)] +pub struct EmailSearchOutput { + pub criteria: String, + pub result_count: usize, + pub results: Vec, +} + +impl Tool for EmailSearchTool { + const NAME: &'static str = "email_search"; + + type Error = EmailSearchError; + type Args = EmailSearchArgs; + type Output = EmailSearchOutput; + + async fn definition(&self, _prompt: String) -> ToolDefinition { + ToolDefinition { + name: Self::NAME.to_string(), + description: crate::prompts::text::get("tools/email_search").to_string(), + parameters: serde_json::json!({ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Optional full-text query for message bodies and headers." + }, + "from": { + "type": "string", + "description": "Optional sender filter (e.g. alice@example.com)." + }, + "subject": { + "type": "string", + "description": "Optional subject filter." + }, + "folders": { + "type": "array", + "description": "Optional folder names to search. Defaults to configured folders.", + "items": { "type": "string" } + }, + "unread_only": { + "type": "boolean", + "description": "When true, search only unread messages." + }, + "since_days": { + "type": "integer", + "description": "Search lookback in days (defaults to 30)." + }, + "limit": { + "type": "integer", + "description": "Maximum results (1-50, default 10)." + } + } + }), + } + } + + async fn call(&self, args: Self::Args) -> Result { + let email_config = load_email_config(&self.runtime_config)?; + + let query = EmailSearchQuery { + text: clean_optional(args.query), + from: clean_optional(args.from), + subject: clean_optional(args.subject), + unread_only: args.unread_only, + since_days: args.since_days.filter(|days| *days > 0).or(Some(30)), + folders: args.folders, + limit: args.limit.unwrap_or(10).clamp(1, 50), + }; + + let criteria = format_search_criteria(&query); + + let search_config = email_config.clone(); + let search_query = query.clone(); + let hits = tokio::task::spawn_blocking(move || { + crate::messaging::email::search_mailbox(&search_config, search_query) + }) + .await + .map_err(|error| EmailSearchError(format!("email search task failed: {error}")))? + .map_err(|error| EmailSearchError(error.to_string()))?; + + let results = hits + .into_iter() + .map(|hit| EmailSearchResult { + folder: hit.folder, + uid: hit.uid, + from: hit.from, + subject: hit.subject, + date: hit.date, + message_id: hit.message_id, + body_snippet: truncate_snippet(&hit.body, 1600), + attachment_names: hit.attachment_names, + }) + .collect::>(); + + Ok(EmailSearchOutput { + criteria, + result_count: results.len(), + results, + }) + } +} + +fn load_email_config(runtime_config: &RuntimeConfig) -> Result { + let config_path = runtime_config.instance_dir.join("config.toml"); + let config = if config_path.exists() { + Config::load_from_path(&config_path).map_err(|error| { + EmailSearchError(format!( + "failed to load config from {}: {error}", + config_path.display() + )) + })? + } else { + Config::load_from_env(&runtime_config.instance_dir).map_err(|error| { + EmailSearchError(format!( + "failed to load config from environment for {}: {error}", + runtime_config.instance_dir.display() + )) + })? + }; + + let email = config + .messaging + .email + .ok_or_else(|| EmailSearchError("email adapter is not configured".to_string()))?; + + if !email.enabled { + return Err(EmailSearchError( + "email adapter is configured but disabled".to_string(), + )); + } + + Ok(email) +} + +fn clean_optional(value: Option) -> Option { + value + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) +} + +fn format_search_criteria(query: &EmailSearchQuery) -> String { + let mut parts = Vec::new(); + if let Some(from) = &query.from { + parts.push(format!("from={from}")); + } + if let Some(subject) = &query.subject { + parts.push(format!("subject={subject}")); + } + if let Some(text) = &query.text { + parts.push(format!("text={text}")); + } + if query.unread_only { + parts.push("unread_only=true".to_string()); + } + if let Some(since_days) = query.since_days { + parts.push(format!("since_days={since_days}")); + } + if !query.folders.is_empty() { + parts.push(format!("folders={}", query.folders.join(","))); + } + parts.push(format!("limit={}", query.limit)); + + parts.join("; ") +} + +fn truncate_snippet(value: &str, max_bytes: usize) -> String { + if value.len() <= max_bytes { + return value.to_string(); + } + + let mut end = max_bytes; + while end > 0 && !value.is_char_boundary(end) { + end -= 1; + } + + format!("{}\n\n[snippet truncated]", &value[..end]) +} + +#[cfg(test)] +mod tests { + use super::{clean_optional, format_search_criteria}; + use crate::messaging::email::EmailSearchQuery; + + #[test] + fn clean_optional_rejects_empty_values() { + assert_eq!(clean_optional(None), None); + assert_eq!(clean_optional(Some(" ".to_string())), None); + assert_eq!( + clean_optional(Some(" value ".to_string())), + Some("value".to_string()) + ); + } + + #[test] + fn format_search_criteria_includes_defaults() { + let criteria = format_search_criteria(&EmailSearchQuery { + text: None, + from: None, + subject: None, + unread_only: false, + since_days: Some(30), + folders: Vec::new(), + limit: 10, + }); + + assert_eq!(criteria, "since_days=30; limit=10"); + } +} diff --git a/tests/context_dump.rs b/tests/context_dump.rs index d97925e4b..f3c1e3baf 100644 --- a/tests/context_dump.rs +++ b/tests/context_dump.rs @@ -279,6 +279,7 @@ async fn dump_branch_context() { deps.agent_id.clone(), deps.task_store.clone(), deps.memory_search.clone(), + deps.runtime_config.clone(), conversation_logger, channel_store, run_logger, @@ -467,6 +468,7 @@ async fn dump_all_contexts() { deps.agent_id.clone(), deps.task_store.clone(), deps.memory_search.clone(), + deps.runtime_config.clone(), conversation_logger, channel_store, run_logger, From 2a34b1acee50a94c95666e19f0519f94054d8890 Mon Sep 17 00:00:00 2001 From: James Pine Date: Thu, 26 Feb 2026 18:24:31 -0800 Subject: [PATCH 2/2] fix: harden email_search ordering and config loading --- src/config.rs | 8 ++- src/messaging/email.rs | 129 ++++++++++++++++++++++++++++++-------- src/tools/email_search.rs | 38 +++++------ 3 files changed, 123 insertions(+), 52 deletions(-) diff --git a/src/config.rs b/src/config.rs index ea05f87aa..22ad95391 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2618,11 +2618,17 @@ impl Config { pub fn load() -> Result { let instance_dir = Self::default_instance_dir(); + Self::load_for_instance(&instance_dir) + } + + /// Load configuration for a specific instance directory. + pub fn load_for_instance(instance_dir: &Path) -> Result { let config_path = instance_dir.join("config.toml"); + if config_path.exists() { Self::load_from_path(&config_path) } else { - Self::load_from_env(&instance_dir) + Self::load_from_env(instance_dir) } } diff --git a/src/messaging/email.rs b/src/messaging/email.rs index 3642fe242..ae73e9d19 100644 --- a/src/messaging/email.rs +++ b/src/messaging/email.rs @@ -1038,13 +1038,9 @@ pub fn search_mailbox( let folders = normalize_search_folders(&query.folders, &config.folders); let max_body_bytes = config.max_body_bytes.max(1024); let mut seen_message_ids = HashSet::new(); - let mut results = Vec::new(); + let mut ranked_results: Vec<(i64, EmailSearchHit)> = Vec::new(); for folder in folders { - if results.len() >= limit { - break; - } - if let Err(error) = session.select(folder.as_str()) { tracing::warn!(folder, %error, "failed to select IMAP folder for search"); continue; @@ -1053,7 +1049,17 @@ pub fn search_mailbox( let mut message_uids: Vec = match session.uid_search(&criterion) { Ok(uids) => uids.into_iter().collect(), Err(error) => { - tracing::warn!(folder, criterion, %error, "failed IMAP mailbox search"); + tracing::warn!( + folder, + criterion_len = criterion.len(), + has_text = query.text.is_some(), + has_from = query.from.is_some(), + has_subject = query.subject.is_some(), + unread_only = query.unread_only, + since_days = query.since_days, + %error, + "failed IMAP mailbox search" + ); continue; } }; @@ -1061,10 +1067,6 @@ pub fn search_mailbox( message_uids.sort_unstable_by(|left, right| right.cmp(left)); for uid in message_uids { - if results.len() >= limit { - break; - } - let fetches = match session.uid_fetch(uid.to_string(), "(UID RFC822)") { Ok(fetches) => fetches, Err(error) => { @@ -1074,10 +1076,6 @@ pub fn search_mailbox( }; for fetch in &fetches { - if results.len() >= limit { - break; - } - let current_uid = fetch.uid.unwrap_or(uid); let Some(raw_email) = fetch.body() else { continue; @@ -1108,23 +1106,32 @@ pub fn search_mailbox( .get_first_value("Subject") .unwrap_or_else(|| "(No subject)".to_string()); let date = headers.get_first_value("Date"); + let sort_timestamp = date + .as_deref() + .and_then(|value| mailparse::dateparse(value).ok()) + .unwrap_or(i64::MIN); let (body, attachment_names) = extract_text_and_attachments(&parsed, max_body_bytes); - results.push(EmailSearchHit { - folder: folder.clone(), - uid: current_uid, - from, - subject, - date, - message_id, - body, - attachment_names, - }); + ranked_results.push(( + sort_timestamp, + EmailSearchHit { + folder: folder.clone(), + uid: current_uid, + from, + subject, + date, + message_id, + body, + attachment_names, + }, + )); } } } + let results = sort_and_limit_search_hits(ranked_results, limit); + if let Err(error) = session.logout() { tracing::debug!(%error, "IMAP logout failed after mailbox search"); } @@ -1132,6 +1139,24 @@ pub fn search_mailbox( Ok(results) } +fn sort_and_limit_search_hits( + mut ranked_results: Vec<(i64, EmailSearchHit)>, + limit: usize, +) -> Vec { + ranked_results.sort_unstable_by(|left, right| { + right + .0 + .cmp(&left.0) + .then_with(|| right.1.uid.cmp(&left.1.uid)) + }); + + ranked_results + .into_iter() + .map(|(_, hit)| hit) + .take(limit) + .collect() +} + fn normalize_search_folders(requested: &[String], fallback: &[String]) -> Vec { let mut folders = requested .iter() @@ -1550,9 +1575,9 @@ struct EmailReplyContext { #[cfg(test)] mod tests { use super::{ - EmailSearchQuery, build_imap_search_criterion, derive_thread_key, extract_message_ids, - normalize_email_target, normalize_reply_subject, normalize_search_folders, - parse_primary_mailbox, + EmailSearchHit, EmailSearchQuery, build_imap_search_criterion, derive_thread_key, + extract_message_ids, normalize_email_target, normalize_reply_subject, + normalize_search_folders, parse_primary_mailbox, sort_and_limit_search_hits, }; #[test] @@ -1645,4 +1670,54 @@ mod tests { let folders = normalize_search_folders(&[], &[]); assert_eq!(folders, vec!["INBOX".to_string()]); } + + #[test] + fn sort_and_limit_search_hits_orders_globally_newest_first() { + let ranked = vec![ + ( + 100, + EmailSearchHit { + folder: "INBOX".to_string(), + uid: 10, + from: "a@example.com".to_string(), + subject: "old".to_string(), + date: None, + message_id: Some("m1".to_string()), + body: "body".to_string(), + attachment_names: Vec::new(), + }, + ), + ( + 300, + EmailSearchHit { + folder: "Support".to_string(), + uid: 20, + from: "b@example.com".to_string(), + subject: "newest".to_string(), + date: None, + message_id: Some("m2".to_string()), + body: "body".to_string(), + attachment_names: Vec::new(), + }, + ), + ( + 200, + EmailSearchHit { + folder: "Escalations".to_string(), + uid: 30, + from: "c@example.com".to_string(), + subject: "middle".to_string(), + date: None, + message_id: Some("m3".to_string()), + body: "body".to_string(), + attachment_names: Vec::new(), + }, + ), + ]; + + let results = sort_and_limit_search_hits(ranked, 2); + assert_eq!(results.len(), 2); + assert_eq!(results[0].subject, "newest"); + assert_eq!(results[1].subject, "middle"); + } } diff --git a/src/tools/email_search.rs b/src/tools/email_search.rs index d28339cf1..6f71c54a3 100644 --- a/src/tools/email_search.rs +++ b/src/tools/email_search.rs @@ -6,6 +6,7 @@ use rig::completion::ToolDefinition; use rig::tool::Tool; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::path::Path; use std::sync::Arc; /// Tool for searching mailbox content through IMAP. @@ -121,8 +122,6 @@ impl Tool for EmailSearchTool { } async fn call(&self, args: Self::Args) -> Result { - let email_config = load_email_config(&self.runtime_config)?; - let query = EmailSearchQuery { text: clean_optional(args.query), from: clean_optional(args.from), @@ -135,14 +134,15 @@ impl Tool for EmailSearchTool { let criteria = format_search_criteria(&query); - let search_config = email_config.clone(); + let instance_dir = self.runtime_config.instance_dir.clone(); let search_query = query.clone(); let hits = tokio::task::spawn_blocking(move || { - crate::messaging::email::search_mailbox(&search_config, search_query) + let email_config = load_email_config(&instance_dir)?; + crate::messaging::email::search_mailbox(&email_config, search_query) + .map_err(|error| EmailSearchError(error.to_string())) }) .await - .map_err(|error| EmailSearchError(format!("email search task failed: {error}")))? - .map_err(|error| EmailSearchError(error.to_string()))?; + .map_err(|error| EmailSearchError(format!("email search task failed: {error}")))??; let results = hits .into_iter() @@ -166,23 +166,13 @@ impl Tool for EmailSearchTool { } } -fn load_email_config(runtime_config: &RuntimeConfig) -> Result { - let config_path = runtime_config.instance_dir.join("config.toml"); - let config = if config_path.exists() { - Config::load_from_path(&config_path).map_err(|error| { - EmailSearchError(format!( - "failed to load config from {}: {error}", - config_path.display() - )) - })? - } else { - Config::load_from_env(&runtime_config.instance_dir).map_err(|error| { - EmailSearchError(format!( - "failed to load config from environment for {}: {error}", - runtime_config.instance_dir.display() - )) - })? - }; +fn load_email_config(instance_dir: &Path) -> Result { + let config = Config::load_for_instance(instance_dir).map_err(|error| { + EmailSearchError(format!( + "failed to resolve config for {}: {error}", + instance_dir.display() + )) + })?; let email = config .messaging @@ -213,7 +203,7 @@ fn format_search_criteria(query: &EmailSearchQuery) -> String { parts.push(format!("subject={subject}")); } if let Some(text) = &query.text { - parts.push(format!("text={text}")); + parts.push(format!("query={text}")); } if query.unread_only { parts.push("unread_only=true".to_string());