From b354831ba2a17353ffc31916b72dcf4c80504d5a Mon Sep 17 00:00:00 2001 From: camera-2018 <40380042+camera-2018@users.noreply.github.com> Date: Wed, 4 Mar 2026 00:59:51 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat(telegram):=20=E4=BD=BF=E7=94=A8=20send?= =?UTF-8?q?MessageDraft=20API=20=E5=AE=9E=E7=8E=B0=E7=A7=81=E8=81=8A?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 _send_message_draft 方法封装 Telegram Bot API sendMessageDraft - 私聊流式输出使用 sendMessageDraft 推送草稿动画,群聊保留 edit_message_text 回退 - 使用独立异步发送循环 (_draft_sender_loop) 按固定间隔推送最新缓冲区内容, 完全解耦 token 到达速度与 API 网络延迟 - 流式结束后发送真实消息保留最终内容(draft 是临时的) - 使用模块级递增 draft_id 替代随机生成,确保 Telegram 端动画连续性 --- .../platform/sources/telegram/tg_event.py | 226 +++++++++++++++++- pyproject.toml | 2 +- requirements.txt | 2 +- tests/fixtures/mocks/telegram.py | 1 + 4 files changed, 227 insertions(+), 4 deletions(-) diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index ade72f1107..1fdf3c9440 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -22,6 +22,19 @@ ) from astrbot.api.platform import AstrBotMessage, MessageType, PlatformMetadata +# sendMessageDraft 的 draft_id 模块级递增计数器(溢出时归 1) +_TELEGRAM_DRAFT_ID_MAX = 2_147_483_647 +_next_draft_id = 0 + + +def _allocate_draft_id() -> int: + """分配一个全局递增的 draft_id,溢出时归 1。""" + global _next_draft_id + _next_draft_id = ( + 1 if _next_draft_id >= _TELEGRAM_DRAFT_ID_MAX else _next_draft_id + 1 + ) + return _next_draft_id + class TelegramPlatformEvent(AstrMessageEvent): # Telegram 的最大消息长度限制 @@ -339,6 +352,44 @@ async def react(self, emoji: str | None, big: bool = False) -> None: except Exception as e: logger.error(f"[Telegram] 添加反应失败: {e}") + async def _send_message_draft( + self, + chat_id: str, + draft_id: int, + text: str, + message_thread_id: str | None = None, + parse_mode: str | None = None, + ) -> None: + """通过 Bot.send_message_draft 发送草稿消息(流式推送部分消息)。 + + 该 API 仅支持私聊。 + + Args: + chat_id: 目标私聊的 chat_id + draft_id: 草稿唯一标识,非零整数;相同 draft_id 的变更会以动画展示 + text: 消息文本,1-4096 字符 + message_thread_id: 可选,目标消息线程 ID + parse_mode: 可选,消息文本的解析模式 + """ + kwargs: dict[str, Any] = {} + if message_thread_id: + kwargs["message_thread_id"] = int(message_thread_id) + if parse_mode: + kwargs["parse_mode"] = parse_mode + + try: + logger.debug( + f"[Telegram] sendMessageDraft: chat_id={chat_id}, draft_id={draft_id}, text_len={len(text)}" + ) + await self.client.send_message_draft( + chat_id=int(chat_id), + draft_id=draft_id, + text=text, + **kwargs, + ) + except Exception as e: + logger.warning(f"[Telegram] sendMessageDraft 失败: {e!s}") + async def send_streaming(self, generator, use_fallback: bool = False): message_thread_id = None @@ -356,6 +407,179 @@ async def send_streaming(self, generator, use_fallback: bool = False): if message_thread_id: payload["message_thread_id"] = message_thread_id + # sendMessageDraft 仅支持私聊 + is_private = self.get_message_type() != MessageType.GROUP_MESSAGE + + if is_private: + logger.info("[Telegram] 流式输出: 使用 sendMessageDraft (私聊)") + await self._send_streaming_draft( + user_name, message_thread_id, payload, generator + ) + else: + logger.info("[Telegram] 流式输出: 使用 edit_message_text fallback (群聊)") + await self._send_streaming_edit( + user_name, message_thread_id, payload, generator + ) + + return await super().send_streaming(generator, use_fallback) + + async def _send_streaming_draft( + self, + user_name: str, + message_thread_id: str | None, + payload: dict[str, Any], + generator, + ) -> None: + """使用 sendMessageDraft API 进行流式推送(私聊专用)。 + + 流式过程中使用 sendMessageDraft 推送草稿动画, + 流式结束后发送一条真实消息保留最终内容(draft 是临时的,会消失)。 + 使用独立的异步发送循环,按固定间隔发送最新缓冲区内容, + 完全解耦 token 到达速度与 API 网络延迟。 + """ + draft_id = _allocate_draft_id() + delta = "" + last_sent_text = "" + send_interval = 0.5 # 独立发送循环间隔 (秒) + streaming_done = False # 信号:生成器已结束 + + async def _draft_sender_loop() -> None: + """独立的草稿发送循环,按固定间隔发送最新内容。""" + nonlocal last_sent_text + while not streaming_done: + await asyncio.sleep(send_interval) + if delta and delta != last_sent_text: + draft_text = delta[: self.MAX_MESSAGE_LENGTH] + if draft_text != last_sent_text: + try: + await self._send_message_draft( + user_name, + draft_id, + draft_text, + message_thread_id, + ) + last_sent_text = draft_text + except Exception: + pass # 草稿发送失败不影响流式 + + # 启动独立发送循环 + sender_task = asyncio.create_task(_draft_sender_loop()) + + try: + async for chain in generator: + if isinstance(chain, MessageChain): + if chain.type == "break": + # 分割符:停止发送循环,发送真实消息,重置状态 + streaming_done = True + await sender_task + if delta: + try: + markdown_text = telegramify_markdown.markdownify( + delta, + normalize_whitespace=False, + ) + await self.client.send_message( + text=markdown_text, + parse_mode="MarkdownV2", + **cast(Any, payload), + ) + except Exception as e: + logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") + await self.client.send_message( + text=delta, **cast(Any, payload) + ) + # 重置并启动新的发送循环 + delta = "" + last_sent_text = "" + draft_id = _allocate_draft_id() + streaming_done = False + sender_task = asyncio.create_task(_draft_sender_loop()) + continue + + # 处理消息链中的每个组件 + for i in chain.chain: + if isinstance(i, Plain): + delta += i.text + elif isinstance(i, Image): + image_path = await i.convert_to_file_path() + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_PHOTO, + self.client.send_photo, + user_name=user_name, + photo=image_path, + **cast(Any, payload), + ) + continue + elif isinstance(i, File): + path = await i.get_file() + name = i.name or os.path.basename(path) + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_DOCUMENT, + self.client.send_document, + user_name=user_name, + document=path, + filename=name, + **cast(Any, payload), + ) + continue + elif isinstance(i, Record): + path = await i.convert_to_file_path() + await self._send_voice_with_fallback( + self.client, + path, + payload, + caption=i.text or delta or None, + user_name=user_name, + message_thread_id=message_thread_id, + use_media_action=True, + ) + continue + elif isinstance(i, Video): + path = await i.convert_to_file_path() + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_VIDEO, + self.client.send_video, + user_name=user_name, + video=path, + **cast(Any, payload), + ) + continue + else: + logger.warning(f"不支持的消息类型: {type(i)}") + continue + finally: + # 停止发送循环 + streaming_done = True + if not sender_task.done(): + await sender_task + + # 流式结束:发送真实消息保留最终内容 + if delta: + try: + markdown_text = telegramify_markdown.markdownify( + delta, + normalize_whitespace=False, + ) + await self.client.send_message( + text=markdown_text, + parse_mode="MarkdownV2", + **cast(Any, payload), + ) + except Exception as e: + logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") + await self.client.send_message(text=delta, **cast(Any, payload)) + + async def _send_streaming_edit( + self, + user_name: str, + message_thread_id: str | None, + payload: dict[str, Any], + generator, + ) -> None: + """使用 send_message + edit_message_text 进行流式推送(群聊 fallback)。""" delta = "" current_content = "" message_id = None @@ -506,5 +730,3 @@ async def send_streaming(self, generator, use_fallback: bool = False): ) except Exception as e: logger.warning(f"编辑消息失败(streaming): {e!s}") - - return await super().send_streaming(generator, use_fallback) diff --git a/pyproject.toml b/pyproject.toml index e57a0216c3..d981c24708 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "pydantic>=2.12.5", "pydub>=0.25.1", "pyjwt>=2.10.1", - "python-telegram-bot>=22.0", + "python-telegram-bot>=22.6", "qq-botpy>=1.2.1", "quart>=0.20.0", "readability-lxml>=0.8.4.1", diff --git a/requirements.txt b/requirements.txt index c06c1d0f29..d76a11ddeb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,7 +32,7 @@ py-cord>=2.6.1 pydantic>=2.12.5 pydub>=0.25.1 pyjwt>=2.10.1 -python-telegram-bot>=22.0 +python-telegram-bot>=22.6 qq-botpy>=1.2.1 quart>=0.20.0 readability-lxml>=0.8.4.1 diff --git a/tests/fixtures/mocks/telegram.py b/tests/fixtures/mocks/telegram.py index fbe4d04364..904ec4d093 100644 --- a/tests/fixtures/mocks/telegram.py +++ b/tests/fixtures/mocks/telegram.py @@ -110,6 +110,7 @@ def create_bot(): bot.set_my_commands = AsyncMock() bot.set_message_reaction = AsyncMock() bot.edit_message_text = AsyncMock() + bot.send_message_draft = AsyncMock() return bot @staticmethod From cd0c14cdfd07b2b1e1f37e3b99bb4dabc93df99e Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Wed, 4 Mar 2026 18:35:58 +0800 Subject: [PATCH 2/5] fix(telegram): convert draft text to Markdown before sending message draft --- astrbot/core/platform/sources/telegram/tg_event.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index 1fdf3c9440..ea4e1cda48 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -452,11 +452,16 @@ async def _draft_sender_loop() -> None: draft_text = delta[: self.MAX_MESSAGE_LENGTH] if draft_text != last_sent_text: try: + markdown_text = telegramify_markdown.markdownify( + draft_text, + normalize_whitespace=False, + ) await self._send_message_draft( user_name, draft_id, - draft_text, + markdown_text, message_thread_id, + parse_mode="MarkdownV2", ) last_sent_text = draft_text except Exception: From 97f87408e3f261f51b2a83b315a99b5db05080c9 Mon Sep 17 00:00:00 2001 From: camera-2018 <40380042+camera-2018@users.noreply.github.com> Date: Wed, 4 Mar 2026 21:08:33 +0800 Subject: [PATCH 3/5] =?UTF-8?q?chore(telegram):=20telegram=20=E9=80=82?= =?UTF-8?q?=E9=85=8D=E5=99=A8=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 提取公共方法 - 有新 token 到达时触发流式 - 生成结束后清除draft内容 - 默认draft发送md格式 --- .../platform/sources/telegram/tg_event.py | 446 ++++++++---------- 1 file changed, 206 insertions(+), 240 deletions(-) diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index ea4e1cda48..d19ba2e14a 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -1,7 +1,7 @@ import asyncio import os import re -from typing import Any, cast +from typing import Any, Callable, cast import telegramify_markdown from telegram import ReactionTypeCustomEmoji, ReactionTypeEmoji @@ -10,6 +10,7 @@ from telegram.ext import ExtBot from astrbot import logger +from astrbot.core.utils.metrics import Metric from astrbot.api.event import AstrMessageEvent, MessageChain from astrbot.api.message_components import ( At, @@ -22,19 +23,6 @@ ) from astrbot.api.platform import AstrBotMessage, MessageType, PlatformMetadata -# sendMessageDraft 的 draft_id 模块级递增计数器(溢出时归 1) -_TELEGRAM_DRAFT_ID_MAX = 2_147_483_647 -_next_draft_id = 0 - - -def _allocate_draft_id() -> int: - """分配一个全局递增的 draft_id,溢出时归 1。""" - global _next_draft_id - _next_draft_id = ( - 1 if _next_draft_id >= _TELEGRAM_DRAFT_ID_MAX else _next_draft_id + 1 - ) - return _next_draft_id - class TelegramPlatformEvent(AstrMessageEvent): # Telegram 的最大消息长度限制 @@ -47,6 +35,20 @@ class TelegramPlatformEvent(AstrMessageEvent): "word": re.compile(r"\s"), } + # sendMessageDraft 的 draft_id 类级递增计数器 + _TELEGRAM_DRAFT_ID_MAX = 2_147_483_647 + _next_draft_id: int = 0 + + @classmethod + def _allocate_draft_id(cls) -> int: + """分配一个递增的 draft_id,溢出时归 1。""" + cls._next_draft_id = ( + 1 + if cls._next_draft_id >= cls._TELEGRAM_DRAFT_ID_MAX + else cls._next_draft_id + 1 + ) + return cls._next_draft_id + # 消息类型到 chat action 的映射,用于优先级判断 ACTION_BY_TYPE: dict[type, str] = { Record: ChatAction.UPLOAD_VOICE, @@ -390,6 +392,80 @@ async def _send_message_draft( except Exception as e: logger.warning(f"[Telegram] sendMessageDraft 失败: {e!s}") + async def _process_chain_items( + self, + chain: MessageChain, + payload: dict[str, Any], + user_name: str, + message_thread_id: str | None, + on_text: Callable[[str], None], + ) -> None: + """处理 MessageChain 中的各类组件,文本通过 on_text 回调追加,媒体直接发送。""" + for i in chain.chain: + if isinstance(i, Plain): + on_text(i.text) + elif isinstance(i, Image): + image_path = await i.convert_to_file_path() + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_PHOTO, + self.client.send_photo, + user_name=user_name, + photo=image_path, + **cast(Any, payload), + ) + elif isinstance(i, File): + path = await i.get_file() + name = i.name or os.path.basename(path) + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_DOCUMENT, + self.client.send_document, + user_name=user_name, + document=path, + filename=name, + **cast(Any, payload), + ) + elif isinstance(i, Record): + path = await i.convert_to_file_path() + await self._send_voice_with_fallback( + self.client, + path, + payload, + caption=i.text or None, + user_name=user_name, + message_thread_id=message_thread_id, + use_media_action=True, + ) + elif isinstance(i, Video): + path = await i.convert_to_file_path() + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_VIDEO, + self.client.send_video, + user_name=user_name, + video=path, + **cast(Any, payload), + ) + else: + logger.warning(f"不支持的消息类型: {type(i)}") + + async def _send_final_segment(self, delta: str, payload: dict[str, Any]) -> None: + """将累积文本作为 MarkdownV2 真实消息发送,失败时回退到纯文本。""" + try: + markdown_text = telegramify_markdown.markdownify( + delta, + normalize_whitespace=False, + ) + await self.client.send_message( + text=markdown_text, + parse_mode="MarkdownV2", + **cast(Any, payload), + ) + except Exception as e: + logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") + await self.client.send_message(text=delta, **cast(Any, payload)) + async def send_streaming(self, generator, use_fallback: bool = False): message_thread_id = None @@ -407,8 +483,8 @@ async def send_streaming(self, generator, use_fallback: bool = False): if message_thread_id: payload["message_thread_id"] = message_thread_id - # sendMessageDraft 仅支持私聊 - is_private = self.get_message_type() != MessageType.GROUP_MESSAGE + # sendMessageDraft 仅支持私聊(显式检查 FRIEND_MESSAGE) + is_private = self.get_message_type() == MessageType.FRIEND_MESSAGE if is_private: logger.info("[Telegram] 流式输出: 使用 sendMessageDraft (私聊)") @@ -421,7 +497,11 @@ async def send_streaming(self, generator, use_fallback: bool = False): user_name, message_thread_id, payload, generator ) - return await super().send_streaming(generator, use_fallback) + # 内联父类 send_streaming 的副作用(避免传入已消费的 generator) + asyncio.create_task( + Metric.upload(msg_event_tick=1, adapter_name=self.platform_meta.name), + ) + self._has_send_oper = True async def _send_streaming_draft( self, @@ -434,148 +514,88 @@ async def _send_streaming_draft( 流式过程中使用 sendMessageDraft 推送草稿动画, 流式结束后发送一条真实消息保留最终内容(draft 是临时的,会消失)。 - 使用独立的异步发送循环,按固定间隔发送最新缓冲区内容, - 完全解耦 token 到达速度与 API 网络延迟。 + 使用信号驱动的发送循环:每次有新 token 到达时唤醒发送, + 发送频率由网络 RTT 自然限制(最多一个请求 in-flight)。 """ - draft_id = _allocate_draft_id() + draft_id = self._allocate_draft_id() delta = "" last_sent_text = "" - send_interval = 0.5 # 独立发送循环间隔 (秒) - streaming_done = False # 信号:生成器已结束 + done = False # 信号:生成器已结束 + text_changed = asyncio.Event() # 有新 token 到达时触发 async def _draft_sender_loop() -> None: - """独立的草稿发送循环,按固定间隔发送最新内容。""" + """信号驱动的草稿发送循环,有新内容就发,RTT 自然限流。""" nonlocal last_sent_text - while not streaming_done: - await asyncio.sleep(send_interval) + while not done: + await text_changed.wait() + text_changed.clear() + # 发送最新的缓冲区内容(MarkdownV2 渲染,与真实消息一致) if delta and delta != last_sent_text: draft_text = delta[: self.MAX_MESSAGE_LENGTH] if draft_text != last_sent_text: try: - markdown_text = telegramify_markdown.markdownify( - draft_text, - normalize_whitespace=False, + md = telegramify_markdown.markdownify( + draft_text, normalize_whitespace=False, ) await self._send_message_draft( - user_name, - draft_id, - markdown_text, - message_thread_id, - parse_mode="MarkdownV2", + user_name, draft_id, md, + message_thread_id, parse_mode="MarkdownV2", ) last_sent_text = draft_text - except Exception: - pass # 草稿发送失败不影响流式 + except Exception as e: + # markdownify 对未闭合语法可能失败,回退纯文本 + try: + await self._send_message_draft( + user_name, draft_id, draft_text, + message_thread_id, + ) + last_sent_text = draft_text + except Exception as e2: + logger.debug( + f"[Telegram] sendMessageDraft failed (ignored): {e2!s}" + ) - # 启动独立发送循环 sender_task = asyncio.create_task(_draft_sender_loop()) + def _append_text(t: str) -> None: + nonlocal delta + delta += t + text_changed.set() # 唤醒发送循环 + try: async for chain in generator: - if isinstance(chain, MessageChain): - if chain.type == "break": - # 分割符:停止发送循环,发送真实消息,重置状态 - streaming_done = True - await sender_task - if delta: - try: - markdown_text = telegramify_markdown.markdownify( - delta, - normalize_whitespace=False, - ) - await self.client.send_message( - text=markdown_text, - parse_mode="MarkdownV2", - **cast(Any, payload), - ) - except Exception as e: - logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") - await self.client.send_message( - text=delta, **cast(Any, payload) - ) - # 重置并启动新的发送循环 - delta = "" - last_sent_text = "" - draft_id = _allocate_draft_id() - streaming_done = False - sender_task = asyncio.create_task(_draft_sender_loop()) - continue - - # 处理消息链中的每个组件 - for i in chain.chain: - if isinstance(i, Plain): - delta += i.text - elif isinstance(i, Image): - image_path = await i.convert_to_file_path() - await self._send_media_with_action( - self.client, - ChatAction.UPLOAD_PHOTO, - self.client.send_photo, - user_name=user_name, - photo=image_path, - **cast(Any, payload), - ) - continue - elif isinstance(i, File): - path = await i.get_file() - name = i.name or os.path.basename(path) - await self._send_media_with_action( - self.client, - ChatAction.UPLOAD_DOCUMENT, - self.client.send_document, - user_name=user_name, - document=path, - filename=name, - **cast(Any, payload), - ) - continue - elif isinstance(i, Record): - path = await i.convert_to_file_path() - await self._send_voice_with_fallback( - self.client, - path, - payload, - caption=i.text or delta or None, - user_name=user_name, - message_thread_id=message_thread_id, - use_media_action=True, - ) - continue - elif isinstance(i, Video): - path = await i.convert_to_file_path() - await self._send_media_with_action( - self.client, - ChatAction.UPLOAD_VIDEO, - self.client.send_video, - user_name=user_name, - video=path, - **cast(Any, payload), - ) - continue - else: - logger.warning(f"不支持的消息类型: {type(i)}") - continue + if not isinstance(chain, MessageChain): + continue + + if chain.type == "break": + # 分割符:发送真实消息保留内容,重置缓冲区 + if delta: + # 用 emoji 清空 draft 显示,避免 draft 和真实消息同时可见 + await self._send_message_draft( + user_name, draft_id, "\u23f3", + message_thread_id, + ) + await self._send_final_segment(delta, payload) + delta = "" + last_sent_text = "" + draft_id = self._allocate_draft_id() + continue + + await self._process_chain_items( + chain, payload, user_name, message_thread_id, _append_text + ) finally: - # 停止发送循环 - streaming_done = True - if not sender_task.done(): - await sender_task + done = True + text_changed.set() # 唤醒循环使其退出 + await sender_task - # 流式结束:发送真实消息保留最终内容 + # 流式结束:用 emoji 清空 draft,然后发真实消息持久化 if delta: - try: - markdown_text = telegramify_markdown.markdownify( - delta, - normalize_whitespace=False, - ) - await self.client.send_message( - text=markdown_text, - parse_mode="MarkdownV2", - **cast(Any, payload), - ) - except Exception as e: - logger.warning(f"Markdown转换失败,使用普通文本: {e!s}") - await self.client.send_message(text=delta, **cast(Any, payload)) + await self._send_message_draft( + user_name, draft_id, "\u23f3", + message_thread_id, + ) + await self._send_final_segment(delta, payload) async def _send_streaming_edit( self, @@ -597,121 +617,67 @@ async def _send_streaming_edit( await self._ensure_typing(user_name, message_thread_id) last_chat_action_time = asyncio.get_event_loop().time() + def _append_text(t: str) -> None: + nonlocal delta + delta += t + async for chain in generator: - if isinstance(chain, MessageChain): - if chain.type == "break": - # 分割符 - if message_id: - try: - await self.client.edit_message_text( - text=delta, - chat_id=payload["chat_id"], - message_id=message_id, - ) - except Exception as e: - logger.warning(f"编辑消息失败(streaming-break): {e!s}") - message_id = None # 重置消息 ID - delta = "" # 重置 delta - continue + if not isinstance(chain, MessageChain): + continue - # 处理消息链中的每个组件 - for i in chain.chain: - if isinstance(i, Plain): - delta += i.text - elif isinstance(i, Image): - image_path = await i.convert_to_file_path() - await self._send_media_with_action( - self.client, - ChatAction.UPLOAD_PHOTO, - self.client.send_photo, - user_name=user_name, - photo=image_path, - **cast(Any, payload), - ) - continue - elif isinstance(i, File): - path = await i.get_file() - name = i.name or os.path.basename(path) - await self._send_media_with_action( - self.client, - ChatAction.UPLOAD_DOCUMENT, - self.client.send_document, - user_name=user_name, - document=path, - filename=name, - **cast(Any, payload), - ) - continue - elif isinstance(i, Record): - path = await i.convert_to_file_path() - await self._send_voice_with_fallback( - self.client, - path, - payload, - caption=i.text or delta or None, - user_name=user_name, - message_thread_id=message_thread_id, - use_media_action=True, - ) - continue - elif isinstance(i, Video): - path = await i.convert_to_file_path() - await self._send_media_with_action( - self.client, - ChatAction.UPLOAD_VIDEO, - self.client.send_video, - user_name=user_name, - video=path, - **cast(Any, payload), + if chain.type == "break": + # 分割符 + if message_id: + try: + await self.client.edit_message_text( + text=delta, + chat_id=payload["chat_id"], + message_id=message_id, ) - continue - else: - logger.warning(f"不支持的消息类型: {type(i)}") - continue + except Exception as e: + logger.warning(f"编辑消息失败(streaming-break): {e!s}") + message_id = None + delta = "" + continue - # Plain - if message_id and len(delta) <= self.MAX_MESSAGE_LENGTH: - current_time = asyncio.get_event_loop().time() - time_since_last_edit = current_time - last_edit_time - - # 如果距离上次编辑的时间 >= 设定的间隔,等待一段时间 - if time_since_last_edit >= throttle_interval: - # 发送 typing 状态(带节流) - current_time = asyncio.get_event_loop().time() - if current_time - last_chat_action_time >= chat_action_interval: - await self._ensure_typing(user_name, message_thread_id) - last_chat_action_time = current_time - # 编辑消息 - try: - await self.client.edit_message_text( - text=delta, - chat_id=payload["chat_id"], - message_id=message_id, - ) - current_content = delta - except Exception as e: - logger.warning(f"编辑消息失败(streaming): {e!s}") - last_edit_time = ( - asyncio.get_event_loop().time() - ) # 更新上次编辑的时间 - else: - # delta 长度一般不会大于 4096,因此这里直接发送 - # 发送 typing 状态(带节流) + await self._process_chain_items( + chain, payload, user_name, message_thread_id, _append_text + ) + + # 编辑或发送消息 + if message_id and len(delta) <= self.MAX_MESSAGE_LENGTH: + current_time = asyncio.get_event_loop().time() + time_since_last_edit = current_time - last_edit_time + + if time_since_last_edit >= throttle_interval: current_time = asyncio.get_event_loop().time() if current_time - last_chat_action_time >= chat_action_interval: await self._ensure_typing(user_name, message_thread_id) last_chat_action_time = current_time try: - msg = await self.client.send_message( - text=delta, **cast(Any, payload) + await self.client.edit_message_text( + text=delta, + chat_id=payload["chat_id"], + message_id=message_id, ) current_content = delta except Exception as e: - logger.warning(f"发送消息失败(streaming): {e!s}") - message_id = msg.message_id - last_edit_time = ( - asyncio.get_event_loop().time() - ) # 记录初始消息发送时间 + logger.warning(f"编辑消息失败(streaming): {e!s}") + last_edit_time = asyncio.get_event_loop().time() + else: + current_time = asyncio.get_event_loop().time() + if current_time - last_chat_action_time >= chat_action_interval: + await self._ensure_typing(user_name, message_thread_id) + last_chat_action_time = current_time + try: + msg = await self.client.send_message( + text=delta, **cast(Any, payload) + ) + current_content = delta + except Exception as e: + logger.warning(f"发送消息失败(streaming): {e!s}") + message_id = msg.message_id + last_edit_time = asyncio.get_event_loop().time() try: if delta and current_content != delta: From ed8e8403ad788d2577a3efb9cc50789091d12c48 Mon Sep 17 00:00:00 2001 From: camera-2018 <40380042+camera-2018@users.noreply.github.com> Date: Wed, 4 Mar 2026 21:10:01 +0800 Subject: [PATCH 4/5] style(telegram): ruff format --- .../platform/sources/telegram/tg_event.py | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index d19ba2e14a..a2d987e08e 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -535,18 +535,24 @@ async def _draft_sender_loop() -> None: if draft_text != last_sent_text: try: md = telegramify_markdown.markdownify( - draft_text, normalize_whitespace=False, + draft_text, + normalize_whitespace=False, ) await self._send_message_draft( - user_name, draft_id, md, - message_thread_id, parse_mode="MarkdownV2", + user_name, + draft_id, + md, + message_thread_id, + parse_mode="MarkdownV2", ) last_sent_text = draft_text except Exception as e: # markdownify 对未闭合语法可能失败,回退纯文本 try: await self._send_message_draft( - user_name, draft_id, draft_text, + user_name, + draft_id, + draft_text, message_thread_id, ) last_sent_text = draft_text @@ -572,7 +578,9 @@ def _append_text(t: str) -> None: if delta: # 用 emoji 清空 draft 显示,避免 draft 和真实消息同时可见 await self._send_message_draft( - user_name, draft_id, "\u23f3", + user_name, + draft_id, + "\u23f3", message_thread_id, ) await self._send_final_segment(delta, payload) @@ -592,7 +600,9 @@ def _append_text(t: str) -> None: # 流式结束:用 emoji 清空 draft,然后发真实消息持久化 if delta: await self._send_message_draft( - user_name, draft_id, "\u23f3", + user_name, + draft_id, + "\u23f3", message_thread_id, ) await self._send_final_segment(delta, payload) From ce479d7179cb903341c8766574ec7a6c1b8e04e3 Mon Sep 17 00:00:00 2001 From: camera-2018 <40380042+camera-2018@users.noreply.github.com> Date: Wed, 4 Mar 2026 21:16:30 +0800 Subject: [PATCH 5/5] style(telegram): ruff check --- astrbot/core/platform/sources/telegram/tg_event.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index a2d987e08e..96c7c5568c 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -1,7 +1,8 @@ import asyncio import os import re -from typing import Any, Callable, cast +from collections.abc import Callable +from typing import Any, cast import telegramify_markdown from telegram import ReactionTypeCustomEmoji, ReactionTypeEmoji @@ -10,7 +11,6 @@ from telegram.ext import ExtBot from astrbot import logger -from astrbot.core.utils.metrics import Metric from astrbot.api.event import AstrMessageEvent, MessageChain from astrbot.api.message_components import ( At, @@ -22,6 +22,7 @@ Video, ) from astrbot.api.platform import AstrBotMessage, MessageType, PlatformMetadata +from astrbot.core.utils.metrics import Metric class TelegramPlatformEvent(AstrMessageEvent): @@ -546,7 +547,7 @@ async def _draft_sender_loop() -> None: parse_mode="MarkdownV2", ) last_sent_text = draft_text - except Exception as e: + except Exception: # markdownify 对未闭合语法可能失败,回退纯文本 try: await self._send_message_draft(