diff --git a/astrbot/core/platform/sources/telegram/tg_adapter.py b/astrbot/core/platform/sources/telegram/tg_adapter.py
index e7f2a102e2..1e92972d65 100644
--- a/astrbot/core/platform/sources/telegram/tg_adapter.py
+++ b/astrbot/core/platform/sources/telegram/tg_adapter.py
@@ -89,6 +89,16 @@ def __init__(
 
         self.scheduler = AsyncIOScheduler()
 
+        # Media group handling
+        # Cache structure: {media_group_id: {"created_at": datetime, "items": [(update, context), ...]}}
+        self.media_group_cache: dict[str, dict] = {}
+        self.media_group_timeout = self.config.get(
+            "telegram_media_group_timeout", 2.5
+        )  # seconds - debounce delay between messages
+        self.media_group_max_wait = self.config.get(
+            "telegram_media_group_max_wait", 10.0
+        )  # max seconds - hard cap to prevent indefinite delay
+
     @override
     async def send_by_session(
         self,
@@ -225,6 +235,13 @@ async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
 
     async def message_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
         logger.debug(f"Telegram message: {update.message}")
+
+        # Handle media group messages
+        if update.message and update.message.media_group_id:
+            await self.handle_media_group_message(update, context)
+            return
+
+        # Handle regular messages
         abm = await self.convert_message(update, context)
         if abm:
             await self.handle_msg(abm)
@@ -399,6 +416,113 @@ async def convert_message(
 
         return message
 
+    async def handle_media_group_message(
+        self, update: Update, context: ContextTypes.DEFAULT_TYPE
+    ):
+        """Handle messages that are part of a media group (album).
+
+        Caches incoming messages and schedules delayed processing to collect all
+        media items before sending to the pipeline. Uses debounce mechanism with
+        a hard cap (max_wait) to prevent indefinite delay.
+        """
+        from datetime import datetime, timedelta
+
+        if not update.message:
+            return
+
+        media_group_id = update.message.media_group_id
+        if not media_group_id:
+            return
+
+        # Initialize cache for this media group if needed
+        if media_group_id not in self.media_group_cache:
+            self.media_group_cache[media_group_id] = {
+                "created_at": datetime.now(),
+                "items": [],
+            }
+            logger.debug(f"Create media group cache: {media_group_id}")
+
+        # Add this message to the cache
+        entry = self.media_group_cache[media_group_id]
+        entry["items"].append((update, context))
+        logger.debug(
+            f"Add message to media group {media_group_id}, "
+            f"currently has {len(entry['items'])} items.",
+        )
+
+        # Calculate delay: if already waited too long, process immediately;
+        # otherwise use normal debounce timeout
+        elapsed = (datetime.now() - entry["created_at"]).total_seconds()
+        if elapsed >= self.media_group_max_wait:
+            delay = 0
+            logger.debug(
+                f"Media group {media_group_id} has reached max wait time "
+                f"({elapsed:.1f}s >= {self.media_group_max_wait}s), processing immediately.",
+            )
+        else:
+            delay = self.media_group_timeout
+            logger.debug(
+                f"Scheduled media group {media_group_id} to be processed in {delay} seconds "
+                f"(already waited {elapsed:.1f}s)"
+            )
+
+        # Schedule/reschedule processing (replace_existing=True handles debounce)
+        job_id = f"media_group_{media_group_id}"
+        self.scheduler.add_job(
+            self.process_media_group,
+            "date",
+            run_date=datetime.now() + timedelta(seconds=delay),
+            args=[media_group_id],
+            id=job_id,
+            replace_existing=True,
+        )
+
+    async def process_media_group(self, media_group_id: str):
+        """Process a complete media group by merging all collected messages.
+
+        Args:
+            media_group_id: The unique identifier for this media group
+        """
+        if media_group_id not in self.media_group_cache:
+            logger.warning(f"Media group {media_group_id} not found in cache")
+            return
+
+        entry = self.media_group_cache.pop(media_group_id)
+        updates_and_contexts = entry["items"]
+        if not updates_and_contexts:
+            logger.warning(f"Media group {media_group_id} is empty")
+            return
+
+        logger.info(
+            f"Processing media group {media_group_id}, total {len(updates_and_contexts)} items"
+        )
+
+        # Use the first update to create the base message (with reply, caption, etc.)
+        first_update, first_context = updates_and_contexts[0]
+        abm = await self.convert_message(first_update, first_context)
+
+        if not abm:
+            logger.warning(
+                f"Failed to convert the first message of media group {media_group_id}"
+            )
+            return
+
+        # Add additional media from remaining updates by reusing convert_message
+        for update, context in updates_and_contexts[1:]:
+            # Convert the message but skip reply chains (get_reply=False)
+            extra = await self.convert_message(update, context, get_reply=False)
+            if not extra:
+                continue
+
+            # Merge only the message components (keep base session/meta from first)
+            abm.message.extend(extra.message)
+            logger.debug(
+                f"Added {len(extra.message)} components to media group {media_group_id}"
+            )
+
+        # Process the merged message
+        await self.handle_msg(abm)
+
     async def handle_msg(self, message: AstrBotMessage):
         message_event = TelegramPlatformEvent(
             message_str=message.message_str,
@@ -426,6 +550,6 @@ async def terminate(self):
 
             if self.application.updater is not None:
                 await self.application.updater.stop()
-            logger.info("Telegram 适配器已被关闭")
+            logger.info("Telegram adapter has been closed.")
         except Exception as e:
-            logger.error(f"Telegram 适配器关闭时出错: {e}")
+            logger.error(f"Error occurred while closing Telegram adapter: {e}")