From c2641901e864ca7ae0653120f6e2fff771b50417 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Sat, 3 Jan 2026 12:22:52 -0800 Subject: [PATCH] fix(codex-api): handle Chat Completions DONE sentinel Context - This code parses Server-Sent Events (SSE) from the legacy Chat Completions streaming API (wire_api = "chat"). - The upstream protocol terminates a stream with a final sentinel event: data: [DONE]. - Some of our test stubs/helpers historically end the stream with data: DONE (no brackets). How this was found - GitHub Actions on Windows failed in codex-app-server integration tests with wiremock verification errors (expected multiple POSTs, got 1). Diagnosis - The job logs included: codex_api::sse::chat: Failed to parse ChatCompletions SSE event ... data: DONE. - eventsource_stream surfaces the sentinel as a normal SSE event; it does not automatically close the stream. - The parser previously attempted to JSON-decode every data: payload. The sentinel is not JSON, so we logged and skipped it, then continued polling. - On servers that keep the HTTP connection open after emitting the sentinel (notably wiremock on Windows), skipping the sentinel meant we never emitted ResponseEvent::Completed. - Higher layers wait for completion before progressing (emitting approval requests and issuing follow-up model calls), so the test never reached the subsequent requests and wiremock panicked when its expected-call count was not met. Fix - Treat both data: [DONE] and data: DONE as explicit end-of-stream sentinels. - When a sentinel is seen, flush any pending assistant/reasoning items and emit ResponseEvent::Completed once. Tests - Add a regression unit test asserting we complete on the data: [DONE] sentinel even if the underlying connection is not closed; the bare data: DONE form is not covered by this test. 
--- codex-rs/codex-api/src/sse/chat.rs | 83 +++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 20 deletions(-) diff --git a/codex-rs/codex-api/src/sse/chat.rs b/codex-rs/codex-api/src/sse/chat.rs index 21adfa571a2..dec35890b78 100644 --- a/codex-rs/codex-api/src/sse/chat.rs +++ b/codex-rs/codex-api/src/sse/chat.rs @@ -30,6 +30,21 @@ pub(crate) fn spawn_chat_stream( ResponseStream { rx_event } } +/// Processes Server-Sent Events from the legacy Chat Completions streaming API. +/// +/// The upstream protocol terminates a streaming response with a final sentinel event +/// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE` +/// (without brackets) instead. +/// +/// `eventsource_stream` delivers these sentinels as regular events rather than signaling +/// end-of-stream, and they are not JSON, so both forms are matched explicitly: pending +/// items are flushed and `ResponseEvent::Completed` is emitted once. +/// +/// Without this, on servers that keep the HTTP connection open after the sentinel (notably +/// wiremock on Windows), the sentinel was logged and skipped and `ResponseEvent::Completed` +/// was never emitted. Higher-level workflows/tests that wait for completion before issuing +/// subsequent model calls then stalled, which showed up as "expected N requests, got 1" +/// verification failures in the mock server. 
pub async fn process_chat_sse( stream: S, tx_event: mpsc::Sender>, @@ -57,6 +72,31 @@ pub async fn process_chat_sse( let mut reasoning_item: Option = None; let mut completed_sent = false; + async fn flush_and_complete( + tx_event: &mpsc::Sender>, + reasoning_item: &mut Option, + assistant_item: &mut Option, + ) { + if let Some(reasoning) = reasoning_item.take() { + let _ = tx_event + .send(Ok(ResponseEvent::OutputItemDone(reasoning))) + .await; + } + + if let Some(assistant) = assistant_item.take() { + let _ = tx_event + .send(Ok(ResponseEvent::OutputItemDone(assistant))) + .await; + } + + let _ = tx_event + .send(Ok(ResponseEvent::Completed { + response_id: String::new(), + token_usage: None, + })) + .await; + } + loop { let start = Instant::now(); let response = timeout(idle_timeout, stream.next()).await; @@ -70,24 +110,8 @@ pub async fn process_chat_sse( return; } Ok(None) => { - if let Some(reasoning) = reasoning_item { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(reasoning))) - .await; - } - - if let Some(assistant) = assistant_item { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(assistant))) - .await; - } if !completed_sent { - let _ = tx_event - .send(Ok(ResponseEvent::Completed { - response_id: String::new(), - token_usage: None, - })) - .await; + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; } return; } @@ -101,16 +125,25 @@ pub async fn process_chat_sse( trace!("SSE event: {}", sse.data); - if sse.data.trim().is_empty() { + let data = sse.data.trim(); + + if data.is_empty() { continue; } - let value: serde_json::Value = match serde_json::from_str(&sse.data) { + if data == "[DONE]" || data == "DONE" { + if !completed_sent { + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; + } + return; + } + + let value: serde_json::Value = match serde_json::from_str(data) { Ok(val) => val, Err(err) => { debug!( "Failed to parse ChatCompletions SSE event: {err}, data: {}", - 
&sse.data + data ); continue; } @@ -362,6 +395,16 @@ mod tests { body } + /// Regression test: the stream should complete when we see a `[DONE]` sentinel. + /// + /// This is important for tests/mocks that don't immediately close the underlying + /// connection after emitting the sentinel. + #[tokio::test] + async fn completes_on_done_sentinel_without_json() { + let events = collect_events("event: message\ndata: [DONE]\n\n").await; + assert_matches!(&events[..], [ResponseEvent::Completed { .. }]); + } + async fn collect_events(body: &str) -> Vec { let reader = ReaderStream::new(std::io::Cursor::new(body.to_string())) .map_err(|err| codex_client::TransportError::Network(err.to_string()));