Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 17 additions & 118 deletions codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::endpoint::realtime_websocket::protocol::ConversationFunctionCallOutputItem;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::methods_common::conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_common::conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_common::normalized_session_mode;
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
use crate::endpoint::realtime_websocket::methods_common::websocket_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
Expand All @@ -10,13 +11,6 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice;
use crate::endpoint::realtime_websocket::protocol::SessionFunctionTool;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::endpoint::realtime_websocket::protocol::parse_realtime_event;
use crate::error::ApiError;
use crate::provider::Provider;
Expand All @@ -26,7 +20,6 @@ use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
Expand All @@ -47,22 +40,6 @@ use tracing::trace;
use tungstenite::protocol::WebSocketConfig;
use url::Url;

const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";
const REALTIME_V2_SESSION_TYPE: &str = "realtime";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate work to Codex and return the result.";

fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
match event_parser {
RealtimeEventParser::V1 => RealtimeSessionMode::Conversational,
RealtimeEventParser::RealtimeV2 => session_mode,
}
}

struct WsStream {
tx_command: mpsc::Sender<WsCommand>,
pump_task: tokio::task::JoinHandle<()>,
Expand Down Expand Up @@ -300,45 +277,21 @@ impl RealtimeWebsocketWriter {
}

pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
let content_kind = match self.event_parser {
RealtimeEventParser::V1 => "text",
RealtimeEventParser::RealtimeV2 => "input_text",
};
self.send_json(RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::Message(ConversationMessageItem {
kind: "message".to_string(),
role: "user".to_string(),
content: vec![ConversationItemContent {
kind: content_kind.to_string(),
text,
}],
}),
})
.await
self.send_json(conversation_item_create_message(self.event_parser, text))
.await
}

pub async fn send_conversation_handoff_append(
&self,
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
let message = match self.event_parser {
RealtimeEventParser::V1 => RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
},
RealtimeEventParser::RealtimeV2 => RealtimeOutboundMessage::ConversationItemCreate {
item: ConversationItemPayload::FunctionCallOutput(
ConversationFunctionCallOutputItem {
kind: "function_call_output".to_string(),
call_id: handoff_id,
output: output_text,
},
),
},
};

self.send_json(message).await
self.send_json(conversation_handoff_append_message(
self.event_parser,
handoff_id,
output_text,
))
.await
}

pub async fn send_session_update(
Expand All @@ -347,60 +300,9 @@ impl RealtimeWebsocketWriter {
session_mode: RealtimeSessionMode,
) -> Result<(), ApiError> {
let session_mode = normalized_session_mode(self.event_parser, session_mode);
let (session_kind, session_instructions, output_audio) = match session_mode {
RealtimeSessionMode::Conversational => {
let kind = match self.event_parser {
RealtimeEventParser::V1 => REALTIME_V1_SESSION_TYPE.to_string(),
RealtimeEventParser::RealtimeV2 => REALTIME_V2_SESSION_TYPE.to_string(),
};
let voice = match self.event_parser {
RealtimeEventParser::V1 => SessionAudioVoice::Fathom,
RealtimeEventParser::RealtimeV2 => SessionAudioVoice::Alloy,
};
(kind, Some(instructions), Some(SessionAudioOutput { voice }))
}
RealtimeSessionMode::Transcription => ("transcription".to_string(), None, None),
};
let tools = match (self.event_parser, session_mode) {
(RealtimeEventParser::RealtimeV2, RealtimeSessionMode::Conversational) => {
Some(vec![SessionFunctionTool {
kind: "function".to_string(),
name: REALTIME_V2_CODEX_TOOL_NAME.to_string(),
description: REALTIME_V2_CODEX_TOOL_DESCRIPTION.to_string(),
parameters: json!({
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Prompt text for the delegated Codex task."
}
},
"required": ["prompt"],
"additionalProperties": false
}),
}])
}
(RealtimeEventParser::RealtimeV2, RealtimeSessionMode::Transcription)
| (RealtimeEventParser::V1, RealtimeSessionMode::Conversational)
| (RealtimeEventParser::V1, RealtimeSessionMode::Transcription) => None,
};
self.send_json(RealtimeOutboundMessage::SessionUpdate {
session: SessionUpdateSession {
kind: session_kind,
instructions: session_instructions,
audio: SessionAudio {
input: SessionAudioInput {
format: SessionAudioFormat {
kind: "audio/pcm".to_string(),
rate: REALTIME_AUDIO_SAMPLE_RATE,
},
},
output: output_audio,
},
tools,
},
})
.await
let session = session_update_session(self.event_parser, instructions, session_mode);
self.send_json(RealtimeOutboundMessage::SessionUpdate { session })
.await
}

pub async fn close(&self) -> Result<(), ApiError> {
Expand Down Expand Up @@ -655,10 +557,7 @@ fn websocket_url_from_api_url(
}
}

