From 25e19f169403ed08ff63982c41a3bfbe4ada1fe6 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 10:26:47 -0700 Subject: [PATCH 1/3] Split realtime websocket methods by version Extract the version-specific request builders into common, v1, and v2 modules while preserving the existing runtime behavior. Co-authored-by: Codex --- .../endpoint/realtime_websocket/methods.rs | 135 +++--------------- .../realtime_websocket/methods_common.rs | 67 +++++++++ .../endpoint/realtime_websocket/methods_v1.rs | 60 ++++++++ .../endpoint/realtime_websocket/methods_v2.rs | 103 +++++++++++++ .../src/endpoint/realtime_websocket/mod.rs | 3 + 5 files changed, 250 insertions(+), 118 deletions(-) create mode 100644 codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs create mode 100644 codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs create mode 100644 codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index e62649e42e9..5082f6314cf 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -1,7 +1,8 @@ -use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem; -use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; -use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; -use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; +use crate::endpoint::realtime_websocket::methods_common::conversation_handoff_append_message; +use crate::endpoint::realtime_websocket::methods_common::conversation_item_create_message; +use crate::endpoint::realtime_websocket::methods_common::normalized_session_mode; +use crate::endpoint::realtime_websocket::methods_common::session_update_session; +use 
crate::endpoint::realtime_websocket::methods_common::websocket_intent; use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame; use crate::endpoint::realtime_websocket::protocol::RealtimeEvent; use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser; @@ -10,13 +11,6 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig; use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode; use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta; use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry; -use crate::endpoint::realtime_websocket::protocol::SessionAudio; -use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; -use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; -use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; -use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice; -use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool; -use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; use crate::endpoint::realtime_websocket::protocol::parse_realtime_event; use crate::error::ApiError; use crate::provider::Provider; @@ -26,7 +20,6 @@ use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; use http::HeaderValue; -use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -47,22 +40,6 @@ use tracing::trace; use tungstenite::protocol::WebSocketConfig; use url::Url; -const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000; -const REALTIME_V1_SESSION_TYPE: &str = "quicksilver"; -const REALTIME_V2_SESSION_TYPE: &str = "realtime"; -const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex"; -const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result."; - -fn normalized_session_mode( - event_parser: RealtimeEventParser, - session_mode: RealtimeSessionMode, -) -> RealtimeSessionMode { - match 
event_parser { - RealtimeEventParser::V1 => RealtimeSessionMode::Conversational, - RealtimeEventParser::RealtimeV2 => session_mode, - } -} - struct WsStream { tx_command: mpsc::Sender, pump_task: tokio::task::JoinHandle<()>, @@ -300,21 +277,8 @@ impl RealtimeWebsocketWriter { } pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> { - let content_kind = match self.event_parser { - RealtimeEventParser::V1 => "text", - RealtimeEventParser::RealtimeV2 => "input_text", - }; - self.send_json(RealtimeOutboundMessage::ConversationItemCreate { - item: ConversationItemPayload::Message(ConversationMessageItem { - kind: "message".to_string(), - role: "user".to_string(), - content: vec![ConversationItemContent { - kind: content_kind.to_string(), - text, - }], - }), - }) - .await + self.send_json(conversation_item_create_message(self.event_parser, text)) + .await } pub async fn send_conversation_handoff_append( @@ -322,23 +286,12 @@ impl RealtimeWebsocketWriter { handoff_id: String, output_text: String, ) -> Result<(), ApiError> { - let message = match self.event_parser { - RealtimeEventParser::V1 => RealtimeOutboundMessage::ConversationHandoffAppend { - handoff_id, - output_text, - }, - RealtimeEventParser::RealtimeV2 => RealtimeOutboundMessage::ConversationItemCreate { - item: ConversationItemPayload::FunctionCallOutput( - ConversationFunctionCallOutputItem { - kind: "function_call_output".to_string(), - call_id: handoff_id, - output: output_text, - }, - ), - }, - }; - - self.send_json(message).await + self.send_json(conversation_handoff_append_message( + self.event_parser, + handoff_id, + output_text, + )) + .await } pub async fn send_session_update( @@ -347,60 +300,9 @@ impl RealtimeWebsocketWriter { session_mode: RealtimeSessionMode, ) -> Result<(), ApiError> { let session_mode = normalized_session_mode(self.event_parser, session_mode); - let (session_kind, session_instructions, output_audio) = match session_mode { - 
RealtimeSessionMode::Conversational => { - let kind = match self.event_parser { - RealtimeEventParser::V1 => REALTIME_V1_SESSION_TYPE.to_string(), - RealtimeEventParser::RealtimeV2 => REALTIME_V2_SESSION_TYPE.to_string(), - }; - let voice = match self.event_parser { - RealtimeEventParser::V1 => SessionAudioVoice::Fathom, - RealtimeEventParser::RealtimeV2 => SessionAudioVoice::Alloy, - }; - (kind, Some(instructions), Some(SessionAudioOutput { voice })) - } - RealtimeSessionMode::Transcription => ("transcription".to_string(), None, None), - }; - let tools = match (self.event_parser, session_mode) { - (RealtimeEventParser::RealtimeV2, RealtimeSessionMode::Conversational) => { - Some(vec![SessionFunctionTool { - kind: "function".to_string(), - name: REALTIME_V2_CODEX_TOOL_NAME.to_string(), - description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(), - parameters: json!({ - "type": "object", - "properties": { - "prompt": { - "type": "string", - "description": "Prompt text for the delegated Codex task." 
- } - }, - "required": ["prompt"], - "additionalProperties": false - }), - }]) - } - (RealtimeEventParser::RealtimeV2, RealtimeSessionMode::Transcription) - | (RealtimeEventParser::V1, RealtimeSessionMode::Conversational) - | (RealtimeEventParser::V1, RealtimeSessionMode::Transcription) => None, - }; - self.send_json(RealtimeOutboundMessage::SessionUpdate { - session: SessionUpdateSession { - kind: session_kind, - instructions: session_instructions, - audio: SessionAudio { - input: SessionAudioInput { - format: SessionAudioFormat { - kind: "audio/pcm".to_string(), - rate: REALTIME_AUDIO_SAMPLE_RATE, - }, - }, - output: output_audio, - }, - tools, - }, - }) - .await + let session = session_update_session(self.event_parser, instructions, session_mode); + self.send_json(RealtimeOutboundMessage::SessionUpdate { session }) + .await } pub async fn close(&self) -> Result<(), ApiError> { @@ -655,10 +557,7 @@ fn websocket_url_from_api_url( } } - let intent = match event_parser { - RealtimeEventParser::V1 => Some("quicksilver"), - RealtimeEventParser::RealtimeV2 => None, - }; + let intent = websocket_intent(event_parser); let has_extra_query_params = query_params.is_some_and(|query_params| { query_params .iter() diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs new file mode 100644 index 00000000000..4a5013c6565 --- /dev/null +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs @@ -0,0 +1,67 @@ +use crate::endpoint::realtime_websocket::methods_v1::conversation_handoff_append_message as v1_conversation_handoff_append_message; +use crate::endpoint::realtime_websocket::methods_v1::conversation_item_create_message as v1_conversation_item_create_message; +use crate::endpoint::realtime_websocket::methods_v1::session_update_session as v1_session_update_session; +use crate::endpoint::realtime_websocket::methods_v1::websocket_intent as v1_websocket_intent; +use 
crate::endpoint::realtime_websocket::methods_v2::conversation_handoff_append_message as v2_conversation_handoff_append_message; +use crate::endpoint::realtime_websocket::methods_v2::conversation_item_create_message as v2_conversation_item_create_message; +use crate::endpoint::realtime_websocket::methods_v2::session_update_session as v2_session_update_session; +use crate::endpoint::realtime_websocket::methods_v2::websocket_intent as v2_websocket_intent; +use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser; +use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; +use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode; +use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; + +pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000; +pub(super) const REALTIME_AUDIO_FORMAT: &str = "audio/pcm"; + +pub(super) fn normalized_session_mode( + event_parser: RealtimeEventParser, + session_mode: RealtimeSessionMode, +) -> RealtimeSessionMode { + match event_parser { + RealtimeEventParser::V1 => RealtimeSessionMode::Conversational, + RealtimeEventParser::RealtimeV2 => session_mode, + } +} + +pub(super) fn conversation_item_create_message( + event_parser: RealtimeEventParser, + text: String, +) -> RealtimeOutboundMessage { + match event_parser { + RealtimeEventParser::V1 => v1_conversation_item_create_message(text), + RealtimeEventParser::RealtimeV2 => v2_conversation_item_create_message(text), + } +} + +pub(super) fn conversation_handoff_append_message( + event_parser: RealtimeEventParser, + handoff_id: String, + output_text: String, +) -> RealtimeOutboundMessage { + match event_parser { + RealtimeEventParser::V1 => v1_conversation_handoff_append_message(handoff_id, output_text), + RealtimeEventParser::RealtimeV2 => { + v2_conversation_handoff_append_message(handoff_id, output_text) + } + } +} + +pub(super) fn session_update_session( + event_parser: RealtimeEventParser, + instructions: String, + 
session_mode: RealtimeSessionMode, +) -> SessionUpdateSession { + let session_mode = normalized_session_mode(event_parser, session_mode); + match event_parser { + RealtimeEventParser::V1 => v1_session_update_session(instructions), + RealtimeEventParser::RealtimeV2 => v2_session_update_session(instructions, session_mode), + } +} + +pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> { + match event_parser { + RealtimeEventParser::V1 => v1_websocket_intent(), + RealtimeEventParser::RealtimeV2 => v2_websocket_intent(), + } +} diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs new file mode 100644 index 00000000000..8280c4d9a77 --- /dev/null +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs @@ -0,0 +1,60 @@ +use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT; +use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE; +use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; +use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; +use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; +use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; +use crate::endpoint::realtime_websocket::protocol::SessionAudio; +use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; +use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; +use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; +use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice; +use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; + +const REALTIME_V1_SESSION_TYPE: &str = "quicksilver"; + +pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { + RealtimeOutboundMessage::ConversationItemCreate { + item: 
ConversationItemPayload::Message(ConversationMessageItem { + kind: "message".to_string(), + role: "user".to_string(), + content: vec![ConversationItemContent { + kind: "text".to_string(), + text, + }], + }), + } +} + +pub(super) fn conversation_handoff_append_message( + handoff_id: String, + output_text: String, +) -> RealtimeOutboundMessage { + RealtimeOutboundMessage::ConversationHandoffAppend { + handoff_id, + output_text, + } +} + +pub(super) fn session_update_session(instructions: String) -> SessionUpdateSession { + SessionUpdateSession { + kind: REALTIME_V1_SESSION_TYPE.to_string(), + instructions: Some(instructions), + audio: SessionAudio { + input: SessionAudioInput { + format: SessionAudioFormat { + kind: REALTIME_AUDIO_FORMAT.to_string(), + rate: REALTIME_AUDIO_SAMPLE_RATE, + }, + }, + output: Some(SessionAudioOutput { + voice: SessionAudioVoice::Fathom, + }), + }, + tools: None, + } +} + +pub(super) fn websocket_intent() -> Option<&'static str> { + Some("quicksilver") +} diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs new file mode 100644 index 00000000000..59a8f1284b3 --- /dev/null +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs @@ -0,0 +1,103 @@ +use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT; +use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE; +use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem; +use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; +use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; +use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; +use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; +use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode; +use crate::endpoint::realtime_websocket::protocol::SessionAudio; 
+use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; +use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; +use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; +use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice; +use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool; +use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; +use serde_json::json; + +const REALTIME_V2_SESSION_TYPE: &str = "realtime"; +const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex"; +const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result."; + +pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { + RealtimeOutboundMessage::ConversationItemCreate { + item: ConversationItemPayload::Message(ConversationMessageItem { + kind: "message".to_string(), + role: "user".to_string(), + content: vec![ConversationItemContent { + kind: "input_text".to_string(), + text, + }], + }), + } +} + +pub(super) fn conversation_handoff_append_message( + handoff_id: String, + output_text: String, +) -> RealtimeOutboundMessage { + RealtimeOutboundMessage::ConversationItemCreate { + item: ConversationItemPayload::FunctionCallOutput(ConversationFunctionCallOutputItem { + kind: "function_call_output".to_string(), + call_id: handoff_id, + output: output_text, + }), + } +} + +pub(super) fn session_update_session( + instructions: String, + session_mode: RealtimeSessionMode, +) -> SessionUpdateSession { + match session_mode { + RealtimeSessionMode::Conversational => SessionUpdateSession { + kind: REALTIME_V2_SESSION_TYPE.to_string(), + instructions: Some(instructions), + audio: SessionAudio { + input: SessionAudioInput { + format: SessionAudioFormat { + kind: REALTIME_AUDIO_FORMAT.to_string(), + rate: REALTIME_AUDIO_SAMPLE_RATE, + }, + }, + output: Some(SessionAudioOutput { + voice: SessionAudioVoice::Alloy, + }), + }, + tools: 
Some(vec![SessionFunctionTool { + kind: "function".to_string(), + name: REALTIME_V2_CODEX_TOOL_NAME.to_string(), + description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(), + parameters: json!({ + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "Prompt text for the delegated Codex task." + } + }, + "required": ["prompt"], + "additionalProperties": false + }), + }]), + }, + RealtimeSessionMode::Transcription => SessionUpdateSession { + kind: "transcription".to_string(), + instructions: None, + audio: SessionAudio { + input: SessionAudioInput { + format: SessionAudioFormat { + kind: REALTIME_AUDIO_FORMAT.to_string(), + rate: REALTIME_AUDIO_SAMPLE_RATE, + }, + }, + output: None, + }, + tools: None, + }, + } +} + +pub(super) fn websocket_intent() -> Option<&'static str> { + None +} diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs index f307e60914b..d13585034a3 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs @@ -1,4 +1,7 @@ pub mod methods; +mod methods_common; +mod methods_v1; +mod methods_v2; pub mod protocol; mod protocol_common; mod protocol_v1; From dfd06e1bf7fe5cd0de256573f82904c9365cda01 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 11:28:01 -0700 Subject: [PATCH 2/3] Handle interrupted agent status in tui_app_server Keep the rebased realtime stack compiling on current main by rendering the Interrupted multi-agent status in tui_app_server. 
Co-authored-by: Codex --- codex-rs/tui_app_server/src/multi_agents.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/codex-rs/tui_app_server/src/multi_agents.rs b/codex-rs/tui_app_server/src/multi_agents.rs index 672e20e1a28..61907f67fb0 100644 --- a/codex-rs/tui_app_server/src/multi_agents.rs +++ b/codex-rs/tui_app_server/src/multi_agents.rs @@ -537,6 +537,8 @@ fn status_summary_line(status: &AgentStatus) -> Line<'static> { status_summary_spans(status).into() } +// Allow `.yellow()` +#[allow(clippy::disallowed_methods)] fn status_summary_spans(status: &AgentStatus) -> Vec<Span<'static>> { match status { AgentStatus::PendingInit => vec![Span::from("Pending init").cyan()], From 33607d31b02819f50f1570a3ea51b8da2f3e5dca Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 15:28:24 -0700 Subject: [PATCH 3/3] Update multi_agents.rs --- codex-rs/tui_app_server/src/multi_agents.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/codex-rs/tui_app_server/src/multi_agents.rs b/codex-rs/tui_app_server/src/multi_agents.rs index 61907f67fb0..672e20e1a28 100644 --- a/codex-rs/tui_app_server/src/multi_agents.rs +++ b/codex-rs/tui_app_server/src/multi_agents.rs @@ -537,8 +537,6 @@ fn status_summary_line(status: &AgentStatus) -> Line<'static> { status_summary_spans(status).into() } -// Allow `.yellow()` -#[allow(clippy::disallowed_methods)] fn status_summary_spans(status: &AgentStatus) -> Vec<Span<'static>> { match status { AgentStatus::PendingInit => vec![Span::from("Pending init").cyan()],