Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 3 additions & 69 deletions desktop/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use huddle::{
speak_agent_message, start_huddle, start_stt_pipeline,
};
use managed_agents::{
ensure_nest, find_managed_agent_mut, kill_stale_tracked_processes, load_managed_agents,
save_managed_agents, start_managed_agent_process, sync_managed_agent_processes, BackendKind,
ManagedAgentProcess,
ensure_nest, kill_stale_tracked_processes, load_managed_agents,
restore_managed_agents_on_launch, save_managed_agents, sync_managed_agent_processes,
BackendKind, ManagedAgentProcess,
};
use std::sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -31,72 +31,6 @@ use std::sync::{
use tauri::{http, Emitter, Manager, RunEvent};
use tauri_plugin_window_state::StateFlags;

fn restore_managed_agents_on_launch(
app: &tauri::AppHandle,
shutdown_started: &AtomicBool,
) -> Result<(), String> {
if shutdown_started.load(Ordering::SeqCst) {
return Ok(());
}

let state = app.state::<AppState>();
let _store_guard = state
.managed_agents_store_lock
.lock()
.map_err(|error| error.to_string())?;

if shutdown_started.load(Ordering::SeqCst) {
return Ok(());
}

let mut records = load_managed_agents(app)?;
let mut runtimes = state
.managed_agent_processes
.lock()
.map_err(|error| error.to_string())?;
let mut changed = sync_managed_agent_processes(&mut records, &mut runtimes);
changed |= kill_stale_tracked_processes(&mut records, &runtimes);

// PID-file sweep: kill any orphaned agent processes we have receipts for
// that weren’t tracked in records (e.g. escaped process groups, double-forked).
let tracked_pids: Vec<u32> = records
.iter()
.filter_map(|r| r.runtime_pid)
.chain(runtimes.values().map(|rt| rt.child.id()))
.collect();
managed_agents::sweep_orphaned_agent_processes(app, &tracked_pids);

let pubkeys_to_restore = records
.iter()
.filter(|record| record.start_on_app_launch && record.backend == BackendKind::Local)
.map(|record| record.pubkey.clone())
.collect::<Vec<_>>();

for pubkey in pubkeys_to_restore {
if shutdown_started.load(Ordering::SeqCst) {
break;
}

let record = find_managed_agent_mut(&mut records, &pubkey)?;
match start_managed_agent_process(app, record, &mut runtimes) {
Ok(()) => {
changed = true;
}
Err(error) => {
record.updated_at = util::now_iso();
record.last_error = Some(error);
changed = true;
}
}
}

if changed {
save_managed_agents(app, &records)?;
}

Ok(())
}

fn shutdown_managed_agents(app: &tauri::AppHandle) -> Result<(), String> {
let state = app.state::<AppState>();
let _store_guard = state
Expand Down
29 changes: 29 additions & 0 deletions desktop/src-tauri/src/managed_agents/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,36 @@ fn resolve_workspace_command(command: &str, app: Option<&AppHandle>) -> Option<P
.find(|candidate| candidate.exists())
}

/// Resolve a command to an absolute path, caching results for the app lifetime.
/// The cache eliminates redundant login-shell spawns when multiple agents share
/// the same binaries (e.g. `npx`, `uvx`).
pub fn resolve_command(command: &str, app: Option<&AppHandle>) -> Option<PathBuf> {
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

static CACHE: OnceLock<Mutex<HashMap<String, Option<PathBuf>>>> = OnceLock::new();
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));

// Fast path: return cached result without allocating a key.
if let Ok(guard) = cache.lock() {
if let Some(result) = guard.get(command) {
return result.clone();
}
}

// Slow path: resolve and cache.
let result = resolve_command_uncached(command, app);

if result.is_some() {
if let Ok(mut guard) = cache.lock() {
guard.insert(command.to_string(), result.clone());
}
}

result
}

