From 9e4de3e66445ee61e2ea728a5d55dac2c7523dbe Mon Sep 17 00:00:00 2001 From: Ciaran Ashton Date: Thu, 5 Mar 2026 19:23:36 +0000 Subject: [PATCH 1/3] fix(channel): use worker_handles as guard for WorkerComplete events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit active_workers (HashMap) is never populated because the Worker struct is consumed by worker.run() when spawned. This caused every WorkerComplete event to be silently dropped — the guard check at handle_event always returned early since active_workers was empty. Switch to worker_handles (which IS populated by spawn_worker_task) as the source of truth for whether a worker is active. This matches how cancel_worker_with_reason already treats worker_handles as sufficient. Co-Authored-By: Claude Opus 4.6 --- src/agent/channel.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 1790bf949..8ce9875cb 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -2244,15 +2244,15 @@ impl Channel { success, .. } => { - let mut workers = self.state.active_workers.write().await; - if workers.remove(worker_id).is_none() { + // Use worker_handles as the source of truth for active workers. + // (active_workers is never populated because Worker is consumed by .run()) + if self.state.worker_handles.write().await.remove(worker_id).is_none() { return Ok(()); } - drop(workers); run_logger.log_worker_completed(*worker_id, result, *success); - self.state.worker_handles.write().await.remove(worker_id); + self.state.active_workers.write().await.remove(worker_id); self.state.worker_inputs.write().await.remove(worker_id); if *notify { From 2445032c15cd0ae7e0708cda83fce5b05d5dcec6 Mon Sep 17 00:00:00 2001 From: Ciaran Ashton Date: Thu, 5 Mar 2026 19:43:26 +0000 Subject: [PATCH 2/3] test(channel): add tests for WorkerComplete event filtering and channel_id propagation Co-Authored-By: Claude Opus 4.6 --- src/agent/channel.rs | 45 +++++++++++++++++++++++++++++++++++ src/agent/channel_dispatch.rs | 36 ++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 8ce9875cb..90f4c2b42 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -2619,4 +2619,49 @@ mod tests { assert!(should_process_event_for_channel(&event, &channel_id)); } + + #[test] + fn worker_complete_event_matches_own_channel() { + let channel_id: ChannelId = Arc::from("channel-a"); + let event = ProcessEvent::WorkerComplete { + agent_id: Arc::from("agent"), + worker_id: uuid::Uuid::new_v4(), + channel_id: Some(channel_id.clone()), + result: "done".to_string(), + notify: true, + success: true, + }; + + assert!(should_process_event_for_channel(&event, &channel_id)); + } + + #[test] + fn worker_complete_event_ignored_for_other_channel() { + let channel_id: ChannelId = Arc::from("channel-a"); + let event = ProcessEvent::WorkerComplete { + agent_id: Arc::from("agent"), + worker_id: uuid::Uuid::new_v4(), + channel_id: Some(Arc::from("channel-b")), + result: "done".to_string(), + notify: true, + success: true, + }; + + assert!(!should_process_event_for_channel(&event, &channel_id)); + } + + #[test] + fn worker_complete_event_ignored_when_no_channel() { + let channel_id: ChannelId = Arc::from("channel-a"); + let event = ProcessEvent::WorkerComplete { + agent_id: Arc::from("agent"), + worker_id: uuid::Uuid::new_v4(), + channel_id: None, + result: "done".to_string(), + notify: true, + success: true, + }; + + assert!(!should_process_event_for_channel(&event, &channel_id)); + } } diff --git a/src/agent/channel_dispatch.rs b/src/agent/channel_dispatch.rs index 81053bc3d..15a43c3f2 100644 --- a/src/agent/channel_dispatch.rs +++ b/src/agent/channel_dispatch.rs @@ -739,4 +739,40 @@ mod tests { other => panic!("unexpected event: {other:?}"), } } + + #[tokio::test] + async fn spawn_worker_task_carries_channel_id() { + let (event_tx, mut event_rx) = broadcast::channel(8); + let worker_id: WorkerId = Uuid::new_v4(); + let channel_id: crate::ChannelId = Arc::from("test-channel"); + + let handle = spawn_worker_task( + worker_id, + event_tx, + Arc::::from("agent"), + Some(channel_id.clone()), + None, + async { Ok::("result".to_string()) }, + ); + + let event = tokio::time::timeout(Duration::from_secs(2), event_rx.recv()) + .await + .expect("worker completion event should be delivered") + .expect("broadcast receive should succeed"); + handle.await.expect("worker task should join cleanly"); + + match event { + ProcessEvent::WorkerComplete { + channel_id: event_channel_id, + worker_id: completed_worker_id, + success, + .. + } => { + assert_eq!(completed_worker_id, worker_id); + assert_eq!(event_channel_id, Some(channel_id)); + assert!(success); + } + other => panic!("unexpected event: {other:?}"), + } + } } From b99c2735b8c148e4e7f8835dd19832de02e38bc2 Mon Sep 17 00:00:00 2001 From: James Pine Date: Thu, 5 Mar 2026 12:16:15 -0800 Subject: [PATCH 3/3] fix: cargo fmt --- src/agent/channel.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 90f4c2b42..fbc0d23cb 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -2246,7 +2246,14 @@ impl Channel { } => { // Use worker_handles as the source of truth for active workers. // (active_workers is never populated because Worker is consumed by .run()) - if self.state.worker_handles.write().await.remove(worker_id).is_none() { + if self + .state + .worker_handles + .write() + .await + .remove(worker_id) + .is_none() + { return Ok(()); }