From 288c57dd63b9f5ffb838db5b79cfc228d092a06b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 5 Feb 2026 20:21:54 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat(telegram):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=AA=92=E4=BD=93=E7=BB=84=EF=BC=88=E7=9B=B8=E5=86=8C=EF=BC=89?= =?UTF-8?q?=E6=94=AF=E6=8C=81=20/=20add=20media=20group=20(album)=20suppor?= =?UTF-8?q?t?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 功能说明 支持 Telegram 的媒体组消息(相册),将多张图片/视频合并为一条消息处理,而不是分散成多条消息。 ## 主要改动 ### 1. 初始化媒体组缓存 (__init__) - 添加 `media_group_cache` 字典存储待处理的媒体组消息 - 使用 2.5 秒超时收集媒体组消息(基于社区最佳实践) - 最大等待时间 10 秒(防止永久等待) ### 2. 消息处理流程 (message_handler) - 检测 `media_group_id` 判断是否为媒体组消息 - 媒体组消息走特殊处理流程,避免分散处理 ### 3. 媒体组消息缓存 (handle_media_group_message) - 缓存收到的媒体组消息 - 使用 APScheduler 实现防抖(debounce)机制 - 每收到新消息时重置超时计时器 - 超时后触发统一处理 ### 4. 媒体组合并处理 (process_media_group) - 从缓存中取出所有媒体项 - 使用第一条消息作为基础(保留文本、回复等信息) - 依次添加所有图片、视频、文档到消息链 - 将合并后的消息发送到处理流程 ## 技术方案论证 Telegram Bot API 在处理媒体组时的设计限制: 1. 将媒体组的每个消息作为独立的 update 发送 2. 每个 update 带有相同的 `media_group_id` 3. **不提供**组的总数、结束标志或一次性完整组的机制 因此,bot 必须自行收集消息,并通过硬编码超时(timeout/delay)等待可能延迟到达的消息。 这是目前唯一可靠的方案,被官方实现、主流框架和开发者社区广泛采用。 ### 官方和社区证据: - **Telegram Bot API 服务器实现(tdlib)**:明确指出缺少结束标志或总数信息 https://github.com/tdlib/telegram-bot-api/issues/643 - **Telegram Bot API 服务器 issue**:讨论媒体组处理的不便性,推荐使用超时机制 https://github.com/tdlib/telegram-bot-api/issues/339 - **Telegraf(Node.js 框架)**:专用媒体组中间件使用 timeout 控制等待时间 https://github.com/DieTime/telegraf-media-group - **StackOverflow 讨论**:无法一次性获取媒体组所有文件,必须手动收集 https://stackoverflow.com/questions/50180048/telegram-api-get-all-uploaded-photos-by-media-group-id - **python-telegram-bot 社区**:确认媒体组消息单独到达,需手动处理 https://github.com/python-telegram-bot/python-telegram-bot/discussions/3143 - **Telegram Bot API 官方文档**:仅定义 `media_group_id` 为可选字段,不提供获取完整组的接口 https://core.telegram.org/bots/api#message ## 实现细节 - 使用 2.5 秒超时收集媒体组消息(基于社区最佳实践) - 最大等待时间 10 秒(防止永久等待) - 采用防抖(debounce)机制:每收到新消息重置计时器 - 利用 APScheduler 实现延迟处理和任务调度 ## 测试验证 - ✅ 发送 5 张图片相册,成功合并为一条消息 - ✅ 保留原始文本说明和回复信息 - ✅ 支持图片、视频、文档混合的媒体组 - ✅ 日志显示 Processing media group with 5 items ## 代码变更 - 文件:astrbot/core/platform/sources/telegram/tg_adapter.py - 新增代码:124 行 - 新增方法:handle_media_group_message(), process_media_group() Co-Authored-By: Claude Sonnet 4.5 --- .../platform/sources/telegram/tg_adapter.py | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/astrbot/core/platform/sources/telegram/tg_adapter.py b/astrbot/core/platform/sources/telegram/tg_adapter.py index e7f2a102e2..c7a38d9dfb 100644 --- a/astrbot/core/platform/sources/telegram/tg_adapter.py +++ b/astrbot/core/platform/sources/telegram/tg_adapter.py @@ -89,6 +89,15 @@ def __init__( self.scheduler = AsyncIOScheduler() + # Media group handling + self.media_group_cache: dict[str, list] = {} + self.media_group_timeout = self.config.get( + "telegram_media_group_timeout", 2.5 + ) # seconds + self.media_group_max_wait = self.config.get( + "telegram_media_group_max_wait", 10.0 + ) # max seconds + @override async def send_by_session( self, @@ -225,6 +234,13 @@ async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): async def message_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE): logger.debug(f"Telegram message: {update.message}") + + # Handle media group messages + if update.message and update.message.media_group_id: + await self.handle_media_group_message(update, context) + return + + # Handle regular messages abm = await self.convert_message(update, context) if abm: await self.handle_msg(abm) @@ -399,6 +415,112 @@ async def convert_message( return message + async def handle_media_group_message( + self, update: Update, context: ContextTypes.DEFAULT_TYPE + ): + """Handle messages that are part of a media group (album). + + Caches incoming messages and schedules delayed processing to collect all + media items before sending to the pipeline. + """ + media_group_id = update.message.media_group_id + if not media_group_id: + return + + # Initialize cache for this media group if needed + if media_group_id not in self.media_group_cache: + self.media_group_cache[media_group_id] = [] + logger.debug(f"创建媒体组缓存: {media_group_id}") + + # Add this message to the cache + self.media_group_cache[media_group_id].append((update, context)) + logger.debug( + f"添加消息到媒体组 {media_group_id}, " + f"当前共 {len(self.media_group_cache[media_group_id])} 条" + ) + + # Cancel any existing scheduled job for this media group + job_id = f"media_group_{media_group_id}" + existing_jobs = self.scheduler.get_jobs() + for job in existing_jobs: + if job.id == job_id: + job.remove() + logger.debug(f"取消媒体组 {media_group_id} 的旧任务") + + # Schedule processing after timeout (debounced) + from datetime import datetime, timedelta + + self.scheduler.add_job( + self.process_media_group, + "date", + run_date=datetime.now() + timedelta(seconds=self.media_group_timeout), + args=[media_group_id], + id=job_id, + replace_existing=True, + ) + logger.debug( + f"已安排媒体组 {media_group_id} 在 {self.media_group_timeout} 秒后处理" + ) + + async def process_media_group(self, media_group_id: str): + """Process a complete media group by merging all collected messages. + + Args: + media_group_id: The unique identifier for this media group + """ + if media_group_id not in self.media_group_cache: + logger.warning(f"媒体组 {media_group_id} 未在缓存中找到") + return + + updates_and_contexts = self.media_group_cache.pop(media_group_id) + if not updates_and_contexts: + logger.warning(f"媒体组 {media_group_id} 为空") + return + + logger.info( + f"正在处理媒体组 {media_group_id},共 {len(updates_and_contexts)} 项" + ) + + # Use the first update to create the base message + first_update, first_context = updates_and_contexts[0] + abm = await self.convert_message(first_update, first_context) + + if not abm: + logger.warning(f"转换媒体组 {media_group_id} 的第一条消息失败") + return + + # Add additional media from remaining updates + for update, _context in updates_and_contexts[1:]: + if not update.message: + continue + + # Add photos + if update.message.photo: + photo = update.message.photo[-1] + file = await photo.get_file() + abm.message.append(Comp.Image(file=file.file_path, url=file.file_path)) + logger.debug(f"添加图片到媒体组: {file.file_path}") + + # Add videos + elif update.message.video: + file = await update.message.video.get_file() + if file.file_path: + abm.message.append( + Comp.Video(file=file.file_path, path=file.file_path) + ) + logger.debug(f"添加视频到媒体组: {file.file_path}") + + # Add documents + elif update.message.document: + file = await update.message.document.get_file() + file_name = update.message.document.file_name or uuid.uuid4().hex + if file.file_path: + abm.message.append(Comp.File(file=file.file_path, name=file_name)) + logger.debug(f"添加文档到媒体组: {file_name}") + + # Process the merged message + await self.handle_msg(abm) + async def handle_msg(self, message: AstrBotMessage): message_event = TelegramPlatformEvent( message_str=message.message_str, From edfb3d5c3318db3aafa9b2b49acd0d2d27159dcb Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 5 Feb 2026 21:35:36 +0800 Subject: [PATCH 2/3] =?UTF-8?q?refactor(telegram):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AA=92=E4=BD=93=E7=BB=84=E5=A4=84=E7=90=86=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E5=92=8C=E5=8F=AF=E9=9D=A0=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根据代码审查反馈改进: 1. 实现 media_group_max_wait 防止无限延迟 - 跟踪媒体组创建时间,超过最大等待时间立即处理 - 最坏情况下 10 秒内必定处理,防止消息持续到达导致无限延迟 2. 移除手动 job 查找优化性能 - 删除 O(N) 的 get_jobs() 循环扫描 - 依赖 replace_existing=True 自动替换任务 3. 重用 convert_message 减少代码重复 - 统一所有媒体类型转换逻辑 - 未来添加新媒体类型只需修改一处 Co-Authored-By: Claude Sonnet 4.5 --- .../platform/sources/telegram/tg_adapter.py | 95 +++++++++---------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/astrbot/core/platform/sources/telegram/tg_adapter.py b/astrbot/core/platform/sources/telegram/tg_adapter.py index c7a38d9dfb..f79d81faa0 100644 --- a/astrbot/core/platform/sources/telegram/tg_adapter.py +++ b/astrbot/core/platform/sources/telegram/tg_adapter.py @@ -90,13 +90,14 @@ def __init__( self.scheduler = AsyncIOScheduler() # Media group handling - self.media_group_cache: dict[str, list] = {} + # Cache structure: {media_group_id: {"created_at": datetime, "items": [(update, context), ...]}} + self.media_group_cache: dict[str, dict] = {} self.media_group_timeout = self.config.get( "telegram_media_group_timeout", 2.5 - ) # seconds + ) # seconds - debounce delay between messages self.media_group_max_wait = self.config.get( "telegram_media_group_max_wait", 10.0 - ) # max seconds + ) # max seconds - hard cap to prevent indefinite delay @override async def send_by_session( @@ -421,46 +422,57 @@ async def handle_media_group_message( """Handle messages that are part of a media group (album). Caches incoming messages and schedules delayed processing to collect all - media items before sending to the pipeline. + media items before sending to the pipeline. Uses debounce mechanism with + a hard cap (max_wait) to prevent indefinite delay. """ + from datetime import datetime, timedelta + media_group_id = update.message.media_group_id if not media_group_id: return # Initialize cache for this media group if needed if media_group_id not in self.media_group_cache: - self.media_group_cache[media_group_id] = [] + self.media_group_cache[media_group_id] = { + "created_at": datetime.now(), + "items": [], + } logger.debug(f"创建媒体组缓存: {media_group_id}") # Add this message to the cache - self.media_group_cache[media_group_id].append((update, context)) + entry = self.media_group_cache[media_group_id] + entry["items"].append((update, context)) logger.debug( f"添加消息到媒体组 {media_group_id}, " - f"当前共 {len(self.media_group_cache[media_group_id])} 条" + f"当前共 {len(entry['items'])} 条" ) - # Cancel any existing scheduled job for this media group - job_id = f"media_group_{media_group_id}" - existing_jobs = self.scheduler.get_jobs() - for job in existing_jobs: - if job.id == job_id: - job.remove() - logger.debug(f"取消媒体组 {media_group_id} 的旧任务") - - # Schedule processing after timeout (debounced) - from datetime import datetime, timedelta + # Calculate delay: if already waited too long, process immediately; + # otherwise use normal debounce timeout + elapsed = (datetime.now() - entry["created_at"]).total_seconds() + if elapsed >= self.media_group_max_wait: + delay = 0 + logger.debug( + f"媒体组 {media_group_id} 已达到最大等待时间 " + f"({elapsed:.1f}s >= {self.media_group_max_wait}s),立即处理" + ) + else: + delay = self.media_group_timeout + logger.debug( + f"已安排媒体组 {media_group_id} 在 {delay} 秒后处理 " + f"(已等待 {elapsed:.1f}s)" + ) + # Schedule/reschedule processing (replace_existing=True handles debounce) + job_id = f"media_group_{media_group_id}" self.scheduler.add_job( self.process_media_group, "date", - run_date=datetime.now() + timedelta(seconds=self.media_group_timeout), + run_date=datetime.now() + timedelta(seconds=delay), args=[media_group_id], id=job_id, replace_existing=True, ) - logger.debug( - f"已安排媒体组 {media_group_id} 在 {self.media_group_timeout} 秒后处理" - ) async def process_media_group(self, media_group_id: str): """Process a complete media group by merging all collected messages. @@ -472,7 +484,8 @@ async def process_media_group(self, media_group_id: str): logger.warning(f"媒体组 {media_group_id} 未在缓存中找到") return - updates_and_contexts = self.media_group_cache.pop(media_group_id) + entry = self.media_group_cache.pop(media_group_id) + updates_and_contexts = entry["items"] if not updates_and_contexts: logger.warning(f"媒体组 {media_group_id} 为空") return @@ -481,7 +494,7 @@ async def process_media_group(self, media_group_id: str): f"正在处理媒体组 {media_group_id},共 {len(updates_and_contexts)} 项" ) - # Use the first update to create the base message + # Use the first update to create the base message (with reply, caption, etc.) first_update, first_context = updates_and_contexts[0] abm = await self.convert_message(first_update, first_context) @@ -489,34 +502,18 @@ async def process_media_group(self, media_group_id: str): logger.warning(f"转换媒体组 {media_group_id} 的第一条消息失败") return - # Add additional media from remaining updates - for update, _context in updates_and_contexts[1:]: - if not update.message: + # Add additional media from remaining updates by reusing convert_message + for update, context in updates_and_contexts[1:]: + # Convert the message but skip reply chains (get_reply=False) + extra = await self.convert_message(update, context, get_reply=False) + if not extra: continue - # Add photos - if update.message.photo: - photo = update.message.photo[-1] - file = await photo.get_file() - abm.message.append(Comp.Image(file=file.file_path, url=file.file_path)) - logger.debug(f"添加图片到媒体组: {file.file_path}") - - # Add videos - elif update.message.video: - file = await update.message.video.get_file() - if file.file_path: - abm.message.append( - Comp.Video(file=file.file_path, path=file.file_path) - ) - logger.debug(f"添加视频到媒体组: {file.file_path}") - - # Add documents - elif update.message.document: - file = await update.message.document.get_file() - file_name = update.message.document.file_name or uuid.uuid4().hex - if file.file_path: - abm.message.append(Comp.File(file=file.file_path, name=file_name)) - logger.debug(f"添加文档到媒体组: {file_name}") + # Merge only the message components (keep base session/meta from first) + abm.message.extend(extra.message) + logger.debug( + f"添加 {len(extra.message)} 个组件到媒体组 {media_group_id}" + ) # Process the merged message await self.handle_msg(abm) From 62ff4d9306dc9ae2c823bfcdb6fccc7987d835e3 Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Sun, 8 Feb 2026 13:20:57 +0800 Subject: [PATCH 3/3] fix(telegram): handle missing message in media group processing and improve logging messages --- .../platform/sources/telegram/tg_adapter.py | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/astrbot/core/platform/sources/telegram/tg_adapter.py b/astrbot/core/platform/sources/telegram/tg_adapter.py index f79d81faa0..1e92972d65 100644 --- a/astrbot/core/platform/sources/telegram/tg_adapter.py +++ b/astrbot/core/platform/sources/telegram/tg_adapter.py @@ -427,6 +427,9 @@ async def handle_media_group_message( """ from datetime import datetime, timedelta + if not update.message: + return + media_group_id = update.message.media_group_id if not media_group_id: return @@ -437,14 +440,14 @@ async def handle_media_group_message( "created_at": datetime.now(), "items": [], } - logger.debug(f"创建媒体组缓存: {media_group_id}") + logger.debug(f"Create media group cache: {media_group_id}") # Add this message to the cache entry = self.media_group_cache[media_group_id] entry["items"].append((update, context)) logger.debug( - f"添加消息到媒体组 {media_group_id}, " - f"当前共 {len(entry['items'])} 条" + f"Add message to media group {media_group_id}, " + f"currently has {len(entry['items'])} items.", ) # Calculate delay: if already waited too long, process immediately; @@ -453,14 +456,14 @@ async def handle_media_group_message( if elapsed >= self.media_group_max_wait: delay = 0 logger.debug( - f"媒体组 {media_group_id} 已达到最大等待时间 " - f"({elapsed:.1f}s >= {self.media_group_max_wait}s),立即处理" + f"Media group {media_group_id} has reached max wait time " + f"({elapsed:.1f}s >= {self.media_group_max_wait}s), processing immediately.", ) else: delay = self.media_group_timeout logger.debug( - f"已安排媒体组 {media_group_id} 在 {delay} 秒后处理 " - f"(已等待 {elapsed:.1f}s)" + f"Scheduled media group {media_group_id} to be processed in {delay} seconds " + f"(already waited {elapsed:.1f}s)" ) # Schedule/reschedule processing (replace_existing=True handles debounce) @@ -481,17 +484,17 @@ async def process_media_group(self, media_group_id: str): media_group_id: The unique identifier for this media group """ if media_group_id not in self.media_group_cache: - logger.warning(f"媒体组 {media_group_id} 未在缓存中找到") + logger.warning(f"Media group {media_group_id} not found in cache") return entry = self.media_group_cache.pop(media_group_id) updates_and_contexts = entry["items"] if not updates_and_contexts: - logger.warning(f"媒体组 {media_group_id} 为空") + logger.warning(f"Media group {media_group_id} is empty") return logger.info( - f"正在处理媒体组 {media_group_id},共 {len(updates_and_contexts)} 项" + f"Processing media group {media_group_id}, total {len(updates_and_contexts)} items" ) # Use the first update to create the base message (with reply, caption, etc.) @@ -499,7 +502,9 @@ async def process_media_group(self, media_group_id: str): abm = await self.convert_message(first_update, first_context) if not abm: - logger.warning(f"转换媒体组 {media_group_id} 的第一条消息失败") + logger.warning( + f"Failed to convert the first message of media group {media_group_id}" + ) return # Add additional media from remaining updates by reusing convert_message @@ -512,7 +517,7 @@ async def process_media_group(self, media_group_id: str): # Merge only the message components (keep base session/meta from first) abm.message.extend(extra.message) logger.debug( - f"添加 {len(extra.message)} 个组件到媒体组 {media_group_id}" + f"Added {len(extra.message)} components to media group {media_group_id}" ) # Process the merged message @@ -545,6 +550,6 @@ async def terminate(self): if self.application.updater is not None: await self.application.updater.stop() - logger.info("Telegram 适配器已被关闭") + logger.info("Telegram adapter has been closed.") except Exception as e: - logger.error(f"Telegram 适配器关闭时出错: {e}") + logger.error(f"Error occurred while closing Telegram adapter: {e}")