diff --git a/src/agent/channel_dispatch.rs b/src/agent/channel_dispatch.rs index ab88dc074..798056c9e 100644 --- a/src/agent/channel_dispatch.rs +++ b/src/agent/channel_dispatch.rs @@ -324,6 +324,25 @@ async fn check_worker_limit(state: &ChannelState) -> std::result::Result<(), Age reserve_worker_slot_local(active_worker_count, &state.channel_id, max_workers) } +/// Reject spawn if an active worker already has the same task. +/// +/// This prevents duplicate workers when the LLM emits multiple spawn_worker +/// calls in a single response and one fails then gets retried on the next +/// depth. +async fn check_duplicate_task( + state: &ChannelState, + task: &str, +) -> std::result::Result<(), AgentError> { + let status = state.status_block.read().await; + if let Some(existing_id) = status.find_duplicate_worker_task(task) { + return Err(AgentError::DuplicateWorkerTask { + channel_id: state.channel_id.to_string(), + existing_worker_id: existing_id.to_string(), + }); + } + Ok(()) +} + /// Spawn a worker from a ChannelState. Used by the SpawnWorkerTool. pub async fn spawn_worker_from_state( state: &ChannelState, @@ -332,8 +351,9 @@ pub async fn spawn_worker_from_state( suggested_skills: &[&str], ) -> std::result::Result { check_worker_limit(state).await?; - ensure_dispatch_readiness(state, "worker"); let task = task.into(); + check_duplicate_task(state, &task).await?; + ensure_dispatch_readiness(state, "worker"); let rc = &state.deps.runtime_config; let prompt_engine = rc.prompts.load(); @@ -466,8 +486,9 @@ pub async fn spawn_opencode_worker_from_state( interactive: bool, ) -> std::result::Result { check_worker_limit(state).await?; - ensure_dispatch_readiness(state, "opencode_worker"); let task = task.into(); + check_duplicate_task(state, &task).await?; + ensure_dispatch_readiness(state, "opencode_worker"); let directory = std::path::PathBuf::from(directory); let rc = &state.deps.runtime_config; diff --git a/src/agent/status.rs b/src/agent/status.rs index b19ee6794..c03387775 100644 --- a/src/agent/status.rs +++ b/src/agent/status.rs @@ -263,6 +263,22 @@ impl StatusBlock { self.active_workers.iter().any(|w| w.id == worker_id) } + /// Check if an active worker already exists with a matching task. + /// + /// The status block stores OpenCode tasks with a `[opencode] ` prefix, so + /// comparisons strip that prefix before matching. Returns the existing + /// worker's ID if found. + pub fn find_duplicate_worker_task(&self, task: &str) -> Option { + let normalized = task.strip_prefix("[opencode] ").unwrap_or(task); + self.active_workers.iter().find_map(|worker| { + let existing = worker + .task + .strip_prefix("[opencode] ") + .unwrap_or(&worker.task); + (existing == normalized).then_some(worker.id) + }) + } + /// Get the number of active branches. pub fn active_branch_count(&self) -> usize { self.active_branches.len() diff --git a/src/error.rs b/src/error.rs index 952fa34dc..9ae5b503c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -187,6 +187,14 @@ pub enum AgentError { #[error("max concurrent workers ({max}) reached for channel {channel_id}")] WorkerLimitReached { channel_id: String, max: usize }, + #[error( + "duplicate worker task on channel {channel_id}: worker {existing_worker_id} is already running this task" + )] + DuplicateWorkerTask { + channel_id: String, + existing_worker_id: String, + }, + #[error("worker state transition failed: {0}")] InvalidStateTransition(String), diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index 814fc0bba..9f9b9df23 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -147,6 +147,19 @@ impl Tool for SpawnWorkerTool { let readiness = self.state.deps.runtime_config.work_readiness(); let is_opencode = args.worker_type.as_deref() == Some("opencode"); + // Reject if an active worker already has the same task. This prevents + // duplicate workers when the LLM emits multiple spawn_worker calls in + // a single response and one fails/retries. + { + let status = self.state.status_block.read().await; + if let Some(existing_id) = status.find_duplicate_worker_task(&args.task) { + return Err(SpawnWorkerError(format!( + "a worker is already running this task (worker {existing_id}). \ + Wait for it to complete or cancel it before spawning another." + ))); + } + } + let worker_id = if is_opencode { let directory = args.directory.as_deref().ok_or_else(|| { SpawnWorkerError("directory is required for opencode workers".into())