Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 63 additions & 20 deletions codex-rs/codex-api/src/sse/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ pub(crate) fn spawn_chat_stream(
ResponseStream { rx_event }
}

/// Processes Server-Sent Events from the legacy Chat Completions streaming API.
///
/// The upstream protocol terminates a streaming response with a final sentinel event
/// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE`
/// (without brackets) instead.
///
/// `eventsource_stream` delivers these sentinels as regular events rather than signaling
/// end-of-stream. If we try to parse them as JSON, we log and skip them, then keep
/// polling for more events.
///
/// On servers that keep the HTTP connection open after emitting the sentinel (notably
/// wiremock on Windows), skipping the sentinel means we never emit `ResponseEvent::Completed`.
/// Higher-level workflows/tests that wait for completion before issuing subsequent model
/// calls will then stall, which shows up as "expected N requests, got 1" verification
/// failures in the mock server.
pub async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
Expand Down Expand Up @@ -57,6 +72,31 @@ pub async fn process_chat_sse<S>(
let mut reasoning_item: Option<ResponseItem> = None;
let mut completed_sent = false;

async fn flush_and_complete(
tx_event: &mpsc::Sender<Result<ResponseEvent, ApiError>>,
reasoning_item: &mut Option<ResponseItem>,
assistant_item: &mut Option<ResponseItem>,
) {
if let Some(reasoning) = reasoning_item.take() {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(reasoning)))
.await;
}

if let Some(assistant) = assistant_item.take() {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(assistant)))
.await;
}

let _ = tx_event
.send(Ok(ResponseEvent::Completed {
response_id: String::new(),
token_usage: None,
}))
.await;
}

loop {
let start = Instant::now();
let response = timeout(idle_timeout, stream.next()).await;
Expand All @@ -70,24 +110,8 @@ pub async fn process_chat_sse<S>(
return;
}
Ok(None) => {
if let Some(reasoning) = reasoning_item {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(reasoning)))
.await;
}

if let Some(assistant) = assistant_item {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(assistant)))
.await;
}
if !completed_sent {
let _ = tx_event
.send(Ok(ResponseEvent::Completed {
response_id: String::new(),
token_usage: None,
}))
.await;
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
}
return;
}
Expand All @@ -101,16 +125,25 @@ pub async fn process_chat_sse<S>(

trace!("SSE event: {}", sse.data);

if sse.data.trim().is_empty() {
let data = sse.data.trim();

if data.is_empty() {
continue;
}

let value: serde_json::Value = match serde_json::from_str(&sse.data) {
if data == "[DONE]" || data == "DONE" {
if !completed_sent {
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
}
return;
}

let value: serde_json::Value = match serde_json::from_str(data) {
Ok(val) => val,
Err(err) => {
debug!(
"Failed to parse ChatCompletions SSE event: {err}, data: {}",
&sse.data
data
);
continue;
}
Expand Down Expand Up @@ -362,6 +395,16 @@ mod tests {
body
}

/// Regression test: the stream should complete when we see a `[DONE]` sentinel.
///
/// This is important for tests/mocks that don't immediately close the underlying
/// connection after emitting the sentinel.
#[tokio::test]
async fn completes_on_done_sentinel_without_json() {
let events = collect_events("event: message\ndata: [DONE]\n\n").await;
assert_matches!(&events[..], [ResponseEvent::Completed { .. }]);
}

async fn collect_events(body: &str) -> Vec<ResponseEvent> {
let reader = ReaderStream::new(std::io::Cursor::new(body.to_string()))
.map_err(|err| codex_client::TransportError::Network(err.to_string()));
Expand Down
Loading