Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions astrbot/core/platform/sources/telegram/tg_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ def __init__(
logger.debug(f"Telegram base url: {self.client.base_url}")

self.scheduler = AsyncIOScheduler()
self._terminating = False
raw_delay = self.config.get("telegram_polling_restart_delay", 5.0)
try:
delay = float(raw_delay)
except (TypeError, ValueError):
logger.warning(
"Invalid 'telegram_polling_restart_delay' value %r in config, "
"falling back to default 5.0s",
raw_delay,
)
delay = 5.0

if delay < 0.1:
logger.warning(
"Configured 'telegram_polling_restart_delay' (%s) is too small; "
"enforcing minimum of 0.1s to avoid tight restart loops",
delay,
)
delay = 0.1
self._polling_restart_delay = delay

# Media group handling
# Cache structure: {media_group_id: {"created_at": datetime, "items": [(update, context), ...]}}
Expand Down Expand Up @@ -145,9 +165,41 @@ async def run(self) -> None:
logger.error("Telegram Updater is not initialized. Cannot start polling.")
return

queue = self.application.updater.start_polling()
logger.info("Telegram Platform Adapter is running.")
await queue
while not self._terminating:
try:
await self.application.updater.start_polling(
error_callback=self._on_polling_error
)
logger.info("Telegram Platform Adapter is running.")
polling_check_event = asyncio.Event()
while self.application.updater.running and not self._terminating:
try:
await asyncio.wait_for(polling_check_event.wait(), timeout=1)
except TimeoutError:
continue

if not self._terminating:
logger.warning(
"Telegram polling loop exited unexpectedly, "
f"retrying in {self._polling_restart_delay}s."
)
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception(
"Telegram polling crashed with exception: "
f"{type(e).__name__}: {e!s}. "
f"Retrying in {self._polling_restart_delay}s.",
)

if not self._terminating:
await asyncio.sleep(self._polling_restart_delay)

def _on_polling_error(self, error: Exception) -> None:
logger.error(
f"Telegram polling request failed: {type(error).__name__}: {error!s}",
exc_info=(type(error), error, error.__traceback__),
)

async def register_commands(self) -> None:
"""收集所有注册的指令并注册到 Telegram"""
Expand Down Expand Up @@ -567,6 +619,7 @@ def get_client(self) -> ExtBot:

async def terminate(self) -> None:
try:
self._terminating = True
if self.scheduler.running:
self.scheduler.shutdown()

Expand Down
Loading