Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/crates/core/src/agentic/execution/stream_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,20 @@ impl StreamProcessor {

// ==================== Helper Methods ====================

/// Send thinking end marker (if needed)
/// Send thinking end event (if needed)
async fn send_thinking_end_if_needed(&self, ctx: &mut StreamContext) {
if ctx.thinking_chunks_count > 0 && !ctx.thinking_completed_sent {
ctx.thinking_completed_sent = true;
debug!("Thinking process ended, sending ThinkingChunk with end marker");
debug!("Thinking process ended, sending ThinkingChunk end event");
let _ = self
.event_queue
.enqueue(
AgenticEvent::ThinkingChunk {
session_id: ctx.session_id.clone(),
turn_id: ctx.dialog_turn_id.clone(),
round_id: ctx.round_id.clone(),
content: "<thinking_end>".to_string(),
content: String::new(),
is_end: true,
subagent_parent_info: ctx.event_subagent_parent_info.clone(),
},
Some(EventPriority::Normal),
Expand Down Expand Up @@ -578,6 +579,7 @@ impl StreamProcessor {
turn_id: ctx.dialog_turn_id.clone(),
round_id: ctx.round_id.clone(),
content: thinking_content,
is_end: false,
subagent_parent_info: ctx.event_subagent_parent_info.clone(),
},
Some(EventPriority::Normal),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true

[dependencies]
anyhow = { workspace = true }
chrono = { workspace = true }
eventsource-stream = { workspace = true }
futures = { workspace = true }
reqwest = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::stream_stats::StreamStats;
use crate::types::anthropic::{
AnthropicSSEError, ContentBlock, ContentBlockDelta, ContentBlockStart, MessageDelta,
MessageStart, Usage,
Expand Down Expand Up @@ -26,25 +27,29 @@ pub async fn handle_anthropic_stream(
let mut stream = response.bytes_stream().eventsource();
let idle_timeout = Duration::from_secs(600);
let mut usage = Usage::default();
let mut stats = StreamStats::new("Anthropic");

loop {
let sse_event = timeout(idle_timeout, stream.next()).await;
let sse = match sse_event {
Ok(Some(Ok(sse))) => sse,
Ok(None) => {
let error_msg = "SSE Error: stream closed before response completed";
stats.log_summary("stream_closed_before_completion");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
Ok(Some(Err(e))) => {
let error_msg = format!("SSE Error: {}", e);
stats.log_summary("sse_stream_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
Err(_) => {
let error_msg = "SSE Timeout: idle timeout waiting for SSE";
stats.log_summary("sse_stream_timeout");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -54,6 +59,7 @@ pub async fn handle_anthropic_stream(
trace!("Anthropic SSE: {:?}", sse);
let event_type = sse.event;
let data = sse.data;
stats.record_sse_event(&event_type);

if let Some(ref tx) = tx_raw_sse {
let _ = tx.send(format!("[{}] {}", event_type, data));
Expand All @@ -64,6 +70,7 @@ pub async fn handle_anthropic_stream(
let message_start: MessageStart = match serde_json::from_str(&data) {
Ok(message_start) => message_start,
Err(e) => {
stats.increment("error:sse_parsing");
let err_str = format!("SSE Parsing Error: {e}, data: {}", &data);
error!("{}", err_str);
continue;
Expand All @@ -77,6 +84,7 @@ pub async fn handle_anthropic_stream(
let content_block_start: ContentBlockStart = match serde_json::from_str(&data) {
Ok(content_block_start) => content_block_start,
Err(e) => {
stats.increment("error:sse_parsing");
let err_str = format!("SSE Parsing Error: {e}, data: {}", &data);
error!("{}", err_str);
continue;
Expand All @@ -88,13 +96,15 @@ pub async fn handle_anthropic_stream(
) {
let unified_response = UnifiedResponse::from(content_block_start);
trace!("Anthropic unified response: {:?}", unified_response);
stats.record_unified_response(&unified_response);
let _ = tx_event.send(Ok(unified_response));
}
}
"content_block_delta" => {
let content_block_delta: ContentBlockDelta = match serde_json::from_str(&data) {
Ok(content_block_delta) => content_block_delta,
Err(e) => {
stats.increment("error:sse_parsing");
let err_str = format!("SSE Parsing Error: {e}, data: {}", &data);
error!("{}", err_str);
continue;
Expand All @@ -103,9 +113,11 @@ pub async fn handle_anthropic_stream(
match UnifiedResponse::try_from(content_block_delta) {
Ok(unified_response) => {
trace!("Anthropic unified response: {:?}", unified_response);
stats.record_unified_response(&unified_response);
let _ = tx_event.send(Ok(unified_response));
}
Err(e) => {
stats.increment("skip:invalid_content_block_delta");
error!("Skipping invalid content_block_delta: {}", e);
}
};
Expand All @@ -114,6 +126,7 @@ pub async fn handle_anthropic_stream(
let mut message_delta: MessageDelta = match serde_json::from_str(&data) {
Ok(message_delta) => message_delta,
Err(e) => {
stats.increment("error:sse_parsing");
let err_str = format!("SSE Parsing Error: {e}, data: {}", &data);
error!("{}", err_str);
continue;
Expand All @@ -129,22 +142,28 @@ pub async fn handle_anthropic_stream(
};
let unified_response = UnifiedResponse::from(message_delta);
trace!("Anthropic unified response: {:?}", unified_response);
stats.record_unified_response(&unified_response);
let _ = tx_event.send(Ok(unified_response));
}
"error" => {
let sse_error: AnthropicSSEError = match serde_json::from_str(&data) {
Ok(message_delta) => message_delta,
Err(e) => {
stats.increment("error:sse_parsing");
let err_str = format!("SSE Parsing Error: {e}, data: {}", &data);
stats.log_summary("sse_parsing_error");
error!("{}", err_str);
let _ = tx_event.send(Err(anyhow!(err_str)));
return;
}
};
stats.increment("error:api");
stats.log_summary("error_event_received");
let _ = tx_event.send(Err(anyhow!(String::from(sse_error.error))));
return;
}
"message_stop" => {
stats.log_summary("message_stop");
return;
}
_ => {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::stream_stats::StreamStats;
use crate::types::gemini::GeminiSSEData;
use crate::types::unified::UnifiedResponse;
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -79,22 +80,26 @@ pub async fn handle_gemini_stream(
let idle_timeout = Duration::from_secs(600);
let mut received_finish_reason = false;
let mut tool_call_state = GeminiToolCallState::new();
let mut stats = StreamStats::new("Gemini");

loop {
let sse_event = timeout(idle_timeout, stream.next()).await;
let sse = match sse_event {
Ok(Some(Ok(sse))) => sse,
Ok(None) => {
if received_finish_reason {
stats.log_summary("stream_closed_after_finish_reason");
return;
}
let error_msg = "Gemini SSE stream closed before response completed";
stats.log_summary("stream_closed_before_completion");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
Ok(Some(Err(e))) => {
let error_msg = format!("Gemini SSE stream error: {}", e);
stats.log_summary("sse_stream_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -104,27 +109,33 @@ pub async fn handle_gemini_stream(
"Gemini SSE stream timeout after {}s",
idle_timeout.as_secs()
);
stats.log_summary("sse_stream_timeout");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
};

let raw = sse.data;
stats.record_sse_event("data");
trace!("Gemini SSE: {:?}", raw);

if let Some(ref tx) = tx_raw_sse {
let _ = tx.send(raw.clone());
}

if raw == "[DONE]" {
stats.increment("marker:done");
stats.log_summary("done_marker_received");
return;
}

let event_json: Value = match serde_json::from_str(&raw) {
Ok(json) => json,
Err(e) => {
let error_msg = format!("Gemini SSE parsing error: {}, data: {}", e, raw);
stats.increment("error:sse_parsing");
stats.log_summary("sse_parsing_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -133,6 +144,8 @@ pub async fn handle_gemini_stream(

if let Some(message) = extract_api_error_message(&event_json) {
let error_msg = format!("Gemini SSE API error: {}, data: {}", message, raw);
stats.increment("error:api");
stats.log_summary("sse_api_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -142,13 +155,18 @@ pub async fn handle_gemini_stream(
Ok(data) => data,
Err(e) => {
let error_msg = format!("Gemini SSE data schema error: {}, data: {}", e, raw);
stats.increment("error:schema");
stats.log_summary("sse_data_schema_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
};

let mut unified_responses = sse_data.into_unified_responses();
if unified_responses.is_empty() {
stats.increment("skip:empty_unified_responses");
}
for unified_response in &mut unified_responses {
if let Some(tool_call) = unified_response.tool_call.as_mut() {
tool_call_state.assign_id(tool_call);
Expand All @@ -163,6 +181,7 @@ pub async fn handle_gemini_stream(
}

for unified_response in unified_responses {
stats.record_unified_response(&unified_response);
let _ = tx_event.send(Ok(unified_response));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod stream_stats;
mod anthropic;
mod gemini;
mod openai;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::stream_stats::StreamStats;
use crate::types::openai::OpenAISSEData;
use crate::types::unified::UnifiedResponse;
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -43,6 +44,7 @@ pub async fn handle_openai_stream(
) {
let mut stream = response.bytes_stream().eventsource();
let idle_timeout = Duration::from_secs(600);
let mut stats = StreamStats::new("OpenAI");
// Track whether a chunk with `finish_reason` was received.
// Some providers (e.g. MiniMax) close the stream after the final chunk
// without sending `[DONE]`, so we treat `Ok(None)` as a normal termination
Expand All @@ -55,40 +57,49 @@ pub async fn handle_openai_stream(
Ok(Some(Ok(sse))) => sse,
Ok(None) => {
if received_finish_reason {
stats.log_summary("stream_closed_after_finish_reason");
return;
}
let error_msg = "SSE stream closed before response completed";
stats.log_summary("stream_closed_before_completion");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
Ok(Some(Err(e))) => {
let error_msg = format!("SSE stream error: {}", e);
stats.log_summary("sse_stream_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
Err(_) => {
let error_msg = format!("SSE stream timeout after {}s", idle_timeout.as_secs());
stats.log_summary("sse_stream_timeout");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}
};

let raw = sse.data;
stats.record_sse_event("data");
trace!("OpenAI SSE: {:?}", raw);
if let Some(ref tx) = tx_raw_sse {
let _ = tx.send(raw.clone());
}
if raw == "[DONE]" {
stats.increment("marker:done");
stats.log_summary("done_marker_received");
return;
}

let event_json: Value = match serde_json::from_str(&raw) {
Ok(json) => json,
Err(e) => {
let error_msg = format!("SSE parsing error: {}, data: {}", e, &raw);
stats.increment("error:sse_parsing");
stats.log_summary("sse_parsing_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -97,12 +108,15 @@ pub async fn handle_openai_stream(

if let Some(api_error_message) = extract_sse_api_error_message(&event_json) {
let error_msg = format!("SSE API error: {}, data: {}", api_error_message, raw);
stats.increment("error:api");
stats.log_summary("sse_api_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
}

if !is_valid_chat_completion_chunk_weak(&event_json) {
stats.increment("skip:non_standard_event");
warn!(
"Skipping non-standard OpenAI SSE event; object={}",
event_json
Expand All @@ -113,10 +127,13 @@ pub async fn handle_openai_stream(
continue;
}

stats.increment("chunk:chat_completion");
let sse_data: OpenAISSEData = match serde_json::from_value(event_json) {
Ok(event) => event,
Err(e) => {
let error_msg = format!("SSE data schema error: {}, data: {}", e, &raw);
stats.increment("error:schema");
stats.log_summary("sse_data_schema_error");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -125,6 +142,7 @@ pub async fn handle_openai_stream(

let tool_call_count = sse_data.first_choice_tool_call_count();
if tool_call_count > 1 {
stats.increment("chunk:multi_tool_call");
warn!(
"OpenAI SSE chunk contains {} tool calls in the first choice; splitting and sending sequentially",
tool_call_count
Expand All @@ -136,6 +154,7 @@ pub async fn handle_openai_stream(
trace!("OpenAI unified responses: {:?}", unified_responses);
if unified_responses.is_empty() {
if has_empty_choices {
stats.increment("skip:empty_choices_no_usage");
warn!(
"Ignoring OpenAI SSE chunk with empty choices and no usage payload: {}",
raw
Expand All @@ -146,6 +165,8 @@ pub async fn handle_openai_stream(
// Defensive fallback: this should be unreachable if OpenAISSEData::into_unified_responses
// keeps returning at least one event for all non-empty-choices chunks.
let error_msg = format!("OpenAI SSE chunk produced no unified events, data: {}", raw);
stats.increment("error:no_unified_events");
stats.log_summary("no_unified_events");
error!("{}", error_msg);
let _ = tx_event.send(Err(anyhow!(error_msg)));
return;
Expand All @@ -155,6 +176,7 @@ pub async fn handle_openai_stream(
if unified_response.finish_reason.is_some() {
received_finish_reason = true;
}
stats.record_unified_response(&unified_response);
let _ = tx_event.send(Ok(unified_response));
}
}
Expand Down
Loading
Loading