diff --git a/codex-rs/codex-api/src/sse/chat.rs b/codex-rs/codex-api/src/sse/chat.rs index 21adfa571a2..dec35890b78 100644 --- a/codex-rs/codex-api/src/sse/chat.rs +++ b/codex-rs/codex-api/src/sse/chat.rs @@ -30,6 +30,21 @@ pub(crate) fn spawn_chat_stream( ResponseStream { rx_event } } +/// Processes Server-Sent Events from the legacy Chat Completions streaming API. +/// +/// The upstream protocol terminates a streaming response with a final sentinel event +/// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE` +/// (without brackets) instead. +/// +/// `eventsource_stream` delivers these sentinels as regular events rather than signaling +/// end-of-stream. If we tried to parse them as JSON, we would log and skip them, then keep +/// polling for more events; instead, either sentinel is handled as the end of the stream. +/// +/// On servers that keep the HTTP connection open after emitting the sentinel (notably +/// wiremock on Windows), skipping it would mean we never emit `ResponseEvent::Completed`. +/// Higher-level workflows/tests that wait for completion before issuing subsequent model +/// calls would then stall, which shows up as "expected N requests, got 1" verification +/// failures in the mock server. 
pub async fn process_chat_sse( stream: S, tx_event: mpsc::Sender>, @@ -57,6 +72,31 @@ pub async fn process_chat_sse( let mut reasoning_item: Option = None; let mut completed_sent = false; + async fn flush_and_complete( + tx_event: &mpsc::Sender>, + reasoning_item: &mut Option, + assistant_item: &mut Option, + ) { + if let Some(reasoning) = reasoning_item.take() { + let _ = tx_event + .send(Ok(ResponseEvent::OutputItemDone(reasoning))) + .await; + } + + if let Some(assistant) = assistant_item.take() { + let _ = tx_event + .send(Ok(ResponseEvent::OutputItemDone(assistant))) + .await; + } + + let _ = tx_event + .send(Ok(ResponseEvent::Completed { + response_id: String::new(), + token_usage: None, + })) + .await; + } + loop { let start = Instant::now(); let response = timeout(idle_timeout, stream.next()).await; @@ -70,24 +110,8 @@ pub async fn process_chat_sse( return; } Ok(None) => { - if let Some(reasoning) = reasoning_item { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(reasoning))) - .await; - } - - if let Some(assistant) = assistant_item { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(assistant))) - .await; - } if !completed_sent { - let _ = tx_event - .send(Ok(ResponseEvent::Completed { - response_id: String::new(), - token_usage: None, - })) - .await; + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; } return; } @@ -101,16 +125,25 @@ pub async fn process_chat_sse( trace!("SSE event: {}", sse.data); - if sse.data.trim().is_empty() { + let data = sse.data.trim(); + + if data.is_empty() { continue; } - let value: serde_json::Value = match serde_json::from_str(&sse.data) { + if data == "[DONE]" || data == "DONE" { + if !completed_sent { + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; + } + return; + } + + let value: serde_json::Value = match serde_json::from_str(data) { Ok(val) => val, Err(err) => { debug!( "Failed to parse ChatCompletions SSE event: {err}, data: {}", - 
&sse.data + data ); continue; } @@ -362,6 +395,16 @@ mod tests { body } + /// Regression test: the stream should complete when we see a `[DONE]` sentinel. + /// + /// This is important for tests/mocks that don't immediately close the underlying + /// connection after emitting the sentinel. + #[tokio::test] + async fn completes_on_done_sentinel_without_json() { + let events = collect_events("event: message\ndata: [DONE]\n\n").await; + assert_matches!(&events[..], [ResponseEvent::Completed { .. }]); + } + async fn collect_events(body: &str) -> Vec { let reader = ReaderStream::new(std::io::Cursor::new(body.to_string())) .map_err(|err| codex_client::TransportError::Network(err.to_string()));