Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2976,6 +2976,10 @@ impl Session {
self.mailbox_rx.lock().await.has_pending_trigger_turn()
}

pub(crate) async fn has_pending_mailbox_items(&self) -> bool {
self.mailbox_rx.lock().await.has_pending()
}

#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
Expand Down Expand Up @@ -3075,7 +3079,7 @@ impl Session {
if !accepts_mailbox_delivery {
return false;
}
self.mailbox_rx.lock().await.has_pending()
self.has_pending_mailbox_items().await
}

pub async fn interrupt_task(self: &Arc<Self>) {
Expand Down
49 changes: 14 additions & 35 deletions codex-rs/core/src/tools/handlers/multi_agents_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2851,7 +2851,7 @@ async fn multi_agent_v2_wait_agent_returns_summary_for_mailbox_activity() {
}

#[tokio::test]
async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
async fn multi_agent_v2_wait_agent_returns_for_already_queued_mail() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
Expand Down Expand Up @@ -2895,47 +2895,26 @@ async fn multi_agent_v2_wait_agent_waits_for_new_mail_after_start() {
.agent_path
.expect("worker path");

session.enqueue_mailbox_communication(InterAgentCommunication::new(
worker_path.clone(),
AgentPath::root(),
Vec::new(),
"already queued".to_string(),
/*trigger_turn*/ false,
));

let wait_task = tokio::spawn({
let session = session.clone();
let turn = turn.clone();
async move {
WaitAgentHandlerV2
.handle(invocation(
session,
turn,
"wait_agent",
function_payload(json!({"timeout_ms": 1000})),
))
.await
}
});
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(
!wait_task.is_finished(),
"mail already queued before wait should not wake wait_agent"
);

session.enqueue_mailbox_communication(InterAgentCommunication::new(
worker_path,
AgentPath::root(),
Vec::new(),
"new mail".to_string(),
"already queued".to_string(),
/*trigger_turn*/ false,
));

let output = wait_task
.await
.expect("wait task should join")
.expect("wait_agent should succeed");
let output = timeout(
Duration::from_millis(500),
WaitAgentHandlerV2.handle(invocation(
session,
turn,
"wait_agent",
function_payload(json!({"timeout_ms": 1000})),
)),
)
.await
.expect("already queued mail should complete wait_agent immediately")
.expect("wait_agent should succeed");
let (content, success) = expect_text_output(output);
let result: crate::tools::handlers::multi_agents_v2::wait::WaitAgentResult =
serde_json::from_str(&content).expect("wait_agent result should be json");
Expand Down
8 changes: 6 additions & 2 deletions codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ impl ToolHandler for Handler {
)
.await;

let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
let timed_out = !wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await;
let timed_out = if session.has_pending_mailbox_items().await {
false
} else {
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
!wait_for_mailbox_change(&mut mailbox_seq_rx, deadline).await
};
let result = WaitAgentResult::from_timed_out(timed_out);

session
Expand Down
Loading