From 44a71e20638595fcda1a026af4fd6283230cde01 Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Tue, 21 Apr 2026 18:57:39 -0700 Subject: [PATCH] Keep remote app-server events draining Fixes #18860 --- codex-rs/app-server-client/src/remote.rs | 175 ++--------------------- 1 file changed, 14 insertions(+), 161 deletions(-) diff --git a/codex-rs/app-server-client/src/remote.rs b/codex-rs/app-server-client/src/remote.rs index 51015f8e7b34..6ac40c24001e 100644 --- a/codex-rs/app-server-client/src/remote.rs +++ b/codex-rs/app-server-client/src/remote.rs @@ -20,7 +20,6 @@ use crate::RequestResult; use crate::SHUTDOWN_TIMEOUT; use crate::TypedRequestError; use crate::request_method_name; -use crate::server_notification_requires_delivery; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; @@ -126,7 +125,7 @@ enum RemoteClientCommand { pub struct RemoteAppServerClient { command_tx: mpsc::Sender, - event_rx: mpsc::Receiver, + event_rx: mpsc::UnboundedReceiver, pending_events: VecDeque, worker_handle: tokio::task::JoinHandle<()>, } @@ -195,11 +194,10 @@ impl RemoteAppServerClient { .await?; let (command_tx, mut command_rx) = mpsc::channel::(channel_capacity); - let (event_tx, event_rx) = mpsc::channel::(channel_capacity); + let (event_tx, event_rx) = mpsc::unbounded_channel::(); let worker_handle = tokio::spawn(async move { let mut pending_requests = HashMap::>>::new(); - let mut skipped_events = 0usize; loop { tokio::select! { command = command_rx.recv() => { @@ -231,15 +229,12 @@ impl RemoteAppServerClient { } let _ = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::Disconnected { message: format!( "remote app server at `{websocket_url}` write failed: {err_message}" ), }, - &mut stream, - ) - .await; + ); break; } } @@ -316,11 +311,8 @@ impl RemoteAppServerClient { app_server_event_from_notification(notification) && let Err(err) = deliver_event( &event_tx, - &mut skipped_events, event, - &mut stream, ) - .await { warn!(%err, "failed to deliver remote app-server event"); break; @@ -333,11 +325,8 @@ impl RemoteAppServerClient { Ok(request) => { if let Err(err) = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::ServerRequest(request), - &mut stream, ) - .await { warn!(%err, "failed to deliver remote app-server server request"); break; @@ -364,15 +353,12 @@ impl RemoteAppServerClient { let err_message = reject_err.to_string(); let _ = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::Disconnected { message: format!( "remote app server at `{websocket_url}` write failed: {err_message}" ), }, - &mut stream, - ) - .await; + ); break; } } @@ -381,15 +367,12 @@ impl RemoteAppServerClient { Err(err) => { let _ = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::Disconnected { message: format!( "remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}" ), }, - &mut stream, - ) - .await; + ); break; } } @@ -402,15 +385,12 @@ impl RemoteAppServerClient { .unwrap_or_else(|| "connection closed".to_string()); let _ = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::Disconnected { message: format!( "remote app server at `{websocket_url}` disconnected: {reason}" ), }, - &mut stream, - ) - .await; + ); break; } Some(Ok(Message::Binary(_))) @@ -420,29 +400,23 @@ impl RemoteAppServerClient { Some(Err(err)) => { let _ = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::Disconnected { message: format!( "remote app server at `{websocket_url}` transport failed: {err}" ), }, - &mut stream, - ) - .await; + ); break; } None => { let _ = deliver_event( &event_tx, - &mut skipped_events, AppServerEvent::Disconnected { message: format!( "remote app server at `{websocket_url}` closed the connection" ), }, - &mut stream, - ) - .await; + ); break; } } @@ -806,100 +780,16 @@ fn app_server_event_from_notification(notification: JSONRPCNotification) -> Opti } } -async fn deliver_event( - event_tx: &mpsc::Sender, - skipped_events: &mut usize, +fn deliver_event( + event_tx: &mpsc::UnboundedSender, event: AppServerEvent, - stream: &mut WebSocketStream>, ) -> IoResult<()> { - if *skipped_events > 0 { - if event_requires_delivery(&event) { - if event_tx - .send(AppServerEvent::Lagged { - skipped: *skipped_events, - }) - .await - .is_err() - { - return Err(IoError::new( - ErrorKind::BrokenPipe, - "remote app-server event consumer channel is closed", - )); - } - *skipped_events = 0; - } else { - match event_tx.try_send(AppServerEvent::Lagged { - skipped: *skipped_events, - }) { - Ok(()) => *skipped_events = 0, - Err(mpsc::error::TrySendError::Full(_)) => { - *skipped_events = (*skipped_events).saturating_add(1); - reject_if_server_request_dropped(stream, &event).await?; - return Ok(()); - } - Err(mpsc::error::TrySendError::Closed(_)) => { - return Err(IoError::new( - ErrorKind::BrokenPipe, - "remote app-server event consumer channel is closed", - )); - } - } - } - } - - if event_requires_delivery(&event) { - event_tx.send(event).await.map_err(|_| { - IoError::new( - ErrorKind::BrokenPipe, - "remote app-server event consumer channel is closed", - ) - })?; - return Ok(()); - } - - match event_tx.try_send(event) { - Ok(()) => Ok(()), - Err(mpsc::error::TrySendError::Full(event)) => { - *skipped_events = (*skipped_events).saturating_add(1); - reject_if_server_request_dropped(stream, &event).await - } - Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new( + event_tx.send(event).map_err(|_| { + IoError::new( ErrorKind::BrokenPipe, "remote app-server event consumer channel is closed", - )), - } -} - -async fn reject_if_server_request_dropped( - stream: &mut WebSocketStream>, - event: &AppServerEvent, -) -> IoResult<()> { - let AppServerEvent::ServerRequest(request) = event else { - return Ok(()); - }; - write_jsonrpc_message( - stream, - JSONRPCMessage::Error(JSONRPCError { - error: JSONRPCErrorError { - code: -32001, - message: "remote app-server event queue is full".to_string(), - data: None, - }, - id: request.id().clone(), - }), - "", - ) - .await -} - -fn event_requires_delivery(event: &AppServerEvent) -> bool { - match event { - AppServerEvent::ServerNotification(notification) => { - server_notification_requires_delivery(notification) - } - AppServerEvent::Disconnected { .. } => true, - AppServerEvent::Lagged { .. } | AppServerEvent::ServerRequest(_) => false, - } + ) + }) } fn request_id_from_client_request(request: &ClientRequest) -> RequestId { @@ -945,40 +835,3 @@ async fn write_jsonrpc_message( )) }) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn event_requires_delivery_marks_transcript_and_disconnect_events() { - assert!(event_requires_delivery( - &AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta( - codex_app_server_protocol::AgentMessageDeltaNotification { - thread_id: "thread".to_string(), - turn_id: "turn".to_string(), - item_id: "item".to_string(), - delta: "hello".to_string(), - }, - ),) - )); - assert!(event_requires_delivery( - &AppServerEvent::ServerNotification(ServerNotification::ItemCompleted( - codex_app_server_protocol::ItemCompletedNotification { - thread_id: "thread".to_string(), - turn_id: "turn".to_string(), - item: codex_app_server_protocol::ThreadItem::Plan { - id: "item".to_string(), - text: "step".to_string(), - }, - } - ),) - )); - assert!(event_requires_delivery(&AppServerEvent::Disconnected { - message: "closed".to_string(), - })); - assert!(!event_requires_delivery(&AppServerEvent::Lagged { - skipped: 1 - })); - } -}