Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/agent/channel_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,25 @@ async fn check_worker_limit(state: &ChannelState) -> std::result::Result<(), Age
reserve_worker_slot_local(active_worker_count, &state.channel_id, max_workers)
}

/// Reject spawn if an active worker already has the same task.
///
/// This prevents duplicate workers when the LLM emits multiple spawn_worker
/// calls in a single response and one fails then gets retried on the next
/// depth.
async fn check_duplicate_task(
state: &ChannelState,
task: &str,
) -> std::result::Result<(), AgentError> {
let status = state.status_block.read().await;
if let Some(existing_id) = status.find_duplicate_worker_task(task) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small race window here: this is a check-then-spawn, and the task only gets added to the status block later. If spawn_worker tool calls can actually execute concurrently, two identical spawns could still both pass. Reserving the task under a write lock (or a per-channel spawn mutex) would close that gap.

return Err(AgentError::DuplicateWorkerTask {
channel_id: state.channel_id.to_string(),
existing_worker_id: existing_id.to_string(),
});
}
Ok(())
}
Comment on lines +332 to +344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Duplicate check is still racy (TOCTOU) under concurrent spawns.

The check reads status and returns, but task registration happens later. Two parallel requests for the same task can both pass this check before either is registered, resulting in duplicate workers anyway.

🛠️ Proposed fix (atomic check + registration)
-async fn check_duplicate_task(
-    state: &ChannelState,
-    task: &str,
-) -> std::result::Result<(), AgentError> {
-    let status = state.status_block.read().await;
-    if let Some(existing_id) = status.find_duplicate_worker_task(task) {
-        return Err(AgentError::DuplicateWorkerTask {
-            channel_id: state.channel_id.to_string(),
-            existing_worker_id: existing_id.to_string(),
-        });
-    }
-    Ok(())
-}
+async fn check_and_register_worker_task(
+    state: &ChannelState,
+    worker_id: WorkerId,
+    task_to_match: &str,
+    task_to_store: &str,
+) -> std::result::Result<(), AgentError> {
+    let mut status = state.status_block.write().await;
+    if let Some(existing_id) = status.find_duplicate_worker_task(task_to_match) {
+        return Err(AgentError::DuplicateWorkerTask {
+            channel_id: state.channel_id.to_string(),
+            existing_worker_id: existing_id.to_string(),
+        });
+    }
+    status.add_worker(worker_id, task_to_store, false);
+    Ok(())
+}
-    check_duplicate_task(state, &task).await?;
+    check_duplicate_task(state, &task).await?; // optional fast-fail
...
     let worker_id = worker.id;
+    check_and_register_worker_task(state, worker_id, &task, &task).await?;
...
-    {
-        let mut status = state.status_block.write().await;
-        status.add_worker(worker_id, &task, false);
-    }
-    check_duplicate_task(state, &task).await?;
+    check_duplicate_task(state, &task).await?; // optional fast-fail
...
     let worker_id = worker.id;
     let opencode_task = format!("[opencode] {task}");
