diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 14908dbb1f70..aa66a83097cc 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -1694,6 +1694,13 @@ ], "type": "object" }, + "RealtimeConversationVersion": { + "enum": [ + "v1", + "v2" + ], + "type": "string" + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ @@ -2857,10 +2864,14 @@ }, "threadId": { "type": "string" + }, + "version": { + "$ref": "#/definitions/RealtimeConversationVersion" } }, "required": [ - "threadId" + "threadId", + "version" ], "type": "object" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index e370546dc2fe..7d9ee35b2de4 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -9785,6 +9785,13 @@ } ] }, + "RealtimeConversationVersion": { + "enum": [ + "v1", + "v2" + ], + "type": "string" + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ @@ -12925,10 +12932,14 @@ }, "threadId": { "type": "string" + }, + "version": { + "$ref": "#/definitions/v2/RealtimeConversationVersion" } }, "required": [ - "threadId" + "threadId", + "version" ], "title": "ThreadRealtimeStartedNotification", "type": "object" diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index b069d3e5e7e6..8081af2e3983 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -6573,6 +6573,13 @@ } ] }, + "RealtimeConversationVersion": { + "enum": [ + "v1", + "v2" + ], + "type": "string" + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ @@ -10685,10 +10692,14 @@ }, "threadId": { "type": "string" + }, + "version": { + "$ref": "#/definitions/RealtimeConversationVersion" } }, "required": [ - "threadId" + "threadId", + "version" ], "title": "ThreadRealtimeStartedNotification", "type": "object" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeStartedNotification.json index 1584112640e9..dd94a5cc4985 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRealtimeStartedNotification.json @@ -1,5 +1,14 @@ { "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "RealtimeConversationVersion": { + "enum": [ + "v1", + "v2" + ], + "type": "string" + } + }, "description": "EXPERIMENTAL - emitted when thread realtime startup is accepted.", "properties": { "sessionId": { @@ -10,10 +19,14 @@ }, "threadId": { "type": "string" + }, + "version": { + "$ref": "#/definitions/RealtimeConversationVersion" } }, "required": [ - "threadId" + "threadId", + "version" ], "title": "ThreadRealtimeStartedNotification", "type": "object" diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationVersion.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationVersion.ts new file mode 100644 index 000000000000..cedc4bbe5255 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeConversationVersion.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type RealtimeConversationVersion = "v1" | "v2"; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 396d07e12b9b..09e75abed8cf 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -49,6 +49,7 @@ export type { NetworkPolicyRuleAction } from "./NetworkPolicyRuleAction"; export type { ParsedCommand } from "./ParsedCommand"; export type { Personality } from "./Personality"; export type { PlanType } from "./PlanType"; +export type { RealtimeConversationVersion } from "./RealtimeConversationVersion"; export type { ReasoningEffort } from "./ReasoningEffort"; export type { ReasoningItemContent } from "./ReasoningItemContent"; export type { ReasoningItemReasoningSummary } from "./ReasoningItemReasoningSummary"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeStartedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeStartedNotification.ts index 736ecde1fe17..d4941006115d 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeStartedNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadRealtimeStartedNotification.ts @@ -1,8 +1,9 @@ // GENERATED CODE! DO NOT MODIFY BY HAND! // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RealtimeConversationVersion } from "../RealtimeConversationVersion"; /** * EXPERIMENTAL - emitted when thread realtime startup is accepted. */ -export type ThreadRealtimeStartedNotification = { threadId: string, sessionId: string | null, }; +export type ThreadRealtimeStartedNotification = { threadId: string, sessionId: string | null, version: RealtimeConversationVersion, }; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 73139a2e09bd..0ecd17b3d3cc 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -946,6 +946,7 @@ mod tests { use codex_protocol::ThreadId; use codex_protocol::account::PlanType; use codex_protocol::parse_command::ParsedCommand; + use codex_protocol::protocol::RealtimeConversationVersion; use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; use serde_json::json; @@ -1628,6 +1629,7 @@ mod tests { ServerNotification::ThreadRealtimeStarted(v2::ThreadRealtimeStartedNotification { thread_id: "thr_123".to_string(), session_id: Some("sess_456".to_string()), + version: RealtimeConversationVersion::V1, }); let reason = crate::experimental_api::ExperimentalApi::experimental_reason(¬ification); assert_eq!(reason, Some("thread/realtime/started")); diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index e2316d8e788d..1156ed213f40 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -68,6 +68,7 @@ use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow; use codex_protocol::protocol::ReadOnlyAccess as CoreReadOnlyAccess; use codex_protocol::protocol::RealtimeAudioFrame as CoreRealtimeAudioFrame; +use codex_protocol::protocol::RealtimeConversationVersion; use codex_protocol::protocol::ReviewDecision as CoreReviewDecision; use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SkillDependencies as CoreSkillDependencies; @@ -3775,6 +3776,7 @@ pub struct ThreadRealtimeStopResponse {} pub struct ThreadRealtimeStartedNotification { pub thread_id: String, pub session_id: Option, + pub version: RealtimeConversationVersion, } /// EXPERIMENTAL - raw non-audio thread realtime item emitted by the backend. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 4f4f995e2c74..8a6b48a47de6 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -338,6 +338,7 @@ pub(crate) async fn apply_bespoke_event_handling( let notification = ThreadRealtimeStartedNotification { thread_id: conversation_id.to_string(), session_id: event.session_id, + version: event.version, }; outgoing .send_server_notification(ServerNotification::ThreadRealtimeStarted( diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 71b6d6dcf338..1c30ee530d11 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -25,6 +25,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_core::features::FEATURES; use codex_core::features::Feature; +use codex_protocol::protocol::RealtimeConversationVersion; use core_test_support::responses::start_websocket_server; use core_test_support::skip_if_no_network; use pretty_assertions::assert_eq; @@ -115,6 +116,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { .await?; assert_eq!(started.thread_id, thread_start.thread.id); assert!(started.session_id.is_some()); + assert_eq!(started.version, RealtimeConversationVersion::V2); let startup_context_request = realtime_server.wait_for_request(0, 0).await; assert_eq!( diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 7d3ecdaa0124..ea00a7a2a2b2 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -1359,6 +1359,13 @@ }, "type": "object" }, + "RealtimeConversationVersion": { + "enum": [ + "v1", + "v2" + ], + "type": "string" + }, "RealtimeToml": { "additionalProperties": false, "properties": { @@ -1366,7 +1373,7 @@ "$ref": "#/definitions/RealtimeWsMode" }, "version": { - "$ref": "#/definitions/RealtimeWsVersion" + "$ref": "#/definitions/RealtimeConversationVersion" } }, "type": "object" @@ -1378,13 +1385,6 @@ ], "type": "string" }, - "RealtimeWsVersion": { - "enum": [ - "v1", - "v2" - ], - "type": "string" - }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/core/src/auth_env_telemetry.rs b/codex-rs/core/src/auth_env_telemetry.rs index be281e05a1ff..85cd23fe06f7 100644 --- a/codex-rs/core/src/auth_env_telemetry.rs +++ b/codex-rs/core/src/auth_env_telemetry.rs @@ -71,6 +71,7 @@ mod tests { request_max_retries: None, stream_max_retries: None, stream_idle_timeout_ms: None, + websocket_connect_timeout_ms: None, requires_openai_auth: false, supports_websockets: false, }; diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 7a543161e6ca..bc436a0cf92e 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -1535,13 +1535,7 @@ pub enum RealtimeWsMode { Transcription, } -#[derive(Serialize, Deserialize, Debug, Clone, Copy, Default, PartialEq, Eq, JsonSchema)] -#[serde(rename_all = "snake_case")] -pub enum RealtimeWsVersion { - #[default] - V1, - V2, -} +pub use codex_protocol::protocol::RealtimeConversationVersion as RealtimeWsVersion; #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, JsonSchema)] #[schemars(deny_unknown_fields)] diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 938f922f8774..2a8a6337a79c 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -371,7 +371,8 @@ pub(crate) async fn handle_start( format!("{prompt}\n\n{startup_context}") }; let model = config.experimental_realtime_ws_model.clone(); - let event_parser = match config.realtime.version { + let version = config.realtime.version; + let event_parser = match version { RealtimeWsVersion::V1 => RealtimeEventParser::V1, RealtimeWsVersion::V2 => RealtimeEventParser::RealtimeV2, }; @@ -411,6 +412,7 @@ pub(crate) async fn handle_start( id: sub_id.clone(), msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent { session_id: requested_session_id, + version, }), }) .await; diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 4ab987121479..8d156d17dd43 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -13,6 +13,7 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; +use codex_protocol::protocol::RealtimeConversationVersion; use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::user_input::UserInput; @@ -159,6 +160,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { .await .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); assert!(started.session_id.is_some()); + assert_eq!(started.version, RealtimeConversationVersion::V1); let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index daf3b7d74a39..f8ea7fec54f4 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1446,9 +1446,18 @@ pub struct HookCompletedEvent { pub run: HookRunSummary, } +#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +pub enum RealtimeConversationVersion { + #[default] + V1, + V2, +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] pub struct RealtimeConversationStartedEvent { pub session_id: Option, + pub version: RealtimeConversationVersion, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs index 37646880e9db..892e241836bb 100644 --- a/codex-rs/tui/src/chatwidget/realtime.rs +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -4,6 +4,7 @@ use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::RealtimeConversationClosedEvent; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeConversationStartedEvent; +use codex_protocol::protocol::RealtimeConversationVersion; use codex_protocol::protocol::RealtimeEvent; #[cfg(not(target_os = "linux"))] use std::sync::atomic::AtomicUsize; @@ -22,6 +23,7 @@ pub(super) enum RealtimeConversationPhase { #[derive(Default)] pub(super) struct RealtimeConversationUiState { phase: RealtimeConversationPhase, + audio_behavior: RealtimeAudioBehavior, requested_close: bool, session_id: Option, warned_audio_only_submission: bool, @@ -38,6 +40,35 @@ pub(super) struct RealtimeConversationUiState { playback_queued_samples: Arc, } +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +enum RealtimeAudioBehavior { + #[default] + Legacy, + PlaybackAware, +} + +impl RealtimeAudioBehavior { + fn from_version(version: RealtimeConversationVersion) -> Self { + match version { + RealtimeConversationVersion::V1 => Self::Legacy, + RealtimeConversationVersion::V2 => Self::PlaybackAware, + } + } + + #[cfg(not(target_os = "linux"))] + fn input_behavior( + self, + playback_queued_samples: Arc, + ) -> crate::voice::RealtimeInputBehavior { + match self { + Self::Legacy => crate::voice::RealtimeInputBehavior::Ungated, + Self::PlaybackAware => crate::voice::RealtimeInputBehavior::PlaybackAware { + playback_queued_samples, + }, + } + } +} + impl RealtimeConversationUiState { pub(super) fn is_live(&self) -> bool { matches!( @@ -202,6 +233,7 @@ impl ChatWidget { self.realtime_conversation.phase = RealtimeConversationPhase::Starting; self.realtime_conversation.requested_close = false; self.realtime_conversation.session_id = None; + self.realtime_conversation.audio_behavior = RealtimeAudioBehavior::Legacy; self.realtime_conversation.warned_audio_only_submission = false; self.set_footer_hint_override(Some(vec![( "/realtime".to_string(), @@ -241,6 +273,7 @@ impl ChatWidget { self.realtime_conversation.phase = RealtimeConversationPhase::Inactive; self.realtime_conversation.requested_close = false; self.realtime_conversation.session_id = None; + self.realtime_conversation.audio_behavior = RealtimeAudioBehavior::Legacy; self.realtime_conversation.warned_audio_only_submission = false; } @@ -255,6 +288,7 @@ impl ChatWidget { } self.realtime_conversation.phase = RealtimeConversationPhase::Active; self.realtime_conversation.session_id = ev.session_id; + self.realtime_conversation.audio_behavior = RealtimeAudioBehavior::from_version(ev.version); self.realtime_conversation.warned_audio_only_submission = false; self.set_footer_hint_override(Some(vec![( "/realtime".to_string(), @@ -274,7 +308,11 @@ impl ChatWidget { } RealtimeEvent::InputAudioSpeechStarted(_) | RealtimeEvent::ResponseCancelled(_) => { #[cfg(not(target_os = "linux"))] - if let Some(player) = &self.realtime_conversation.audio_player { + if matches!( + self.realtime_conversation.audio_behavior, + RealtimeAudioBehavior::PlaybackAware + ) && let Some(player) = &self.realtime_conversation.audio_player + { // Once the server detects user speech or the current response is cancelled, // any buffered assistant audio is stale and should stop gating mic input. player.clear(); @@ -341,7 +379,11 @@ impl ChatWidget { let capture = match crate::voice::VoiceCapture::start_realtime( &self.config, self.app_event_tx.clone(), - Arc::clone(&self.realtime_conversation.playback_queued_samples), + self.realtime_conversation + .audio_behavior + .input_behavior(Arc::clone( + &self.realtime_conversation.playback_queued_samples, + )), ) { Ok(capture) => capture, Err(err) => { diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index f537c95bbc03..6d52e020f14f 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -147,6 +147,14 @@ mod voice { pub(crate) struct RealtimeAudioPlayer; + #[derive(Clone)] + pub(crate) enum RealtimeInputBehavior { + Ungated, + PlaybackAware { + playback_queued_samples: Arc, + }, + } + impl VoiceCapture { pub fn start() -> Result { Err("voice input is unavailable in this build".to_string()) @@ -155,7 +163,7 @@ mod voice { pub fn start_realtime( _config: &Config, _tx: AppEventSender, - _playback_queued_samples: Arc, + _input_behavior: RealtimeInputBehavior, ) -> Result { Err("voice input is unavailable in this build".to_string()) } diff --git a/codex-rs/tui/src/voice.rs b/codex-rs/tui/src/voice.rs index ba260b028fbb..510010c3038e 100644 --- a/codex-rs/tui/src/voice.rs +++ b/codex-rs/tui/src/voice.rs @@ -44,6 +44,14 @@ const REALTIME_INTERRUPT_INPUT_PEAK_THRESHOLD: u16 = 4_000; // callbacks so trailing syllables are not chopped up between chunks. const REALTIME_INTERRUPT_GRACE_PERIOD: Duration = Duration::from_millis(900); +#[derive(Clone)] +pub(crate) enum RealtimeInputBehavior { + Ungated, + PlaybackAware { + playback_queued_samples: Arc, + }, +} + struct TranscriptionAuthContext { mode: AuthMode, bearer_token: String, @@ -94,7 +102,7 @@ impl VoiceCapture { pub fn start_realtime( config: &Config, tx: AppEventSender, - playback_queued_samples: Arc, + input_behavior: RealtimeInputBehavior, ) -> Result { let (device, config) = select_configured_input_device_and_config(config)?; @@ -110,7 +118,7 @@ impl VoiceCapture { sample_rate, channels, tx, - playback_queued_samples, + input_behavior, last_peak.clone(), )?; stream @@ -354,7 +362,7 @@ fn build_realtime_input_stream( sample_rate: u32, channels: u16, tx: AppEventSender, - playback_queued_samples: Arc, + input_behavior: RealtimeInputBehavior, last_peak: Arc, ) -> Result { match config.sample_format() { @@ -362,7 +370,6 @@ fn build_realtime_input_stream( .build_input_stream( &config.clone().into(), { - let playback_queued_samples = Arc::clone(&playback_queued_samples); let last_peak = Arc::clone(&last_peak); let tx = tx; let mut allow_input_until = None; @@ -370,9 +377,8 @@ fn build_realtime_input_stream( let peak = peak_f32(input); if !should_send_realtime_input( peak, - &playback_queued_samples, + &input_behavior, &mut allow_input_until, - Instant::now(), ) { last_peak.store(0, Ordering::Relaxed); return; @@ -390,7 +396,6 @@ fn build_realtime_input_stream( .build_input_stream( &config.clone().into(), { - let playback_queued_samples = Arc::clone(&playback_queued_samples); let last_peak = Arc::clone(&last_peak); let tx = tx; let mut allow_input_until = None; @@ -398,9 +403,8 @@ fn build_realtime_input_stream( let peak = peak_i16(input); if !should_send_realtime_input( peak, - &playback_queued_samples, + &input_behavior, &mut allow_input_until, - Instant::now(), ) { last_peak.store(0, Ordering::Relaxed); return; @@ -417,7 +421,6 @@ fn build_realtime_input_stream( .build_input_stream( &config.clone().into(), { - let playback_queued_samples = Arc::clone(&playback_queued_samples); let last_peak = Arc::clone(&last_peak); let tx = tx; let mut allow_input_until = None; @@ -426,9 +429,8 @@ fn build_realtime_input_stream( let peak = convert_u16_to_i16_and_peak(input, &mut samples); if !should_send_realtime_input( peak, - &playback_queued_samples, + &input_behavior, &mut allow_input_until, - Instant::now(), ) { last_peak.store(0, Ordering::Relaxed); return; @@ -739,10 +741,21 @@ fn fill_output_u16( /// utterance reaches the server. fn should_send_realtime_input( peak: u16, - playback_queued_samples: &Arc, + input_behavior: &RealtimeInputBehavior, allow_input_until: &mut Option, - now: Instant, ) -> bool { + let playback_queued_samples = match input_behavior { + RealtimeInputBehavior::Ungated => { + *allow_input_until = None; + return true; + } + RealtimeInputBehavior::PlaybackAware { + playback_queued_samples, + } => playback_queued_samples, + }; + + let now = Instant::now(); + if playback_queued_samples.load(Ordering::Relaxed) == 0 { *allow_input_until = None; return true; @@ -1021,11 +1034,18 @@ async fn transcribe_bytes( #[cfg(test)] mod tests { + use super::RealtimeInputBehavior; use super::RecordedAudio; use super::convert_pcm16; use super::encode_wav_normalized; + use super::should_send_realtime_input; use pretty_assertions::assert_eq; use std::io::Cursor; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::time::Duration; + use std::time::Instant; #[test] fn convert_pcm16_downmixes_and_resamples_for_model_input() { @@ -1054,4 +1074,39 @@ mod tests { assert_eq!(spec.sample_rate, 24_000); assert_eq!(samples, vec![8_426, 29_490]); } + + #[test] + fn ungated_realtime_input_ignores_playback_backlog() { + let mut allow_input_until = Some(Instant::now() + Duration::from_secs(1)); + let playback_queued_samples = Arc::new(AtomicUsize::new(1024)); + + assert!(should_send_realtime_input( + 0, + &RealtimeInputBehavior::Ungated, + &mut allow_input_until, + )); + assert_eq!(allow_input_until, None); + assert_eq!(playback_queued_samples.load(Ordering::Relaxed), 1024); + } + + #[test] + fn playback_aware_realtime_input_requires_an_interrupt_peak() { + let mut allow_input_until = None; + let playback_queued_samples = Arc::new(AtomicUsize::new(1024)); + let input_behavior = RealtimeInputBehavior::PlaybackAware { + playback_queued_samples: Arc::clone(&playback_queued_samples), + }; + + assert!(!should_send_realtime_input( + 100, + &input_behavior, + &mut allow_input_until, + )); + assert!(should_send_realtime_input( + 5_000, + &input_behavior, + &mut allow_input_until, + )); + assert!(allow_input_until.is_some()); + } } diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index 6c54bf3ce8f4..5efac9bc0d3e 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -492,6 +492,7 @@ fn server_notification_thread_events( id: String::new(), msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent { session_id: notification.session_id, + version: notification.version, }), }], )),