diff --git a/docs/content/docs/(configuration)/config.mdx b/docs/content/docs/(configuration)/config.mdx index 1ef0bb116..9d4e23dbb 100644 --- a/docs/content/docs/(configuration)/config.mdx +++ b/docs/content/docs/(configuration)/config.mdx @@ -499,6 +499,8 @@ Thresholds are fractions of `context_window`. | `refresh_secs` | integer | 900 | Seconds between background warmup passes | | `startup_delay_secs` | integer | 5 | Delay before first warmup pass after boot | +When warmup is enabled, it is the primary bulletin refresh path. The cortex bulletin loop remains as a fallback generator when warmup is disabled or when the cached bulletin is stale (`bulletin_age_secs >= max(1, warmup.refresh_secs)`). + Dispatch readiness is derived from warmup runtime state: - warmup state must be `warm` diff --git a/docs/content/docs/(core)/architecture.mdx b/docs/content/docs/(core)/architecture.mdx index d99dec336..1a8429d5f 100644 --- a/docs/content/docs/(core)/architecture.mdx +++ b/docs/content/docs/(core)/architecture.mdx @@ -336,9 +336,11 @@ CLI (clap) → parse args → For each agent: → Run SQLite migrations → Initialize MemoryStore, LanceDB tables - → Initialize MessagingManager (start all platform adapters) - → Initialize CronScheduler - → Start Cortex (warmup → first bulletin) + → Load RuntimeConfig + identity + skills + → Best-effort startup warmup pass (bounded wait) + → Initialize MessagingManager (start all platform adapters) + → Initialize CronScheduler + → Start Cortex loops (warmup, bulletin fallback, association, ready-task) → Register agent in active agents map → Enter main event loop (tokio::select!) → Inbound messages → route to Channel instances diff --git a/docs/content/docs/(core)/cortex.mdx b/docs/content/docs/(core)/cortex.mdx index e2f682403..3622cfb17 100644 --- a/docs/content/docs/(core)/cortex.mdx +++ b/docs/content/docs/(core)/cortex.mdx @@ -17,7 +17,9 @@ This is what makes Spacebot feel like it actually *knows* you. Without the bulle Bulletin generation is a two-phase process: programmatic retrieval, then LLM synthesis. -On a configurable interval (default: 60 minutes), the cortex: +When warmup is enabled (default), warmup is the primary bulletin refresher on `warmup.refresh_secs` (default: 15 minutes). The bulletin loop still runs on `bulletin_interval_secs` (default: 60 minutes) as a fallback when warmup is disabled or when the cached bulletin is stale (`bulletin_age_secs >= max(1, warmup.refresh_secs)`). + +Each bulletin generation pass does: 1. **Retrieves** memory data across eight predefined sections by querying the memory store directly (no LLM needed for retrieval): - Identity & Core Facts — typed search for Identity memories, sorted by importance @@ -34,7 +36,7 @@ On a configurable interval (default: 60 minutes), the cortex: This design avoids the problem of an LLM formulating search queries without conversation context. The retrieval phase uses `SearchMode::Typed`, `SearchMode::Recent`, and `SearchMode::Important` — metadata-based modes that query SQLite directly without needing vector embeddings or search terms. The LLM only gets involved for the part it's good at: turning structured data into readable prose. -The first bulletin is generated immediately on startup. Subsequent runs happen every `bulletin_interval_secs`. If a generation fails, the previous bulletin is preserved. If the memory graph is empty, an empty bulletin is stored without invoking the LLM. +On startup, Spacebot runs a best-effort warmup pass before adapters accept traffic (bounded wait), so the first bulletin is usually already present when the first user message arrives. If generation fails, the previous bulletin is preserved. If the memory graph is empty, an empty bulletin is stored without invoking the LLM. ### What Channels See diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index f55917c73..ea24d14c6 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -51,6 +51,118 @@ fn should_execute_warmup(warmup_config: crate::config::WarmupConfig, force: bool warmup_config.enabled || force } +fn should_generate_bulletin_from_bulletin_loop( + warmup_config: crate::config::WarmupConfig, + status: &crate::config::WarmupStatus, +) -> bool { + // If warmup is disabled, bulletin_loop remains the source of truth. + if !warmup_config.enabled { + return true; + } + + let age_secs = bulletin_age_secs(status.last_refresh_unix_ms).or(status.bulletin_age_secs); + + let Some(age_secs) = age_secs else { + // No recorded bulletin refresh yet — let bulletin loop generate one. + return true; + }; + + // Warmup loop already refreshes bulletin on this cadence. If the cached + // bulletin is still fresher than warmup cadence, skip duplicate synthesis. + age_secs >= warmup_config.refresh_secs.max(1) +} + +fn has_completed_initial_warmup(status: &crate::config::WarmupStatus) -> bool { + status.last_refresh_unix_ms.is_some() + && matches!(status.state, crate::config::WarmupState::Warm) +} + +fn apply_cancelled_warmup_status( + status: &mut crate::config::WarmupStatus, + reason: &str, + force: bool, +) -> bool { + if !matches!(status.state, crate::config::WarmupState::Warming) { + return false; + } + + status.state = crate::config::WarmupState::Degraded; + status.last_error = Some(format!( + "warmup cancelled before completion (reason: {reason}, forced: {force})" + )); + status.bulletin_age_secs = bulletin_age_secs(status.last_refresh_unix_ms); + true +} + +struct WarmupRunGuard<'a> { + deps: &'a AgentDeps, + reason: &'a str, + force: bool, + committed: bool, +} + +impl<'a> WarmupRunGuard<'a> { + fn new(deps: &'a AgentDeps, reason: &'a str, force: bool) -> Self { + Self { + deps, + reason, + force, + committed: false, + } + } + + fn mark_committed(&mut self) { + self.committed = true; + } +} + +impl Drop for WarmupRunGuard<'_> { + fn drop(&mut self) { + if self.committed { + return; + } + + update_warmup_status(self.deps, |status| { + if apply_cancelled_warmup_status(status, self.reason, self.force) { + tracing::warn!( + reason = self.reason, + forced = self.force, + "warmup run ended without terminal status; demoted state to degraded" + ); + } + }); + } +} + +async fn maybe_generate_bulletin_under_lock( + warmup_lock: &tokio::sync::Mutex<()>, + warmup_config: &arc_swap::ArcSwap, + warmup_status: &arc_swap::ArcSwap, + generate: F, +) -> bool +where + F: FnOnce() -> Fut, + Fut: std::future::Future, +{ + let _warmup_guard = warmup_lock.lock().await; + let warmup_config = **warmup_config.load(); + let status = warmup_status.load().as_ref().clone(); + let age_secs = bulletin_age_secs(status.last_refresh_unix_ms).or(status.bulletin_age_secs); + let refresh_secs = warmup_config.refresh_secs.max(1); + + if should_generate_bulletin_from_bulletin_loop(warmup_config, &status) { + generate().await + } else { + tracing::debug!( + warmup_enabled = warmup_config.enabled, + age_secs = ?age_secs, + refresh_secs, + "skipping bulletin loop generation because warmup bulletin is fresh" + ); + true + } +} + /// The cortex observes system-wide activity and maintains the memory bulletin. pub struct Cortex { pub deps: AgentDeps, @@ -269,9 +381,11 @@ impl Cortex { /// Spawn the cortex bulletin loop for an agent. /// -/// Generates a memory bulletin immediately on startup, then refreshes on a -/// configurable interval. The bulletin is stored in `RuntimeConfig::memory_bulletin` -/// and injected into every channel's system prompt. +/// Runs bulletin/profile maintenance on a configurable interval. +/// +/// When warmup is enabled, warmup is the primary bulletin refresher and this +/// loop skips duplicate bulletin synthesis while the cached bulletin is fresh. +/// When warmup is disabled (or stale), this loop generates the bulletin. pub fn spawn_bulletin_loop(deps: AgentDeps, logger: CortexLogger) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { if let Err(error) = run_bulletin_loop(&deps, &logger).await { @@ -286,7 +400,8 @@ pub fn spawn_bulletin_loop(deps: AgentDeps, logger: CortexLogger) -> tokio::task pub fn spawn_warmup_loop(deps: AgentDeps, logger: CortexLogger) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { tracing::info!("warmup loop started"); - let mut completed_initial_pass = false; + let mut completed_initial_pass = + has_completed_initial_warmup(deps.runtime_config.warmup_status.load().as_ref()); loop { let warmup_config = **deps.runtime_config.warmup.load(); @@ -301,6 +416,11 @@ pub fn spawn_warmup_loop(deps: AgentDeps, logger: CortexLogger) -> tokio::task:: continue; } + if !completed_initial_pass { + completed_initial_pass = + has_completed_initial_warmup(deps.runtime_config.warmup_status.load().as_ref()); + } + let sleep_secs = if completed_initial_pass { warmup_config.refresh_secs.max(1) } else { @@ -308,6 +428,14 @@ pub fn spawn_warmup_loop(deps: AgentDeps, logger: CortexLogger) -> tokio::task:: }; tokio::time::sleep(Duration::from_secs(sleep_secs)).await; + if !completed_initial_pass { + completed_initial_pass = + has_completed_initial_warmup(deps.runtime_config.warmup_status.load().as_ref()); + if completed_initial_pass { + continue; + } + } + let reason = if completed_initial_pass { "scheduled" } else { @@ -339,6 +467,7 @@ pub async fn run_warmup_once(deps: &AgentDeps, logger: &CortexLogger, reason: &s status.last_error = None; status.bulletin_age_secs = bulletin_age_secs(status.last_refresh_unix_ms); }); + let mut terminal_state_guard = WarmupRunGuard::new(deps, reason, force); let mut errors = Vec::new(); let mut embedding_ready = false; @@ -371,6 +500,7 @@ pub async fn run_warmup_once(deps: &AgentDeps, logger: &CortexLogger, reason: &s status.last_error = None; status.bulletin_age_secs = Some(0); }); + terminal_state_guard.mark_committed(); logger.log( "warmup_succeeded", "Warmup pass completed", @@ -388,6 +518,7 @@ pub async fn run_warmup_once(deps: &AgentDeps, logger: &CortexLogger, reason: &s status.last_error = Some(last_error.clone()); status.bulletin_age_secs = bulletin_age_secs(status.last_refresh_unix_ms); }); + terminal_state_guard.mark_committed(); logger.log( "warmup_failed", "Warmup pass failed", @@ -430,10 +561,14 @@ async fn run_bulletin_loop(deps: &AgentDeps, logger: &CortexLogger) -> anyhow::R // Run immediately on startup, with retries for attempt in 0..=MAX_RETRIES { - let bulletin_ok = { - let _warmup_guard = deps.runtime_config.warmup_lock.lock().await; - generate_bulletin(deps, logger).await - }; + let bulletin_ok = maybe_generate_bulletin_under_lock( + deps.runtime_config.warmup_lock.as_ref(), + &deps.runtime_config.warmup, + &deps.runtime_config.warmup_status, + || generate_bulletin(deps, logger), + ) + .await; + if bulletin_ok { break; } @@ -465,10 +600,13 @@ async fn run_bulletin_loop(deps: &AgentDeps, logger: &CortexLogger) -> anyhow::R tokio::time::sleep(Duration::from_secs(interval)).await; - { - let _warmup_guard = deps.runtime_config.warmup_lock.lock().await; - generate_bulletin(deps, logger).await; - } + maybe_generate_bulletin_under_lock( + deps.runtime_config.warmup_lock.as_ref(), + &deps.runtime_config.warmup, + &deps.runtime_config.warmup_status, + || generate_bulletin(deps, logger), + ) + .await; generate_profile(deps, logger).await; } } @@ -1379,7 +1517,13 @@ async fn fetch_memories_for_association( #[cfg(test)] mod tests { - use super::should_execute_warmup; + use super::{ + apply_cancelled_warmup_status, has_completed_initial_warmup, + maybe_generate_bulletin_under_lock, should_execute_warmup, + should_generate_bulletin_from_bulletin_loop, + }; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; #[test] fn run_warmup_once_semantics_skip_when_disabled_without_force() { @@ -1410,4 +1554,167 @@ mod tests { assert!(should_execute_warmup(warmup_config, false)); } + + #[test] + fn initial_warmup_completion_detected_when_status_has_refresh_timestamp() { + let status = crate::config::WarmupStatus { + state: crate::config::WarmupState::Warm, + last_refresh_unix_ms: Some(1_700_000_000_000), + ..Default::default() + }; + + assert!(has_completed_initial_warmup(&status)); + } + + #[test] + fn initial_warmup_completion_not_detected_without_refresh_timestamp() { + let status = crate::config::WarmupStatus::default(); + + assert!(!has_completed_initial_warmup(&status)); + } + + #[test] + fn initial_warmup_completion_not_detected_when_timestamp_exists_but_state_is_not_warm() { + let status = crate::config::WarmupStatus { + state: crate::config::WarmupState::Cold, + last_refresh_unix_ms: Some(1_700_000_000_000), + ..Default::default() + }; + + assert!(!has_completed_initial_warmup(&status)); + } + + #[test] + fn cancelled_warmup_demotes_warming_state_to_degraded() { + let mut status = crate::config::WarmupStatus { + state: crate::config::WarmupState::Warming, + ..Default::default() + }; + + let changed = apply_cancelled_warmup_status(&mut status, "startup", false); + + assert!(changed); + assert_eq!(status.state, crate::config::WarmupState::Degraded); + assert!( + status + .last_error + .as_deref() + .is_some_and(|error| error.contains("warmup cancelled before completion")) + ); + } + + #[test] + fn cancelled_warmup_does_not_override_terminal_state() { + let mut status = crate::config::WarmupStatus { + state: crate::config::WarmupState::Warm, + last_refresh_unix_ms: Some(1_700_000_000_000), + ..Default::default() + }; + + let changed = apply_cancelled_warmup_status(&mut status, "scheduled", false); + + assert!(!changed); + assert_eq!(status.state, crate::config::WarmupState::Warm); + } + + #[test] + fn bulletin_loop_generation_runs_when_warmup_disabled() { + let warmup_config = crate::config::WarmupConfig { + enabled: false, + ..Default::default() + }; + let status = crate::config::WarmupStatus { + bulletin_age_secs: Some(0), + ..Default::default() + }; + + assert!(should_generate_bulletin_from_bulletin_loop( + warmup_config, + &status + )); + } + + #[test] + fn bulletin_loop_generation_skips_when_warmup_enabled_and_fresh() { + let warmup_config = crate::config::WarmupConfig { + enabled: true, + refresh_secs: 900, + ..Default::default() + }; + let status = crate::config::WarmupStatus { + bulletin_age_secs: Some(10), + ..Default::default() + }; + + assert!(!should_generate_bulletin_from_bulletin_loop( + warmup_config, + &status + )); + } + + #[test] + fn bulletin_loop_generation_runs_when_warmup_enabled_and_stale() { + let warmup_config = crate::config::WarmupConfig { + enabled: true, + refresh_secs: 900, + ..Default::default() + }; + let status = crate::config::WarmupStatus { + bulletin_age_secs: Some(901), + ..Default::default() + }; + + assert!(should_generate_bulletin_from_bulletin_loop( + warmup_config, + &status + )); + } + + #[tokio::test] + async fn bulletin_loop_generation_lock_snapshot_skips_after_fresh_update() { + let warmup_lock = Arc::new(tokio::sync::Mutex::new(())); + let warmup_config = Arc::new(arc_swap::ArcSwap::from_pointee( + crate::config::WarmupConfig::default(), + )); + let warmup_status = Arc::new(arc_swap::ArcSwap::from_pointee( + crate::config::WarmupStatus { + bulletin_age_secs: Some(901), // stale at first + ..Default::default() + }, + )); + + let calls = Arc::new(AtomicUsize::new(0)); + + // Hold lock so we can update status before helper takes its snapshot. + let guard = warmup_lock.as_ref().lock().await; + + let warmup_lock_for_task = Arc::clone(&warmup_lock); + let warmup_config_for_task = Arc::clone(&warmup_config); + let warmup_status_for_task = Arc::clone(&warmup_status); + let calls_for_task = Arc::clone(&calls); + let task = tokio::spawn(async move { + maybe_generate_bulletin_under_lock( + warmup_lock_for_task.as_ref(), + warmup_config_for_task.as_ref(), + warmup_status_for_task.as_ref(), + || async { + calls_for_task.fetch_add(1, Ordering::SeqCst); + true + }, + ) + .await + }); + + // Warmup refresh lands before lock is released; helper should observe + // fresh status and skip generation. + warmup_status.store(Arc::new(crate::config::WarmupStatus { + bulletin_age_secs: Some(10), + ..Default::default() + })); + drop(guard); + + let result = task.await.expect("task should join"); + assert!(result); + assert_eq!(calls.load(Ordering::SeqCst), 0); + } } diff --git a/src/main.rs b/src/main.rs index f2a928904..3aaa7f7c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1192,6 +1192,30 @@ async fn run( /// Initialize agents, messaging adapters, cron, cortex, and ingestion. /// Extracted so it can be called either at startup or after providers are configured. +async fn wait_for_startup_warmup_tasks( + startup_warmup: &mut tokio::task::JoinSet<()>, + timeout: std::time::Duration, +) -> bool { + let wait_all = async { + while let Some(result) = startup_warmup.join_next().await { + if let Err(error) = result { + if error.is_cancelled() { + tracing::warn!(%error, "startup warmup task cancelled"); + } else { + tracing::error!(%error, "startup warmup task panicked"); + } + } + } + }; + + if tokio::time::timeout(timeout, wait_all).await.is_err() { + startup_warmup.abort_all(); + true + } else { + false + } +} + #[allow(clippy::too_many_arguments)] async fn initialize_agents( config: &spacebot::config::Config, @@ -1436,6 +1460,50 @@ async fn initialize_agents( api_state.set_instance_dir(config.instance_dir.clone()); } + // Run a startup warmup pass for every agent before adapters begin receiving + // inbound traffic. This reduces first-message cold-start latency. + { + const STARTUP_WARMUP_WAIT_SECS: u64 = 30; + let mut startup_warmup = tokio::task::JoinSet::new(); + + for (agent_id, agent) in agents.iter() { + let deps = agent.deps.clone(); + let sqlite_pool = agent.db.sqlite.clone(); + let agent_id = agent_id.clone(); + startup_warmup.spawn(async move { + let logger = spacebot::agent::cortex::CortexLogger::new(sqlite_pool); + spacebot::agent::cortex::run_warmup_once( + &deps, + &logger, + "startup_pre_adapter", + false, + ) + .await; + let status = deps.runtime_config.warmup_status.load().as_ref().clone(); + tracing::info!( + agent_id = %agent_id, + state = ?status.state, + embedding_ready = status.embedding_ready, + bulletin_age_secs = ?status.bulletin_age_secs, + last_error = ?status.last_error, + "startup warmup pass finished" + ); + }); + } + + if wait_for_startup_warmup_tasks( + &mut startup_warmup, + std::time::Duration::from_secs(STARTUP_WARMUP_WAIT_SECS), + ) + .await + { + tracing::warn!( + timeout_secs = STARTUP_WARMUP_WAIT_SECS, + "startup warmup wait timed out; cancelled unfinished startup warmup tasks and continuing startup" + ); + } + } + // Initialize messaging adapters let new_messaging_manager = spacebot::messaging::MessagingManager::new(); @@ -1910,3 +1978,70 @@ async fn initialize_agents( Ok(()) } + +#[cfg(test)] +mod tests { + use super::wait_for_startup_warmup_tasks; + use std::future::pending; + use std::sync::Arc; + use std::time::Duration; + + #[tokio::test] + async fn startup_warmup_wait_returns_false_when_tasks_finish_in_time() { + let mut tasks = tokio::task::JoinSet::new(); + tasks.spawn(async {}); + let timed_out = wait_for_startup_warmup_tasks(&mut tasks, Duration::from_millis(50)).await; + assert!(!timed_out); + } + + #[tokio::test] + async fn startup_warmup_wait_returns_true_when_timeout_expires() { + let mut tasks = tokio::task::JoinSet::new(); + tasks.spawn(async { + tokio::time::sleep(Duration::from_millis(50)).await; + }); + let timed_out = wait_for_startup_warmup_tasks(&mut tasks, Duration::from_millis(5)).await; + assert!(timed_out); + } + + #[tokio::test] + async fn startup_warmup_wait_aborts_timed_out_task_and_releases_lock() { + let warmup_lock = Arc::new(tokio::sync::Mutex::new(())); + let mut tasks = tokio::task::JoinSet::new(); + let warmup_lock_for_task = Arc::clone(&warmup_lock); + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel(); + tasks.spawn(async move { + let _guard = warmup_lock_for_task.lock().await; + locked_tx.send(()).ok(); + pending::<()>().await; + }); + + tokio::time::timeout(Duration::from_millis(50), locked_rx) + .await + .expect("task should acquire lock") + .expect("lock signal should send"); + + let timed_out = wait_for_startup_warmup_tasks(&mut tasks, Duration::from_millis(5)).await; + assert!(timed_out); + + let _guard = tokio::time::timeout(Duration::from_millis(50), warmup_lock.lock()) + .await + .expect("startup warmup timeout should cancel blocked task and release lock"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn startup_warmup_wait_timeout_stays_bounded_for_non_cooperative_task() { + let mut tasks = tokio::task::JoinSet::new(); + tasks.spawn(async { + std::thread::sleep(Duration::from_millis(100)); + }); + + let started = std::time::Instant::now(); + let timed_out = wait_for_startup_warmup_tasks(&mut tasks, Duration::from_millis(5)).await; + assert!(timed_out); + assert!( + started.elapsed() < Duration::from_millis(80), + "startup warmup timeout should return without waiting for non-cooperative task" + ); + } +}