Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions astrbot/core/platform/sources/webchat/webchat_adapter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import os
import time
import uuid
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -92,32 +91,37 @@ async def send_by_session(
active_request_ids = self._webchat_queue_mgr.list_back_request_ids(
conversation_id
)
subscription_request_ids = [
req_id for req_id in active_request_ids if req_id.startswith("ws_sub_")
stream_request_ids = [
req_id for req_id in active_request_ids if not req_id.startswith("ws_sub_")
]
target_request_ids = subscription_request_ids or active_request_ids

if target_request_ids:
for request_id in target_request_ids:
await WebChatMessageEvent._send(
request_id,
message_chain,
session.session_id,
target_request_ids = stream_request_ids or active_request_ids
Comment thread
Soulter marked this conversation as resolved.

if not target_request_ids:
# No active streams to consume this proactive message.
# Persist directly and return to avoid creating an unused queue.
try:
await self._save_proactive_message(conversation_id, message_chain)
except Exception as e:
logger.error(
f"[WebChatAdapter] Failed to save proactive message: {e}",
exc_info=True,
)
else:
message_id = f"active_{uuid.uuid4()!s}"
await super().send_by_session(session, message_chain)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In the scenario where there are no active streams to consume a proactive message, the message is first saved via _save_proactive_message (lines 102-108) and then super().send_by_session(session, message_chain) is called. If the base send_by_session method also handles message persistence, this could lead to duplicate message storage. Please clarify if the base method's persistence is idempotent or if this could result in redundant entries.

return

for request_id in target_request_ids:
await WebChatMessageEvent._send(
message_id,
request_id,
message_chain,
session.session_id,
streaming=True,
emit_complete=True,
)

should_persist = (
bool(subscription_request_ids)
or not active_request_ids
or all(req_id.startswith("active_") for req_id in active_request_ids)
)
if should_persist:
# If only passive subscription queues exist for this conversation,
# keep a proactive save as a fallback since they are not tied to
# the normal streaming persistence path.
if not stream_request_ids:
try:
await self._save_proactive_message(conversation_id, message_chain)
except Exception as e:
Expand Down
12 changes: 12 additions & 0 deletions astrbot/core/platform/sources/webchat/webchat_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async def _send(
message: MessageChain | None,
session_id: str,
streaming: bool = False,
emit_complete: bool = False,
) -> str | None:
request_id = str(message_id)
conversation_id = _extract_conversation_id(session_id)
Expand Down Expand Up @@ -127,6 +128,17 @@ async def _send(
else:
logger.debug(f"webchat 忽略: {comp.type}")

if emit_complete:
await web_chat_back_queue.put(
{
"type": "complete",
"data": data,
"streaming": streaming,
"chain_type": message.type,
"message_id": message_id,
},
)
Comment on lines +131 to +140
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The data field within the complete event is currently set to the data from the last processed message component (comp). If a message chain consists of multiple components (e.g., plain text followed by an image), this data might not accurately represent the full or most relevant "final data" for the entire message chain. Consider accumulating all plain text or providing a more comprehensive summary of the message chain for the data field in the complete event, or ensure the client-side handling accounts for this behavior.


return data

async def send(self, message: MessageChain | None) -> None:
Expand Down
5 changes: 4 additions & 1 deletion astrbot/dashboard/routes/open_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,10 @@ async def _handle_chat_ws_send(self, post_data: dict) -> None:
if msg_type == "end":
break
if (streaming and msg_type == "complete") or not streaming:
if chain_type in ("tool_call", "tool_call_result"):
if chain_type in (
"tool_call",
"tool_call_result",
):
continue
try:
refs = self.chat_route._extract_web_search_refs(
Expand Down
28 changes: 28 additions & 0 deletions dashboard/src/composables/useMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ type WsStreamContext = {

const STREAMING_STORAGE_KEY = 'enableStreaming';
const TRANSPORT_MODE_STORAGE_KEY = 'chatTransportMode';
const HIDDEN_TOOL_CALL_NAMES = new Set(['send_message_to_user']);

function isHiddenToolCall(toolCall: ToolCall | { name?: unknown } | null | undefined): boolean {
if (!toolCall || typeof toolCall !== 'object') {
return false;
}
const name = toolCall.name;
return typeof name === 'string' && HIDDEN_TOOL_CALL_NAMES.has(name);
}

export function useMessages(
currSessionId: Ref<string>,
Expand Down Expand Up @@ -489,6 +498,9 @@ export function useMessages(
} catch {
return;
}
if (isHiddenToolCall(toolCallData)) {
return;
}

const toolCall: ToolCall = {
id: toolCallData.id,
Expand Down Expand Up @@ -528,6 +540,9 @@ export function useMessages(
} catch {
return;
}
if (isHiddenToolCall(resultData)) {
return;
}

if (messageObj) {
for (const part of messageObj.message) {
Expand Down Expand Up @@ -658,7 +673,18 @@ export function useMessages(

// 如果 message 是数组 (新格式),遍历并填充 embedded 字段
if (Array.isArray(message)) {
const filteredMessage: MessagePart[] = [];
for (const part of message as MessagePart[]) {
if (part.type === 'tool_call' && Array.isArray(part.tool_calls)) {
const visibleToolCalls = part.tool_calls.filter(
(toolCall) => !isHiddenToolCall(toolCall),
);
if (!visibleToolCalls.length) {
continue;
}
part.tool_calls = visibleToolCalls;
}

if (part.type === 'image' && part.attachment_id) {
part.embedded_url = await getAttachment(part.attachment_id);
} else if (part.type === 'record' && part.attachment_id) {
Expand All @@ -671,7 +697,9 @@ export function useMessages(
};
}
// plain, reply, tool_call, video 保持原样
filteredMessage.push(part);
}
content.message = filteredMessage;
}

// 处理 agent_stats (snake_case -> camelCase)
Expand Down
Loading