Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 130 additions & 122 deletions matrix/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
import logging

from typing import Union, Optional
from typing import Union, Optional, Any

from nio import AsyncClient, Event, MatrixRoom

Expand Down Expand Up @@ -49,9 +49,27 @@ def __init__(
self.help: HelpCommand = help or DefaultHelpCommand(prefix=self.prefix)
self.register_command(self.help)

self.client.add_event_callback(self._on_event, Event)
self.client.add_event_callback(self._on_matrix_event, Event)
self._auto_register_events()

def _auto_register_events(self) -> None:
for attr in dir(self):
if not attr.startswith("on_"):
continue

coro = getattr(self, attr, None)
if not inspect.iscoroutinefunction(coro):
continue

try:
if attr in self.LIFECYCLE_EVENTS:
self.hook(coro)

if attr in self.EVENT_MAP:
self.event(coro)
except ValueError:
continue

def get_room(self, room_id: str) -> Room:
"""Retrieve a Room instance based on the room_id."""
matrix_room = self.client.rooms[room_id]
Expand All @@ -72,6 +90,9 @@ def load_extension(self, extension: Extension) -> None:
for event_type, handlers in extension._event_handlers.items():
self._event_handlers[event_type].extend(handlers)

for hook_name, handlers in extension._hook_handlers.items():
self._hook_handlers[hook_name].extend(handlers)

self._checks.extend(extension._checks)
self._error_handlers.update(extension._error_handlers)
self._command_error_handlers.update(extension._command_error_handlers)
Expand Down Expand Up @@ -118,133 +139,74 @@ def unload_extension(self, ext_name: str) -> None:
extension.unload()
self.log.debug("unloaded extension '%s'", ext_name)

def _auto_register_events(self) -> None:
for attr in dir(self):
if not attr.startswith("on_"):
continue
coro = getattr(self, attr, None)
if inspect.iscoroutinefunction(coro):
try:
self.event(coro)
except ValueError: # ignore unknown name
continue

async def _on_event(self, room: MatrixRoom, event: Event) -> None:
# ignore bot events
if event.sender == self.client.user:
return

# ignore events that happened before the bot started
if self.start_at and self.start_at > (event.server_timestamp / 1000):
return

try:
await self._dispatch(room, event)
except Exception as error:
await self.on_error(error)

async def _dispatch(self, room: MatrixRoom, event: Event) -> None:
"""Internal type-based fan-out plus optional command handling."""
for event_type, funcs in self._event_handlers.items():
if isinstance(event, event_type):
for func in funcs:
await func(room, event)

async def _process_commands(self, room: MatrixRoom, event: Event) -> None:
"""Parse and execute commands"""
ctx = await self._build_context(room, event)

if ctx.command:
for check in self._checks:
if not await check(ctx):
raise CheckError(ctx.command, check)

await ctx.command(ctx)

async def _build_context(self, matrix_room: MatrixRoom, event: Event) -> Context:
room = self.get_room(matrix_room.room_id)
ctx = Context(bot=self, room=room, event=event)
prefix: str | None = None

if self.prefix is not None and ctx.body.startswith(self.prefix):
prefix = self.prefix
else:
prefix = next(
(
cmd.prefix
for cmd in self._commands.values()
if cmd.prefix is not None and ctx.body.startswith(cmd.prefix)
),
self.config.prefix,
)

if prefix is None or not ctx.body.startswith(prefix):
return ctx

if parts := ctx.body[len(prefix) :].split():
cmd_name = parts[0]
cmd = self._commands.get(cmd_name)

if cmd and cmd.prefix and not ctx.body.startswith(cmd.prefix):
return ctx

if not cmd:
raise CommandNotFoundError(cmd_name)

ctx.command = cmd

return ctx

async def on_message(self, room: MatrixRoom, event: Event) -> None:
"""
Invoked when a message event is received.

This method is automatically called when a :class:`nio.RoomMessageText`
event is detected. It is primarily responsible for detecting and
processing commands that match the bot's defined prefix.

:param ctx: The context object containing information about the Matrix
room and the message event.
:type ctx: Context
"""
await self._process_commands(room, event)
# LIFECYCLE
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this comments?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to organise/separate the code a bit better to make it easier to read.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be worth separating those into files rather than keeping all in the same file?

Copy link
Copy Markdown
Contributor Author

@PenguinBoi12 PenguinBoi12 Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I thought about it but no, those belong here. They are part of the logic of the bot; it's the bot that need to handle events it's just that we have 2 types of events now - lifecycle events (our internal events) and matrix events (the events we receive from matrix). If you look you'll see that it's mostly event related methods. The only reason why I added the comment was to cleanly create a separation between our types of events. The rest is mostly small helpers related to events (so it's still belong there).

The only thing that might be moved away since it's more "client" related is the get_room. But that would require a wrapper around AsyncClient and it's out of scope for this PR. Plus, it's not super urgent since we only have this method for now.


async def on_ready(self) -> None:
"""Invoked after a successful login, before sync starts."""
self.log.info("bot is ready")
"""Override this in a subclass."""
pass

async def _on_ready(self) -> None:
"""Internal hook — always fires, calls public override then extension handlers."""
await self.on_ready()
await self._dispatch("on_ready")

async def on_error(self, error: Exception) -> None:
"""
Handle errors by invoking a registered error handler,
a generic error callback, or logging the exception.
"""Override this in a subclass."""
self.log.exception("Unhandled error: '%s'", error)

:param error: The exception instance that was raised.
:type error: Exceptipon
"""
async def _on_error(self, error: Exception) -> None:
if handler := self._error_handlers.get(type(error)):
await handler(error)
return

if self._on_error:
await self._on_error(error)
if self._fallback_error_handler:
await self._fallback_error_handler(error)
return

await self._dispatch("on_error", error)

async def on_command(self, _ctx: Context) -> None:
"""Override this in a subclass."""
pass

async def _on_command(self, ctx: Context) -> None:
await self._dispatch("on_command", ctx)

async def on_command_error(self, _ctx: Context, error: Exception) -> None:
"""Override this in a subclass."""
self.log.exception("Unhandled error: '%s'", error)

async def on_command_error(self, ctx: "Context", error: Exception) -> None:
async def _on_command_error(self, ctx: Context, error: Exception) -> None:
"""
Handles errors raised during command invocation.

This method is called automatically when a command error occurs.
If a specific error handler is registered for the type of the
exception, it will be invoked with the current context and error.

:param ctx: The context in which the command was invoked.
:type ctx: Context
:param error: The exception that was raised during command execution.
:type error: Exception
"""
if handler := self._command_error_handlers.get(type(error)):
await handler(ctx, error)
return

await self._dispatch("on_command_error", ctx, error)

# ENTRYPOINT

def start(self) -> None:
"""
Synchronous entry point for running the bot.

This is a convenience wrapper that allows running the bot like a
script using a blocking call. It internally calls :meth:`run` within
:func:`asyncio.run`, and ensures the client is closed gracefully
on interruption.
"""
try:
asyncio.run(self.run())
except KeyboardInterrupt:
self.log.info("bot interrupted by user")
finally:
asyncio.run(self.client.close())

async def run(self) -> None:
"""
Expand All @@ -268,21 +230,67 @@ async def run(self) -> None:

self.scheduler.start()

await self.on_ready()
await self._on_ready()
await self.client.sync_forever(timeout=30_000)

def start(self) -> None:
"""
Synchronous entry point for running the bot.
# MATRIX EVENTS
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain the purpose of those comments?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


async def on_message(self, room: MatrixRoom, event: Event) -> None:
await self._process_commands(room, event)

async def _on_matrix_event(self, room: MatrixRoom, event: Event) -> None:
# ignore bot events
if event.sender == self.client.user:
return

# ignore events that happened before the bot started
if self.start_at and self.start_at > (event.server_timestamp / 1000):
return

This is a convenience wrapper that allows running the bot like a
script using a blocking call. It internally calls :meth:`run` within
:func:`asyncio.run`, and ensures the client is closed gracefully
on interruption.
"""
try:
asyncio.run(self.run())
except KeyboardInterrupt:
self.log.info("bot interrupted by user")
finally:
asyncio.run(self.client.close())
await self._dispatch_matrix_event(room, event)
except Exception as error:
await self._on_error(error)

async def _dispatch(self, event_name: str, *args: Any, **kwargs: Any) -> None:
"""Fire all listeners registered for a named lifecycle event."""
for handler in self._hook_handlers.get(event_name, []):
await handler(*args, **kwargs)

async def _dispatch_matrix_event(self, room: MatrixRoom, event: Event) -> None:
"""Fire all listeners registered for a named matrix event."""
for event_type, funcs in self._event_handlers.items():
if isinstance(event, event_type):
for func in funcs:
await func(room, event)

async def _process_commands(self, room: MatrixRoom, event: Event) -> None:
"""Parse and execute commands"""
ctx = await self._build_context(room, event)

if ctx.command:
for check in self._checks:
if not await check(ctx):
raise CheckError(ctx.command, check)

await self._on_command(ctx)
await ctx.command(ctx)

async def _build_context(self, matrix_room: MatrixRoom, event: Event) -> Context:
room = self.get_room(matrix_room.room_id)
ctx = Context(bot=self, room=room, event=event)
prefix = self.prefix or self.config.prefix

if not ctx.body.startswith(prefix):
return ctx

if parts := ctx.body[len(prefix) :].split():
cmd_name = parts[0]
cmd = self._commands.get(cmd_name)

if not cmd:
raise CommandNotFoundError(cmd_name)

ctx.command = cmd

return ctx
Loading
Loading