fn resolve_command_uncached(command: &str, app: Option<&AppHandle>) -> Option<PathBuf> {
if let Some(path) = resolve_workspace_command(command, app) {
return Some(path);
}
Expand Down
2 changes: 2 additions & 0 deletions desktop/src-tauri/src/managed_agents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod nest;
mod persona_avatars;
mod persona_card;
mod personas;
mod restore;
mod runtime;
mod storage;
mod teams;
Expand All @@ -14,6 +15,7 @@ pub use discovery::*;
pub use nest::*;
pub use persona_card::*;
pub use personas::*;
pub use restore::*;
pub use runtime::*;
pub use storage::*;
pub use teams::*;
Expand Down
148 changes: 148 additions & 0 deletions desktop/src-tauri/src/managed_agents/restore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use super::{
find_managed_agent_mut, kill_stale_tracked_processes, load_managed_agents, save_managed_agents,
spawn_agent_child, sync_managed_agent_processes, BackendKind, ManagedAgentProcess,
};
use crate::app_state::AppState;
use crate::util;
use std::sync::atomic::{AtomicBool, Ordering};
use tauri::Manager;

/// Restore managed agents that were running before the app was closed.
///
/// Split into three phases to minimise lock contention with the frontend:
/// A (under lock): sync process state, cleanup, collect agents to start
/// B (no locks): resolve commands and spawn processes in parallel
/// C (re-lock): write back PIDs and status to records on disk
pub fn restore_managed_agents_on_launch(
app: &tauri::AppHandle,
shutdown_started: &AtomicBool,
) -> Result<(), String> {
if shutdown_started.load(Ordering::SeqCst) {
return Ok(());
}

let state = app.state::<AppState>();

// ── Phase A (under lock): housekeeping + collect agents to restore ──
let agents_to_start: Vec<super::ManagedAgentRecord>;
{
let _store_guard = state
.managed_agents_store_lock
.lock()
.map_err(|error| error.to_string())?;

if shutdown_started.load(Ordering::SeqCst) {
return Ok(());
}

let mut records = load_managed_agents(app)?;
let mut runtimes = state
.managed_agent_processes
.lock()
.map_err(|error| error.to_string())?;
let mut changed = sync_managed_agent_processes(&mut records, &mut runtimes);
changed |= kill_stale_tracked_processes(&mut records, &runtimes);

let tracked_pids: Vec<u32> = records
.iter()
.filter_map(|r| r.runtime_pid)
.chain(runtimes.values().map(|rt| rt.child.id()))
.collect();
super::sweep_orphaned_agent_processes(app, &tracked_pids);

let candidates: Vec<String> = records
.iter()
.filter(|record| record.start_on_app_launch && record.backend == BackendKind::Local)
.map(|record| record.pubkey.clone())
.collect();

let mut to_start = Vec::new();
for pubkey in &candidates {
if let Some(runtime) = runtimes.get_mut(pubkey) {
if runtime.child.try_wait().ok().flatten().is_none() {
continue;
}
}
if let Some(record) = records.iter().find(|r| r.pubkey == *pubkey) {
if let Some(pid) = record.runtime_pid {
if super::process_is_running(pid) {
continue;
}
}
to_start.push(record.clone());
}
}
agents_to_start = to_start;

if changed {
save_managed_agents(app, &records)?;
}
}

if agents_to_start.is_empty() {
return Ok(());
}

// ── Phase B (no locks): resolve commands and spawn processes in parallel ──
let spawn_results: Vec<(
String,
Result<(std::process::Child, std::path::PathBuf), String>,
)> = std::thread::scope(|scope| {
let handles: Vec<_> = agents_to_start
.iter()
.filter(|_| !shutdown_started.load(Ordering::SeqCst))
.map(|record| {
let pubkey = record.pubkey.clone();
let handle = scope.spawn(move || {
let result = spawn_agent_child(app, record);
(pubkey, result)
});
handle
})
.collect();

handles.into_iter().map(|h| h.join().unwrap()).collect()
});

if spawn_results.is_empty() {
return Ok(());
}

// ── Phase C (re-acquire lock): write back PIDs and status to records ──
let _store_guard = state
.managed_agents_store_lock
.lock()
.map_err(|error| error.to_string())?;
let mut records = load_managed_agents(app)?;
let mut runtimes = state
.managed_agent_processes
.lock()
.map_err(|error| error.to_string())?;

for (pubkey, result) in spawn_results {
let record = match find_managed_agent_mut(&mut records, &pubkey) {
Ok(r) => r,
Err(_) => continue,
};
match result {
Ok((child, log_path)) => {
let now = util::now_iso();
record.updated_at = now.clone();
record.runtime_pid = Some(child.id());
record.last_started_at = Some(now);
record.last_stopped_at = None;
record.last_exit_code = None;
record.last_error = None;
runtimes.insert(pubkey, ManagedAgentProcess { child, log_path });
}
Err(error) => {
record.updated_at = util::now_iso();
record.last_error = Some(error);
}
}
}

save_managed_agents(app, &records)?;

Ok(())
}
Loading