diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 6dd05e13ed3a..c37e9242775d 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -1,12 +1,18 @@ use anyhow::Context; use anyhow::Result; use app_test_support::McpProcess; +use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_mock_responses_server_sequence_unchecked; +use app_test_support::create_shell_command_sse_response; use app_test_support::to_response; +use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::LoginAccountResponse; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse; use codex_app_server_protocol::ThreadRealtimeAppendTextParams; @@ -26,28 +32,38 @@ use codex_app_server_protocol::ThreadRealtimeStopResponse; use codex_app_server_protocol::ThreadRealtimeTranscriptUpdatedNotification; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnStartedNotification; use codex_features::FEATURES; use codex_features::Feature; use codex_protocol::protocol::RealtimeConversationVersion; +use core_test_support::responses; use core_test_support::responses::WebSocketConnectionConfig; +use core_test_support::responses::WebSocketRequest; +use core_test_support::responses::WebSocketTestServer; use core_test_support::responses::start_websocket_server; use core_test_support::responses::start_websocket_server_with_headers; use core_test_support::skip_if_no_network; use pretty_assertions::assert_eq; use serde::de::DeserializeOwned; +use serde_json::Value; use serde_json::json; use std::path::Path; use std::sync::Arc; use std::sync::Mutex; +use std::sync::mpsc; use std::time::Duration; use tempfile::TempDir; use tokio::time::timeout; use wiremock::Match; use wiremock::Mock; +use wiremock::MockServer; use wiremock::Request as WiremockRequest; +use wiremock::Respond; use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; +use wiremock::matchers::path_regex; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; @@ -90,6 +106,333 @@ impl Match for RealtimeCallRequestCapture { } } +struct GatedSseResponse { + gate_rx: Mutex>>, + response: String, +} + +impl Respond for GatedSseResponse { + fn respond(&self, _: &WiremockRequest) -> ResponseTemplate { + let gate_rx = self + .gate_rx + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .take(); + if let Some(gate_rx) = gate_rx { + let _ = gate_rx.recv(); + } + responses::sse_response(self.response.clone()) + } +} + +#[derive(Debug, Clone, Copy)] +enum RealtimeTestVersion { + V1, + V2, +} + +impl RealtimeTestVersion { + fn config_value(self) -> &'static str { + match self { + RealtimeTestVersion::V1 => "v1", + RealtimeTestVersion::V2 => "v2", + } + } +} + +#[derive(Debug, Clone, Copy)] +enum RealtimeTestSandbox { + ReadOnly, + DangerFullAccess, +} + +impl RealtimeTestSandbox { + fn config_value(self) -> &'static str { + match self { + RealtimeTestSandbox::ReadOnly => "read-only", + RealtimeTestSandbox::DangerFullAccess => "danger-full-access", + } + } +} + +#[derive(Debug, PartialEq)] +struct StartedWebrtcRealtime { + started: ThreadRealtimeStartedNotification, + sdp: ThreadRealtimeSdpNotification, +} + +// Scripted SSE responses for the normal Codex agent loop. Realtime can ask for a delegated +// Codex turn; that turn talks to this mock `/responses` endpoint and may request ordinary tools. +struct MainLoopResponsesScript { + responses: Vec, +} + +// Scripted server events for the direct realtime sideband WebSocket. This mock is the realtime +// session app-server joins after call creation; it is not the main-loop Responses stream. +struct RealtimeSidebandScript { + connections: Vec, +} + +struct RealtimeE2eHarness { + mcp: McpProcess, + _codex_home: TempDir, + main_loop_responses_server: MockServer, + realtime_server: WebSocketTestServer, + call_capture: RealtimeCallRequestCapture, + thread_id: String, +} + +impl RealtimeE2eHarness { + // Owns the full mocked app-server realtime route: MCP client, Responses mocks, WebRTC call + // creation capture, sideband WebSocket server, login, config, and a started thread. + async fn new( + realtime_version: RealtimeTestVersion, + main_loop: MainLoopResponsesScript, + realtime_sideband: RealtimeSidebandScript, + ) -> Result { + let main_loop_responses_server = + create_mock_responses_server_sequence_unchecked(main_loop.responses).await; + Self::new_with_main_loop_responses_server_and_sandbox( + realtime_version, + main_loop_responses_server, + realtime_sideband, + RealtimeTestSandbox::ReadOnly, + ) + .await + } + + async fn new_with_sandbox( + realtime_version: RealtimeTestVersion, + main_loop: MainLoopResponsesScript, + realtime_sideband: RealtimeSidebandScript, + sandbox: RealtimeTestSandbox, + ) -> Result { + let main_loop_responses_server = + create_mock_responses_server_sequence_unchecked(main_loop.responses).await; + Self::new_with_main_loop_responses_server_and_sandbox( + realtime_version, + main_loop_responses_server, + realtime_sideband, + sandbox, + ) + .await + } + + async fn new_with_main_loop_responses_server( + realtime_version: RealtimeTestVersion, + main_loop_responses_server: MockServer, + realtime_sideband: RealtimeSidebandScript, + ) -> Result { + Self::new_with_main_loop_responses_server_and_sandbox( + realtime_version, + main_loop_responses_server, + realtime_sideband, + RealtimeTestSandbox::ReadOnly, + ) + .await + } + + async fn new_with_main_loop_responses_server_and_sandbox( + realtime_version: RealtimeTestVersion, + main_loop_responses_server: MockServer, + realtime_sideband: RealtimeSidebandScript, + sandbox: RealtimeTestSandbox, + ) -> Result { + let call_capture = RealtimeCallRequestCapture::new(); + Mock::given(method("POST")) + .and(path("/v1/realtime/calls")) + .and(call_capture.clone()) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Location", "/v1/realtime/calls/rtc_e2e") + .set_body_string("v=answer\r\n"), + ) + .mount(&main_loop_responses_server) + .await; + + let realtime_server = + start_websocket_server_with_headers(realtime_sideband.connections).await; + let codex_home = TempDir::new()?; + create_config_toml_with_realtime_version( + codex_home.path(), + &main_loop_responses_server.uri(), + realtime_server.uri(), + /*realtime_enabled*/ true, + StartupContextConfig::Override("startup context"), + realtime_version, + sandbox, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + mcp.initialize().await?; + login_with_api_key(&mut mcp, "sk-test-key").await?; + + let thread_start_request_id = mcp + .send_thread_start_request(ThreadStartParams::default()) + .await?; + let thread_start_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)), + ) + .await??; + let thread_start: ThreadStartResponse = to_response(thread_start_response)?; + + Ok(Self { + mcp, + _codex_home: codex_home, + main_loop_responses_server, + realtime_server, + call_capture, + thread_id: thread_start.thread.id, + }) + } + + async fn start_webrtc_realtime(&mut self, offer_sdp: &str) -> Result { + // Starts realtime through the public JSON-RPC method, then waits for the same client-visible + // notifications a desktop app needs: started first, SDP answer second. + let start_request_id = self + .mcp + .send_thread_realtime_start_request(ThreadRealtimeStartParams { + thread_id: self.thread_id.clone(), + prompt: "backend prompt".to_string(), + session_id: None, + transport: Some(ThreadRealtimeStartTransport::Webrtc { + sdp: offer_sdp.to_string(), + }), + }) + .await?; + let start_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + self.mcp + .read_stream_until_response_message(RequestId::Integer(start_request_id)), + ) + .await??; + let _: ThreadRealtimeStartResponse = to_response(start_response)?; + + let started = self + .read_notification::("thread/realtime/started") + .await?; + let sdp = self + .read_notification::("thread/realtime/sdp") + .await?; + + Ok(StartedWebrtcRealtime { started, sdp }) + } + + async fn read_notification(&mut self, method: &str) -> Result { + read_notification(&mut self.mcp, method).await + } + + async fn sideband_outbound_request(&self, request_index: usize) -> Value { + self.realtime_server + .wait_for_request(/*connection_index*/ 0, request_index) + .await + .body_json() + } + + async fn append_audio(&mut self, thread_id: String) -> Result<()> { + let request_id = self + .mcp + .send_thread_realtime_append_audio_request(ThreadRealtimeAppendAudioParams { + thread_id, + audio: ThreadRealtimeAudioChunk { + data: "BQYH".to_string(), + sample_rate: 24_000, + num_channels: 1, + samples_per_channel: Some(480), + item_id: None, + }, + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + self.mcp + .read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: ThreadRealtimeAppendAudioResponse = to_response(response)?; + Ok(()) + } + + async fn append_text(&mut self, thread_id: String, text: &str) -> Result<()> { + let request_id = self + .mcp + .send_thread_realtime_append_text_request(ThreadRealtimeAppendTextParams { + thread_id, + text: text.to_string(), + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + self.mcp + .read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: ThreadRealtimeAppendTextResponse = to_response(response)?; + Ok(()) + } + + async fn main_loop_responses_requests(&self) -> Result> { + responses_requests(&self.main_loop_responses_server).await + } + + async fn shutdown(self) { + self.realtime_server.shutdown().await; + } +} + +fn main_loop_responses(responses: Vec) -> MainLoopResponsesScript { + MainLoopResponsesScript { responses } +} + +fn no_main_loop_responses() -> MainLoopResponsesScript { + main_loop_responses(Vec::new()) +} + +fn realtime_sideband(connections: Vec) -> RealtimeSidebandScript { + RealtimeSidebandScript { connections } +} + +fn realtime_sideband_connection( + realtime_server_events: Vec>, +) -> WebSocketConnectionConfig { + WebSocketConnectionConfig { + requests: realtime_server_events, + response_headers: Vec::new(), + accept_delay: None, + close_after_requests: true, + } +} + +fn open_realtime_sideband_connection( + realtime_server_events: Vec>, +) -> WebSocketConnectionConfig { + WebSocketConnectionConfig { + close_after_requests: false, + ..realtime_sideband_connection(realtime_server_events) + } +} + +fn session_updated(session_id: &str) -> Value { + json!({ + "type": "session.updated", + "session": { "id": session_id, "instructions": "backend prompt" } + }) +} + +fn v2_codex_tool_call(call_id: &str, prompt: &str) -> Value { + json!({ + "type": "conversation.item.done", + "item": { + "id": format!("item_{call_id}"), + "type": "function_call", + "name": "codex", + "call_id": call_id, + "arguments": json!({ "prompt": prompt }).to_string() + } + }) +} + #[tokio::test] async fn realtime_conversation_streams_v2_notifications() -> Result<()> { skip_if_no_network!(Ok(())); @@ -582,10 +925,425 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> { Ok(()) } +#[tokio::test] +async fn webrtc_v1_start_posts_offer_returns_sdp_and_joins_sideband() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: build a v1 realtime thread with a mocked call-create response and a sideband socket + // that immediately proves the joined connection can receive server events. + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V1, + no_main_loop_responses(), + realtime_sideband(vec![open_realtime_sideband_connection(vec![vec![ + session_updated("sess_v1_webrtc"), + ]])]), + ) + .await?; + + // Phase 2: start through app-server and assert the app receives both the started notification + // and the answer SDP. + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!( + started, + StartedWebrtcRealtime { + started: ThreadRealtimeStartedNotification { + thread_id: harness.thread_id.clone(), + session_id: Some(harness.thread_id.clone()), + version: RealtimeConversationVersion::V1, + }, + sdp: ThreadRealtimeSdpNotification { + thread_id: harness.thread_id.clone(), + sdp: "v=answer\r\n".to_string(), + }, + } + ); + + // Phase 3: verify the HTTP call-create leg, the direct sideband join, and the normal v1 + // session.update; the WebRTC transport should remain alive instead of closing after SDP. + assert_call_create_multipart( + harness.call_capture.single_request(), + "v=offer\r\n", + v1_session_create_json(), + )?; + assert_eq!( + harness.realtime_server.single_handshake().uri(), + "/v1/realtime?intent=quicksilver&call_id=rtc_e2e" + ); + + let session_update = harness.sideband_outbound_request(/*request_index*/ 0).await; + assert_v1_session_update(&session_update)?; + + let closed = timeout( + Duration::from_millis(100), + harness + .mcp + .read_stream_until_notification_message("thread/realtime/closed"), + ) + .await; + assert!(closed.is_err(), "WebRTC start should not close immediately"); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: script one v1 handoff request on the sideband and one delegated Responses turn. + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V1, + main_loop_responses(vec![create_final_assistant_message_sse_response( + "delegated from v1", + )?]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_v1_handoff"), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "delegate from v1" + }), + json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_v1", + "item_id": "item_v1", + "input_transcript": "delegate from v1" + }), + ], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V1); + + // Phase 2: wait for the delegated Codex turn that is launched by the handoff request. + let turn_started = harness + .read_notification::("turn/started") + .await?; + assert_eq!(turn_started.thread_id, harness.thread_id); + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + // Phase 3: assert the delegated prompt went to Responses, then the v1 handoff append went back + // over the existing sideband connection. + let requests = harness.main_loop_responses_requests().await?; + assert_eq!(requests.len(), 1); + assert!( + response_request_contains_text(&requests[0], "user: delegate from v1"), + "delegated Responses request should contain realtime text: {}", + requests[0] + ); + + let handoff_append = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_eq!( + handoff_append, + json!({ + "type": "conversation.handoff.append", + "handoff_id": "handoff_v1", + "output_text": "\"Agent Final Message\":\n\ndelegated from v1", + }) + ); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: create a v2 WebRTC conversation whose sideband sends transcript + output audio + // after the client has had a chance to append input. + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + no_main_loop_responses(), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_v2_stream")], + vec![], + vec![ + json!({ + "type": "conversation.item.input_audio_transcription.delta", + "delta": "transcribed audio" + }), + json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24_000, + "channels": 1, + "samples_per_channel": 512 + }), + ], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + assert_v2_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?; + + // Phase 2: drive app-server as the client would: append audio, append text, then receive + // transcript/audio notifications that came from the sideband socket. + let thread_id = started.started.thread_id.clone(); + harness.append_audio(thread_id.clone()).await?; + harness.append_text(thread_id, "hello").await?; + + let transcript = harness + .read_notification::( + "thread/realtime/transcriptUpdated", + ) + .await?; + assert_eq!(transcript.text, "transcribed audio"); + let output_audio = harness + .read_notification::( + "thread/realtime/outputAudio/delta", + ) + .await?; + assert_eq!(output_audio.audio.data, "AQID"); + + // Phase 3: prove the client inputs were translated into the v2 realtime sideband events. + let requests = [ + harness.sideband_outbound_request(/*request_index*/ 1).await, + harness.sideband_outbound_request(/*request_index*/ 2).await, + harness.sideband_outbound_request(/*request_index*/ 3).await, + ]; + assert!( + requests + .iter() + .any(|request| request["type"] == "input_audio_buffer.append" + && request["audio"] == "BQYH"), + "sideband requests should include audio append: {requests:?}" + ); + assert!( + requests.iter().any(|request| { + request["type"] == "conversation.item.create" + && request["item"]["type"] == "message" + && request["item"]["role"] == "user" + && request["item"]["content"][0]["text"] == "hello" + }), + "sideband requests should include user text item: {requests:?}" + ); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn webrtc_v2_codex_tool_call_delegates_and_returns_function_output() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: script a v2 codex function call and a delegated Responses turn that returns final + // assistant text. + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + main_loop_responses(vec![create_final_assistant_message_sse_response( + "delegated from v2", + )?]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_v2_tool"), + v2_codex_tool_call("call_v2", "delegate from v2"), + ], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + + // Phase 2: wait for the delegated turn lifecycle kicked off by the v2 function-call item. + let turn_started = harness + .read_notification::("turn/started") + .await?; + assert_eq!(turn_started.thread_id, harness.thread_id); + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + // Phase 3: assert the delegated prompt went to Responses and the result returned as exactly one + // v2 function-call output event on the sideband. + let requests = harness.main_loop_responses_requests().await?; + assert_eq!(requests.len(), 1); + assert!( + response_request_contains_text(&requests[0], "delegate from v2"), + "delegated Responses request should contain tool prompt: {}", + requests[0] + ); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_function_call_output(&tool_output, "call_v2", "delegated from v2"); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: keep the two mocked OpenAI conversations explicit. The realtime sideband only + // calls the `codex` function; the shell command is requested by the delegated main-loop + // Responses turn that app-server starts after receiving that function call. + let main_loop = main_loop_responses(vec![ + create_shell_command_sse_response( + realtime_tool_ok_command(), + /*workdir*/ None, + Some(5000), + "shell_call", + )?, + create_final_assistant_message_sse_response("shell tool finished")?, + ]); + let realtime = realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_v2_shell"), + v2_codex_tool_call("call_shell", "run shell through delegated turn"), + ], + vec![], + ])]); + + let mut harness = RealtimeE2eHarness::new_with_sandbox( + RealtimeTestVersion::V2, + main_loop, + realtime, + RealtimeTestSandbox::DangerFullAccess, + ) + .await?; + + let _ = harness.start_webrtc_realtime("v=offer\r\n").await?; + + // Phase 2: observe the delegated main-loop turn executing the requested shell command. + let started_command = wait_for_started_command_execution(&mut harness.mcp).await?; + let ThreadItem::CommandExecution { id, status, .. } = started_command.item else { + unreachable!("helper returns command execution items"); + }; + assert_eq!( + (id.as_str(), status), + ("shell_call", CommandExecutionStatus::InProgress) + ); + + let completed_command = wait_for_completed_command_execution(&mut harness.mcp).await?; + let ThreadItem::CommandExecution { + id, + status, + aggregated_output, + .. + } = completed_command.item + else { + unreachable!("helper returns command execution items"); + }; + assert_eq!(id.as_str(), "shell_call"); + assert_eq!(status, CommandExecutionStatus::Completed); + assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok")); + + // Phase 3: verify the shell output reached Responses and the final delegated answer returned + // to realtime as a single function-call-output item. + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + let requests = harness.main_loop_responses_requests().await?; + assert_eq!(requests.len(), 2); + assert!( + response_request_contains_text(&requests[1], "realtime-tool-ok"), + "follow-up Responses request should contain shell output: {}", + requests[1] + ); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_function_call_output(&tool_output, "call_shell", "shell tool finished"); + assert_eq!( + function_call_output_sideband_requests(&harness.realtime_server).len(), + 1 + ); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> { + skip_if_no_network!(Ok(())); + + // Phase 1: gate the delegated Responses stream so the sideband can send audio while the tool + // call is still waiting on its delegated turn. + let main_loop_responses_server = responses::start_mock_server().await; + let (gate_completed_tx, gate_completed_rx) = mpsc::channel(); + let gated_response = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "late delegated result"), + responses::ev_completed("resp-1"), + ]); + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .respond_with(GatedSseResponse { + gate_rx: Mutex::new(Some(gate_completed_rx)), + response: gated_response, + }) + .expect(1) + .mount(&main_loop_responses_server) + .await; + + let mut harness = RealtimeE2eHarness::new_with_main_loop_responses_server( + RealtimeTestVersion::V2, + main_loop_responses_server, + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_v2_nonblocking"), + v2_codex_tool_call("call_audio", "delegate while audio continues"), + json!({ + "type": "response.output_audio.delta", + "delta": "CQoL", + "sample_rate": 24_000, + "channels": 1, + "samples_per_channel": 256 + }), + ], + vec![], + ])]), + ) + .await?; + + let _ = harness.start_webrtc_realtime("v=offer\r\n").await?; + let _ = harness + .read_notification::("turn/started") + .await?; + + // Phase 2: require app-server to fan out sideband audio before the delegated tool call is + // allowed to finish. + let audio = harness + .read_notification::( + "thread/realtime/outputAudio/delta", + ) + .await?; + assert_eq!(audio.audio.data, "CQoL"); + + // Phase 3: release the delegated turn and assert the sideband function-call output is delivered + // after the nonblocking audio. + let _ = gate_completed_tx.send(()); + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_function_call_output(&tool_output, "call_audio", "late delegated result"); + + harness.shutdown().await; + Ok(()) +} + #[tokio::test] async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> { skip_if_no_network!(Ok(())); + // Phase 1: make call creation fail before any sideband connection can matter. let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await; Mock::given(method("POST")) .and(path("/v1/realtime/calls")) @@ -607,6 +1365,7 @@ async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> { mcp.initialize().await?; login_with_api_key(&mut mcp, "sk-test-key").await?; + // Phase 2: start a normal app-server thread and request realtime over WebRTC. let thread_start_request_id = mcp .send_thread_start_request(ThreadStartParams::default()) .await?; @@ -634,6 +1393,8 @@ async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> { .await??; let _: ThreadRealtimeStartResponse = to_response(start_response)?; + // Phase 3: the JSON-RPC start request returns, and the realtime failure is delivered as the + // typed realtime error notification. let error = read_notification::(&mut mcp, "thread/realtime/error") .await?; @@ -722,18 +1483,196 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> { Ok(()) } +async fn wait_for_started_command_execution( + mcp: &mut McpProcess, +) -> Result { + loop { + let started = read_notification::(mcp, "item/started").await?; + if let ThreadItem::CommandExecution { .. } = &started.item { + return Ok(started); + } + } +} + +async fn wait_for_completed_command_execution( + mcp: &mut McpProcess, +) -> Result { + loop { + let completed = + read_notification::(mcp, "item/completed").await?; + if let ThreadItem::CommandExecution { .. } = &completed.item { + return Ok(completed); + } + } +} + +async fn responses_requests(server: &MockServer) -> Result> { + server + .received_requests() + .await + .context("failed to fetch received requests")? + .into_iter() + .filter(|request| request.url.path().ends_with("/responses")) + .map(|request| { + request + .body_json::() + .context("Responses request body should be JSON") + }) + .collect() +} + +fn response_request_contains_text(request: &Value, text: &str) -> bool { + request.to_string().contains(text) +} + +fn realtime_tool_ok_command() -> Vec { + #[cfg(windows)] + { + vec![ + "powershell.exe".to_string(), + "-NoProfile".to_string(), + "-Command".to_string(), + "[Console]::Write('realtime-tool-ok')".to_string(), + ] + } + + #[cfg(not(windows))] + { + vec!["printf".to_string(), "realtime-tool-ok".to_string()] + } +} + +fn function_call_output_sideband_requests(server: &WebSocketTestServer) -> Vec { + server + .single_connection() + .iter() + .map(WebSocketRequest::body_json) + .filter(|request| { + request["type"] == "conversation.item.create" + && request["item"]["type"] == "function_call_output" + }) + .collect() +} + +fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_output: &str) { + assert_eq!( + request, + &json!({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": format!("\"Agent Final Message\":\n\n{expected_output}"), + } + }) + ); +} + +fn assert_v1_session_update(request: &Value) -> Result<()> { + assert_eq!(request["type"].as_str(), Some("session.update")); + assert_eq!(request["session"]["type"].as_str(), Some("quicksilver")); + assert!( + request["session"]["instructions"] + .as_str() + .context("v1 session.update instructions")? + .contains("startup context") + ); + assert_eq!( + request["session"]["audio"]["output"]["voice"].as_str(), + Some("fathom") + ); + assert_eq!(request["session"]["tools"], Value::Null); + Ok(()) +} + +fn assert_v2_session_update(request: &Value) -> Result<()> { + assert_eq!(request["type"].as_str(), Some("session.update")); + assert_eq!(request["session"]["type"].as_str(), Some("realtime")); + assert!( + request["session"]["instructions"] + .as_str() + .context("v2 session.update instructions")? + .contains("startup context") + ); + assert_eq!( + request["session"]["tools"][0]["name"].as_str(), + Some("codex") + ); + Ok(()) +} + +fn assert_call_create_multipart( + request: WiremockRequest, + offer_sdp: &str, + session: &str, +) -> Result<()> { + assert_eq!(request.url.path(), "/v1/realtime/calls"); + assert_eq!(request.url.query(), None); + assert_eq!( + request + .headers + .get("content-type") + .and_then(|value| value.to_str().ok()), + Some("multipart/form-data; boundary=codex-realtime-call-boundary") + ); + let body = String::from_utf8(request.body).context("multipart body should be utf-8")?; + assert_eq!( + body, + format!( + "--codex-realtime-call-boundary\r\n\ + Content-Disposition: form-data; name=\"sdp\"\r\n\ + Content-Type: application/sdp\r\n\ + \r\n\ + {offer_sdp}\r\n\ + --codex-realtime-call-boundary\r\n\ + Content-Disposition: form-data; name=\"session\"\r\n\ + Content-Type: application/json\r\n\ + \r\n\ + {session}\r\n\ + --codex-realtime-call-boundary--\r\n" + ) + ); + Ok(()) +} + +fn v1_session_create_json() -> &'static str { + r#"{"audio":{"input":{"format":{"type":"audio/pcm","rate":24000}},"output":{"voice":"fathom"}},"type":"quicksilver","instructions":"backend prompt\n\nstartup context"}"# +} + fn create_config_toml( codex_home: &Path, responses_server_uri: &str, realtime_server_uri: &str, realtime_enabled: bool, startup_context: StartupContextConfig<'_>, +) -> std::io::Result<()> { + create_config_toml_with_realtime_version( + codex_home, + responses_server_uri, + realtime_server_uri, + realtime_enabled, + startup_context, + RealtimeTestVersion::V2, + RealtimeTestSandbox::ReadOnly, + ) +} + +fn create_config_toml_with_realtime_version( + codex_home: &Path, + responses_server_uri: &str, + realtime_server_uri: &str, + realtime_enabled: bool, + startup_context: StartupContextConfig<'_>, + realtime_version: RealtimeTestVersion, + sandbox: RealtimeTestSandbox, ) -> std::io::Result<()> { let realtime_feature_key = FEATURES .iter() .find(|spec| spec.id == Feature::RealtimeConversation) .map(|spec| spec.key) .unwrap_or("realtime_conversation"); + let realtime_version = realtime_version.config_value(); + let sandbox = sandbox.config_value(); let startup_context = match startup_context { StartupContextConfig::Generated => String::new(), StartupContextConfig::Override(context) => { @@ -747,14 +1686,14 @@ fn create_config_toml( r#" model = "mock-model" approval_policy = "never" -sandbox_mode = "read-only" +sandbox_mode = "{sandbox}" model_provider = "mock_provider" experimental_realtime_ws_base_url = "{realtime_server_uri}" experimental_realtime_ws_backend_prompt = "backend prompt" {startup_context} [realtime] -version = "v2" +version = "{realtime_version}" type = "conversational" [features]