Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 126 additions & 2 deletions astrbot/core/platform/sources/telegram/tg_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ def __init__(

self.scheduler = AsyncIOScheduler()

# Media group handling
# Cache structure: {media_group_id: {"created_at": datetime, "items": [(update, context), ...]}}
self.media_group_cache: dict[str, dict] = {}
self.media_group_timeout = self.config.get(
"telegram_media_group_timeout", 2.5
) # seconds - debounce delay between messages
self.media_group_max_wait = self.config.get(
"telegram_media_group_max_wait", 10.0
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
) # max seconds - hard cap to prevent indefinite delay

@override
async def send_by_session(
self,
Expand Down Expand Up @@ -225,6 +235,13 @@ async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):

async def message_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Entry point for incoming Telegram messages.

    Messages that belong to a media group (album) are routed to the
    media-group collector so that all of the album's items can be merged
    into a single event; every other message is converted and dispatched
    to the pipeline immediately.
    """
    logger.debug(f"Telegram message: {update.message}")

    # Handle media group messages: hand off to the debounce-based collector
    # and stop here — the group is processed later as one merged message.
    if update.message and update.message.media_group_id:
        await self.handle_media_group_message(update, context)
        return

    # Handle regular messages
    abm = await self.convert_message(update, context)
    if abm:
        await self.handle_msg(abm)
Expand Down Expand Up @@ -399,6 +416,113 @@ async def convert_message(

return message

async def handle_media_group_message(
    self, update: Update, context: ContextTypes.DEFAULT_TYPE
):
    """Collect one message belonging to a Telegram media group (album).

    Each arriving album item is appended to a per-group cache, and a
    delayed flush job is (re)scheduled. Rescheduling on every new item
    acts as a debounce; ``media_group_max_wait`` caps the total wait so
    a steady trickle of items cannot postpone processing forever.

    Args:
        update: Incoming Telegram update; ignored unless it carries a
            message with a ``media_group_id``.
        context: Handler context, cached together with the update so the
            flush job can convert the message later.
    """
    from datetime import datetime, timedelta

    message = update.message
    if message is None:
        return

    media_group_id = message.media_group_id
    if not media_group_id:
        return

    # First item of this album: start a fresh cache entry and record when
    # the group was first seen (drives the max-wait cap below).
    entry = self.media_group_cache.get(media_group_id)
    if entry is None:
        entry = {"created_at": datetime.now(), "items": []}
        self.media_group_cache[media_group_id] = entry
        logger.debug(f"Create media group cache: {media_group_id}")

    entry["items"].append((update, context))
    logger.debug(
        f"Add message to media group {media_group_id}, "
        f"currently has {len(entry['items'])} items.",
    )

    # Debounce with a hard cap: once the group has been pending longer than
    # media_group_max_wait, flush immediately instead of waiting another
    # full debounce interval.
    elapsed = (datetime.now() - entry["created_at"]).total_seconds()
    if elapsed < self.media_group_max_wait:
        delay = self.media_group_timeout
        logger.debug(
            f"Scheduled media group {media_group_id} to be processed in {delay} seconds "
            f"(already waited {elapsed:.1f}s)"
        )
    else:
        delay = 0
        logger.debug(
            f"Media group {media_group_id} has reached max wait time "
            f"({elapsed:.1f}s >= {self.media_group_max_wait}s), processing immediately.",
        )

    # replace_existing=True means every new item pushes the pending flush
    # job forward — this is the debounce behaviour.
    self.scheduler.add_job(
        self.process_media_group,
        "date",
        run_date=datetime.now() + timedelta(seconds=delay),
        args=[media_group_id],
        id=f"media_group_{media_group_id}",
        replace_existing=True,
    )

async def process_media_group(self, media_group_id: str):
    """Flush a collected media group by merging its cached messages.

    Pops the cache entry for *media_group_id*, converts the first cached
    update into the base AstrBot message (keeping its reply chain and
    session metadata), appends the message components of every remaining
    update, then hands the merged message to the pipeline.

    Args:
        media_group_id: The unique identifier for this media group
    """
    # pop() both reads and evicts the entry, so a group is processed once.
    entry = self.media_group_cache.pop(media_group_id, None)
    if entry is None:
        logger.warning(f"Media group {media_group_id} not found in cache")
        return

    updates_and_contexts = entry["items"]
    if not updates_and_contexts:
        logger.warning(f"Media group {media_group_id} is empty")
        return

    logger.info(
        f"Processing media group {media_group_id}, total {len(updates_and_contexts)} items"
    )

    # The first update supplies the base message (reply, caption, session
    # metadata); the remaining updates only contribute media components.
    first_update, first_context = updates_and_contexts[0]
    abm = await self.convert_message(first_update, first_context)
    if not abm:
        logger.warning(
            f"Failed to convert the first message of media group {media_group_id}"
        )
        return

    for extra_update, extra_context in updates_and_contexts[1:]:
        # get_reply=False: sibling album items must not re-resolve the
        # reply chain already captured by the first message.
        extra = await self.convert_message(
            extra_update, extra_context, get_reply=False
        )
        if not extra:
            continue

        # Merge only the message components; session/meta stay from abm.
        abm.message.extend(extra.message)
        logger.debug(
            f"Added {len(extra.message)} components to media group {media_group_id}"
        )

    # Dispatch the merged message through the normal pipeline.
    await self.handle_msg(abm)

async def handle_msg(self, message: AstrBotMessage):
message_event = TelegramPlatformEvent(
message_str=message.message_str,
Expand Down Expand Up @@ -426,6 +550,6 @@ async def terminate(self):
if self.application.updater is not None:
await self.application.updater.stop()

logger.info("Telegram 适配器已被关闭")
logger.info("Telegram adapter has been closed.")
except Exception as e:
logger.error(f"Telegram 适配器关闭时出错: {e}")
logger.error(f"Error occurred while closing Telegram adapter: {e}")