From aa8ca06e83cf2a3dc22f86f37caec6cc2d9533ea Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 22 Apr 2026 10:49:48 +0100 Subject: [PATCH] fix: mailbox wait timeout --- codex-rs/core/src/session/mod.rs | 6 ++- .../src/tools/handlers/multi_agents_tests.rs | 49 ++++++------------- .../tools/handlers/multi_agents_v2/wait.rs | 8 ++- 3 files changed, 25 insertions(+), 38 deletions(-) diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 2efb59cdfae3..d06666dd195d 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -2976,6 +2976,10 @@ impl Session { self.mailbox_rx.lock().await.has_pending_trigger_turn() } + pub(crate) async fn has_pending_mailbox_items(&self) -> bool { + self.mailbox_rx.lock().await.has_pending() + } + #[expect( clippy::await_holding_invalid_type, reason = "active turn checks and turn state updates must remain atomic" @@ -3075,7 +3079,7 @@ impl Session { if !accepts_mailbox_delivery { return false; } - self.mailbox_rx.lock().await.has_pending() + self.has_pending_mailbox_items().await } pub async fn interrupt_task(self: &Arc) { diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index b886664e358a..d7117dcc4dfc 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -2851,7 +2851,7 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() { } #[tokio::test] -async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() { +async fn multi_agent_v2_wait_agent_returns_for_already_queued_mail() { let (mut session, mut turn) = make_session_and_context().await; let manager = thread_manager(); let root = manager @@ -2895,47 +2895,26 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() { .agent_path .expect("worker path"); - session.enqueue_mailbox_communication(InterAgentCommunication::new( - worker_path.clone(), - AgentPath::root(), - Vec::new(), - "already queued".to_string(), - /*trigger_turn*/ false, - )); - - let wait_task = tokio::spawn({ - let session = session.clone(); - let turn = turn.clone(); - async move { - WaitAgentHandlerV2 - .handle(invocation( - session, - turn, - "wait_agent", - function_payload(json!({"timeout_ms": 1000})), - )) - .await - } - }); - tokio::task::yield_now().await; - tokio::time::sleep(Duration::from_millis(50)).await; - assert!( - !wait_task.is_finished(), - "mail already queued before wait should not wake wait_agent" - ); - session.enqueue_mailbox_communication(InterAgentCommunication::new( worker_path, AgentPath::root(), Vec::new(), - "new mail".to_string(), + "already queued".to_string(), /*trigger_turn*/ false, )); - let output = wait_task - .await - .expect("wait task should join") - .expect("wait_agent should succeed"); + let output = timeout( + Duration::from_millis(500), + WaitAgentHandlerV2.handle(invocation( + session, + turn, + "wait_agent", + function_payload(json!({"timeout_ms": 1000})), + )), + ) + .await + .expect("already queued mail should complete wait_agent immediately") + .expect("wait_agent should succeed"); let (content, success) = expect_text_output(output); let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult = serde_json::from_str(&content).expect("wait_agent result should be json"); diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs index 777fe1c2e764..e50c6cab2334 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs @@ -52,8 +52,12 @@ impl ToolHandler for Handler { ) .await; - let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64); - let timed_out = !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await; + let timed_out = if session.has_pending_mailbox_items().await { + false + } else { + let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64); + !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await + }; let result = WaitAgentResult::from_timed_out(timed_out); session