From 905469956ea1c6289cd267d4db0160fe9bb58019 Mon Sep 17 00:00:00 2001 From: wsp Date: Wed, 18 Mar 2026 21:40:38 +0800 Subject: [PATCH] feat(flow-chat): fix tool event handling and add stream diagnostics - Fix `ParamsPartial` batching to match backend payload fields (`event_type` and `params`), so partial tool params are dispatched correctly. - Ignore late `ParamsPartial` events after terminal tool states, preventing completed tool cards from reverting to `receiving` and staying expanded. - Replace the `` marker with an explicit end flag for thinking chunk events across backend transport and frontend handling. - Add stream statistics collection and summary logging for AI stream handlers to improve streaming diagnostics. - Sync frontend log level with `app.logging.level` at runtime so logging changes take effect immediately in the UI. --- .../src/agentic/execution/stream_processor.rs | 8 +- .../ai/ai_stream_handlers/Cargo.toml | 1 + .../src/stream_handler/anthropic.rs | 19 ++ .../src/stream_handler/gemini.rs | 19 ++ .../src/stream_handler/mod.rs | 1 + .../src/stream_handler/openai.rs | 22 ++ .../src/stream_handler/responses.rs | 94 ++++++-- .../src/stream_handler/stream_stats.rs | 148 ++++++++++++ .../service/remote_connect/remote_server.rs | 7 +- src/crates/events/src/agentic.rs | 2 + src/crates/transport/src/adapters/tauri.rs | 2 + .../transport/src/adapters/websocket.rs | 18 ++ .../src/flow_chat/services/EventBatcher.ts | 155 ++++++++++--- .../src/flow_chat/services/FlowChatManager.ts | 3 +- .../flow-chat-manager/EventHandlerModule.ts | 45 ++-- .../flow-chat-manager/SubagentModule.ts | 26 +-- .../flow-chat-manager/TextChunkModule.ts | 78 +------ .../flow-chat-manager/ToolEventModule.ts | 210 +++++++++++++----- .../services/flow-chat-manager/index.ts | 4 +- .../services/flow-chat-manager/types.ts | 9 +- .../api/service-api/AgentAPI.ts | 3 +- .../config/services/FrontendLogLevelSync.ts | 110 +++++++++ .../src/locales/en-US/settings/logging.json | 2 +- .../src/locales/zh-CN/settings/logging.json | 2 +- src/web-ui/src/main.tsx | 11 +- 25 files changed, 762 insertions(+), 237 deletions(-) create mode 100644 src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/stream_stats.rs create mode 100644 src/web-ui/src/infrastructure/config/services/FrontendLogLevelSync.ts diff --git a/src/crates/core/src/agentic/execution/stream_processor.rs b/src/crates/core/src/agentic/execution/stream_processor.rs index 761fa888..c2b8861e 100644 --- a/src/crates/core/src/agentic/execution/stream_processor.rs +++ b/src/crates/core/src/agentic/execution/stream_processor.rs @@ -294,11 +294,11 @@ impl StreamProcessor { // ==================== Helper Methods ==================== - /// Send thinking end marker (if needed) + /// Send thinking end event (if needed) async fn send_thinking_end_if_needed(&self, ctx: &mut StreamContext) { if ctx.thinking_chunks_count > 0 && !ctx.thinking_completed_sent { ctx.thinking_completed_sent = true; - debug!("Thinking process ended, sending ThinkingChunk with end marker"); + debug!("Thinking process ended, sending ThinkingChunk end event"); let _ = self .event_queue .enqueue( @@ -306,7 +306,8 @@ impl StreamProcessor { session_id: ctx.session_id.clone(), turn_id: ctx.dialog_turn_id.clone(), round_id: ctx.round_id.clone(), - content: "".to_string(), + content: String::new(), + is_end: true, subagent_parent_info: ctx.event_subagent_parent_info.clone(), }, Some(EventPriority::Normal), @@ -578,6 +579,7 @@ impl StreamProcessor { turn_id: ctx.dialog_turn_id.clone(), round_id: ctx.round_id.clone(), content: thinking_content, + is_end: false, subagent_parent_info: ctx.event_subagent_parent_info.clone(), }, Some(EventPriority::Normal), diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/Cargo.toml b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/Cargo.toml index fd58fc03..5b5718d1 100644 --- a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/Cargo.toml +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +chrono = { workspace = true } eventsource-stream = { workspace = true } futures = { workspace = true } reqwest = { workspace = true } diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/anthropic.rs b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/anthropic.rs index c00e3e5a..60593ae5 100644 --- a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/anthropic.rs +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/anthropic.rs @@ -1,3 +1,4 @@ +use super::stream_stats::StreamStats; use crate::types::anthropic::{ AnthropicSSEError, ContentBlock, ContentBlockDelta, ContentBlockStart, MessageDelta, MessageStart, Usage, @@ -26,6 +27,7 @@ pub async fn handle_anthropic_stream( let mut stream = response.bytes_stream().eventsource(); let idle_timeout = Duration::from_secs(600); let mut usage = Usage::default(); + let mut stats = StreamStats::new("Anthropic"); loop { let sse_event = timeout(idle_timeout, stream.next()).await; @@ -33,18 +35,21 @@ pub async fn handle_anthropic_stream( Ok(Some(Ok(sse))) => sse, Ok(None) => { let error_msg = "SSE Error: stream closed before response completed"; + stats.log_summary("stream_closed_before_completion"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } Ok(Some(Err(e))) => { let error_msg = format!("SSE Error: {}", e); + stats.log_summary("sse_stream_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } Err(_) => { let error_msg = "SSE Timeout: idle timeout waiting for SSE"; + stats.log_summary("sse_stream_timeout"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -54,6 +59,7 @@ pub async fn handle_anthropic_stream( trace!("Anthropic SSE: {:?}", sse); let event_type = sse.event; let data = sse.data; + stats.record_sse_event(&event_type); if let Some(ref tx) = tx_raw_sse { let _ = tx.send(format!("[{}] {}", event_type, data)); @@ -64,6 +70,7 @@ pub async fn handle_anthropic_stream( let message_start: MessageStart = match serde_json::from_str(&data) { Ok(message_start) => message_start, Err(e) => { + stats.increment("error:sse_parsing"); let err_str = format!("SSE Parsing Error: {e}, data: {}", &data); error!("{}", err_str); continue; @@ -77,6 +84,7 @@ pub async fn handle_anthropic_stream( let content_block_start: ContentBlockStart = match serde_json::from_str(&data) { Ok(content_block_start) => content_block_start, Err(e) => { + stats.increment("error:sse_parsing"); let err_str = format!("SSE Parsing Error: {e}, data: {}", &data); error!("{}", err_str); continue; @@ -88,6 +96,7 @@ pub async fn handle_anthropic_stream( ) { let unified_response = UnifiedResponse::from(content_block_start); trace!("Anthropic unified response: {:?}", unified_response); + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } } @@ -95,6 +104,7 @@ pub async fn handle_anthropic_stream( let content_block_delta: ContentBlockDelta = match serde_json::from_str(&data) { Ok(content_block_delta) => content_block_delta, Err(e) => { + stats.increment("error:sse_parsing"); let err_str = format!("SSE Parsing Error: {e}, data: {}", &data); error!("{}", err_str); continue; @@ -103,9 +113,11 @@ pub async fn handle_anthropic_stream( match UnifiedResponse::try_from(content_block_delta) { Ok(unified_response) => { trace!("Anthropic unified response: {:?}", unified_response); + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } Err(e) => { + stats.increment("skip:invalid_content_block_delta"); error!("Skipping invalid content_block_delta: {}", e); } }; @@ -114,6 +126,7 @@ pub async fn handle_anthropic_stream( let mut message_delta: MessageDelta = match serde_json::from_str(&data) { Ok(message_delta) => message_delta, Err(e) => { + stats.increment("error:sse_parsing"); let err_str = format!("SSE Parsing Error: {e}, data: {}", &data); error!("{}", err_str); continue; @@ -129,22 +142,28 @@ pub async fn handle_anthropic_stream( }; let unified_response = UnifiedResponse::from(message_delta); trace!("Anthropic unified response: {:?}", unified_response); + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } "error" => { let sse_error: AnthropicSSEError = match serde_json::from_str(&data) { Ok(message_delta) => message_delta, Err(e) => { + stats.increment("error:sse_parsing"); let err_str = format!("SSE Parsing Error: {e}, data: {}", &data); + stats.log_summary("sse_parsing_error"); error!("{}", err_str); let _ = tx_event.send(Err(anyhow!(err_str))); return; } }; + stats.increment("error:api"); + stats.log_summary("error_event_received"); let _ = tx_event.send(Err(anyhow!(String::from(sse_error.error)))); return; } "message_stop" => { + stats.log_summary("message_stop"); return; } _ => {} diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/gemini.rs b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/gemini.rs index 395ea7d8..957ea6bb 100644 --- a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/gemini.rs +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/gemini.rs @@ -1,3 +1,4 @@ +use super::stream_stats::StreamStats; use crate::types::gemini::GeminiSSEData; use crate::types::unified::UnifiedResponse; use anyhow::{anyhow, Result}; @@ -79,6 +80,7 @@ pub async fn handle_gemini_stream( let idle_timeout = Duration::from_secs(600); let mut received_finish_reason = false; let mut tool_call_state = GeminiToolCallState::new(); + let mut stats = StreamStats::new("Gemini"); loop { let sse_event = timeout(idle_timeout, stream.next()).await; @@ -86,15 +88,18 @@ pub async fn handle_gemini_stream( Ok(Some(Ok(sse))) => sse, Ok(None) => { if received_finish_reason { + stats.log_summary("stream_closed_after_finish_reason"); return; } let error_msg = "Gemini SSE stream closed before response completed"; + stats.log_summary("stream_closed_before_completion"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } Ok(Some(Err(e))) => { let error_msg = format!("Gemini SSE stream error: {}", e); + stats.log_summary("sse_stream_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -104,6 +109,7 @@ pub async fn handle_gemini_stream( "Gemini SSE stream timeout after {}s", idle_timeout.as_secs() ); + stats.log_summary("sse_stream_timeout"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -111,6 +117,7 @@ pub async fn handle_gemini_stream( }; let raw = sse.data; + stats.record_sse_event("data"); trace!("Gemini SSE: {:?}", raw); if let Some(ref tx) = tx_raw_sse { @@ -118,6 +125,8 @@ pub async fn handle_gemini_stream( } if raw == "[DONE]" { + stats.increment("marker:done"); + stats.log_summary("done_marker_received"); return; } @@ -125,6 +134,8 @@ pub async fn handle_gemini_stream( Ok(json) => json, Err(e) => { let error_msg = format!("Gemini SSE parsing error: {}, data: {}", e, raw); + stats.increment("error:sse_parsing"); + stats.log_summary("sse_parsing_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -133,6 +144,8 @@ pub async fn handle_gemini_stream( if let Some(message) = extract_api_error_message(&event_json) { let error_msg = format!("Gemini SSE API error: {}, data: {}", message, raw); + stats.increment("error:api"); + stats.log_summary("sse_api_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -142,6 +155,8 @@ pub async fn handle_gemini_stream( Ok(data) => data, Err(e) => { let error_msg = format!("Gemini SSE data schema error: {}, data: {}", e, raw); + stats.increment("error:schema"); + stats.log_summary("sse_data_schema_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -149,6 +164,9 @@ pub async fn handle_gemini_stream( }; let mut unified_responses = sse_data.into_unified_responses(); + if unified_responses.is_empty() { + stats.increment("skip:empty_unified_responses"); + } for unified_response in &mut unified_responses { if let Some(tool_call) = unified_response.tool_call.as_mut() { tool_call_state.assign_id(tool_call); @@ -163,6 +181,7 @@ pub async fn handle_gemini_stream( } for unified_response in unified_responses { + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } } diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/mod.rs b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/mod.rs index 865f0ac4..31f2b8c4 100644 --- a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/mod.rs +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/mod.rs @@ -1,3 +1,4 @@ +mod stream_stats; mod anthropic; mod gemini; mod openai; diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/openai.rs b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/openai.rs index df5216d4..a5a86ef8 100644 --- a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/openai.rs +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/openai.rs @@ -1,3 +1,4 @@ +use super::stream_stats::StreamStats; use crate::types::openai::OpenAISSEData; use crate::types::unified::UnifiedResponse; use anyhow::{anyhow, Result}; @@ -43,6 +44,7 @@ pub async fn handle_openai_stream( ) { let mut stream = response.bytes_stream().eventsource(); let idle_timeout = Duration::from_secs(600); + let mut stats = StreamStats::new("OpenAI"); // Track whether a chunk with `finish_reason` was received. // Some providers (e.g. MiniMax) close the stream after the final chunk // without sending `[DONE]`, so we treat `Ok(None)` as a normal termination @@ -55,21 +57,25 @@ pub async fn handle_openai_stream( Ok(Some(Ok(sse))) => sse, Ok(None) => { if received_finish_reason { + stats.log_summary("stream_closed_after_finish_reason"); return; } let error_msg = "SSE stream closed before response completed"; + stats.log_summary("stream_closed_before_completion"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } Ok(Some(Err(e))) => { let error_msg = format!("SSE stream error: {}", e); + stats.log_summary("sse_stream_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } Err(_) => { let error_msg = format!("SSE stream timeout after {}s", idle_timeout.as_secs()); + stats.log_summary("sse_stream_timeout"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -77,11 +83,14 @@ pub async fn handle_openai_stream( }; let raw = sse.data; + stats.record_sse_event("data"); trace!("OpenAI SSE: {:?}", raw); if let Some(ref tx) = tx_raw_sse { let _ = tx.send(raw.clone()); } if raw == "[DONE]" { + stats.increment("marker:done"); + stats.log_summary("done_marker_received"); return; } @@ -89,6 +98,8 @@ pub async fn handle_openai_stream( Ok(json) => json, Err(e) => { let error_msg = format!("SSE parsing error: {}, data: {}", e, &raw); + stats.increment("error:sse_parsing"); + stats.log_summary("sse_parsing_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -97,12 +108,15 @@ pub async fn handle_openai_stream( if let Some(api_error_message) = extract_sse_api_error_message(&event_json) { let error_msg = format!("SSE API error: {}, data: {}", api_error_message, raw); + stats.increment("error:api"); + stats.log_summary("sse_api_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } if !is_valid_chat_completion_chunk_weak(&event_json) { + stats.increment("skip:non_standard_event"); warn!( "Skipping non-standard OpenAI SSE event; object={}", event_json @@ -113,10 +127,13 @@ pub async fn handle_openai_stream( continue; } + stats.increment("chunk:chat_completion"); let sse_data: OpenAISSEData = match serde_json::from_value(event_json) { Ok(event) => event, Err(e) => { let error_msg = format!("SSE data schema error: {}, data: {}", e, &raw); + stats.increment("error:schema"); + stats.log_summary("sse_data_schema_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -125,6 +142,7 @@ pub async fn handle_openai_stream( let tool_call_count = sse_data.first_choice_tool_call_count(); if tool_call_count > 1 { + stats.increment("chunk:multi_tool_call"); warn!( "OpenAI SSE chunk contains {} tool calls in the first choice; splitting and sending sequentially", tool_call_count @@ -136,6 +154,7 @@ pub async fn handle_openai_stream( trace!("OpenAI unified responses: {:?}", unified_responses); if unified_responses.is_empty() { if has_empty_choices { + stats.increment("skip:empty_choices_no_usage"); warn!( "Ignoring OpenAI SSE chunk with empty choices and no usage payload: {}", raw @@ -146,6 +165,8 @@ pub async fn handle_openai_stream( // Defensive fallback: this should be unreachable if OpenAISSEData::into_unified_responses // keeps returning at least one event for all non-empty-choices chunks. let error_msg = format!("OpenAI SSE chunk produced no unified events, data: {}", raw); + stats.increment("error:no_unified_events"); + stats.log_summary("no_unified_events"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -155,6 +176,7 @@ pub async fn handle_openai_stream( if unified_response.finish_reason.is_some() { received_finish_reason = true; } + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } } diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/responses.rs b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/responses.rs index 7d38aac1..48bd55a3 100644 --- a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/responses.rs +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/responses.rs @@ -1,3 +1,4 @@ +use super::stream_stats::StreamStats; use crate::types::responses::{ parse_responses_output_item, ResponsesCompleted, ResponsesDone, ResponsesStreamEvent, }; @@ -45,10 +46,12 @@ impl InProgressToolCall { fn emit_tool_call_item( tx_event: &mpsc::UnboundedSender>, + stats: &mut StreamStats, item_value: Value, ) { if let Some(unified_response) = parse_responses_output_item(item_value) { if unified_response.tool_call.is_some() { + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } } @@ -68,6 +71,7 @@ fn cleanup_tool_call_tracking( fn handle_function_call_output_item_done( tx_event: &mpsc::UnboundedSender>, + stats: &mut StreamStats, event_output_index: Option, item_value: Value, tool_calls_by_output_index: &mut HashMap, @@ -82,14 +86,14 @@ fn handle_function_call_output_item_done( }); let Some(output_index) = output_index else { - emit_tool_call_item(tx_event, item_value); + emit_tool_call_item(tx_event, stats, item_value); return; }; let Some(tc) = tool_calls_by_output_index.get_mut(&output_index) else { // The provider may send `output_item.done` with an output_index even when the // earlier `output_item.added` event was omitted or missed. Fall back to the full item. - emit_tool_call_item(tx_event, item_value); + emit_tool_call_item(tx_event, stats, item_value); return; }; @@ -117,14 +121,16 @@ fn handle_function_call_output_item_done( tc.sent_header = true; (tc.call_id.clone(), tc.name.clone()) }; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { tool_call: Some(crate::types::unified::UnifiedToolCall { id, name, arguments: Some(delta), }), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); } } @@ -165,6 +171,7 @@ pub async fn handle_responses_stream( let mut received_text_delta = false; let mut tool_calls_by_output_index: HashMap = HashMap::new(); let mut tool_call_index_by_id: HashMap = HashMap::new(); + let mut stats = StreamStats::new("Responses"); loop { let sse_event = timeout(idle_timeout, stream.next()).await; @@ -172,15 +179,18 @@ pub async fn handle_responses_stream( Ok(Some(Ok(sse))) => sse, Ok(None) => { if received_finish_reason { + stats.log_summary("stream_closed_after_finish_reason"); return; } let error_msg = "Responses SSE stream closed before response completed"; + stats.log_summary("stream_closed_before_completion"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } Ok(Some(Err(e))) => { let error_msg = format!("Responses SSE stream error: {}", e); + stats.log_summary("sse_stream_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -190,6 +200,7 @@ pub async fn handle_responses_stream( "Responses SSE stream timeout after {}s", idle_timeout.as_secs() ); + stats.log_summary("sse_stream_timeout"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -197,11 +208,14 @@ pub async fn handle_responses_stream( }; let raw = sse.data; + stats.record_sse_event("data"); trace!("Responses SSE: {:?}", raw); if let Some(ref tx) = tx_raw_sse { let _ = tx.send(raw.clone()); } if raw == "[DONE]" { + stats.increment("marker:done"); + stats.log_summary("done_marker_received"); return; } @@ -209,6 +223,8 @@ pub async fn handle_responses_stream( Ok(json) => json, Err(e) => { let error_msg = format!("Responses SSE parsing error: {}, data: {}", e, &raw); + stats.increment("error:sse_parsing"); + stats.log_summary("sse_parsing_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -220,6 +236,8 @@ pub async fn handle_responses_stream( "Responses SSE API error: {}, data: {}", api_error_message, raw ); + stats.increment("error:api"); + stats.log_summary("sse_api_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -229,11 +247,14 @@ pub async fn handle_responses_stream( Ok(event) => event, Err(e) => { let error_msg = format!("Responses SSE schema error: {}, data: {}", e, &raw); + stats.increment("error:schema"); + stats.log_summary("sse_schema_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } }; + stats.increment(format!("event:{}", event.kind)); match event.kind.as_str() { "response.output_item.added" => { @@ -251,18 +272,22 @@ pub async fn handle_responses_stream( "response.output_text.delta" => { if let Some(delta) = event.delta.filter(|delta| !delta.is_empty()) { received_text_delta = true; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { text: Some(delta), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); } } "response.reasoning_text.delta" | "response.reasoning_summary_text.delta" => { if let Some(delta) = event.delta.filter(|delta| !delta.is_empty()) { - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { reasoning_content: Some(delta), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); } } "response.function_call_arguments.delta" => { @@ -288,14 +313,16 @@ pub async fn handle_responses_stream( (tc.call_id.clone(), tc.name.clone()) }; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { tool_call: Some(crate::types::unified::UnifiedToolCall { id, name, arguments: Some(delta), }), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); } "response.output_item.done" => { let Some(item_value) = event.item else { @@ -306,6 +333,7 @@ pub async fn handle_responses_stream( if item_value.get("type").and_then(Value::as_str) == Some("function_call") { handle_function_call_output_item_done( &tx_event, + &mut stats, event.output_index, item_value, &mut tool_calls_by_output_index, @@ -319,6 +347,7 @@ pub async fn handle_responses_stream( unified_response.text = None; } if unified_response.text.is_some() || unified_response.tool_call.is_some() { + stats.record_unified_response(&unified_response); let _ = tx_event.send(Ok(unified_response)); } } @@ -353,14 +382,16 @@ pub async fn handle_responses_stream( tc.sent_header = true; (tc.call_id.clone(), tc.name.clone()) }; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { tool_call: Some(crate::types::unified::UnifiedToolCall { id, name, arguments: Some(delta), }), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); } } } @@ -372,26 +403,32 @@ pub async fn handle_responses_stream( { Some(Ok(response)) => { received_finish_reason = true; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { usage: response.usage.map(Into::into), finish_reason: Some("stop".to_string()), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); continue; } Some(Err(e)) => { let error_msg = format!("Failed to parse response.completed payload: {}", e); + stats.increment("error:completed_payload"); + stats.log_summary("response_completed_parse_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } None => { received_finish_reason = true; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { finish_reason: Some("stop".to_string()), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); continue; } } @@ -403,25 +440,31 @@ pub async fn handle_responses_stream( match event.response.map(serde_json::from_value::) { Some(Ok(response)) => { received_finish_reason = true; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { usage: response.usage.map(Into::into), finish_reason: Some("stop".to_string()), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); continue; } Some(Err(e)) => { let error_msg = format!("Failed to parse response.done payload: {}", e); + stats.increment("error:done_payload"); + stats.log_summary("response_done_parse_error"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; } None => { received_finish_reason = true; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { finish_reason: Some("stop".to_string()), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); continue; } } @@ -435,6 +478,8 @@ pub async fn handle_responses_stream( .and_then(Value::as_str) .unwrap_or("Responses API returned response.failed") .to_string(); + stats.increment("error:failed"); + stats.log_summary("response_failed"); error!("{}", error_msg); let _ = tx_event.send(Err(anyhow!(error_msg))); return; @@ -466,11 +511,13 @@ pub async fn handle_responses_stream( .map(Into::into); received_finish_reason = true; - let _ = tx_event.send(Ok(UnifiedResponse { + let unified_response = UnifiedResponse { usage, finish_reason: Some(finish_reason), ..Default::default() - })); + }; + stats.record_unified_response(&unified_response); + let _ = tx_event.send(Ok(unified_response)); continue; } _ => {} @@ -481,6 +528,7 @@ pub async fn handle_responses_stream( #[cfg(test)] mod tests { use super::{ + super::stream_stats::StreamStats, extract_api_error_message, handle_function_call_output_item_done, InProgressToolCall, }; use serde_json::json; @@ -534,9 +582,11 @@ mod tests { let (tx_event, mut rx_event) = mpsc::unbounded_channel(); let mut tool_calls_by_output_index: HashMap = HashMap::new(); let mut tool_call_index_by_id: HashMap = HashMap::new(); + let mut stats = StreamStats::new("Responses"); handle_function_call_output_item_done( &tx_event, + &mut stats, Some(3), json!({ "type": "function_call", diff --git a/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/stream_stats.rs b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/stream_stats.rs new file mode 100644 index 00000000..ecad7abd --- /dev/null +++ b/src/crates/core/src/infrastructure/ai/ai_stream_handlers/src/stream_handler/stream_stats.rs @@ -0,0 +1,148 @@ +use crate::types::unified::UnifiedResponse; +use chrono::{DateTime, Local}; +use log::debug; +use std::collections::BTreeMap; +use std::time::Instant; + +#[derive(Debug)] +pub(super) struct StreamStats { + provider: &'static str, + started_at: Instant, + started_at_wall: DateTime, + first_event_at: Option, + first_event_at_wall: Option>, + last_event_at: Option, + last_event_at_wall: Option>, + total_sse_events: usize, + total_unified_responses: usize, + counters: BTreeMap, +} + +impl StreamStats { + pub(super) fn new(provider: &'static str) -> Self { + Self { + provider, + started_at: Instant::now(), + started_at_wall: Local::now(), + first_event_at: None, + first_event_at_wall: None, + last_event_at: None, + last_event_at_wall: None, + total_sse_events: 0, + total_unified_responses: 0, + counters: BTreeMap::new(), + } + } + + pub(super) fn record_sse_event(&mut self, event_kind: impl AsRef) { + let now = Instant::now(); + let now_wall = Local::now(); + if self.first_event_at.is_none() { + self.first_event_at = Some(now); + self.first_event_at_wall = Some(now_wall); + } + self.last_event_at = Some(now); + self.last_event_at_wall = Some(now_wall); + self.total_sse_events += 1; + self.increment(format!("sse:{}", event_kind.as_ref())); + } + + pub(super) fn increment(&mut self, label: impl Into) { + *self.counters.entry(label.into()).or_insert(0) += 1; + } + + pub(super) fn record_unified_response(&mut self, response: &UnifiedResponse) { + self.total_unified_responses += 1; + + let mut classified = false; + + if response.text.is_some() { + self.increment("out:text"); + classified = true; + } + if response.reasoning_content.is_some() { + self.increment("out:reasoning"); + classified = true; + } + if response.tool_call.is_some() { + self.increment("out:tool_call"); + classified = true; + } + if response.usage.is_some() { + self.increment("out:usage"); + classified = true; + } + if response.finish_reason.is_some() { + self.increment("out:finish_reason"); + classified = true; + } + if response.thinking_signature.is_some() { + self.increment("out:thinking_signature"); + classified = true; + } + if response.provider_metadata.is_some() { + self.increment("out:provider_metadata"); + classified = true; + } + + if !classified { + self.increment("out:other"); + } + } + + pub(super) fn log_summary(&self, reason: &str) { + let ended_at_wall = Local::now(); + let wall_elapsed = self.started_at.elapsed(); + let wall_elapsed_ms = wall_elapsed.as_millis(); + let first_event_latency_ms = self + .first_event_at + .map(|instant| instant.duration_since(self.started_at).as_millis()) + .unwrap_or(0); + let receive_elapsed_secs = match (self.first_event_at, self.last_event_at) { + (Some(first), Some(last)) => last.duration_since(first).as_secs_f64(), + _ => 0.0, + }; + let receive_elapsed_ms = (receive_elapsed_secs * 1000.0).round() as u128; + let unified_response_rate_per_sec = if receive_elapsed_secs > 0.0 { + self.total_unified_responses as f64 / receive_elapsed_secs + } else { + 0.0 + }; + let started_at = self.started_at_wall.format("%Y-%m-%d %H:%M:%S%.3f"); + let first_event_at = self + .first_event_at_wall + .map(|value| value.format("%Y-%m-%d %H:%M:%S%.3f").to_string()) + .unwrap_or_else(|| "none".to_string()); + let last_event_at = self + .last_event_at_wall + .map(|value| value.format("%Y-%m-%d %H:%M:%S%.3f").to_string()) + .unwrap_or_else(|| "none".to_string()); + let ended_at = ended_at_wall.format("%Y-%m-%d %H:%M:%S%.3f"); + let counter_lines = if self.counters.is_empty() { + "counter.none=0".to_string() + } else { + self.counters + .iter() + .map(|(label, count)| format!("counter.{}={}", label, count)) + .collect::>() + .join("\n") + }; + + debug!( + "{} stream stats\nreason={}\nstarted_at={}\nfirst_event_at={}\nlast_event_at={}\nended_at={}\ntotal_sse_events={}\ntotal_unified_responses={}\nfirst_event_latency_ms={}\nreceive_elapsed_ms={}\nwall_elapsed_ms={}\nunified_response_rate_per_sec={:.2}\n{}", + self.provider, + reason, + started_at, + first_event_at, + last_event_at, + ended_at, + self.total_sse_events, + self.total_unified_responses, + first_event_latency_ms, + receive_elapsed_ms, + wall_elapsed_ms, + unified_response_rate_per_sec, + counter_lines + ); + } +} diff --git a/src/crates/core/src/service/remote_connect/remote_server.rs b/src/crates/core/src/service/remote_connect/remote_server.rs index e893fcc7..0cf6ec39 100644 --- a/src/crates/core/src/service/remote_connect/remote_server.rs +++ b/src/crates/core/src/service/remote_connect/remote_server.rs @@ -1219,9 +1219,8 @@ impl RemoteSessionStateTracker { self.bump_version(); let _ = self.event_tx.send(TrackerEvent::TextChunk(text.clone())); } - AE::ThinkingChunk { content, .. } => { + AE::ThinkingChunk { content, is_end, .. } => { let clean = content - .replace("", "") .replace("", "") .replace("", ""); let subagent_marker = if is_subagent { Some(true) } else { None }; @@ -1245,9 +1244,9 @@ impl RemoteSessionStateTracker { } drop(s); self.bump_version(); - if content == "" { + if *is_end { let _ = self.event_tx.send(TrackerEvent::ThinkingEnd); - } else { + } else if !content.is_empty() { let _ = self .event_tx .send(TrackerEvent::ThinkingChunk(content.clone())); diff --git a/src/crates/events/src/agentic.rs b/src/crates/events/src/agentic.rs index 9735e8fc..f61264eb 100644 --- a/src/crates/events/src/agentic.rs +++ b/src/crates/events/src/agentic.rs @@ -166,6 +166,8 @@ pub enum AgenticEvent { turn_id: String, round_id: String, content: String, + #[serde(default)] + is_end: bool, subagent_parent_info: Option, }, diff --git a/src/crates/transport/src/adapters/tauri.rs b/src/crates/transport/src/adapters/tauri.rs index b8eb0869..5a933e10 100644 --- a/src/crates/transport/src/adapters/tauri.rs +++ b/src/crates/transport/src/adapters/tauri.rs @@ -155,6 +155,7 @@ impl TransportAdapter for TauriTransportAdapter { turn_id, round_id, content, + is_end, subagent_parent_info, } => { self.app_handle.emit( @@ -165,6 +166,7 @@ impl TransportAdapter for TauriTransportAdapter { "roundId": round_id, "text": content, "contentType": "thinking", + "isThinkingEnd": is_end, "subagentParentInfo": subagent_parent_info, }), )?; diff --git a/src/crates/transport/src/adapters/websocket.rs b/src/crates/transport/src/adapters/websocket.rs index 889ad219..0a009634 100644 --- a/src/crates/transport/src/adapters/websocket.rs +++ b/src/crates/transport/src/adapters/websocket.rs @@ -121,6 +121,24 @@ impl TransportAdapter for WebSocketTransportAdapter { "text": text, }) } + AgenticEvent::ThinkingChunk { + session_id, + turn_id, + round_id, + content, + is_end, + .. + } => { + json!({ + "type": "text-chunk", + "sessionId": session_id, + "turnId": turn_id, + "roundId": round_id, + "text": content, + "contentType": "thinking", + "isThinkingEnd": is_end, + }) + } AgenticEvent::ToolEvent { session_id, turn_id, diff --git a/src/web-ui/src/flow_chat/services/EventBatcher.ts b/src/web-ui/src/flow_chat/services/EventBatcher.ts index 3a905be5..f2d7459b 100644 --- a/src/web-ui/src/flow_chat/services/EventBatcher.ts +++ b/src/web-ui/src/flow_chat/services/EventBatcher.ts @@ -20,19 +20,18 @@ export interface BatchedEvent { payload: T; strategy: MergeStrategy; accumulator?: (existing: T, incoming: T) => T; + sourceCount: number; timestamp: number; } export interface EventBatcherOptions { onFlush: (events: Array<{ key: string; payload: any }>) => void; - debug?: boolean; } export class EventBatcher { private buffer: Map = new Map(); private scheduled = false; private onFlush: (events: Array<{ key: string; payload: any }>) => void; - private debug: boolean; private frameId: number | null = null; // Update frequency control to prevent UI blocking with many events @@ -42,7 +41,6 @@ export class EventBatcher { constructor(options: EventBatcherOptions) { this.onFlush = options.onFlush; - this.debug = options.debug ?? false; } add( @@ -61,22 +59,20 @@ export class EventBatcher { existing.payload = payload; existing.timestamp = Date.now(); } + existing.sourceCount += 1; - if (this.debug) { - log.debug('Merged event', { key, strategy }); - } + log.trace('Merged event', { key, strategy }); } else { this.buffer.set(key, { key, payload, strategy, accumulator, + sourceCount: 1, timestamp: Date.now() }); - if (this.debug) { - log.debug('Added new event', { key, strategy }); - } + log.trace('Added new event', { key, strategy }); } this.scheduleFlush(); @@ -114,34 +110,38 @@ export class EventBatcher { if (this.buffer.size === 0) return; const startTime = performance.now(); - const bufferSize = this.buffer.size; + const bufferedEvents = Array.from(this.buffer.values()); + const mergedEventCount = bufferedEvents.length; + const rawEventCount = bufferedEvents.reduce((count, event) => count + event.sourceCount, 0); - const events = Array.from(this.buffer.values()).map(({ key, payload }) => ({ + const events = bufferedEvents.map(({ key, payload }) => ({ key, payload })); - if (this.debug) { - log.debug('Flushing batched events', { count: events.length }); - } + log.trace('Flushing batched events', { + rawEventCount, + mergedEventCount, + mergedEvents: bufferedEvents.map(({ key, payload, strategy, sourceCount, timestamp }) => ({ + key, + strategy, + sourceCount, + timestamp, + payload + })) + }); this.buffer = new Map(); this.onFlush(events); const duration = performance.now() - startTime; - if (this.debug || duration > 10) { - log.warn('Event batch processing took longer than expected', { - eventCount: bufferSize, + if (duration > 10) { + log.warn('Event batch processing took longer than expected', { + rawEventCount, + mergedEventCount, duration: duration.toFixed(2) }); } - if (duration > 16.67) { - log.error('Event batch processing exceeded frame time', { - eventCount: bufferSize, - duration: duration.toFixed(2), - frameTime: 16.67 - }); - } } flushNow(): void { @@ -172,10 +172,6 @@ export class EventBatcher { } this.buffer.clear(); this.scheduled = false; - - if (this.debug) { - log.debug('Buffer cleared'); - } } destroy(): void { @@ -189,23 +185,111 @@ export interface SubagentParentInfo { dialogTurnId: string; } +export type ToolEventType = + | 'EarlyDetected' + | 'ParamsPartial' + | 'Queued' + | 'Waiting' + | 'Started' + | 'Progress' + | 'Streaming' + | 'StreamChunk' + | 'ConfirmationNeeded' + | 'Confirmed' + | 'Rejected' + | 'Completed' + | 'Failed' + | 'Cancelled'; + +interface BaseToolEvent { + event_type: T; + tool_id: string; + tool_name: string; +} + +export interface EarlyDetectedToolEvent extends BaseToolEvent<'EarlyDetected'> {} + +export interface ParamsPartialToolEvent extends BaseToolEvent<'ParamsPartial'> { + params: string; +} + +export interface QueuedToolEvent extends BaseToolEvent<'Queued'> { + position: number; +} + +export interface WaitingToolEvent extends BaseToolEvent<'Waiting'> { + dependencies: string[]; +} + +export interface StartedToolEvent extends BaseToolEvent<'Started'> { + params: unknown; +} + +export interface ProgressToolEvent extends BaseToolEvent<'Progress'> { + message: string; + percentage: number; +} + +export interface StreamingToolEvent extends BaseToolEvent<'Streaming'> { + chunks_received: number; +} + +export interface StreamChunkToolEvent extends BaseToolEvent<'StreamChunk'> { + data: unknown; +} + +export interface ConfirmationNeededToolEvent extends BaseToolEvent<'ConfirmationNeeded'> { + params: unknown; +} + +export interface ConfirmedToolEvent extends BaseToolEvent<'Confirmed'> {} + +export interface RejectedToolEvent extends BaseToolEvent<'Rejected'> {} + +export interface CompletedToolEvent extends BaseToolEvent<'Completed'> { + result: unknown; + result_for_assistant?: string; + duration_ms: number; +} + +export interface FailedToolEvent extends BaseToolEvent<'Failed'> { + error: string; +} + +export interface CancelledToolEvent extends BaseToolEvent<'Cancelled'> { + reason: string; +} + +export type FlowToolEvent = + | EarlyDetectedToolEvent + | ParamsPartialToolEvent + | QueuedToolEvent + | WaitingToolEvent + | StartedToolEvent + | ProgressToolEvent + | StreamingToolEvent + | StreamChunkToolEvent + | ConfirmationNeededToolEvent + | ConfirmedToolEvent + | RejectedToolEvent + | CompletedToolEvent + | FailedToolEvent + | CancelledToolEvent; + export interface TextChunkEventData { sessionId: string; turnId: string; roundId: string; text: string; contentType: 'text' | 'thinking'; + isThinkingEnd?: boolean; subagentParentInfo?: SubagentParentInfo; } export interface ToolEventData { sessionId: string; turnId: string; - toolEvent: { - tool_id: string; - eventType: string; - [key: string]: any; - }; + toolEvent: FlowToolEvent; subagentParentInfo?: SubagentParentInfo; } @@ -240,9 +324,10 @@ export function generateTextChunkKey(data: TextChunkEventData): string { */ export function generateToolEventKey(data: ToolEventData): { key: string; strategy: MergeStrategy } | null { const { sessionId, toolEvent, subagentParentInfo } = data; - const { tool_id: toolUseId, eventType } = toolEvent; + const toolUseId = toolEvent.tool_id; + const eventType = toolEvent.event_type; - const isolatedEvents = ['Detected', 'Started', 'Completed', 'Failed', 'Cancelled', 'ConfirmationNeeded']; + const isolatedEvents: ToolEventType[] = ['EarlyDetected', 'Started', 'Completed', 'Failed', 'Cancelled', 'ConfirmationNeeded']; if (isolatedEvents.includes(eventType)) { return null; } diff --git a/src/web-ui/src/flow_chat/services/FlowChatManager.ts b/src/web-ui/src/flow_chat/services/FlowChatManager.ts index fd3e0d6b..265d2ba3 100644 --- a/src/web-ui/src/flow_chat/services/FlowChatManager.ts +++ b/src/web-ui/src/flow_chat/services/FlowChatManager.ts @@ -48,8 +48,7 @@ export class FlowChatManager { flowChatStore: FlowChatStore.getInstance(), processingManager: processingStatusManager, eventBatcher: new EventBatcher({ - onFlush: (events) => this.processBatchedEvents(events), - debug: false + onFlush: (events) => this.processBatchedEvents(events) }), contentBuffers: new Map(), activeTextItems: new Map(), diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts index 730e1549..2d42f7f1 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts @@ -11,8 +11,11 @@ import { generateTextChunkKey, generateToolEventKey, parseEventKey, + type FlowToolEvent, + type SubagentParentInfo, type TextChunkEventData, - type ToolEventData + type ToolEventData, + type ParamsPartialToolEvent } from '../EventBatcher'; import { notificationService } from '../../../shared/notification-system'; import { createLogger } from '@/shared/utils/logger'; @@ -30,13 +33,13 @@ import { import { processNormalTextChunkInternal, processThinkingChunkInternal, - processToolParamsPartialInternal, - processToolProgressInternal, completeActiveTextItems, cleanupSessionBuffers } from './TextChunkModule'; import { processToolEvent, + processToolParamsPartialInternal, + processToolProgressInternal, handleToolExecutionProgress } from './ToolEventModule'; import { @@ -527,7 +530,7 @@ function handleDialogTurnStarted(context: FlowChatContext, event: any): void { * Handle text chunk event */ function handleTextChunk(context: FlowChatContext, event: any): void { - const { sessionId, turnId, roundId, text, contentType = 'text', subagentParentInfo } = event; + const { sessionId, turnId, roundId, text, contentType = 'text', isThinkingEnd = false, subagentParentInfo } = event; const parentSessionId = subagentParentInfo?.sessionId; const parentTurnId = subagentParentInfo?.dialogTurnId; @@ -570,6 +573,7 @@ function handleTextChunk(context: FlowChatContext, event: any): void { roundId, text, contentType: contentType as 'text' | 'thinking', + isThinkingEnd, subagentParentInfo }; @@ -581,7 +585,8 @@ function handleTextChunk(context: FlowChatContext, event: any): void { 'accumulate', (existing, incoming) => ({ ...existing, - text: existing.text + incoming.text + text: existing.text + incoming.text, + isThinkingEnd: existing.isThinkingEnd || incoming.isThinkingEnd }) ); } @@ -607,7 +612,7 @@ export function processBatchedEvents( if (eventType === 'text') { if (isSubagent) { - const { sessionId, turnId, roundId, text, contentType, subagentParentInfo } = payload; + const { sessionId, turnId, roundId, text, contentType, isThinkingEnd, subagentParentInfo } = payload; const parentSessionId = subagentParentInfo?.sessionId; const parentToolId = subagentParentInfo?.toolCallId; @@ -617,13 +622,14 @@ export function processBatchedEvents( turnId, roundId, text, - contentType + contentType, + isThinkingEnd }); } } else { - const { sessionId, turnId, roundId, text, contentType } = payload; + const { sessionId, turnId, roundId, text, contentType, isThinkingEnd } = payload; if (contentType === 'thinking') { - processThinkingChunkInternal(context, sessionId, turnId, roundId, text); + processThinkingChunkInternal(context, sessionId, turnId, roundId, text, isThinkingEnd); } else { processNormalTextChunkInternal(context, sessionId, turnId, roundId, text); } @@ -641,7 +647,7 @@ export function processBatchedEvents( } } else { const { sessionId, turnId, toolEvent } = payload; - processToolParamsPartialInternal(context, sessionId, turnId, toolEvent); + processToolParamsPartialInternal(sessionId, turnId, toolEvent); } } else if (eventType === 'tool:progress') { if (isSubagent) { @@ -654,7 +660,7 @@ export function processBatchedEvents( } } else { const { sessionId, turnId, toolEvent } = payload; - processToolProgressInternal(context, sessionId, turnId, toolEvent); + processToolProgressInternal(sessionId, turnId, toolEvent); } } } @@ -668,10 +674,19 @@ export function processBatchedEvents( */ function handleToolEvent( context: FlowChatContext, - event: any, + event: { + sessionId: string; + turnId?: string; + toolEvent: FlowToolEvent; + subagentParentInfo?: SubagentParentInfo; + }, onTodoWriteResult: (sessionId: string, turnId: string, result: any) => void ): void { const { sessionId, turnId, toolEvent, subagentParentInfo } = event; + if (!turnId) { + log.debug('Tool event missing turnId', { sessionId, toolId: toolEvent.tool_id, eventType: toolEvent.event_type }); + return; + } const parentSessionId = subagentParentInfo?.sessionId; const parentToolId = subagentParentInfo?.toolCallId; @@ -704,8 +719,10 @@ function handleToolEvent( (existing, incoming) => ({ ...existing, toolEvent: { - ...existing.toolEvent, - params_partial: (existing.toolEvent.params_partial || '') + (incoming.toolEvent.params_partial || '') + ...(existing.toolEvent as ParamsPartialToolEvent), + params: + (existing.toolEvent as ParamsPartialToolEvent).params + + (incoming.toolEvent as ParamsPartialToolEvent).params } }) ); diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/SubagentModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/SubagentModule.ts index 509b092a..61d2faea 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/SubagentModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/SubagentModule.ts @@ -5,8 +5,8 @@ import { FlowChatStore } from '../../store/FlowChatStore'; import { createLogger } from '@/shared/utils/logger'; import type { FlowChatContext, FlowTextItem, SubagentTextChunkData, SubagentToolEventData } from './types'; -import { THINKING_END_MARKER } from './types'; import { processToolEvent } from './ToolEventModule'; +import type { ToolEventData } from '../EventBatcher'; const log = createLogger('SubagentModule'); @@ -49,9 +49,8 @@ export function routeTextChunkToToolCard( // Format: subagent-{type}-{parentToolId}-{sessionId}-{roundId} const itemId = `${itemPrefix}-${parentToolId}-${data.sessionId}-${data.roundId}`; - const hasEndMarker = isThinking && data.text.includes(THINKING_END_MARKER); - // Strip the end marker from the rendered content. - const cleanText = data.text.replace(THINKING_END_MARKER, ''); + const isThinkingEnd = isThinking && !!data.isThinkingEnd; + const textContent = data.text; const parentTurn = parentSession.dialogTurns.find(turn => turn.id === parentTurnId); let existingItem: FlowTextItem | import('../../types/flow-chat').FlowThinkingItem | null = null; @@ -67,9 +66,9 @@ export function routeTextChunkToToolCard( } if (existingItem) { - if (hasEndMarker) { + if (isThinkingEnd) { store.updateModelRoundItem(parentSessionId, parentTurnId, itemId, { - content: existingItem.content + cleanText, + content: existingItem.content + textContent, isStreaming: false, isCollapsed: true, status: 'completed', @@ -78,7 +77,7 @@ export function routeTextChunkToToolCard( } else { store.updateModelRoundItem(parentSessionId, parentTurnId, itemId, { - content: existingItem.content + cleanText, + content: existingItem.content + textContent, timestamp: Date.now() } as any); } @@ -91,11 +90,11 @@ export function routeTextChunkToToolCard( const newThinkingItem: import('../../types/flow-chat').FlowThinkingItem = { id: itemId, type: 'thinking', - content: cleanText, + content: textContent, timestamp: parentTimestamp + 1, - isStreaming: !hasEndMarker, - isCollapsed: hasEndMarker, - status: hasEndMarker ? 'completed' : 'streaming', + isStreaming: !isThinkingEnd, + isCollapsed: isThinkingEnd, + status: isThinkingEnd ? 'completed' : 'streaming', isSubagentItem: true, parentTaskToolId: parentToolId, subagentSessionId: data.sessionId @@ -106,7 +105,7 @@ export function routeTextChunkToToolCard( const newTextItem: FlowTextItem = { id: itemId, type: 'text', - content: cleanText, + content: textContent, timestamp: parentTimestamp + 1, isStreaming: true, status: 'streaming', @@ -182,6 +181,7 @@ export function routeTextChunkToToolCardInternal( roundId: string; text: string; contentType: string; + isThinkingEnd?: boolean; } ): void { routeTextChunkToToolCard(context, parentSessionId, parentToolId, chunkData); @@ -194,7 +194,7 @@ export function routeToolEventToToolCardInternal( context: FlowChatContext, parentSessionId: string, parentToolId: string, - eventData: any, + eventData: ToolEventData, onTodoWriteResult?: (sessionId: string, turnId: string, result: any) => void ): void { routeToolEventToToolCard(context, parentSessionId, parentToolId, eventData, onTodoWriteResult); diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts index b702d784..6ce4ccf0 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/TextChunkModule.ts @@ -2,10 +2,7 @@ * Handles streamed text chunks and thinking content. */ -import { FlowChatStore } from '../../store/FlowChatStore'; -import { parsePartialJson } from '../../../shared/utils/partialJsonParser'; import type { FlowChatContext, FlowTextItem } from './types'; -import { THINKING_END_MARKER } from './types'; /** * Process a normal text chunk without notifying the store. */ @@ -58,14 +55,14 @@ export function processNormalTextChunkInternal( /** * Process thinking chunks without notifying the store. - * Uses the marker to avoid ordering issues. */ export function processThinkingChunkInternal( context: FlowChatContext, sessionId: string, turnId: string, roundId: string, - text: string + text: string, + isThinkingEnd = false ): void { if (!context.contentBuffers.has(sessionId)) { context.contentBuffers.set(sessionId, new Map()); @@ -79,13 +76,9 @@ export function processThinkingChunkInternal( // Store thinking content under a separate key. const thinkingKey = `thinking_${roundId}`; - - const hasEndMarker = text.includes(THINKING_END_MARKER); - // Strip the end marker from stored content. - const cleanText = text.replace(THINKING_END_MARKER, ''); - + const currentContent = sessionContentBuffer.get(thinkingKey) || ''; - const cleanedContent = (currentContent + cleanText).replace(/\n{3,}/g, '\n\n'); + const cleanedContent = (currentContent + text).replace(/\n{3,}/g, '\n\n'); sessionContentBuffer.set(thinkingKey, cleanedContent); let thinkingItemId = sessionActiveTextItems.get(thinkingKey); @@ -97,22 +90,21 @@ export function processThinkingChunkInternal( id: thinkingItemId, type: 'thinking', content: cleanedContent, - isStreaming: !hasEndMarker, - isCollapsed: hasEndMarker, + isStreaming: !isThinkingEnd, + isCollapsed: isThinkingEnd, timestamp: Date.now(), - status: hasEndMarker ? 'completed' : 'streaming' + status: isThinkingEnd ? 'completed' : 'streaming' }; context.flowChatStore.addModelRoundItemSilent(sessionId, turnId, thinkingItem, roundId); sessionActiveTextItems.set(thinkingKey, thinkingItemId); - // Clear buffers once the end marker arrives. - if (hasEndMarker) { + if (isThinkingEnd) { sessionContentBuffer.delete(thinkingKey); sessionActiveTextItems.delete(thinkingKey); } } else { - if (hasEndMarker) { + if (isThinkingEnd) { context.flowChatStore.updateModelRoundItemSilent(sessionId, turnId, thinkingItemId, { content: cleanedContent, isStreaming: false, @@ -132,58 +124,6 @@ export function processThinkingChunkInternal( } } -/** - * Merge partial tool params without notifying the store. - */ -export function processToolParamsPartialInternal( - _context: FlowChatContext, - sessionId: string, - turnId: string, - toolEvent: any -): void { - const store = FlowChatStore.getInstance(); - const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); - - if (existingItem) { - const currentParams = (existingItem as any).parameters || {}; - let newParams = currentParams; - - if (toolEvent.params_partial) { - try { - const partialParams = parsePartialJson(toolEvent.params_partial); - newParams = { ...currentParams, ...partialParams }; - } catch { - // Ignore parse errors to keep streaming resilient. - } - } - - store.updateModelRoundItemSilent(sessionId, turnId, toolEvent.tool_id, { - parameters: newParams, - _rawParamsPartial: ((existingItem as any)._rawParamsPartial || '') + (toolEvent.params_partial || '') - } as any); - } -} - -/** - * Update tool progress without notifying the store. - */ -export function processToolProgressInternal( - _context: FlowChatContext, - sessionId: string, - turnId: string, - toolEvent: any -): void { - const store = FlowChatStore.getInstance(); - const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); - - if (existingItem) { - store.updateModelRoundItemSilent(sessionId, turnId, toolEvent.tool_id, { - _progressMessage: toolEvent.message, - _progressPercentage: toolEvent.percentage - } as any); - } -} - /** * Finalize streaming state for active text items. */ diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts index ebd107ff..beb625de 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/ToolEventModule.ts @@ -8,6 +8,17 @@ import { parsePartialJson } from '../../../shared/utils/partialJsonParser'; import { createLogger } from '@/shared/utils/logger'; import type { FlowChatContext, FlowToolItem, ToolEventOptions, DialogTurn } from './types'; import { immediateSaveDialogTurn } from './PersistenceModule'; +import type { + CancelledToolEvent, + CompletedToolEvent, + ConfirmationNeededToolEvent, + EarlyDetectedToolEvent, + FailedToolEvent, + FlowToolEvent, + ParamsPartialToolEvent, + ProgressToolEvent, + StartedToolEvent, +} from '../EventBatcher'; const log = createLogger('ToolEventModule'); @@ -19,7 +30,7 @@ export function processToolEvent( context: FlowChatContext, sessionId: string, turnId: string, - toolEvent: any, + toolEvent: FlowToolEvent, options?: ToolEventOptions, onTodoWriteResult?: (sessionId: string, turnId: string, result: any) => void ): void { @@ -50,26 +61,31 @@ export function processToolEvent( } case 'Started': { + flushPendingBatchedEvents(context); handleStarted(store, sessionId, turnId, dialogTurn, toolEvent, options); break; } case 'Completed': { + flushPendingBatchedEvents(context); handleCompleted(context, store, sessionId, turnId, toolEvent, options, onTodoWriteResult); break; } case 'Failed': { + flushPendingBatchedEvents(context); handleFailed(context, store, sessionId, turnId, toolEvent); break; } case 'Cancelled': { + flushPendingBatchedEvents(context); handleCancelled(context, store, sessionId, turnId, toolEvent); break; } case 'ConfirmationNeeded': { + flushPendingBatchedEvents(context); handleConfirmationNeeded(store, sessionId, turnId, toolEvent); break; } @@ -84,6 +100,121 @@ export function processToolEvent( } } +function flushPendingBatchedEvents(context: FlowChatContext): void { + if (context.eventBatcher.getBufferSize() > 0) { + context.eventBatcher.flushNow(); + } +} + +function updateToolItem( + store: FlowChatStore, + sessionId: string, + turnId: string, + toolId: string, + updates: Record, + silent = false +): void { + if (silent) { + store.updateModelRoundItemSilent(sessionId, turnId, toolId, updates as any); + return; + } + + store.updateModelRoundItem(sessionId, turnId, toolId, updates as any); +} + +function isTodoWriteSuccessResult(result: unknown): result is Record { + return typeof result === 'object' && result !== null && (result as { success?: unknown }).success === true; +} + +function isWriteLikeToolName(toolName: string): boolean { + return ['write', 'write_notebook', 'file_write', 'Write'].includes(toolName); +} + +function shouldIgnoreParamsPartial(status: FlowToolItem['status']): boolean { + return ['running', 'completed', 'error', 'cancelled', 'pending_confirmation', 'confirmed'].includes(status); +} + +function applyParamsPartial( + store: FlowChatStore, + sessionId: string, + turnId: string, + toolEvent: ParamsPartialToolEvent, + silent = false +): void { + const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); + + if (existingItem && existingItem.type === 'tool') { + const existingToolItem = existingItem as FlowToolItem; + if (shouldIgnoreParamsPartial(existingToolItem.status)) { + return; + } + + const prevBuffer = existingToolItem._paramsBuffer || ''; + const newBuffer = prevBuffer + (toolEvent.params || ''); + + let parsedParams: Record = {}; + try { + parsedParams = parsePartialJson(newBuffer); + } catch { + } + + const isWriteTool = isWriteLikeToolName(toolEvent.tool_name); + const isEditTool = ['edit', 'search_replace', 'Edit'].includes(toolEvent.tool_name); + const hasContentField = parsedParams && ('content' in parsedParams || 'contents' in parsedParams); + const hasNewString = parsedParams && 'new_string' in parsedParams; + + let status: 'streaming' | 'receiving' = 'streaming'; + if ((isWriteTool && hasContentField) || (isEditTool && hasNewString)) { + status = 'receiving'; + } + + updateToolItem(store, sessionId, turnId, toolEvent.tool_id, { + toolCall: { + input: parsedParams, + id: toolEvent.tool_id + }, + partialParams: parsedParams, + _paramsBuffer: newBuffer, + status, + isParamsStreaming: true, + _contentSize: hasContentField ? ((parsedParams.content || parsedParams.contents || '').length) : undefined + }, silent); + } +} + +function applyProgress( + store: FlowChatStore, + sessionId: string, + turnId: string, + toolEvent: ProgressToolEvent, + silent = false +): void { + const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); + + if (existingItem) { + updateToolItem(store, sessionId, turnId, toolEvent.tool_id, { + _progressMessage: toolEvent.message, + _progressPercentage: toolEvent.percentage + }, silent); + } +} + +export function processToolParamsPartialInternal( + sessionId: string, + turnId: string, + toolEvent: ParamsPartialToolEvent +): void { + applyParamsPartial(FlowChatStore.getInstance(), sessionId, turnId, toolEvent, true); +} + +export function processToolProgressInternal( + sessionId: string, + turnId: string, + toolEvent: ProgressToolEvent +): void { + applyProgress(FlowChatStore.getInstance(), sessionId, turnId, toolEvent, true); +} + /** * Handle tool early detection event */ @@ -93,10 +224,10 @@ function handleEarlyDetected( sessionId: string, turnId: string, dialogTurn: DialogTurn, - toolEvent: any, + toolEvent: EarlyDetectedToolEvent, options?: ToolEventOptions ): void { - context.eventBatcher.flushNow(); + flushPendingBatchedEvents(context); const shouldDisplayInMainFlow = toolEvent.tool_name === 'submit_code_review' || toolEvent.tool_name === 'AskUserQuestion'; @@ -150,42 +281,9 @@ function handleParamsPartial( store: FlowChatStore, sessionId: string, turnId: string, - toolEvent: any + toolEvent: ParamsPartialToolEvent ): void { - const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); - - if (existingItem && existingItem.type === 'tool') { - const prevBuffer = (existingItem as FlowToolItem)._paramsBuffer || ''; - const newBuffer = prevBuffer + (toolEvent.params || ''); - - let parsedParams: Record = {}; - try { - parsedParams = parsePartialJson(newBuffer); - } catch (e) { - } - - const isWriteTool = ['write', 'write_notebook', 'file_write', 'Write'].includes(toolEvent.tool_name); - const isEditTool = ['edit', 'search_replace', 'Edit'].includes(toolEvent.tool_name); - const hasContentField = parsedParams && ('content' in parsedParams || 'contents' in parsedParams); - const hasNewString = parsedParams && 'new_string' in parsedParams; - - let status: 'streaming' | 'receiving' = 'streaming'; - if ((isWriteTool && hasContentField) || (isEditTool && hasNewString)) { - status = 'receiving'; - } - - store.updateModelRoundItem(sessionId, turnId, toolEvent.tool_id, { - toolCall: { - input: parsedParams, - id: toolEvent.tool_id - }, - partialParams: parsedParams, - _paramsBuffer: newBuffer, - status, - isParamsStreaming: true, - _contentSize: hasContentField ? ((parsedParams.content || parsedParams.contents || '').length) : undefined - } as any); - } + applyParamsPartial(store, sessionId, turnId, toolEvent); } /** @@ -196,7 +294,7 @@ function handleStarted( sessionId: string, turnId: string, dialogTurn: DialogTurn, - toolEvent: any, + toolEvent: StartedToolEvent, options?: ToolEventOptions ): void { const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); @@ -257,26 +355,27 @@ function handleCompleted( store: FlowChatStore, sessionId: string, turnId: string, - toolEvent: any, + toolEvent: CompletedToolEvent, options?: ToolEventOptions, onTodoWriteResult?: (sessionId: string, turnId: string, result: any) => void ): void { - if (!options?.isSubagent && toolEvent.tool_name === 'TodoWrite' && toolEvent.result?.success) { + if (!options?.isSubagent && toolEvent.tool_name === 'TodoWrite' && isTodoWriteSuccessResult(toolEvent.result)) { onTodoWriteResult?.(sessionId, turnId, toolEvent.result); } - const updates: any = { + const updates = { toolResult: { result: toolEvent.result, success: true, resultForAssistant: toolEvent.result_for_assistant, duration_ms: toolEvent.duration_ms }, - status: 'completed', + status: 'completed' as const, + isParamsStreaming: false, endTime: Date.now() }; - store.updateModelRoundItem(sessionId, turnId, toolEvent.tool_id, updates); + store.updateModelRoundItem(sessionId, turnId, toolEvent.tool_id, updates as any); immediateSaveDialogTurn(context, sessionId, turnId); } @@ -289,14 +388,13 @@ function handleFailed( store: FlowChatStore, sessionId: string, turnId: string, - toolEvent: any + toolEvent: FailedToolEvent ): void { store.updateModelRoundItem(sessionId, turnId, toolEvent.tool_id, { toolResult: { result: null, success: false, - error: toolEvent.error, - duration_ms: toolEvent.duration_ms + error: toolEvent.error }, status: 'error', endTime: Date.now() @@ -313,7 +411,7 @@ function handleCancelled( store: FlowChatStore, sessionId: string, turnId: string, - toolEvent: any + toolEvent: CancelledToolEvent ): void { const existingToolItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); const currentStatus = existingToolItem?.status; @@ -323,8 +421,7 @@ function handleCancelled( toolResult: { result: null, success: false, - error: toolEvent.reason || 'User cancelled operation', - duration_ms: toolEvent.duration_ms + error: toolEvent.reason || 'User cancelled operation' }, status: finalStatus, endTime: Date.now() @@ -340,7 +437,7 @@ function handleConfirmationNeeded( store: FlowChatStore, sessionId: string, turnId: string, - toolEvent: any + toolEvent: ConfirmationNeededToolEvent ): void { store.updateModelRoundItem(sessionId, turnId, toolEvent.tool_id, { requiresConfirmation: true, @@ -355,16 +452,9 @@ function handleProgress( store: FlowChatStore, sessionId: string, turnId: string, - toolEvent: any + toolEvent: ProgressToolEvent ): void { - const existingItem = store.findToolItem(sessionId, turnId, toolEvent.tool_id); - - if (existingItem) { - store.updateModelRoundItem(sessionId, turnId, toolEvent.tool_id, { - _progressMessage: toolEvent.message, - _progressPercentage: toolEvent.percentage - } as any); - } + applyProgress(store, sessionId, turnId, toolEvent); } /** diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/index.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/index.ts index bc364933..e460b40d 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/index.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/index.ts @@ -16,8 +16,6 @@ export { export { processNormalTextChunkInternal, processThinkingChunkInternal, - processToolParamsPartialInternal, - processToolProgressInternal, completeActiveTextItems, cleanupSessionBuffers, clearAllBuffers @@ -25,6 +23,8 @@ export { export { processToolEvent, + processToolParamsPartialInternal, + processToolProgressInternal, handleToolExecutionProgress } from './ToolEventModule'; diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts index 1f3f2374..3575101a 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/types.ts @@ -5,6 +5,7 @@ import type { FlowChatStore } from '../../store/FlowChatStore'; import type { EventBatcher } from '../EventBatcher'; import type { processingStatusManager } from '../ProcessingStatusManager'; +import type { FlowToolEvent } from '../EventBatcher'; /** * Shared context for FlowChatManager modules. @@ -50,17 +51,13 @@ export interface SubagentTextChunkData { roundId: string; text: string; contentType: string; + isThinkingEnd?: boolean; } export interface SubagentToolEventData { sessionId: string; turnId: string; - toolEvent: any; + toolEvent: FlowToolEvent; } export type { SessionConfig, DialogTurn, ModelRound, FlowTextItem, FlowToolItem } from '../../types/flow-chat'; - -/** - * Thinking content end marker. - */ -export const THINKING_END_MARKER = ''; diff --git a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts index 75934bbd..53479050 100644 --- a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts +++ b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts @@ -126,7 +126,8 @@ export interface AgenticEvent { export interface TextChunkEvent extends AgenticEvent { roundId: string; text: string; - contentType?: 'text' | 'thinking'; + contentType?: 'text' | 'thinking'; + isThinkingEnd?: boolean; subagentParentInfo?: SubagentParentInfo; } diff --git a/src/web-ui/src/infrastructure/config/services/FrontendLogLevelSync.ts b/src/web-ui/src/infrastructure/config/services/FrontendLogLevelSync.ts new file mode 100644 index 00000000..ab9f3dd9 --- /dev/null +++ b/src/web-ui/src/infrastructure/config/services/FrontendLogLevelSync.ts @@ -0,0 +1,110 @@ +import { configAPI } from '@/infrastructure/api'; +import { LogLevel, createLogger, logger } from '@/shared/utils/logger'; +import type { BackendLogLevel } from '../types'; +import { configManager } from './ConfigManager'; + +const log = createLogger('FrontendLogLevelSync'); +const LOGGING_LEVEL_PATH = 'app.logging.level'; + +let initialized = false; + +function toFrontendLogLevel(level: string | null | undefined): LogLevel | null { + switch (level?.trim().toLowerCase()) { + case 'trace': + return LogLevel.TRACE; + case 'debug': + return LogLevel.DEBUG; + case 'info': + return LogLevel.INFO; + case 'warn': + return LogLevel.WARN; + case 'error': + return LogLevel.ERROR; + case 'off': + return LogLevel.NONE; + default: + return null; + } +} + +function toBackendLogLevel(level: LogLevel): BackendLogLevel { + switch (level) { + case LogLevel.TRACE: + return 'trace'; + case LogLevel.DEBUG: + return 'debug'; + case LogLevel.INFO: + return 'info'; + case LogLevel.WARN: + return 'warn'; + case LogLevel.ERROR: + return 'error'; + case LogLevel.NONE: + return 'off'; + } +} + +function applyFrontendLogLevel(level: string | null | undefined, source: string): void { + const nextLevel = toFrontendLogLevel(level); + if (nextLevel === null) { + if (level) { + log.warn('Ignoring invalid frontend log level', { level, source }); + } + return; + } + + const previousLevel = logger.getLevel(); + if (previousLevel === nextLevel) { + return; + } + + logger.setLevel(nextLevel); + log.info('Frontend log level updated', { + oldLevel: toBackendLogLevel(previousLevel), + newLevel: toBackendLogLevel(nextLevel), + source, + }); +} + +async function resolveInitialLogLevel(): Promise { + const [savedLevelResult, runtimeInfoResult] = await Promise.allSettled([ + configManager.getConfig(LOGGING_LEVEL_PATH), + configAPI.getRuntimeLoggingInfo(), + ]); + + if (savedLevelResult.status === 'fulfilled' && toFrontendLogLevel(savedLevelResult.value) !== null) { + return savedLevelResult.value; + } + + if (runtimeInfoResult.status === 'fulfilled') { + const runtimeLevel = runtimeInfoResult.value?.effectiveLevel; + if (toFrontendLogLevel(runtimeLevel) !== null) { + return runtimeLevel; + } + } + + return undefined; +} + +export async function initializeFrontendLogLevelSync(): Promise { + if (initialized) { + return; + } + + initialized = true; + + configManager.onConfigChange((path, _oldValue, newValue) => { + if (path !== LOGGING_LEVEL_PATH) { + return; + } + + applyFrontendLogLevel(typeof newValue === 'string' ? newValue : undefined, 'config_change'); + }); + + try { + const initialLevel = await resolveInitialLogLevel(); + applyFrontendLogLevel(initialLevel, 'startup'); + } catch (error) { + log.error('Failed to initialize frontend log level sync', error); + } +} diff --git a/src/web-ui/src/locales/en-US/settings/logging.json b/src/web-ui/src/locales/en-US/settings/logging.json index d532f0a0..da517272 100644 --- a/src/web-ui/src/locales/en-US/settings/logging.json +++ b/src/web-ui/src/locales/en-US/settings/logging.json @@ -6,7 +6,7 @@ "path": "Startup Log Path" }, "level": { - "description": "Controls backend log verbosity in real time. Changes apply immediately." + "description": "Controls log verbosity in real time. Changes apply immediately." }, "levels": { "trace": "Trace", diff --git a/src/web-ui/src/locales/zh-CN/settings/logging.json b/src/web-ui/src/locales/zh-CN/settings/logging.json index ee3a78a9..49589ae6 100644 --- a/src/web-ui/src/locales/zh-CN/settings/logging.json +++ b/src/web-ui/src/locales/zh-CN/settings/logging.json @@ -6,7 +6,7 @@ "path": "本次启动日志路径" }, "level": { - "description": "实时控制后端日志输出级别,修改后立即生效。" + "description": "实时控制日志输出级别,修改后立即生效。" }, "levels": { "trace": "Trace", diff --git a/src/web-ui/src/main.tsx b/src/web-ui/src/main.tsx index 2e558b39..0e280eab 100644 --- a/src/web-ui/src/main.tsx +++ b/src/web-ui/src/main.tsx @@ -141,8 +141,6 @@ loader.config({ } }); -log.debug('Monaco loader configured', { vs: monacoPath, isDev }); - // Debug: check resource availability in production. if (!isDev) { // Delay checks to avoid blocking startup. @@ -186,9 +184,14 @@ async function initializeApp() { // Initialize logger first (attaches console in dev mode) const { initLogger } = await import('./shared/utils/logger'); await initLogger(); - + + // Sync frontend logger with app.logging.level before startup logs. + const { initializeFrontendLogLevelSync } = await import('./infrastructure/config/services/FrontendLogLevelSync'); + await initializeFrontendLogLevelSync(); + + log.debug('Monaco loader configured', { vs: monacoPath, isDev }); log.info('Initializing BitFun'); - + // Synchronous initialization: core systems that must run first. const { registerDefaultContextTypes } = await import('./shared/context-system/core/registerDefaultTypes'); registerDefaultContextTypes();