+    check_and_register_worker_task(state, worker_id, &task, &opencode_task).await?;
...
-    {
-        let mut status = state.status_block.write().await;
-        status.add_worker(worker_id, &opencode_task, false);
-    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/channel_dispatch.rs` around lines 242 - 254, The duplicate-task
check in check_duplicate_task is TOCTOU-racy because it only reads status_block
via status_block.read().await then later registration happens, allowing
concurrent spawns to both pass; fix by making the check-and-register atomic
under the same lock: change check_duplicate_task to acquire
status_block.write().await (or call an existing atomic API on ChannelState) and
inside the write lock call status.find_duplicate_worker_task(task) and, if none,
register a placeholder/claim for the task (or return a token/worker id) so the
subsequent spawn uses that claim; update any callers to use the new
atomic-check-or-claim behavior (references: ChannelState, status_block,
find_duplicate_worker_task, and the task registration logic) so two concurrent
requests cannot both pass the check.


/// Spawn a worker from a ChannelState. Used by the SpawnWorkerTool.
pub async fn spawn_worker_from_state(
state: &ChannelState,
Expand All @@ -332,8 +351,9 @@ pub async fn spawn_worker_from_state(
suggested_skills: &[&str],
) -> std::result::Result<WorkerId, AgentError> {
check_worker_limit(state).await?;
ensure_dispatch_readiness(state, "worker");
let task = task.into();
check_duplicate_task(state, &task).await?;
ensure_dispatch_readiness(state, "worker");

let rc = &state.deps.runtime_config;
let prompt_engine = rc.prompts.load();
Expand Down Expand Up @@ -466,8 +486,9 @@ pub async fn spawn_opencode_worker_from_state(
interactive: bool,
) -> std::result::Result<crate::WorkerId, AgentError> {
check_worker_limit(state).await?;
ensure_dispatch_readiness(state, "opencode_worker");
let task = task.into();
check_duplicate_task(state, &task).await?;
ensure_dispatch_readiness(state, "opencode_worker");
let directory = std::path::PathBuf::from(directory);

let rc = &state.deps.runtime_config;
Expand Down
16 changes: 16 additions & 0 deletions src/agent/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ impl StatusBlock {
self.active_workers.iter().any(|w| w.id == worker_id)
}

/// Check if an active worker already exists with a matching task.
///
/// The status block stores OpenCode tasks with a `[opencode] ` prefix, so
/// comparisons strip that prefix before matching. Returns the existing
/// worker's ID if found.
pub fn find_duplicate_worker_task(&self, task: &str) -> Option<WorkerId> {
let normalized = task.strip_prefix("[opencode] ").unwrap_or(task);
self.active_workers.iter().find_map(|worker| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth trimming/normalizing whitespace (and the spacing after the `[opencode]` prefix) so retries that differ only in surrounding whitespace, like `task` vs ` task `, still dedup.

Suggested change
self.active_workers.iter().find_map(|worker| {
let task = task.trim();
let normalized = task
.strip_prefix("[opencode]")
.unwrap_or(task)
.trim_start();
self.active_workers.iter().find_map(|worker| {
let existing_task = worker.task.trim();
let existing = existing_task
.strip_prefix("[opencode]")
.unwrap_or(existing_task)
.trim_start();
(existing == normalized).then_some(worker.id)
})

let existing = worker
.task
.strip_prefix("[opencode] ")
.unwrap_or(&worker.task);
(existing == normalized).then_some(worker.id)
})
}

/// Get the number of active branches.
pub fn active_branch_count(&self) -> usize {
self.active_branches.len()
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ pub enum AgentError {
#[error("max concurrent workers ({max}) reached for channel {channel_id}")]
WorkerLimitReached { channel_id: String, max: usize },

#[error(
"duplicate worker task on channel {channel_id}: worker {existing_worker_id} is already running this task"
)]
DuplicateWorkerTask {
channel_id: String,
existing_worker_id: String,
},

#[error("worker state transition failed: {0}")]
InvalidStateTransition(String),

Expand Down
13 changes: 13 additions & 0 deletions src/tools/spawn_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ impl Tool for SpawnWorkerTool {
let readiness = self.state.deps.runtime_config.work_readiness();
let is_opencode = args.worker_type.as_deref() == Some("opencode");

// Reject if an active worker already has the same task. This prevents
// duplicate workers when the LLM emits multiple spawn_worker calls in
// a single response and one fails/retries.
{
let status = self.state.status_block.read().await;
if let Some(existing_id) = status.find_duplicate_worker_task(&args.task) {
return Err(SpawnWorkerError(format!(
"a worker is already running this task (worker {existing_id}). \
Wait for it to complete or cancel it before spawning another."
)));
}
}

let worker_id = if is_opencode {
let directory = args.directory.as_deref().ok_or_else(|| {
SpawnWorkerError("directory is required for opencode workers".into())
Expand Down