diff --git a/crates/loopal-context/src/middleware/config_refresh.rs b/crates/loopal-context/src/middleware/config_refresh.rs new file mode 100644 index 0000000..df9a2ef --- /dev/null +++ b/crates/loopal-context/src/middleware/config_refresh.rs @@ -0,0 +1,67 @@ +use std::sync::Mutex; + +use async_trait::async_trait; +use loopal_error::LoopalError; +use loopal_message::{ContentBlock, Message, MessageRole}; +use loopal_provider_api::{Middleware, MiddlewareContext}; +use tracing::debug; + +use super::file_snapshot::FileSnapshot; + +pub struct ConfigRefreshMiddleware { + snapshots: Mutex>, +} + +impl ConfigRefreshMiddleware { + pub fn new(snapshots: Vec) -> Self { + Self { + snapshots: Mutex::new(snapshots), + } + } +} + +#[async_trait] +impl Middleware for ConfigRefreshMiddleware { + fn name(&self) -> &str { + "config_refresh" + } + + async fn process(&self, ctx: &mut MiddlewareContext) -> Result<(), LoopalError> { + // Recover from poison — a panic in a previous holder shouldn't block future checks, + // since the lock only protects file-snapshot metadata (mtime + content cache). + let mut snapshots = self.snapshots.lock().unwrap_or_else(|e| e.into_inner()); + let mut reminders = Vec::new(); + + for snap in snapshots.iter_mut() { + if let Some(reminder) = snap.check_and_refresh() { + debug!(label = snap.label(), "config file changed"); + reminders.push(reminder); + } + } + drop(snapshots); + + if reminders.is_empty() { + return Ok(()); + } + + let reminder_text = format!( + "\n{}\n", + reminders.join("\n\n") + ); + debug!( + count = reminders.len(), + "injecting config refresh reminders" + ); + // User role (not System) — modifying the system prompt would invalidate + // Anthropic's prefix cache. system-reminder XML tags are the established + // convention for injecting context updates as user messages. + ctx.messages.push(Message { + id: None, + role: MessageRole::User, + content: vec![ContentBlock::Text { + text: reminder_text, + }], + }); + Ok(()) + } +} diff --git a/crates/loopal-context/src/middleware/file_snapshot.rs b/crates/loopal-context/src/middleware/file_snapshot.rs new file mode 100644 index 0000000..37ba2e7 --- /dev/null +++ b/crates/loopal-context/src/middleware/file_snapshot.rs @@ -0,0 +1,136 @@ +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +pub struct FileSnapshot { + path: PathBuf, + label: String, + content: String, + mtime: Option, +} + +impl FileSnapshot { + pub fn load(path: PathBuf, label: impl Into) -> Self { + let (content, mtime) = read_with_mtime(&path); + Self { + path, + label: label.into(), + content, + mtime, + } + } + + pub fn label(&self) -> &str { + &self.label + } + + /// Check for changes and return a formatted reminder if the file content differs. + /// Combines mtime check + content read + diff in a single call to avoid TOCTOU races. + pub fn check_and_refresh(&mut self) -> Option { + let current_mtime = fs::metadata(&self.path) + .ok() + .and_then(|m| m.modified().ok()); + if current_mtime == self.mtime { + return None; + } + // Stat first (above), then read content. If a concurrent write happens between + // stat and read, we capture newer content with an older mtime. The next check + // sees a newer mtime and re-reads — no writes are permanently missed. + let new_content = fs::read_to_string(&self.path).unwrap_or_default(); + + if new_content == self.content { + self.mtime = current_mtime; + return None; + } + let (added, removed) = line_diff(&self.content, &new_content); + self.content = new_content; + self.mtime = current_mtime; + if added.is_empty() && removed.is_empty() { + return None; + } + let added_refs: Vec<&str> = added.iter().map(String::as_str).collect(); + let removed_refs: Vec<&str> = removed.iter().map(String::as_str).collect(); + Some(format_file_change(&self.label, &added_refs, &removed_refs)) + } +} + +fn read_with_mtime(path: &Path) -> (String, Option) { + let mtime = fs::metadata(path).ok().and_then(|m| m.modified().ok()); + let content = fs::read_to_string(path).unwrap_or_default(); + (content, mtime) +} + +/// Ordered line diff that preserves duplicates. +/// Returns (lines only in new, lines only in old) maintaining original order. +pub fn line_diff(old: &str, new: &str) -> (Vec, Vec) { + let old_lines: Vec<&str> = old.lines().filter(|l| !l.trim().is_empty()).collect(); + let new_lines: Vec<&str> = new.lines().filter(|l| !l.trim().is_empty()).collect(); + + let mut old_counts = std::collections::HashMap::<&str, usize>::new(); + for l in &old_lines { + *old_counts.entry(l).or_default() += 1; + } + let mut new_counts = std::collections::HashMap::<&str, usize>::new(); + for l in &new_lines { + *new_counts.entry(l).or_default() += 1; + } + + let mut added = Vec::new(); + let mut add_budget = std::collections::HashMap::<&str, usize>::new(); + for (&line, &new_n) in &new_counts { + let old_n = old_counts.get(line).copied().unwrap_or(0); + if new_n > old_n { + add_budget.insert(line, new_n - old_n); + } + } + for l in &new_lines { + if let Some(n) = add_budget.get_mut(l) + && *n > 0 + { + added.push(l.to_string()); + *n -= 1; + } + } + + let mut removed = Vec::new(); + let mut rem_budget = std::collections::HashMap::<&str, usize>::new(); + for (&line, &old_n) in &old_counts { + let new_n = new_counts.get(line).copied().unwrap_or(0); + if old_n > new_n { + rem_budget.insert(line, old_n - new_n); + } + } + for l in &old_lines { + if let Some(n) = rem_budget.get_mut(l) + && *n > 0 + { + removed.push(l.to_string()); + *n -= 1; + } + } + (added, removed) +} + +pub fn format_file_change(label: &str, added: &[&str], removed: &[&str]) -> String { + let limit = 15; + let mut parts = vec![format!("[Config Update] {label} changed:")]; + if !added.is_empty() { + parts.push(" Added:".to_string()); + for line in added.iter().take(limit) { + parts.push(format!(" + {line}")); + } + if added.len() > limit { + parts.push(format!(" ... and {} more lines", added.len() - limit)); + } + } + if !removed.is_empty() { + parts.push(" Removed:".to_string()); + for line in removed.iter().take(limit) { + parts.push(format!(" - {line}")); + } + if removed.len() > limit { + parts.push(format!(" ... and {} more lines", removed.len() - limit)); + } + } + parts.join("\n") +} diff --git a/crates/loopal-context/src/middleware/mod.rs b/crates/loopal-context/src/middleware/mod.rs index 5e8e816..950317c 100644 --- a/crates/loopal-context/src/middleware/mod.rs +++ b/crates/loopal-context/src/middleware/mod.rs @@ -1,2 +1,4 @@ +pub mod config_refresh; +pub mod file_snapshot; pub mod smart_compact; mod smart_compact_llm; diff --git a/crates/loopal-context/tests/suite.rs b/crates/loopal-context/tests/suite.rs index 10c0e61..54a54b8 100644 --- a/crates/loopal-context/tests/suite.rs +++ b/crates/loopal-context/tests/suite.rs @@ -3,8 +3,12 @@ mod compaction_pair_test; #[path = "suite/compaction_test.rs"] mod compaction_test; +#[path = "suite/config_refresh_test.rs"] +mod config_refresh_test; #[path = "suite/degradation_test.rs"] mod degradation_test; +#[path = "suite/file_snapshot_test.rs"] +mod file_snapshot_test; #[path = "suite/fork_test.rs"] mod fork_test; #[path = "suite/ingestion_test.rs"] diff --git a/crates/loopal-context/tests/suite/config_refresh_test.rs b/crates/loopal-context/tests/suite/config_refresh_test.rs new file mode 100644 index 0000000..3a655ca --- /dev/null +++ b/crates/loopal-context/tests/suite/config_refresh_test.rs @@ -0,0 +1,151 @@ +use std::fs; +use std::thread::sleep; +use std::time::Duration; + +use loopal_context::middleware::config_refresh::ConfigRefreshMiddleware; +use loopal_context::middleware::file_snapshot::FileSnapshot; +use loopal_message::{Message, MessageRole}; +use loopal_provider_api::{Middleware, MiddlewareContext}; + +fn wait_for_mtime() { + sleep(Duration::from_millis(1100)); +} + +fn make_ctx(messages: Vec) -> MiddlewareContext { + MiddlewareContext { + messages, + system_prompt: "test".to_string(), + model: "test-model".to_string(), + total_input_tokens: 0, + total_output_tokens: 0, + max_context_tokens: 200_000, + summarization_provider: None, + } +} + +#[tokio::test] +async fn no_change_no_injection() { + let dir = std::env::temp_dir().join("loopal_cr_nochange_v1"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "stable content").unwrap(); + + let snap = FileSnapshot::load(path, "Test"); + let mw = ConfigRefreshMiddleware::new(vec![snap]); + let mut ctx = make_ctx(vec![Message::user("hello")]); + + mw.process(&mut ctx).await.unwrap(); + assert_eq!(ctx.messages.len(), 1); + + let _ = fs::remove_dir_all(&dir); +} + +#[tokio::test] +async fn change_injects_reminder() { + let dir = std::env::temp_dir().join("loopal_cr_change_v1"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "original").unwrap(); + + let snap = FileSnapshot::load(path.clone(), "Project Memory"); + let mw = ConfigRefreshMiddleware::new(vec![snap]); + + wait_for_mtime(); + fs::write(&path, "updated line").unwrap(); + + let mut ctx = make_ctx(vec![Message::user("hello")]); + mw.process(&mut ctx).await.unwrap(); + + assert_eq!(ctx.messages.len(), 2); + let injected = &ctx.messages[1]; + assert_eq!( + injected.role, + MessageRole::User, + "must be User to preserve prefix cache" + ); + let text = injected.text_content(); + assert!(text.contains("system-reminder")); + assert!(text.contains("Project Memory")); + assert!(text.contains("updated line")); + + let _ = fs::remove_dir_all(&dir); +} + +#[tokio::test] +async fn second_call_no_duplicate() { + let dir = std::env::temp_dir().join("loopal_cr_nodup_v1"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "v1").unwrap(); + + let snap = FileSnapshot::load(path.clone(), "Test"); + let mw = ConfigRefreshMiddleware::new(vec![snap]); + + wait_for_mtime(); + fs::write(&path, "v2").unwrap(); + + let mut ctx1 = make_ctx(vec![Message::user("a")]); + mw.process(&mut ctx1).await.unwrap(); + assert_eq!(ctx1.messages.len(), 2); + + let mut ctx2 = make_ctx(vec![Message::user("b")]); + mw.process(&mut ctx2).await.unwrap(); + assert_eq!(ctx2.messages.len(), 1); + + let _ = fs::remove_dir_all(&dir); +} + +#[tokio::test] +async fn system_prompt_unchanged() { + let dir = std::env::temp_dir().join("loopal_cr_sysprompt_v1"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "old").unwrap(); + + let snap = FileSnapshot::load(path.clone(), "Test"); + let mw = ConfigRefreshMiddleware::new(vec![snap]); + + wait_for_mtime(); + fs::write(&path, "new").unwrap(); + + let mut ctx = make_ctx(vec![Message::user("hi")]); + let original_prompt = ctx.system_prompt.clone(); + mw.process(&mut ctx).await.unwrap(); + assert_eq!(ctx.system_prompt, original_prompt); + + let _ = fs::remove_dir_all(&dir); +} + +#[tokio::test] +async fn multiple_files_single_reminder() { + let dir = std::env::temp_dir().join("loopal_cr_multi_v1"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let p1 = dir.join("mem.md"); + let p2 = dir.join("instr.md"); + fs::write(&p1, "mem_old").unwrap(); + fs::write(&p2, "instr_old").unwrap(); + + let snaps = vec![ + FileSnapshot::load(p1.clone(), "Memory"), + FileSnapshot::load(p2.clone(), "Instructions"), + ]; + let mw = ConfigRefreshMiddleware::new(snaps); + + wait_for_mtime(); + fs::write(&p1, "mem_new").unwrap(); + fs::write(&p2, "instr_new").unwrap(); + + let mut ctx = make_ctx(vec![Message::user("hi")]); + mw.process(&mut ctx).await.unwrap(); + assert_eq!(ctx.messages.len(), 2); + let text = ctx.messages[1].text_content(); + assert!(text.contains("Memory")); + assert!(text.contains("Instructions")); + + let _ = fs::remove_dir_all(&dir); +} diff --git a/crates/loopal-context/tests/suite/file_snapshot_test.rs b/crates/loopal-context/tests/suite/file_snapshot_test.rs new file mode 100644 index 0000000..aac6ded --- /dev/null +++ b/crates/loopal-context/tests/suite/file_snapshot_test.rs @@ -0,0 +1,187 @@ +use std::fs; +use std::path::PathBuf; +use std::thread::sleep; +use std::time::Duration; + +use loopal_context::middleware::file_snapshot::{FileSnapshot, format_file_change, line_diff}; + +/// Sleep enough for filesystem mtime to advance (NTFS has ~100ms granularity, +/// but Windows CI can be slow — 1.1s covers HFS+ 1-second granularity too). +fn wait_for_mtime() { + sleep(Duration::from_millis(1100)); +} + +#[test] +fn line_diff_empty_to_content() { + let (added, removed) = line_diff("", "line1\nline2"); + assert_eq!(added, vec!["line1", "line2"]); + assert!(removed.is_empty()); +} + +#[test] +fn line_diff_content_to_empty() { + let (added, removed) = line_diff("line1\nline2", ""); + assert!(added.is_empty()); + assert_eq!(removed, vec!["line1", "line2"]); +} + +#[test] +fn line_diff_no_change() { + let (added, removed) = line_diff("same\nlines", "same\nlines"); + assert!(added.is_empty()); + assert!(removed.is_empty()); +} + +#[test] +fn line_diff_partial_change() { + let (added, removed) = line_diff("keep\nold\nstay", "keep\nnew\nstay"); + assert_eq!(added, vec!["new"]); + assert_eq!(removed, vec!["old"]); +} + +#[test] +fn line_diff_blank_lines_ignored() { + let (added, removed) = line_diff("a\n\nb", "a\n\n\nb"); + assert!(added.is_empty()); + assert!(removed.is_empty()); +} + +#[test] +fn line_diff_preserves_duplicates() { + let (added, removed) = line_diff("a\na\nb", "a\nb"); + assert!(added.is_empty()); + assert_eq!(removed, vec!["a"]); + + let (added2, removed2) = line_diff("a\nb", "a\na\nb"); + assert_eq!(added2, vec!["a"]); + assert!(removed2.is_empty()); +} + +#[test] +fn line_diff_unicode() { + let (added, removed) = line_diff("你好\n世界", "你好\n新行"); + assert_eq!(added, vec!["新行"]); + assert_eq!(removed, vec!["世界"]); +} + +#[test] +fn format_added_only() { + let result = format_file_change("Test", &["new line"], &[]); + assert!(result.contains("[Config Update] Test changed:")); + assert!(result.contains("+ new line")); + assert!(!result.contains("Removed")); +} + +#[test] +fn format_removed_only() { + let result = format_file_change("Test", &[], &["old line"]); + assert!(result.contains("- old line")); + assert!(!result.contains("Added")); +} + +#[test] +fn format_truncates_long_added() { + let lines: Vec<&str> = (0..20).map(|_| "x").collect(); + let result = format_file_change("T", &lines, &[]); + assert!(result.contains("and 5 more lines")); +} + +#[test] +fn snapshot_nonexistent_file_no_change() { + let mut snap = FileSnapshot::load(PathBuf::from("/tmp/loopal_test_noexist_xyz"), "Missing"); + assert!(snap.check_and_refresh().is_none()); +} + +#[test] +fn snapshot_detects_file_creation() { + let dir = std::env::temp_dir().join("loopal_snap_create_v2"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + + let mut snap = FileSnapshot::load(path.clone(), "Test"); + assert!(snap.check_and_refresh().is_none()); + + fs::write(&path, "new content").unwrap(); + let reminder = snap.check_and_refresh(); + assert!(reminder.is_some()); + assert!(reminder.unwrap().contains("new content")); + + let _ = fs::remove_dir_all(&dir); +} + +#[test] +fn snapshot_detects_modification() { + let dir = std::env::temp_dir().join("loopal_snap_modify_v2"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "original").unwrap(); + + let mut snap = FileSnapshot::load(path.clone(), "Test"); + + wait_for_mtime(); + fs::write(&path, "updated").unwrap(); + let reminder = snap.check_and_refresh(); + assert!(reminder.is_some()); + let text = reminder.unwrap(); + assert!(text.contains("+ updated")); + assert!(text.contains("- original")); + + let _ = fs::remove_dir_all(&dir); +} + +#[test] +fn snapshot_detects_deletion() { + let dir = std::env::temp_dir().join("loopal_snap_delete_v2"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "content").unwrap(); + + let mut snap = FileSnapshot::load(path.clone(), "Test"); + fs::remove_file(&path).unwrap(); + + let reminder = snap.check_and_refresh(); + assert!(reminder.is_some()); + assert!(reminder.unwrap().contains("- content")); + + let _ = fs::remove_dir_all(&dir); +} + +#[test] +fn snapshot_stable_returns_none() { + let dir = std::env::temp_dir().join("loopal_snap_stable_v2"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "stable").unwrap(); + + let mut snap = FileSnapshot::load(path, "Test"); + assert!(snap.check_and_refresh().is_none()); + + let _ = fs::remove_dir_all(&dir); +} + +#[test] +fn snapshot_sequential_changes_both_detected() { + let dir = std::env::temp_dir().join("loopal_snap_seq_v2"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join("mem.md"); + fs::write(&path, "v1").unwrap(); + + let mut snap = FileSnapshot::load(path.clone(), "Test"); + + wait_for_mtime(); + fs::write(&path, "v2").unwrap(); + assert!(snap.check_and_refresh().is_some()); + + wait_for_mtime(); + fs::write(&path, "v3").unwrap(); + let r = snap.check_and_refresh(); + assert!(r.is_some()); + assert!(r.unwrap().contains("v3")); + + let _ = fs::remove_dir_all(&dir); +} diff --git a/crates/loopal-runtime/src/agent_loop/context_pipeline.rs b/crates/loopal-runtime/src/agent_loop/context_pipeline.rs new file mode 100644 index 0000000..13d2693 --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/context_pipeline.rs @@ -0,0 +1,25 @@ +use loopal_message::Message; +use loopal_provider_api::MiddlewareContext; +use tracing::warn; + +use super::runner::AgentLoopRunner; + +impl AgentLoopRunner { + /// Run the context middleware pipeline on a working copy of messages. + /// Non-fatal: pipeline errors are logged and swallowed so the LLM call proceeds. + pub async fn run_context_pipeline(&self, messages: &mut Vec) { + let mut ctx = MiddlewareContext { + messages: std::mem::take(messages), + system_prompt: self.params.config.system_prompt.clone(), + model: self.params.config.model().to_string(), + total_input_tokens: self.tokens.input, + total_output_tokens: self.tokens.output, + max_context_tokens: self.params.store.budget().context_window, + summarization_provider: None, + }; + if let Err(e) = self.pipeline.execute(&mut ctx).await { + warn!(error = %e, "context pipeline failed, proceeding without refresh"); + } + *messages = ctx.messages; + } +} diff --git a/crates/loopal-runtime/src/agent_loop/mod.rs b/crates/loopal-runtime/src/agent_loop/mod.rs index 94db917..d22fff7 100644 --- a/crates/loopal-runtime/src/agent_loop/mod.rs +++ b/crates/loopal-runtime/src/agent_loop/mod.rs @@ -1,5 +1,6 @@ pub mod cancel; mod compaction; +mod context_pipeline; mod context_prep; pub mod diff_tracker; pub mod env_context; @@ -16,7 +17,9 @@ mod llm_retry; pub mod loop_detector; pub(crate) mod message_build; pub(crate) mod model_config; +mod params; mod permission; +mod pipeline_setup; mod question_parse; mod resume_session; pub mod rewind; @@ -42,142 +45,17 @@ pub(crate) mod turn_metrics; pub mod turn_observer; mod turn_observer_dispatch; mod turn_telemetry; +mod turn_tool_phase; -use std::collections::HashSet; -use std::sync::Arc; - -use crate::frontend::traits::AgentFrontend; -use loopal_config::HarnessConfig; -use loopal_context::ContextStore; use loopal_error::{AgentOutput, Result}; -use loopal_kernel::Kernel; -use loopal_protocol::InterruptSignal; -use loopal_provider_api::{ModelRouter, ThinkingConfig}; -use loopal_storage::Session; -use loopal_tool_api::{MemoryChannel, PermissionMode}; -use tokio::sync::watch; - -use crate::mode::AgentMode; -use crate::session::SessionManager; - -use finished_guard::FinishedGuard; +pub use params::{ + AgentConfig, AgentDeps, AgentLoopParams, InterruptHandle, LifecycleMode, PlanModeState, +}; pub use runner::AgentLoopRunner; -/// Agent lifecycle mode — determines idle behavior after turn completion. -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] -pub enum LifecycleMode { - /// Wait indefinitely for next input (root agent, long-lived sessions). - #[default] - Persistent, - /// Exit when idle with no pending input (sub-agents, one-shot tasks). - Ephemeral, -} - -/// Agent configuration — mostly immutable, some fields switchable at runtime. -pub struct AgentConfig { - pub lifecycle: LifecycleMode, - pub router: ModelRouter, - pub system_prompt: String, - pub mode: AgentMode, - pub permission_mode: PermissionMode, - /// Tool whitelist filter — if `Some`, only tools in this set are exposed. - pub tool_filter: Option>, - /// Thinking/reasoning configuration (default: Auto). - pub thinking_config: ThinkingConfig, - /// Context tokens cap from settings (0 = auto, use model's context_window). - pub context_tokens_cap: u32, - /// Captured state before entering Plan mode (restored on ExitPlanMode). - /// `Some` iff the agent is in plan mode entered via EnterPlanMode. - pub plan_state: Option, -} - -/// Snapshot of pre-plan-mode state, captured atomically on EnterPlanMode. -pub struct PlanModeState { - pub previous_mode: AgentMode, - pub previous_permission_mode: PermissionMode, - pub tool_filter: HashSet, -} - -impl AgentConfig { - /// The effective main conversation model (respects model_routing.default override). - pub fn model(&self) -> &str { - self.router.resolve(loopal_provider_api::TaskType::Default) - } -} - -impl Default for AgentConfig { - fn default() -> Self { - Self { - lifecycle: LifecycleMode::default(), - router: ModelRouter::new("claude-sonnet-4-20250514".into()), - system_prompt: String::new(), - mode: AgentMode::Act, - permission_mode: PermissionMode::Bypass, - tool_filter: None, - thinking_config: ThinkingConfig::Auto, - context_tokens_cap: 0, - plan_state: None, - } - } -} - -/// Injected dependencies — set once at construction, never modified. -pub struct AgentDeps { - pub kernel: Arc, - pub frontend: Arc, - pub session_manager: SessionManager, -} - -/// Interrupt/cancellation signals shared with the consumer. -pub struct InterruptHandle { - pub signal: InterruptSignal, - pub tx: Arc>, -} - -impl InterruptHandle { - pub fn new() -> Self { - Self { - signal: InterruptSignal::new(), - tx: Arc::new(watch::channel(0u64).0), - } - } -} - -impl Default for InterruptHandle { - fn default() -> Self { - Self::new() - } -} - -pub struct AgentLoopParams { - pub config: AgentConfig, - pub deps: AgentDeps, - pub session: Session, - pub store: ContextStore, - pub interrupt: InterruptHandle, - /// Opaque shared state forwarded to ToolContext for agent tool access. - pub shared: Option>, - /// Memory channel for the Memory tool → Observer sidebar. - pub memory_channel: Option>, - /// Receive end for scheduler-injected messages. - /// When a cron job fires, an `Envelope` with `MessageSource::Scheduled` - /// arrives here and is consumed by `wait_for_input()` alongside normal - /// user input. - pub scheduled_rx: Option>, - /// Auto-mode LLM classifier (active when permission_mode == Auto). - pub auto_classifier: Option>, - /// Harness control parameters (loop thresholds, continuations, etc.). - pub harness: HarnessConfig, - /// Receive end for async hook rewake messages (exit code 2 from background hooks). - pub rewake_rx: Option>, - /// Shared conversation snapshot for fork context — updated before tool execution. - pub message_snapshot: Option>>>, -} +use finished_guard::FinishedGuard; -/// Public wrapper — constructs default observers and runs the loop. -/// -/// A `FinishedGuard` ensures `Finished` is always emitted — even on panic. pub async fn agent_loop(params: AgentLoopParams) -> Result { let mut guard = FinishedGuard::new(params.deps.frontend.clone()); let h = ¶ms.harness; @@ -188,14 +66,15 @@ pub async fn agent_loop(params: AgentLoopParams) -> Result { )), Box::new(diff_tracker::DiffTracker::new(params.deps.frontend.clone())), ]; + let pipeline = pipeline_setup::build_context_pipeline(¶ms.session.cwd); let mut runner = AgentLoopRunner::new(params); runner.observers = observers; + runner.pipeline = pipeline; let result = runner.run().await; guard.disarm(); result } -/// Output from a single turn (LLM → [tools → LLM]* → done). pub(crate) struct TurnOutput { pub output: String, } diff --git a/crates/loopal-runtime/src/agent_loop/params.rs b/crates/loopal-runtime/src/agent_loop/params.rs new file mode 100644 index 0000000..ecddfcd --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/params.rs @@ -0,0 +1,105 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use loopal_config::HarnessConfig; +use loopal_context::ContextStore; +use loopal_kernel::Kernel; +use loopal_protocol::InterruptSignal; +use loopal_provider_api::{ModelRouter, ThinkingConfig}; +use loopal_storage::Session; +use loopal_tool_api::{MemoryChannel, PermissionMode}; +use tokio::sync::watch; + +use crate::frontend::traits::AgentFrontend; +use crate::mode::AgentMode; +use crate::session::SessionManager; + +/// Agent lifecycle mode — determines idle behavior after turn completion. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum LifecycleMode { + #[default] + Persistent, + Ephemeral, +} + +/// Agent configuration — mostly immutable, some fields switchable at runtime. +pub struct AgentConfig { + pub lifecycle: LifecycleMode, + pub router: ModelRouter, + pub system_prompt: String, + pub mode: AgentMode, + pub permission_mode: PermissionMode, + pub tool_filter: Option>, + pub thinking_config: ThinkingConfig, + pub context_tokens_cap: u32, + pub plan_state: Option, +} + +pub struct PlanModeState { + pub previous_mode: AgentMode, + pub previous_permission_mode: PermissionMode, + pub tool_filter: HashSet, +} + +impl AgentConfig { + pub fn model(&self) -> &str { + self.router.resolve(loopal_provider_api::TaskType::Default) + } +} + +impl Default for AgentConfig { + fn default() -> Self { + Self { + lifecycle: LifecycleMode::default(), + router: ModelRouter::new("claude-sonnet-4-20250514".into()), + system_prompt: String::new(), + mode: AgentMode::Act, + permission_mode: PermissionMode::Bypass, + tool_filter: None, + thinking_config: ThinkingConfig::Auto, + context_tokens_cap: 0, + plan_state: None, + } + } +} + +pub struct AgentDeps { + pub kernel: Arc, + pub frontend: Arc, + pub session_manager: SessionManager, +} + +pub struct InterruptHandle { + pub signal: InterruptSignal, + pub tx: Arc>, +} + +impl InterruptHandle { + pub fn new() -> Self { + Self { + signal: InterruptSignal::new(), + tx: Arc::new(watch::channel(0u64).0), + } + } +} + +impl Default for InterruptHandle { + fn default() -> Self { + Self::new() + } +} + +pub struct AgentLoopParams { + pub config: AgentConfig, + pub deps: AgentDeps, + pub session: Session, + pub store: ContextStore, + pub interrupt: InterruptHandle, + pub shared: Option>, + pub memory_channel: Option>, + pub scheduled_rx: Option>, + pub auto_classifier: Option>, + pub harness: HarnessConfig, + pub rewake_rx: Option>, + pub message_snapshot: Option>>>, +} diff --git a/crates/loopal-runtime/src/agent_loop/pipeline_setup.rs b/crates/loopal-runtime/src/agent_loop/pipeline_setup.rs new file mode 100644 index 0000000..a1792bc --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/pipeline_setup.rs @@ -0,0 +1,51 @@ +use std::path::Path; + +use loopal_context::ContextPipeline; +use loopal_context::middleware::config_refresh::ConfigRefreshMiddleware; +use loopal_context::middleware::file_snapshot::FileSnapshot; + +pub(super) fn build_context_pipeline(cwd: &str) -> ContextPipeline { + let cwd = Path::new(cwd); + let mut snapshots = Vec::new(); + + if let Ok(global) = loopal_config::global_config_dir() { + snapshots.push(FileSnapshot::load( + global.join("memory/MEMORY.md"), + "Global Memory", + )); + if let Ok(p) = loopal_config::global_instructions_path() { + snapshots.push(FileSnapshot::load(p, "Global Instructions")); + } + if let Ok(p) = loopal_config::global_local_instructions_path() { + snapshots.push(FileSnapshot::load(p, "Global Local Instructions")); + } + if let Ok(p) = loopal_config::global_settings_path() { + snapshots.push(FileSnapshot::load(p, "Global Settings")); + } + } + + snapshots.push(FileSnapshot::load( + cwd.join(".loopal/memory/MEMORY.md"), + "Project Memory", + )); + snapshots.push(FileSnapshot::load( + loopal_config::project_instructions_path(cwd), + "Project Instructions", + )); + snapshots.push(FileSnapshot::load( + loopal_config::project_local_instructions_path(cwd), + "Local Instructions", + )); + snapshots.push(FileSnapshot::load( + loopal_config::project_settings_path(cwd), + "Project Settings", + )); + snapshots.push(FileSnapshot::load( + loopal_config::project_local_settings_path(cwd), + "Local Settings", + )); + + let mut pipeline = ContextPipeline::new(); + pipeline.add(Box::new(ConfigRefreshMiddleware::new(snapshots))); + pipeline +} diff --git a/crates/loopal-runtime/src/agent_loop/runner.rs b/crates/loopal-runtime/src/agent_loop/runner.rs index 48dbfae..0269458 100644 --- a/crates/loopal-runtime/src/agent_loop/runner.rs +++ b/crates/loopal-runtime/src/agent_loop/runner.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use loopal_context::ContextPipeline; use loopal_error::{AgentOutput, Result}; use loopal_protocol::{AgentEventPayload, AgentStatus, InterruptSignal}; use loopal_tool_api::ToolContext; @@ -23,6 +24,7 @@ pub struct AgentLoopRunner { pub interrupt: InterruptSignal, pub interrupt_tx: Arc>, pub observers: Vec>, + pub pipeline: ContextPipeline, /// Scheduler message receiver — consumed in `wait_for_input()`. pub trigger_rx: Option>, /// Async hook rewake channel — background hooks send Envelopes here. @@ -69,6 +71,7 @@ impl AgentLoopRunner { interrupt, interrupt_tx, observers: Vec::new(), + pipeline: ContextPipeline::new(), trigger_rx, rewake_rx, status: AgentStatus::Starting, diff --git a/crates/loopal-runtime/src/agent_loop/turn_exec.rs b/crates/loopal-runtime/src/agent_loop/turn_exec.rs index a54f1b2..d0e5a66 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_exec.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_exec.rs @@ -5,11 +5,9 @@ use tracing::{info, warn}; use super::TurnOutput; use super::runner::AgentLoopRunner; -use super::streaming_tool_exec::{self, StreamingToolHandle, ToolUseArrived}; use super::turn_context::TurnContext; impl AgentLoopRunner { - /// Inner loop: LLM → [tools → LLM]* → done. pub(super) async fn execute_turn_inner( &mut self, turn_ctx: &mut TurnContext, @@ -25,11 +23,11 @@ impl AgentLoopRunner { } self.check_and_compact().await?; - let working = self.params.store.prepare_for_llm(); + let mut working = self.params.store.prepare_for_llm(); + self.run_context_pipeline(&mut working).await; turn_ctx.metrics.llm_calls += 1; let result = self.stream_llm_with(&working, &turn_ctx.cancel).await?; - // MaxTokens + tool calls = truncated args, discard tools. let truncated = result.stop_reason == StopReason::MaxTokens && !result.tool_uses.is_empty(); if truncated { @@ -41,7 +39,6 @@ impl AgentLoopRunner { &result.tool_uses }; - // Auto-continue: MaxTokens+tools, PauseTurn, or stream truncation. let needs_auto_continue = truncated || result.stop_reason == StopReason::PauseTurn; let stream_truncated = result.stream_error && !turn_ctx.cancel.is_cancelled() @@ -80,7 +77,6 @@ impl AgentLoopRunner { return Ok(TurnOutput { output: last_text }); } - // Stream error (cancel or empty truncation) — record partial text, then exit. if result.stream_error { if !result.assistant_text.is_empty() { let no_tools: &[(String, String, serde_json::Value)] = &[]; @@ -107,7 +103,6 @@ impl AgentLoopRunner { last_text.clone_from(&result.assistant_text); } - // No tool calls — check MaxTokens continuation or stop hooks. if result.tool_uses.is_empty() { if result.stop_reason == StopReason::MaxTokens && continuation_count < self.params.harness.max_auto_continuations @@ -132,66 +127,12 @@ impl AgentLoopRunner { return Ok(TurnOutput { output: last_text }); } - // Observer: on_before_tools — may inject warnings or abort. if self.run_before_tools(turn_ctx, &result.tool_uses).await? { return Ok(TurnOutput { output: last_text }); } - self.update_fork_snapshot(&result.tool_uses); // fork context - - // Start ReadOnly tools early (parallel with permission checks). - let kernel = std::sync::Arc::clone(&self.params.deps.kernel); - let mut early_handle = StreamingToolHandle::empty(); - for (idx, (id, name, input)) in result.tool_uses.iter().enumerate() { - streaming_tool_exec::feed_tool( - &mut early_handle, - &kernel, - &self.tool_ctx, - self.params.config.mode, - &ToolUseArrived { - index: idx, - id: id.clone(), - name: name.clone(), - input: input.clone(), - }, - self.params.deps.frontend.event_emitter(), - ); - } - - let tool_names: Vec<&str> = result - .tool_uses - .iter() - .map(|(_, n, _)| n.as_str()) - .collect(); - info!( - tool_count = result.tool_uses.len(), - ?tool_names, - "tool exec start" - ); - let cancel = &turn_ctx.cancel; - turn_ctx.metrics.tool_calls_requested += result.tool_uses.len() as u32; - let stats = self - .execute_tools_with_early(result.tool_uses.clone(), cancel, early_handle) + self.execute_tool_phase(turn_ctx, result.tool_uses.clone()) .await?; - turn_ctx.metrics.tool_calls_approved += stats.approved; - turn_ctx.metrics.tool_calls_denied += stats.denied; - turn_ctx.metrics.tool_errors += stats.errors; - info!("tool exec complete"); - - let warnings = std::mem::take(&mut turn_ctx.pending_warnings); - self.params.store.append_warnings_to_last_user(warnings); - - self.inject_pending_messages().await; - let result_blocks = self - .params - .store - .messages() - .last() - .map(|m| m.content.as_slice()) - .unwrap_or(&[]); - for obs in &mut self.observers { - obs.on_after_tools(turn_ctx, &result.tool_uses, result_blocks); - } continuation_count = 0; } diff --git a/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs b/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs new file mode 100644 index 0000000..cb0c351 --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs @@ -0,0 +1,62 @@ +use loopal_error::Result; +use tracing::info; + +use super::runner::AgentLoopRunner; +use super::streaming_tool_exec::{self, StreamingToolHandle, ToolUseArrived}; +use super::turn_context::TurnContext; + +impl AgentLoopRunner { + pub(super) async fn execute_tool_phase( + &mut self, + turn_ctx: &mut TurnContext, + tool_uses: Vec<(String, String, serde_json::Value)>, + ) -> Result<()> { + self.update_fork_snapshot(&tool_uses); + + let kernel = std::sync::Arc::clone(&self.params.deps.kernel); + let mut early_handle = StreamingToolHandle::empty(); + for (idx, (id, name, input)) in tool_uses.iter().enumerate() { + streaming_tool_exec::feed_tool( + &mut early_handle, + &kernel, + &self.tool_ctx, + self.params.config.mode, + &ToolUseArrived { + index: idx, + id: id.clone(), + name: name.clone(), + input: input.clone(), + }, + self.params.deps.frontend.event_emitter(), + ); + } + + let tool_names: Vec<&str> = tool_uses.iter().map(|(_, n, _)| n.as_str()).collect(); + info!(tool_count = tool_uses.len(), ?tool_names, "tool exec start"); + let cancel = &turn_ctx.cancel; + turn_ctx.metrics.tool_calls_requested += tool_uses.len() as u32; + let stats = self + .execute_tools_with_early(tool_uses.clone(), cancel, early_handle) + .await?; + turn_ctx.metrics.tool_calls_approved += stats.approved; + turn_ctx.metrics.tool_calls_denied += stats.denied; + turn_ctx.metrics.tool_errors += stats.errors; + info!("tool exec complete"); + + let warnings = std::mem::take(&mut turn_ctx.pending_warnings); + self.params.store.append_warnings_to_last_user(warnings); + + self.inject_pending_messages().await; + let result_blocks = self + .params + .store + .messages() + .last() + .map(|m| m.content.as_slice()) + .unwrap_or(&[]); + for obs in &mut self.observers { + obs.on_after_tools(turn_ctx, &tool_uses, result_blocks); + } + Ok(()) + } +}