Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions anton/core/dispatch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Anton Dispatch — bring agents to messaging platforms safely.

Dispatch is Anton's bridge between external messaging platforms (Telegram,
Slack, Discord, Gmail, CLI, …) and agent runtimes. It combines:

- **Channel adapters + isolation modes** — modeled on nanoclaw's design:
pluggable adapters, three isolation levels (shared session /
same-agent-separate-sessions / separate agent groups), per-session
SQLite as the single IO surface.
- **Cowork-style safety** — modeled on Claude Cowork's permission model:
declarative :class:`PermissionPolicy` per agent group, action-card
prompts for destructive actions, conservative defaults for scheduled
dispatch.

Public surface:

- :class:`ChannelAdapter`, :class:`InboundEvent`, :class:`OutboundMessage`,
:class:`ActionCard`, :class:`ActionResponse` — the adapter contract.
- :class:`AgentGroup`, :class:`MessagingGroup`, :class:`MessagingGroupAgent`,
:class:`Session`, :class:`SessionMode`, :class:`TriggerRule` — entities.
- :class:`PermissionPolicy`, :class:`GateDecision`, :func:`evaluate` —
safety gates.
- :class:`SQLiteSessionStore`, :class:`SessionStoreProtocol`,
:func:`open_store` — message persistence.
- :class:`DispatchRouter`, :class:`DispatchRepository`,
:class:`RuntimeOrchestrator`, :func:`matches_trigger` — orchestration.
- :class:`SqliteDispatchRepository` — concrete repository.
- :class:`InProcessRuntimeOrchestrator` — concrete runtime for in-process
agents (tests, CLI demos).
- :func:`register_channel_adapter`, :func:`init_channel_adapters` —
adapter discovery.
"""

from anton.core.dispatch.adapter import (
ActionCard,
ActionOption,
ActionResponse,
Attachment,
ChannelAdapter,
ChannelSetup,
InboundEvent,
InboundMessage,
MessageKind,
OutboundMessage,
PlatformAddress,
)
from anton.core.dispatch.entities import (
AgentGroup,
MessagingGroup,
MessagingGroupAgent,
Session,
SessionMode,
TriggerRule,
)
from anton.core.dispatch.local_runtime import (
AgentCallable,
InProcessRuntimeOrchestrator,
LocalScratchpadOrchestrator,
)
from anton.core.dispatch.policy import (
FileScope,
GateDecision,
GateResult,
PermissionPolicy,
ProposedAction,
evaluate,
)
from anton.core.dispatch.registry import (
AdapterFactory,
ChannelRegistration,
get_active_adapter,
get_active_adapters,
get_registered_channel_types,
init_channel_adapters,
register_channel_adapter,
shutdown_channel_adapters,
)
from anton.core.dispatch.repository import SqliteDispatchRepository
from anton.core.dispatch.router import (
DispatchRepository,
DispatchRouter,
PendingAction,
RuntimeOrchestrator,
matches_trigger,
)
from anton.core.dispatch.session_store import (
Direction,
SQLiteSessionStore,
SessionStoreProtocol,
StoredMessage,
open_store,
)

__all__ = [
# adapter
"ActionCard",
"ActionOption",
"ActionResponse",
"Attachment",
"ChannelAdapter",
"ChannelSetup",
"InboundEvent",
"InboundMessage",
"MessageKind",
"OutboundMessage",
"PlatformAddress",
# entities
"AgentGroup",
"MessagingGroup",
"MessagingGroupAgent",
"Session",
"SessionMode",
"TriggerRule",
# policy
"FileScope",
"GateDecision",
"GateResult",
"PermissionPolicy",
"ProposedAction",
"evaluate",
# registry
"AdapterFactory",
"ChannelRegistration",
"get_active_adapter",
"get_active_adapters",
"get_registered_channel_types",
"init_channel_adapters",
"register_channel_adapter",
"shutdown_channel_adapters",
# repository
"SqliteDispatchRepository",
# router
"DispatchRepository",
"DispatchRouter",
"PendingAction",
"RuntimeOrchestrator",
"matches_trigger",
# local_runtime
"AgentCallable",
"InProcessRuntimeOrchestrator",
"LocalScratchpadOrchestrator",
# session_store
"Direction",
"SQLiteSessionStore",
"SessionStoreProtocol",
"StoredMessage",
"open_store",
]
227 changes: 227 additions & 0 deletions anton/core/dispatch/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
"""Channel adapter protocol — the boundary between platforms and Anton.

Adapters bridge messaging platforms (Telegram, Slack, Discord, Gmail, CLI, …)
with the dispatch router. Each adapter is responsible for three things and
three things only:

1. **Ingress** — receive platform events (webhook / poll / websocket) and
normalize them into :class:`InboundEvent` objects.
2. **Filtering** — decide which events warrant agent attention. This may
be stateless (regex, mention-only) or stateful (e.g. "bot was once
mentioned in this thread → forward all subsequent messages"). How it
decides is the adapter's business; the router does not care.
3. **Egress** — deliver outbound :class:`OutboundMessage` payloads back
to the platform, including action cards (permission prompts) and
attachments.

Adapters do **not** know about agent groups, sessions, or vault credentials.
They speak in :class:`PlatformAddress` tuples ``(channel_type, platform_id,
thread_id)``; the router maps those to the entity model.

Two patterns are supported:
- **Native adapters** implement :class:`ChannelAdapter` directly.
- **Bridge adapters** wrap an existing SDK (e.g. python-telegram-bot)
and translate to/from the protocol.

Inspired by nanoclaw's channel adapter design (qwibitai/nanoclaw); the
safety primitives (:class:`PermissionPolicy`, action gates) are modeled on
Claude Cowork's permission semantics.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable, Literal, Protocol, runtime_checkable

# ---------------------------------------------------------------------------
# Address & event primitives
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class PlatformAddress:
"""A platform-level conversation address.

The router maps ``(channel_type, platform_id, thread_id)`` triples to
``(agent_group, session)`` pairs via the entity model. Adapters never
construct or read agent-group / session IDs — they only deal in
platform-level identifiers.

Attributes:
channel_type: Adapter name (``"telegram"``, ``"slack"``, ``"cli"``).
platform_id: Conversation identifier on the platform (chat ID, channel ID).
thread_id: Optional sub-context (Slack thread, GitHub PR comment thread).
``None`` means "the conversation as a whole".
"""

channel_type: str
platform_id: str
thread_id: str | None = None


@dataclass
class Attachment:
"""A file/media attachment carried with a message."""

filename: str
mime_type: str
data: bytes | None = None # inline payload
url: str | None = None # or remote reference


MessageKind = Literal["chat", "system", "webhook", "scheduled", "action_response"]


@dataclass
class InboundMessage:
"""Normalized inbound message from a platform.

Adapters construct these from platform-specific event payloads. The
``content`` field is a free-form JSON-serializable object — the router
forwards it to the agent runtime as-is.
"""

id: str
content: Any # dict / str — host JSON-encodes before persisting
timestamp: datetime
kind: MessageKind = "chat"
sender_id: str | None = None
sender_name: str | None = None
is_mention: bool | None = None
"""Platform-confirmed mention signal. ``None`` means the adapter doesn't
know — the router falls back to text-matching the agent name."""
is_group: bool | None = None
"""``True`` when the source is a group/channel; ``False`` for DMs."""
attachments: list[Attachment] = field(default_factory=list)


@dataclass
class InboundEvent:
"""A fully-addressed inbound message handed to the router."""

address: PlatformAddress
message: InboundMessage
reply_to: PlatformAddress | None = None
"""Override delivery destination. Used by admin/CLI transports that want
to inject a message into one channel but route replies elsewhere. Agents
cannot set this; only adapters may."""


# ---------------------------------------------------------------------------
# Outbound primitives
# ---------------------------------------------------------------------------


@dataclass
class OutboundMessage:
"""A reply produced by an agent, ready for delivery."""

address: PlatformAddress
text: str
attachments: list[Attachment] = field(default_factory=list)
reply_to_message_id: str | None = None


@dataclass
class ActionOption:
"""One choice in an action card."""

id: str
label: str
style: Literal["default", "primary", "destructive"] = "default"


@dataclass
class ActionCard:
"""An interactive prompt requiring a user decision.

Used for Cowork-style permission gates: when an agent attempts a gated
operation (file deletion, network egress to a non-allowlisted host,
spending money, etc.), the router holds the call and emits an action
card via the originating adapter. The agent resumes only after the
user clicks an option.

Attributes:
question_id: Stable identifier; the corresponding
:class:`ActionResponse` references it.
prompt: Human-readable question.
options: Choices presented to the user.
timeout_seconds: If the user doesn't respond within this window,
the gated call fails closed (denied). ``None`` = wait forever.
"""

question_id: str
prompt: str
options: list[ActionOption]
timeout_seconds: int | None = 300


@dataclass
class ActionResponse:
"""User's response to an :class:`ActionCard`."""

question_id: str
selected_option_id: str
user_id: str | None = None
timestamp: datetime | None = None


# ---------------------------------------------------------------------------
# Adapter protocol
# ---------------------------------------------------------------------------


@dataclass
class ChannelSetup:
"""Callbacks handed to an adapter at startup.

The adapter retains references to these callbacks and invokes them as
platform events arrive.
"""

on_inbound: Callable[[InboundEvent], Awaitable[None]]
on_metadata: Callable[[PlatformAddress, dict[str, Any]], Awaitable[None]]
"""Called when the adapter learns metadata about a conversation
(display name, member count, group/DM flag). The router persists this
for routing decisions and observability."""
on_action_response: Callable[[ActionResponse], Awaitable[None]]


@runtime_checkable
class ChannelAdapter(Protocol):
"""Structural interface every channel adapter must satisfy.

Adapters are instantiated by :mod:`anton.core.dispatch.registry` at
startup and given a :class:`ChannelSetup` to wire up callbacks. They
run for the lifetime of the dispatch process.
"""

@property
def channel_type(self) -> str:
"""Stable adapter name. Used in :class:`PlatformAddress`."""
...

async def setup(self, setup: ChannelSetup) -> None:
"""Begin listening for platform events and remember the callbacks."""
...

async def shutdown(self) -> None:
"""Cleanly stop listening; release platform resources."""
...

async def deliver(self, message: OutboundMessage) -> None:
"""Send a reply back to the platform."""
...

async def show_action_card(
self,
address: PlatformAddress,
card: ActionCard,
) -> None:
"""Render an interactive prompt in the originating channel.

The adapter chooses the platform-native rendering (Telegram inline
keyboard, Slack block kit, Discord button row, CLI numbered prompt).
"""
...
7 changes: 7 additions & 0 deletions anton/core/dispatch/channels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Channel adapter implementations.

Each module here is a self-contained :class:`ChannelAdapter` with its
own credentials and platform SDK. Native adapters (CLI, Telegram, Slack,
…) live in this package; bridge adapters that wrap external SDKs follow
the same layout.
"""
Loading
Loading