diff --git a/anton/cli.py b/anton/cli.py index 74b9cfc2..f7233cd6 100644 --- a/anton/cli.py +++ b/anton/cli.py @@ -13,6 +13,7 @@ import typer from rich.console import Console from rich.live import Live +from rich.panel import Panel from rich.prompt import Confirm from rich.spinner import Spinner from rich.table import Table @@ -175,9 +176,26 @@ def _ensure_dependencies(console: Console) -> None: raise typer.Exit(1) -def _ensure_terms_consent(console: Console, settings) -> None: - """Show terms acceptance screen on first run and persist the choice.""" - # Clear screen +_TERMS_SUMMARY = ( + "[bold]What you should know[/]\n\n" + " • Anton runs on your machine. Your code, files, and queries\n" + " stay local unless you explicitly ask Anton to send them.\n\n" + " • Anton sends anonymous usage events (e.g. session started)\n" + " to MindsDB to help us improve. No code or query content is\n" + " included. Disable with [anton.cyan]ANTON_ANALYTICS_ENABLED=false[/].\n\n" + " • When Anton uses an LLM provider you configure (OpenAI,\n" + " Anthropic, Minds, etc.), the prompts you send go to that\n" + " provider and are governed by their own terms.\n\n" + " • You're responsible for what Anton does on your behalf —\n" + " review proposed actions before authorizing them.\n\n" + "[bold]Full text[/]\n\n" + " Terms: [link=https://mindsdb.com/terms]https://mindsdb.com/terms[/]\n" + " Privacy: [link=https://mindsdb.com/privacy-policy]https://mindsdb.com/privacy-policy[/]\n" +) + + +def _render_terms_screen(console: Console) -> None: + """Render the welcome banner + policy summary panel.""" os.system("cls" if sys.platform == "win32" else "clear") logo = "A N T O N" @@ -192,21 +210,55 @@ def _ensure_terms_consent(console: Console, settings) -> None: " and accept our Anton policies." ) console.print() + console.print( + Panel( + _TERMS_SUMMARY, + title="Anton — Terms & Privacy", + title_align="left", + border_style="anton.cyan", + padding=(1, 2), + ) + ) + console.print() - if Confirm.ask( - " Would you like to read the policies?", default=True, console=console - ): - webbrowser.open("https://mindsdb.com/terms") - webbrowser.open("https://mindsdb.com/privacy-policy") + +def _show_full_policies(console: Console) -> None: + """Render the full Terms + Privacy Policy through the system pager.""" + from rich.markdown import Markdown + from rich.rule import Rule + + from anton.policies import PRIVACY_POLICY_MD, TERMS_OF_USE_MD + + with console.pager(styles=True): + console.print(Markdown(TERMS_OF_USE_MD)) console.print() - console.print(" [anton.muted]Policies opened in your browser.[/]") + console.print(Rule(style="anton.cyan")) console.print() + console.print(Markdown(PRIVACY_POLICY_MD)) - accepted = Confirm.ask( - " Do you accept the Terms and Privacy Policy?", - default=True, - console=console, - ) + +def _ensure_terms_consent(console: Console, settings) -> None: + """Show terms acceptance screen on first run and persist the choice.""" + from rich.prompt import Prompt + + accepted: bool | None = None + while accepted is None: + _render_terms_screen(console) + choice = Prompt.ask( + " Accept Terms and Privacy Policy? " + "[bold]y[/]es / [bold]n[/]o / [bold]s[/]how full text", + choices=["y", "n", "s"], + default="y", + show_choices=False, + console=console, + ).lower() + + if choice == "y": + accepted = True + elif choice == "n": + accepted = False + elif choice == "s": + _show_full_policies(console) if not accepted: console.print() diff --git a/anton/core/dispatch/__init__.py b/anton/core/dispatch/__init__.py new file mode 100644 index 00000000..a396625e --- /dev/null +++ b/anton/core/dispatch/__init__.py @@ -0,0 +1,148 @@ +"""Anton Dispatch — bring agents to messaging platforms safely. + +Dispatch is Anton's bridge between external messaging platforms (Telegram, +Slack, Discord, Gmail, CLI, …) and agent runtimes. It combines: + + - **Channel adapters + isolation modes** — modeled on nanoclaw's design: + pluggable adapters, three isolation levels (shared session / + same-agent-separate-sessions / separate agent groups), per-session + SQLite as the single IO surface. + - **Cowork-style safety** — modeled on Claude Cowork's permission model: + declarative :class:`PermissionPolicy` per agent group, action-card + prompts for destructive actions, conservative defaults for scheduled + dispatch. + +Public surface: + + - :class:`ChannelAdapter`, :class:`InboundEvent`, :class:`OutboundMessage`, + :class:`ActionCard`, :class:`ActionResponse` — the adapter contract. + - :class:`AgentGroup`, :class:`MessagingGroup`, :class:`MessagingGroupAgent`, + :class:`Session`, :class:`SessionMode`, :class:`TriggerRule` — entities. + - :class:`PermissionPolicy`, :class:`GateDecision`, :func:`evaluate` — + safety gates. + - :class:`SQLiteSessionStore`, :class:`SessionStoreProtocol`, + :func:`open_store` — message persistence. + - :class:`DispatchRouter`, :class:`DispatchRepository`, + :class:`RuntimeOrchestrator`, :func:`matches_trigger` — orchestration. + - :class:`SqliteDispatchRepository` — concrete repository. + - :class:`InProcessRuntimeOrchestrator` — concrete runtime for in-process + agents (tests, CLI demos). + - :func:`register_channel_adapter`, :func:`init_channel_adapters` — + adapter discovery. +""" + +from anton.core.dispatch.adapter import ( + ActionCard, + ActionOption, + ActionResponse, + Attachment, + ChannelAdapter, + ChannelSetup, + InboundEvent, + InboundMessage, + MessageKind, + OutboundMessage, + PlatformAddress, +) +from anton.core.dispatch.entities import ( + AgentGroup, + MessagingGroup, + MessagingGroupAgent, + Session, + SessionMode, + TriggerRule, +) +from anton.core.dispatch.local_runtime import ( + AgentCallable, + InProcessRuntimeOrchestrator, + LocalScratchpadOrchestrator, +) +from anton.core.dispatch.policy import ( + FileScope, + GateDecision, + GateResult, + PermissionPolicy, + ProposedAction, + evaluate, +) +from anton.core.dispatch.registry import ( + AdapterFactory, + ChannelRegistration, + get_active_adapter, + get_active_adapters, + get_registered_channel_types, + init_channel_adapters, + register_channel_adapter, + shutdown_channel_adapters, +) +from anton.core.dispatch.repository import SqliteDispatchRepository +from anton.core.dispatch.router import ( + DispatchRepository, + DispatchRouter, + PendingAction, + RuntimeOrchestrator, + matches_trigger, +) +from anton.core.dispatch.session_store import ( + Direction, + SQLiteSessionStore, + SessionStoreProtocol, + StoredMessage, + open_store, +) + +__all__ = [ + # adapter + "ActionCard", + "ActionOption", + "ActionResponse", + "Attachment", + "ChannelAdapter", + "ChannelSetup", + "InboundEvent", + "InboundMessage", + "MessageKind", + "OutboundMessage", + "PlatformAddress", + # entities + "AgentGroup", + "MessagingGroup", + "MessagingGroupAgent", + "Session", + "SessionMode", + "TriggerRule", + # policy + "FileScope", + "GateDecision", + "GateResult", + "PermissionPolicy", + "ProposedAction", + "evaluate", + # registry + "AdapterFactory", + "ChannelRegistration", + "get_active_adapter", + "get_active_adapters", + "get_registered_channel_types", + "init_channel_adapters", + "register_channel_adapter", + "shutdown_channel_adapters", + # repository + "SqliteDispatchRepository", + # router + "DispatchRepository", + "DispatchRouter", + "PendingAction", + "RuntimeOrchestrator", + "matches_trigger", + # local_runtime + "AgentCallable", + "InProcessRuntimeOrchestrator", + "LocalScratchpadOrchestrator", + # session_store + "Direction", + "SQLiteSessionStore", + "SessionStoreProtocol", + "StoredMessage", + "open_store", +] diff --git a/anton/core/dispatch/adapter.py b/anton/core/dispatch/adapter.py new file mode 100644 index 00000000..edbaaeac --- /dev/null +++ b/anton/core/dispatch/adapter.py @@ -0,0 +1,227 @@ +"""Channel adapter protocol — the boundary between platforms and Anton. + +Adapters bridge messaging platforms (Telegram, Slack, Discord, Gmail, CLI, …) +with the dispatch router. Each adapter is responsible for three things and +three things only: + + 1. **Ingress** — receive platform events (webhook / poll / websocket) and + normalize them into :class:`InboundEvent` objects. + 2. **Filtering** — decide which events warrant agent attention. This may + be stateless (regex, mention-only) or stateful (e.g. "bot was once + mentioned in this thread → forward all subsequent messages"). How it + decides is the adapter's business; the router does not care. + 3. **Egress** — deliver outbound :class:`OutboundMessage` payloads back + to the platform, including action cards (permission prompts) and + attachments. + +Adapters do **not** know about agent groups, sessions, or vault credentials. +They speak in :class:`PlatformAddress` tuples ``(channel_type, platform_id, +thread_id)``; the router maps those to the entity model. + +Two patterns are supported: + - **Native adapters** implement :class:`ChannelAdapter` directly. + - **Bridge adapters** wrap an existing SDK (e.g. python-telegram-bot) + and translate to/from the protocol. + +Inspired by nanoclaw's channel adapter design (qwibitai/nanoclaw); the +safety primitives (:class:`PermissionPolicy`, action gates) are modeled on +Claude Cowork's permission semantics. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Awaitable, Callable, Literal, Protocol, runtime_checkable + +# --------------------------------------------------------------------------- +# Address & event primitives +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class PlatformAddress: + """A platform-level conversation address. + + The router maps ``(channel_type, platform_id, thread_id)`` triples to + ``(agent_group, session)`` pairs via the entity model. Adapters never + construct or read agent-group / session IDs — they only deal in + platform-level identifiers. + + Attributes: + channel_type: Adapter name (``"telegram"``, ``"slack"``, ``"cli"``). + platform_id: Conversation identifier on the platform (chat ID, channel ID). + thread_id: Optional sub-context (Slack thread, GitHub PR comment thread). + ``None`` means "the conversation as a whole". + """ + + channel_type: str + platform_id: str + thread_id: str | None = None + + +@dataclass +class Attachment: + """A file/media attachment carried with a message.""" + + filename: str + mime_type: str + data: bytes | None = None # inline payload + url: str | None = None # or remote reference + + +MessageKind = Literal["chat", "system", "webhook", "scheduled", "action_response"] + + +@dataclass +class InboundMessage: + """Normalized inbound message from a platform. + + Adapters construct these from platform-specific event payloads. The + ``content`` field is a free-form JSON-serializable object — the router + forwards it to the agent runtime as-is. + """ + + id: str + content: Any # dict / str — host JSON-encodes before persisting + timestamp: datetime + kind: MessageKind = "chat" + sender_id: str | None = None + sender_name: str | None = None + is_mention: bool | None = None + """Platform-confirmed mention signal. ``None`` means the adapter doesn't + know — the router falls back to text-matching the agent name.""" + is_group: bool | None = None + """``True`` when the source is a group/channel; ``False`` for DMs.""" + attachments: list[Attachment] = field(default_factory=list) + + +@dataclass +class InboundEvent: + """A fully-addressed inbound message handed to the router.""" + + address: PlatformAddress + message: InboundMessage + reply_to: PlatformAddress | None = None + """Override delivery destination. Used by admin/CLI transports that want + to inject a message into one channel but route replies elsewhere. Agents + cannot set this; only adapters may.""" + + +# --------------------------------------------------------------------------- +# Outbound primitives +# --------------------------------------------------------------------------- + + +@dataclass +class OutboundMessage: + """A reply produced by an agent, ready for delivery.""" + + address: PlatformAddress + text: str + attachments: list[Attachment] = field(default_factory=list) + reply_to_message_id: str | None = None + + +@dataclass +class ActionOption: + """One choice in an action card.""" + + id: str + label: str + style: Literal["default", "primary", "destructive"] = "default" + + +@dataclass +class ActionCard: + """An interactive prompt requiring a user decision. + + Used for Cowork-style permission gates: when an agent attempts a gated + operation (file deletion, network egress to a non-allowlisted host, + spending money, etc.), the router holds the call and emits an action + card via the originating adapter. The agent resumes only after the + user clicks an option. + + Attributes: + question_id: Stable identifier; the corresponding + :class:`ActionResponse` references it. + prompt: Human-readable question. + options: Choices presented to the user. + timeout_seconds: If the user doesn't respond within this window, + the gated call fails closed (denied). ``None`` = wait forever. + """ + + question_id: str + prompt: str + options: list[ActionOption] + timeout_seconds: int | None = 300 + + +@dataclass +class ActionResponse: + """User's response to an :class:`ActionCard`.""" + + question_id: str + selected_option_id: str + user_id: str | None = None + timestamp: datetime | None = None + + +# --------------------------------------------------------------------------- +# Adapter protocol +# --------------------------------------------------------------------------- + + +@dataclass +class ChannelSetup: + """Callbacks handed to an adapter at startup. + + The adapter retains references to these callbacks and invokes them as + platform events arrive. + """ + + on_inbound: Callable[[InboundEvent], Awaitable[None]] + on_metadata: Callable[[PlatformAddress, dict[str, Any]], Awaitable[None]] + """Called when the adapter learns metadata about a conversation + (display name, member count, group/DM flag). The router persists this + for routing decisions and observability.""" + on_action_response: Callable[[ActionResponse], Awaitable[None]] + + +@runtime_checkable +class ChannelAdapter(Protocol): + """Structural interface every channel adapter must satisfy. + + Adapters are instantiated by :mod:`anton.core.dispatch.registry` at + startup and given a :class:`ChannelSetup` to wire up callbacks. They + run for the lifetime of the dispatch process. + """ + + @property + def channel_type(self) -> str: + """Stable adapter name. Used in :class:`PlatformAddress`.""" + ... + + async def setup(self, setup: ChannelSetup) -> None: + """Begin listening for platform events and remember the callbacks.""" + ... + + async def shutdown(self) -> None: + """Cleanly stop listening; release platform resources.""" + ... + + async def deliver(self, message: OutboundMessage) -> None: + """Send a reply back to the platform.""" + ... + + async def show_action_card( + self, + address: PlatformAddress, + card: ActionCard, + ) -> None: + """Render an interactive prompt in the originating channel. + + The adapter chooses the platform-native rendering (Telegram inline + keyboard, Slack block kit, Discord button row, CLI numbered prompt). + """ + ... diff --git a/anton/core/dispatch/channels/__init__.py b/anton/core/dispatch/channels/__init__.py new file mode 100644 index 00000000..3c08db3d --- /dev/null +++ b/anton/core/dispatch/channels/__init__.py @@ -0,0 +1,7 @@ +"""Channel adapter implementations. + +Each module here is a self-contained :class:`ChannelAdapter` with its +own credentials and platform SDK. Native adapters (CLI, Telegram, Slack, +…) live in this package; bridge adapters that wrap external SDKs follow +the same layout. +""" diff --git a/anton/core/dispatch/channels/cli.py b/anton/core/dispatch/channels/cli.py new file mode 100644 index 00000000..95a24797 --- /dev/null +++ b/anton/core/dispatch/channels/cli.py @@ -0,0 +1,221 @@ +"""CLI channel adapter — terminal-friendly transport for the dispatcher. + +Two operating modes: + + - **Programmatic** (default): Other code calls :meth:`feed_message` to + inject inbound events and reads :attr:`delivered` to inspect + outbound replies. This is the workhorse for tests and for embedded + use (notebook cells, scripted demos). + - **Interactive**: :func:`run_interactive` pipes stdin/stdout to the + same adapter, giving you a REPL where each line is a chat message + and replies print back. Action cards render as numbered prompts. + +The CLI adapter is the simplest possible :class:`ChannelAdapter` impl +and serves as the reference for new adapters. It deliberately avoids +async I/O on stdin to keep the example readable. +""" + +from __future__ import annotations + +import asyncio +import sys +import uuid +from datetime import datetime, timezone +from typing import Any + +from anton.core.dispatch.adapter import ( + ActionCard, + ActionResponse, + ChannelAdapter, + ChannelSetup, + InboundEvent, + InboundMessage, + OutboundMessage, + PlatformAddress, +) + + +CLI_CHANNEL_TYPE = "cli" + + +class CliChannelAdapter(ChannelAdapter): + """In-process CLI adapter — feeds inbound events and captures replies. + + Attributes: + delivered: Every :class:`OutboundMessage` the dispatcher sent to + this adapter. Tests assert against this list. + cards: Every :class:`ActionCard` the dispatcher emitted. The + interactive helper picks them up to render numbered prompts; + tests can answer them via :meth:`respond_to_card`. + setup_obj: The :class:`ChannelSetup` handed in at startup. Stored + so :meth:`feed_message` and :meth:`respond_to_card` can call + back into the router. + """ + + channel_type = CLI_CHANNEL_TYPE + + def __init__(self, default_platform_id: str = "local") -> None: + self.default_platform_id = default_platform_id + self.delivered: list[OutboundMessage] = [] + self.cards: list[tuple[PlatformAddress, ActionCard]] = [] + self.setup_obj: ChannelSetup | None = None + + async def setup(self, setup: ChannelSetup) -> None: + """Remember callbacks; the CLI has no platform connection to open.""" + self.setup_obj = setup + + async def shutdown(self) -> None: + """No-op — no sockets to close.""" + self.setup_obj = None + + async def deliver(self, message: OutboundMessage) -> None: + """Capture an outbound message; print it if no listener is registered.""" + self.delivered.append(message) + + async def show_action_card( + self, + address: PlatformAddress, + card: ActionCard, + ) -> None: + """Capture a card; tests resolve it via :meth:`respond_to_card`.""" + self.cards.append((address, card)) + + # ----------------------------------------------------------------- + # Programmatic API + # ----------------------------------------------------------------- + + async def feed_message( + self, + text: str, + *, + platform_id: str | None = None, + thread_id: str | None = None, + sender_id: str | None = "local-user", + sender_name: str | None = "local", + is_mention: bool | None = None, + is_group: bool | None = False, + ) -> None: + """Inject one chat message into the dispatcher. + + Mirrors what a real adapter does on a webhook event: build an + :class:`InboundEvent` and pass it to ``setup_obj.on_inbound``. + """ + if self.setup_obj is None: + raise RuntimeError("CliChannelAdapter not set up; call setup() first") + event = InboundEvent( + address=PlatformAddress( + channel_type=CLI_CHANNEL_TYPE, + platform_id=platform_id or self.default_platform_id, + thread_id=thread_id, + ), + message=InboundMessage( + id=str(uuid.uuid4()), + content={"text": text}, + timestamp=datetime.now(timezone.utc), + kind="chat", + sender_id=sender_id, + sender_name=sender_name, + is_mention=is_mention, + is_group=is_group, + ), + ) + await self.setup_obj.on_inbound(event) + + async def respond_to_card( + self, + question_id: str, + option_id: str, + *, + user_id: str = "local-user", + ) -> None: + """Answer a previously-emitted :class:`ActionCard`. + + Looks up the card, calls back into the router via + ``setup_obj.on_action_response``, and removes it from + :attr:`cards`. + """ + if self.setup_obj is None: + raise RuntimeError("CliChannelAdapter not set up; call setup() first") + # Drop the card from our queue if present (best-effort). + self.cards = [(a, c) for (a, c) in self.cards if c.question_id != question_id] + await self.setup_obj.on_action_response( + ActionResponse( + question_id=question_id, + selected_option_id=option_id, + user_id=user_id, + timestamp=datetime.now(timezone.utc), + ) + ) + + def drain_delivered(self) -> list[OutboundMessage]: + """Return all captured outbound messages and clear the buffer.""" + out, self.delivered = self.delivered, [] + return out + + +# --------------------------------------------------------------------------- +# Optional interactive REPL helper +# --------------------------------------------------------------------------- + + +async def run_interactive( + adapter: CliChannelAdapter, + *, + prompt: str = "you> ", + print_replies: bool = True, +) -> None: + """Run a tiny stdin/stdout REPL on top of ``adapter``. + + Each line typed becomes a chat message. Replies (already captured by + :meth:`CliChannelAdapter.deliver`) are flushed after each turn. + Action cards prompt for ``approve`` / ``deny`` (or any option id). + + This is a convenience for demos — production CLIs will want richer + rendering (color, multi-line input, history) but the protocol is + the same. + """ + loop = asyncio.get_event_loop() + while True: + # Drain pending action cards first. + while adapter.cards: + address, card = adapter.cards.pop(0) + print(f"\n[gate] {card.prompt}") + for opt in card.options: + print(f" - {opt.id}: {opt.label}") + choice = await loop.run_in_executor(None, lambda: input("choice> ").strip()) + await adapter.respond_to_card(card.question_id, choice or "deny") + + try: + line = await loop.run_in_executor(None, lambda: input(prompt)) + except (EOFError, KeyboardInterrupt): + print() + return + line = line.strip() + if not line: + continue + if line in (":quit", ":exit"): + return + await adapter.feed_message(line) + + # Give the runtime a moment to produce a reply, then flush. + await asyncio.sleep(0.6) + if print_replies: + for msg in adapter.drain_delivered(): + print(f"agent> {msg.text}") + + +def make_cli_adapter_factory(platform_id: str = "local"): + """Return an adapter factory suitable for :func:`register_channel_adapter`. + + Usage:: + + from anton.core.dispatch import register_channel_adapter + from anton.core.dispatch.channels.cli import make_cli_adapter_factory + + register_channel_adapter("cli", make_cli_adapter_factory()) + """ + + async def _factory() -> ChannelAdapter: + return CliChannelAdapter(default_platform_id=platform_id) + + return _factory diff --git a/anton/core/dispatch/entities.py b/anton/core/dispatch/entities.py new file mode 100644 index 00000000..3d2d8310 --- /dev/null +++ b/anton/core/dispatch/entities.py @@ -0,0 +1,170 @@ +"""Entity model — agent groups, messaging groups, and the wiring between them. + +The dispatch system separates **what an agent is** (an agent group with a +filesystem, memory, CLAUDE.md, and policy) from **where it's reachable** +(a messaging group: a specific Telegram chat, Slack channel, or CLI +session). The wiring between them — :class:`MessagingGroupAgent` — encodes +the *isolation mode*, lifted from nanoclaw's three-level model. + +Three isolation modes: + + 1. **Shared session** — multiple messaging groups feed one conversation. + Webhook + chat use case (GitHub events arriving alongside Slack + discussion). Session lookup ignores the messaging group. + 2. **Same agent, separate sessions** — one agent identity, independent + threads. Personal multi-platform use case. Workspace and memory are + shared; conversation context is not. + 3. **Separate agent groups** — full isolation. Different memory, CLAUDE.md, + workspace, and container. The cross-channel privacy boundary. + +The first two are configurations of the same agent group; the third is +simply two agent groups that don't know about each other. So the entity +model only needs the first two as enum values — separate-agent-groups is +implicit in having distinct ``agent_group_id`` values. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path + +from anton.core.dispatch.policy import PermissionPolicy + + +class SessionMode(str, Enum): + """How sessions are resolved within an agent group.""" + + AGENT_SHARED = "agent-shared" + """All messaging groups wired to this agent group share one session. + Used for shared-session isolation (level 1).""" + + PER_MESSAGING_GROUP = "per-messaging-group" + """Each messaging group gets its own session within the agent group. + Used for same-agent-separate-sessions isolation (level 2). Default.""" + + PER_THREAD = "per-thread" + """Each ``(messaging_group, thread_id)`` pair gets its own session. + Useful for platforms with first-class threads (Slack, Discord forum + channels) when each thread should have independent context.""" + + +class TriggerRule(str, Enum): + """When the router considers a message worth waking the agent for.""" + + ALWAYS = "always" + """Every inbound message triggers the agent. Right for DMs and + dedicated bot channels.""" + + MENTION_ONLY = "mention-only" + """Only messages where ``is_mention`` is ``True`` (or the agent's + name appears, as a fallback) trigger the agent. Right for shared + channels where the bot is a participant, not the focus.""" + + REGEX = "regex" + """A custom regex must match the message content. ``trigger_pattern`` + on the wiring carries the pattern.""" + + +@dataclass +class AgentGroup: + """An agent identity — the unit of memory, workspace, and policy. + + Attributes: + id: Stable identifier, used as the directory name under + ``groups/`` and as the routing key for sessions. + name: Human display name. Used in mention-matching fallback when + the platform doesn't supply ``is_mention``. + workspace: Filesystem root for this agent's CLAUDE.md, skills, + and any persistent files. + policy: Permission policy governing what this agent may do. + created_at: Creation timestamp. + """ + + id: str + name: str + workspace: Path + policy: PermissionPolicy = field(default_factory=PermissionPolicy) + created_at: datetime | None = None + + +@dataclass +class MessagingGroup: + """A specific conversation on a platform — a chat ID, channel ID, etc. + + The ``(channel_type, platform_id)`` pair is the natural key. ``thread_id`` + is *not* part of the key here — threads are resolved at session time + based on the wiring's :class:`SessionMode`. + + Attributes: + id: Internal stable ID (router uses this for joins). + channel_type: Adapter name (``"telegram"``, ``"slack"``). + platform_id: Platform's own conversation identifier. + display_name: Human-readable name learned via ``on_metadata``. + May be ``None`` until the adapter reports it. + is_group: ``True`` for group/channel, ``False`` for DM. + created_at: Creation timestamp. + """ + + id: str + channel_type: str + platform_id: str + display_name: str | None = None + is_group: bool | None = None + created_at: datetime | None = None + + +@dataclass +class MessagingGroupAgent: + """Wiring: a messaging group routed to an agent group. + + A given messaging group may be wired to multiple agent groups (one + Slack channel hosting two different bots), and an agent group may be + wired to multiple messaging groups (one bot present in many places). + The cross-product is the *messaging_group_agents* table. + + Attributes: + messaging_group_id: FK to :class:`MessagingGroup`. + agent_group_id: FK to :class:`AgentGroup`. + session_mode: How sessions are resolved for this wiring. + trigger_rule: When inbound messages wake this agent. + trigger_pattern: Regex source when ``trigger_rule == REGEX``. + priority: When multiple wirings match, lower numbers win. Lets + you have a "default" agent (priority=100) plus a specialist + (priority=10) that handles only its regex. + """ + + messaging_group_id: str + agent_group_id: str + session_mode: SessionMode = SessionMode.PER_MESSAGING_GROUP + trigger_rule: TriggerRule = TriggerRule.ALWAYS + trigger_pattern: str | None = None + priority: int = 100 + + +@dataclass +class Session: + """A live conversation with persistent message history. + + Sessions are derived from wirings — you don't create them directly, + the router resolves or creates them when an inbound event arrives. + + Attributes: + id: Stable session identifier. + agent_group_id: Which agent owns this session. + session_key: The deterministic key used to resolve this session + (``"agent:"`` for shared, ``"mg:"`` for per-MG, + ``"mg::thread:"`` for per-thread). Lets the router + recover the same session on restart. + store_path: Filesystem path to the per-session message store. + created_at: Creation timestamp. + last_active_at: Updated whenever a message is appended. + """ + + id: str + agent_group_id: str + session_key: str + store_path: Path + created_at: datetime | None = None + last_active_at: datetime | None = None diff --git a/anton/core/dispatch/local_runtime.py b/anton/core/dispatch/local_runtime.py new file mode 100644 index 00000000..522f7865 --- /dev/null +++ b/anton/core/dispatch/local_runtime.py @@ -0,0 +1,421 @@ +"""Runtime orchestration — wakes per-session agent runtimes. + +A :class:`RuntimeOrchestrator` is the dispatch system's hand-off point to +the actual agent runtime. The router calls :meth:`wake` after appending a +new ``messages_in`` row; the orchestrator ensures *something* is running +that will pick it up. + +Two implementations ship: + + - :class:`InProcessRuntimeOrchestrator` — a pluggable in-process loop. + Useful for tests, the CLI, and embedded deployments. Takes a + user-supplied async callable that produces a reply; runs once per + session in a background task. + - :class:`LocalScratchpadOrchestrator` — bridges to Anton's + :func:`anton.core.runtime.build_chat_session`. Each dispatch session + maps to one cached :class:`ChatSession` so conversation memory and + vault credentials persist across inbound messages. + +Both share the same :class:`RuntimeOrchestrator` Protocol from +:mod:`anton.core.dispatch.router`. Cloud deployments can supply a third +implementation that talks to scratchpad_service over HTTP. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable +from typing import Any + +from anton.core.dispatch.entities import AgentGroup, Session +from anton.core.dispatch.router import RuntimeOrchestrator +from anton.core.dispatch.session_store import SessionStoreProtocol + +logger = logging.getLogger(__name__) + + +# Agent callable contract. Given (agent_group, session, content_dict), +# return a string reply or ``None`` to suppress. +AgentCallable = Callable[[AgentGroup, Session, Any], Awaitable[str | None]] + + +class InProcessRuntimeOrchestrator(RuntimeOrchestrator): + """Background-task orchestrator for in-process agents. + + Each session gets one long-running task that polls its store for + undelivered ``messages_in`` rows, calls ``agent_fn``, and appends the + reply to ``messages_out``. The router's :meth:`deliver` side then + pushes those replies out via the originating adapter. + + Attributes: + agent_fn: User-supplied async callable that produces replies. + store_opener: Async callable returning a session store; the + orchestrator uses this so it doesn't depend on the repository + directly. The router wires this up at construction time. + poll_interval_s: Sleep between empty polls. Loops are otherwise + event-driven via :meth:`wake`'s wakeup signal. + """ + + def __init__( + self, + agent_fn: AgentCallable, + store_opener: Callable[[Session], Awaitable[SessionStoreProtocol]], + *, + poll_interval_s: float = 0.5, + ) -> None: + self.agent_fn = agent_fn + self.store_opener = store_opener + self.poll_interval_s = poll_interval_s + self._tasks: dict[str, asyncio.Task[None]] = {} + self._wakeups: dict[str, asyncio.Event] = {} + self._stops: dict[str, asyncio.Event] = {} + + async def wake(self, session: Session, agent_group: AgentGroup) -> None: + """Ensure a runtime task exists for this session and notify it.""" + wakeup = self._wakeups.get(session.id) + if wakeup is None: + wakeup = asyncio.Event() + stop = asyncio.Event() + self._wakeups[session.id] = wakeup + self._stops[session.id] = stop + self._tasks[session.id] = asyncio.create_task( + self._run(session, agent_group, wakeup, stop), + name=f"dispatch-runtime-{session.id}", + ) + wakeup.set() + + async def stop(self, session: Session) -> None: + """Signal the per-session task to stop and await its exit.""" + stop = self._stops.pop(session.id, None) + wakeup = self._wakeups.pop(session.id, None) + task = self._tasks.pop(session.id, None) + if stop is not None: + stop.set() + if wakeup is not None: + wakeup.set() + if task is not None: + try: + await asyncio.wait_for(task, timeout=5.0) + except asyncio.TimeoutError: + task.cancel() + + async def stop_all(self) -> None: + """Stop every running session task. Used at dispatcher shutdown.""" + for sid in list(self._tasks): + stop = self._stops.get(sid) + wakeup = self._wakeups.get(sid) + if stop is not None: + stop.set() + if wakeup is not None: + wakeup.set() + tasks = list(self._tasks.values()) + self._tasks.clear() + self._wakeups.clear() + self._stops.clear() + for t in tasks: + try: + await asyncio.wait_for(t, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + t.cancel() + + # ----------------------------------------------------------------- + # Internal loop + # ----------------------------------------------------------------- + + async def _run( + self, + session: Session, + agent_group: AgentGroup, + wakeup: asyncio.Event, + stop: asyncio.Event, + ) -> None: + """Per-session loop: drain messages_in → invoke agent → append messages_out.""" + store = await self.store_opener(session) + try: + while not stop.is_set(): + # Drain pending inbound rows. + rows = store.read_undelivered("in") + for row in rows: + if stop.is_set(): + break + try: + reply = await self.agent_fn(agent_group, session, row.content) + except Exception as e: + reply = f"[agent error] {e!r}" + finally: + store.mark_delivered(row.rowid, "in") + if reply: + store.append("out", "reply", {"text": reply}) + + if stop.is_set(): + break + + # Wait for next wakeup or the poll fallback. + wakeup.clear() + try: + await asyncio.wait_for(wakeup.wait(), timeout=self.poll_interval_s) + except asyncio.TimeoutError: + pass + finally: + try: + store.close() + except Exception: + pass + + +def _extract_text(content: Any) -> str: + """Pull a user-visible text string out of a stored ``messages_in`` row. + + The router stores inbound messages as a dict:: + + {"id": ..., "content": , "sender_id": ..., "sender_name": ..., + "thread_id": ...} + + where ``content`` is whatever the channel adapter handed in. Most + channels send a string; some send a dict (e.g. Chat-SDK style). + Anything non-string falls back to ``repr`` so the agent at least sees + *something* it can reason about instead of crashing on type mismatch. + """ + payload = content + if isinstance(payload, dict): + inner = payload.get("content") + if inner is not None: + payload = inner + if isinstance(payload, str): + return payload + return repr(payload) + + +class LocalScratchpadOrchestrator(RuntimeOrchestrator): + """Bridges dispatch sessions to Anton's :class:`ChatSession`. + + One :class:`ChatSession` is built per dispatch session and cached for + the lifetime of the runtime task — conversation memory, vault env + injection, and history all persist across inbound messages. + + Each inbound row drains through ``ChatSession.turn_stream(text)``; + text deltas are concatenated into a single user-visible reply written + to ``messages_out`` (kind ``"reply"``). Tool-result / progress events + are forwarded as separate rows (kind ``"activity"``) so dashboards + that care about agent internals can render them — channel adapters + typically ignore non-``"reply"`` rows. + + Parameters + ---------- + store_opener + Async callable returning the session's SQLite store. Wired by the + router at construction time. + extra_tools + Tools added on top of the Anton defaults (e.g. publish, datasource + connect). Defaults to none. + system_prompt_suffix + Free-form text appended to the system prompt — hosts use this to + describe their UI affordances. + poll_interval_s + Sleep between empty polls. Otherwise event-driven via :meth:`wake`. + emit_activity_rows + Write tool-result and progress events to ``messages_out`` as + ``"activity"`` rows. Set False to keep the outbox text-only. + """ + + def __init__( + self, + store_opener: Callable[[Session], Awaitable[SessionStoreProtocol]], + *, + extra_tools: list[Any] | None = None, + system_prompt_suffix: str | None = None, + poll_interval_s: float = 0.5, + emit_activity_rows: bool = True, + ) -> None: + self.store_opener = store_opener + self.extra_tools = extra_tools + self.system_prompt_suffix = system_prompt_suffix + self.poll_interval_s = poll_interval_s + self.emit_activity_rows = emit_activity_rows + self._tasks: dict[str, asyncio.Task[None]] = {} + self._wakeups: dict[str, asyncio.Event] = {} + self._stops: dict[str, asyncio.Event] = {} + # One ChatSession per dispatch session.id, lazily built on first inbound. + self._chat_sessions: dict[str, Any] = {} + + async def wake(self, session: Session, agent_group: AgentGroup) -> None: + """Ensure a runtime task exists for this session and notify it.""" + wakeup = self._wakeups.get(session.id) + if wakeup is None: + wakeup = asyncio.Event() + stop = asyncio.Event() + self._wakeups[session.id] = wakeup + self._stops[session.id] = stop + self._tasks[session.id] = asyncio.create_task( + self._run(session, agent_group, wakeup, stop), + name=f"dispatch-scratchpad-{session.id}", + ) + wakeup.set() + + async def stop(self, session: Session) -> None: + """Signal the per-session task to stop and await its exit.""" + stop = self._stops.pop(session.id, None) + wakeup = self._wakeups.pop(session.id, None) + task = self._tasks.pop(session.id, None) + self._chat_sessions.pop(session.id, None) + if stop is not None: + stop.set() + if wakeup is not None: + wakeup.set() + if task is not None: + try: + await asyncio.wait_for(task, timeout=5.0) + except asyncio.TimeoutError: + task.cancel() + + async def stop_all(self) -> None: + """Stop every running session task. Used at dispatcher shutdown.""" + for sid in list(self._tasks): + stop = self._stops.get(sid) + wakeup = self._wakeups.get(sid) + if stop is not None: + stop.set() + if wakeup is not None: + wakeup.set() + tasks = list(self._tasks.values()) + self._tasks.clear() + self._wakeups.clear() + self._stops.clear() + self._chat_sessions.clear() + for t in tasks: + try: + await asyncio.wait_for(t, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + t.cancel() + + # ----------------------------------------------------------------- + # Internal: build / cache the Anton ChatSession + # ----------------------------------------------------------------- + + async def _get_chat_session( + self, + session: Session, + agent_group: AgentGroup, + ) -> Any: + cached = self._chat_sessions.get(session.id) + if cached is not None: + return cached + + # Lazy import keeps the dispatch module importable on hosts where + # Anton's heavy deps aren't installed (e.g. router-only test rigs). + from anton.core.runtime import build_chat_session + + chat = await build_chat_session( + session_id=session.id, + workspace_path=str(agent_group.workspace) if agent_group.workspace else None, + extra_tools=self.extra_tools, + system_prompt_suffix=self.system_prompt_suffix, + ) + self._chat_sessions[session.id] = chat + return chat + + # ----------------------------------------------------------------- + # Internal: per-session loop + # ----------------------------------------------------------------- + + async def _run( + self, + session: Session, + agent_group: AgentGroup, + wakeup: asyncio.Event, + stop: asyncio.Event, + ) -> None: + """Per-session loop: drain messages_in → run turn_stream → append messages_out.""" + store = await self.store_opener(session) + try: + while not stop.is_set(): + rows = store.read_undelivered("in") + for row in rows: + if stop.is_set(): + break + text = _extract_text(row.content) + try: + reply = await self._run_turn(session, agent_group, store, text) + except Exception as exc: + logger.exception( + "ChatSession turn failed for session %s", session.id + ) + reply = self._safe_error_message(exc) + finally: + store.mark_delivered(row.rowid, "in") + if reply: + store.append("out", "reply", {"text": reply}) + + if stop.is_set(): + break + + wakeup.clear() + try: + await asyncio.wait_for(wakeup.wait(), timeout=self.poll_interval_s) + except asyncio.TimeoutError: + pass + finally: + try: + store.close() + except Exception: + pass + + async def _run_turn( + self, + session: Session, + agent_group: AgentGroup, + store: SessionStoreProtocol, + text: str, + ) -> str: + """Run one ChatSession turn, collect deltas, optionally emit activity rows.""" + from anton.core.llm.provider import ( + StreamComplete, + StreamContextCompacted, + StreamTaskProgress, + StreamTextDelta, + StreamToolResult, + ) + + chat = await self._get_chat_session(session, agent_group) + parts: list[str] = [] + + async for event in chat.turn_stream(text): + if isinstance(event, StreamTextDelta): + parts.append(event.text) + elif isinstance(event, StreamTaskProgress): + if self.emit_activity_rows: + store.append("out", "activity", { + "kind": "progress", + "phase": event.phase, + "message": event.message, + }) + elif isinstance(event, StreamToolResult): + if self.emit_activity_rows: + store.append("out", "activity", { + "kind": "tool_result", + "name": event.name, + "action": event.action, + "content": event.content, + }) + elif isinstance(event, StreamContextCompacted): + if self.emit_activity_rows: + store.append("out", "activity", { + "kind": "context", + "message": event.message, + }) + elif isinstance(event, StreamComplete): + # End-of-turn marker — nothing to write; the loop falls through + # and the consolidated reply gets appended by `_run`. + pass + + return "".join(parts).strip() + + @staticmethod + def _safe_error_message(exc: Exception) -> str: + """Render an exception as a user-facing error with API keys redacted.""" + try: + from anton.core.runtime import safe_redact_error + return f"[agent error] {safe_redact_error(exc)}" + except Exception: + return f"[agent error] {exc!r}" diff --git a/anton/core/dispatch/policy.py b/anton/core/dispatch/policy.py new file mode 100644 index 00000000..99b641ce --- /dev/null +++ b/anton/core/dispatch/policy.py @@ -0,0 +1,241 @@ +"""Permission policy — Cowork-style safety gates per agent group. + +Anton Dispatch wraps every inbound message with a :class:`PermissionPolicy`. +The policy is a small, declarative description of what an agent group is +allowed to do; the router consults it before forwarding gated tool calls +to the runtime, and emits an :class:`ActionCard` when user approval is +required. + +Modeled on Claude Cowork's safety primitives (selective file access, +network allowlist, "Act without asking" mode, deletion protection, +scheduled-task caution). The mapping: + + - File scopes ↔ Cowork's "be selective about file access" + - Network allowlist ↔ Cowork's network egress restrictions + - Act-without-asking flag ↔ Cowork's "Act without asking" mode + - Destructive-action gate ↔ Cowork's deletion permission prompt + - Scheduled-task constraints ↔ Cowork's scheduled-task guidance + +Policies are *per agent group*, not per session — they describe the +agent's standing capability, not a particular conversation. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Literal + + +class GateDecision(str, Enum): + """Outcome of evaluating a policy against a proposed action.""" + + ALLOW = "allow" + """Action is permitted; runtime may proceed.""" + + DENY = "deny" + """Action is forbidden; runtime is told the call failed (closed).""" + + PROMPT = "prompt" + """Action is gated; router emits an ActionCard and pauses the runtime.""" + + +@dataclass(frozen=True) +class FileScope: + """A filesystem permission grant. + + Attributes: + path: Directory or file the agent may access. Subpaths inherit + unless an explicit deny scope shadows them. + mode: ``"read"``, ``"write"``, or ``"read-write"``. ``"write"`` + implies the ability to create new files but **not** to delete + existing ones — deletion always goes through the destructive + gate. + """ + + path: str + mode: Literal["read", "write", "read-write"] = "read" + + +@dataclass +class PermissionPolicy: + """Declarative capability set for an agent group. + + Defaults are conservative — a freshly-created agent group with no + explicit policy can only read its own workspace and respond in chat. + Every escalation is opt-in. + + Attributes: + file_scopes: Filesystem grants. Empty = workspace only. + network_allowlist: Hostnames/domains the agent may reach via + network egress tools. Empty = none. Matches both exact host + and subdomains (``"github.com"`` matches ``"api.github.com"``). + Note: web fetch / web search run server-side and are not + governed here, matching Cowork's carve-out. + mcp_allowlist: MCP server names this agent group may invoke. + Empty = none. + act_without_asking: When ``True``, the runtime does not pause + between tool calls for confirmation. Off by default. Should + only be enabled when the user is actively supervising and + working with trusted inputs. + require_approval_for_destructive: When ``True`` (the default), + destructive actions (file deletion, irreversible API calls) + emit an ActionCard regardless of ``act_without_asking``. + scheduled_dispatch_allowed: Whether this agent group may be + triggered by scheduled inbound. Off by default per Cowork's + "start simple" guidance. + scheduled_destructive_blocked: When ``True`` (the default), + scheduled inbound can never escalate destructive gates — + they auto-deny without prompting, since the user isn't there + to click. + """ + + file_scopes: list[FileScope] = field(default_factory=list) + network_allowlist: list[str] = field(default_factory=list) + mcp_allowlist: list[str] = field(default_factory=list) + act_without_asking: bool = False + require_approval_for_destructive: bool = True + scheduled_dispatch_allowed: bool = False + scheduled_destructive_blocked: bool = True + + +@dataclass +class ProposedAction: + """An action the runtime wants to take, evaluated by the policy. + + The dispatcher inspects pending tool calls before they execute and + constructs a :class:`ProposedAction`. The policy gate returns a + :class:`GateDecision`. For ``PROMPT`` decisions, the gate also + supplies a human-readable reason that becomes the ActionCard prompt. + """ + + kind: Literal[ + "file_read", + "file_write", + "file_delete", + "network_egress", + "mcp_call", + "shell_exec", + "scheduled_trigger", + ] + target: str + """What is being acted on — a path, hostname, MCP name, command, etc.""" + + is_destructive: bool = False + is_scheduled_context: bool = False + + +@dataclass +class GateResult: + """Outcome of :func:`evaluate`.""" + + decision: GateDecision + reason: str = "" + """Human-readable explanation. Surfaced in ActionCard prompts and + in deny messages returned to the agent.""" + + +def evaluate(policy: PermissionPolicy, action: ProposedAction) -> GateResult: + """Evaluate a proposed action against a policy. + + Returns one of: + - :attr:`GateDecision.ALLOW` — runtime proceeds silently. + - :attr:`GateDecision.DENY` — runtime is told the call failed. + - :attr:`GateDecision.PROMPT` — router emits an ActionCard. + + Pure function with no side effects; safe to call repeatedly. + """ + + # Scheduled-context destructive actions auto-deny when the user + # explicitly asked to block them — there's nobody to click. + if ( + action.is_scheduled_context + and action.is_destructive + and policy.scheduled_destructive_blocked + ): + return GateResult( + GateDecision.DENY, + "Destructive actions blocked in scheduled-dispatch context.", + ) + + if action.kind == "file_delete": + # Deletion always prompts unless explicitly disabled — mirrors + # Cowork's deletion-protection gate. + if policy.require_approval_for_destructive: + return GateResult( + GateDecision.PROMPT, + f"Allow deletion of `{action.target}`?", + ) + return GateResult(GateDecision.ALLOW) + + if action.kind in ("file_read", "file_write"): + if not _path_in_scopes(action.target, policy.file_scopes, action.kind): + return GateResult( + GateDecision.DENY, + f"Path `{action.target}` is outside allowed file scopes.", + ) + return GateResult(GateDecision.ALLOW) + + if action.kind == "network_egress": + if not _host_in_allowlist(action.target, policy.network_allowlist): + return GateResult( + GateDecision.DENY, + f"Host `{action.target}` is not in the network allowlist.", + ) + return GateResult(GateDecision.ALLOW) + + if action.kind == "mcp_call": + if action.target not in policy.mcp_allowlist: + return GateResult( + GateDecision.DENY, + f"MCP server `{action.target}` is not allowlisted.", + ) + return GateResult(GateDecision.ALLOW) + + if action.kind == "shell_exec": + if action.is_destructive and policy.require_approval_for_destructive: + return GateResult( + GateDecision.PROMPT, + f"Run command `{action.target}`?", + ) + if not policy.act_without_asking: + return GateResult( + GateDecision.PROMPT, + f"Run command `{action.target}`?", + ) + return GateResult(GateDecision.ALLOW) + + if action.kind == "scheduled_trigger": + if not policy.scheduled_dispatch_allowed: + return GateResult( + GateDecision.DENY, + "Scheduled dispatch is not enabled for this agent group.", + ) + return GateResult(GateDecision.ALLOW) + + return GateResult(GateDecision.DENY, f"Unknown action kind: {action.kind}") + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _path_in_scopes(path: str, scopes: list[FileScope], kind: str) -> bool: + """Check whether ``path`` falls under any scope with sufficient mode.""" + needs_write = kind == "file_write" + for scope in scopes: + if not (path == scope.path or path.startswith(scope.path.rstrip("/") + "/")): + continue + if needs_write and scope.mode == "read": + continue + return True + return False + + +def _host_in_allowlist(host: str, allowlist: list[str]) -> bool: + """Check whether ``host`` matches any entry, including subdomain matches.""" + for entry in allowlist: + if host == entry or host.endswith("." + entry): + return True + return False diff --git a/anton/core/dispatch/registry.py b/anton/core/dispatch/registry.py new file mode 100644 index 00000000..318263a8 --- /dev/null +++ b/anton/core/dispatch/registry.py @@ -0,0 +1,140 @@ +"""Channel adapter registry — discovery and lifecycle. + +Adapters self-register at import time via :func:`register_channel_adapter`. +The dispatch service calls :func:`init_channel_adapters` at startup, which +instantiates every registered adapter, hands it a :class:`ChannelSetup`, +and retains the live instance for outbound delivery. + +Registration is by ``channel_type`` (the adapter's own stable name). At +most one adapter per ``channel_type`` may be active at a time — the second +``register_channel_adapter("telegram", …)`` call overwrites the first, +matching nanoclaw's behavior and supporting hot-reload during development. + +Adapter factories may return ``None`` to indicate missing or invalid +credentials; the registry skips those without crashing the dispatcher. +This lets you ship a build with telegram + slack + discord adapters all +linked in but only run the ones the user has configured. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + +from anton.core.dispatch.adapter import ChannelAdapter, ChannelSetup + + +AdapterFactory = Callable[[], Awaitable[ChannelAdapter | None]] + + +@dataclass +class ChannelRegistration: + """Everything the registry needs to instantiate an adapter.""" + + channel_type: str + factory: AdapterFactory + """Async callable returning a ready (but not yet ``setup``-ed) adapter, + or ``None`` if credentials are missing.""" + + +_registry: dict[str, ChannelRegistration] = {} +_active: dict[str, ChannelAdapter] = {} + + +def register_channel_adapter(channel_type: str, factory: AdapterFactory) -> None: + """Register an adapter factory by channel type. + + Called at module import time by adapter modules. Idempotent: re-registering + the same channel type replaces the prior factory, useful for tests and + hot-reload. + """ + _registry[channel_type] = ChannelRegistration(channel_type, factory) + + +def get_active_adapter(channel_type: str) -> ChannelAdapter | None: + """Return the live adapter for a channel type, or ``None`` if not running.""" + return _active.get(channel_type) + + +def get_active_adapters() -> list[ChannelAdapter]: + """Return all currently-running adapters.""" + return list(_active.values()) + + +def get_registered_channel_types() -> list[str]: + """Return the names of every registered adapter (running or not).""" + return list(_registry.keys()) + + +async def init_channel_adapters( + setup_factory: Callable[[ChannelAdapter], ChannelSetup], + *, + network_retry_delays_s: tuple[float, ...] = (2.0, 5.0, 10.0), +) -> None: + """Instantiate and start every registered adapter. + + Args: + setup_factory: Called once per adapter to build its + :class:`ChannelSetup`. Lets the router inject per-adapter + callbacks (the same router handles all adapters, so we + don't share one ``ChannelSetup``). + network_retry_delays_s: Backoff delays for transient network + failures during ``setup``. Misconfigurations (bad tokens) + still fail fast — only ``OSError``-like network errors + trigger retries. + """ + for channel_type, registration in _registry.items(): + try: + adapter = await registration.factory() + except Exception as e: + # Factory failed entirely — log and continue. Don't take down + # the dispatcher because one adapter has a bug. + _log_warn(f"adapter factory failed: {channel_type}: {e!r}") + continue + + if adapter is None: + _log_warn(f"adapter credentials missing, skipping: {channel_type}") + continue + + setup = setup_factory(adapter) + for attempt, delay in enumerate((0.0, *network_retry_delays_s)): + if delay: + await asyncio.sleep(delay) + try: + await adapter.setup(setup) + _active[channel_type] = adapter + break + except OSError as e: + if attempt == len(network_retry_delays_s): + _log_warn( + f"adapter setup gave up after retries: {channel_type}: {e!r}" + ) + break + except Exception as e: + _log_warn(f"adapter setup failed (no retry): {channel_type}: {e!r}") + break + + +async def shutdown_channel_adapters() -> None: + """Cleanly stop every active adapter.""" + for adapter in list(_active.values()): + try: + await adapter.shutdown() + except Exception as e: + _log_warn(f"adapter shutdown error: {adapter.channel_type}: {e!r}") + _active.clear() + + +def _reset_for_tests() -> None: + """Clear the registry. Test-only helper.""" + _registry.clear() + _active.clear() + + +def _log_warn(msg: str) -> None: + """Lightweight logging hook — replaced by the host logger when wired.""" + # Defer to the standard library so this module has no required deps. + import logging + + logging.getLogger("anton.dispatch.registry").warning(msg) diff --git a/anton/core/dispatch/repository.py b/anton/core/dispatch/repository.py new file mode 100644 index 00000000..1e0aa4cb --- /dev/null +++ b/anton/core/dispatch/repository.py @@ -0,0 +1,497 @@ +"""Concrete :class:`DispatchRepository` backed by a central SQLite database. + +The central DB holds the entity tables — agent groups, messaging groups, +the ``messaging_group_agents`` wiring table, and a sessions index. It does +**not** hold messages; those live in per-session stores +(:class:`anton.core.dispatch.session_store.SQLiteSessionStore`). + +Schema: + + agent_groups (id, name, workspace, policy_json, created_at) + messaging_groups (id, channel_type, platform_id, display_name, + is_group, created_at, + UNIQUE(channel_type, platform_id)) + messaging_group_agents + (messaging_group_id, agent_group_id, session_mode, + trigger_rule, trigger_pattern, priority, + PRIMARY KEY(messaging_group_id, agent_group_id)) + sessions (id, agent_group_id, session_key, store_path, + created_at, last_active_at, + UNIQUE(agent_group_id, session_key)) + +Sessions are keyed by ``(agent_group_id, session_key)`` where +``session_key`` is derived from the wiring's :class:`SessionMode`. +""" + +from __future__ import annotations + +import dataclasses +import json +import sqlite3 +import uuid +from datetime import datetime, timezone +from pathlib import Path + +from anton.core.dispatch.adapter import PlatformAddress +from anton.core.dispatch.entities import ( + AgentGroup, + MessagingGroup, + MessagingGroupAgent, + Session, + SessionMode, + TriggerRule, +) +from anton.core.dispatch.policy import FileScope, PermissionPolicy +from anton.core.dispatch.router import DispatchRepository +from anton.core.dispatch.session_store import ( + SessionStoreProtocol, + SQLiteSessionStore, +) + + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS agent_groups ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + workspace TEXT NOT NULL, + policy_json TEXT NOT NULL, + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS messaging_groups ( + id TEXT PRIMARY KEY, + channel_type TEXT NOT NULL, + platform_id TEXT NOT NULL, + display_name TEXT, + is_group INTEGER, + created_at TEXT NOT NULL, + UNIQUE(channel_type, platform_id) +); + +CREATE TABLE IF NOT EXISTS messaging_group_agents ( + messaging_group_id TEXT NOT NULL, + agent_group_id TEXT NOT NULL, + session_mode TEXT NOT NULL, + trigger_rule TEXT NOT NULL, + trigger_pattern TEXT, + priority INTEGER NOT NULL DEFAULT 100, + PRIMARY KEY (messaging_group_id, agent_group_id), + FOREIGN KEY (messaging_group_id) REFERENCES messaging_groups(id), + FOREIGN KEY (agent_group_id) REFERENCES agent_groups(id) +); + +CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + agent_group_id TEXT NOT NULL, + session_key TEXT NOT NULL, + store_path TEXT NOT NULL, + created_at TEXT NOT NULL, + last_active_at TEXT NOT NULL, + UNIQUE(agent_group_id, session_key), + FOREIGN KEY (agent_group_id) REFERENCES agent_groups(id) +); + +CREATE INDEX IF NOT EXISTS idx_mga_mg ON messaging_group_agents (messaging_group_id, priority); +""" + + +class SqliteDispatchRepository(DispatchRepository): + """File-based dispatch repository. + + All methods are ``async`` to satisfy the :class:`DispatchRepository` + protocol, but SQLite work runs synchronously — fine for local + deployments. Cloud backends (Postgres, etc.) implement the same + protocol with truly-async drivers. + """ + + def __init__(self, db_path: Path, sessions_root: Path) -> None: + self.db_path = db_path + self.sessions_root = sessions_root + db_path.parent.mkdir(parents=True, exist_ok=True) + sessions_root.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect( + str(db_path), + isolation_level=None, + check_same_thread=False, + ) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute("PRAGMA foreign_keys=ON") + self._conn.executescript(_SCHEMA) + + # ----------------------------------------------------------------- + # Agent groups + # ----------------------------------------------------------------- + + async def create_agent_group(self, group: AgentGroup) -> AgentGroup: + """Persist a new agent group; idempotent on the ``id`` column.""" + ts = (group.created_at or datetime.now(timezone.utc)).isoformat() + self._conn.execute( + "INSERT OR REPLACE INTO agent_groups " + "(id, name, workspace, policy_json, created_at) VALUES (?, ?, ?, ?, ?)", + ( + group.id, + group.name, + str(group.workspace), + _serialize_policy(group.policy), + ts, + ), + ) + return dataclasses.replace( + group, created_at=datetime.fromisoformat(ts) + ) + + async def get_agent_group(self, agent_group_id: str) -> AgentGroup: + """Load an agent group by id; raises :class:`KeyError` if missing.""" + row = self._conn.execute( + "SELECT id, name, workspace, policy_json, created_at " + "FROM agent_groups WHERE id = ?", + (agent_group_id,), + ).fetchone() + if row is None: + raise KeyError(f"agent_group not found: {agent_group_id}") + return AgentGroup( + id=row[0], + name=row[1], + workspace=Path(row[2]), + policy=_deserialize_policy(row[3]), + created_at=datetime.fromisoformat(row[4]), + ) + + # ----------------------------------------------------------------- + # Messaging groups + # ----------------------------------------------------------------- + + async def get_or_create_messaging_group( + self, + channel_type: str, + platform_id: str, + ) -> MessagingGroup: + """Find an existing messaging group or insert a new one.""" + row = self._conn.execute( + "SELECT id, channel_type, platform_id, display_name, is_group, created_at " + "FROM messaging_groups WHERE channel_type = ? AND platform_id = ?", + (channel_type, platform_id), + ).fetchone() + if row is not None: + return _row_to_messaging_group(row) + + new_id = str(uuid.uuid4()) + ts = datetime.now(timezone.utc).isoformat() + self._conn.execute( + "INSERT INTO messaging_groups (id, channel_type, platform_id, created_at) " + "VALUES (?, ?, ?, ?)", + (new_id, channel_type, platform_id, ts), + ) + return MessagingGroup( + id=new_id, + channel_type=channel_type, + platform_id=platform_id, + created_at=datetime.fromisoformat(ts), + ) + + async def update_messaging_group_metadata( + self, + messaging_group_id: str, + display_name: str | None = None, + is_group: bool | None = None, + ) -> None: + """Patch metadata fields learned via the adapter's ``on_metadata`` callback.""" + sets: list[str] = [] + params: list = [] + if display_name is not None: + sets.append("display_name = ?") + params.append(display_name) + if is_group is not None: + sets.append("is_group = ?") + params.append(1 if is_group else 0) + if not sets: + return + params.append(messaging_group_id) + self._conn.execute( + f"UPDATE messaging_groups SET {', '.join(sets)} WHERE id = ?", + params, + ) + + # ----------------------------------------------------------------- + # Wirings + # ----------------------------------------------------------------- + + async def add_wiring(self, wiring: MessagingGroupAgent) -> None: + """Insert or replace a messaging-group → agent-group wiring.""" + self._conn.execute( + "INSERT OR REPLACE INTO messaging_group_agents " + "(messaging_group_id, agent_group_id, session_mode, " + " trigger_rule, trigger_pattern, priority) " + "VALUES (?, ?, ?, ?, ?, ?)", + ( + wiring.messaging_group_id, + wiring.agent_group_id, + wiring.session_mode.value, + wiring.trigger_rule.value, + wiring.trigger_pattern, + wiring.priority, + ), + ) + + async def get_wirings(self, messaging_group_id: str) -> list[MessagingGroupAgent]: + """Return every wiring for a messaging group, lowest priority first.""" + rows = self._conn.execute( + "SELECT messaging_group_id, agent_group_id, session_mode, " + "trigger_rule, trigger_pattern, priority " + "FROM messaging_group_agents " + "WHERE messaging_group_id = ? ORDER BY priority ASC", + (messaging_group_id,), + ).fetchall() + return [ + MessagingGroupAgent( + messaging_group_id=r[0], + agent_group_id=r[1], + session_mode=SessionMode(r[2]), + trigger_rule=TriggerRule(r[3]), + trigger_pattern=r[4], + priority=r[5], + ) + for r in rows + ] + + # ----------------------------------------------------------------- + # Sessions + # ----------------------------------------------------------------- + + async def resolve_session( + self, + agent_group: AgentGroup, + wiring: MessagingGroupAgent, + address: PlatformAddress, + ) -> Session: + """Find or create the session matching ``wiring.session_mode``.""" + session_key = _session_key_for(wiring, address) + row = self._conn.execute( + "SELECT id, agent_group_id, session_key, store_path, " + "created_at, last_active_at " + "FROM sessions WHERE agent_group_id = ? AND session_key = ?", + (agent_group.id, session_key), + ).fetchone() + + now = datetime.now(timezone.utc).isoformat() + if row is not None: + self._conn.execute( + "UPDATE sessions SET last_active_at = ? WHERE id = ?", + (now, row[0]), + ) + return Session( + id=row[0], + agent_group_id=row[1], + session_key=row[2], + store_path=Path(row[3]), + created_at=datetime.fromisoformat(row[4]), + last_active_at=datetime.fromisoformat(now), + ) + + # Fresh session. + new_id = str(uuid.uuid4()) + store_path = self.sessions_root / agent_group.id / new_id + store_path.mkdir(parents=True, exist_ok=True) + self._conn.execute( + "INSERT INTO sessions " + "(id, agent_group_id, session_key, store_path, created_at, last_active_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (new_id, agent_group.id, session_key, str(store_path), now, now), + ) + return Session( + id=new_id, + agent_group_id=agent_group.id, + session_key=session_key, + store_path=store_path, + created_at=datetime.fromisoformat(now), + last_active_at=datetime.fromisoformat(now), + ) + + async def open_session_store(self, session: Session) -> SessionStoreProtocol: + """Open the per-session SQLite message store.""" + return SQLiteSessionStore(session.store_path / "session.db") + + # ----------------------------------------------------------------- + # Listing / deletion (control-plane API for the dispatch UI) + # ----------------------------------------------------------------- + + async def list_agent_groups(self) -> list[AgentGroup]: + """Every persisted agent group, ordered by name.""" + rows = self._conn.execute( + "SELECT id, name, workspace, policy_json, created_at " + "FROM agent_groups ORDER BY name COLLATE NOCASE" + ).fetchall() + return [ + AgentGroup( + id=r[0], + name=r[1], + workspace=Path(r[2]), + policy=_deserialize_policy(r[3]), + created_at=datetime.fromisoformat(r[4]), + ) + for r in rows + ] + + async def delete_agent_group(self, agent_group_id: str) -> bool: + """Remove an agent group. Fails if any wirings or sessions still reference it.""" + used = self._conn.execute( + "SELECT 1 FROM messaging_group_agents WHERE agent_group_id = ? " + "UNION SELECT 1 FROM sessions WHERE agent_group_id = ? LIMIT 1", + (agent_group_id, agent_group_id), + ).fetchone() + if used is not None: + raise ValueError( + f"agent_group {agent_group_id} still has wirings or sessions; " + "remove those first." + ) + cursor = self._conn.execute( + "DELETE FROM agent_groups WHERE id = ?", (agent_group_id,) + ) + return cursor.rowcount > 0 + + async def list_messaging_groups(self) -> list[MessagingGroup]: + """Every messaging group seen so far. Created lazily on first inbound event.""" + rows = self._conn.execute( + "SELECT id, channel_type, platform_id, display_name, is_group, created_at " + "FROM messaging_groups ORDER BY created_at DESC" + ).fetchall() + return [_row_to_messaging_group(r) for r in rows] + + async def list_wirings(self) -> list[MessagingGroupAgent]: + """Every wiring across all messaging groups.""" + rows = self._conn.execute( + "SELECT messaging_group_id, agent_group_id, session_mode, " + "trigger_rule, trigger_pattern, priority " + "FROM messaging_group_agents ORDER BY priority ASC" + ).fetchall() + return [ + MessagingGroupAgent( + messaging_group_id=r[0], + agent_group_id=r[1], + session_mode=SessionMode(r[2]), + trigger_rule=TriggerRule(r[3]), + trigger_pattern=r[4], + priority=r[5], + ) + for r in rows + ] + + async def delete_wiring( + self, + messaging_group_id: str, + agent_group_id: str, + ) -> bool: + """Remove a single wiring; returns True if a row was deleted.""" + cursor = self._conn.execute( + "DELETE FROM messaging_group_agents " + "WHERE messaging_group_id = ? AND agent_group_id = ?", + (messaging_group_id, agent_group_id), + ) + return cursor.rowcount > 0 + + async def list_sessions( + self, + *, + agent_group_id: str | None = None, + ) -> list[Session]: + """Every session, optionally scoped to one agent group, newest activity first.""" + if agent_group_id is None: + rows = self._conn.execute( + "SELECT id, agent_group_id, session_key, store_path, " + "created_at, last_active_at " + "FROM sessions ORDER BY last_active_at DESC" + ).fetchall() + else: + rows = self._conn.execute( + "SELECT id, agent_group_id, session_key, store_path, " + "created_at, last_active_at " + "FROM sessions WHERE agent_group_id = ? ORDER BY last_active_at DESC", + (agent_group_id,), + ).fetchall() + return [ + Session( + id=r[0], + agent_group_id=r[1], + session_key=r[2], + store_path=Path(r[3]), + created_at=datetime.fromisoformat(r[4]), + last_active_at=datetime.fromisoformat(r[5]), + ) + for r in rows + ] + + async def close(self) -> None: + """Close the underlying SQLite connection.""" + self._conn.close() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _session_key_for( + wiring: MessagingGroupAgent, + address: PlatformAddress, +) -> str: + """Compute the deterministic session_key used to resolve sessions. + + Mirrors the three :class:`SessionMode` values: + + - ``AGENT_SHARED`` → ``"agent"`` (one session per agent group). + - ``PER_MESSAGING_GROUP`` → ``"mg:"``. + - ``PER_THREAD`` → ``"mg::thread:"``, + falling back to per-MG when the platform has no thread. + """ + if wiring.session_mode is SessionMode.AGENT_SHARED: + return "agent" + if wiring.session_mode is SessionMode.PER_THREAD and address.thread_id: + return f"mg:{wiring.messaging_group_id}:thread:{address.thread_id}" + return f"mg:{wiring.messaging_group_id}" + + +def _row_to_messaging_group(row) -> MessagingGroup: + return MessagingGroup( + id=row[0], + channel_type=row[1], + platform_id=row[2], + display_name=row[3], + is_group=bool(row[4]) if row[4] is not None else None, + created_at=datetime.fromisoformat(row[5]), + ) + + +def _serialize_policy(policy: PermissionPolicy) -> str: + return json.dumps( + { + "file_scopes": [ + {"path": s.path, "mode": s.mode} for s in policy.file_scopes + ], + "network_allowlist": policy.network_allowlist, + "mcp_allowlist": policy.mcp_allowlist, + "act_without_asking": policy.act_without_asking, + "require_approval_for_destructive": policy.require_approval_for_destructive, + "scheduled_dispatch_allowed": policy.scheduled_dispatch_allowed, + "scheduled_destructive_blocked": policy.scheduled_destructive_blocked, + } + ) + + +def _deserialize_policy(blob: str) -> PermissionPolicy: + data = json.loads(blob) + return PermissionPolicy( + file_scopes=[ + FileScope(path=s["path"], mode=s["mode"]) + for s in data.get("file_scopes", []) + ], + network_allowlist=list(data.get("network_allowlist", [])), + mcp_allowlist=list(data.get("mcp_allowlist", [])), + act_without_asking=bool(data.get("act_without_asking", False)), + require_approval_for_destructive=bool( + data.get("require_approval_for_destructive", True) + ), + scheduled_dispatch_allowed=bool(data.get("scheduled_dispatch_allowed", False)), + scheduled_destructive_blocked=bool( + data.get("scheduled_destructive_blocked", True) + ), + ) diff --git a/anton/core/dispatch/router.py b/anton/core/dispatch/router.py new file mode 100644 index 00000000..5f428e81 --- /dev/null +++ b/anton/core/dispatch/router.py @@ -0,0 +1,500 @@ +"""Dispatch router — the brain that maps platform events to agent sessions. + +The router sits between channel adapters and the agent runtime. For every +inbound :class:`InboundEvent` it: + + 1. **Resolves the wiring**: looks up which agent groups are wired to the + event's ``(channel_type, platform_id)`` and applies trigger rules + (mention-only, regex, always) and priority. + 2. **Resolves the session**: based on the wiring's :class:`SessionMode` + it picks (or creates) the session whose store will receive the message. + 3. **Persists**: appends the message to ``messages_in`` in the session + store. The agent runtime, polling its store, picks it up. + 4. **Wakes the runtime**: ensures the runtime for that session is up. + If it's already running, the new row is enough. + +The router is also the **safety gate**. When the agent runtime emits a +*proposed action* (a tool call awaiting approval), the router consults +the agent group's :class:`PermissionPolicy` and: + + - :attr:`GateDecision.ALLOW` → forwards the call. + - :attr:`GateDecision.DENY` → returns a synthetic failure to the runtime. + - :attr:`GateDecision.PROMPT` → emits an :class:`ActionCard` via the + originating adapter and pauses the runtime until the user clicks. + +Outbound delivery uses a separate :func:`run_delivery_loop` task that +polls every active session's store for ``messages_out`` rows and dispatches +them through the originating adapter — keeping the runtime decoupled from +network I/O. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field + +from anton.core.dispatch.adapter import ( + ActionCard, + ActionResponse, + InboundEvent, + OutboundMessage, + PlatformAddress, +) +from anton.core.dispatch.entities import ( + AgentGroup, + MessagingGroup, + MessagingGroupAgent, + Session, + SessionMode, + TriggerRule, +) +from anton.core.dispatch.policy import ( + GateDecision, + GateResult, + PermissionPolicy, + ProposedAction, + evaluate as evaluate_policy, +) +from anton.core.dispatch.registry import get_active_adapter +from anton.core.dispatch.session_store import SessionStoreProtocol + + +_log = logging.getLogger("anton.dispatch.router") + + +# --------------------------------------------------------------------------- +# Repository protocol — abstracts the central DB +# --------------------------------------------------------------------------- + + +class DispatchRepository: + """Persistence interface for the entity model. + + Wraps the central database holding agent groups, messaging groups, + wirings, and sessions. Kept as a separate Protocol so the router has + no SQL knowledge — and so cloud deployments can supply a managed + backend without changes here. + + The concrete file-backed implementation is + :class:`anton.core.dispatch.repository.SqliteDispatchRepository`. + """ + + async def get_or_create_messaging_group( + self, + channel_type: str, + platform_id: str, + ) -> MessagingGroup: + raise NotImplementedError + + async def update_messaging_group_metadata( + self, + messaging_group_id: str, + display_name: str | None = None, + is_group: bool | None = None, + ) -> None: + raise NotImplementedError + + async def get_wirings(self, messaging_group_id: str) -> list[MessagingGroupAgent]: + raise NotImplementedError + + async def get_agent_group(self, agent_group_id: str) -> AgentGroup: + raise NotImplementedError + + async def resolve_session( + self, + agent_group: AgentGroup, + wiring: MessagingGroupAgent, + address: PlatformAddress, + ) -> Session: + raise NotImplementedError + + async def open_session_store(self, session: Session) -> SessionStoreProtocol: + raise NotImplementedError + + +# --------------------------------------------------------------------------- +# Runtime hook — abstracts how sessions are spun up +# --------------------------------------------------------------------------- + + +class RuntimeOrchestrator: + """Starts/wakes per-session agent runtimes. + + In-process implementations (tests, CLI) live in + :mod:`anton.core.dispatch.local_runtime`. Container-backed + implementations bridge to ``LocalScratchpadRuntime`` or + scratchpad_service. + """ + + async def wake(self, session: Session, agent_group: AgentGroup) -> None: + """Ensure a runtime is running for this session. + + Idempotent: if the runtime is already up, returns immediately. + """ + raise NotImplementedError + + async def stop(self, session: Session) -> None: + """Tear down a session's runtime (used for pause/cleanup).""" + raise NotImplementedError + + +# --------------------------------------------------------------------------- +# Pending action — bookkeeping for ActionCard prompts +# --------------------------------------------------------------------------- + + +@dataclass +class PendingAction: + """A gated action awaiting user approval via :class:`ActionCard`. + + The router stashes one of these per emitted card; when an + :class:`ActionResponse` arrives it's matched by ``question_id`` and + the awaiting coroutine is unblocked. + """ + + question_id: str + address: PlatformAddress + action: ProposedAction + future: asyncio.Future[bool] + + +# --------------------------------------------------------------------------- +# The router +# --------------------------------------------------------------------------- + + +@dataclass +class DispatchRouter: + """Central dispatch coordinator. + + Holds references to the persistence and runtime layers and the + in-process pending-action table. Every adapter callback flows through + here. + + The :attr:`adapter_lookup` field defaults to + :func:`anton.core.dispatch.registry.get_active_adapter` so the router + plays nicely with the registry; tests can pass a custom callable to + bypass the global registry. + """ + + repo: DispatchRepository + runtime: RuntimeOrchestrator + adapter_lookup: Callable[[str], object | None] = field(default=get_active_adapter) + _pending: dict[str, PendingAction] = field(default_factory=dict) + _delivery_task: asyncio.Task[None] | None = None + _active_sessions: dict[str, tuple[Session, AgentGroup, PlatformAddress]] = field( + default_factory=dict + ) + _stop_delivery: asyncio.Event = field(default_factory=asyncio.Event) + + # ----------------------------------------------------------------- + # Inbound + # ----------------------------------------------------------------- + + async def on_inbound(self, event: InboundEvent) -> None: + """Adapter → router entry point for inbound platform events.""" + addr = event.address + mg = await self.repo.get_or_create_messaging_group( + addr.channel_type, addr.platform_id + ) + # Opportunistically record DM-vs-group hints from the message. + if event.message.is_group is not None and mg.is_group is None: + await self.repo.update_messaging_group_metadata( + mg.id, is_group=event.message.is_group + ) + + wirings = await self.repo.get_wirings(mg.id) + if not wirings: + _log.debug("no wirings for %s/%s", addr.channel_type, addr.platform_id) + return + + for wiring in wirings: + agent_group = await self.repo.get_agent_group(wiring.agent_group_id) + if not matches_trigger(wiring, event, agent_group.name): + continue + + session = await self.repo.resolve_session(agent_group, wiring, addr) + + # Remember which platform address last produced traffic for + # this session so outbound delivery knows where to send replies. + reply_addr = event.reply_to or addr + self._active_sessions[session.id] = (session, agent_group, reply_addr) + + store = await self.repo.open_session_store(session) + try: + store.append( + "in", + event.message.kind, + { + "id": event.message.id, + "content": event.message.content, + "sender_id": event.message.sender_id, + "sender_name": event.message.sender_name, + "thread_id": addr.thread_id, + }, + ) + finally: + store.close() + + await self.runtime.wake(session, agent_group) + + async def on_metadata( + self, + address: PlatformAddress, + metadata: dict, + ) -> None: + """Adapter → router metadata channel. + + Persists display name and group/DM flag when the adapter learns + them. Tolerant of partial updates. + """ + mg = await self.repo.get_or_create_messaging_group( + address.channel_type, address.platform_id + ) + await self.repo.update_messaging_group_metadata( + mg.id, + display_name=metadata.get("display_name"), + is_group=metadata.get("is_group"), + ) + + # ----------------------------------------------------------------- + # Action cards / gates + # ----------------------------------------------------------------- + + async def evaluate_gate( + self, + agent_group: AgentGroup, + action: ProposedAction, + ) -> GateResult: + """Apply :class:`PermissionPolicy` to a proposed runtime action. + + Pure delegation to :func:`anton.core.dispatch.policy.evaluate`, + kept as a method so future enhancements (per-session overrides, + rate limiting, cumulative-spend caps) have a hook point. + """ + return evaluate_policy(agent_group.policy, action) + + async def request_approval( + self, + address: PlatformAddress, + action: ProposedAction, + prompt: str, + *, + timeout_seconds: int | None = 300, + ) -> bool: + """Emit an :class:`ActionCard` and wait for the user's choice. + + Returns ``True`` on approve, ``False`` on deny or timeout. Used + by runtime adapters when :meth:`evaluate_gate` returns + :attr:`GateDecision.PROMPT`. + """ + from anton.core.dispatch.adapter import ActionOption + + question_id = str(uuid.uuid4()) + card = ActionCard( + question_id=question_id, + prompt=prompt, + options=[ + ActionOption(id="approve", label="Approve", style="primary"), + ActionOption(id="deny", label="Deny", style="destructive"), + ], + timeout_seconds=timeout_seconds, + ) + loop = asyncio.get_event_loop() + future: asyncio.Future[bool] = loop.create_future() + self._pending[question_id] = PendingAction( + question_id=question_id, + address=address, + action=action, + future=future, + ) + + await self.emit_action_card(address, card) + + try: + if timeout_seconds is None: + return await future + return await asyncio.wait_for(future, timeout=timeout_seconds) + except asyncio.TimeoutError: + self._pending.pop(question_id, None) + return False + + async def on_action_response(self, response: ActionResponse) -> None: + """Adapter → router entry point for ActionCard responses.""" + pending = self._pending.pop(response.question_id, None) + if pending is None: + _log.debug("no pending action for question_id=%s", response.question_id) + return + approved = response.selected_option_id == "approve" + if not pending.future.done(): + pending.future.set_result(approved) + + async def emit_action_card( + self, + address: PlatformAddress, + card: ActionCard, + ) -> None: + """Render an action card via the appropriate adapter.""" + adapter = self.adapter_lookup(address.channel_type) + if adapter is None: + _log.warning( + "no adapter for channel_type=%s; auto-denying card %s", + address.channel_type, + card.question_id, + ) + pending = self._pending.pop(card.question_id, None) + if pending and not pending.future.done(): + pending.future.set_result(False) + return + await adapter.show_action_card(address, card) # type: ignore[attr-defined] + + # ----------------------------------------------------------------- + # Outbound + # ----------------------------------------------------------------- + + async def deliver(self, message: OutboundMessage) -> None: + """Forward one outbound message to its adapter. + + Failures (adapter offline, platform error) are logged. The caller + decides whether to mark the source store row delivered — typically + :meth:`run_delivery_loop` only marks on success. + """ + adapter = self.adapter_lookup(message.address.channel_type) + if adapter is None: + raise RuntimeError( + f"no active adapter for channel_type={message.address.channel_type}" + ) + await adapter.deliver(message) # type: ignore[attr-defined] + + async def run_delivery_loop(self, *, poll_interval_s: float = 0.5) -> None: + """Background task: drain ``messages_out`` from active sessions. + + Polls each active session's store and ships any undelivered + ``out`` rows through :meth:`deliver`. Successful deliveries are + marked on the store; failures are logged and retried on the next + pass (the row stays undelivered). + """ + self._stop_delivery.clear() + while not self._stop_delivery.is_set(): + for sid, (session, _ag, addr) in list(self._active_sessions.items()): + try: + store = await self.repo.open_session_store(session) + except Exception as e: + _log.warning("delivery: open_session_store failed for %s: %r", sid, e) + continue + try: + rows = store.read_undelivered("out") + for row in rows: + text = "" + if isinstance(row.content, dict): + text = str(row.content.get("text", "")) + elif isinstance(row.content, str): + text = row.content + if not text: + store.mark_delivered(row.rowid, "out") + continue + out = OutboundMessage(address=addr, text=text) + try: + await self.deliver(out) + store.mark_delivered(row.rowid, "out") + except Exception as e: + _log.warning( + "delivery failed for session=%s row=%d: %r", + sid, row.rowid, e, + ) + finally: + store.close() + + try: + await asyncio.wait_for( + self._stop_delivery.wait(), + timeout=poll_interval_s, + ) + except asyncio.TimeoutError: + pass + + def start_delivery_loop(self) -> None: + """Spawn :meth:`run_delivery_loop` as a background task.""" + if self._delivery_task is not None and not self._delivery_task.done(): + return + self._delivery_task = asyncio.create_task( + self.run_delivery_loop(), + name="dispatch-delivery-loop", + ) + + async def stop_delivery_loop(self) -> None: + """Stop the background delivery loop.""" + self._stop_delivery.set() + if self._delivery_task is not None: + try: + await asyncio.wait_for(self._delivery_task, timeout=5.0) + except asyncio.TimeoutError: + self._delivery_task.cancel() + self._delivery_task = None + + +# --------------------------------------------------------------------------- +# Trigger filtering — pure helpers, no IO +# --------------------------------------------------------------------------- + + +def matches_trigger( + wiring: MessagingGroupAgent, + event: InboundEvent, + agent_group_name: str, +) -> bool: + """Return ``True`` if this wiring should fire for this event. + + Implements the three :class:`TriggerRule` modes: + + - ``ALWAYS`` — every message fires. + - ``MENTION_ONLY`` — fires when the platform-confirmed + ``is_mention`` is set, falling back to a case-insensitive + substring match of the agent's display name in the message + text. The fallback handles platforms (or adapters) that don't + report mentions natively. + - ``REGEX`` — fires when ``wiring.trigger_pattern`` matches + ``event.message.content`` interpreted as text. + """ + rule = wiring.trigger_rule + if rule is TriggerRule.ALWAYS: + return True + + if rule is TriggerRule.MENTION_ONLY: + if event.message.is_mention is True: + return True + if event.message.is_mention is False: + return False + return _text_contains(event.message.content, agent_group_name) + + if rule is TriggerRule.REGEX: + if not wiring.trigger_pattern: + return False + import re + + try: + return bool( + re.search(wiring.trigger_pattern, _as_text(event.message.content)) + ) + except re.error: + return False + + return False + + +def _as_text(content: object) -> str: + """Best-effort string view of message content for regex matching.""" + if isinstance(content, str): + return content + if isinstance(content, dict): + text = content.get("text") or content.get("content") or "" + return text if isinstance(text, str) else "" + return "" + + +def _text_contains(content: object, needle: str) -> bool: + """Case-insensitive substring check used by the mention fallback.""" + return needle.lower() in _as_text(content).lower() diff --git a/anton/core/dispatch/session_store.py b/anton/core/dispatch/session_store.py new file mode 100644 index 00000000..aa7ea36a --- /dev/null +++ b/anton/core/dispatch/session_store.py @@ -0,0 +1,182 @@ +"""Per-session message store — the single IO surface between host and agent. + +Following nanoclaw's "the DB is the only IO mechanism" principle, every +dispatch session has its own SQLite database with two tables: + + - ``messages_in`` — written by the host (router), read by the agent runtime. + - ``messages_out`` — written by the agent runtime, read by the host (delivery). + +Everything is a message: chat, webhooks, system actions, scheduled triggers, +action-card responses, agent-to-agent. This collapses what would otherwise be +several IPC channels (stdin pipes, control sockets, status files) into one +inspectable, restartable, debuggable surface. + +The store is intentionally narrow — append, read-since, mark-delivered. No +joins, no transactions across rows, no schema migrations beyond the bootstrap. +If the agent runtime crashes, the next process opens the same SQLite file +and resumes from the last unread row. + +This module implements :class:`SessionStoreProtocol` so cloud deployments +(Postgres, Redis Streams, etc.) can supply alternate backends without +inheriting from the file-based one — same pattern as :class:`HippocampusProtocol`. +""" + +from __future__ import annotations + +import json +import sqlite3 +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable, Literal, Protocol, runtime_checkable + +Direction = Literal["in", "out"] + + +@dataclass +class StoredMessage: + """A row from ``messages_in`` or ``messages_out``.""" + + rowid: int + direction: Direction + kind: str + """Free-form tag: ``"chat"``, ``"webhook"``, ``"system"``, + ``"action_response"``, ``"scheduled"``, ``"reply"``, etc.""" + content: Any + """JSON-decoded payload.""" + timestamp: datetime + delivered: bool + """``True`` once the host has delivered an outbound row to the platform, + or once the agent runtime has consumed an inbound row. Acts as the + 'has been processed' flag for both directions.""" + + +@runtime_checkable +class SessionStoreProtocol(Protocol): + """Backend-agnostic message store interface.""" + + def append(self, direction: Direction, kind: str, content: Any) -> int: + """Append a row; return its rowid.""" + ... + + def read_undelivered(self, direction: Direction) -> list[StoredMessage]: + """Return undelivered rows for the given direction, oldest first.""" + ... + + def mark_delivered(self, rowid: int, direction: Direction) -> None: + """Flip the ``delivered`` flag for one row in the given direction's table.""" + ... + + def close(self) -> None: + """Release backend resources.""" + ... + + +# --------------------------------------------------------------------------- +# SQLite implementation +# --------------------------------------------------------------------------- + + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS messages_in ( + rowid INTEGER PRIMARY KEY AUTOINCREMENT, + kind TEXT NOT NULL, + content TEXT NOT NULL, + timestamp TEXT NOT NULL, + delivered INTEGER NOT NULL DEFAULT 0 +); +CREATE TABLE IF NOT EXISTS messages_out ( + rowid INTEGER PRIMARY KEY AUTOINCREMENT, + kind TEXT NOT NULL, + content TEXT NOT NULL, + timestamp TEXT NOT NULL, + delivered INTEGER NOT NULL DEFAULT 0 +); +CREATE INDEX IF NOT EXISTS idx_in_undelivered ON messages_in (delivered, rowid); +CREATE INDEX IF NOT EXISTS idx_out_undelivered ON messages_out (delivered, rowid); +""" + + +class SQLiteSessionStore(SessionStoreProtocol): + """File-based message store mounted into the agent runtime. + + The SQLite file lives at ``/session.db``. The host writes + to ``messages_in`` and reads from ``messages_out``; the agent runtime + does the inverse. SQLite's file locking handles the concurrent access + safely when both sides use ``WAL`` journaling. + """ + + def __init__(self, db_path: Path) -> None: + self.db_path = db_path + db_path.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect( + str(db_path), + isolation_level=None, # autocommit + check_same_thread=False, + ) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.executescript(_SCHEMA) + + def append(self, direction: Direction, kind: str, content: Any) -> int: + """Append a row to ``messages_in`` or ``messages_out``.""" + table = self._table(direction) + ts = datetime.now(timezone.utc).isoformat() + cur = self._conn.execute( + f"INSERT INTO {table} (kind, content, timestamp) VALUES (?, ?, ?)", + (kind, json.dumps(content), ts), + ) + return cur.lastrowid + + def read_undelivered(self, direction: Direction) -> list[StoredMessage]: + """Return undelivered rows for the direction, oldest first.""" + table = self._table(direction) + rows = self._conn.execute( + f"SELECT rowid, kind, content, timestamp, delivered " + f"FROM {table} WHERE delivered = 0 ORDER BY rowid ASC" + ).fetchall() + return [ + StoredMessage( + rowid=r[0], + direction=direction, + kind=r[1], + content=json.loads(r[2]), + timestamp=datetime.fromisoformat(r[3]), + delivered=bool(r[4]), + ) + for r in rows + ] + + def mark_delivered(self, rowid: int, direction: Direction) -> None: + """Mark a single row delivered in the given direction's table. + + ``direction`` is required because ``messages_in`` and ``messages_out`` + have independent rowid sequences — a rowid alone is ambiguous, and + guessing the wrong table silently flips the wrong row. + """ + table = self._table(direction) + self._conn.execute( + f"UPDATE {table} SET delivered = 1 WHERE rowid = ?", + (rowid,), + ) + + def close(self) -> None: + """Close the underlying SQLite connection.""" + self._conn.close() + + @staticmethod + def _table(direction: Direction) -> str: + if direction == "in": + return "messages_in" + if direction == "out": + return "messages_out" + raise ValueError(f"Invalid direction: {direction!r}") + + +# --------------------------------------------------------------------------- +# Convenience helpers used by the router +# --------------------------------------------------------------------------- + + +def open_store(session_dir: Path) -> SQLiteSessionStore: + """Open (or create) the standard ``session.db`` for a session directory.""" + return SQLiteSessionStore(session_dir / "session.db") diff --git a/anton/core/runtime.py b/anton/core/runtime.py new file mode 100644 index 00000000..f840d860 --- /dev/null +++ b/anton/core/runtime.py @@ -0,0 +1,209 @@ +"""Shared ChatSession builder for any host (CoWork desktop, dispatch, CLI integrations). + +Wraps the boilerplate that used to live inline in `anton-cowork/server/routes/anton_bridge.py` +so the same workspace + memory + vault wiring is reused by every host. Hosts customize +behavior via parameters (extra tools, system-prompt suffix, output-context template) rather +than forking the builder. + +Public API: + build_chat_session(...) — async builder returning a ready ChatSession + resolve_workspace_base(...) — workspace path normalizer + safe_redact_error(exc) — error-message redactor that strips API keys + AntonConfigurationError — raise when setup is missing/invalid + AntonRuntimeError — raise when an Anton call fails after configuration is OK +""" +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any, Optional, Sequence + +logger = logging.getLogger(__name__) + +REDACTED_ENV_KEYS: tuple[str, ...] = ( + "ANTON_ANTHROPIC_API_KEY", + "ANTON_OPENAI_API_KEY", + "ANTON_MINDS_API_KEY", + "ANTHROPIC_API_KEY", + "OPENAI_API_KEY", +) + + +class AntonConfigurationError(RuntimeError): + """Setup is missing or invalid — Anton cannot run yet.""" + + +class AntonRuntimeError(RuntimeError): + """A real Anton call failed after configuration passed.""" + + +def safe_redact_error(exc: Exception) -> str: + """Stringify an exception, replacing any live API-key values with [redacted].""" + message = str(exc).strip() or exc.__class__.__name__ + for key in REDACTED_ENV_KEYS: + value = os.environ.get(key) + if value: + message = message.replace(value, "[redacted]") + return message + + +def resolve_workspace_base(workspace_path: Optional[str]) -> Path: + """Normalize a user-provided workspace path; default to the current working directory.""" + if workspace_path: + return Path(workspace_path).expanduser().resolve() + return Path.cwd().resolve() + + +async def build_chat_session( + *, + session_id: str, + workspace_path: Optional[str] = None, + model: Optional[str] = None, + extra_tools: Optional[Sequence[Any]] = None, + system_prompt_suffix: Optional[str] = None, + output_context: Optional[str] = None, +): + """Build a ChatSession scoped to one workspace. + + Replicates the same wiring the Anton CLI uses: settings → workspace → LLM client → + memory (cortex + episodes + history) → data vault env injection → ChatSession. + + Parameters + ---------- + session_id + Stable id used for episodic-memory resume + history-store load. + workspace_path + Project workspace root. None → current working directory. + model + Override planning model on the resolved settings. + extra_tools + Tools added on top of the default Anton tool set (e.g. publishing, datasource connect). + Hosts pass their own tool set; pass None for the bare default. + system_prompt_suffix + Free-form text appended to the system prompt. Hosts use this to nudge tone or + describe their UI affordances. None → no suffix. + output_context + Override for the per-session output-folder hint. None → use the default template + pointing at `settings.artifacts_dir`. + + Returns + ------- + anton.core.session.ChatSession + Ready to call `turn_stream(text)`. + """ + # Imports kept inside the function so this module is importable without anton's + # heavy deps (typer, rich) at module load time. + from anton.chat_session import build_runtime_context + from anton.config.settings import AntonSettings + from anton.context.self_awareness import SelfAwarenessContext + from anton.core.llm.client import LLMClient + from anton.core.memory.cortex import Cortex + from anton.core.memory.episodes import EpisodicMemory + from anton.core.memory.hippocampus import Hippocampus + from anton.core.session import ChatSession, ChatSessionConfig, SystemPromptContext + from anton.memory.history_store import HistoryStore + from anton.workspace import Workspace + + try: + from anton.core.datasources.data_vault import LocalDataVault + except Exception: # pragma: no cover — old Anton builds may not expose it + LocalDataVault = None # type: ignore[assignment] + + base = resolve_workspace_base(workspace_path) + settings = AntonSettings() + settings.resolve_workspace(str(base)) + if model: + settings.planning_model = model + + workspace = Workspace(base) + workspace.initialize() + workspace.apply_env_to_process() + + anton_dir = base / ".anton" + output_dir = Path(settings.artifacts_dir) + context_dir = Path(settings.context_dir) + episodes_dir = anton_dir / "episodes" + project_memory_dir = anton_dir / "memory" + for directory in (output_dir, context_dir, episodes_dir, project_memory_dir): + directory.mkdir(parents=True, exist_ok=True) + + llm_client = LLMClient.from_settings(settings) + self_awareness = SelfAwarenessContext(context_dir) + + global_memory_dir = Path.home() / ".anton" / "memory" + global_memory_dir.mkdir(parents=True, exist_ok=True) + cortex = Cortex( + global_hc=Hippocampus(global_memory_dir), + project_hc=Hippocampus(project_memory_dir), + mode=settings.memory_mode if settings.memory_enabled else "off", + llm_client=llm_client, + ) + episodic = EpisodicMemory(episodes_dir, enabled=settings.episodic_memory) + episodic.resume_session(session_id) + history_store = HistoryStore(episodes_dir) + initial_history = history_store.load(session_id) + + resolved_output_context = output_context or ( + f"Save generated files and dashboards to `{output_dir}`. " + "When you create a user-facing HTML dashboard or report, save it there." + ) + + data_vault = LocalDataVault() if LocalDataVault is not None else None + google_drive_oauth_connected = False + if data_vault is not None: + try: + for conn in data_vault.list_connections(): + engine = conn.get("engine") + name = conn.get("name") + if not (engine and name): + continue + data_vault.inject_env(engine, name) + if engine == "google_drive": + fields = data_vault.load(engine, name) or {} + if fields.get("auth_type") == "oauth": + google_drive_oauth_connected = True + except Exception: + logger.debug("Could not inject Anton data vault env", exc_info=True) + + integration_guidance = "" + if google_drive_oauth_connected: + integration_guidance = ( + " Connected Google Drive accounts are available through Google OAuth credentials " + "in the injected `DS_GOOGLE_DRIVE___...` environment variables. " + "Only claim Google Drive access if you can actually use those credentials successfully." + ) + + suffix_parts = [s for s in (system_prompt_suffix, integration_guidance) if s] + final_suffix = "".join(suffix_parts) if suffix_parts else None + + config = ChatSessionConfig( + llm_client=llm_client, + settings=settings, + self_awareness=self_awareness, + cortex=cortex, + episodic=episodic, + system_prompt_context=SystemPromptContext( + runtime_context=build_runtime_context(settings), + suffix=final_suffix, + output_context=resolved_output_context, + ), + workspace=workspace, + data_vault=data_vault, + initial_history=initial_history, + history_store=history_store, + session_id=session_id, + proactive_dashboards=settings.proactive_dashboards, + tools=list(extra_tools) if extra_tools else [], + ) + return ChatSession(config) + + +__all__ = [ + "AntonConfigurationError", + "AntonRuntimeError", + "REDACTED_ENV_KEYS", + "build_chat_session", + "resolve_workspace_base", + "safe_redact_error", +] diff --git a/anton/policies.py b/anton/policies.py new file mode 100644 index 00000000..e4880b27 --- /dev/null +++ b/anton/policies.py @@ -0,0 +1,236 @@ +"""Bundled MindsDB policy texts shown inline by the CLI consent flow. + +These constants mirror the canonical versions hosted at +https://mindsdb.com/terms and https://mindsdb.com/privacy-policy. Update +both this file and the hosted pages together so they stay in sync. + +Module is imported lazily inside `_ensure_terms_consent` so the ~14KB of +string data does not load on every CLI invocation. +""" + +from __future__ import annotations + + +TERMS_OF_USE_MD = """\ +# MindsDB Digital Website Terms of Use + +*Last Updated: September 24, 2024* + +Welcome, and thank you for your interest in MindsDB, Inc. ("MindsDB," "we," or "us") and our websites at https://mindsdb.com, https://docs.mindsdb.com, https://mdb.ai, and https://docs.mdb.ai and related website (collectively, the "Sites"). These Terms of Use are a legally binding contract between you and MindsDB regarding your use of the Sites. Please read the following terms carefully before using the Sites. + +These terms do not apply to your access to or use of MindDB's products, services, or other offerings ("MindsDB Offerings"). MindsDB Offerings are subject to a separate agreements between you and MindsDB concerning such MindsDB Offerings. + +By using the Sites, you acknowledge that you have read, understood, and agree to be bound by the following terms and conditions, including the MindsDB Privacy Policy at https://mindsdb.com/privacy-policy (together, the "Terms"). If you are not eligible, or you do not agree to the Terms, then you do not have our permission to use the Sites. + +## Eligibility + +You must be at least 18 years of age to use the Sites. By agreeing to these Terms, you represent and warrant to us that: (a) you are at least 18 years of age; and (b) your use of the Sites is in compliance with any and all applicable laws and regulations. If you are an entity, organization, or company, the individual accepting these Terms on your behalf represents and warrants that they have authority to bind you to these Terms and you agree to be bound by these Terms. + +## Changes to the Terms + +We may periodically make changes to these Terms. When we do, we will update the "Last Updated" date above. It is your responsibility to review the most recent version of these Terms and remain informed of any changes. You agree that your continued use of a Site after the effective date of any changes will constitute your acceptance of the changed Terms for your continued use. Disputes arising under these Terms will be resolved in accordance with the version of these Terms that was in effect at the time the dispute arose. + +## Changes to the Sites + +We reserve the right to modify or discontinue, temporarily or permanently, all or a part of the Sites without notice. We will not be liable to you or to any third party for any modification, suspension, or discontinuance of the Sites. + +## Limited License + +Subject to these Terms, MindsDB grants you a limited, revocable license to access and use the Sites to learn more about our products and services. No other use of the Sites is authorized. + +## Restrictions + +You must comply with all applicable laws when using the Sites. Except as may be expressly permitted by applicable law or expressly permitted by us in writing, you will not, and will not permit anyone else to: (a) store, copy, modify, distribute, or resell any information or material available on the Sites ("Site Content") or compile or collect any Site Content as part of a database or other work; (b) use any automated tool (e.g., robots, spiders) to use the Sites or store, copy, modify, distribute, or resell any Site Content; (c) rent, lease, or sublicense your access to the Sites; (d) use the Sites or Site Content for any purpose except for your own personal use; (e) circumvent or disable any digital rights management, usage rules, or other security features of the Sites; (f) reproduce, modify, translate, enhance, decompile, disassemble, reverse engineer, or create derivative works of the Sites; (g) use the Sites in a manner that threatens the integrity, performance, or availability of the Sites; or (h) remove, alter, or obscure any proprietary notices (including copyright notices) on any portion of the Sites or Site Content. + +## Ownership + +The Sites are owned and operated by MindsDB Inc. We or our licensors retain all right, title, and interest in and to the Sites and Site Content and any trademarks, logos, or service marks displayed on the Sites or in Site Content ("Marks"). The Sites, Site Content, and Marks are protected by applicable intellectual property laws and international treaties. Except as expressly authorized by MindsDB, you may not make use of the Sites, Site Content, and Marks. + +## Privacy Policy + +Please read the MindsDB Privacy Policy carefully for information relating to our collection, use, storage, disclosure of your personal information. The Privacy Policy is incorporated by this reference into, and made a part of, these Terms. + +## Links and Third-Party Content + +The Sites may contain links to third party products, services, and websites. We exercise no control over the third-party products, services, and websites and we are not responsible for their performance, do not endorse them, and are not responsible or liable for any content, advertising, or other materials available through the third-party products, services, and websites. We are not responsible or liable, directly or indirectly, for any damage or loss caused to you by your use of or reliance on any goods or services available through the third-party products, services, and websites. + +Additionally, if you follow a link or otherwise navigate away from the Sites, please be aware that these Terms will no longer govern. You should review the applicable terms and policies, including privacy and data gathering practices, of any third-party websites to which you navigate to from the Sites. + +## Feedback + +MindsDB may provide you with a mechanism to provide feedback, suggestions, and ideas about the Sites or us ("Feedback"). You agree that we may, in our sole discretion, use the Feedback you provide in any way, including in future modifications to the Sites, our products, or services. You hereby grant us an unrestricted, perpetual, worldwide, fully transferable, irrevocable, royalty-free right to exploit the Feedback in any manner for any purpose. + +## Disclaimer of Warranties + +YOUR USE OF THE SITES AND SITE CONTENT IS AT YOUR SOLE RISK. THE SITES AND SITE CONTENT ARE PROVIDED ON AN "AS IS" AND "AS AVAILABLE" BASIS. MINDSDB EXPRESSLY DISCLAIMS ALL WARRANTIES OF ANY KIND, WHETHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE, AND NON-INFRINGEMENT. WE DO NOT GUARANTEE THE ACCURACY, COMPLETENESS, OR USEFULNESS OF THE SITES OR SITE CONTENT, AND YOU RELY ON THE SITES AND SITE CONTENT AT YOUR OWN RISK. ANY MATERIAL OBTAINED THROUGH THE SITES IS DONE AT YOUR OWN DISCRETION AND RISK AND YOU WILL BE SOLELY RESPONSIBLE FOR ANY DAMAGE TO YOUR COMPUTER OR LOSS OF DATA THAT RESULTS FROM THE DOWNLOAD OF ANY MATERIAL THROUGH THE SITES. NO ADVICE OR INFORMATION, WHETHER ORAL OR WRITTEN, OBTAINED BY YOU FROM MINDSDB OR THROUGH OR FROM THE SITES WILL CREATE ANY WARRANTY NOT EXPRESSLY STATED IN THIS AGREEMENT. HOWEVER, MINDSDB DOES NOT DISCLAIM ANY WARRANTY OR OTHER RIGHT THAT MINDSDB IS PROHIBITED FROM DISCLAIMING UNDER APPLICABLE LAW. + +## Limitation of Liability + +MINDSDB WILL NOT BE LIABLE FOR ANY INDIRECT, INCIDENTAL, SPECIAL, CONSEQUENTIAL, OR EXEMPLARY DAMAGES, INCLUDING BUT NOT LIMITED TO, DAMAGES FOR LOSS OF PROFITS, GOODWILL, USE, DATA OR OTHER INTANGIBLE LOSSES (EVEN IF MINDSDB HAS BEEN ADVISED OF THE POSSIBILITY OF THESE DAMAGES), RESULTING FROM YOUR USE OF THE SITES AND SITE CONTENT. UNDER NO CIRCUMSTANCES WILL MINDSDB'S TOTAL LIABILITY OF ALL KINDS ARISING OUT OF OR RELATED TO YOUR USE OF THE SITES OR SITE CONTENT (INCLUDING BUT NOT LIMITED TO WARRANTY CLAIMS), REGARDLESS OF THE FORUM AND REGARDLESS OF WHETHER ANY ACTION OR CLAIM IS BASED ON CONTRACT, TORT, OR OTHERWISE, EXCEED $50. BECAUSE SOME STATES DO NOT ALLOW THE EXCLUSION OR LIMITATION OF LIABILITY FOR CONSEQUENTIAL OR INCIDENTAL DAMAGES, THE ABOVE LIMITATION MAY NOT APPLY TO YOU. + +EACH PROVISION OF THESE TERMS THAT PROVIDES FOR A LIMITATION OF LIABILITY, DISCLAIMER OF WARRANTIES, OR EXCLUSION OF DAMAGES IS INTENDED TO AND DOES ALLOCATE THE RISKS BETWEEN THE PARTIES UNDER THESE TERMS. THIS ALLOCATION IS AN ESSENTIAL ELEMENT OF THE BASIS OF THE BARGAIN BETWEEN THE PARTIES. EACH OF THESE PROVISIONS IS SEVERABLE AND INDEPENDENT OF ALL OTHER PROVISIONS OF THESE TERMS. THE LIMITATIONS IN THIS SECTION WILL APPLY EVEN IF ANY LIMITED REMEDY FAILS OF ITS ESSENTIAL PURPOSE. + +## Indemnity + +You will indemnify and hold MindsDB, and affiliates, officers, agents, and employees, harmless from any costs, damages, expenses, and liability caused by your use of the Sites or Site Content, your violation of these Terms, or your violation of any rights of a third party through use of the Sites or Site Content. We reserve the right, at our own expense, to assume the exclusive defense and control of any matter otherwise subject to indemnification by you (without limiting your indemnification obligations with respect to that matter), and in that case, you agree to cooperate with our defense of those claims. + +## General Terms + +These Terms, together with the Privacy Policy and any other agreements expressly incorporated by reference into these Terms, are the entire and exclusive understanding and agreement between you and MindsDB regarding your use of the Sites. You may not assign or transfer these Terms or your rights under these Terms, in whole or in part, by operation of law or otherwise, without our prior written consent. We may assign these Terms at any time without notice or consent. The failure to require performance of any provision will not affect our right to require performance at any other time after that, nor will a waiver by us of any breach or default of these Terms, or any provision of these Terms, be a waiver of any subsequent breach or default or a waiver of the provision itself. Use of section headers in these Terms is for convenience only and will not have any impact on the interpretation of any provision. Throughout these Terms the use of the word "including" means "including but not limited to". If any part of these Terms is held to be invalid or unenforceable, the unenforceable part will be given effect to the greatest extent possible, and the remaining parts will remain in full force and effect. + +## Legal Notices + +These Terms are governed by the laws of the state of California without regard to conflict of law principles. The exclusive jurisdiction and venue for any claims arising out of or related to these Terms or your use of the Sites will lie in the state and federal courts located in San Francisco, California, and you irrevocably agree to submit to the jurisdiction of such courts. The failure of MindsDB to enforce any right or provision in these Terms will not constitute a waiver of such right or provision unless acknowledged and agreed to by MindsDB in writing. In the event that a court of competent jurisdiction finds any provision of these Terms to be illegal, invalid or unenforceable, the remaining provisions will remain in full force and effect. + +## Contacting MindsDB + +If you have any questions or concerns about the Sites or these Terms, you may contact us by email at hello@mindsdb.com, or write to us at: + +MindsDB +3277 S White Rd PMB 10166, +San Jose, CA 95148 +USA +""" + + +PRIVACY_POLICY_MD = """\ +# MindsDB Privacy Policy + +*Effective date: September 4, 2025 — Last updated: September 4, 2025* + +This Privacy Notice explains how MindsDB ("we," "us," or "our") collects, uses, discloses, and protects personal information when you visit our websites, interact with our marketing, register for events, contact us, or otherwise communicate with us in a business context. It also describes your privacy rights and how to exercise them. + +**Who we are.** MindsDB, legal address: 3277 S White Rd PMB 10166, San Jose, CA 95148, USA. + +**Data Protection Officer (DPO).** Adam Carrigan – hello@mindsdb.com. + +If you are a user of our products under a separate agreement, or an employee/candidate, this website notice may be supplemented by product- or HR-specific notices. + +## 1) Scope and audience + +This notice applies to personal information about visitors, prospective customers and partners, vendors, and other contacts who interact with our public websites and marketing (collectively, Business Contacts). It does not cover information we process solely on behalf of our customers as a processor/service provider — those activities are governed by a separate data processing agreement. + +## 2) Information we collect + +We collect personal information in the categories below (examples are illustrative): + +- **Identifiers & contact details** – name, employer, job title/role, business email, phone, country/region, and any information you provide in forms or communications. +- **Professional information** – company, team, interests, purchase intent, and CRM notes. +- **Internet / device & usage data** – IP address, device and browser type, operating system, pages viewed, links clicked, referring URLs, time spent, language, and approximate geolocation (derived from IP). +- **Cookies & similar technologies** – identifiers set by us and our partners for analytics, site performance, fraud/security, and (if enabled) advertising/retargeting. +- **Event & webinar data** – registration information, attendance, and feedback. +- **Support interactions** – content of messages, call/chat metadata. +- **Sensitive personal information** – we do not intentionally collect sensitive personal information via our marketing websites. If you choose to provide such information in free-text fields, we will process it only as necessary to handle your request. + +**Sources of information.** We collect information directly from you (forms, emails, support), automatically via your device (cookies, logs), and from third parties (co-marketing partners, event organizers, lead-generation providers, and publicly available sources such as LinkedIn when allowed by law). + +## 3) How we use personal information (purposes) + +We use personal information to: + +- Provide, operate, and improve our websites and services. +- Communicate with you, respond to inquiries, send administrative messages, and provide requested materials (e.g., whitepapers, demos). +- Market and advertise our services, including measuring campaign effectiveness and building audiences (see Cookies & Tracking). +- Plan and host events and manage registrations. +- Maintain security and prevent fraud, abuse, or misuse. +- Comply with legal obligations and enforce our agreements. + +Where required by law, we will obtain consent before using cookies for non-essential purposes or sending you certain marketing communications. + +## 4) Cookies & tracking controls + +We and our partners use cookies, SDKs, and similar technologies. + +In the Regions requiring prior consent (for example, the EU/UK/EEA/Switzerland/Canada), we seek opt-in consent for any non-essential cookies (e.g., analytics/advertising). These cookies are not set until you accept. + +In the United States, you may opt out of interest-based advertising and the disclosure of your data for targeted advertising via the Your Privacy Choices link and by enabling recognized opt-out preference signals (see below). + +You can update your cookie preferences at any time via the cookie banner, the Cookie Preferences control in our footer, or your browser settings. Blocking cookies may affect some site features. + +For a current, always-up-to-date list of cookies and partners we use, see our Cookie Policy (Cookie Declaration), accessible via the cookie banner and the Cookie Preferences control in our footer. + +## 5) When and with whom we disclose information + +We disclose personal information as described below (we do not disclose lists of individual names unless required by law): + +- **Service providers/contractors** who process data for us under written contracts (e.g., hosting, analytics, email delivery, CRM, event management, customer support, security). They may access personal information only to perform services for us and must not use it for their own purposes. +- **Advertising and analytics partners** to measure campaigns, improve our sites, and (if enabled) show you relevant ads on other sites. See Your choices below for how to opt out. +- **Business partners** in co-marketing or joint events when you register for those programs. +- **Professional advisors** (lawyers, accountants), authorities, and others when necessary for compliance, safety, and legal reasons. +- **Corporate transactions** – if we engage in a merger, acquisition, financing, or sale of assets, your information may be transferred as part of that transaction, subject to confidentiality. + +We do not knowingly allow our service providers to collect, use, or disclose personal information except to provide the services we request or as permitted by law and contract. + +## 6) "Selling" or "sharing" personal information / targeted advertising + +We do not "sell" personal information for money. We may "share" identifiers and internet/activity data with advertising partners for cross-context behavioral advertising (targeted ads). You may opt out at any time via the Your Privacy Choices link and by using recognized opt-out preference signals (Global Privacy Control). + +We honor Global Privacy Control (GPC) and other recognized regional opt-out signals where required. See Your rights & choices. + +## 7) Your rights & choices + +Your rights depend on where you live. We will honor requests as required by applicable law and, where reasonable, in other regions as well. + +### California & other U.S. state privacy laws + +If you live in CA, CO, CT, UT, VA, OR (and other states with comprehensive privacy laws), you may have the right to: + +- **Know/Access** the categories and specific pieces of personal information we collected about you. +- **Delete** personal information we collected from you. +- **Correct** inaccurate personal information. +- **Opt out** of the sale or sharing of personal information and of targeted advertising. +- **Limit** the use and disclosure of sensitive personal information (if we collect it). +- **Appeal** our decision if we decline to act on your request (where applicable). + +**How to exercise your rights.** Submit a request at hello@mindsdb.com (subject line: Privacy Request). For opt-outs of targeted advertising, use the Your Privacy Choices link on our site or enable GPC in your browser. We will confirm receipt within 10 business days and respond within 45 days (or as allowed by law). We will take steps to verify your identity (and authority of any authorized agent). + +We will not discriminate against you for exercising your rights. + +**Do Not Track.** Our websites do respond to browser Do-Not-Track (DNT) signals by disabling non-essential tracking used for targeted advertising (and limiting analytics as configured). We also honor Global Privacy Control (GPC). + +### EU/EEA, UK, and Switzerland (GDPR/UK GDPR) + +If you are in the EU/EEA, UK, or Switzerland, you have the rights to access, rectify, erase, restrict, object (including to direct marketing), and data portability, and the right to withdraw consent at any time (without affecting the lawfulness of processing before withdrawal). You also have the right to lodge a complaint with your local supervisory authority. + +**Legal bases.** Depending on the context, we rely on: contract (to provide requested materials), legitimate interests (to operate and secure our sites, to conduct B2B marketing compatible with your expectations, to improve our services), consent (for non-essential cookies/marketing where required), and legal obligation. + +**DPO contact.** Adam Carrigan – hello@mindsdb.com. + +### Brazil (LGPD), Canada (PIPEDA/Provincial), Japan (APPI), South Korea (PIPA), and other regions + +Residents of these jurisdictions may have similar rights to access, correct, delete, or object/opt-out of certain processing, and to lodge complaints with local authorities. You can exercise rights by contacting hello@mindsdb.com. + +## 8) International data transfers + +We are headquartered in the United States and may transfer your personal information to countries that may not provide the same level of protection as your home country. Where required, we use legally approved safeguards (for example, Standard Contractual Clauses (SCCs) or country-specific equivalents). + +## 9) Data retention + +We keep personal information for as long as necessary to achieve the purposes described above, including to comply with legal, accounting, or reporting obligations, resolve disputes, and enforce agreements. We also consider the amount, nature, and sensitivity of the data and the risk of harm from unauthorized use or disclosure. + +Typical retention periods: + +- Website analytics: 13–24 months (per cookie/tool settings). +- CRM & marketing records for Business Contacts: 3 years after last meaningful interaction. +- Event registration: 3 years after the event. +- Support communications: 5 years after resolution. + +## 10) Security + +We use administrative, technical, and physical measures designed to protect personal information. No method of transmission or storage is 100% secure; if we learn of a security incident that affects your information, we will notify you as required by law. + +## 11) Children's privacy + +Our websites are intended for adults and are not directed to children. We do not knowingly collect personal information from anyone under 16. If you believe a child has provided personal information to us, contact hello@mindsdb.com and we will take appropriate steps. + +## 12) Changes to this notice + +We may update this notice from time to time. When we do, we will change the "Last updated" date at the top and, when required, provide additional notice. + +## 13) How to contact us + +MindsDB +Legal address: 3277 S White Rd PMB 10166, San Jose, CA 95148, USA +Data Protection Officer: Adam Carrigan – hello@mindsdb.com +For privacy requests, email hello@mindsdb.com and include Privacy Request in the subject line. +"""