From c6229038a490e99ab964fcac47ff055fbe003765 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 1 Mar 2026 16:20:33 -0800 Subject: [PATCH] fix: prevent duplicate worker spawns when LLM emits parallel spawn_worker calls When the LLM returns multiple spawn_worker tool calls in a single response and one fails (e.g. missing directory param), the retry spawns a second worker for the same task since the first is already running. Add task dedup checks in both the tool layer (clear error for the LLM) and the dispatch layer (authoritative guard) using the status block's active worker list. --- src/agent/channel_dispatch.rs | 25 +++++++++++++++++++++++-- src/agent/status.rs | 16 ++++++++++++++++ src/error.rs | 8 ++++++++ src/tools/spawn_worker.rs | 13 +++++++++++++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/src/agent/channel_dispatch.rs b/src/agent/channel_dispatch.rs index 42b24aed7..a71924541 100644 --- a/src/agent/channel_dispatch.rs +++ b/src/agent/channel_dispatch.rs @@ -234,6 +234,25 @@ async fn check_worker_limit(state: &ChannelState) -> std::result::Result<(), Age Ok(()) } +/// Reject spawn if an active worker already has the same task. +/// +/// This prevents duplicate workers when the LLM emits multiple spawn_worker +/// calls in a single response and one fails then gets retried on the next +/// depth. +async fn check_duplicate_task( + state: &ChannelState, + task: &str, +) -> std::result::Result<(), AgentError> { + let status = state.status_block.read().await; + if let Some(existing_id) = status.find_duplicate_worker_task(task) { + return Err(AgentError::DuplicateWorkerTask { + channel_id: state.channel_id.to_string(), + existing_worker_id: existing_id.to_string(), + }); + } + Ok(()) +} + /// Spawn a worker from a ChannelState. Used by the SpawnWorkerTool. pub async fn spawn_worker_from_state( state: &ChannelState, @@ -242,8 +261,9 @@ pub async fn spawn_worker_from_state( suggested_skills: &[&str], ) -> std::result::Result { check_worker_limit(state).await?; - ensure_dispatch_readiness(state, "worker"); let task = task.into(); + check_duplicate_task(state, &task).await?; + ensure_dispatch_readiness(state, "worker"); let rc = &state.deps.runtime_config; let prompt_engine = rc.prompts.load(); @@ -376,8 +396,9 @@ pub async fn spawn_opencode_worker_from_state( interactive: bool, ) -> std::result::Result { check_worker_limit(state).await?; - ensure_dispatch_readiness(state, "opencode_worker"); let task = task.into(); + check_duplicate_task(state, &task).await?; + ensure_dispatch_readiness(state, "opencode_worker"); let directory = std::path::PathBuf::from(directory); let rc = &state.deps.runtime_config; diff --git a/src/agent/status.rs b/src/agent/status.rs index bb321a75d..c8b94b724 100644 --- a/src/agent/status.rs +++ b/src/agent/status.rs @@ -249,6 +249,22 @@ impl StatusBlock { self.active_workers.iter().any(|w| w.id == worker_id) } + /// Check if an active worker already exists with a matching task. + /// + /// The status block stores OpenCode tasks with a `[opencode] ` prefix, so + /// comparisons strip that prefix before matching. Returns the existing + /// worker's ID if found. + pub fn find_duplicate_worker_task(&self, task: &str) -> Option { + let normalized = task.strip_prefix("[opencode] ").unwrap_or(task); + self.active_workers.iter().find_map(|worker| { + let existing = worker + .task + .strip_prefix("[opencode] ") + .unwrap_or(&worker.task); + (existing == normalized).then_some(worker.id) + }) + } + /// Get the number of active branches. pub fn active_branch_count(&self) -> usize { self.active_branches.len() diff --git a/src/error.rs b/src/error.rs index 952fa34dc..9ae5b503c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -187,6 +187,14 @@ pub enum AgentError { #[error("max concurrent workers ({max}) reached for channel {channel_id}")] WorkerLimitReached { channel_id: String, max: usize }, + #[error( + "duplicate worker task on channel {channel_id}: worker {existing_worker_id} is already running this task" + )] + DuplicateWorkerTask { + channel_id: String, + existing_worker_id: String, + }, + #[error("worker state transition failed: {0}")] InvalidStateTransition(String), diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index 8edc4ccc5..2b1377572 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -147,6 +147,19 @@ impl Tool for SpawnWorkerTool { let readiness = self.state.deps.runtime_config.work_readiness(); let is_opencode = args.worker_type.as_deref() == Some("opencode"); + // Reject if an active worker already has the same task. This prevents + // duplicate workers when the LLM emits multiple spawn_worker calls in + // a single response and one fails/retries. + { + let status = self.state.status_block.read().await; + if let Some(existing_id) = status.find_duplicate_worker_task(&args.task) { + return Err(SpawnWorkerError(format!( + "a worker is already running this task (worker {existing_id}). \ + Wait for it to complete or cancel it before spawning another." + ))); + } + } + let worker_id = if is_opencode { let directory = args.directory.as_deref().ok_or_else(|| { SpawnWorkerError("directory is required for opencode workers".into())