From 5eb7e861142e693b6354298ea0aefcea283c982f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 00:11:28 -0700 Subject: [PATCH 01/13] Unify realtime shutdown in core Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 135 ++++++++++++------ .../core/tests/suite/realtime_conversation.rs | 52 +++++++ 2 files changed, 147 insertions(+), 40 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 938f922f877..44b8cfde48f 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -56,6 +56,19 @@ const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000; const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str = "Conversation already has an active response in progress:"; +#[derive(Debug)] +enum RealtimeConversationEnd { + Requested, + TransportClosed, + Error(RealtimeConversationError), +} + +#[derive(Debug)] +enum RealtimeConversationError { + Emit(String), + AlreadySent, +} + pub(crate) struct RealtimeConversationManager { state: Mutex>, } @@ -344,6 +357,23 @@ pub(crate) async fn handle_start( sess: &Arc, sub_id: String, params: ConversationStartParams, +) -> CodexResult<()> { + if let Err(err) = handle_start_inner(sess, &sub_id, params).await { + error!("failed to start realtime conversation: {err}"); + end_realtime_conversation( + sess, + sub_id, + RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())), + ) + .await; + } + Ok(()) +} + +async fn handle_start_inner( + sess: &Arc, + sub_id: &str, + params: ConversationStartParams, ) -> CodexResult<()> { let provider = sess.provider().await; let auth = sess.services.auth_manager.auth().await; @@ -392,23 +422,15 @@ pub(crate) async fn handle_start( let extra_headers = realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?; info!("starting realtime conversation"); - let (events_rx, realtime_active) = match sess + let (events_rx, realtime_active) = sess .conversation .start(api_provider, extra_headers, session_config) - .await - { - Ok(events_rx) => events_rx, - Err(err) => { - error!("failed to start realtime conversation: {err}"); - send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await; - return Ok(()); - } - }; + .await?; info!("realtime conversation started"); sess.send_event_raw(Event { - id: sub_id.clone(), + id: sub_id.to_string(), msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent { session_id: requested_session_id, }), @@ -416,11 +438,13 @@ pub(crate) async fn handle_start( .await; let sess_clone = Arc::clone(sess); + let sub_id = sub_id.to_string(); tokio::spawn(async move { let ev = |msg| Event { id: sub_id.clone(), msg, }; + let mut end = RealtimeConversationEnd::TransportClosed; while let Ok(event) = events_rx.recv().await { // if not audio out, log the event if !matches!(event, RealtimeEvent::AudioOut(_)) { @@ -429,6 +453,9 @@ pub(crate) async fn handle_start( "received realtime conversation event" ); } + if matches!(event, RealtimeEvent::Error(_)) { + end = RealtimeConversationEnd::Error(RealtimeConversationError::AlreadySent); + } let maybe_routed_text = match &event { RealtimeEvent::HandoffRequested(handoff) => { realtime_text_from_handoff_request(handoff) @@ -449,14 +476,10 @@ pub(crate) async fn handle_start( .await; } if realtime_active.swap(false, Ordering::Relaxed) { - info!("realtime conversation transport closed"); - sess_clone - .send_event_raw(ev(EventMsg::RealtimeConversationClosed( - RealtimeConversationClosedEvent { - reason: Some("transport_closed".to_string()), - }, - ))) - .await; + if matches!(end, RealtimeConversationEnd::TransportClosed) { + info!("realtime conversation transport closed"); + } + end_realtime_conversation(&sess_clone, sub_id, end).await; } }); @@ -470,7 +493,17 @@ pub(crate) async fn handle_audio( ) { if let Err(err) = sess.conversation.audio_in(params.frame).await { error!("failed to append realtime audio: {err}"); - send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; + if sess.conversation.running_state().await.is_some() { + end_realtime_conversation( + sess, + sub_id, + RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())), + ) + .await; + } else { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) + .await; + } } } @@ -545,25 +578,22 @@ pub(crate) async fn handle_text( debug!(text = %params.text, "[realtime-text] appending realtime conversation text input"); if let Err(err) = sess.conversation.text_in(params.text).await { error!("failed to append realtime text: {err}"); - send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; + if sess.conversation.running_state().await.is_some() { + end_realtime_conversation( + sess, + sub_id, + RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())), + ) + .await; + } else { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) + .await; + } } } pub(crate) async fn handle_close(sess: &Arc, sub_id: String) { - match sess.conversation.shutdown().await { - Ok(()) => { - sess.send_event_raw(Event { - id: sub_id, - msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { - reason: Some("requested".to_string()), - }), - }) - .await; - } - Err(err) => { - send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await; - } - } + end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Requested).await; } fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { @@ -771,11 +801,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { } } Ok(None) => { - let _ = events_tx - .send(RealtimeEvent::Error( - "realtime websocket connection is closed".to_string(), - )) - .await; break; } Err(err) => { @@ -868,6 +893,36 @@ async fn send_conversation_error( .await; } +async fn end_realtime_conversation( + sess: &Arc, + sub_id: String, + end: RealtimeConversationEnd, +) { + let _ = sess.conversation.shutdown().await; + + if let RealtimeConversationEnd::Error(RealtimeConversationError::Emit(message)) = &end { + sess.send_event_raw(Event { + id: sub_id.clone(), + msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message.clone()), + }), + }) + .await; + } + + let reason = match end { + RealtimeConversationEnd::Requested => Some("requested".to_string()), + RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()), + RealtimeConversationEnd::Error(_) => Some("error".to_string()), + }; + + sess.send_event_raw(Event { + id: sub_id, + msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { reason }), + }) + .await; +} + #[cfg(test)] #[path = "realtime_conversation_tests.rs"] mod tests; diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 4ab98712147..f409dab3d79 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -12,6 +12,7 @@ use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::protocol::RealtimeAudioFrame; +use codex_protocol::protocol::RealtimeConversationClosedEvent; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::SessionSource; @@ -381,6 +382,15 @@ impl EnvGuard { } Self { key, original } } + + fn unset(key: &'static str) -> Self { + let original = std::env::var_os(key); + // SAFETY: this guard restores the original value before the test exits. + unsafe { + std::env::remove_var(key); + } + Self { key, original } + } } impl Drop for EnvGuard { @@ -427,6 +437,48 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial(openai_api_key_env)] +async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result<()> { + skip_if_no_network!(Ok(())); + + let _env_guard = EnvGuard::unset(OPENAI_API_KEY_ENV_VAR); + let server = start_websocket_server(vec![]).await; + let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let test = builder.build_with_websocket_server(&server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let err = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message), + }) => Some(message.clone()), + _ => None, + }) + .await; + assert_eq!(err, "realtime conversation requires API key auth"); + + let closed = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await; + assert_eq!( + closed, + RealtimeConversationClosedEvent { + reason: Some("error".to_string()), + } + ); + + server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_text_before_start_emits_error() -> Result<()> { skip_if_no_network!(Ok(())); From 5f2320342e6a5fa1d1b69b5be7ef89346568e3bc Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 09:33:52 -0700 Subject: [PATCH 02/13] Align realtime error close handling Co-authored-by: Codex --- .../tests/suite/v2/realtime_conversation.rs | 2 +- codex-rs/core/src/realtime_conversation.rs | 23 ++++++++----------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 71b6d6dcf33..a7aa1eb07a4 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -188,7 +188,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { read_notification::(&mut mcp, "thread/realtime/closed") .await?; assert_eq!(closed.thread_id, output_audio.thread_id); - assert_eq!(closed.reason.as_deref(), Some("transport_closed")); + assert_eq!(closed.reason.as_deref(), Some("error")); let connections = realtime_server.connections(); assert_eq!(connections.len(), 1); diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 44b8cfde48f..749d6742b75 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -60,13 +60,8 @@ const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str = enum RealtimeConversationEnd { Requested, TransportClosed, - Error(RealtimeConversationError), -} - -#[derive(Debug)] -enum RealtimeConversationError { - Emit(String), - AlreadySent, + Error(String), + ErrorAlreadySent, } pub(crate) struct RealtimeConversationManager { @@ -363,7 +358,7 @@ pub(crate) async fn handle_start( end_realtime_conversation( sess, sub_id, - RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())), + RealtimeConversationEnd::Error(err.to_string()), ) .await; } @@ -454,7 +449,7 @@ async fn handle_start_inner( ); } if matches!(event, RealtimeEvent::Error(_)) { - end = RealtimeConversationEnd::Error(RealtimeConversationError::AlreadySent); + end = RealtimeConversationEnd::ErrorAlreadySent; } let maybe_routed_text = match &event { RealtimeEvent::HandoffRequested(handoff) => { @@ -497,7 +492,7 @@ pub(crate) async fn handle_audio( end_realtime_conversation( sess, sub_id, - RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())), + RealtimeConversationEnd::Error(err.to_string()), ) .await; } else { @@ -582,7 +577,7 @@ pub(crate) async fn handle_text( end_realtime_conversation( sess, sub_id, - RealtimeConversationEnd::Error(RealtimeConversationError::Emit(err.to_string())), + RealtimeConversationEnd::Error(err.to_string()), ) .await; } else { @@ -900,7 +895,7 @@ async fn end_realtime_conversation( ) { let _ = sess.conversation.shutdown().await; - if let RealtimeConversationEnd::Error(RealtimeConversationError::Emit(message)) = &end { + if let RealtimeConversationEnd::Error(message) = &end { sess.send_event_raw(Event { id: sub_id.clone(), msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { @@ -913,7 +908,9 @@ async fn end_realtime_conversation( let reason = match end { RealtimeConversationEnd::Requested => Some("requested".to_string()), RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()), - RealtimeConversationEnd::Error(_) => Some("error".to_string()), + RealtimeConversationEnd::Error(_) | RealtimeConversationEnd::ErrorAlreadySent => { + Some("error".to_string()) + } }; sess.send_event_raw(Event { From f4fe74b190c94a5b56fab440815ca66d12419283 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 09:46:27 -0700 Subject: [PATCH 03/13] Simplify realtime error shutdown Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 58 ++++++++++------------ 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 749d6742b75..9802577ca18 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -60,8 +60,7 @@ const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str = enum RealtimeConversationEnd { Requested, TransportClosed, - Error(String), - ErrorAlreadySent, + Error, } pub(crate) struct RealtimeConversationManager { @@ -355,12 +354,15 @@ pub(crate) async fn handle_start( ) -> CodexResult<()> { if let Err(err) = handle_start_inner(sess, &sub_id, params).await { error!("failed to start realtime conversation: {err}"); - end_realtime_conversation( - sess, - sub_id, - RealtimeConversationEnd::Error(err.to_string()), - ) + let message = err.to_string(); + sess.send_event_raw(Event { + id: sub_id.clone(), + msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message), + }), + }) .await; + end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Error).await; } Ok(()) } @@ -449,7 +451,7 @@ async fn handle_start_inner( ); } if matches!(event, RealtimeEvent::Error(_)) { - end = RealtimeConversationEnd::ErrorAlreadySent; + end = RealtimeConversationEnd::Error; } let maybe_routed_text = match &event { RealtimeEvent::HandoffRequested(handoff) => { @@ -489,12 +491,15 @@ pub(crate) async fn handle_audio( if let Err(err) = sess.conversation.audio_in(params.frame).await { error!("failed to append realtime audio: {err}"); if sess.conversation.running_state().await.is_some() { - end_realtime_conversation( - sess, - sub_id, - RealtimeConversationEnd::Error(err.to_string()), - ) + let message = err.to_string(); + sess.send_event_raw(Event { + id: sub_id.clone(), + msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message), + }), + }) .await; + end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Error).await; } else { send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) .await; @@ -574,12 +579,15 @@ pub(crate) async fn handle_text( if let Err(err) = sess.conversation.text_in(params.text).await { error!("failed to append realtime text: {err}"); if sess.conversation.running_state().await.is_some() { - end_realtime_conversation( - sess, - sub_id, - RealtimeConversationEnd::Error(err.to_string()), - ) + let message = err.to_string(); + sess.send_event_raw(Event { + id: sub_id.clone(), + msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message), + }), + }) .await; + end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Error).await; } else { send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) .await; @@ -895,22 +903,10 @@ async fn end_realtime_conversation( ) { let _ = sess.conversation.shutdown().await; - if let RealtimeConversationEnd::Error(message) = &end { - sess.send_event_raw(Event { - id: sub_id.clone(), - msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::Error(message.clone()), - }), - }) - .await; - } - let reason = match end { RealtimeConversationEnd::Requested => Some("requested".to_string()), RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()), - RealtimeConversationEnd::Error(_) | RealtimeConversationEnd::ErrorAlreadySent => { - Some("error".to_string()) - } + RealtimeConversationEnd::Error => Some("error".to_string()), }; sess.send_event_raw(Event { From daf3c8b1f861d13e579b8fb6e19f9c01c97d6ae9 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 09:51:00 -0700 Subject: [PATCH 04/13] Remove unsafe env mutation from realtime tests Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 6 +- .../core/src/realtime_conversation_tests.rs | 31 +++++ .../core/tests/suite/realtime_conversation.rs | 108 +----------------- 3 files changed, 40 insertions(+), 105 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 9802577ca18..62007c11295 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -374,7 +374,8 @@ async fn handle_start_inner( ) -> CodexResult<()> { let provider = sess.provider().await; let auth = sess.services.auth_manager.auth().await; - let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?; + let realtime_api_key = + realtime_api_key(auth.as_ref(), &provider, read_openai_api_key_from_env)?; let mut api_provider = provider.to_api_provider(Some(crate::auth::AuthMode::ApiKey))?; let config = sess.get_config().await; if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url { @@ -524,6 +525,7 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt fn realtime_api_key( auth: Option<&CodexAuth>, provider: &crate::ModelProviderInfo, + openai_api_key_from_env: impl FnOnce() -> Option, ) -> CodexResult { if let Some(api_key) = provider.api_key()? { return Ok(api_key); @@ -540,7 +542,7 @@ fn realtime_api_key( // TODO(aibrahim): Remove this temporary fallback once realtime auth no longer // requires API key auth for ChatGPT/SIWC sessions. if provider.is_openai() - && let Some(api_key) = read_openai_api_key_from_env() + && let Some(api_key) = openai_api_key_from_env() { return Ok(api_key); } diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index 0a32d063c06..9ebff31afe4 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -1,6 +1,9 @@ use super::RealtimeHandoffState; use super::RealtimeSessionKind; +use super::realtime_api_key; use super::realtime_text_from_handoff_request; +use crate::CodexAuth; +use crate::ModelProviderInfo; use async_channel::bounded; use codex_protocol::protocol::RealtimeHandoffRequested; use codex_protocol::protocol::RealtimeTranscriptEntry; @@ -68,3 +71,31 @@ async fn clears_active_handoff_explicitly() { *state.active_handoff.lock().await = None; assert_eq!(state.active_handoff.lock().await.clone(), None); } + +#[test] +fn uses_openai_env_fallback_for_chatgpt_auth() { + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + let provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None); + + let api_key = realtime_api_key(Some(&auth), &provider, || { + Some("env-realtime-key".to_string()) + }) + .expect("openai env fallback should provide realtime api key"); + + assert_eq!(api_key, "env-realtime-key".to_string()); +} + +#[test] +fn errors_without_api_key_for_non_openai_chatgpt_auth() { + let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); + let mut provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None); + provider.name = "Test Provider".to_string(); + + let err = realtime_api_key(Some(&auth), &provider, || Some("ignored".to_string())) + .expect_err("non-openai provider should not use openai env fallback"); + + assert_eq!( + err.to_string(), + "realtime conversation requires API key auth" + ); +} diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index f409dab3d79..046b669550e 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -2,7 +2,6 @@ use anyhow::Context; use anyhow::Result; use chrono::Utc; use codex_core::CodexAuth; -use codex_core::auth::OPENAI_API_KEY_ENV_VAR; use codex_protocol::ThreadId; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::ConversationAudioParams; @@ -30,8 +29,6 @@ use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; -use serial_test::serial; -use std::ffi::OsString; use std::fs; use std::time::Duration; use tokio::sync::oneshot; @@ -258,66 +255,6 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[serial(openai_api_key_env)] -async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> { - skip_if_no_network!(Ok(())); - - let _env_guard = EnvGuard::set(OPENAI_API_KEY_ENV_VAR, "env-realtime-key"); - let server = start_websocket_server(vec![ - vec![], - vec![vec![json!({ - "type": "session.updated", - "session": { "id": "sess_env", "instructions": "backend prompt" } - })]], - ]) - .await; - - let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let test = builder.build_with_websocket_server(&server).await?; - assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await); - - test.codex - .submit(Op::RealtimeConversationStart(ConversationStartParams { - prompt: "backend prompt".to_string(), - session_id: None, - })) - .await?; - - let started = wait_for_event_match(&test.codex, |msg| match msg { - EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())), - EventMsg::Error(err) => Some(Err(err.clone())), - _ => None, - }) - .await - .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); - assert!(started.session_id.is_some()); - - let session_updated = wait_for_event_match(&test.codex, |msg| match msg { - EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionUpdated { session_id, .. }, - }) => Some(session_id.clone()), - _ => None, - }) - .await; - assert_eq!(session_updated, "sess_env"); - - assert_eq!( - server.handshakes()[1].header("authorization").as_deref(), - Some("Bearer env-realtime-key") - ); - - test.codex.submit(Op::RealtimeConversationClose).await?; - let _closed = wait_for_event_match(&test.codex, |msg| match msg { - EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), - _ => None, - }) - .await; - - server.shutdown().await; - Ok(()) -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_transport_close_emits_closed_event() -> Result<()> { skip_if_no_network!(Ok(())); @@ -368,43 +305,6 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> { Ok(()) } -struct EnvGuard { - key: &'static str, - original: Option, -} - -impl EnvGuard { - fn set(key: &'static str, value: &str) -> Self { - let original = std::env::var_os(key); - // SAFETY: this guard restores the original value before the test exits. - unsafe { - std::env::set_var(key, value); - } - Self { key, original } - } - - fn unset(key: &'static str) -> Self { - let original = std::env::var_os(key); - // SAFETY: this guard restores the original value before the test exits. - unsafe { - std::env::remove_var(key); - } - Self { key, original } - } -} - -impl Drop for EnvGuard { - fn drop(&mut self) { - // SAFETY: this guard restores the original value for the modified env var. - unsafe { - match &self.original { - Some(value) => std::env::set_var(self.key, value), - None => std::env::remove_var(self.key), - } - } - } -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_audio_before_start_emits_error() -> Result<()> { skip_if_no_network!(Ok(())); @@ -438,13 +338,15 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[serial(openai_api_key_env)] async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result<()> { skip_if_no_network!(Ok(())); - let _env_guard = EnvGuard::unset(OPENAI_API_KEY_ENV_VAR); let server = start_websocket_server(vec![]).await; - let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let mut builder = test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_provider.name = "Test Provider".to_string(); + }); let test = builder.build_with_websocket_server(&server).await?; test.codex From 24ffd8d6f85f569c7e53a58f794034f6c72c3388 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 09:58:59 -0700 Subject: [PATCH 05/13] Keep realtime env tests end-to-end Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 6 +- .../core/src/realtime_conversation_tests.rs | 31 ----- .../core/tests/suite/realtime_conversation.rs | 109 +++++++++++++++++- 3 files changed, 106 insertions(+), 40 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 62007c11295..9802577ca18 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -374,8 +374,7 @@ async fn handle_start_inner( ) -> CodexResult<()> { let provider = sess.provider().await; let auth = sess.services.auth_manager.auth().await; - let realtime_api_key = - realtime_api_key(auth.as_ref(), &provider, read_openai_api_key_from_env)?; + let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?; let mut api_provider = provider.to_api_provider(Some(crate::auth::AuthMode::ApiKey))?; let config = sess.get_config().await; if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url { @@ -525,7 +524,6 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt fn realtime_api_key( auth: Option<&CodexAuth>, provider: &crate::ModelProviderInfo, - openai_api_key_from_env: impl FnOnce() -> Option, ) -> CodexResult { if let Some(api_key) = provider.api_key()? { return Ok(api_key); @@ -542,7 +540,7 @@ fn realtime_api_key( // TODO(aibrahim): Remove this temporary fallback once realtime auth no longer // requires API key auth for ChatGPT/SIWC sessions. if provider.is_openai() - && let Some(api_key) = openai_api_key_from_env() + && let Some(api_key) = read_openai_api_key_from_env() { return Ok(api_key); } diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index 9ebff31afe4..0a32d063c06 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -1,9 +1,6 @@ use super::RealtimeHandoffState; use super::RealtimeSessionKind; -use super::realtime_api_key; use super::realtime_text_from_handoff_request; -use crate::CodexAuth; -use crate::ModelProviderInfo; use async_channel::bounded; use codex_protocol::protocol::RealtimeHandoffRequested; use codex_protocol::protocol::RealtimeTranscriptEntry; @@ -71,31 +68,3 @@ async fn clears_active_handoff_explicitly() { *state.active_handoff.lock().await = None; assert_eq!(state.active_handoff.lock().await.clone(), None); } - -#[test] -fn uses_openai_env_fallback_for_chatgpt_auth() { - let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); - let provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None); - - let api_key = realtime_api_key(Some(&auth), &provider, || { - Some("env-realtime-key".to_string()) - }) - .expect("openai env fallback should provide realtime api key"); - - assert_eq!(api_key, "env-realtime-key".to_string()); -} - -#[test] -fn errors_without_api_key_for_non_openai_chatgpt_auth() { - let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing(); - let mut provider = ModelProviderInfo::create_openai_provider(/*base_url*/ None); - provider.name = "Test Provider".to_string(); - - let err = realtime_api_key(Some(&auth), &provider, || Some("ignored".to_string())) - .expect_err("non-openai provider should not use openai env fallback"); - - assert_eq!( - err.to_string(), - "realtime conversation requires API key auth" - ); -} diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 046b669550e..87aee749169 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -2,6 +2,7 @@ use anyhow::Context; use anyhow::Result; use chrono::Utc; use codex_core::CodexAuth; +use codex_core::auth::OPENAI_API_KEY_ENV_VAR; use codex_protocol::ThreadId; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::ConversationAudioParams; @@ -30,12 +31,15 @@ use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::fs; +use std::process::Command; use std::time::Duration; use tokio::sync::oneshot; const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; const MEMORY_PROMPT_PHRASE: &str = "You have access to a memory folder with guidance from prior runs."; +const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str = + "CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS"; fn websocket_request_text( request: &core_test_support::responses::WebSocketRequest, ) -> Option { @@ -79,6 +83,33 @@ where tokio::time::sleep(Duration::from_millis(10)).await; } } + +fn run_realtime_conversation_test_in_subprocess( + test_name: &str, + openai_api_key: Option<&str>, +) -> Result<()> { + let mut command = Command::new(std::env::current_exe()?); + command + .arg("--exact") + .arg(test_name) + .env(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR, "1"); + match openai_api_key { + Some(openai_api_key) => { + command.env(OPENAI_API_KEY_ENV_VAR, openai_api_key); + } + None => { + command.env_remove(OPENAI_API_KEY_ENV_VAR); + } + } + let output = command.output()?; + assert!( + output.status.success(), + "subprocess test `{test_name}` failed\nstdout:\n{}\nstderr:\n{}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr), + ); + Ok(()) +} async fn seed_recent_thread( test: &TestCodex, title: &str, @@ -255,6 +286,71 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> { + if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() { + return run_realtime_conversation_test_in_subprocess( + "suite::realtime_conversation::conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth", + Some("env-realtime-key"), + ); + } + + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![ + vec![], + vec![vec![json!({ + "type": "session.updated", + "session": { "id": "sess_env", "instructions": "backend prompt" } + })]], + ]) + .await; + + let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let test = builder.build_with_websocket_server(&server).await?; + assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await); + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let started = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + assert!(started.session_id.is_some()); + + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) => Some(session_id.clone()), + _ => None, + }) + .await; + assert_eq!(session_updated, "sess_env"); + + assert_eq!( + server.handshakes()[1].header("authorization").as_deref(), + Some("Bearer env-realtime-key") + ); + + test.codex.submit(Op::RealtimeConversationClose).await?; + let _closed = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await; + + server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_transport_close_emits_closed_event() -> Result<()> { skip_if_no_network!(Ok(())); @@ -339,14 +435,17 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result<()> { + if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() { + return run_realtime_conversation_test_in_subprocess( + "suite::realtime_conversation::conversation_start_failure_emits_realtime_error_and_closed", + None, + ); + } + skip_if_no_network!(Ok(())); let server = start_websocket_server(vec![]).await; - let mut builder = test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(|config| { - config.model_provider.name = "Test Provider".to_string(); - }); + let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); let test = builder.build_with_websocket_server(&server).await?; test.codex From 51108dccaa6e3330a5e95af037b4b0e4ba962043 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 10:34:05 -0700 Subject: [PATCH 06/13] codex: address PR review feedback (#14902) Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 49 +++++++++++++++++-- .../core/tests/suite/realtime_conversation.rs | 44 ++++++++++++++++- 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 9802577ca18..7f10aa49b3a 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -352,7 +352,23 @@ pub(crate) async fn handle_start( sub_id: String, params: ConversationStartParams, ) -> CodexResult<()> { - if let Err(err) = handle_start_inner(sess, &sub_id, params).await { + let prepared_start = match prepare_realtime_start(sess, params).await { + Ok(prepared_start) => prepared_start, + Err(err) => { + error!("failed to prepare realtime conversation: {err}"); + let message = err.to_string(); + sess.send_event_raw(Event { + id: sub_id, + msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message), + }), + }) + .await; + return Ok(()); + } + }; + + if let Err(err) = handle_start_inner(sess, &sub_id, prepared_start).await { error!("failed to start realtime conversation: {err}"); let message = err.to_string(); sess.send_event_raw(Event { @@ -367,11 +383,17 @@ pub(crate) async fn handle_start( Ok(()) } -async fn handle_start_inner( +struct PreparedRealtimeConversationStart { + api_provider: ApiProvider, + extra_headers: Option, + requested_session_id: Option, + session_config: RealtimeSessionConfig, +} + +async fn prepare_realtime_start( sess: &Arc, - sub_id: &str, params: ConversationStartParams, -) -> CodexResult<()> { +) -> CodexResult { let provider = sess.provider().await; let auth = sess.services.auth_manager.auth().await; let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?; @@ -418,6 +440,25 @@ async fn handle_start_inner( }; let extra_headers = realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?; + Ok(PreparedRealtimeConversationStart { + api_provider, + extra_headers, + requested_session_id, + session_config, + }) +} + +async fn handle_start_inner( + sess: &Arc, + sub_id: &str, + prepared_start: PreparedRealtimeConversationStart, +) -> CodexResult<()> { + let PreparedRealtimeConversationStart { + api_provider, + extra_headers, + requested_session_id, + session_config, + } = prepared_start; info!("starting realtime conversation"); let (events_rx, realtime_active) = sess .conversation diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 87aee749169..e149c06d59c 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -434,10 +434,10 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result<()> { +async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Result<()> { if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() { return run_realtime_conversation_test_in_subprocess( - "suite::realtime_conversation::conversation_start_failure_emits_realtime_error_and_closed", + "suite::realtime_conversation::conversation_start_preflight_failure_emits_realtime_error_only", None, ); } @@ -464,6 +464,46 @@ async fn conversation_start_failure_emits_realtime_error_and_closed() -> Result< .await; assert_eq!(err, "realtime conversation requires API key auth"); + let closed = timeout(Duration::from_millis(200), async { + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await + }) + .await; + assert!(closed.is_err(), "preflight failure should not emit closed"); + + server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_start_connect_failure_emits_realtime_error_and_closed() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_websocket_server(vec![]).await; + let mut builder = test_codex().with_config(|config| { + config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string()); + }); + let test = builder.build_with_websocket_server(&server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let err = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::Error(message), + }) => Some(message.clone()), + _ => None, + }) + .await; + assert!(!err.is_empty()); + let closed = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), _ => None, From 0e0481ba0fa51ae3ca28e1e5da8cd370402938f2 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 10:50:11 -0700 Subject: [PATCH 07/13] codex: fix realtime preflight test import (#14902) Co-authored-by: Codex --- codex-rs/core/tests/suite/realtime_conversation.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index e149c06d59c..d2f51bb1ec5 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -34,6 +34,7 @@ use std::fs; use std::process::Command; use std::time::Duration; use tokio::sync::oneshot; +use tokio::time::timeout; const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; const MEMORY_PROMPT_PHRASE: &str = From ed3454c3850e56246bb0c7c2398ff2eac47844a8 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 17 Mar 2026 11:14:01 -0700 Subject: [PATCH 08/13] codex: tighten realtime shutdown ordering (#14902) Co-own the realtime fanout task so shutdown stops the full session before closed is emitted. Also keep start/connect failures as error-only until started is sent.\n\nCo-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 148 ++++++++++++++---- .../core/tests/suite/realtime_conversation.rs | 19 +-- 2 files changed, 126 insertions(+), 41 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 7f10aa49b3a..17c1450cca4 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -63,6 +63,11 @@ enum RealtimeConversationEnd { Error, } +enum RealtimeFanoutTaskStop { + Abort, + Detach, +} + pub(crate) struct RealtimeConversationManager { state: Mutex>, } @@ -127,7 +132,8 @@ struct ConversationState { user_text_tx: Sender, writer: RealtimeWebsocketWriter, handoff: RealtimeHandoffState, - task: JoinHandle<()>, + input_task: JoinHandle<()>, + fanout_task: Option>, realtime_active: Arc, } @@ -157,9 +163,7 @@ impl RealtimeConversationManager { guard.take() }; if let Some(state) = previous_state { - state.realtime_active.store(false, Ordering::Relaxed); - state.task.abort(); - let _ = state.task.await; + stop_conversation_state(state, RealtimeFanoutTaskStop::Abort).await; } let session_kind = match session_config.event_parser { RealtimeEventParser::V1 => RealtimeSessionKind::V1, @@ -206,12 +210,48 @@ impl RealtimeConversationManager { user_text_tx, writer, handoff, - task, + input_task: task, + fanout_task: None, realtime_active: Arc::clone(&realtime_active), }); Ok((events_rx, realtime_active)) } + pub(crate) async fn register_fanout_task( + &self, + realtime_active: &Arc, + fanout_task: JoinHandle<()>, + ) { + let mut fanout_task = Some(fanout_task); + { + let mut guard = self.state.lock().await; + if let Some(state) = guard.as_mut() + && Arc::ptr_eq(&state.realtime_active, realtime_active) + { + state.fanout_task = fanout_task.take(); + } + } + + if let Some(fanout_task) = fanout_task { + fanout_task.abort(); + let _ = fanout_task.await; + } + } + + pub(crate) async fn finish_if_active(&self, realtime_active: &Arc) { + let state = { + let mut guard = self.state.lock().await; + match guard.as_ref() { + Some(state) if Arc::ptr_eq(&state.realtime_active, realtime_active) => guard.take(), + _ => None, + } + }; + + if let Some(state) = state { + stop_conversation_state(state, RealtimeFanoutTaskStop::Detach).await; + } + } + pub(crate) async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> { let sender = { let guard = self.state.lock().await; @@ -339,14 +379,32 @@ impl RealtimeConversationManager { }; if let Some(state) = state { - state.realtime_active.store(false, Ordering::Relaxed); - state.task.abort(); - let _ = state.task.await; + stop_conversation_state(state, RealtimeFanoutTaskStop::Abort).await; } Ok(()) } } +async fn stop_conversation_state( + mut state: ConversationState, + fanout_task_stop: RealtimeFanoutTaskStop, +) { + state.realtime_active.store(false, Ordering::Relaxed); + state.input_task.abort(); + let _ = state.input_task.await; + + match state.fanout_task.take() { + Some(fanout_task) => match fanout_task_stop { + RealtimeFanoutTaskStop::Abort => { + fanout_task.abort(); + let _ = fanout_task.await; + } + RealtimeFanoutTaskStop::Detach => {} + }, + None => {} + } +} + pub(crate) async fn handle_start( sess: &Arc, sub_id: String, @@ -378,7 +436,6 @@ pub(crate) async fn handle_start( }), }) .await; - end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Error).await; } Ok(()) } @@ -477,13 +534,17 @@ async fn handle_start_inner( let sess_clone = Arc::clone(sess); let sub_id = sub_id.to_string(); - tokio::spawn(async move { + let fanout_realtime_active = Arc::clone(&realtime_active); + let fanout_task = tokio::spawn(async move { let ev = |msg| Event { id: sub_id.clone(), msg, }; let mut end = RealtimeConversationEnd::TransportClosed; while let Ok(event) = events_rx.recv().await { + if !fanout_realtime_active.load(Ordering::Relaxed) { + break; + } // if not audio out, log the event if !matches!(event, RealtimeEvent::AudioOut(_)) { info!( @@ -505,6 +566,9 @@ async fn handle_start_inner( let sess_for_routed_text = Arc::clone(&sess_clone); sess_for_routed_text.route_realtime_text_input(text).await; } + if !fanout_realtime_active.load(Ordering::Relaxed) { + break; + } sess_clone .send_event_raw(ev(EventMsg::RealtimeConversationRealtime( RealtimeConversationRealtimeEvent { @@ -513,13 +577,20 @@ async fn handle_start_inner( ))) .await; } - if realtime_active.swap(false, Ordering::Relaxed) { + if fanout_realtime_active.swap(false, Ordering::Relaxed) { if matches!(end, RealtimeConversationEnd::TransportClosed) { info!("realtime conversation transport closed"); } - end_realtime_conversation(&sess_clone, sub_id, end).await; + sess_clone + .conversation + .finish_if_active(&fanout_realtime_active) + .await; + send_realtime_conversation_closed(&sess_clone, sub_id, end).await; } }); + sess.conversation + .register_fanout_task(&realtime_active, fanout_task) + .await; Ok(()) } @@ -532,15 +603,7 @@ pub(crate) async fn handle_audio( if let Err(err) = sess.conversation.audio_in(params.frame).await { error!("failed to append realtime audio: {err}"); if sess.conversation.running_state().await.is_some() { - let message = err.to_string(); - sess.send_event_raw(Event { - id: sub_id.clone(), - msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::Error(message), - }), - }) - .await; - end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Error).await; + warn!("realtime audio input failed while the session was already ending"); } else { send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) .await; @@ -620,15 +683,7 @@ pub(crate) async fn handle_text( if let Err(err) = sess.conversation.text_in(params.text).await { error!("failed to append realtime text: {err}"); if sess.conversation.running_state().await.is_some() { - let message = err.to_string(); - sess.send_event_raw(Event { - id: sub_id.clone(), - msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::Error(message), - }), - }) - .await; - end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Error).await; + warn!("realtime text input failed while the session was already ending"); } else { send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) .await; @@ -665,6 +720,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { if let Err(err) = writer.send_conversation_item_create(text).await { let mapped_error = map_api_error(err); warn!("failed to send input text: {mapped_error}"); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } if matches!(session_kind, RealtimeSessionKind::V2) { @@ -673,6 +731,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { } else if let Err(err) = writer.send_response_create().await { let mapped_error = map_api_error(err); warn!("failed to send text response.create: {mapped_error}"); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } else { pending_response_create = false; @@ -697,6 +758,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { { let mapped_error = map_api_error(err); warn!("failed to send handoff output: {mapped_error}"); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } } @@ -710,6 +774,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { { let mapped_error = map_api_error(err); warn!("failed to send handoff output: {mapped_error}"); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } if matches!(session_kind, RealtimeSessionKind::V2) { @@ -720,6 +787,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { warn!( "failed to send handoff response.create: {mapped_error}" ); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } else { pending_response_create = false; @@ -757,6 +827,11 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { warn!( "failed to send deferred response.create: {mapped_error}" ); + let _ = events_tx + .send(RealtimeEvent::Error( + mapped_error.to_string(), + )) + .await; break; } pending_response_create = false; @@ -804,6 +879,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { warn!( "failed to send deferred response.create after cancellation: {mapped_error}" ); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } pending_response_create = false; @@ -867,6 +945,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { if let Err(err) = writer.send_audio_frame(frame).await { let mapped_error = map_api_error(err); error!("failed to send input audio: {mapped_error}"); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; break; } } @@ -943,7 +1024,14 @@ async fn end_realtime_conversation( end: RealtimeConversationEnd, ) { let _ = sess.conversation.shutdown().await; + send_realtime_conversation_closed(sess, sub_id, end).await; +} +async fn send_realtime_conversation_closed( + sess: &Arc, + sub_id: String, + end: RealtimeConversationEnd, +) { let reason = match end { RealtimeConversationEnd::Requested => Some("requested".to_string()), RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()), diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index d2f51bb1ec5..7a6f521d1fc 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -12,7 +12,6 @@ use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::protocol::RealtimeAudioFrame; -use codex_protocol::protocol::RealtimeConversationClosedEvent; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::SessionSource; @@ -480,7 +479,7 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn conversation_start_connect_failure_emits_realtime_error_and_closed() -> Result<()> { +async fn conversation_start_connect_failure_emits_realtime_error_only() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_websocket_server(vec![]).await; @@ -505,17 +504,15 @@ async fn conversation_start_connect_failure_emits_realtime_error_and_closed() -> .await; assert!(!err.is_empty()); - let closed = wait_for_event_match(&test.codex, |msg| match msg { - EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), - _ => None, + let closed = timeout(Duration::from_millis(200), async { + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await }) .await; - assert_eq!( - closed, - RealtimeConversationClosedEvent { - reason: Some("error".to_string()), - } - ); + assert!(closed.is_err(), "connect failure should not emit closed"); server.shutdown().await; Ok(()) From de9f3d3835e44c1b520d3b0a8393315e6a5b6d31 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim <219906144+aibrahim-oai@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:04:17 +0000 Subject: [PATCH 09/13] codex: fix CI failure on PR #14902 Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 17c1450cca4..277e9c58396 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -485,9 +485,7 @@ async fn prepare_realtime_start( RealtimeWsMode::Conversational => RealtimeSessionMode::Conversational, RealtimeWsMode::Transcription => RealtimeSessionMode::Transcription, }; - let requested_session_id = params - .session_id - .or_else(|| Some(sess.conversation_id.to_string())); + let requested_session_id = params.session_id.or(Some(sess.conversation_id.to_string())); let session_config = RealtimeSessionConfig { instructions: prompt, model, @@ -620,9 +618,7 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt .join("\n"); (!active_transcript.is_empty()) .then_some(active_transcript) - .or_else(|| { - (!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone()) - }) + .or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone())) } fn realtime_api_key( From b8d83bb0ef410fde94b7183924ad084e6c3e4332 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim <219906144+aibrahim-oai@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:14:02 +0000 Subject: [PATCH 10/13] codex: fix remaining lint on PR #14902 Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 277e9c58396..cf7f008a609 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -983,7 +983,7 @@ fn update_output_audio_state( fn audio_duration_ms(frame: &RealtimeAudioFrame) -> u32 { let Some(samples_per_channel) = frame .samples_per_channel - .or_else(|| decoded_samples_per_channel(frame)) + .or(decoded_samples_per_channel(frame)) else { return 0; }; From 548f90dc4824e5d7398bf23ce8ed5b143a6a8214 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim <219906144+aibrahim-oai@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:57:32 +0000 Subject: [PATCH 11/13] codex: fix clippy single-match on PR #14902 Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index cf7f008a609..13727d0b260 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -393,15 +393,14 @@ async fn stop_conversation_state( state.input_task.abort(); let _ = state.input_task.await; - match state.fanout_task.take() { - Some(fanout_task) => match fanout_task_stop { + if let Some(fanout_task) = state.fanout_task.take() { + match fanout_task_stop { RealtimeFanoutTaskStop::Abort => { fanout_task.abort(); let _ = fanout_task.await; } RealtimeFanoutTaskStop::Detach => {} - }, - None => {} + } } } From 66759a2a3c8b2434427884f734aa8bb52773cf98 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim <219906144+aibrahim-oai@users.noreply.github.com> Date: Tue, 17 Mar 2026 22:11:49 +0000 Subject: [PATCH 12/13] codex: fix CI failure on PR #14902 Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 13727d0b260..7ab6ae4906f 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -612,7 +612,7 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt let active_transcript = handoff .active_transcript .iter() - .map(|entry| format!("{}: {}", entry.role, entry.text)) + .map(|entry| format!("{role}: {text}", role = entry.role, text = entry.text)) .collect::>() .join("\n"); (!active_transcript.is_empty()) From 80f7813d6b01692a47c342a3ae6de768ea87844c Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim <219906144+aibrahim-oai@users.noreply.github.com> Date: Tue, 17 Mar 2026 22:30:47 +0000 Subject: [PATCH 13/13] codex: fix CI failure on PR #14902 Co-authored-by: Codex --- codex-rs/core/src/realtime_conversation.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 6c1d69b7cbe..c1c117b2f1e 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -443,6 +443,7 @@ struct PreparedRealtimeConversationStart { api_provider: ApiProvider, extra_headers: Option, requested_session_id: Option, + version: RealtimeWsVersion, session_config: RealtimeSessionConfig, } @@ -499,6 +500,7 @@ async fn prepare_realtime_start( api_provider, extra_headers, requested_session_id, + version, session_config, }) } @@ -512,6 +514,7 @@ async fn handle_start_inner( api_provider, extra_headers, requested_session_id, + version, session_config, } = prepared_start; info!("starting realtime conversation");