From 06e299ea56dd1556af8a8023dacb42b79390cc81 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 11:28:01 -0700 Subject: [PATCH 01/17] Handle interrupted agent status in tui_app_server Keep the rebased realtime stack compiling on current main by rendering the Interrupted multi-agent status in tui_app_server. Co-authored-by: Codex --- codex-rs/tui_app_server/src/multi_agents.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/codex-rs/tui_app_server/src/multi_agents.rs b/codex-rs/tui_app_server/src/multi_agents.rs index 672e20e1a28..61907f67fb0 100644 --- a/codex-rs/tui_app_server/src/multi_agents.rs +++ b/codex-rs/tui_app_server/src/multi_agents.rs @@ -537,6 +537,8 @@ fn status_summary_line(status: &AgentStatus) -> Line<'static> { status_summary_spans(status).into() } +// Allow `.yellow()` +#[allow(clippy::disallowed_methods)] fn status_summary_spans(status: &AgentStatus) -> Vec> { match status { AgentStatus::PendingInit => vec![Span::from("Pending init").cyan()], From f8c56e66ee0311f55da193aa9b37dcedfb8f59c5 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 10:44:10 -0700 Subject: [PATCH 02/17] Align main realtime v2 wire and runtime flow Port the realtime v2 session, parser, app-server, and conversation runtime behavior onto the split websocket-method base without pulling in the startup-context or TUI playback slices. Co-authored-by: Codex --- .../schema/json/ClientRequest.json | 6 + .../schema/json/ServerNotification.json | 6 + .../codex_app_server_protocol.schemas.json | 6 + .../codex_app_server_protocol.v2.schemas.json | 6 + ...dRealtimeOutputAudioDeltaNotification.json | 6 + .../typescript/v2/ThreadRealtimeAudioChunk.ts | 2 +- .../src/protocol/common.rs | 5 +- .../app-server-protocol/src/protocol/v2.rs | 5 + .../app-server/src/bespoke_event_handling.rs | 28 ++ .../tests/suite/v2/realtime_conversation.rs | 1 + .../endpoint/realtime_websocket/methods.rs | 199 +++++++++- .../endpoint/realtime_websocket/methods_v1.rs | 5 + .../endpoint/realtime_websocket/methods_v2.rs | 31 +- .../endpoint/realtime_websocket/protocol.rs | 43 ++- .../realtime_websocket/protocol_v1.rs | 1 + .../realtime_websocket/protocol_v2.rs | 60 ++- .../codex-api/tests/realtime_websocket_e2e.rs | 3 + codex-rs/core/src/codex.rs | 3 + codex-rs/core/src/codex_tests.rs | 1 + codex-rs/core/src/realtime_conversation.rs | 347 ++++++++++++++++-- .../core/src/realtime_conversation_tests.rs | 8 +- .../core/tests/suite/realtime_conversation.rs | 5 + codex-rs/protocol/src/protocol.rs | 15 + 23 files changed, 744 insertions(+), 48 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 6ccec6fe882..63991ef476d 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2771,6 +2771,12 @@ "data": { "type": "string" }, + "itemId": { + "type": [ + "string", + "null" + ] + }, "numChannels": { "format": "uint16", "minimum": 0.0, diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 7303fa1ca7c..14908dbb1f7 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -2750,6 +2750,12 @@ "data": { "type": "string" }, + "itemId": { + "type": [ + "string", + "null" + ] + }, "numChannels": { "format": "uint16", "minimum": 0.0, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index bfb716c9976..e8440bcdb98 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -12803,6 +12803,12 @@ "data": { "type": "string" }, + "itemId": { + "type": [ + "string", + "null" + ] + }, "numChannels": { "format": "uint16", "minimum": 0.0, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index b726a187647..4aab5ff8139 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -10563,6 +10563,12 @@ "data": { "type": "string" }, + "itemId": { + "type": [ + "string", + "null" + ] + }, "numChannels": { "format": "uint16", "minimum": 0.0, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeOutputAudioDeltaNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeOutputAudioDeltaNotification.json index d4df6194fa5..6c75f675539 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeOutputAudioDeltaNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeOutputAudioDeltaNotification.json @@ -7,6 +7,12 @@ "data": { "type": "string" }, + "itemId": { + "type": [ + "string", + "null" + ] + }, "numChannels": { "format": "uint16", "minimum": 0.0, diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeAudioChunk.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeAudioChunk.ts index 078f6422472..eefb79dd656 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeAudioChunk.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeAudioChunk.ts @@ -5,4 +5,4 @@ /** * EXPERIMENTAL - thread realtime audio chunk. */ -export type ThreadRealtimeAudioChunk = { data: string, sampleRate: number, numChannels: number, samplesPerChannel: number | null, }; +export type ThreadRealtimeAudioChunk = { data: string, sampleRate: number, numChannels: number, samplesPerChannel: number | null, itemId: string | null, }; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 75aa7768d1a..73139a2e09b 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1577,6 +1577,7 @@ mod tests { sample_rate: 24_000, num_channels: 1, samples_per_channel: Some(512), + item_id: None, }, }, ); @@ -1589,7 +1590,8 @@ mod tests { "data": "AQID", "sampleRate": 24000, "numChannels": 1, - "samplesPerChannel": 512 + "samplesPerChannel": 512, + "itemId": null } } }), @@ -1641,6 +1643,7 @@ mod tests { sample_rate: 24_000, num_channels: 1, samples_per_channel: Some(512), + item_id: None, }, }, ); diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index c5546f49b71..6ae4763d509 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3652,6 +3652,7 @@ pub struct ThreadRealtimeAudioChunk { pub sample_rate: u32, pub num_channels: u16, pub samples_per_channel: Option, + pub item_id: Option, } impl From for ThreadRealtimeAudioChunk { @@ -3661,12 +3662,14 @@ impl From for ThreadRealtimeAudioChunk { sample_rate, num_channels, samples_per_channel, + item_id, } = value; Self { data, sample_rate, num_channels, samples_per_channel, + item_id, } } } @@ -3678,12 +3681,14 @@ impl From for CoreRealtimeAudioFrame { sample_rate, num_channels, samples_per_channel, + item_id, } = value; Self { data, sample_rate, num_channels, samples_per_channel, + item_id, } } } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 0595110fa0a..a3f0994573b 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -350,6 +350,20 @@ pub(crate) async fn apply_bespoke_event_handling( if let ApiVersion::V2 = api_version { match event.payload { RealtimeEvent::SessionUpdated { .. } => {} + RealtimeEvent::InputAudioSpeechStarted(event) => { + let notification = ThreadRealtimeItemAddedNotification { + thread_id: conversation_id.to_string(), + item: serde_json::json!({ + "type": "input_audio_buffer.speech_started", + "item_id": event.item_id, + }), + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( + notification, + )) + .await; + } RealtimeEvent::InputTranscriptDelta(_) => {} RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(audio) => { @@ -363,6 +377,20 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; } + RealtimeEvent::ResponseCancelled(event) => { + let notification = ThreadRealtimeItemAddedNotification { + thread_id: conversation_id.to_string(), + item: serde_json::json!({ + "type": "response.cancelled", + "response_id": event.response_id, + }), + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( + notification, + )) + .await; + } RealtimeEvent::ConversationItemAdded(item) => { let notification = ThreadRealtimeItemAddedNotification { thread_id: conversation_id.to_string(), diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index a771fb87447..7d472c50361 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -135,6 +135,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { sample_rate: 24_000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, }) .await?; diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 5082f6314cf..22fb3276a14 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -231,6 +231,21 @@ impl RealtimeWebsocketConnection { .await } + pub async fn send_response_create(&self) -> Result<(), ApiError> { + self.writer.send_response_create().await + } + + pub async fn send_conversation_item_truncate( + &self, + item_id: String, + content_index: u32, + audio_end_ms: u32, + ) -> Result<(), ApiError> { + self.writer + .send_conversation_item_truncate(item_id, content_index, audio_end_ms) + .await + } + pub async fn close(&self) -> Result<(), ApiError> { self.writer.close().await } @@ -294,6 +309,25 @@ impl RealtimeWebsocketWriter { .await } + pub async fn send_response_create(&self) -> Result<(), ApiError> { + self.send_json(RealtimeOutboundMessage::ResponseCreate) + .await + } + + pub async fn send_conversation_item_truncate( + &self, + item_id: String, + content_index: u32, + audio_end_ms: u32, + ) -> Result<(), ApiError> { + self.send_json(RealtimeOutboundMessage::ConversationItemTruncate { + item_id, + content_index, + audio_end_ms, + }) + .await + } + pub async fn send_session_update( &self, instructions: String, @@ -301,6 +335,14 @@ impl RealtimeWebsocketWriter { ) -> Result<(), ApiError> { let session_mode = normalized_session_mode(self.event_parser, session_mode); let session = session_update_session(self.event_parser, instructions, session_mode); + debug!( + event_parser = ?self.event_parser, + session_mode = ?session_mode, + instructions_len = session.instructions.as_ref().map(String::len).unwrap_or_default(), + has_output_audio = session.audio.output.is_some(), + has_tools = session.tools.is_some(), + "realtime websocket prepared session.update" + ); self.send_json(RealtimeOutboundMessage::SessionUpdate { session }) .await } @@ -392,6 +434,7 @@ impl RealtimeWebsocketEvents { async fn update_active_transcript(&self, event: &mut RealtimeEvent) { let mut active_transcript = self.active_transcript.lock().await; match event { + RealtimeEvent::InputAudioSpeechStarted(_) => {} RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta }) => { append_transcript_delta(&mut active_transcript.entries, "user", delta); } @@ -399,10 +442,18 @@ impl RealtimeWebsocketEvents { append_transcript_delta(&mut active_transcript.entries, "assistant", delta); } RealtimeEvent::HandoffRequested(handoff) => { + debug!( + handoff_id = handoff.handoff_id, + item_id = handoff.item_id, + input_len = handoff.input_transcript.len(), + transcript_entries = active_transcript.entries.len(), + "realtime websocket parsed codex function call" + ); handoff.active_transcript = std::mem::take(&mut active_transcript.entries); } RealtimeEvent::SessionUpdated { .. } | RealtimeEvent::AudioOut(_) + | RealtimeEvent::ResponseCancelled(_) | RealtimeEvent::ConversationItemAdded(_) | RealtimeEvent::ConversationItemDone { .. } | RealtimeEvent::Error(_) => {} @@ -486,7 +537,11 @@ impl RealtimeWebsocketClient { let (stream, rx_message) = WsStream::new(stream); let connection = RealtimeWebsocketConnection::new(stream, rx_message, config.event_parser); debug!( + event_parser = ?config.event_parser, + session_mode = ?config.session_mode, + model = config.model.as_deref().unwrap_or(""), session_id = config.session_id.as_deref().unwrap_or(""), + instructions_len = config.instructions.len(), "realtime websocket sending session.update" ); connection @@ -616,6 +671,8 @@ mod tests { use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffRequested; use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta; use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry; + use codex_protocol::protocol::RealtimeInputAudioSpeechStarted; + use codex_protocol::protocol::RealtimeResponseCancelled; use http::HeaderValue; use pretty_assertions::assert_eq; use serde_json::Value; @@ -660,6 +717,7 @@ mod tests { sample_rate: 48000, num_channels: 1, samples_per_channel: Some(960), + item_id: None, })) ); } @@ -809,10 +867,112 @@ mod tests { sample_rate: 24_000, num_channels: 1, samples_per_channel: None, + item_id: None, + })) + ); + } + + #[test] + fn parse_realtime_v2_response_audio_delta_with_item_id() { + let payload = json!({ + "type": "response.audio.delta", + "delta": "AQID", + "item_id": "item_audio_1" + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::AudioOut(RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24_000, + num_channels: 1, + samples_per_channel: None, + item_id: Some("item_audio_1".to_string()), })) ); } + #[test] + fn parse_realtime_v2_speech_started_event() { + let payload = json!({ + "type": "input_audio_buffer.speech_started", + "item_id": "item_input_1" + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::InputAudioSpeechStarted( + RealtimeInputAudioSpeechStarted { + item_id: Some("item_input_1".to_string()), + } + )) + ); + } + + #[test] + fn parse_realtime_v2_response_cancelled_event() { + let payload = json!({ + "type": "response.cancelled", + "response": {"id": "resp_cancelled_1"} + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::ResponseCancelled( + RealtimeResponseCancelled { + response_id: Some("resp_cancelled_1".to_string()), + } + )) + ); + } + + #[test] + fn parse_realtime_v2_response_done_handoff_event() { + let payload = json!({ + "type": "response.done", + "response": { + "output": [{ + "id": "item_123", + "type": "function_call", + "name": "codex", + "call_id": "call_123", + "arguments": "{\"prompt\":\"delegate from done\"}" + }] + } + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id: "call_123".to_string(), + item_id: "item_123".to_string(), + input_transcript: "delegate from done".to_string(), + active_transcript: Vec::new(), + })) + ); + } + + #[test] + fn parse_realtime_v2_response_created_event() { + let payload = json!({ + "type": "response.created", + "response": {"id": "resp_created_1"} + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::ConversationItemAdded(json!({ + "type": "response.created", + "response": {"id": "resp_created_1"} + }))) + ); + } + #[test] fn merge_request_headers_matches_http_precedence() { let mut provider_headers = HeaderMap::new(); @@ -1169,6 +1329,7 @@ mod tests { sample_rate: 48000, num_channels: 1, samples_per_channel: Some(960), + item_id: None, }) .await .expect("send audio"); @@ -1196,6 +1357,7 @@ mod tests { sample_rate: 48000, num_channels: 1, samples_per_channel: None, + item_id: None, }) ); @@ -1285,9 +1447,38 @@ mod tests { first_json["session"]["type"], Value::String("realtime".to_string()) ); + assert_eq!(first_json["session"]["output_modalities"], json!(["audio"])); + assert_eq!( + first_json["session"]["audio"]["input"]["format"], + json!({ + "type": "audio/pcm", + "rate": 24_000, + }) + ); + assert_eq!( + first_json["session"]["audio"]["input"]["noise_reduction"], + json!({ + "type": "near_field", + }) + ); + assert_eq!( + first_json["session"]["audio"]["input"]["turn_detection"], + json!({ + "type": "server_vad", + "interrupt_response": true, + "create_response": true, + }) + ); + assert_eq!( + first_json["session"]["audio"]["output"]["format"], + json!({ + "type": "audio/pcm", + "rate": 24_000, + }) + ); assert_eq!( first_json["session"]["audio"]["output"]["voice"], - Value::String("alloy".to_string()) + Value::String("marin".to_string()) ); assert_eq!( first_json["session"]["tools"][0]["type"], @@ -1301,6 +1492,10 @@ mod tests { first_json["session"]["tools"][0]["parameters"]["required"], json!(["prompt"]) ); + assert_eq!( + first_json["session"]["tool_choice"], + Value::String("auto".to_string()) + ); ws.send(Message::Text( json!({ @@ -1511,6 +1706,7 @@ mod tests { sample_rate: 24_000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }) .await .expect("send audio"); @@ -1690,6 +1886,7 @@ mod tests { sample_rate: 48000, num_channels: 1, samples_per_channel: Some(960), + item_id: None, }), ) .await diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs index 8280c4d9a77..429d06b0053 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs @@ -40,18 +40,23 @@ pub(super) fn session_update_session(instructions: String) -> SessionUpdateSessi SessionUpdateSession { kind: REALTIME_V1_SESSION_TYPE.to_string(), instructions: Some(instructions), + output_modalities: None, audio: SessionAudio { input: SessionAudioInput { format: SessionAudioFormat { kind: REALTIME_AUDIO_FORMAT.to_string(), rate: REALTIME_AUDIO_SAMPLE_RATE, }, + noise_reduction: None, + turn_detection: None, }, output: Some(SessionAudioOutput { + format: None, voice: SessionAudioVoice::Fathom, }), }, tools: None, + tool_choice: None, } } diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs index 59a8f1284b3..50de80610e7 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs @@ -10,14 +10,21 @@ use crate::endpoint::realtime_websocket::protocol::SessionAudio; use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; +use crate::endpoint::realtime_websocket::protocol::SessionAudioOutputFormat; use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice; use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool; +use crate::endpoint::realtime_websocket::protocol::SessionNoiseReduction; +use crate::endpoint::realtime_websocket::protocol::SessionTurnDetection; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; use serde_json::json; +const REALTIME_V2_NOISE_REDUCTION: &str = "near_field"; +const REALTIME_V2_TURN_DETECTION: &str = "server_vad"; +const REALTIME_V2_OUTPUT_MODALITY_AUDIO: &str = "audio"; +const REALTIME_V2_TOOL_CHOICE: &str = "auto"; const REALTIME_V2_SESSION_TYPE: &str = "realtime"; const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex"; -const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result."; +const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate a request to Codex and return the final result to the user. Use this as the default action. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later."; pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { RealtimeOutboundMessage::ConversationItemCreate { @@ -53,15 +60,28 @@ pub(super) fn session_update_session( RealtimeSessionMode::Conversational => SessionUpdateSession { kind: REALTIME_V2_SESSION_TYPE.to_string(), instructions: Some(instructions), + output_modalities: Some(vec![REALTIME_V2_OUTPUT_MODALITY_AUDIO.to_string()]), audio: SessionAudio { input: SessionAudioInput { format: SessionAudioFormat { kind: REALTIME_AUDIO_FORMAT.to_string(), rate: REALTIME_AUDIO_SAMPLE_RATE, }, + noise_reduction: Some(SessionNoiseReduction { + kind: REALTIME_V2_NOISE_REDUCTION.to_string(), + }), + turn_detection: Some(SessionTurnDetection { + kind: REALTIME_V2_TURN_DETECTION.to_string(), + interrupt_response: true, + create_response: true, + }), }, output: Some(SessionAudioOutput { - voice: SessionAudioVoice::Alloy, + format: Some(SessionAudioOutputFormat { + kind: REALTIME_AUDIO_FORMAT.to_string(), + rate: REALTIME_AUDIO_SAMPLE_RATE, + }), + voice: SessionAudioVoice::Marin, }), }, tools: Some(vec![SessionFunctionTool { @@ -73,27 +93,32 @@ pub(super) fn session_update_session( "properties": { "prompt": { "type": "string", - "description": "Prompt text for the delegated Codex task." + "description": "The user request to delegate to Codex." } }, "required": ["prompt"], "additionalProperties": false }), }]), + tool_choice: Some(REALTIME_V2_TOOL_CHOICE.to_string()), }, RealtimeSessionMode::Transcription => SessionUpdateSession { kind: "transcription".to_string(), instructions: None, + output_modalities: None, audio: SessionAudio { input: SessionAudioInput { format: SessionAudioFormat { kind: REALTIME_AUDIO_FORMAT.to_string(), rate: REALTIME_AUDIO_SAMPLE_RATE, }, + noise_reduction: None, + turn_detection: None, }, output: None, }, tools: None, + tool_choice: None, }, } } diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index 73c2c1052da..a6bef882ea6 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -39,6 +39,14 @@ pub(super) enum RealtimeOutboundMessage { handoff_id: String, output_text: String, }, + #[serde(rename = "response.create")] + ResponseCreate, + #[serde(rename = "conversation.item.truncate")] + ConversationItemTruncate { + item_id: String, + content_index: u32, + audio_end_ms: u32, + }, #[serde(rename = "session.update")] SessionUpdate { session: SessionUpdateSession }, #[serde(rename = "conversation.item.create")] @@ -51,9 +59,13 @@ pub(super) struct SessionUpdateSession { pub(super) kind: String, #[serde(skip_serializing_if = "Option::is_none")] pub(super) instructions: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) output_modalities: Option>, pub(super) audio: SessionAudio, #[serde(skip_serializing_if = "Option::is_none")] pub(super) tools: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) tool_choice: Option, } #[derive(Debug, Clone, Serialize)] @@ -66,6 +78,10 @@ pub(super) struct SessionAudio { #[derive(Debug, Clone, Serialize)] pub(super) struct SessionAudioInput { pub(super) format: SessionAudioFormat, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) noise_reduction: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) turn_detection: Option, } #[derive(Debug, Clone, Serialize)] @@ -77,6 +93,8 @@ pub(super) struct SessionAudioFormat { #[derive(Debug, Clone, Serialize)] pub(super) struct SessionAudioOutput { + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) format: Option, pub(super) voice: SessionAudioVoice, } @@ -84,8 +102,29 @@ pub(super) struct SessionAudioOutput { pub(super) enum SessionAudioVoice { #[serde(rename = "fathom")] Fathom, - #[serde(rename = "alloy")] - Alloy, + #[serde(rename = "marin")] + Marin, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SessionNoiseReduction { + #[serde(rename = "type")] + pub(super) kind: String, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SessionTurnDetection { + #[serde(rename = "type")] + pub(super) kind: String, + pub(super) interrupt_response: bool, + pub(super) create_response: bool, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SessionAudioOutputFormat { + #[serde(rename = "type")] + pub(super) kind: String, + pub(super) rate: u32, } #[derive(Debug, Clone, Serialize)] diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs index 04e76fb447e..b66cf2b24b8 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs @@ -35,6 +35,7 @@ pub(super) fn parse_realtime_event_v1(payload: &str) -> Option { .get("samples_per_channel") .and_then(Value::as_u64) .and_then(|value| u32::try_from(value).ok()), + item_id: None, })) } "conversation.input_transcript.delta" => { diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs index 7ef318d3f92..b33007519ed 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs @@ -5,6 +5,8 @@ use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_delta use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::RealtimeHandoffRequested; +use codex_protocol::protocol::RealtimeInputAudioSpeechStarted; +use codex_protocol::protocol::RealtimeResponseCancelled; use serde_json::Map as JsonMap; use serde_json::Value; use tracing::debug; @@ -19,7 +21,9 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option { match message_type.as_str() { "session.updated" => parse_session_updated_event(&parsed), - "response.output_audio.delta" => parse_output_audio_delta_event(&parsed), + "response.output_audio.delta" | "response.audio.delta" => { + parse_output_audio_delta_event(&parsed) + } "conversation.item.input_audio_transcription.delta" => { parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::InputTranscriptDelta) } @@ -30,11 +34,37 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option { "response.output_text.delta" | "response.output_audio_transcript.delta" => { parse_transcript_delta_event(&parsed, "delta").map(RealtimeEvent::OutputTranscriptDelta) } + "input_audio_buffer.speech_started" => Some(RealtimeEvent::InputAudioSpeechStarted( + RealtimeInputAudioSpeechStarted { + item_id: parsed + .get("item_id") + .and_then(Value::as_str) + .map(str::to_string), + }, + )), "conversation.item.added" => parsed .get("item") .cloned() .map(RealtimeEvent::ConversationItemAdded), "conversation.item.done" => parse_conversation_item_done_event(&parsed), + "response.created" => Some(RealtimeEvent::ConversationItemAdded(parsed)), + "response.done" => parse_response_done_event(parsed), + "response.cancelled" => Some(RealtimeEvent::ResponseCancelled( + RealtimeResponseCancelled { + response_id: parsed + .get("response") + .and_then(Value::as_object) + .and_then(|response| response.get("id")) + .and_then(Value::as_str) + .map(str::to_string) + .or_else(|| { + parsed + .get("response_id") + .and_then(Value::as_str) + .map(str::to_string) + }), + }, + )), "error" => parse_error_event(&parsed), _ => { debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}"); @@ -67,6 +97,10 @@ fn parse_output_audio_delta_event(parsed: &Value) -> Option { .get("samples_per_channel") .and_then(Value::as_u64) .and_then(|value| u32::try_from(value).ok()), + item_id: parsed + .get("item_id") + .and_then(Value::as_str) + .map(str::to_string), })) } @@ -82,6 +116,30 @@ fn parse_conversation_item_done_event(parsed: &Value) -> Option { .map(|item_id| RealtimeEvent::ConversationItemDone { item_id }) } +fn parse_response_done_event(parsed: Value) -> Option { + if let Some(handoff) = parse_response_done_handoff_requested_event(&parsed) { + return Some(handoff); + } + + Some(RealtimeEvent::ConversationItemAdded(parsed)) +} + +fn parse_response_done_handoff_requested_event(parsed: &Value) -> Option { + let item = parsed + .get("response") + .and_then(Value::as_object) + .and_then(|response| response.get("output")) + .and_then(Value::as_array)? + .iter() + .find(|item| { + item.get("type").and_then(Value::as_str) == Some("function_call") + && item.get("name").and_then(Value::as_str) == Some(CODEX_TOOL_NAME) + })? + .as_object()?; + + parse_handoff_requested_event(item) +} + fn parse_handoff_requested_event(item: &JsonMap) -> Option { let item_type = item.get("type").and_then(Value::as_str); let item_name = item.get("name").and_then(Value::as_str); diff --git a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs index 30786ad92da..130ab6fd353 100644 --- a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs +++ b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs @@ -170,6 +170,7 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { sample_rate: 48000, num_channels: 1, samples_per_channel: Some(960), + item_id: None, }) .await .expect("send audio"); @@ -186,6 +187,7 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { sample_rate: 48000, num_channels: 1, samples_per_channel: None, + item_id: None, }) ); @@ -254,6 +256,7 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { sample_rate: 48000, num_channels: 1, samples_per_channel: Some(960), + item_id: None, }), ) .await diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d30e5a3eafa..d486984f3be 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2605,6 +2605,9 @@ impl Session { if !matches!(msg, EventMsg::TurnComplete(_)) { return; } + if let Err(err) = self.conversation.handoff_complete().await { + debug!("failed to finalize realtime handoff output: {err}"); + } self.conversation.clear_active_handoff().await; } diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index fd1bb576b40..34ed7bcd63b 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2735,6 +2735,7 @@ fn submission_dispatch_span_uses_debug_for_realtime_audio() { sample_rate: 16_000, num_channels: 1, samples_per_channel: Some(160), + item_id: None, }, }), trace: None, diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 243f4d8f224..7b69d795d52 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -11,6 +11,8 @@ use crate::realtime_context::build_realtime_startup_context; use async_channel::Receiver; use async_channel::Sender; use async_channel::TrySendError; +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use codex_api::Provider as ApiProvider; use codex_api::RealtimeAudioFrame; use codex_api::RealtimeEvent; @@ -34,6 +36,7 @@ use codex_protocol::protocol::RealtimeHandoffRequested; use http::HeaderMap; use http::HeaderValue; use http::header::AUTHORIZATION; +use serde_json::Value; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -49,6 +52,8 @@ const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64; const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64; const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000; +const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str = + "Conversation already has an active response in progress:"; pub(crate) struct RealtimeConversationManager { state: Mutex>, @@ -58,19 +63,52 @@ pub(crate) struct RealtimeConversationManager { struct RealtimeHandoffState { output_tx: Sender, active_handoff: Arc>>, + last_output_text: Arc>>, + use_final_tool_output: bool, } #[derive(Debug, PartialEq, Eq)] -struct HandoffOutput { - handoff_id: String, - output_text: String, +enum HandoffOutput { + ImmediateAppend { + handoff_id: String, + output_text: String, + }, + FinalToolCall { + handoff_id: String, + output_text: String, + }, +} + +#[derive(Debug, PartialEq, Eq)] +struct OutputAudioState { + item_id: String, + audio_end_ms: u32, +} + +struct OutputAudioTruncate { + item_id: String, + content_index: u32, + audio_end_ms: u32, +} + +struct RealtimeInputTask { + writer: RealtimeWebsocketWriter, + events: RealtimeWebsocketEvents, + user_text_rx: Receiver, + handoff_output_rx: Receiver, + audio_rx: Receiver, + events_tx: Sender, + handoff_state: RealtimeHandoffState, + use_response_create_flow: bool, } impl RealtimeHandoffState { - fn new(output_tx: Sender) -> Self { + fn new(output_tx: Sender, use_final_tool_output: bool) -> Self { Self { output_tx, active_handoff: Arc::new(Mutex::new(None)), + last_output_text: Arc::new(Mutex::new(None)), + use_final_tool_output, } } @@ -79,8 +117,33 @@ impl RealtimeHandoffState { return Ok(()); }; + *self.last_output_text.lock().await = Some(output_text.clone()); + if !self.use_final_tool_output { + self.output_tx + .send(HandoffOutput::ImmediateAppend { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + } + Ok(()) + } + + async fn send_final_output(&self) -> CodexResult<()> { + if !self.use_final_tool_output { + return Ok(()); + } + + let Some(handoff_id) = self.active_handoff.lock().await.clone() else { + return Ok(()); + }; + let Some(output_text) = self.last_output_text.lock().await.clone() else { + return Ok(()); + }; + self.output_tx - .send(HandoffOutput { + .send(HandoffOutput::FinalToolCall { handoff_id, output_text, }) @@ -94,6 +157,7 @@ impl RealtimeHandoffState { struct ConversationState { audio_tx: Sender, user_text_tx: Sender, + writer: RealtimeWebsocketWriter, handoff: RealtimeHandoffState, task: JoinHandle<()>, realtime_active: Arc, @@ -129,6 +193,8 @@ impl RealtimeConversationManager { state.task.abort(); let _ = state.task.await; } + let use_response_create_flow = + session_config.event_parser == RealtimeEventParser::RealtimeV2; let client = RealtimeWebsocketClient::new(api_provider); let connection = client @@ -152,21 +218,23 @@ impl RealtimeConversationManager { async_channel::bounded::(OUTPUT_EVENTS_QUEUE_CAPACITY); let realtime_active = Arc::new(AtomicBool::new(true)); - let handoff = RealtimeHandoffState::new(handoff_output_tx); - let task = spawn_realtime_input_task( - writer, + let handoff = RealtimeHandoffState::new(handoff_output_tx, use_response_create_flow); + let task = spawn_realtime_input_task(RealtimeInputTask { + writer: writer.clone(), events, user_text_rx, handoff_output_rx, audio_rx, events_tx, - handoff.clone(), - ); + handoff_state: handoff.clone(), + use_response_create_flow, + }); let mut guard = self.state.lock().await; *guard = Some(ConversationState { audio_tx, user_text_tx, + writer, handoff, task, realtime_active: Arc::clone(&realtime_active), @@ -231,6 +299,17 @@ impl RealtimeConversationManager { handoff.send_output(output_text).await } + pub(crate) async fn handoff_complete(&self) -> CodexResult<()> { + let handoff = { + let guard = self.state.lock().await; + guard.as_ref().map(|state| state.handoff.clone()) + }; + let Some(handoff) = handoff else { + return Ok(()); + }; + handoff.send_final_output().await + } + pub(crate) async fn active_handoff_id(&self) -> Option { let handoff = { let guard = self.state.lock().await; @@ -246,6 +325,7 @@ impl RealtimeConversationManager { }; if let Some(handoff) = handoff { *handoff.active_handoff.lock().await = None; + *handoff.last_output_text.lock().await = None; } } @@ -491,16 +571,23 @@ pub(crate) async fn handle_close(sess: &Arc, sub_id: String) { } } -fn spawn_realtime_input_task( - writer: RealtimeWebsocketWriter, - events: RealtimeWebsocketEvents, - user_text_rx: Receiver, - handoff_output_rx: Receiver, - audio_rx: Receiver, - events_tx: Sender, - handoff_state: RealtimeHandoffState, -) -> JoinHandle<()> { +fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { + let RealtimeInputTask { + writer, + events, + user_text_rx, + handoff_output_rx, + audio_rx, + events_tx, + handoff_state, + use_response_create_flow, + } = input; + tokio::spawn(async move { + let mut pending_response_create = false; + let mut response_in_progress = false; + let mut output_audio_state: Option = None; + loop { tokio::select! { text = user_text_rx.recv() => { @@ -511,23 +598,66 @@ fn spawn_realtime_input_task( warn!("failed to send input text: {mapped_error}"); break; } + if use_response_create_flow { + if response_in_progress { + pending_response_create = true; + } else if let Err(err) = writer.send_response_create().await { + let mapped_error = map_api_error(err); + warn!("failed to send text response.create: {mapped_error}"); + break; + } else { + pending_response_create = false; + response_in_progress = true; + } + } } Err(_) => break, } } handoff_output = handoff_output_rx.recv() => { match handoff_output { - Ok(HandoffOutput { - handoff_id, - output_text, - }) => { - if let Err(err) = writer - .send_conversation_handoff_append(handoff_id, output_text) - .await - { - let mapped_error = map_api_error(err); - warn!("failed to send handoff output: {mapped_error}"); - break; + Ok(handoff_output) => { + match handoff_output { + HandoffOutput::ImmediateAppend { + handoff_id, + output_text, + } => { + if let Err(err) = writer + .send_conversation_handoff_append(handoff_id, output_text) + .await + { + let mapped_error = map_api_error(err); + warn!("failed to send handoff output: {mapped_error}"); + break; + } + } + HandoffOutput::FinalToolCall { + handoff_id, + output_text, + } => { + if let Err(err) = writer + .send_conversation_handoff_append(handoff_id, output_text) + .await + { + let mapped_error = map_api_error(err); + warn!("failed to send handoff output: {mapped_error}"); + break; + } + if use_response_create_flow { + if response_in_progress { + pending_response_create = true; + } else if let Err(err) = writer.send_response_create().await { + let mapped_error = map_api_error(err); + warn!( + "failed to send handoff response.create: {mapped_error}" + ); + break; + } else { + pending_response_create = false; + response_in_progress = true; + } + } + } } } Err(_) => break, @@ -536,12 +666,98 @@ fn spawn_realtime_input_task( event = events.next_event() => { match event { Ok(Some(event)) => { - if let RealtimeEvent::HandoffRequested(handoff) = &event { - *handoff_state.active_handoff.lock().await = - Some(handoff.handoff_id.clone()); + let mut should_stop = false; + let mut forward_event = true; + + match &event { + RealtimeEvent::ConversationItemAdded(item) => { + match item.get("type").and_then(Value::as_str) { + Some("response.created") if use_response_create_flow => { + response_in_progress = true; + } + Some("response.done") if use_response_create_flow => { + response_in_progress = false; + output_audio_state = None; + if pending_response_create { + if let Err(err) = writer.send_response_create().await { + let mapped_error = map_api_error(err); + warn!( + "failed to send deferred response.create: {mapped_error}" + ); + break; + } + pending_response_create = false; + response_in_progress = true; + } + } + _ => {} + } + } + RealtimeEvent::AudioOut(frame) => { + if use_response_create_flow { + update_output_audio_state(&mut output_audio_state, frame); + } + } + RealtimeEvent::InputAudioSpeechStarted(event) => { + if use_response_create_flow + && let Some(truncate) = output_audio_truncate_params( + &mut output_audio_state, + event.item_id.as_deref(), + ) + && let Err(err) = writer + .send_conversation_item_truncate( + truncate.item_id, + truncate.content_index, + truncate.audio_end_ms, + ) + .await + { + let mapped_error = map_api_error(err); + warn!("failed to truncate realtime audio: {mapped_error}"); + } + } + RealtimeEvent::ResponseCancelled(_) => { + response_in_progress = false; + output_audio_state = None; + if use_response_create_flow && pending_response_create { + if let Err(err) = writer.send_response_create().await { + let mapped_error = map_api_error(err); + warn!( + "failed to send deferred response.create after cancellation: {mapped_error}" + ); + break; + } + pending_response_create = false; + response_in_progress = true; + } + } + RealtimeEvent::HandoffRequested(handoff) => { + *handoff_state.active_handoff.lock().await = + Some(handoff.handoff_id.clone()); + *handoff_state.last_output_text.lock().await = None; + response_in_progress = false; + output_audio_state = None; + } + RealtimeEvent::Error(message) + if use_response_create_flow + && message.starts_with(ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX) => + { + warn!( + "realtime rejected response.create because a response is already in progress; deferring follow-up response.create" + ); + pending_response_create = true; + response_in_progress = true; + forward_event = false; + } + RealtimeEvent::Error(_) => { + should_stop = true; + } + RealtimeEvent::SessionUpdated { .. } + | RealtimeEvent::InputTranscriptDelta(_) + | RealtimeEvent::OutputTranscriptDelta(_) + | RealtimeEvent::ConversationItemDone { .. } => {} } - let should_stop = matches!(&event, RealtimeEvent::Error(_)); - if events_tx.send(event).await.is_err() { + if forward_event && events_tx.send(event).await.is_err() { break; } if should_stop { @@ -588,6 +804,67 @@ fn spawn_realtime_input_task( }) } +fn update_output_audio_state( + output_audio_state: &mut Option, + frame: &RealtimeAudioFrame, +) { + let Some(item_id) = frame.item_id.clone() else { + return; + }; + let audio_end_ms = audio_duration_ms(frame); + if audio_end_ms == 0 { + return; + } + + if let Some(current) = output_audio_state.as_mut() + && current.item_id == item_id + { + current.audio_end_ms = current.audio_end_ms.saturating_add(audio_end_ms); + return; + } + + *output_audio_state = Some(OutputAudioState { + item_id, + audio_end_ms, + }); +} + +fn audio_duration_ms(frame: &RealtimeAudioFrame) -> u32 { + let Some(samples_per_channel) = frame + .samples_per_channel + .or_else(|| decoded_samples_per_channel(frame)) + else { + return 0; + }; + let sample_rate = u64::from(frame.sample_rate.max(1)); + ((u64::from(samples_per_channel) * 1_000) / sample_rate) as u32 +} + +fn decoded_samples_per_channel(frame: &RealtimeAudioFrame) -> Option { + let bytes = BASE64_STANDARD.decode(&frame.data).ok()?; + let channels = usize::from(frame.num_channels.max(1)); + let samples = bytes.len().checked_div(2)?.checked_div(channels)?; + u32::try_from(samples).ok() +} + +fn output_audio_truncate_params( + output_audio_state: &mut Option, + item_id: Option<&str>, +) -> Option { + let state = output_audio_state.take()?; + if let Some(item_id) = item_id + && item_id != state.item_id + { + return None; + } + + Some(OutputAudioTruncate { + item_id: state.item_id, + content_index: 0, + audio_end_ms: state.audio_end_ms, + }) +} + async fn send_conversation_error( sess: &Arc, sub_id: String, diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index d6b85a92daf..e15d5b1e728 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -57,7 +57,7 @@ fn ignores_empty_handoff_request_input_transcript() { #[tokio::test] async fn clears_active_handoff_explicitly() { let (tx, _rx) = bounded(1); - let state = RealtimeHandoffState::new(tx); + let state = RealtimeHandoffState::new(tx, false); *state.active_handoff.lock().await = Some("handoff_1".to_string()); assert_eq!( @@ -72,7 +72,7 @@ async fn clears_active_handoff_explicitly() { #[tokio::test] async fn sends_multiple_handoff_outputs_until_cleared() { let (tx, rx) = bounded(4); - let state = RealtimeHandoffState::new(tx); + let state = RealtimeHandoffState::new(tx, false); state .send_output("ignored".to_string()) @@ -90,7 +90,7 @@ async fn sends_multiple_handoff_outputs_until_cleared() { let output_1 = rx.recv().await.expect("recv"); assert_eq!( output_1, - HandoffOutput { + HandoffOutput::ImmediateAppend { handoff_id: "handoff_1".to_string(), output_text: "result".to_string(), } @@ -99,7 +99,7 @@ async fn sends_multiple_handoff_outputs_until_cleared() { let output_2 = rx.recv().await.expect("recv"); assert_eq!( output_2, - HandoffOutput { + HandoffOutput::ImmediateAppend { handoff_id: "handoff_1".to_string(), output_text: "result 2".to_string(), } diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 0d49f8c8d57..4ab98712147 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -176,6 +176,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { sample_rate: 24000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, })) .await?; @@ -409,6 +410,7 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> { sample_rate: 24000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, })) .await?; @@ -518,6 +520,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { sample_rate: 24000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, })) .await?; @@ -1469,6 +1472,7 @@ async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() - sample_rate: 24000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, })) .await?; @@ -1954,6 +1958,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> { sample_rate: 24000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, })) .await?; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index f1f60e163b5..152743b3e13 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -139,6 +139,8 @@ pub struct RealtimeAudioFrame { pub num_channels: u16, #[serde(skip_serializing_if = "Option::is_none")] pub samples_per_channel: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub item_id: Option, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] @@ -160,15 +162,27 @@ pub struct RealtimeHandoffRequested { pub active_transcript: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeInputAudioSpeechStarted { + pub item_id: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeResponseCancelled { + pub response_id: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] pub enum RealtimeEvent { SessionUpdated { session_id: String, instructions: Option, }, + InputAudioSpeechStarted(RealtimeInputAudioSpeechStarted), InputTranscriptDelta(RealtimeTranscriptDelta), OutputTranscriptDelta(RealtimeTranscriptDelta), AudioOut(RealtimeAudioFrame), + ResponseCancelled(RealtimeResponseCancelled), ConversationItemAdded(Value), ConversationItemDone { item_id: String, @@ -4078,6 +4092,7 @@ mod tests { sample_rate: 24_000, num_channels: 1, samples_per_channel: Some(480), + item_id: None, }, }); let start = Op::RealtimeConversationStart(ConversationStartParams { From bb87c8ecb1fdbfa18a2a1003433bcc605c155d1f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 12:26:00 -0700 Subject: [PATCH 03/17] Handle realtime audio events in UI clients Add the new realtime event cases needed by the wire/runtime parity PR and fill the new audio frame field so the stack compiles cleanly on current main. Co-authored-by: Codex --- codex-rs/tui/src/chatwidget/realtime.rs | 2 ++ codex-rs/tui/src/voice.rs | 1 + codex-rs/tui_app_server/src/chatwidget/realtime.rs | 12 ++++++++++++ codex-rs/tui_app_server/src/voice.rs | 1 + 4 files changed, 16 insertions(+) diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs index 4e4f2f0e709..463d0899d87 100644 --- a/codex-rs/tui/src/chatwidget/realtime.rs +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -264,9 +264,11 @@ impl ChatWidget { RealtimeEvent::SessionUpdated { session_id, .. } => { self.realtime_conversation.session_id = Some(session_id); } + RealtimeEvent::InputAudioSpeechStarted(_) => {} RealtimeEvent::InputTranscriptDelta(_) => {} RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), + RealtimeEvent::ResponseCancelled(_) => {} RealtimeEvent::ConversationItemAdded(_item) => {} RealtimeEvent::ConversationItemDone { .. } => {} RealtimeEvent::HandoffRequested(_) => {} diff --git a/codex-rs/tui/src/voice.rs b/codex-rs/tui/src/voice.rs index 7e4d8a85e8d..07adcfd0a88 100644 --- a/codex-rs/tui/src/voice.rs +++ b/codex-rs/tui/src/voice.rs @@ -428,6 +428,7 @@ fn send_realtime_audio_chunk( sample_rate: MODEL_AUDIO_SAMPLE_RATE, num_channels: MODEL_AUDIO_CHANNELS, samples_per_channel: Some(samples_per_channel), + item_id: None, }, }, ))); diff --git a/codex-rs/tui_app_server/src/chatwidget/realtime.rs b/codex-rs/tui_app_server/src/chatwidget/realtime.rs index 14a08a15554..559428c9c50 100644 --- a/codex-rs/tui_app_server/src/chatwidget/realtime.rs +++ b/codex-rs/tui_app_server/src/chatwidget/realtime.rs @@ -268,9 +268,11 @@ impl ChatWidget { RealtimeEvent::SessionUpdated { session_id, .. } => { self.realtime_conversation.session_id = Some(session_id); } + RealtimeEvent::InputAudioSpeechStarted(_) => self.interrupt_realtime_audio_playback(), RealtimeEvent::InputTranscriptDelta(_) => {} RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), + RealtimeEvent::ResponseCancelled(_) => self.interrupt_realtime_audio_playback(), RealtimeEvent::ConversationItemAdded(_item) => {} RealtimeEvent::ConversationItemDone { .. } => {} RealtimeEvent::HandoffRequested(_) => {} @@ -310,6 +312,16 @@ impl ChatWidget { } } + #[cfg(not(target_os = "linux"))] + fn interrupt_realtime_audio_playback(&mut self) { + if let Some(player) = &self.realtime_conversation.audio_player { + player.clear(); + } + } + + #[cfg(target_os = "linux")] + fn interrupt_realtime_audio_playback(&mut self) {} + #[cfg(not(target_os = "linux"))] fn start_realtime_local_audio(&mut self) { if self.realtime_conversation.capture_stop_flag.is_some() { diff --git a/codex-rs/tui_app_server/src/voice.rs b/codex-rs/tui_app_server/src/voice.rs index f448c457346..6758eff4d64 100644 --- a/codex-rs/tui_app_server/src/voice.rs +++ b/codex-rs/tui_app_server/src/voice.rs @@ -426,6 +426,7 @@ fn send_realtime_audio_chunk( sample_rate: MODEL_AUDIO_SAMPLE_RATE, num_channels: MODEL_AUDIO_CHANNELS, samples_per_channel: Some(samples_per_channel), + item_id: None, }, }); } From 3c15083067b47b19ca6a63ada000c58f7ec32de9 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 12:57:48 -0700 Subject: [PATCH 04/17] Branch realtime flow on session kind Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 74 ++++++++++++++-------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 7b69d795d52..6c9530b37e2 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -59,12 +59,18 @@ pub(crate) struct RealtimeConversationManager { state: Mutex>, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum RealtimeSessionKind { + V1, + V2, +} + #[derive(Clone, Debug)] struct RealtimeHandoffState { output_tx: Sender, active_handoff: Arc>>, last_output_text: Arc>>, - use_final_tool_output: bool, + session_kind: RealtimeSessionKind, } #[derive(Debug, PartialEq, Eq)] @@ -99,16 +105,16 @@ struct RealtimeInputTask { audio_rx: Receiver, events_tx: Sender, handoff_state: RealtimeHandoffState, - use_response_create_flow: bool, + session_kind: RealtimeSessionKind, } impl RealtimeHandoffState { - fn new(output_tx: Sender, use_final_tool_output: bool) -> Self { + fn new(output_tx: Sender, session_kind: RealtimeSessionKind) -> Self { Self { output_tx, active_handoff: Arc::new(Mutex::new(None)), last_output_text: Arc::new(Mutex::new(None)), - use_final_tool_output, + session_kind, } } @@ -118,21 +124,27 @@ impl RealtimeHandoffState { }; *self.last_output_text.lock().await = Some(output_text.clone()); - if !self.use_final_tool_output { - self.output_tx - .send(HandoffOutput::ImmediateAppend { - handoff_id, - output_text, - }) - .await - .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + match self.session_kind { + RealtimeSessionKind::V1 => { + self.output_tx + .send(HandoffOutput::ImmediateAppend { + handoff_id, + output_text, + }) + .await + .map_err(|_| { + CodexErr::InvalidRequest("conversation is not running".to_string()) + })?; + } + RealtimeSessionKind::V2 => {} } Ok(()) } async fn send_final_output(&self) -> CodexResult<()> { - if !self.use_final_tool_output { - return Ok(()); + match self.session_kind { + RealtimeSessionKind::V1 => return Ok(()), + RealtimeSessionKind::V2 => {} } let Some(handoff_id) = self.active_handoff.lock().await.clone() else { @@ -193,8 +205,10 @@ impl RealtimeConversationManager { state.task.abort(); let _ = state.task.await; } - let use_response_create_flow = - session_config.event_parser == RealtimeEventParser::RealtimeV2; + let session_kind = match session_config.event_parser { + RealtimeEventParser::V1 => RealtimeSessionKind::V1, + RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2, + }; let client = RealtimeWebsocketClient::new(api_provider); let connection = client @@ -218,7 +232,7 @@ impl RealtimeConversationManager { async_channel::bounded::(OUTPUT_EVENTS_QUEUE_CAPACITY); let realtime_active = Arc::new(AtomicBool::new(true)); - let handoff = RealtimeHandoffState::new(handoff_output_tx, use_response_create_flow); + let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind); let task = spawn_realtime_input_task(RealtimeInputTask { writer: writer.clone(), events, @@ -227,7 +241,7 @@ impl RealtimeConversationManager { audio_rx, events_tx, handoff_state: handoff.clone(), - use_response_create_flow, + session_kind, }); let mut guard = self.state.lock().await; @@ -580,7 +594,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { audio_rx, events_tx, handoff_state, - use_response_create_flow, + session_kind, } = input; tokio::spawn(async move { @@ -598,7 +612,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { warn!("failed to send input text: {mapped_error}"); break; } - if use_response_create_flow { + if matches!(session_kind, RealtimeSessionKind::V2) { if response_in_progress { pending_response_create = true; } else if let Err(err) = writer.send_response_create().await { @@ -643,7 +657,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { warn!("failed to send handoff output: {mapped_error}"); break; } - if use_response_create_flow { + if matches!(session_kind, RealtimeSessionKind::V2) { if response_in_progress { pending_response_create = true; } else if let Err(err) = writer.send_response_create().await { @@ -672,10 +686,14 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { match &event { RealtimeEvent::ConversationItemAdded(item) => { match item.get("type").and_then(Value::as_str) { - Some("response.created") if use_response_create_flow => { + Some("response.created") + if matches!(session_kind, RealtimeSessionKind::V2) => + { response_in_progress = true; } - Some("response.done") if use_response_create_flow => { + Some("response.done") + if matches!(session_kind, RealtimeSessionKind::V2) => + { response_in_progress = false; output_audio_state = None; if pending_response_create { @@ -694,12 +712,12 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { } } RealtimeEvent::AudioOut(frame) => { - if use_response_create_flow { + if matches!(session_kind, RealtimeSessionKind::V2) { update_output_audio_state(&mut output_audio_state, frame); } } RealtimeEvent::InputAudioSpeechStarted(event) => { - if use_response_create_flow + if matches!(session_kind, RealtimeSessionKind::V2) && let Some(truncate) = output_audio_truncate_params( &mut output_audio_state, event.item_id.as_deref(), @@ -719,7 +737,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { RealtimeEvent::ResponseCancelled(_) => { response_in_progress = false; output_audio_state = None; - if use_response_create_flow && pending_response_create { + if matches!(session_kind, RealtimeSessionKind::V2) + && pending_response_create + { if let Err(err) = writer.send_response_create().await { let mapped_error = map_api_error(err); warn!( @@ -739,7 +759,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { output_audio_state = None; } RealtimeEvent::Error(message) - if use_response_create_flow + if matches!(session_kind, RealtimeSessionKind::V2) && message.starts_with(ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX) => { warn!( From 6475be2bab8ab86d3664d2fbb8239f1f4afd4648 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 15:31:19 -0700 Subject: [PATCH 05/17] codex: fix CI failure on PR #14827 Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation_tests.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index e15d5b1e728..6b021ed2022 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -1,5 +1,6 @@ use super::HandoffOutput; use super::RealtimeHandoffState; +use super::RealtimeSessionKind; use super::realtime_text_from_handoff_request; use async_channel::bounded; use codex_protocol::protocol::RealtimeHandoffRequested; @@ -57,7 +58,7 @@ fn ignores_empty_handoff_request_input_transcript() { #[tokio::test] async fn clears_active_handoff_explicitly() { let (tx, _rx) = bounded(1); - let state = RealtimeHandoffState::new(tx, false); + let state = RealtimeHandoffState::new(tx, RealtimeSessionKind::V1); *state.active_handoff.lock().await = Some("handoff_1".to_string()); assert_eq!( @@ -72,7 +73,7 @@ async fn clears_active_handoff_explicitly() { #[tokio::test] async fn sends_multiple_handoff_outputs_until_cleared() { let (tx, rx) = bounded(4); - let state = RealtimeHandoffState::new(tx, false); + let state = RealtimeHandoffState::new(tx, RealtimeSessionKind::V1); state .send_output("ignored".to_string()) From 55c550532768caba2dfad4d9c37452c5aaeca75a Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 15:50:11 -0700 Subject: [PATCH 06/17] Remove typed realtime truncate message Co-authored-by: Codex --- .../endpoint/realtime_websocket/methods.rs | 42 +++++------------ .../endpoint/realtime_websocket/protocol.rs | 6 --- codex-rs/core/src/realtime_conversation.rs | 46 ++++++------------- 3 files changed, 24 insertions(+), 70 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 22fb3276a14..5dce51c0112 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -20,7 +20,9 @@ use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; use http::HeaderValue; +use serde::Serialize; use std::collections::HashMap; +use std::fmt::Debug; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -235,17 +237,6 @@ impl RealtimeWebsocketConnection { self.writer.send_response_create().await } - pub async fn send_conversation_item_truncate( - &self, - item_id: String, - content_index: u32, - audio_end_ms: u32, - ) -> Result<(), ApiError> { - self.writer - .send_conversation_item_truncate(item_id, content_index, audio_end_ms) - .await - } - pub async fn close(&self) -> Result<(), ApiError> { self.writer.close().await } @@ -287,12 +278,12 @@ impl RealtimeWebsocketConnection { impl RealtimeWebsocketWriter { pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> { - self.send_json(RealtimeOutboundMessage::InputAudioBufferAppend { audio: frame.data }) + self.send_json(&RealtimeOutboundMessage::InputAudioBufferAppend { audio: frame.data }) .await } pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> { - self.send_json(conversation_item_create_message(self.event_parser, text)) + self.send_json(&conversation_item_create_message(self.event_parser, text)) .await } @@ -301,7 +292,7 @@ impl RealtimeWebsocketWriter { handoff_id: String, output_text: String, ) -> Result<(), ApiError> { - self.send_json(conversation_handoff_append_message( + self.send_json(&conversation_handoff_append_message( self.event_parser, handoff_id, output_text, @@ -310,24 +301,10 @@ impl RealtimeWebsocketWriter { } pub async fn send_response_create(&self) -> Result<(), ApiError> { - self.send_json(RealtimeOutboundMessage::ResponseCreate) + self.send_json(&RealtimeOutboundMessage::ResponseCreate) .await } - pub async fn send_conversation_item_truncate( - &self, - item_id: String, - content_index: u32, - audio_end_ms: u32, - ) -> Result<(), ApiError> { - self.send_json(RealtimeOutboundMessage::ConversationItemTruncate { - item_id, - content_index, - audio_end_ms, - }) - .await - } - pub async fn send_session_update( &self, instructions: String, @@ -343,7 +320,7 @@ impl RealtimeWebsocketWriter { has_tools = session.tools.is_some(), "realtime websocket prepared session.update" ); - self.send_json(RealtimeOutboundMessage::SessionUpdate { session }) + self.send_json(&RealtimeOutboundMessage::SessionUpdate { session }) .await } @@ -361,7 +338,10 @@ impl RealtimeWebsocketWriter { Ok(()) } - async fn send_json(&self, message: RealtimeOutboundMessage) -> Result<(), ApiError> { + pub async fn send_json(&self, message: &T) -> Result<(), ApiError> + where + T: Serialize + Debug, + { let payload = serde_json::to_string(&message) .map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?; debug!(?message, "realtime websocket request"); diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index a6bef882ea6..7ea7e563c9e 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -41,12 +41,6 @@ pub(super) enum RealtimeOutboundMessage { }, #[serde(rename = "response.create")] ResponseCreate, - #[serde(rename = "conversation.item.truncate")] - ConversationItemTruncate { - item_id: String, - content_index: u32, - audio_end_ms: u32, - }, #[serde(rename = "session.update")] SessionUpdate { session: SessionUpdateSession }, #[serde(rename = "conversation.item.create")] diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 6c9530b37e2..76bbbd2d285 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -37,6 +37,7 @@ use http::HeaderMap; use http::HeaderValue; use http::header::AUTHORIZATION; use serde_json::Value; +use serde_json::json; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -91,12 +92,6 @@ struct OutputAudioState { audio_end_ms: u32, } -struct OutputAudioTruncate { - item_id: String, - content_index: u32, - audio_end_ms: u32, -} - struct RealtimeInputTask { writer: RealtimeWebsocketWriter, events: RealtimeWebsocketEvents, @@ -718,16 +713,19 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { } RealtimeEvent::InputAudioSpeechStarted(event) => { if matches!(session_kind, RealtimeSessionKind::V2) - && let Some(truncate) = output_audio_truncate_params( - &mut output_audio_state, - event.item_id.as_deref(), - ) + && let Some(output_audio_state) = + output_audio_state.take() + && event + .item_id + .as_deref() + .is_none_or(|item_id| item_id == output_audio_state.item_id) && let Err(err) = writer - .send_conversation_item_truncate( - truncate.item_id, - truncate.content_index, - truncate.audio_end_ms, - ) + .send_json(&json!({ + "type": "conversation.item.truncate", + "item_id": output_audio_state.item_id, + "content_index": 0, + "audio_end_ms": output_audio_state.audio_end_ms, + })) .await { let mapped_error = map_api_error(err); @@ -867,24 +865,6 @@ fn decoded_samples_per_channel(frame: &RealtimeAudioFrame) -> Option { u32::try_from(samples).ok() } -fn output_audio_truncate_params( - output_audio_state: &mut Option, - item_id: Option<&str>, -) -> Option { - let state = output_audio_state.take()?; - if let Some(item_id) = item_id - && item_id != state.item_id - { - return None; - } - - Some(OutputAudioTruncate { - item_id: state.item_id, - content_index: 0, - audio_end_ms: state.audio_end_ms, - }) -} - async fn send_conversation_error( sess: &Arc, sub_id: String, From 03c7c4d1446d312142e0e29baf7f50d932477e0a Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 15:56:18 -0700 Subject: [PATCH 07/17] Remove realtime debug logging Co-authored-by: Codex --- .../endpoint/realtime_websocket/methods.rs | 45 +++++-------------- codex-rs/core/src/realtime_conversation.rs | 6 +-- 2 files changed, 11 insertions(+), 40 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 5dce51c0112..6f437fa410f 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -20,9 +20,8 @@ use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; use http::HeaderValue; -use serde::Serialize; +use serde_json::Value; use std::collections::HashMap; -use std::fmt::Debug; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -35,7 +34,6 @@ use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Error as WsError; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::client::IntoClientRequest; -use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; @@ -74,7 +72,6 @@ impl WsStream { }; match command { WsCommand::Send { message, tx_result } => { - debug!("realtime websocket sending message"); let result = inner.send(message).await; let should_break = result.is_err(); if let Err(err) = &result { @@ -305,6 +302,10 @@ impl RealtimeWebsocketWriter { .await } + pub async fn send_json_value(&self, message: Value) -> Result<(), ApiError> { + self.send_payload(message.to_string()).await + } + pub async fn send_session_update( &self, instructions: String, @@ -312,14 +313,6 @@ impl RealtimeWebsocketWriter { ) -> Result<(), ApiError> { let session_mode = normalized_session_mode(self.event_parser, session_mode); let session = session_update_session(self.event_parser, instructions, session_mode); - debug!( - event_parser = ?self.event_parser, - session_mode = ?session_mode, - instructions_len = session.instructions.as_ref().map(String::len).unwrap_or_default(), - has_output_audio = session.audio.output.is_some(), - has_tools = session.tools.is_some(), - "realtime websocket prepared session.update" - ); self.send_json(&RealtimeOutboundMessage::SessionUpdate { session }) .await } @@ -338,14 +331,13 @@ impl RealtimeWebsocketWriter { Ok(()) } - pub async fn send_json(&self, message: &T) -> Result<(), ApiError> - where - T: Serialize + Debug, - { - let payload = serde_json::to_string(&message) + async fn send_json(&self, message: &RealtimeOutboundMessage) -> Result<(), ApiError> { + let payload = serde_json::to_string(message) .map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?; - debug!(?message, "realtime websocket request"); + self.send_payload(payload).await + } + async fn send_payload(&self, payload: String) -> Result<(), ApiError> { if self.is_closed.load(Ordering::SeqCst) { return Err(ApiError::Stream( "realtime websocket connection is closed".to_string(), @@ -387,10 +379,8 @@ impl RealtimeWebsocketEvents { Message::Text(text) => { if let Some(mut event) = parse_realtime_event(&text, self.event_parser) { self.update_active_transcript(&mut event).await; - debug!(?event, "realtime websocket parsed event"); return Ok(Some(event)); } - debug!("realtime websocket ignored unsupported text frame"); } Message::Close(frame) => { self.is_closed.store(true, Ordering::SeqCst); @@ -422,13 +412,6 @@ impl RealtimeWebsocketEvents { append_transcript_delta(&mut active_transcript.entries, "assistant", delta); } RealtimeEvent::HandoffRequested(handoff) => { - debug!( - handoff_id = handoff.handoff_id, - item_id = handoff.item_id, - input_len = handoff.input_transcript.len(), - transcript_entries = active_transcript.entries.len(), - "realtime websocket parsed codex function call" - ); handoff.active_transcript = std::mem::take(&mut active_transcript.entries); } RealtimeEvent::SessionUpdated { .. } @@ -516,14 +499,6 @@ impl RealtimeWebsocketClient { let (stream, rx_message) = WsStream::new(stream); let connection = RealtimeWebsocketConnection::new(stream, rx_message, config.event_parser); - debug!( - event_parser = ?config.event_parser, - session_mode = ?config.session_mode, - model = config.model.as_deref().unwrap_or(""), - session_id = config.session_id.as_deref().unwrap_or(""), - instructions_len = config.instructions.len(), - "realtime websocket sending session.update" - ); connection .writer .send_session_update(config.instructions, config.session_mode) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 76bbbd2d285..829778e4189 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -43,7 +43,6 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; @@ -449,7 +448,6 @@ pub(crate) async fn handle_start( _ => None, }; if let Some(text) = maybe_routed_text { - debug!(text = %text, "[realtime-text] realtime conversation text output"); let sess_for_routed_text = Arc::clone(&sess_clone); sess_for_routed_text.route_realtime_text_input(text).await; } @@ -555,8 +553,6 @@ pub(crate) async fn handle_text( sub_id: String, params: ConversationTextParams, ) { - debug!(text = %params.text, "[realtime-text] appending realtime conversation text input"); - if let Err(err) = sess.conversation.text_in(params.text).await { error!("failed to append realtime text: {err}"); send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; @@ -720,7 +716,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { .as_deref() .is_none_or(|item_id| item_id == output_audio_state.item_id) && let Err(err) = writer - .send_json(&json!({ + .send_json_value(json!({ "type": "conversation.item.truncate", "item_id": output_audio_state.item_id, "content_index": 0, From 062857c543a070ceff3f8dc738f0e86aad0d6d7c Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 16:13:35 -0700 Subject: [PATCH 08/17] Inline thin realtime runtime helpers Co-authored-by: Codex --- .../endpoint/realtime_websocket/methods.rs | 11 +-- codex-rs/core/src/realtime_conversation.rs | 88 ++++++++----------- 2 files changed, 39 insertions(+), 60 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 6f437fa410f..bcbd356c670 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -20,7 +20,6 @@ use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; use http::HeaderValue; -use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -230,10 +229,6 @@ impl RealtimeWebsocketConnection { .await } - pub async fn send_response_create(&self) -> Result<(), ApiError> { - self.writer.send_response_create().await - } - pub async fn close(&self) -> Result<(), ApiError> { self.writer.close().await } @@ -302,10 +297,6 @@ impl RealtimeWebsocketWriter { .await } - pub async fn send_json_value(&self, message: Value) -> Result<(), ApiError> { - self.send_payload(message.to_string()).await - } - pub async fn send_session_update( &self, instructions: String, @@ -337,7 +328,7 @@ impl RealtimeWebsocketWriter { self.send_payload(payload).await } - async fn send_payload(&self, payload: String) -> Result<(), ApiError> { + pub async fn send_payload(&self, payload: String) -> Result<(), ApiError> { if self.is_closed.load(Ordering::SeqCst) { return Err(ApiError::Stream( "realtime websocket connection is closed".to_string(), diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 829778e4189..4584c7ff669 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -111,52 +111,6 @@ impl RealtimeHandoffState { session_kind, } } - - async fn send_output(&self, output_text: String) -> CodexResult<()> { - let Some(handoff_id) = self.active_handoff.lock().await.clone() else { - return Ok(()); - }; - - *self.last_output_text.lock().await = Some(output_text.clone()); - match self.session_kind { - RealtimeSessionKind::V1 => { - self.output_tx - .send(HandoffOutput::ImmediateAppend { - handoff_id, - output_text, - }) - .await - .map_err(|_| { - CodexErr::InvalidRequest("conversation is not running".to_string()) - })?; - } - RealtimeSessionKind::V2 => {} - } - Ok(()) - } - - async fn send_final_output(&self) -> CodexResult<()> { - match self.session_kind { - RealtimeSessionKind::V1 => return Ok(()), - RealtimeSessionKind::V2 => {} - } - - let Some(handoff_id) = self.active_handoff.lock().await.clone() else { - return Ok(()); - }; - let Some(output_text) = self.last_output_text.lock().await.clone() else { - return Ok(()); - }; - - self.output_tx - .send(HandoffOutput::FinalToolCall { - handoff_id, - output_text, - }) - .await - .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; - Ok(()) - } } #[allow(dead_code)] @@ -304,7 +258,22 @@ impl RealtimeConversationManager { state.handoff.clone() }; - handoff.send_output(output_text).await + let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else { + return Ok(()); + }; + + *handoff.last_output_text.lock().await = Some(output_text.clone()); + if matches!(handoff.session_kind, RealtimeSessionKind::V1) { + handoff + .output_tx + .send(HandoffOutput::ImmediateAppend { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + } + Ok(()) } pub(crate) async fn handoff_complete(&self) -> CodexResult<()> { @@ -315,7 +284,25 @@ impl RealtimeConversationManager { let Some(handoff) = handoff else { return Ok(()); }; - handoff.send_final_output().await + if matches!(handoff.session_kind, RealtimeSessionKind::V1) { + return Ok(()); + } + + let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else { + return Ok(()); + }; + let Some(output_text) = handoff.last_output_text.lock().await.clone() else { + return Ok(()); + }; + + handoff + .output_tx + .send(HandoffOutput::FinalToolCall { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string())) } pub(crate) async fn active_handoff_id(&self) -> Option { @@ -716,12 +703,13 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { .as_deref() .is_none_or(|item_id| item_id == output_audio_state.item_id) && let Err(err) = writer - .send_json_value(json!({ + .send_payload(json!({ "type": "conversation.item.truncate", "item_id": output_audio_state.item_id, "content_index": 0, "audio_end_ms": output_audio_state.audio_end_ms, - })) + }) + .to_string()) .await { let mapped_error = map_api_error(err); From 8b3ee3b3eb30ded335b565ea899f5215115838a7 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 16:27:54 -0700 Subject: [PATCH 09/17] Restore preexisting realtime debug logging Co-authored-by: Codex --- .../codex-api/src/endpoint/realtime_websocket/methods.rs | 9 +++++++++ codex-rs/core/src/realtime_conversation.rs | 3 +++ 2 files changed, 12 insertions(+) diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index bcbd356c670..fe83c751a21 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -33,6 +33,7 @@ use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Error as WsError; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; @@ -71,6 +72,7 @@ impl WsStream { }; match command { WsCommand::Send { message, tx_result } => { + debug!("realtime websocket sending message"); let result = inner.send(message).await; let should_break = result.is_err(); if let Err(err) = &result { @@ -325,6 +327,7 @@ impl RealtimeWebsocketWriter { async fn send_json(&self, message: &RealtimeOutboundMessage) -> Result<(), ApiError> { let payload = serde_json::to_string(message) .map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?; + debug!(?message, "realtime websocket request"); self.send_payload(payload).await } @@ -370,8 +373,10 @@ impl RealtimeWebsocketEvents { Message::Text(text) => { if let Some(mut event) = parse_realtime_event(&text, self.event_parser) { self.update_active_transcript(&mut event).await; + debug!(?event, "realtime websocket parsed event"); return Ok(Some(event)); } + debug!("realtime websocket ignored unsupported text frame"); } Message::Close(frame) => { self.is_closed.store(true, Ordering::SeqCst); @@ -490,6 +495,10 @@ impl RealtimeWebsocketClient { let (stream, rx_message) = WsStream::new(stream); let connection = RealtimeWebsocketConnection::new(stream, rx_message, config.event_parser); + debug!( + session_id = config.session_id.as_deref().unwrap_or(""), + "realtime websocket sending session.update" + ); connection .writer .send_session_update(config.instructions, config.session_mode) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 4584c7ff669..938f922f877 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -43,6 +43,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::task::JoinHandle; +use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; @@ -435,6 +436,7 @@ pub(crate) async fn handle_start( _ => None, }; if let Some(text) = maybe_routed_text { + debug!(text = %text, "[realtime-text] realtime conversation text output"); let sess_for_routed_text = Arc::clone(&sess_clone); sess_for_routed_text.route_realtime_text_input(text).await; } @@ -540,6 +542,7 @@ pub(crate) async fn handle_text( sub_id: String, params: ConversationTextParams, ) { + debug!(text = %params.text, "[realtime-text] appending realtime conversation text input"); if let Err(err) = sess.conversation.text_in(params.text).await { error!("failed to append realtime text: {err}"); send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; From 81c173c4cf9e861f940a6df8f787cf2613e3ba3c Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 16:39:23 -0700 Subject: [PATCH 10/17] Strongly type realtime websocket outbound types Co-authored-by: Codex --- .../realtime_websocket/methods_common.rs | 1 - .../endpoint/realtime_websocket/methods_v1.rs | 18 ++--- .../endpoint/realtime_websocket/methods_v2.rs | 36 +++++----- .../endpoint/realtime_websocket/protocol.rs | 72 ++++++++++++++++--- 4 files changed, 92 insertions(+), 35 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs index 4a5013c6565..48f21964a89 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs @@ -12,7 +12,6 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000; -pub(super) const REALTIME_AUDIO_FORMAT: &str = "audio/pcm"; pub(super) fn normalized_session_mode( event_parser: RealtimeEventParser, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs index 429d06b0053..b31899ff8d7 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs @@ -1,25 +1,27 @@ -use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT; use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE; +use crate::endpoint::realtime_websocket::protocol::AudioFormatType; +use crate::endpoint::realtime_websocket::protocol::ConversationContentType; use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; +use crate::endpoint::realtime_websocket::protocol::ConversationItemType; use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; +use crate::endpoint::realtime_websocket::protocol::ConversationRole; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::SessionAudio; use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice; +use crate::endpoint::realtime_websocket::protocol::SessionType; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; -const REALTIME_V1_SESSION_TYPE: &str = "quicksilver"; - pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { RealtimeOutboundMessage::ConversationItemCreate { item: ConversationItemPayload::Message(ConversationMessageItem { - kind: "message".to_string(), - role: "user".to_string(), + r#type: ConversationItemType::Message, + role: ConversationRole::User, content: vec![ConversationItemContent { - kind: "text".to_string(), + r#type: ConversationContentType::Text, text, }], }), @@ -38,13 +40,13 @@ pub(super) fn conversation_handoff_append_message( pub(super) fn session_update_session(instructions: String) -> SessionUpdateSession { SessionUpdateSession { - kind: REALTIME_V1_SESSION_TYPE.to_string(), + r#type: SessionType::Quicksilver, instructions: Some(instructions), output_modalities: None, audio: SessionAudio { input: SessionAudioInput { format: SessionAudioFormat { - kind: REALTIME_AUDIO_FORMAT.to_string(), + r#type: AudioFormatType::AudioPcm, rate: REALTIME_AUDIO_SAMPLE_RATE, }, noise_reduction: None, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs index 50de80610e7..afff680c132 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs @@ -1,9 +1,13 @@ -use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT; use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE; +use crate::endpoint::realtime_websocket::protocol::AudioFormatType; +use crate::endpoint::realtime_websocket::protocol::ConversationContentType; use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem; use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; +use crate::endpoint::realtime_websocket::protocol::ConversationItemType; use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; +use crate::endpoint::realtime_websocket::protocol::ConversationRole; +use crate::endpoint::realtime_websocket::protocol::NoiseReductionType; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode; use crate::endpoint::realtime_websocket::protocol::SessionAudio; @@ -14,25 +18,25 @@ use crate::endpoint::realtime_websocket::protocol::SessionAudioOutputFormat; use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice; use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool; use crate::endpoint::realtime_websocket::protocol::SessionNoiseReduction; +use crate::endpoint::realtime_websocket::protocol::SessionToolType; use crate::endpoint::realtime_websocket::protocol::SessionTurnDetection; +use crate::endpoint::realtime_websocket::protocol::SessionType; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; +use crate::endpoint::realtime_websocket::protocol::TurnDetectionType; use serde_json::json; -const REALTIME_V2_NOISE_REDUCTION: &str = "near_field"; -const REALTIME_V2_TURN_DETECTION: &str = "server_vad"; const REALTIME_V2_OUTPUT_MODALITY_AUDIO: &str = "audio"; const REALTIME_V2_TOOL_CHOICE: &str = "auto"; -const REALTIME_V2_SESSION_TYPE: &str = "realtime"; const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex"; const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate a request to Codex and return the final result to the user. Use this as the default action. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later."; pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { RealtimeOutboundMessage::ConversationItemCreate { item: ConversationItemPayload::Message(ConversationMessageItem { - kind: "message".to_string(), - role: "user".to_string(), + r#type: ConversationItemType::Message, + role: ConversationRole::User, content: vec![ConversationItemContent { - kind: "input_text".to_string(), + r#type: ConversationContentType::InputText, text, }], }), @@ -45,7 +49,7 @@ pub(super) fn conversation_handoff_append_message( ) -> RealtimeOutboundMessage { RealtimeOutboundMessage::ConversationItemCreate { item: ConversationItemPayload::FunctionCallOutput(ConversationFunctionCallOutputItem { - kind: "function_call_output".to_string(), + r#type: ConversationItemType::FunctionCallOutput, call_id: handoff_id, output: output_text, }), @@ -58,34 +62,34 @@ pub(super) fn session_update_session( ) -> SessionUpdateSession { match session_mode { RealtimeSessionMode::Conversational => SessionUpdateSession { - kind: REALTIME_V2_SESSION_TYPE.to_string(), + r#type: SessionType::Realtime, instructions: Some(instructions), output_modalities: Some(vec![REALTIME_V2_OUTPUT_MODALITY_AUDIO.to_string()]), audio: SessionAudio { input: SessionAudioInput { format: SessionAudioFormat { - kind: REALTIME_AUDIO_FORMAT.to_string(), + r#type: AudioFormatType::AudioPcm, rate: REALTIME_AUDIO_SAMPLE_RATE, }, noise_reduction: Some(SessionNoiseReduction { - kind: REALTIME_V2_NOISE_REDUCTION.to_string(), + r#type: NoiseReductionType::NearField, }), turn_detection: Some(SessionTurnDetection { - kind: REALTIME_V2_TURN_DETECTION.to_string(), + r#type: TurnDetectionType::ServerVad, interrupt_response: true, create_response: true, }), }, output: Some(SessionAudioOutput { format: Some(SessionAudioOutputFormat { - kind: REALTIME_AUDIO_FORMAT.to_string(), + r#type: AudioFormatType::AudioPcm, rate: REALTIME_AUDIO_SAMPLE_RATE, }), voice: SessionAudioVoice::Marin, }), }, tools: Some(vec![SessionFunctionTool { - kind: "function".to_string(), + r#type: SessionToolType::Function, name: REALTIME_V2_CODEX_TOOL_NAME.to_string(), description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(), parameters: json!({ @@ -103,13 +107,13 @@ pub(super) fn session_update_session( tool_choice: Some(REALTIME_V2_TOOL_CHOICE.to_string()), }, RealtimeSessionMode::Transcription => SessionUpdateSession { - kind: "transcription".to_string(), + r#type: SessionType::Transcription, instructions: None, output_modalities: None, audio: SessionAudio { input: SessionAudioInput { format: SessionAudioFormat { - kind: REALTIME_AUDIO_FORMAT.to_string(), + r#type: AudioFormatType::AudioPcm, rate: REALTIME_AUDIO_SAMPLE_RATE, }, noise_reduction: None, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index 7ea7e563c9e..2c629249fa3 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -50,7 +50,7 @@ pub(super) enum RealtimeOutboundMessage { #[derive(Debug, Clone, Serialize)] pub(super) struct SessionUpdateSession { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: SessionType, #[serde(skip_serializing_if = "Option::is_none")] pub(super) instructions: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -62,6 +62,14 @@ pub(super) struct SessionUpdateSession { pub(super) tool_choice: Option, } +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum SessionType { + Quicksilver, + Realtime, + Transcription, +} + #[derive(Debug, Clone, Serialize)] pub(super) struct SessionAudio { pub(super) input: SessionAudioInput, @@ -81,10 +89,16 @@ pub(super) struct SessionAudioInput { #[derive(Debug, Clone, Serialize)] pub(super) struct SessionAudioFormat { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: AudioFormatType, pub(super) rate: u32, } +#[derive(Debug, Clone, Copy, Serialize)] +pub(super) enum AudioFormatType { + #[serde(rename = "audio/pcm")] + AudioPcm, +} + #[derive(Debug, Clone, Serialize)] pub(super) struct SessionAudioOutput { #[serde(skip_serializing_if = "Option::is_none")] @@ -103,32 +117,57 @@ pub(super) enum SessionAudioVoice { #[derive(Debug, Clone, Serialize)] pub(super) struct SessionNoiseReduction { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: NoiseReductionType, +} + +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum NoiseReductionType { + NearField, } #[derive(Debug, Clone, Serialize)] pub(super) struct SessionTurnDetection { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: TurnDetectionType, pub(super) interrupt_response: bool, pub(super) create_response: bool, } +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum TurnDetectionType { + ServerVad, +} + #[derive(Debug, Clone, Serialize)] pub(super) struct SessionAudioOutputFormat { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: AudioFormatType, pub(super) rate: u32, } #[derive(Debug, Clone, Serialize)] pub(super) struct ConversationMessageItem { #[serde(rename = "type")] - pub(super) kind: String, - pub(super) role: String, + pub(super) r#type: ConversationItemType, + pub(super) role: ConversationRole, pub(super) content: Vec, } +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum ConversationItemType { + Message, + FunctionCallOutput, +} + +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum ConversationRole { + User, +} + #[derive(Debug, Clone, Serialize)] #[serde(untagged)] pub(super) enum ConversationItemPayload { @@ -139,7 +178,7 @@ pub(super) enum ConversationItemPayload { #[derive(Debug, Clone, Serialize)] pub(super) struct ConversationFunctionCallOutputItem { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: ConversationItemType, pub(super) call_id: String, pub(super) output: String, } @@ -147,19 +186,32 @@ pub(super) struct ConversationFunctionCallOutputItem { #[derive(Debug, Clone, Serialize)] pub(super) struct ConversationItemContent { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: ConversationContentType, pub(super) text: String, } +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum ConversationContentType { + Text, + InputText, +} + #[derive(Debug, Clone, Serialize)] pub(super) struct SessionFunctionTool { #[serde(rename = "type")] - pub(super) kind: String, + pub(super) r#type: SessionToolType, pub(super) name: String, pub(super) description: String, pub(super) parameters: Value, } +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub(super) enum SessionToolType { + Function, +} + pub(super) fn parse_realtime_event( payload: &str, event_parser: RealtimeEventParser, From 052163350d9668518c002daa49cc43cfdb1a4ecc Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 16:49:53 -0700 Subject: [PATCH 11/17] Remove extra multi-agent status allow Co-authored-by: Codex --- codex-rs/tui_app_server/src/multi_agents.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/codex-rs/tui_app_server/src/multi_agents.rs b/codex-rs/tui_app_server/src/multi_agents.rs index 61907f67fb0..672e20e1a28 100644 --- a/codex-rs/tui_app_server/src/multi_agents.rs +++ b/codex-rs/tui_app_server/src/multi_agents.rs @@ -537,8 +537,6 @@ fn status_summary_line(status: &AgentStatus) -> Line<'static> { status_summary_spans(status).into() } -// Allow `.yellow()` -#[allow(clippy::disallowed_methods)] fn status_summary_spans(status: &AgentStatus) -> Vec> { match status { AgentStatus::PendingInit => vec![Span::from("Pending init").cyan()], From b165b0bcc2f6d61d3f5e4a95bd53e459d339d591 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 16:57:58 -0700 Subject: [PATCH 12/17] Drop stale realtime handoff unit test Co-authored-by: Codex --- .../core/src/realtime_conversation_tests.rs | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index 6b021ed2022..0a32d063c06 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -1,4 +1,3 @@ -use super::HandoffOutput; use super::RealtimeHandoffState; use super::RealtimeSessionKind; use super::realtime_text_from_handoff_request; @@ -69,47 +68,3 @@ async fn clears_active_handoff_explicitly() { *state.active_handoff.lock().await = None; assert_eq!(state.active_handoff.lock().await.clone(), None); } - -#[tokio::test] -async fn sends_multiple_handoff_outputs_until_cleared() { - let (tx, rx) = bounded(4); - let state = RealtimeHandoffState::new(tx, RealtimeSessionKind::V1); - - state - .send_output("ignored".to_string()) - .await - .expect("send"); - assert!(rx.is_empty()); - - *state.active_handoff.lock().await = Some("handoff_1".to_string()); - state.send_output("result".to_string()).await.expect("send"); - state - .send_output("result 2".to_string()) - .await - .expect("send"); - - let output_1 = rx.recv().await.expect("recv"); - assert_eq!( - output_1, - HandoffOutput::ImmediateAppend { - handoff_id: "handoff_1".to_string(), - output_text: "result".to_string(), - } - ); - - let output_2 = rx.recv().await.expect("recv"); - assert_eq!( - output_2, - HandoffOutput::ImmediateAppend { - handoff_id: "handoff_1".to_string(), - output_text: "result 2".to_string(), - } - ); - - *state.active_handoff.lock().await = None; - state - .send_output("ignored after clear".to_string()) - .await - .expect("send"); - assert!(rx.is_empty()); -} From 9170fe1bac60879fa5ad455ed3e2167fcf7a7db0 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 19:47:07 -0700 Subject: [PATCH 13/17] Update realtime v2 app-server e2e for response.create Co-authored-by: Codex --- .../app-server/tests/suite/v2/realtime_conversation.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 7d472c50361..71b6d6dcf33 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -70,6 +70,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { "message": "upstream boom" }), ], + vec![], ]]) .await; @@ -192,7 +193,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { let connections = realtime_server.connections(); assert_eq!(connections.len(), 1); let connection = &connections[0]; - assert_eq!(connection.len(), 3); + assert_eq!(connection.len(), 4); assert_eq!( connection[0].body_json()["type"].as_str(), Some("session.update") @@ -212,6 +213,10 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { .as_str() .context("expected websocket request type")? .to_string(), + connection[3].body_json()["type"] + .as_str() + .context("expected websocket request type")? + .to_string(), ]; request_types.sort(); assert_eq!( @@ -219,6 +224,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { [ "conversation.item.create".to_string(), "input_audio_buffer.append".to_string(), + "response.create".to_string(), ] ); From 2b21271d3caec5fabd1da65f3203a3478df55c04 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 10:49:11 -0700 Subject: [PATCH 14/17] Add current thread context to realtime startup Extend the realtime startup context with a bounded summary of the latest user and assistant turns from the active thread for better continuity. Co-authored-by: Codex --- codex-rs/core/src/realtime_context.rs | 107 +++++++++++++++++++++++++- 1 file changed, 105 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/realtime_context.rs b/codex-rs/core/src/realtime_context.rs index 80167635144..f2d603425e6 100644 --- a/codex-rs/core/src/realtime_context.rs +++ b/codex-rs/core/src/realtime_context.rs @@ -1,8 +1,11 @@ use crate::codex::Session; +use crate::compact::content_items_to_text; +use crate::event_mapping::is_contextual_user_message_content; use crate::git_info::resolve_root_git_project_for_trust; use crate::truncate::TruncationPolicy; use crate::truncate::truncate_text; use chrono::Utc; +use codex_protocol::models::ResponseItem; use codex_state::SortKey; use codex_state::ThreadMetadata; use dirs::home_dir; @@ -19,9 +22,11 @@ use tracing::info; use tracing::warn; const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.\nThis is background context about recent work and machine/workspace layout. It may be incomplete or stale. Use it to inform responses, and do not repeat it back unless relevant."; +const CURRENT_THREAD_SECTION_TOKEN_BUDGET: usize = 1_200; const RECENT_WORK_SECTION_TOKEN_BUDGET: usize = 2_200; const WORKSPACE_SECTION_TOKEN_BUDGET: usize = 1_600; const NOTES_SECTION_TOKEN_BUDGET: usize = 300; +const MAX_CURRENT_THREAD_TURNS: usize = 2; const MAX_RECENT_THREADS: usize = 40; const MAX_RECENT_WORK_GROUPS: usize = 8; const MAX_CURRENT_CWD_ASKS: usize = 8; @@ -49,20 +54,33 @@ pub(crate) async fn build_realtime_startup_context( ) -> Option { let config = sess.get_config().await; let cwd = config.cwd.clone(); + let history = sess.clone_history().await; + let current_thread_section = build_current_thread_section(history.raw_items()); let recent_threads = load_recent_threads(sess).await; let recent_work_section = build_recent_work_section(&cwd, &recent_threads); let workspace_section = build_workspace_section(&cwd); - if recent_work_section.is_none() && workspace_section.is_none() { + if current_thread_section.is_none() + && recent_work_section.is_none() + && workspace_section.is_none() + { debug!("realtime startup context unavailable; skipping injection"); return None; } let mut parts = vec![STARTUP_CONTEXT_HEADER.to_string()]; + let has_current_thread_section = current_thread_section.is_some(); let has_recent_work_section = recent_work_section.is_some(); let has_workspace_section = workspace_section.is_some(); + if let Some(section) = format_section( + "Current Thread", + current_thread_section, + CURRENT_THREAD_SECTION_TOKEN_BUDGET, + ) { + parts.push(section); + } if let Some(section) = format_section( "Recent Work", recent_work_section, @@ -79,7 +97,7 @@ pub(crate) async fn build_realtime_startup_context( } if let Some(section) = format_section( "Notes", - Some("Built at realtime startup from persisted thread metadata in the state DB and a bounded local workspace scan. This excludes repo memory instructions, AGENTS files, project-doc prompt blends, and memory summaries.".to_string()), + Some("Built at realtime startup from the current thread history, persisted thread metadata in the state DB, and a bounded local workspace scan. This excludes repo memory instructions, AGENTS files, project-doc prompt blends, and memory summaries.".to_string()), NOTES_SECTION_TOKEN_BUDGET, ) { parts.push(section); @@ -89,6 +107,7 @@ pub(crate) async fn build_realtime_startup_context( debug!( approx_tokens = approx_token_count(&context), bytes = context.len(), + has_current_thread_section, has_recent_work_section, has_workspace_section, "built realtime startup context" @@ -167,6 +186,90 @@ fn build_recent_work_section(cwd: &Path, recent_threads: &[ThreadMetadata]) -> O (!sections.is_empty()).then(|| sections.join("\n\n")) } +fn build_current_thread_section(items: &[ResponseItem]) -> Option { + let mut turns = Vec::new(); + let mut current_user = Vec::new(); + let mut current_assistant = Vec::new(); + + for item in items { + match item { + ResponseItem::Message { role, content, .. } if role == "user" => { + if is_contextual_user_message_content(content) { + continue; + } + let Some(text) = content_items_to_text(content) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) + else { + continue; + }; + if !current_user.is_empty() || !current_assistant.is_empty() { + turns.push(( + std::mem::take(&mut current_user), + std::mem::take(&mut current_assistant), + )); + } + current_user.push(text); + } + ResponseItem::Message { role, content, .. } if role == "assistant" => { + let Some(text) = content_items_to_text(content) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) + else { + continue; + }; + if current_user.is_empty() && current_assistant.is_empty() { + continue; + } + current_assistant.push(text); + } + _ => {} + } + } + + if !current_user.is_empty() || !current_assistant.is_empty() { + turns.push((current_user, current_assistant)); + } + + let retained_turns = turns + .into_iter() + .rev() + .take(MAX_CURRENT_THREAD_TURNS) + .collect::>() + .into_iter() + .rev() + .collect::>(); + if retained_turns.is_empty() { + return None; + } + + let mut lines = vec![ + "Most recent user/assistant turns from this exact thread. Use them for continuity when responding.".to_string(), + ]; + + let retained_turn_count = retained_turns.len(); + for (index, (user_messages, assistant_messages)) in retained_turns.into_iter().enumerate() { + lines.push(String::new()); + if retained_turn_count == 1 || index + 1 == retained_turn_count { + lines.push("### Latest turn".to_string()); + } else { + lines.push(format!("### Prior turn {}", index + 1)); + } + + if !user_messages.is_empty() { + lines.push("User:".to_string()); + lines.push(user_messages.join("\n\n")); + } + if !assistant_messages.is_empty() { + lines.push(String::new()); + lines.push("Assistant:".to_string()); + lines.push(assistant_messages.join("\n\n")); + } + } + + Some(lines.join("\n")) +} + fn build_workspace_section(cwd: &Path) -> Option { build_workspace_section_with_user_root(cwd, home_dir()) } From 278f3a7b9e19cfa462986aeffdede34ea42cf2de Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 16:14:16 -0700 Subject: [PATCH 15/17] Inline thin realtime context helpers Co-authored-by: Codex --- codex-rs/core/src/realtime_context.rs | 17 +++-------------- codex-rs/core/src/realtime_context_tests.rs | 4 ++-- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/codex-rs/core/src/realtime_context.rs b/codex-rs/core/src/realtime_context.rs index f2d603425e6..73b17ee1fd4 100644 --- a/codex-rs/core/src/realtime_context.rs +++ b/codex-rs/core/src/realtime_context.rs @@ -58,7 +58,7 @@ pub(crate) async fn build_realtime_startup_context( let current_thread_section = build_current_thread_section(history.raw_items()); let recent_threads = load_recent_threads(sess).await; let recent_work_section = build_recent_work_section(&cwd, &recent_threads); - let workspace_section = build_workspace_section(&cwd); + let workspace_section = build_workspace_section_with_user_root(&cwd, home_dir()); if current_thread_section.is_none() && recent_work_section.is_none() @@ -270,10 +270,6 @@ fn build_current_thread_section(items: &[ResponseItem]) -> Option { Some(lines.join("\n")) } -fn build_workspace_section(cwd: &Path) -> Option { - build_workspace_section_with_user_root(cwd, home_dir()) -} - fn build_workspace_section_with_user_root( cwd: &Path, user_root: Option, @@ -300,12 +296,12 @@ fn build_workspace_section_with_user_root( let mut lines = vec![ format!("Current working directory: {}", cwd.display()), - format!("Working directory name: {}", display_name(cwd)), + format!("Working directory name: {}", file_name_string(cwd)), ]; if let Some(git_root) = &git_root { lines.push(format!("Git root: {}", git_root.display())); - lines.push(format!("Git project: {}", display_name(git_root))); + lines.push(format!("Git project: {}", file_name_string(git_root))); } if let Some(user_root) = &user_root { lines.push(format!("User root: {}", user_root.display())); @@ -479,13 +475,6 @@ fn format_thread_group( (lines.len() > 5).then(|| lines.join("\n")) } -fn display_name(path: &Path) -> String { - path.file_name() - .and_then(OsStr::to_str) - .map(str::to_owned) - .unwrap_or_else(|| path.display().to_string()) -} - fn file_name_string(path: &Path) -> String { path.file_name() .and_then(OsStr::to_str) diff --git a/codex-rs/core/src/realtime_context_tests.rs b/codex-rs/core/src/realtime_context_tests.rs index b23c2743cf2..1e23b73b32a 100644 --- a/codex-rs/core/src/realtime_context_tests.rs +++ b/codex-rs/core/src/realtime_context_tests.rs @@ -1,5 +1,4 @@ use super::build_recent_work_section; -use super::build_workspace_section; use super::build_workspace_section_with_user_root; use chrono::TimeZone; use chrono::Utc; @@ -56,7 +55,8 @@ fn workspace_section_includes_tree_when_entries_exist() { fs::create_dir(cwd.path().join("docs")).expect("create docs dir"); fs::write(cwd.path().join("README.md"), "hello").expect("write readme"); - let section = build_workspace_section(cwd.path()).expect("workspace section"); + let section = + build_workspace_section_with_user_root(cwd.path(), None).expect("workspace section"); assert!(section.contains("Working directory tree:")); assert!(section.contains("- docs/")); assert!(section.contains("- README.md")); From b12599fa6803aa97e16810cef2af6a466ad871ca Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 21:29:50 -0700 Subject: [PATCH 16/17] Reuse shared transcript helpers in realtime context Move realtime startup transcript collection over to the shared guardian transcript path and rename the shared helper functions to generic names. Co-authored-by: Codex --- codex-rs/core/src/guardian/mod.rs | 11 +++---- codex-rs/core/src/guardian/prompt.rs | 10 +++---- codex-rs/core/src/guardian/tests.rs | 14 ++++----- codex-rs/core/src/realtime_context.rs | 42 ++++++++++----------------- 4 files changed, 30 insertions(+), 47 deletions(-) diff --git a/codex-rs/core/src/guardian/mod.rs b/codex-rs/core/src/guardian/mod.rs index 8fd0e994a95..833684aaf43 100644 --- a/codex-rs/core/src/guardian/mod.rs +++ b/codex-rs/core/src/guardian/mod.rs @@ -24,6 +24,9 @@ use serde::Serialize; pub(crate) use approval_request::GuardianApprovalRequest; pub(crate) use approval_request::GuardianMcpAnnotations; pub(crate) use approval_request::guardian_approval_request_to_json; +pub(crate) use prompt::GuardianTranscriptEntry; +pub(crate) use prompt::GuardianTranscriptEntryKind; +pub(crate) use prompt::collect_transcript_entries; pub(crate) use review::GUARDIAN_REJECTION_MESSAGE; pub(crate) use review::is_guardian_reviewer_source; pub(crate) use review::review_approval_request; @@ -66,14 +69,8 @@ use approval_request::guardian_assessment_action_value; #[cfg(test)] use approval_request::guardian_request_turn_id; #[cfg(test)] -use prompt::GuardianTranscriptEntry; -#[cfg(test)] -use prompt::GuardianTranscriptEntryKind; -#[cfg(test)] use prompt::build_guardian_prompt_items; #[cfg(test)] -use prompt::collect_guardian_transcript_entries; -#[cfg(test)] use prompt::guardian_output_schema; #[cfg(test)] pub(crate) use prompt::guardian_policy_prompt; @@ -82,7 +79,7 @@ use prompt::guardian_truncate_text; #[cfg(test)] use prompt::parse_guardian_assessment; #[cfg(test)] -use prompt::render_guardian_transcript_entries; +use prompt::render_transcript_entries; #[cfg(test)] use review::GuardianReviewOutcome; #[cfg(test)] diff --git a/codex-rs/core/src/guardian/prompt.rs b/codex-rs/core/src/guardian/prompt.rs index 2d5b19765c2..a62e369e4b7 100644 --- a/codex-rs/core/src/guardian/prompt.rs +++ b/codex-rs/core/src/guardian/prompt.rs @@ -67,11 +67,11 @@ pub(crate) async fn build_guardian_prompt_items( request: GuardianApprovalRequest, ) -> serde_json::Result> { let history = session.clone_history().await; - let transcript_entries = collect_guardian_transcript_entries(history.raw_items()); + let transcript_entries = collect_transcript_entries(history.raw_items()); let planned_action_json = format_guardian_action_pretty(&request)?; let (transcript_entries, omission_note) = - render_guardian_transcript_entries(transcript_entries.as_slice()); + render_transcript_entries(transcript_entries.as_slice()); let mut items = Vec::new(); let mut push_text = |text: String| { items.push(UserInput::Text { @@ -117,7 +117,7 @@ pub(crate) async fn build_guardian_prompt_items( /// conversation /// /// User messages are never dropped unless the entire transcript must be omitted. -pub(crate) fn render_guardian_transcript_entries( +pub(crate) fn render_transcript_entries( entries: &[GuardianTranscriptEntry], ) -> (Vec, Option) { if entries.is_empty() { @@ -205,9 +205,7 @@ pub(crate) fn render_guardian_transcript_entries( /// Keep both tool calls and tool results here. The reviewer often needs the /// agent's exact queried path / arguments as well as the returned evidence to /// decide whether the pending approval is justified. -pub(crate) fn collect_guardian_transcript_entries( - items: &[ResponseItem], -) -> Vec { +pub(crate) fn collect_transcript_entries(items: &[ResponseItem]) -> Vec { let mut entries = Vec::new(); let mut tool_names_by_call_id = HashMap::new(); let non_empty_entry = |kind, text: String| { diff --git a/codex-rs/core/src/guardian/tests.rs b/codex-rs/core/src/guardian/tests.rs index dd2f944782a..afbaa7bcedc 100644 --- a/codex-rs/core/src/guardian/tests.rs +++ b/codex-rs/core/src/guardian/tests.rs @@ -121,7 +121,7 @@ fn guardian_snapshot_options() -> ContextSnapshotOptions { } #[test] -fn build_guardian_transcript_keeps_original_numbering() { +fn render_transcript_entries_keeps_original_numbering() { let entries = [ GuardianTranscriptEntry { kind: GuardianTranscriptEntryKind::User, @@ -137,7 +137,7 @@ fn build_guardian_transcript_keeps_original_numbering() { }, ]; - let (transcript, omission) = render_guardian_transcript_entries(&entries[..2]); + let (transcript, omission) = render_transcript_entries(&entries[..2]); assert_eq!( transcript, @@ -150,7 +150,7 @@ fn build_guardian_transcript_keeps_original_numbering() { } #[test] -fn collect_guardian_transcript_entries_skips_contextual_user_messages() { +fn collect_transcript_entries_skips_contextual_user_messages() { let items = vec![ ResponseItem::Message { id: None, @@ -172,7 +172,7 @@ fn collect_guardian_transcript_entries_skips_contextual_user_messages() { }, ]; - let entries = collect_guardian_transcript_entries(&items); + let entries = collect_transcript_entries(&items); assert_eq!(entries.len(), 1); assert_eq!( @@ -185,7 +185,7 @@ fn collect_guardian_transcript_entries_skips_contextual_user_messages() { } #[test] -fn collect_guardian_transcript_entries_includes_recent_tool_calls_and_output() { +fn collect_transcript_entries_includes_recent_tool_calls_and_output() { let items = vec![ ResponseItem::Message { id: None, @@ -220,7 +220,7 @@ fn collect_guardian_transcript_entries_includes_recent_tool_calls_and_output() { }, ]; - let entries = collect_guardian_transcript_entries(&items); + let entries = collect_transcript_entries(&items); assert_eq!(entries.len(), 4); assert_eq!( @@ -444,7 +444,7 @@ fn build_guardian_transcript_reserves_separate_budget_for_tool_evidence() { text: repeated.clone(), })); - let (transcript, omission) = render_guardian_transcript_entries(&entries); + let (transcript, omission) = render_transcript_entries(&entries); assert!( transcript diff --git a/codex-rs/core/src/realtime_context.rs b/codex-rs/core/src/realtime_context.rs index 73b17ee1fd4..c8275a3602d 100644 --- a/codex-rs/core/src/realtime_context.rs +++ b/codex-rs/core/src/realtime_context.rs @@ -1,11 +1,11 @@ use crate::codex::Session; -use crate::compact::content_items_to_text; -use crate::event_mapping::is_contextual_user_message_content; use crate::git_info::resolve_root_git_project_for_trust; +use crate::guardian::GuardianTranscriptEntry; +use crate::guardian::GuardianTranscriptEntryKind; +use crate::guardian::collect_transcript_entries; use crate::truncate::TruncationPolicy; use crate::truncate::truncate_text; use chrono::Utc; -use codex_protocol::models::ResponseItem; use codex_state::SortKey; use codex_state::ThreadMetadata; use dirs::home_dir; @@ -55,7 +55,8 @@ pub(crate) async fn build_realtime_startup_context( let config = sess.get_config().await; let cwd = config.cwd.clone(); let history = sess.clone_history().await; - let current_thread_section = build_current_thread_section(history.raw_items()); + let transcript_entries = collect_transcript_entries(history.raw_items()); + let current_thread_section = build_recent_turns_section(transcript_entries.as_slice()); let recent_threads = load_recent_threads(sess).await; let recent_work_section = build_recent_work_section(&cwd, &recent_threads); let workspace_section = build_workspace_section_with_user_root(&cwd, home_dir()); @@ -186,44 +187,31 @@ fn build_recent_work_section(cwd: &Path, recent_threads: &[ThreadMetadata]) -> O (!sections.is_empty()).then(|| sections.join("\n\n")) } -fn build_current_thread_section(items: &[ResponseItem]) -> Option { +fn build_recent_turns_section(entries: &[GuardianTranscriptEntry]) -> Option { let mut turns = Vec::new(); let mut current_user = Vec::new(); let mut current_assistant = Vec::new(); - for item in items { - match item { - ResponseItem::Message { role, content, .. } if role == "user" => { - if is_contextual_user_message_content(content) { - continue; - } - let Some(text) = content_items_to_text(content) - .map(|text| text.trim().to_string()) - .filter(|text| !text.is_empty()) - else { - continue; - }; + for entry in entries { + match &entry.kind { + GuardianTranscriptEntryKind::User => { + let text = entry.text.trim(); if !current_user.is_empty() || !current_assistant.is_empty() { turns.push(( std::mem::take(&mut current_user), std::mem::take(&mut current_assistant), )); } - current_user.push(text); + current_user.push(text.to_string()); } - ResponseItem::Message { role, content, .. } if role == "assistant" => { - let Some(text) = content_items_to_text(content) - .map(|text| text.trim().to_string()) - .filter(|text| !text.is_empty()) - else { - continue; - }; + GuardianTranscriptEntryKind::Assistant => { + let text = entry.text.trim(); if current_user.is_empty() && current_assistant.is_empty() { continue; } - current_assistant.push(text); + current_assistant.push(text.to_string()); } - _ => {} + GuardianTranscriptEntryKind::Tool(_) => {} } } From de9c75b27cf9cd03cf069a0d5342e3090aaf2518 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 21:32:12 -0700 Subject: [PATCH 17/17] Revert "Reuse shared transcript helpers in realtime context" This reverts commit b12599fa6. Co-authored-by: Codex --- codex-rs/core/src/guardian/mod.rs | 11 ++++--- codex-rs/core/src/guardian/prompt.rs | 10 ++++--- codex-rs/core/src/guardian/tests.rs | 14 ++++----- codex-rs/core/src/realtime_context.rs | 42 +++++++++++++++++---------- 4 files changed, 47 insertions(+), 30 deletions(-) diff --git a/codex-rs/core/src/guardian/mod.rs b/codex-rs/core/src/guardian/mod.rs index 833684aaf43..8fd0e994a95 100644 --- a/codex-rs/core/src/guardian/mod.rs +++ b/codex-rs/core/src/guardian/mod.rs @@ -24,9 +24,6 @@ use serde::Serialize; pub(crate) use approval_request::GuardianApprovalRequest; pub(crate) use approval_request::GuardianMcpAnnotations; pub(crate) use approval_request::guardian_approval_request_to_json; -pub(crate) use prompt::GuardianTranscriptEntry; -pub(crate) use prompt::GuardianTranscriptEntryKind; -pub(crate) use prompt::collect_transcript_entries; pub(crate) use review::GUARDIAN_REJECTION_MESSAGE; pub(crate) use review::is_guardian_reviewer_source; pub(crate) use review::review_approval_request; @@ -69,8 +66,14 @@ use approval_request::guardian_assessment_action_value; #[cfg(test)] use approval_request::guardian_request_turn_id; #[cfg(test)] +use prompt::GuardianTranscriptEntry; +#[cfg(test)] +use prompt::GuardianTranscriptEntryKind; +#[cfg(test)] use prompt::build_guardian_prompt_items; #[cfg(test)] +use prompt::collect_guardian_transcript_entries; +#[cfg(test)] use prompt::guardian_output_schema; #[cfg(test)] pub(crate) use prompt::guardian_policy_prompt; @@ -79,7 +82,7 @@ use prompt::guardian_truncate_text; #[cfg(test)] use prompt::parse_guardian_assessment; #[cfg(test)] -use prompt::render_transcript_entries; +use prompt::render_guardian_transcript_entries; #[cfg(test)] use review::GuardianReviewOutcome; #[cfg(test)] diff --git a/codex-rs/core/src/guardian/prompt.rs b/codex-rs/core/src/guardian/prompt.rs index a62e369e4b7..2d5b19765c2 100644 --- a/codex-rs/core/src/guardian/prompt.rs +++ b/codex-rs/core/src/guardian/prompt.rs @@ -67,11 +67,11 @@ pub(crate) async fn build_guardian_prompt_items( request: GuardianApprovalRequest, ) -> serde_json::Result> { let history = session.clone_history().await; - let transcript_entries = collect_transcript_entries(history.raw_items()); + let transcript_entries = collect_guardian_transcript_entries(history.raw_items()); let planned_action_json = format_guardian_action_pretty(&request)?; let (transcript_entries, omission_note) = - render_transcript_entries(transcript_entries.as_slice()); + render_guardian_transcript_entries(transcript_entries.as_slice()); let mut items = Vec::new(); let mut push_text = |text: String| { items.push(UserInput::Text { @@ -117,7 +117,7 @@ pub(crate) async fn build_guardian_prompt_items( /// conversation /// /// User messages are never dropped unless the entire transcript must be omitted. -pub(crate) fn render_transcript_entries( +pub(crate) fn render_guardian_transcript_entries( entries: &[GuardianTranscriptEntry], ) -> (Vec, Option) { if entries.is_empty() { @@ -205,7 +205,9 @@ pub(crate) fn render_transcript_entries( /// Keep both tool calls and tool results here. The reviewer often needs the /// agent's exact queried path / arguments as well as the returned evidence to /// decide whether the pending approval is justified. -pub(crate) fn collect_transcript_entries(items: &[ResponseItem]) -> Vec { +pub(crate) fn collect_guardian_transcript_entries( + items: &[ResponseItem], +) -> Vec { let mut entries = Vec::new(); let mut tool_names_by_call_id = HashMap::new(); let non_empty_entry = |kind, text: String| { diff --git a/codex-rs/core/src/guardian/tests.rs b/codex-rs/core/src/guardian/tests.rs index afbaa7bcedc..dd2f944782a 100644 --- a/codex-rs/core/src/guardian/tests.rs +++ b/codex-rs/core/src/guardian/tests.rs @@ -121,7 +121,7 @@ fn guardian_snapshot_options() -> ContextSnapshotOptions { } #[test] -fn render_transcript_entries_keeps_original_numbering() { +fn build_guardian_transcript_keeps_original_numbering() { let entries = [ GuardianTranscriptEntry { kind: GuardianTranscriptEntryKind::User, @@ -137,7 +137,7 @@ fn render_transcript_entries_keeps_original_numbering() { }, ]; - let (transcript, omission) = render_transcript_entries(&entries[..2]); + let (transcript, omission) = render_guardian_transcript_entries(&entries[..2]); assert_eq!( transcript, @@ -150,7 +150,7 @@ fn render_transcript_entries_keeps_original_numbering() { } #[test] -fn collect_transcript_entries_skips_contextual_user_messages() { +fn collect_guardian_transcript_entries_skips_contextual_user_messages() { let items = vec![ ResponseItem::Message { id: None, @@ -172,7 +172,7 @@ fn collect_transcript_entries_skips_contextual_user_messages() { }, ]; - let entries = collect_transcript_entries(&items); + let entries = collect_guardian_transcript_entries(&items); assert_eq!(entries.len(), 1); assert_eq!( @@ -185,7 +185,7 @@ fn collect_transcript_entries_skips_contextual_user_messages() { } #[test] -fn collect_transcript_entries_includes_recent_tool_calls_and_output() { +fn collect_guardian_transcript_entries_includes_recent_tool_calls_and_output() { let items = vec![ ResponseItem::Message { id: None, @@ -220,7 +220,7 @@ fn collect_transcript_entries_includes_recent_tool_calls_and_output() { }, ]; - let entries = collect_transcript_entries(&items); + let entries = collect_guardian_transcript_entries(&items); assert_eq!(entries.len(), 4); assert_eq!( @@ -444,7 +444,7 @@ fn build_guardian_transcript_reserves_separate_budget_for_tool_evidence() { text: repeated.clone(), })); - let (transcript, omission) = render_transcript_entries(&entries); + let (transcript, omission) = render_guardian_transcript_entries(&entries); assert!( transcript diff --git a/codex-rs/core/src/realtime_context.rs b/codex-rs/core/src/realtime_context.rs index c8275a3602d..73b17ee1fd4 100644 --- a/codex-rs/core/src/realtime_context.rs +++ b/codex-rs/core/src/realtime_context.rs @@ -1,11 +1,11 @@ use crate::codex::Session; +use crate::compact::content_items_to_text; +use crate::event_mapping::is_contextual_user_message_content; use crate::git_info::resolve_root_git_project_for_trust; -use crate::guardian::GuardianTranscriptEntry; -use crate::guardian::GuardianTranscriptEntryKind; -use crate::guardian::collect_transcript_entries; use crate::truncate::TruncationPolicy; use crate::truncate::truncate_text; use chrono::Utc; +use codex_protocol::models::ResponseItem; use codex_state::SortKey; use codex_state::ThreadMetadata; use dirs::home_dir; @@ -55,8 +55,7 @@ pub(crate) async fn build_realtime_startup_context( let config = sess.get_config().await; let cwd = config.cwd.clone(); let history = sess.clone_history().await; - let transcript_entries = collect_transcript_entries(history.raw_items()); - let current_thread_section = build_recent_turns_section(transcript_entries.as_slice()); + let current_thread_section = build_current_thread_section(history.raw_items()); let recent_threads = load_recent_threads(sess).await; let recent_work_section = build_recent_work_section(&cwd, &recent_threads); let workspace_section = build_workspace_section_with_user_root(&cwd, home_dir()); @@ -187,31 +186,44 @@ fn build_recent_work_section(cwd: &Path, recent_threads: &[ThreadMetadata]) -> O (!sections.is_empty()).then(|| sections.join("\n\n")) } -fn build_recent_turns_section(entries: &[GuardianTranscriptEntry]) -> Option { +fn build_current_thread_section(items: &[ResponseItem]) -> Option { let mut turns = Vec::new(); let mut current_user = Vec::new(); let mut current_assistant = Vec::new(); - for entry in entries { - match &entry.kind { - GuardianTranscriptEntryKind::User => { - let text = entry.text.trim(); + for item in items { + match item { + ResponseItem::Message { role, content, .. } if role == "user" => { + if is_contextual_user_message_content(content) { + continue; + } + let Some(text) = content_items_to_text(content) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) + else { + continue; + }; if !current_user.is_empty() || !current_assistant.is_empty() { turns.push(( std::mem::take(&mut current_user), std::mem::take(&mut current_assistant), )); } - current_user.push(text.to_string()); + current_user.push(text); } - GuardianTranscriptEntryKind::Assistant => { - let text = entry.text.trim(); + ResponseItem::Message { role, content, .. } if role == "assistant" => { + let Some(text) = content_items_to_text(content) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) + else { + continue; + }; if current_user.is_empty() && current_assistant.is_empty() { continue; } - current_assistant.push(text.to_string()); + current_assistant.push(text); } - GuardianTranscriptEntryKind::Tool(_) => {} + _ => {} } }