From a66366b7804b454118814c4729eb0c12d168c979 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Mon, 23 Mar 2026 12:28:11 -0700 Subject: [PATCH 1/4] app-server: Add back pressure and batching to `command/exec` --- .../app-server/src/codex_message_processor.rs | 1 + codex-rs/app-server/src/command_exec.rs | 15 ++- codex-rs/app-server/src/in_process.rs | 11 ++- codex-rs/app-server/src/lib.rs | 3 +- .../src/message_processor/tracing_tests.rs | 2 + codex-rs/app-server/src/outgoing_message.rs | 91 ++++++++++++++++++- codex-rs/app-server/src/transport.rs | 86 +++++++++++------- 7 files changed, 170 insertions(+), 39 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 8917f392dd0..68672ac9a78 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -8890,6 +8890,7 @@ mod tests { request_id: sent_request_id, .. }), + .. } = request_message else { panic!("expected tool request to be sent to the subscribed connection"); diff --git a/codex-rs/app-server/src/command_exec.rs b/codex-rs/app-server/src/command_exec.rs index e1a9cb3def1..8055a7beda2 100644 --- a/codex-rs/app-server/src/command_exec.rs +++ b/codex-rs/app-server/src/command_exec.rs @@ -42,6 +42,7 @@ use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; +const OUTPUT_CHUNK_SIZE_HINT: usize = 64 * 1024; #[derive(Clone)] pub(crate) struct CommandExecManager { @@ -577,13 +578,19 @@ fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHa let mut buffer: Vec = Vec::new(); let mut observed_num_bytes = 0usize; loop { - let chunk = tokio::select! { + let mut chunk = tokio::select! { chunk = output_rx.recv() => match chunk { Some(chunk) => chunk, None => break, }, _ = stdio_timeout_rx.wait_for(|&v| v) => break, }; + // Individual chunks are at most 8KiB, so overshooting a bit is acceptable. + while chunk.len() < OUTPUT_CHUNK_SIZE_HINT + && let Ok(next_chunk) = output_rx.try_recv() + { + chunk.extend_from_slice(&next_chunk); + } let capped_chunk = match output_bytes_cap { Some(output_bytes_cap) => { let capped_chunk_len = output_bytes_cap @@ -597,8 +604,8 @@ fn spawn_process_output(params: SpawnProcessOutputParams) -> tokio::task::JoinHa let cap_reached = Some(observed_num_bytes) == output_bytes_cap; if let (true, Some(process_id)) = (stream_output, process_id.as_ref()) { outgoing - .send_server_notification_to_connections( - &[connection_id], + .send_server_notification_to_connection_and_wait( + connection_id, ServerNotification::CommandExecOutputDelta( CommandExecOutputDeltaNotification { process_id: process_id.clone(), @@ -809,6 +816,7 @@ mod tests { let OutgoingEnvelope::ToConnection { connection_id, message, + .. } = envelope else { panic!("expected connection-scoped outgoing message"); @@ -891,6 +899,7 @@ mod tests { let OutgoingEnvelope::ToConnection { connection_id, message, + .. } = envelope else { panic!("expected connection-scoped outgoing message"); diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 4288d153936..8d6686c053b 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -60,6 +60,7 @@ use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; +use crate::outgoing_message::QueuedOutgoingMessage; use crate::transport::CHANNEL_CAPACITY; use crate::transport::OutboundConnectionState; use crate::transport::route_outgoing_envelope; @@ -377,7 +378,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(channel_capacity); let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); - let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); + let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); let outbound_initialized = Arc::new(AtomicBool::new(false)); let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false)); let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new())); @@ -574,10 +575,11 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { } } } - outgoing_message = writer_rx.recv() => { - let Some(outgoing_message) = outgoing_message else { + queued_message = writer_rx.recv() => { + let Some(queued_message) = queued_message else { break; }; + let outgoing_message = queued_message.message; match outgoing_message { OutgoingMessage::Response(response) => { if let Some(response_tx) = pending_request_responses.remove(&response.id) { @@ -682,6 +684,9 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { } } } + if let Some(write_complete_tx) = queued_message.write_complete_tx { + let _ = write_complete_tx.send(()); + } } } } diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index a4994d34b80..de3119f1eed 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -22,6 +22,7 @@ use crate::message_processor::MessageProcessorArgs; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingMessageSender; +use crate::outgoing_message::QueuedOutgoingMessage; use crate::transport::CHANNEL_CAPACITY; use crate::transport::ConnectionState; use crate::transport::OutboundConnectionState; @@ -103,7 +104,7 @@ enum OutboundControlEvent { /// Register a new writer for an opened connection. Opened { connection_id: ConnectionId, - writer: mpsc::Sender, + writer: mpsc::Sender, // Allow codex/event/* notifications to be emitted. allow_legacy_notifications: bool, disconnect_sender: Option, diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index 58499745bd5..cbf812050ae 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -392,6 +392,7 @@ async fn read_response( let crate::outgoing_message::OutgoingEnvelope::ToConnection { connection_id, message, + .. } = envelope else { continue; @@ -422,6 +423,7 @@ async fn read_thread_started_notification( crate::outgoing_message::OutgoingEnvelope::ToConnection { connection_id, message, + .. } => { if connection_id != TEST_CONNECTION_ID { continue; diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 67761525b36..b1c5491bbc0 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -81,17 +81,33 @@ impl RequestContext { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) enum OutgoingEnvelope { ToConnection { connection_id: ConnectionId, message: OutgoingMessage, + write_complete_tx: Option>, }, Broadcast { message: OutgoingMessage, }, } +#[derive(Debug)] +pub(crate) struct QueuedOutgoingMessage { + pub(crate) message: OutgoingMessage, + pub(crate) write_complete_tx: Option>, +} + +impl QueuedOutgoingMessage { + pub(crate) fn new(message: OutgoingMessage) -> Self { + Self { + message, + write_complete_tx: None, + } + } +} + /// Sends messages to the client and manages request callbacks. pub(crate) struct OutgoingMessageSender { next_server_request_id: AtomicI64, @@ -299,6 +315,7 @@ impl OutgoingMessageSender { .send(OutgoingEnvelope::ToConnection { connection_id: *connection_id, message: outgoing_message.clone(), + write_complete_tx: None, }) .await { @@ -333,6 +350,7 @@ impl OutgoingMessageSender { .send(OutgoingEnvelope::ToConnection { connection_id, message: OutgoingMessage::Request(request), + write_complete_tx: None, }) .await { @@ -519,6 +537,7 @@ impl OutgoingMessageSender { .send(OutgoingEnvelope::ToConnection { connection_id: *connection_id, message: outgoing_message.clone(), + write_complete_tx: None, }) .await { @@ -527,6 +546,28 @@ impl OutgoingMessageSender { } } + pub(crate) async fn send_server_notification_to_connection_and_wait( + &self, + connection_id: ConnectionId, + notification: ServerNotification, + ) { + tracing::trace!("app-server event: {notification}"); + let outgoing_message = OutgoingMessage::AppServerNotification(notification); + let (write_complete_tx, write_complete_rx) = oneshot::channel(); + if let Err(err) = self + .sender + .send(OutgoingEnvelope::ToConnection { + connection_id, + message: outgoing_message, + write_complete_tx: Some(write_complete_tx), + }) + .await + { + warn!("failed to send server notification to client: {err:?}"); + } + let _ = write_complete_rx.await; + } + pub(crate) async fn send_notification_to_connections( &self, connection_ids: &[ConnectionId], @@ -551,6 +592,7 @@ impl OutgoingMessageSender { .send(OutgoingEnvelope::ToConnection { connection_id: *connection_id, message: outgoing_message.clone(), + write_complete_tx: None, }) .await { @@ -598,6 +640,7 @@ impl OutgoingMessageSender { let send_fut = self.sender.send(OutgoingEnvelope::ToConnection { connection_id, message, + write_complete_tx: None, }); let send_result = if let Some(request_context) = request_context { send_fut.instrument(request_context.span()).await @@ -858,6 +901,7 @@ mod tests { OutgoingEnvelope::ToConnection { connection_id, message, + .. } => { assert_eq!(connection_id, ConnectionId(42)); let OutgoingMessage::Response(response) = message else { @@ -920,6 +964,7 @@ mod tests { OutgoingEnvelope::ToConnection { connection_id, message, + .. } => { assert_eq!(connection_id, ConnectionId(9)); let OutgoingMessage::Error(outgoing_error) = message else { @@ -932,6 +977,50 @@ mod tests { } } + #[tokio::test] + async fn send_server_notification_to_connection_and_wait_tracks_write_completion() { + let (tx, mut rx) = mpsc::channel::(4); + let outgoing = OutgoingMessageSender::new(tx); + let send_task = tokio::spawn(async move { + outgoing + .send_server_notification_to_connection_and_wait( + ConnectionId(42), + ServerNotification::ModelRerouted(ModelReroutedNotification { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + from_model: "gpt-5.3-codex".to_string(), + to_model: "gpt-5.2".to_string(), + reason: ModelRerouteReason::HighRiskCyberActivity, + }), + ) + .await + }); + + let envelope = timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("should receive envelope before timeout") + .expect("channel should contain one message"); + let OutgoingEnvelope::ToConnection { + connection_id, + message, + write_complete_tx, + } = envelope + else { + panic!("expected targeted server notification envelope"); + }; + assert_eq!(connection_id, ConnectionId(42)); + assert!(matches!(message, OutgoingMessage::AppServerNotification(_))); + write_complete_tx + .expect("write completion sender should be attached") + .send(()) + .expect("receiver should still be waiting"); + + timeout(Duration::from_secs(1), send_task) + .await + .expect("send task should finish after write completion is signaled") + .expect("send task should not panic"); + } + #[tokio::test] async fn connection_closed_clears_registered_request_contexts() { let (tx, _rx) = mpsc::channel::(4); diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 3e24d831ae5..09cd1dcce9d 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -4,6 +4,7 @@ use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingError; use crate::outgoing_message::OutgoingMessage; +use crate::outgoing_message::QueuedOutgoingMessage; use axum::Router; use axum::body::Body; use axum::extract::ConnectInfo; @@ -187,7 +188,7 @@ impl FromStr for AppServerTransport { pub(crate) enum TransportEvent { ConnectionOpened { connection_id: ConnectionId, - writer: mpsc::Sender, + writer: mpsc::Sender, allow_legacy_notifications: bool, disconnect_sender: Option, }, @@ -227,13 +228,13 @@ pub(crate) struct OutboundConnectionState { pub(crate) experimental_api_enabled: Arc, pub(crate) opted_out_notification_methods: Arc>>, pub(crate) allow_legacy_notifications: bool, - pub(crate) writer: mpsc::Sender, + pub(crate) writer: mpsc::Sender, disconnect_sender: Option, } impl OutboundConnectionState { pub(crate) fn new( - writer: mpsc::Sender, + writer: mpsc::Sender, initialized: Arc, experimental_api_enabled: Arc, opted_out_notification_methods: Arc>>, @@ -266,7 +267,7 @@ pub(crate) async fn start_stdio_connection( stdio_handles: &mut Vec>, ) -> IoResult<()> { let connection_id = ConnectionId(0); - let (writer_tx, mut writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (writer_tx, mut writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); let writer_tx_for_reader = writer_tx.clone(); transport_event_tx .send(TransportEvent::ConnectionOpened { @@ -314,8 +315,8 @@ pub(crate) async fn start_stdio_connection( stdio_handles.push(tokio::spawn(async move { let mut stdout = io::stdout(); - while let Some(outgoing_message) = writer_rx.recv().await { - let Some(mut json) = serialize_outgoing_message(outgoing_message) else { + while let Some(queued_message) = writer_rx.recv().await { + let Some(mut json) = serialize_outgoing_message(queued_message.message) else { continue; }; json.push('\n'); @@ -323,6 +324,9 @@ pub(crate) async fn start_stdio_connection( error!("Failed to write to stdout: {err}"); break; } + if let Some(write_complete_tx) = queued_message.write_complete_tx { + let _ = write_complete_tx.send(()); + } } info!("stdout writer exited (channel closed)"); })); @@ -369,7 +373,7 @@ async fn run_websocket_connection( websocket_stream: WebSocket, transport_event_tx: mpsc::Sender, ) { - let (writer_tx, writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (writer_tx, writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); let writer_tx_for_reader = writer_tx.clone(); let disconnect_token = CancellationToken::new(); if transport_event_tx @@ -421,7 +425,7 @@ async fn run_websocket_connection( async fn run_websocket_outbound_loop( mut websocket_writer: futures::stream::SplitSink, - mut writer_rx: mpsc::Receiver, + mut writer_rx: mpsc::Receiver, mut writer_control_rx: mpsc::Receiver, disconnect_token: CancellationToken, ) { @@ -438,16 +442,19 @@ async fn run_websocket_outbound_loop( break; } } - outgoing_message = writer_rx.recv() => { - let Some(outgoing_message) = outgoing_message else { + queued_message = writer_rx.recv() => { + let Some(queued_message) = queued_message else { break; }; - let Some(json) = serialize_outgoing_message(outgoing_message) else { + let Some(json) = serialize_outgoing_message(queued_message.message) else { continue; }; if websocket_writer.send(WebSocketMessage::Text(json.into())).await.is_err() { break; } + if let Some(write_complete_tx) = queued_message.write_complete_tx { + let _ = write_complete_tx.send(()); + } } } } @@ -456,7 +463,7 @@ async fn run_websocket_outbound_loop( async fn run_websocket_inbound_loop( mut websocket_reader: futures::stream::SplitStream, transport_event_tx: mpsc::Sender, - writer_tx_for_reader: mpsc::Sender, + writer_tx_for_reader: mpsc::Sender, writer_control_tx: mpsc::Sender, connection_id: ConnectionId, disconnect_token: CancellationToken, @@ -507,7 +514,7 @@ async fn run_websocket_inbound_loop( async fn forward_incoming_message( transport_event_tx: &mpsc::Sender, - writer: &mpsc::Sender, + writer: &mpsc::Sender, connection_id: ConnectionId, payload: &str, ) -> bool { @@ -524,7 +531,7 @@ async fn forward_incoming_message( async fn enqueue_incoming_message( transport_event_tx: &mpsc::Sender, - writer: &mpsc::Sender, + writer: &mpsc::Sender, connection_id: ConnectionId, message: JSONRPCMessage, ) -> bool { @@ -547,7 +554,7 @@ async fn enqueue_incoming_message( data: None, }, }); - match writer.try_send(overload_error) { + match writer.try_send(QueuedOutgoingMessage::new(overload_error)) { Ok(()) => true, Err(mpsc::error::TrySendError::Closed(_)) => false, Err(mpsc::error::TrySendError::Full(_overload_error)) => { @@ -626,6 +633,7 @@ async fn send_message_to_connection( connections: &mut HashMap, connection_id: ConnectionId, message: OutgoingMessage, + write_complete_tx: Option>, ) -> bool { let Some(connection_state) = connections.get(&connection_id) else { warn!("dropping message for disconnected connection: {connection_id:?}"); @@ -637,8 +645,12 @@ async fn send_message_to_connection( } let writer = connection_state.writer.clone(); + let queued_message = QueuedOutgoingMessage { + message, + write_complete_tx, + }; if connection_state.can_disconnect() { - match writer.try_send(message) { + match writer.try_send(queued_message) { Ok(()) => false, Err(mpsc::error::TrySendError::Full(_)) => { warn!( @@ -650,7 +662,7 @@ async fn send_message_to_connection( disconnect_connection(connections, connection_id) } } - } else if writer.send(message).await.is_err() { + } else if writer.send(queued_message).await.is_err() { disconnect_connection(connections, connection_id) } else { false @@ -689,8 +701,11 @@ pub(crate) async fn route_outgoing_envelope( OutgoingEnvelope::ToConnection { connection_id, message, + write_complete_tx, } => { - let _ = send_message_to_connection(connections, connection_id, message).await; + let _ = + send_message_to_connection(connections, connection_id, message, write_complete_tx) + .await; } OutgoingEnvelope::Broadcast { message } => { let target_connections: Vec = connections @@ -708,7 +723,8 @@ pub(crate) async fn route_outgoing_envelope( for connection_id in target_connections { let _ = - send_message_to_connection(connections, connection_id, message.clone()).await; + send_message_to_connection(connections, connection_id, message.clone(), None) + .await; } } } @@ -817,7 +833,8 @@ mod tests { .recv() .await .expect("request should receive overload error"); - let overload_json = serde_json::to_value(overload).expect("serialize overload error"); + let overload_json = + serde_json::to_value(overload.message).expect("serialize overload error"); assert_eq!( overload_json, json!({ @@ -921,12 +938,12 @@ mod tests { .expect("transport queue should accept first message"); writer_tx - .send(OutgoingMessage::Notification( + .send(QueuedOutgoingMessage::new(OutgoingMessage::Notification( crate::outgoing_message::OutgoingNotification { method: "queued".to_string(), params: None, }, - )) + ))) .await .expect("writer queue should accept first message"); @@ -949,7 +966,8 @@ mod tests { .recv() .await .expect("writer queue should still contain original message"); - let queued_json = serde_json::to_value(queued_outgoing).expect("serialize queued message"); + let queued_json = + serde_json::to_value(queued_outgoing.message).expect("serialize queued message"); assert_eq!(queued_json, json!({ "method": "queued" })); } @@ -985,6 +1003,7 @@ mod tests { params: None, }, ), + write_complete_tx: None, }, ) .await; @@ -1023,6 +1042,7 @@ mod tests { params: None, }, ), + write_complete_tx: None, }, ) .await; @@ -1061,6 +1081,7 @@ mod tests { params: None, }, ), + write_complete_tx: None, }, ) .await; @@ -1070,7 +1091,7 @@ mod tests { .await .expect("legacy notification should reach in-process clients"); assert!(matches!( - message, + message.message, OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification { method, params: None, @@ -1132,6 +1153,7 @@ mod tests { available_decisions: None, }, }), + write_complete_tx: None, }, ) .await; @@ -1140,7 +1162,7 @@ mod tests { .recv() .await .expect("request should be delivered to the connection"); - let json = serde_json::to_value(message).expect("request should serialize"); + let json = serde_json::to_value(message.message).expect("request should serialize"); assert_eq!(json["params"].get("additionalPermissions"), None); assert_eq!(json["params"].get("skillMetadata"), None); } @@ -1199,6 +1221,7 @@ mod tests { available_decisions: None, }, }), + write_complete_tx: None, }, ) .await; @@ -1207,7 +1230,7 @@ mod tests { .recv() .await .expect("request should be delivered to the connection"); - let json = serde_json::to_value(message).expect("request should serialize"); + let json = serde_json::to_value(message.message).expect("request should serialize"); let allowed_path = absolute_path("/tmp/allowed").to_string_lossy().into_owned(); assert_eq!( json["params"]["additionalPermissions"], @@ -1268,7 +1291,7 @@ mod tests { params: None, }); slow_writer_tx - .try_send(queued_message) + .try_send(QueuedOutgoingMessage::new(queued_message)) .expect("channel should have room"); let broadcast_message = @@ -1299,7 +1322,7 @@ mod tests { .try_recv() .expect("slow connection should retain its original buffered message"); assert!(matches!( - slow_message, + slow_message.message, OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification { method, params: None, @@ -1312,12 +1335,12 @@ mod tests { let connection_id = ConnectionId(3); let (writer_tx, mut writer_rx) = mpsc::channel(1); writer_tx - .send(OutgoingMessage::Notification( + .send(QueuedOutgoingMessage::new(OutgoingMessage::Notification( crate::outgoing_message::OutgoingNotification { method: "queued".to_string(), params: None, }, - )) + ))) .await .expect("channel should accept the first queued message"); @@ -1345,6 +1368,7 @@ mod tests { params: None, }, ), + write_complete_tx: None, }, ) .await @@ -1360,7 +1384,7 @@ mod tests { .expect("routing task should succeed"); assert!(matches!( - first, + first.message, OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification { method, params: None, From ad76e40f8ca93b5f770877d617a27d2acdf44a0a Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Mon, 23 Mar 2026 13:22:51 -0700 Subject: [PATCH 2/4] lint --- codex-rs/app-server/src/transport.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 09cd1dcce9d..d8a7b03bdb4 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -722,9 +722,13 @@ pub(crate) async fn route_outgoing_envelope( .collect(); for connection_id in target_connections { - let _ = - send_message_to_connection(connections, connection_id, message.clone(), None) - .await; + let _ = send_message_to_connection( + connections, + connection_id, + message.clone(), + /*write_complete_tx*/ None, + ) + .await; } } } From c346a3a39bbaa1444b2a09f7c5ac0ff69e121a03 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Tue, 24 Mar 2026 10:45:50 -0700 Subject: [PATCH 3/4] fix --- codex-rs/app-server/src/lib.rs | 4 -- codex-rs/app-server/src/transport.rs | 94 +++++++++++++--------------- 2 files changed, 45 insertions(+), 53 deletions(-) diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 2dd9d770242..63f72d5224b 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -105,8 +105,6 @@ enum OutboundControlEvent { Opened { connection_id: ConnectionId, writer: mpsc::Sender, - // Allow codex/event/* notifications to be emitted. - allow_legacy_notifications: bool, disconnect_sender: Option, initialized: Arc, experimental_api_enabled: Arc, @@ -563,7 +561,6 @@ pub async fn run_main_with_transport( OutboundControlEvent::Opened { connection_id, writer, - allow_legacy_notifications, disconnect_sender, initialized, experimental_api_enabled, @@ -576,7 +573,6 @@ pub async fn run_main_with_transport( initialized, experimental_api_enabled, opted_out_notification_methods, - allow_legacy_notifications, disconnect_sender, ), ); diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 80a5cfbf1fd..17c9ea764b8 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -189,7 +189,6 @@ pub(crate) enum TransportEvent { ConnectionOpened { connection_id: ConnectionId, writer: mpsc::Sender, - allow_legacy_notifications: bool, disconnect_sender: Option, }, ConnectionClosed { @@ -227,7 +226,6 @@ pub(crate) struct OutboundConnectionState { pub(crate) initialized: Arc, pub(crate) experimental_api_enabled: Arc, pub(crate) opted_out_notification_methods: Arc>>, - pub(crate) allow_legacy_notifications: bool, pub(crate) writer: mpsc::Sender, disconnect_sender: Option, } @@ -238,14 +236,12 @@ impl OutboundConnectionState { initialized: Arc, experimental_api_enabled: Arc, opted_out_notification_methods: Arc>>, - allow_legacy_notifications: bool, disconnect_sender: Option, ) -> Self { Self { initialized, experimental_api_enabled, opted_out_notification_methods, - allow_legacy_notifications, writer, disconnect_sender, } @@ -273,7 +269,6 @@ pub(crate) async fn start_stdio_connection( .send(TransportEvent::ConnectionOpened { connection_id, writer: writer_tx, - allow_legacy_notifications: true, disconnect_sender: None, }) .await @@ -380,7 +375,6 @@ async fn run_websocket_connection( .send(TransportEvent::ConnectionOpened { connection_id, writer: writer_tx, - allow_legacy_notifications: false, disconnect_sender: Some(disconnect_token.clone()), }) .await @@ -931,12 +925,16 @@ mod tests { .expect("transport queue should accept first message"); writer_tx - .send(QueuedOutgoingMessage::new(OutgoingMessage::Notification( - crate::outgoing_message::OutgoingNotification { - method: "queued".to_string(), - params: None, - }, - ))) + .send(QueuedOutgoingMessage::new( + OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning( + ConfigWarningNotification { + summary: "queued".to_string(), + details: None, + path: None, + range: None, + }, + )), + )) .await .expect("writer queue should accept first message"); @@ -980,7 +978,6 @@ mod tests { initialized, Arc::new(AtomicBool::new(true)), opted_out_notification_methods, - false, None, ), ); @@ -1009,7 +1006,7 @@ mod tests { } #[tokio::test] - async fn to_connection_legacy_notifications_are_dropped_for_external_clients() { + async fn to_connection_notifications_are_dropped_for_opted_out_clients() { let connection_id = ConnectionId(10); let (writer_tx, mut writer_rx) = mpsc::channel(1); @@ -1020,8 +1017,7 @@ mod tests { writer_tx, Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)), - Arc::new(RwLock::new(HashSet::new())), - false, + Arc::new(RwLock::new(HashSet::from(["configWarning".to_string()]))), None, ), ); @@ -1030,12 +1026,14 @@ mod tests { &mut connections, OutgoingEnvelope::ToConnection { connection_id, - message: OutgoingMessage::Notification( - crate::outgoing_message::OutgoingNotification { - method: "codex/event/task_started".to_string(), - params: None, + message: OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning( + ConfigWarningNotification { + summary: "task_started".to_string(), + details: None, + path: None, + range: None, }, - ), + )), write_complete_tx: None, }, ) @@ -1043,12 +1041,12 @@ mod tests { assert!( writer_rx.try_recv().is_err(), - "legacy notifications should not reach external clients" + "opted-out notifications should not reach clients" ); } #[tokio::test] - async fn to_connection_legacy_notifications_are_preserved_for_in_process_clients() { + async fn to_connection_notifications_are_preserved_for_non_opted_out_clients() { let connection_id = ConnectionId(11); let (writer_tx, mut writer_rx) = mpsc::channel(1); @@ -1060,7 +1058,6 @@ mod tests { Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)), Arc::new(RwLock::new(HashSet::new())), - true, None, ), ); @@ -1069,12 +1066,14 @@ mod tests { &mut connections, OutgoingEnvelope::ToConnection { connection_id, - message: OutgoingMessage::Notification( - crate::outgoing_message::OutgoingNotification { - method: "codex/event/task_started".to_string(), - params: None, + message: OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning( + ConfigWarningNotification { + summary: "task_started".to_string(), + details: None, + path: None, + range: None, }, - ), + )), write_complete_tx: None, }, ) @@ -1083,13 +1082,12 @@ mod tests { let message = writer_rx .recv() .await - .expect("legacy notification should reach in-process clients"); + .expect("notification should reach non-opted-out clients"); assert!(matches!( message.message, - OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification { - method, - params: None, - }) if method == "codex/event/task_started" + OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning( + ConfigWarningNotification { summary, .. } + )) if summary == "task_started" )); } @@ -1106,7 +1104,6 @@ mod tests { Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(false)), Arc::new(RwLock::new(HashSet::new())), - false, None, ), ); @@ -1174,7 +1171,6 @@ mod tests { Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)), Arc::new(RwLock::new(HashSet::new())), - false, None, ), ); @@ -1263,7 +1259,6 @@ mod tests { Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)), Arc::new(RwLock::new(HashSet::new())), - false, Some(fast_disconnect_token.clone()), ), ); @@ -1274,7 +1269,6 @@ mod tests { Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)), Arc::new(RwLock::new(HashSet::new())), - false, Some(slow_disconnect_token.clone()), ), ); @@ -1339,12 +1333,16 @@ mod tests { let connection_id = ConnectionId(3); let (writer_tx, mut writer_rx) = mpsc::channel(1); writer_tx - .send(QueuedOutgoingMessage::new(OutgoingMessage::Notification( - crate::outgoing_message::OutgoingNotification { - method: "queued".to_string(), - params: None, - }, - ))) + .send(QueuedOutgoingMessage::new( + OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning( + ConfigWarningNotification { + summary: "queued".to_string(), + details: None, + path: None, + range: None, + }, + )), + )) .await .expect("channel should accept the first queued message"); @@ -1356,7 +1354,6 @@ mod tests { Arc::new(AtomicBool::new(true)), Arc::new(AtomicBool::new(true)), Arc::new(RwLock::new(HashSet::new())), - false, None, ), ); @@ -1391,10 +1388,9 @@ mod tests { assert!(matches!( first.message, - OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification { - method, - params: None, - }) if method == "queued" + OutgoingMessage::AppServerNotification(ServerNotification::ConfigWarning( + ConfigWarningNotification { summary, .. } + )) if summary == "queued" )); let second = writer_rx .try_recv() From d954aa21f35612eabdb4919c618d728f145be440 Mon Sep 17 00:00:00 2001 From: Ruslan Nigmatullin Date: Tue, 24 Mar 2026 11:00:44 -0700 Subject: [PATCH 4/4] fix --- codex-rs/app-server/src/transport.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs index 17c9ea764b8..21fbc9965bb 100644 --- a/codex-rs/app-server/src/transport.rs +++ b/codex-rs/app-server/src/transport.rs @@ -959,7 +959,16 @@ mod tests { .expect("writer queue should still contain original message"); let queued_json = serde_json::to_value(queued_outgoing.message).expect("serialize queued message"); - assert_eq!(queued_json, json!({ "method": "queued" })); + assert_eq!( + queued_json, + json!({ + "method": "configWarning", + "params": { + "summary": "queued", + "details": null, + }, + }) + ); } #[tokio::test]