diff --git a/astrbot/core/platform/sources/webchat/webchat_adapter.py b/astrbot/core/platform/sources/webchat/webchat_adapter.py index 54718fefb0..26b434573f 100644 --- a/astrbot/core/platform/sources/webchat/webchat_adapter.py +++ b/astrbot/core/platform/sources/webchat/webchat_adapter.py @@ -1,7 +1,6 @@ import asyncio import os import time -import uuid from collections.abc import Callable, Coroutine from pathlib import Path from typing import Any @@ -92,32 +91,37 @@ async def send_by_session( active_request_ids = self._webchat_queue_mgr.list_back_request_ids( conversation_id ) - subscription_request_ids = [ - req_id for req_id in active_request_ids if req_id.startswith("ws_sub_") + stream_request_ids = [ + req_id for req_id in active_request_ids if not req_id.startswith("ws_sub_") ] - target_request_ids = subscription_request_ids or active_request_ids - - if target_request_ids: - for request_id in target_request_ids: - await WebChatMessageEvent._send( - request_id, - message_chain, - session.session_id, + target_request_ids = stream_request_ids or active_request_ids + + if not target_request_ids: + # No active streams to consume this proactive message. + # Persist directly and return to avoid creating an unused queue. + try: + await self._save_proactive_message(conversation_id, message_chain) + except Exception as e: + logger.error( + f"[WebChatAdapter] Failed to save proactive message: {e}", + exc_info=True, ) - else: - message_id = f"active_{uuid.uuid4()!s}" + await super().send_by_session(session, message_chain) + return + + for request_id in target_request_ids: await WebChatMessageEvent._send( - message_id, + request_id, message_chain, session.session_id, + streaming=True, + emit_complete=True, ) - should_persist = ( - bool(subscription_request_ids) - or not active_request_ids - or all(req_id.startswith("active_") for req_id in active_request_ids) - ) - if should_persist: + # If only passive subscription queues exist for this conversation, + # keep a proactive save as a fallback since they are not tied to + # the normal streaming persistence path. + if not stream_request_ids: try: await self._save_proactive_message(conversation_id, message_chain) except Exception as e: diff --git a/astrbot/core/platform/sources/webchat/webchat_event.py b/astrbot/core/platform/sources/webchat/webchat_event.py index b7da864aae..bc1e1a6bcd 100644 --- a/astrbot/core/platform/sources/webchat/webchat_event.py +++ b/astrbot/core/platform/sources/webchat/webchat_event.py @@ -34,6 +34,7 @@ async def _send( message: MessageChain | None, session_id: str, streaming: bool = False, + emit_complete: bool = False, ) -> str | None: request_id = str(message_id) conversation_id = _extract_conversation_id(session_id) @@ -127,6 +128,17 @@ async def _send( else: logger.debug(f"webchat 忽略: {comp.type}") + if emit_complete: + await web_chat_back_queue.put( + { + "type": "complete", + "data": data, + "streaming": streaming, + "chain_type": message.type, + "message_id": message_id, + }, + ) + return data async def send(self, message: MessageChain | None) -> None: diff --git a/astrbot/dashboard/routes/open_api.py b/astrbot/dashboard/routes/open_api.py index 9a736b1763..c1b290ae15 100644 --- a/astrbot/dashboard/routes/open_api.py +++ b/astrbot/dashboard/routes/open_api.py @@ -455,7 +455,10 @@ async def _handle_chat_ws_send(self, post_data: dict) -> None: if msg_type == "end": break if (streaming and msg_type == "complete") or not streaming: - if chain_type in ("tool_call", "tool_call_result"): + if chain_type in ( + "tool_call", + "tool_call_result", + ): continue try: refs = self.chat_route._extract_web_search_refs( diff --git a/dashboard/src/composables/useMessages.ts b/dashboard/src/composables/useMessages.ts index c593fb283b..52ef54e90a 100644 --- a/dashboard/src/composables/useMessages.ts +++ b/dashboard/src/composables/useMessages.ts @@ -91,6 +91,15 @@ type WsStreamContext = { const STREAMING_STORAGE_KEY = 'enableStreaming'; const TRANSPORT_MODE_STORAGE_KEY = 'chatTransportMode'; +const HIDDEN_TOOL_CALL_NAMES = new Set(['send_message_to_user']); + +function isHiddenToolCall(toolCall: ToolCall | { name?: unknown } | null | undefined): boolean { + if (!toolCall || typeof toolCall !== 'object') { + return false; + } + const name = toolCall.name; + return typeof name === 'string' && HIDDEN_TOOL_CALL_NAMES.has(name); +} export function useMessages( currSessionId: Ref, @@ -489,6 +498,9 @@ export function useMessages( } catch { return; } + if (isHiddenToolCall(toolCallData)) { + return; + } const toolCall: ToolCall = { id: toolCallData.id, @@ -528,6 +540,9 @@ export function useMessages( } catch { return; } + if (isHiddenToolCall(resultData)) { + return; + } if (messageObj) { for (const part of messageObj.message) { @@ -658,7 +673,18 @@ export function useMessages( // 如果 message 是数组 (新格式),遍历并填充 embedded 字段 if (Array.isArray(message)) { + const filteredMessage: MessagePart[] = []; for (const part of message as MessagePart[]) { + if (part.type === 'tool_call' && Array.isArray(part.tool_calls)) { + const visibleToolCalls = part.tool_calls.filter( + (toolCall) => !isHiddenToolCall(toolCall), + ); + if (!visibleToolCalls.length) { + continue; + } + part.tool_calls = visibleToolCalls; + } + if (part.type === 'image' && part.attachment_id) { part.embedded_url = await getAttachment(part.attachment_id); } else if (part.type === 'record' && part.attachment_id) { @@ -671,7 +697,9 @@ export function useMessages( }; } // plain, reply, tool_call, video 保持原样 + filteredMessage.push(part); } + content.message = filteredMessage; } // 处理 agent_stats (snake_case -> camelCase)