let intent = match event_parser {
RealtimeEventParser::V1 => Some("quicksilver"),
RealtimeEventParser::RealtimeV2 => None,
};
let intent = websocket_intent(event_parser);
let has_extra_query_params = query_params.is_some_and(|query_params| {
query_params
.iter()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::endpoint::realtime_websocket::methods_v1::conversation_handoff_append_message as v1_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v1::conversation_item_create_message as v1_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v1::session_update_session as v1_session_update_session;
use crate::endpoint::realtime_websocket::methods_v1::websocket_intent as v1_websocket_intent;
use crate::endpoint::realtime_websocket::methods_v2::conversation_handoff_append_message as v2_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v2::conversation_item_create_message as v2_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v2::session_update_session as v2_session_update_session;
use crate::endpoint::realtime_websocket::methods_v2::websocket_intent as v2_websocket_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;

/// Sample rate written into the `rate` field of the session's input audio format
/// (presumably in Hz — confirm against the realtime API).
pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
/// Value written into the `kind` field of the session's input audio format.
pub(super) const REALTIME_AUDIO_FORMAT: &str = "audio/pcm";

/// Returns the session mode that applies for `event_parser`.
///
/// With the V1 parser the requested `session_mode` is ignored and the result is always
/// `RealtimeSessionMode::Conversational`; with the RealtimeV2 parser the requested mode is
/// returned unchanged.
pub(super) fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
match event_parser {
RealtimeEventParser::V1 => RealtimeSessionMode::Conversational,
RealtimeEventParser::RealtimeV2 => session_mode,
}
}

/// Builds the outbound message that creates a conversation item from `text`.
///
/// This is a pure dispatcher: it forwards `text` to the V1 or V2 builder of the same name,
/// selected by `event_parser`.
pub(super) fn conversation_item_create_message(
event_parser: RealtimeEventParser,
text: String,
) -> RealtimeOutboundMessage {
match event_parser {
RealtimeEventParser::V1 => v1_conversation_item_create_message(text),
RealtimeEventParser::RealtimeV2 => v2_conversation_item_create_message(text),
}
}

/// Builds the outbound message that delivers `output_text` for the handoff identified by
/// `handoff_id`.
///
/// This is a pure dispatcher: both arguments are forwarded unchanged to the V1 or V2 builder of
/// the same name, selected by `event_parser`.
pub(super) fn conversation_handoff_append_message(
event_parser: RealtimeEventParser,
handoff_id: String,
output_text: String,
) -> RealtimeOutboundMessage {
match event_parser {
RealtimeEventParser::V1 => v1_conversation_handoff_append_message(handoff_id, output_text),
RealtimeEventParser::RealtimeV2 => {
v2_conversation_handoff_append_message(handoff_id, output_text)
}
}
}

/// Builds the `SessionUpdateSession` payload for the protocol version selected by
/// `event_parser`.
///
/// The V1 builder only receives `instructions`; `session_mode` plays no part in its payload.
/// The V2 builder receives `instructions` together with the session mode after it has been
/// passed through `normalized_session_mode`.
pub(super) fn session_update_session(
    event_parser: RealtimeEventParser,
    instructions: String,
    session_mode: RealtimeSessionMode,
) -> SessionUpdateSession {
    match event_parser {
        RealtimeEventParser::V1 => v1_session_update_session(instructions),
        RealtimeEventParser::RealtimeV2 => v2_session_update_session(
            instructions,
            // Only this arm consumes the mode, so it is normalized right where it is used.
            normalized_session_mode(event_parser, session_mode),
        ),
    }
}

/// Returns the optional websocket intent string for `event_parser`.
///
/// This is a pure dispatcher to the V1 or V2 `websocket_intent` implementation; `None` means the
/// selected protocol version supplies no intent.
pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
match event_parser {
RealtimeEventParser::V1 => v1_websocket_intent(),
RealtimeEventParser::RealtimeV2 => v2_websocket_intent(),
}
}
60 changes: 60 additions & 0 deletions codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_FORMAT;
use crate::endpoint::realtime_websocket::methods_common::REALTIME_AUDIO_SAMPLE_RATE;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload;
use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::SessionAudio;
use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat;
use crate::endpoint::realtime_websocket::protocol::SessionAudioInput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput;
use crate::endpoint::realtime_websocket::protocol::SessionAudioVoice;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;

/// Value written into the `kind` field of the V1 session update payload.
const REALTIME_V1_SESSION_TYPE: &str = "quicksilver";

/// Builds the V1 `ConversationItemCreate` message for `text`.
///
/// The item is a message with kind `"message"` and role `"user"` whose content holds exactly one
/// part: `text`, tagged with the content kind `"text"`.
pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage {
    let text_part = ConversationItemContent {
        kind: String::from("text"),
        text,
    };
    let user_message = ConversationMessageItem {
        kind: String::from("message"),
        role: String::from("user"),
        content: vec![text_part],
    };

    RealtimeOutboundMessage::ConversationItemCreate {
        item: ConversationItemPayload::Message(user_message),
    }
}

/// Builds the V1 `ConversationHandoffAppend` message.
///
/// `handoff_id` and `output_text` are moved into the message fields of the same names without
/// modification.
pub(super) fn conversation_handoff_append_message(
handoff_id: String,
output_text: String,
) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
}
}

/// Builds the V1 `SessionUpdateSession` payload.
///
/// The session kind is `REALTIME_V1_SESSION_TYPE`, `instructions` is always present, the input
/// audio format uses the shared `REALTIME_AUDIO_FORMAT` / `REALTIME_AUDIO_SAMPLE_RATE`
/// constants, the output voice is `SessionAudioVoice::Fathom`, and no tools are attached.
pub(super) fn session_update_session(instructions: String) -> SessionUpdateSession {
    let input = SessionAudioInput {
        format: SessionAudioFormat {
            kind: REALTIME_AUDIO_FORMAT.to_owned(),
            rate: REALTIME_AUDIO_SAMPLE_RATE,
        },
    };
    let output = SessionAudioOutput {
        voice: SessionAudioVoice::Fathom,
    };

    SessionUpdateSession {
        kind: REALTIME_V1_SESSION_TYPE.to_owned(),
        instructions: Some(instructions),
        audio: SessionAudio {
            input,
            output: Some(output),
        },
        tools: None,
    }
}

/// Returns the websocket intent for V1, which is always `Some("quicksilver")`.
pub(super) fn websocket_intent() -> Option<&'static str> {
Some("quicksilver")
}
Loading
Loading