diff --git a/.github/workflows/on-release-main.yml b/.github/workflows/on-release-main.yml index fec7fbb..4941dbe 100644 --- a/.github/workflows/on-release-main.yml +++ b/.github/workflows/on-release-main.yml @@ -5,7 +5,6 @@ on: types: [published] jobs: - set-version: runs-on: ubuntu-24.04 steps: @@ -63,4 +62,13 @@ jobs: uses: ./.github/actions/setup-python-env - name: Deploy documentation - run: uv run mkdocs gh-deploy --force + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GIT_AUTHOR_NAME: acp-bot + GIT_AUTHOR_EMAIL: noreply@github.com + GIT_COMMITTER_NAME: acp-bot + GIT_COMMITTER_EMAIL: noreply@github.com + run: | + git config user.name "$GIT_AUTHOR_NAME" + git config user.email "$GIT_AUTHOR_EMAIL" + uv run mkdocs gh-deploy --force --remote-branch gh-pages --remote-name origin diff --git a/.gitignore b/.gitignore index 3311e1c..88ea146 100644 --- a/.gitignore +++ b/.gitignore @@ -209,3 +209,10 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ + +# .zed +.zed/ + +# others +reference/ +.DS_Store diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..2c04e75 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,19 @@ +# Repository Guidelines + +## Project Structure & Module Organization +The package code lives under `src/acp`, exposing the high-level Agent, transport helpers, and generated protocol schema. Generated artifacts such as `schema/` and `src/acp/schema.py` are refreshed via `scripts/gen_all.py` against the upstream ACP schema. Integration examples are in `examples/`, including `echo_agent.py` and the mini SWE bridge. Tests reside in `tests/` with async fixtures and doctests; documentation sources live in `docs/` and publish via MkDocs. Built distributions drop into `dist/` after builds. + +## Build, Test, and Development Commands +Run `make install` to create a `uv` managed virtualenv and install pre-commit hooks. `make check` executes lock verification, Ruff linting, `ty` static checks, and deptry analysis. `make test` calls `uv run python -m pytest --doctest-modules`. For release prep use `make build` or `make build-and-publish`. `make gen-all` regenerates protocol models; export `ACP_SCHEMA_VERSION=` beforehand to fetch a specific upstream schema (defaults to the cached copy). `make docs` serves MkDocs locally; `make docs-test` ensures clean builds. + +## Coding Style & Naming Conventions +Target Python 3.10+ with type hints and 120-character lines enforced by Ruff (`pyproject.toml`). Prefer dataclasses/pydantic models from the schema modules rather than bare dicts. Tests may ignore security lint (see per-file ignores) but still follow snake_case names. Keep public API modules under `acp/*` lean; place utilities in internal `_`-prefixed modules when needed. + +## Testing Guidelines +Pytest is the main framework with `pytest-asyncio` for coroutine tests and doctests activated on modules. Name test files `test_*.py` and co-locate fixtures under `tests/conftest.py`. Aim to cover new protocol surfaces with integration-style tests using the async agent stubs. Generate coverage reports via `tox -e py310` when assessing CI parity. + +## Commit & Pull Request Guidelines +Commit history follows Conventional Commits (`feat:`, `fix:`, `docs:`). Scope commits narrowly and include context on affected protocol version or tooling. PRs should describe agent behaviors exercised, link related issues, and mention schema regeneration if applicable. Attach test output (`make check` or targeted pytest) and screenshots only when UI-adjacent docs change. Update docs/examples when altering the public agent API. + +## Agent Integration Tips +Leverage `examples/mini_swe_agent/` as a template when bridging other command executors. Use `AgentSideConnection` with `stdio_streams()` for ACP-compliant clients; document any extra environment variables in README updates. diff --git a/README.md b/README.md index ede9990..94408fc 100644 --- a/README.md +++ b/README.md @@ -1,69 +1,56 @@ # Agent Client Protocol (Python) -A Python implementation of the Agent Client Protocol (ACP). Use it to build agents that communicate with ACP-capable clients (e.g. Zed) over stdio. +Python SDK for the Agent Client Protocol (ACP). Build agents that speak ACP over stdio so tools like Zed can orchestrate them. -- Package name: `agent-client-protocol` (import as `acp`) -- Repository: https://github.com/psiace/agent-client-protocol-python -- Docs: https://psiace.github.io/agent-client-protocol-python/ -- Featured: Listed as the first third-party SDK on the official ACP site — see https://agentclientprotocol.com/libraries/community +> Each release tracks the matching ACP schema version. Contributions that improve coverage or tooling are very welcome. + +**Highlights** + +- Typed dataclasses generated from the upstream ACP schema (`acp.schema`) +- Async agent base class plus stdio transport helpers for quick bootstrapping +- Included examples that stream content updates and tool calls end-to-end ## Install ```bash pip install agent-client-protocol -# or +# or with uv uv add agent-client-protocol ``` -## Development (contributors) +## Quickstart -```bash -make install # set up venv -make check # lint + typecheck -make test # run tests -``` +1. Install the package and point your ACP-capable client at the provided echo agent: + ```bash + pip install agent-client-protocol + python examples/echo_agent.py + ``` +2. Wire it into your client (e.g. Zed → Agents panel) so stdio is connected; the SDK handles JSON-RPC framing and lifecycle messages. -## Minimal agent example +Prefer a step-by-step walkthrough? Read the [Quickstart guide](docs/quickstart.md) or the hosted docs: https://psiace.github.io/agent-client-protocol-python/. -See a complete streaming echo example in [examples/echo_agent.py](examples/echo_agent.py). It streams back each text block using `session/update` and ends the turn. +### Minimal agent sketch ```python import asyncio -from acp import ( - Agent, - AgentSideConnection, - InitializeRequest, - InitializeResponse, - NewSessionRequest, - NewSessionResponse, - PromptRequest, - PromptResponse, - SessionNotification, - stdio_streams, -) -from acp.schema import ContentBlock1, SessionUpdate2 +from acp import Agent, AgentSideConnection, PromptRequest, PromptResponse, SessionNotification, stdio_streams +from acp.schema import AgentMessageChunk, TextContentBlock class EchoAgent(Agent): def __init__(self, conn): self._conn = conn - async def initialize(self, params: InitializeRequest) -> InitializeResponse: - return InitializeResponse(protocolVersion=params.protocolVersion) - - async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: - return NewSessionResponse(sessionId="sess-1") - async def prompt(self, params: PromptRequest) -> PromptResponse: for block in params.prompt: - text = block.get("text", "") if isinstance(block, dict) else getattr(block, "text", "") + text = getattr(block, "text", "") await self._conn.sessionUpdate( SessionNotification( sessionId=params.sessionId, - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text=text), + content=TextContentBlock(type="text", text=text), ), ) ) @@ -80,17 +67,27 @@ if __name__ == "__main__": asyncio.run(main()) ``` -Run this executable from your ACP-capable client (e.g. configure Zed to launch it). The library takes care of the stdio JSON-RPC transport. +Full example with streaming and lifecycle hooks lives in [examples/echo_agent.py](examples/echo_agent.py). -## Example: Mini SWE Agent bridge +## Examples -A minimal ACP bridge for mini-swe-agent is provided under [`examples/mini_swe_agent`](examples/mini_swe_agent/README.md). It demonstrates: - -- Parsing a prompt from ACP content blocks -- Streaming agent output via `session/update` -- Mapping command execution to `tool_call` and `tool_call_update` +- `examples/mini_swe_agent`: bridges mini-swe-agent into ACP, including a duet launcher and Textual TUI client +- Additional transport helpers are documented in the [Mini SWE guide](docs/mini-swe-agent.md) ## Documentation -- Quickstart: [docs/quickstart.md](docs/quickstart.md) -- Mini SWE Agent example: [docs/mini-swe-agent.md](docs/mini-swe-agent.md) +- Project docs (MkDocs): https://psiace.github.io/agent-client-protocol-python/ +- Local sources: `docs/` + - [Quickstart](docs/quickstart.md) + - [Mini SWE Agent bridge](docs/mini-swe-agent.md) + +## Development workflow + +```bash +make install # create uv virtualenv and install hooks +ACP_SCHEMA_VERSION= make gen-all # refresh generated schema bindings +make check # lint, types, dependency analysis +make test # run pytest + doctests +``` + +After local changes, consider updating docs/examples if the public API surface shifts. diff --git a/docs/index.md b/docs/index.md index f742df0..77f022e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,67 +1,30 @@ -# Agent Client Protocol (Python) +# Agent Client Protocol SDK (Python) -A Python implementation of the Agent Client Protocol (ACP). Build agents that communicate with ACP-capable clients (e.g. Zed) over stdio. +Welcome to the Python SDK for the Agent Client Protocol (ACP). The package ships ready-to-use transports, typed protocol models, and examples that stream messages to ACP-aware clients such as Zed. -## Install +## What you get -```bash -pip install agent-client-protocol -``` +- Fully typed dataclasses generated from the upstream ACP schema (`acp.schema`) +- Async agent base class and stdio helpers to spin up an agent in a few lines +- Examples that demonstrate streaming updates and tool execution over ACP -## Minimal usage +## Getting started -```python -import asyncio +1. Install the package: + ```bash + pip install agent-client-protocol + ``` +2. Launch the provided echo agent to verify your setup: + ```bash + python examples/echo_agent.py + ``` +3. Point your ACP-capable client at the running process (for Zed, configure an Agent Server entry). The SDK takes care of JSON-RPC framing and lifecycle transitions. -from acp import ( - Agent, - AgentSideConnection, - InitializeRequest, - InitializeResponse, - NewSessionRequest, - NewSessionResponse, - PromptRequest, - PromptResponse, - SessionNotification, - stdio_streams, -) -from acp.schema import ContentBlock1, SessionUpdate2 +Prefer a guided tour? Head to the [Quickstart](quickstart.md) for step-by-step instructions, including how to run the agent from an editor or terminal. +## Documentation map -class EchoAgent(Agent): - def __init__(self, conn): - self._conn = conn +- [Quickstart](quickstart.md): install, run, and extend the echo agent +- [Mini SWE Agent guide](mini-swe-agent.md): bridge mini-swe-agent over ACP, including duet launcher and Textual client - async def initialize(self, params: InitializeRequest) -> InitializeResponse: - return InitializeResponse(protocolVersion=params.protocolVersion) - - async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: - return NewSessionResponse(sessionId="sess-1") - - async def prompt(self, params: PromptRequest) -> PromptResponse: - for block in params.prompt: - text = block.get("text", "") if isinstance(block, dict) else getattr(block, "text", "") - await self._conn.sessionUpdate( - SessionNotification( - sessionId=params.sessionId, - update=SessionUpdate2( - sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text=text), - ), - ) - ) - return PromptResponse(stopReason="end_turn") - - -async def main() -> None: - reader, writer = await stdio_streams() - AgentSideConnection(lambda conn: EchoAgent(conn), writer, reader) - await asyncio.Event().wait() - - -if __name__ == "__main__": - asyncio.run(main()) -``` - -- Quickstart: [quickstart.md](quickstart.md) -- Mini SWE Agent example: [mini-swe-agent.md](mini-swe-agent.md) +Source code lives under `src/acp/`, while tests and additional examples are available in `tests/` and `examples/`. If you plan to contribute, see the repository README for the development workflow. diff --git a/docs/mini-swe-agent.md b/docs/mini-swe-agent.md index b68f1eb..70df79d 100644 --- a/docs/mini-swe-agent.md +++ b/docs/mini-swe-agent.md @@ -1,49 +1,54 @@ # Mini SWE Agent bridge -> Just a show of the bridge in action. Not a best-effort or absolutely-correct implementation of the agent. +This example wraps mini-swe-agent behind ACP so editors such as Zed can interact with it over stdio. A duet launcher is included to run a local Textual client beside the bridge for quick experimentation. -This example wraps mini-swe-agent behind ACP so Zed can run it as an external agent over stdio. It also includes a local Textual UI client connected via a duet launcher +## Overview -## Behavior +- Accepts ACP prompts, concatenates text blocks, and forwards them to mini-swe-agent +- Streams language-model output via `session/update` → `agent_message_chunk` +- Emits `tool_call` / `tool_call_update` pairs for shell execution, including stdout and return codes +- Sends a final `agent_message_chunk` when mini-swe-agent prints `COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT` -- Prompts: text blocks are concatenated into a single task string. (Resource embedding is not used in this example.) -- Streaming: only LM output is streamed via `session/update` → `agent_message_chunk`. -- Tool calls: when the agent executes a shell command, the bridge sends: - - `tool_call` with `kind=execute`, pending status, and a bash code block containing the command - - `tool_call_update` upon completion, including output and a `rawOutput` object with `output` and `returncode` -- Final result: on task submission (mini-swe-agent prints `COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT` as the first line), a final `agent_message_chunk` with the submission content is sent. +## Requirements -## Configuration +- Python environment with `mini-swe-agent` installed (`pip install mini-swe-agent`) +- ACP-capable client (e.g. Zed) or the bundled Textual client +- Optional: `.env` file at the repo root for shared configuration when using the duet launcher -Environment variables control the model: +If `mini-swe-agent` is missing, the bridge falls back to the reference copy at `reference/mini-swe-agent/src`. -- `MINI_SWE_MODEL`: model ID (e.g. `openrouter/openai/gpt-4o-mini`) -- `OPENROUTER_API_KEY` for OpenRouter; or `OPENAI_API_KEY` / `ANTHROPIC_API_KEY` for native providers -- Optional `MINI_SWE_MODEL_KWARGS`: JSON, e.g. `{ "api_base": "https://openrouter.ai/api/v1" }` (auto-injected for OpenRouter if missing) +## Configure models and credentials -Agent behavior automatically maps the appropriate API key based on the chosen model and available environment variables. +Set environment variables before launching the bridge: -If `mini-swe-agent` is not installed in the venv, the bridge attempts to import a vendored reference copy under `reference/mini-swe-agent/src`. +- `MINI_SWE_MODEL`: model identifier such as `openrouter/openai/gpt-4o-mini` +- `OPENROUTER_API_KEY` for OpenRouter models, or `OPENAI_API_KEY` / `ANTHROPIC_API_KEY` for native providers +- Optional `MINI_SWE_MODEL_KWARGS`: JSON blob of extra keyword arguments (OpenRouter defaults are injected automatically when omitted) -## How to run +The bridge selects the correct API key based on the chosen model and available variables. -- In Zed (editor integration): configure an agent server to launch `examples/mini_swe_agent/agent.py` and set the environment variables there. Use Zed’s “Open ACP Logs” to inspect `tool_call`/`tool_call_update` and message chunks. -- In terminal (local TUI): run the duet launcher to start both the agent and the Textual client with the same environment and dedicated pipes: +## Run inside Zed + +Add an Agent Server entry targeting `examples/mini_swe_agent/agent.py` and provide the environment variables there. Use Zed’s “Open ACP Logs” panel to observe streamed message chunks and tool call events in real time. + +## Run locally with the duet launcher + +To pair the bridge with the Textual TUI client, run: ```bash python examples/mini_swe_agent/duet.py ``` -The launcher loads `.env` from the repo root (using python-dotenv) so both processes share the same configuration. - -### TUI usage +Both processes inherit settings from `.env` (thanks to `python-dotenv`) and communicate over dedicated pipes. -- Hotkeys: `y` → YOLO, `c` → Confirm, `u` → Human, `Enter` → Continue. -- In Human mode, you’ll be prompted for a bash command; it will be executed and streamed back as a tool call. -- Each executed command appears in the “TOOL CALLS” section with live status and output. +**TUI shortcuts** +- `y`: YOLO +- `c`: Confirm +- `u`: Human (prompts for a shell command and streams it back as a tool call) +- `Enter`: Continue -## Files +## Related files -- Agent entry: [`examples/mini_swe_agent/agent.py`](https://github.com/psiace/agent-client-protocol-python/blob/main/examples/mini_swe_agent/agent.py) -- Duet launcher: [`examples/mini_swe_agent/duet.py`](https://github.com/psiace/agent-client-protocol-python/blob/main/examples/mini_swe_agent/duet.py) -- Textual client: [`examples/mini_swe_agent/client.py`](https://github.com/psiace/agent-client-protocol-python/blob/main/examples/mini_swe_agent/client.py) +- Agent entrypoint: `examples/mini_swe_agent/agent.py` +- Duet launcher: `examples/mini_swe_agent/duet.py` +- Textual client: `examples/mini_swe_agent/client.py` diff --git a/docs/quickstart.md b/docs/quickstart.md index 82bf7ef..a840e37 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -1,110 +1,78 @@ # Quickstart -Use the published package to build an ACP agent, or run the included example. +This guide gets you from a clean environment to streaming ACP messages from a Python agent. -## Install the SDK +## Prerequisites + +- Python 3.10+ and either `pip` or `uv` +- An ACP-capable client such as Zed (optional but recommended for testing) + +## 1. Install the SDK ```bash pip install agent-client-protocol +# or +uv add agent-client-protocol ``` -## Minimal agent - -```python -import asyncio - -from acp import ( - Agent, - AgentSideConnection, - InitializeRequest, - InitializeResponse, - NewSessionRequest, - NewSessionResponse, - PromptRequest, - PromptResponse, - SessionNotification, - stdio_streams, -) -from acp.schema import ContentBlock1, SessionUpdate2 - - -class EchoAgent(Agent): - def __init__(self, conn): - self._conn = conn - - async def initialize(self, params: InitializeRequest) -> InitializeResponse: - return InitializeResponse(protocolVersion=params.protocolVersion) - - async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: - return NewSessionResponse(sessionId="sess-1") - - async def prompt(self, params: PromptRequest) -> PromptResponse: - for block in params.prompt: - text = block.get("text", "") if isinstance(block, dict) else getattr(block, "text", "") - await self._conn.sessionUpdate( - SessionNotification( - sessionId=params.sessionId, - update=SessionUpdate2( - sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text=text), - ), - ) - ) - return PromptResponse(stopReason="end_turn") - - -async def main() -> None: - reader, writer = await stdio_streams() - AgentSideConnection(lambda conn: EchoAgent(conn), writer, reader) - await asyncio.Event().wait() +## 2. Run the echo agent +Launch the ready-made echo example, which streams text blocks back over ACP: -if __name__ == "__main__": - asyncio.run(main()) +```bash +python examples/echo_agent.py ``` -Run this program from your ACP-capable client. - -## Run the Mini SWE Agent bridge in Zed +Keep it running while you connect your client. -Install `mini-swe-agent` (or at least its core dependencies) into the same environment that will run the example: +## 3. Connect from your client -```bash -pip install mini-swe-agent -``` +### Zed -Add an agent server to Zed’s `settings.json`: +Add an Agent Server entry in `settings.json` (Zed → Settings → Agents panel): ```json { "agent_servers": { - "Mini SWE Agent (Python)": { + "Echo Agent (Python)": { "command": "/abs/path/to/python", "args": [ - "/abs/path/to/agent-client-protocol-python/examples/mini_swe_agent/agent.py" - ], - "env": { - "MINI_SWE_MODEL": "openrouter/openai/gpt-4o-mini", - "OPENROUTER_API_KEY": "sk-or-..." - } + "/abs/path/to/agent-client-protocol-python/examples/echo_agent.py" + ] } } } ``` -- For OpenRouter, `api_base` is set automatically to `https://openrouter.ai/api/v1` if not provided. -- Alternatively, use native providers by setting `MINI_SWE_MODEL` accordingly and providing `OPENAI_API_KEY` or `ANTHROPIC_API_KEY`. +Open the Agents panel and start the session. Each message you send should be echoed back via streamed `session/update` notifications. -In Zed, open the Agents panel and select "Mini SWE Agent (Python)". +### Other clients -See [mini-swe-agent.md](mini-swe-agent.md) for behavior and message mapping details. +Any ACP client that communicates over stdio can spawn the same script; no additional transport configuration is required. -## Run locally with a TUI +## 4. Extend the agent -Use the duet launcher to run both the agent and the local Textual client over dedicated pipes: +Create your own agent by subclassing `acp.Agent`. The pattern mirrors the echo example: -```bash -python examples/mini_swe_agent/duet.py +```python +from acp import Agent, PromptRequest, PromptResponse + + +class MyAgent(Agent): + async def prompt(self, params: PromptRequest) -> PromptResponse: + # inspect params.prompt, stream updates, then finish the turn + return PromptResponse(stopReason="end_turn") ``` -The launcher loads `.env` from the repo root so both processes share the same configuration (requires python-dotenv). +Hook it up with `AgentSideConnection` inside an async entrypoint and wire it to your client. Refer to [examples/echo_agent.py](https://github.com/psiace/agent-client-protocol-python/blob/main/examples/echo_agent.py) for the complete structure, including lifetime hooks (`initialize`, `newSession`) and streaming responses. + +## Optional: Mini SWE Agent bridge + +The repository also ships a bridge for [mini-swe-agent](https://github.com/groundx-ai/mini-swe-agent). To try it: + +1. Install the dependency: + ```bash + pip install mini-swe-agent + ``` +2. Configure Zed to run `examples/mini_swe_agent/agent.py` and supply environment variables such as `MINI_SWE_MODEL` and `OPENROUTER_API_KEY`. +3. Review the [Mini SWE Agent guide](mini-swe-agent.md) for environment options, tool-call mapping, and a duet launcher that starts both the bridge and a Textual client (`python examples/mini_swe_agent/duet.py`). diff --git a/examples/agent.py b/examples/agent.py index 2a2dfd7..c398cca 100644 --- a/examples/agent.py +++ b/examples/agent.py @@ -1,4 +1,5 @@ import asyncio +import logging from typing import Any from acp import ( @@ -9,17 +10,25 @@ CancelNotification, InitializeRequest, InitializeResponse, + LoadSessionRequest, + LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse, - SessionNotification, SetSessionModeRequest, SetSessionModeResponse, stdio_streams, PROTOCOL_VERSION, ) -from acp.schema import ContentBlock1, SessionUpdate2 +from acp.schema import ( + AgentCapabilities, + AgentMessageChunk, + McpCapabilities, + PromptCapabilities, + SessionNotification, + TextContentBlock, +) class ExampleAgent(Agent): @@ -27,71 +36,74 @@ def __init__(self, conn: AgentSideConnection) -> None: self._conn = conn self._next_session_id = 0 - async def initialize(self, params: InitializeRequest) -> InitializeResponse: - return InitializeResponse(protocolVersion=PROTOCOL_VERSION, agentCapabilities=None, authMethods=[]) + async def _send_chunk(self, session_id: str, content: Any) -> None: + await self._conn.sessionUpdate( + SessionNotification( + sessionId=session_id, + update=AgentMessageChunk( + sessionUpdate="agent_message_chunk", + content=content, + ), + ) + ) + + async def initialize(self, params: InitializeRequest) -> InitializeResponse: # noqa: ARG002 + logging.info("Received initialize request") + return InitializeResponse( + protocolVersion=PROTOCOL_VERSION, + agentCapabilities=AgentCapabilities( + loadSession=False, + mcpCapabilities=McpCapabilities(http=False, sse=False), + promptCapabilities=PromptCapabilities(audio=False, embeddedContext=False, image=False), + ), + ) async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse | None: # noqa: ARG002 - return {} + logging.info("Received authenticate request") + return AuthenticateResponse() async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: # noqa: ARG002 - session_id = f"sess-{self._next_session_id}" + logging.info("Received new session request") + session_id = str(self._next_session_id) self._next_session_id += 1 return NewSessionResponse(sessionId=session_id) - async def loadSession(self, params): # type: ignore[override] - return None + async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse | None: # noqa: ARG002 + logging.info("Received load session request") + return LoadSessionResponse() async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse | None: # noqa: ARG002 - return {} + logging.info("Received set session mode request") + return SetSessionModeResponse() async def prompt(self, params: PromptRequest) -> PromptResponse: - # Stream a couple of agent message chunks, then end the turn - # 1) Prefix - await self._conn.sessionUpdate( - SessionNotification( - sessionId=params.sessionId, - update=SessionUpdate2( - sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text="Client sent: "), - ), - ) + logging.info("Received prompt request") + + # Notify the client what it just sent and then echo each content block back. + await self._send_chunk( + params.sessionId, + TextContentBlock(type="text", text="Client sent:"), ) - # 2) Echo text blocks for block in params.prompt: - if isinstance(block, dict): - # tolerate raw dicts - if block.get("type") == "text": - text = str(block.get("text", "")) - else: - text = f"<{block.get('type', 'content')}>" - else: - # pydantic model ContentBlock1 - text = getattr(block, "text", "") - await self._conn.sessionUpdate( - SessionNotification( - sessionId=params.sessionId, - update=SessionUpdate2( - sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text=text), - ), - ) - ) + await self._send_chunk(params.sessionId, block) + return PromptResponse(stopReason="end_turn") async def cancel(self, params: CancelNotification) -> None: # noqa: ARG002 - return None + logging.info("Received cancel notification") async def extMethod(self, method: str, params: dict) -> dict: # noqa: ARG002 + logging.info("Received extension method call: %s", method) return {"example": "response"} async def extNotification(self, method: str, params: dict) -> None: # noqa: ARG002 - return None + logging.info("Received extension notification: %s", method) async def main() -> None: + logging.basicConfig(level=logging.INFO) reader, writer = await stdio_streams() - # For an agent process, local writes go to client stdin (writer=stdout) - AgentSideConnection(lambda conn: ExampleAgent(conn), writer, reader) + AgentSideConnection(ExampleAgent, writer, reader) await asyncio.Event().wait() diff --git a/examples/client.py b/examples/client.py index 5b07cea..6cde6a8 100644 --- a/examples/client.py +++ b/examples/client.py @@ -1,73 +1,144 @@ import asyncio +import contextlib +import logging import os import sys -from typing import Optional +from pathlib import Path from acp import ( Client, - PROTOCOL_VERSION, ClientSideConnection, InitializeRequest, NewSessionRequest, PromptRequest, + RequestError, SessionNotification, + PROTOCOL_VERSION, ) +from acp.schema import TextContentBlock class ExampleClient(Client): + async def requestPermission(self, params): # type: ignore[override] + raise RequestError.method_not_found("session/request_permission") + + async def writeTextFile(self, params): # type: ignore[override] + raise RequestError.method_not_found("fs/write_text_file") + + async def readTextFile(self, params): # type: ignore[override] + raise RequestError.method_not_found("fs/read_text_file") + + async def createTerminal(self, params): # type: ignore[override] + raise RequestError.method_not_found("terminal/create") + + async def terminalOutput(self, params): # type: ignore[override] + raise RequestError.method_not_found("terminal/output") + + async def releaseTerminal(self, params): # type: ignore[override] + raise RequestError.method_not_found("terminal/release") + + async def waitForTerminalExit(self, params): # type: ignore[override] + raise RequestError.method_not_found("terminal/wait_for_exit") + + async def killTerminal(self, params): # type: ignore[override] + raise RequestError.method_not_found("terminal/kill") + async def sessionUpdate(self, params: SessionNotification) -> None: update = params.update - kind = getattr(update, "sessionUpdate", None) if not isinstance(update, dict) else update.get("sessionUpdate") - if kind == "agent_message_chunk": - # Handle both dict and model shapes - content = update["content"] if isinstance(update, dict) else getattr(update, "content", None) - text = content.get("text") if isinstance(content, dict) else getattr(content, "text", "") - print(f"| Agent: {text}") + if isinstance(update, dict): + kind = update.get("sessionUpdate") + content = update.get("content") + else: + kind = getattr(update, "sessionUpdate", None) + content = getattr(update, "content", None) + if kind != "agent_message_chunk" or content is None: + return -async def interactive_loop(conn: ClientSideConnection, session_id: str) -> None: + if isinstance(content, dict): + text = content.get("text", "") + else: + text = getattr(content, "text", "") + print(f"| Agent: {text}") + + async def extMethod(self, method: str, params: dict) -> dict: # noqa: ARG002 + raise RequestError.method_not_found(method) + + async def extNotification(self, method: str, params: dict) -> None: # noqa: ARG002 + raise RequestError.method_not_found(method) + + +async def read_console(prompt: str) -> str: loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, lambda: input(prompt)) + + +async def interactive_loop(conn: ClientSideConnection, session_id: str) -> None: while True: try: - line = await loop.run_in_executor(None, lambda: input("> ")) + line = await read_console("> ") except EOFError: break + except KeyboardInterrupt: + print("", file=sys.stderr) + break + if not line: continue + try: - await conn.prompt(PromptRequest(sessionId=session_id, prompt=[{"type": "text", "text": line}])) - except Exception as e: # noqa: BLE001 - print(f"error: {e}", file=sys.stderr) + await conn.prompt( + PromptRequest( + sessionId=session_id, + prompt=[TextContentBlock(type="text", text=line)], + ) + ) + except Exception as exc: # noqa: BLE001 + logging.error("Prompt failed: %s", exc) async def main(argv: list[str]) -> int: + logging.basicConfig(level=logging.INFO) + if len(argv) < 2: print("Usage: python examples/client.py AGENT_PROGRAM [ARGS...]", file=sys.stderr) return 2 - # Spawn agent subprocess + program = argv[1] + args = argv[2:] + + program_path = Path(program) + spawn_program = program + spawn_args = args + + if program_path.exists() and not os.access(program_path, os.X_OK): + spawn_program = sys.executable + spawn_args = [str(program_path), *args] + proc = await asyncio.create_subprocess_exec( - sys.executable, - *argv[1:], + spawn_program, + *spawn_args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, ) - assert proc.stdin and proc.stdout - # Connect to agent stdio - conn = ClientSideConnection(lambda _agent: ExampleClient(), proc.stdin, proc.stdout) + if proc.stdin is None or proc.stdout is None: + print("Agent process does not expose stdio pipes", file=sys.stderr) + return 1 + + client_impl = ExampleClient() + conn = ClientSideConnection(lambda _agent: client_impl, proc.stdin, proc.stdout) - # Initialize and create session await conn.initialize(InitializeRequest(protocolVersion=PROTOCOL_VERSION, clientCapabilities=None)) - new_sess = await conn.newSession(NewSessionRequest(mcpServers=[], cwd=os.getcwd())) + session = await conn.newSession(NewSessionRequest(mcpServers=[], cwd=os.getcwd())) - # Run REPL until EOF - await interactive_loop(conn, new_sess.sessionId) + await interactive_loop(conn, session.sessionId) - try: + if proc.returncode is None: proc.terminate() - except ProcessLookupError: - pass + with contextlib.suppress(ProcessLookupError): + await proc.wait() + return 0 diff --git a/examples/echo_agent.py b/examples/echo_agent.py index 92e8f46..3a7f1c9 100644 --- a/examples/echo_agent.py +++ b/examples/echo_agent.py @@ -12,7 +12,7 @@ SessionNotification, stdio_streams, ) -from acp.schema import ContentBlock1, SessionUpdate2 +from acp.schema import TextContentBlock, AgentMessageChunk class EchoAgent(Agent): @@ -31,9 +31,9 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: await self._conn.sessionUpdate( SessionNotification( sessionId=params.sessionId, - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text=text), + content=TextContentBlock(type="text", text=text), ), ) ) diff --git a/examples/mini_swe_agent/agent.py b/examples/mini_swe_agent/agent.py index e312136..4b24838 100644 --- a/examples/mini_swe_agent/agent.py +++ b/examples/mini_swe_agent/agent.py @@ -26,19 +26,18 @@ PROTOCOL_VERSION, ) from acp.schema import ( - ContentBlock1, + AgentMessageChunk, + AgentThoughtChunk, + AllowedOutcome, + ContentToolCallContent, PermissionOption, RequestPermissionRequest, RequestPermissionResponse, - RequestPermissionOutcome1, - RequestPermissionOutcome2, - SessionUpdate1, - SessionUpdate2, - SessionUpdate3, - SessionUpdate4, - SessionUpdate5, - ToolCallContent1, + TextContentBlock, + ToolCallStart, + ToolCallProgress, ToolCallUpdate, + UserMessageChunk, ) @@ -130,9 +129,9 @@ def _send_cost_hint(self) -> None: cost = float(getattr(self.model, "cost", 0.0)) except Exception: cost = 0.0 - hint = SessionUpdate3( + hint = AgentThoughtChunk( sessionUpdate="agent_thought_chunk", - content=ContentBlock1(type="text", text=f"__COST__:{cost:.2f}"), + content=TextContentBlock(type="text", text=f"__COST__:{cost:.2f}"), ) try: loop = asyncio.get_running_loop() @@ -142,15 +141,15 @@ def _send_cost_hint(self) -> None: async def on_tool_start(self, title: str, command: str, tool_call_id: str) -> None: """Send a tool_call start notification for a bash command.""" - update = SessionUpdate4( + update = ToolCallStart( sessionUpdate="tool_call", toolCallId=tool_call_id, title=title, kind="execute", status="pending", content=[ - ToolCallContent1( - type="content", content=ContentBlock1(type="text", text=f"```bash\n{command}\n```") + ContentToolCallContent( + type="content", content=TextContentBlock(type="text", text=f"```bash\n{command}\n```") ) ], rawInput={"command": command}, @@ -166,13 +165,13 @@ async def on_tool_complete( status: str = "completed", ) -> None: """Send a tool_call_update with the final output and return code.""" - update = SessionUpdate5( + update = ToolCallProgress( sessionUpdate="tool_call_update", toolCallId=tool_call_id, status=status, content=[ - ToolCallContent1( - type="content", content=ContentBlock1(type="text", text=f"```ansi\n{output}\n```") + ContentToolCallContent( + type="content", content=TextContentBlock(type="text", text=f"```ansi\n{output}\n```") ) ], rawOutput={"output": output, "returncode": returncode}, @@ -185,8 +184,8 @@ def add_message(self, role: str, content: str, **kwargs): if not getattr(self, "_emit_updates", True) or role != "assistant": return text = str(content) - block = ContentBlock1(type="text", text=text) - update = SessionUpdate2(sessionUpdate="agent_message_chunk", content=block) + block = TextContentBlock(type="text", text=text) + update = AgentMessageChunk(sessionUpdate="agent_message_chunk", content=block) try: loop = asyncio.get_running_loop() loop.create_task(self._send(update)) @@ -208,9 +207,9 @@ def _confirm_action_sync(self, tool_call_id: str, command: str) -> bool: kind="execute", status="pending", content=[ - ToolCallContent1( + ContentToolCallContent( type="content", - content=ContentBlock1(type="text", text=f"```bash\n{command}\n```"), + content=TextContentBlock(type="text", text=f"```bash\n{command}\n```"), ) ], rawInput={"command": command}, @@ -222,7 +221,7 @@ def _confirm_action_sync(self, tool_call_id: str, command: str) -> bool: except Exception: return False out = resp.outcome - if isinstance(out, RequestPermissionOutcome2) and out.optionId in ("allow-once", "allow-always"): + if isinstance(out, AllowedOutcome) and out.optionId in ("allow-once", "allow-always"): return True return False @@ -253,7 +252,7 @@ def execute_action(self, action: dict) -> dict: # type: ignore[override] # Mark in progress self._schedule( self._send( - SessionUpdate5( + ToolCallProgress( sessionUpdate="tool_call_update", toolCallId=tool_id, status="in_progress", @@ -428,9 +427,9 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: await self._client.sessionUpdate( SessionNotification( sessionId=params.sessionId, - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1( + content=TextContentBlock( type="text", text=( "mini-swe-agent load error: " @@ -476,9 +475,9 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: await self._client.sessionUpdate( SessionNotification( sessionId=params.sessionId, - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text="Human mode: please submit a bash command."), + content=TextContentBlock(type="text", text="Human mode: please submit a bash command."), ), ) ) @@ -508,9 +507,9 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: await self._client.sessionUpdate( SessionNotification( sessionId=params.sessionId, - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1( + content=TextContentBlock( type="text", text=( "Agent finished. Type a new task in the next message to continue, or do nothing to end." @@ -528,9 +527,9 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: await self._client.sessionUpdate( SessionNotification( sessionId=params.sessionId, - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text=f"Error while processing: {e}"), + content=TextContentBlock(type="text", text=f"Error while processing: {e}"), ), ) ) diff --git a/examples/mini_swe_agent/client.py b/examples/mini_swe_agent/client.py index b3e0381..33d6380 100644 --- a/examples/mini_swe_agent/client.py +++ b/examples/mini_swe_agent/client.py @@ -32,17 +32,15 @@ SetSessionModeRequest, ) from acp.schema import ( - ContentBlock1, + AgentMessageChunk, + AgentThoughtChunk, + AllowedOutcome, + ContentToolCallContent, PermissionOption, - RequestPermissionOutcome1, - RequestPermissionOutcome2, - SessionUpdate1, - SessionUpdate2, - SessionUpdate3, - SessionUpdate4, - SessionUpdate5, - ToolCallContent1, - ToolCallUpdate, + TextContentBlock, + ToolCallStart, + ToolCallProgress, + UserMessageChunk, ) from acp.stdio import _WritePipeProtocol @@ -190,24 +188,24 @@ def _post(msg: UIMessage) -> None: else: self._app.call_from_thread(lambda: (self._app.enqueue_message(msg), self._app.on_message_added())) - if isinstance(upd, SessionUpdate2): + if isinstance(upd, AgentMessageChunk): # agent message txt = _content_to_text(upd.content) _post(UIMessage("assistant", txt)) - elif isinstance(upd, SessionUpdate1): + elif isinstance(upd, UserMessageChunk): txt = _content_to_text(upd.content) _post(UIMessage("user", txt)) - elif isinstance(upd, SessionUpdate3): + elif isinstance(upd, AgentThoughtChunk): # agent thought chunk (informational) txt = _content_to_text(upd.content) _post(UIMessage("assistant", f"[thought]\n{txt}")) - elif isinstance(upd, SessionUpdate4): + elif isinstance(upd, ToolCallStart): # tool call start → record structured state self._app._update_tool_call( upd.toolCallId, title=upd.title or "", status=upd.status or "pending", content=upd.content ) self._app.call_from_thread(self._app.update_content) - elif isinstance(upd, SessionUpdate5): + elif isinstance(upd, ToolCallProgress): # tool call update → update structured state self._app._update_tool_call(upd.toolCallId, status=upd.status, content=upd.content) self._app.call_from_thread(self._app.update_content) @@ -216,17 +214,13 @@ async def requestPermission(self, params: RequestPermissionRequest) -> RequestPe # Respect client-side mode shortcuts mode = self._app.mode if mode == "yolo": - return RequestPermissionResponse( - outcome=RequestPermissionOutcome2(outcome="selected", optionId="allow-once") - ) + return RequestPermissionResponse(outcome=AllowedOutcome(outcome="selected", optionId="allow-once")) # Prompt user for decision prompt = "Approve tool call? Press Enter to allow once, type 'n' to reject" ans = self._app.input_container.request_input(prompt).strip().lower() if ans in ("", "y", "yes"): - return RequestPermissionResponse( - outcome=RequestPermissionOutcome2(outcome="selected", optionId="allow-once") - ) - return RequestPermissionResponse(outcome=RequestPermissionOutcome2(outcome="selected", optionId="reject-once")) + return RequestPermissionResponse(outcome=AllowedOutcome(outcome="selected", optionId="allow-once")) + return RequestPermissionResponse(outcome=AllowedOutcome(outcome="selected", optionId="reject-once")) # Optional features not used in this example async def writeTextFile(self, params): @@ -293,7 +287,7 @@ def __init__(self) -> None: self._conn: Optional[ClientSideConnection] = None self._session_id: Optional[str] = None self._pending_human_command: Optional[str] = None - self._outbox: "queue.Queue[list[ContentBlock1]]" = queue.Queue() + self._outbox: "queue.Queue[list[TextContentBlock]]" = queue.Queue() # Pagination and metrics self._i_step: int = 0 self.n_steps: int = 1 @@ -322,7 +316,7 @@ def on_mount(self) -> None: def _ask_initial_task(self) -> None: task = self.input_container.request_input("Enter your task for mini-swe-agent:") - blocks = [ContentBlock1(type="text", text=task)] + blocks = [TextContentBlock(type="text", text=task)] self._outbox.put(blocks) self._start_connection_thread() @@ -417,7 +411,7 @@ async def _run_connection(self) -> None: # Autostep loop: take queued prompts and send; if none and mode != human, keep stepping while self.agent_state != "STOPPED": - blocks: list[ContentBlock1] + blocks: list[TextContentBlock] try: blocks = self._outbox.get_nowait() except queue.Empty: @@ -443,7 +437,7 @@ def _ask_new(): "Turn complete. Type a new task or press Enter to continue:" ) if task.strip(): - self._outbox.put([ContentBlock1(type="text", text=task)]) + self._outbox.put([TextContentBlock(type="text", text=task)]) else: self._outbox.put([]) self._ask_new_task_pending = False @@ -463,7 +457,7 @@ def send_human_command(self, cmd: str) -> None: if not cmd.strip(): return code = f"```bash\n{cmd.strip()}\n```" - self._outbox.put([ContentBlock1(type="text", text=code)]) + self._outbox.put([TextContentBlock(type="text", text=code)]) # --- UI updates --- @@ -493,7 +487,7 @@ def _update_tool_call( # Append any text content blocks texts = [] for c in content: - if isinstance(c, ToolCallContent1) and getattr(c.content, "type", None) == "text": + if isinstance(c, ContentToolCallContent) and getattr(c.content, "type", None) == "text": texts.append(getattr(c.content, "text", "")) if texts: tc.setdefault("content", []).append("\n".join(texts)) diff --git a/pyproject.toml b/pyproject.toml index c14aa3d..9c3409f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agent-client-protocol" -version = "0.3.0" +version = "0.4.5" description = "A Python implement of Agent Client Protocol (ACP, by Zed Industries)" authors = [{ name = "Chojan Shang", email = "psiace@apache.org" }] readme = "README.md" diff --git a/schema/VERSION b/schema/VERSION new file mode 100644 index 0000000..d0b8723 --- /dev/null +++ b/schema/VERSION @@ -0,0 +1 @@ +refs/tags/v0.4.5 diff --git a/schema/meta.json b/schema/meta.json index ec0ba7d..0f0c6c4 100644 --- a/schema/meta.json +++ b/schema/meta.json @@ -6,7 +6,8 @@ "session_load": "session/load", "session_new": "session/new", "session_prompt": "session/prompt", - "session_set_mode": "session/set_mode" + "session_set_mode": "session/set_mode", + "session_set_model": "session/set_model" }, "clientMethods": { "fs_read_text_file": "fs/read_text_file", diff --git a/schema/schema.json b/schema/schema.json index 8e7e476..5ab4110 100644 --- a/schema/schema.json +++ b/schema/schema.json @@ -111,11 +111,15 @@ "$ref": "#/$defs/PromptResponse", "title": "PromptResponse" }, + { + "$ref": "#/$defs/SetSessionModelResponse", + "title": "SetSessionModelResponse" + }, { "title": "ExtMethodResponse" } ], - "description": "All possible responses that an agent can send to a client.\n\nThis enum is used internally for routing RPC responses. You typically won't need\nto use this directly - the responses are handled automatically by the connection.\n\nThese are responses to the corresponding ClientRequest variants.", + "description": "All possible responses that an agent can send to a client.\n\nThis enum is used internally for routing RPC responses. You typically won't need\nto use this directly - the responses are handled automatically by the connection.\n\nThese are responses to the corresponding `ClientRequest` variants.", "x-docs-ignore": true }, "Annotations": { @@ -261,7 +265,7 @@ "description": "Input for the command if required" }, "name": { - "description": "Command name (e.g., \"create_plan\", \"research_codebase\").", + "description": "Command name (e.g., `create_plan`, `research_codebase`).", "type": "string" } }, @@ -394,6 +398,10 @@ "$ref": "#/$defs/PromptRequest", "title": "PromptRequest" }, + { + "$ref": "#/$defs/SetSessionModelRequest", + "title": "SetSessionModelRequest" + }, { "title": "ExtMethodRequest" } @@ -439,7 +447,7 @@ "title": "ExtMethodResponse" } ], - "description": "All possible responses that a client can send to an agent.\n\nThis enum is used internally for routing RPC responses. You typically won't need\nto use this directly - the responses are handled automatically by the connection.\n\nThese are responses to the corresponding AgentRequest variants.", + "description": "All possible responses that a client can send to an agent.\n\nThis enum is used internally for routing RPC responses. You typically won't need\nto use this directly - the responses are handled automatically by the connection.\n\nThese are responses to the corresponding `AgentRequest` variants.", "x-docs-ignore": true }, "ContentBlock": { @@ -987,6 +995,17 @@ "_meta": { "description": "Extension point for implementations" }, + "models": { + "anyOf": [ + { + "$ref": "#/$defs/SessionModelState" + }, + { + "type": "null" + } + ], + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nInitial model state if supported by the Agent" + }, "modes": { "anyOf": [ { @@ -1124,6 +1143,38 @@ ], "description": "Configuration for connecting to an MCP (Model Context Protocol) server.\n\nMCP servers provide tools and context that the agent can use when\nprocessing prompts.\n\nSee protocol docs: [MCP Servers](https://agentclientprotocol.com/protocol/session-setup#mcp-servers)" }, + "ModelId": { + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nA unique identifier for a model.", + "type": "string" + }, + "ModelInfo": { + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nInformation about a selectable model.", + "properties": { + "_meta": { + "description": "Extension point for implementations" + }, + "description": { + "description": "Optional description of the model.", + "type": [ + "string", + "null" + ] + }, + "modelId": { + "$ref": "#/$defs/ModelId", + "description": "Unique identifier for the model." + }, + "name": { + "description": "Human-readable name of the model.", + "type": "string" + } + }, + "required": [ + "modelId", + "name" + ], + "type": "object" + }, "NewSessionRequest": { "description": "Request parameters for creating a new session.\n\nSee protocol docs: [Creating a Session](https://agentclientprotocol.com/protocol/session-setup#creating-a-session)", "properties": { @@ -1156,6 +1207,17 @@ "_meta": { "description": "Extension point for implementations" }, + "models": { + "anyOf": [ + { + "$ref": "#/$defs/SessionModelState" + }, + { + "type": "null" + } + ], + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nInitial model state if supported by the Agent" + }, "modes": { "anyOf": [ { @@ -1690,6 +1752,30 @@ ], "type": "object" }, + "SessionModelState": { + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nThe set of models and the one currently active.", + "properties": { + "_meta": { + "description": "Extension point for implementations" + }, + "availableModels": { + "description": "The set of models that the Agent can use", + "items": { + "$ref": "#/$defs/ModelInfo" + }, + "type": "array" + }, + "currentModelId": { + "$ref": "#/$defs/ModelId", + "description": "The current model the Agent is in." + } + }, + "required": [ + "currentModelId", + "availableModels" + ], + "type": "object" + }, "SessionNotification": { "description": "Notification containing a session update from the agent.\n\nUsed to stream real-time progress and results during prompt processing.\n\nSee protocol docs: [Agent Reports Output](https://agentclientprotocol.com/protocol/prompt-turn#3-agent-reports-output)", "properties": { @@ -1992,6 +2078,40 @@ "x-method": "session/set_mode", "x-side": "agent" }, + "SetSessionModelRequest": { + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nRequest parameters for setting a session model.", + "properties": { + "_meta": { + "description": "Extension point for implementations" + }, + "modelId": { + "$ref": "#/$defs/ModelId", + "description": "The ID of the model to set." + }, + "sessionId": { + "$ref": "#/$defs/SessionId", + "description": "The ID of the session to set the model for." + } + }, + "required": [ + "sessionId", + "modelId" + ], + "type": "object", + "x-method": "session/set_model", + "x-side": "agent" + }, + "SetSessionModelResponse": { + "description": "**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nResponse to `session/set_model` method.", + "properties": { + "_meta": { + "description": "Extension point for implementations" + } + }, + "type": "object", + "x-method": "session/set_model", + "x-side": "agent" + }, "StopReason": { "description": "Reasons why an agent stops processing a prompt turn.\n\nSee protocol docs: [Stop Reasons](https://agentclientprotocol.com/protocol/prompt-turn#stop-reasons)", "oneOf": [ @@ -2535,7 +2655,7 @@ "x-side": "client" }, "WriteTextFileResponse": { - "description": "Response to fs/write_text_file", + "description": "Response to `fs/write_text_file`", "properties": { "_meta": { "description": "Extension point for implementations" diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..01e0c60 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""Utility scripts for generating ACP bindings.""" diff --git a/scripts/gen_all.py b/scripts/gen_all.py index 3d7f6c9..f6f1243 100644 --- a/scripts/gen_all.py +++ b/scripts/gen_all.py @@ -1,15 +1,152 @@ #!/usr/bin/env python3 from __future__ import annotations -import runpy +import argparse +import json +import os +import re +import sys +import urllib.error +import urllib.request from pathlib import Path -SCRIPTS = Path(__file__).resolve().parent +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.append(str(ROOT)) + +from scripts import gen_meta, gen_schema # noqa: E402 pylint: disable=wrong-import-position + +SCHEMA_DIR = ROOT / "schema" +SCHEMA_JSON = SCHEMA_DIR / "schema.json" +META_JSON = SCHEMA_DIR / "meta.json" +VERSION_FILE = SCHEMA_DIR / "VERSION" + +DEFAULT_REPO = "zed-industries/agent-client-protocol" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Regenerate schema.py and meta.py from the ACP schema.") + parser.add_argument( + "--version", + "-v", + help=( + "Git ref (tag/branch) of zed-industries/agent-client-protocol to fetch the schema from. " + "If omitted, uses the cached schema files or falls back to 'main' when missing." + ), + ) + parser.add_argument( + "--repo", + default=os.environ.get("ACP_SCHEMA_REPO", DEFAULT_REPO), + help="Source repository providing schema.json/meta.json (default: %(default)s)", + ) + parser.add_argument( + "--no-download", + action="store_true", + help="Skip downloading schema files even when a version is provided.", + ) + parser.add_argument( + "--format", + dest="format_output", + action="store_true", + help="Format generated files with 'uv run ruff format'.", + ) + parser.add_argument( + "--no-format", + dest="format_output", + action="store_false", + help="Disable formatting with ruff.", + ) + parser.set_defaults(format_output=True) + parser.add_argument( + "--force", + action="store_true", + help="Force schema download even if the requested ref is already cached locally.", + ) + return parser.parse_args() def main() -> None: - runpy.run_path(str(SCRIPTS / "gen_schema.py"), run_name="__main__") - runpy.run_path(str(SCRIPTS / "gen_meta.py"), run_name="__main__") + args = parse_args() + + version = args.version or os.environ.get("ACP_SCHEMA_VERSION") + repo = args.repo + should_download = _should_download(args, version) + + if should_download: + ref = resolve_ref(version) + download_schema(repo, ref) + else: + ref = resolve_ref(version) if version else _cached_ref() + + if not (SCHEMA_JSON.exists() and META_JSON.exists()): + print("schema/schema.json or schema/meta.json missing; run with --version to fetch them.", file=sys.stderr) + sys.exit(1) + + gen_schema.generate_schema(format_output=args.format_output) + gen_meta.generate_meta() + + if ref: + print(f"Generated schema using ref: {ref}") + else: + print("Generated schema using local schema files") + + +def _should_download(args: argparse.Namespace, version: str | None) -> bool: + env_override = os.environ.get("ACP_SCHEMA_DOWNLOAD") + if env_override is not None: + return env_override.lower() in {"1", "true", "yes"} + if args.no_download: + return False + if version: + if not SCHEMA_JSON.exists() or not META_JSON.exists(): + return True + cached = _cached_ref() + if args.force: + return True + return cached != resolve_ref(version) + return not (SCHEMA_JSON.exists() and META_JSON.exists()) + + +def resolve_ref(version: str | None) -> str: + if not version: + return "refs/heads/main" + if version.startswith("refs/"): + return version + if re.fullmatch(r"v?\d+\.\d+\.\d+", version): + value = version if version.startswith("v") else f"v{version}" + return f"refs/tags/{value}" + return f"refs/heads/{version}" + + +def download_schema(repo: str, ref: str) -> None: + SCHEMA_DIR.mkdir(parents=True, exist_ok=True) + schema_url = f"https://raw.githubusercontent.com/{repo}/{ref}/schema/schema.json" + meta_url = f"https://raw.githubusercontent.com/{repo}/{ref}/schema/meta.json" + try: + schema_data = fetch_json(schema_url) + meta_data = fetch_json(meta_url) + except RuntimeError as exc: # pragma: no cover - network error path + print(exc, file=sys.stderr) + sys.exit(1) + + SCHEMA_JSON.write_text(json.dumps(schema_data, indent=2), encoding="utf-8") + META_JSON.write_text(json.dumps(meta_data, indent=2), encoding="utf-8") + VERSION_FILE.write_text(ref + "\n", encoding="utf-8") + print(f"Fetched schema and meta from {repo}@{ref}") + + +def fetch_json(url: str) -> dict: + try: + with urllib.request.urlopen(url) as response: # noqa: S310 - trusted source configured by repo + return json.loads(response.read().decode("utf-8")) + except urllib.error.URLError as exc: + raise RuntimeError(f"Failed to fetch {url}: {exc}") from exc # noqa: TRY003 + + +def _cached_ref() -> str | None: + if VERSION_FILE.exists(): + return VERSION_FILE.read_text(encoding="utf-8").strip() or None + return None if __name__ == "__main__": diff --git a/scripts/gen_meta.py b/scripts/gen_meta.py index e381c68..4eb43c5 100644 --- a/scripts/gen_meta.py +++ b/scripts/gen_meta.py @@ -4,21 +4,38 @@ import json from pathlib import Path -ROOT = Path.cwd() +ROOT = Path(__file__).resolve().parents[1] +SCHEMA_DIR = ROOT / "schema" +VERSION_FILE = SCHEMA_DIR / "VERSION" def main() -> None: - meta_json = ROOT / "schema" / "meta.json" + generate_meta() + + +def generate_meta() -> None: + meta_json = SCHEMA_DIR / "meta.json" out_py = ROOT / "src" / "acp" / "meta.py" - data = json.loads(meta_json.read_text()) + if not meta_json.exists(): + raise SystemExit("schema/meta.json not found. Run gen_schema.py first.") # noqa: TRY003 + + data = json.loads(meta_json.read_text("utf-8")) agent_methods = data.get("agentMethods", {}) client_methods = data.get("clientMethods", {}) version = data.get("version", 1) + header_lines = ["# Generated from schema/meta.json. Do not edit by hand."] + if VERSION_FILE.exists(): + ref = VERSION_FILE.read_text("utf-8").strip() + if ref: + header_lines.append(f"# Schema ref: {ref}") + out_py.write_text( - "# This file is generated from schema/meta.json. Do not edit by hand.\n" - f"AGENT_METHODS = {agent_methods!r}\n" - f"CLIENT_METHODS = {client_methods!r}\n" - f"PROTOCOL_VERSION = {int(version)}\n" + "\n".join(header_lines) + + "\n" + + f"AGENT_METHODS = {agent_methods!r}\n" + + f"CLIENT_METHODS = {client_methods!r}\n" + + f"PROTOCOL_VERSION = {int(version)}\n", + encoding="utf-8", ) diff --git a/scripts/gen_schema.py b/scripts/gen_schema.py index cbe25cb..10d4366 100644 --- a/scripts/gen_schema.py +++ b/scripts/gen_schema.py @@ -1,29 +1,90 @@ #!/usr/bin/env python3 from __future__ import annotations +import argparse +import re +import shutil import subprocess import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] +SCHEMA_DIR = ROOT / "schema" +SCHEMA_JSON = SCHEMA_DIR / "schema.json" +VERSION_FILE = SCHEMA_DIR / "VERSION" +SCHEMA_OUT = ROOT / "src" / "acp" / "schema.py" + +BACKCOMPAT_MARKER = "# Backwards compatibility aliases" + +# Map of numbered classes produced by datamodel-code-generator to descriptive names. +# Keep this in sync with the Rust/TypeScript SDK nomenclature. +RENAME_MAP: dict[str, str] = { + "AvailableCommandInput1": "CommandInputHint", + "ContentBlock1": "TextContentBlock", + "ContentBlock2": "ImageContentBlock", + "ContentBlock3": "AudioContentBlock", + "ContentBlock4": "ResourceContentBlock", + "ContentBlock5": "EmbeddedResourceContentBlock", + "McpServer1": "HttpMcpServer", + "McpServer2": "SseMcpServer", + "McpServer3": "StdioMcpServer", + "RequestPermissionOutcome1": "DeniedOutcome", + "RequestPermissionOutcome2": "AllowedOutcome", + "SessionUpdate1": "UserMessageChunk", + "SessionUpdate2": "AgentMessageChunk", + "SessionUpdate3": "AgentThoughtChunk", + "SessionUpdate4": "ToolCallStart", + "SessionUpdate5": "ToolCallProgress", + "SessionUpdate6": "AgentPlanUpdate", + "SessionUpdate7": "AvailableCommandsUpdate", + "SessionUpdate8": "CurrentModeUpdate", + "ToolCallContent1": "ContentToolCallContent", + "ToolCallContent2": "FileEditToolCallContent", + "ToolCallContent3": "TerminalToolCallContent", +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Generate src/acp/schema.py from the ACP JSON schema.") + parser.add_argument( + "--format", + dest="format_output", + action="store_true", + help="Format generated files with 'uv run ruff format'.", + ) + parser.add_argument( + "--no-format", + dest="format_output", + action="store_false", + help="Disable formatting with ruff.", + ) + parser.set_defaults(format_output=True) + return parser.parse_args() def main() -> None: - schema_json = ROOT / "schema" / "schema.json" - out_py = ROOT / "src" / "acp" / "schema.py" - if not schema_json.exists(): - print(f"Schema not found at {schema_json}. Run 'npm run generate:json-schema' first.", file=sys.stderr) + args = parse_args() + generate_schema(format_output=args.format_output) + + +def generate_schema(*, format_output: bool = True) -> None: + if not SCHEMA_JSON.exists(): + print( + "Schema file missing. Ensure schema/schema.json exists (run gen_all.py --version to download).", + file=sys.stderr, + ) sys.exit(1) + cmd = [ sys.executable, "-m", "datamodel_code_generator", "--input", - str(schema_json), + str(SCHEMA_JSON), "--input-file-type", "jsonschema", "--output", - str(out_py), + str(SCHEMA_OUT), "--target-python-version", "3.12", "--collapse-root-models", @@ -31,7 +92,73 @@ def main() -> None: "pydantic_v2.BaseModel", "--use-annotated", ] + subprocess.check_call(cmd) # noqa: S603 + warnings = rename_types(SCHEMA_OUT) + for warning in warnings: + print(f"Warning: {warning}", file=sys.stderr) + + if format_output: + format_with_ruff(SCHEMA_OUT) + + +def rename_types(output_path: Path) -> list[str]: + if not output_path.exists(): + raise RuntimeError(f"Generated schema not found at {output_path}") # noqa: TRY003 + + content = output_path.read_text(encoding="utf-8") + + header_lines = ["# Generated from schema/schema.json. Do not edit by hand."] + if VERSION_FILE.exists(): + ref = VERSION_FILE.read_text(encoding="utf-8").strip() + if ref: + header_lines.append(f"# Schema ref: {ref}") + + existing_header = re.match(r"(#.*\n)+", content) + if existing_header: + content = content[existing_header.end() :] + content = content.lstrip("\n") + + marker_index = content.find(BACKCOMPAT_MARKER) + if marker_index != -1: + content = content[:marker_index].rstrip() + + for old, new in sorted(RENAME_MAP.items(), key=lambda item: len(item[0]), reverse=True): + pattern = re.compile(rf"\b{re.escape(old)}\b") + content = pattern.sub(new, content) + + leftover_class_pattern = re.compile(r"^class (\w+\d+)\(", re.MULTILINE) + leftover_classes = sorted(set(leftover_class_pattern.findall(content))) + + header_block = "\n".join(header_lines) + "\n\n" + alias_lines = [f"{old} = {new}" for old, new in sorted(RENAME_MAP.items())] + alias_block = BACKCOMPAT_MARKER + "\n" + "\n".join(alias_lines) + "\n" + + content = header_block + content.rstrip() + "\n\n" + alias_block + if not content.endswith("\n"): + content += "\n" + output_path.write_text(content, encoding="utf-8") + + warnings: list[str] = [] + if leftover_classes: + warnings.append( + "Unrenamed schema models detected: " + + ", ".join(leftover_classes) + + ". Update RENAME_MAP in scripts/gen_schema.py." + ) + + return warnings + + +def format_with_ruff(file_path: Path) -> None: + uv_executable = shutil.which("uv") + if uv_executable is None: + print("Warning: 'uv' executable not found; skipping formatting.", file=sys.stderr) + return + try: + subprocess.check_call([uv_executable, "run", "ruff", "format", str(file_path)]) # noqa: S603 + except (FileNotFoundError, subprocess.CalledProcessError) as exc: # pragma: no cover - best effort + print(f"Warning: failed to format {file_path}: {exc}", file=sys.stderr) if __name__ == "__main__": diff --git a/src/acp/__init__.py b/src/acp/__init__.py index c21712a..9e916de 100644 --- a/src/acp/__init__.py +++ b/src/acp/__init__.py @@ -22,6 +22,7 @@ KillTerminalCommandRequest, KillTerminalCommandResponse, LoadSessionRequest, + LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, @@ -33,6 +34,8 @@ RequestPermissionRequest, RequestPermissionResponse, SessionNotification, + SetSessionModelRequest, + SetSessionModelResponse, SetSessionModeRequest, SetSessionModeResponse, TerminalOutputRequest, @@ -55,6 +58,7 @@ "NewSessionRequest", "NewSessionResponse", "LoadSessionRequest", + "LoadSessionResponse", "AuthenticateRequest", "AuthenticateResponse", "PromptRequest", @@ -69,6 +73,8 @@ "SessionNotification", "SetSessionModeRequest", "SetSessionModeResponse", + "SetSessionModelRequest", + "SetSessionModelResponse", # terminal types "CreateTerminalRequest", "CreateTerminalResponse", diff --git a/src/acp/core.py b/src/acp/core.py index 2d54c09..46e13a3 100644 --- a/src/acp/core.py +++ b/src/acp/core.py @@ -22,6 +22,7 @@ KillTerminalCommandRequest, KillTerminalCommandResponse, LoadSessionRequest, + LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, @@ -33,6 +34,8 @@ RequestPermissionRequest, RequestPermissionResponse, SessionNotification, + SetSessionModelRequest, + SetSessionModelResponse, SetSessionModeRequest, SetSessionModeResponse, TerminalOutputRequest, @@ -79,6 +82,11 @@ def internal_error(data: dict | None = None) -> RequestError: def auth_required(data: dict | None = None) -> RequestError: return RequestError(-32000, "Authentication required", data) + @staticmethod + def resource_not_found(uri: str | None = None) -> RequestError: + data = {"uri": uri} if uri is not None else None + return RequestError(-32002, "Resource not found", data) + def to_error_obj(self) -> dict: return {"code": self.code, "message": str(self), "data": self.data} @@ -116,6 +124,7 @@ def __init__( self._reader = reader self._next_request_id = 0 self._pending: dict[int, _Pending] = {} + self._inflight: set[asyncio.Task[Any]] = set() self._write_lock = asyncio.Lock() self._recv_task = asyncio.create_task(self._receive_loop()) @@ -124,6 +133,13 @@ async def close(self) -> None: self._recv_task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._recv_task + if self._inflight: + tasks = list(self._inflight) + for task in tasks: + task.cancel() + for task in tasks: + with contextlib.suppress(asyncio.CancelledError): + await task # Do not close writer here; lifecycle owned by caller # --- IO loops ---------------------------------------------------------------- @@ -150,12 +166,28 @@ async def _process_message(self, message: dict) -> None: has_id = "id" in message if method is not None and has_id: - await self._handle_request(message) - elif method is not None and not has_id: + self._schedule(self._handle_request(message)) + return + if method is not None and not has_id: await self._handle_notification(message) - elif has_id: + return + if has_id: await self._handle_response(message) + def _schedule(self, coro: Awaitable[Any]) -> None: + task = asyncio.create_task(coro) + self._inflight.add(task) + task.add_done_callback(self._task_done) + + def _task_done(self, task: asyncio.Task[Any]) -> None: + self._inflight.discard(task) + if task.cancelled(): + return + try: + task.result() + except Exception: + logging.exception("Unhandled error in JSON-RPC request handler") + async def _handle_request(self, message: dict) -> None: """Handle JSON-RPC request.""" payload = {"jsonrpc": "2.0", "id": message["id"]} @@ -253,7 +285,11 @@ async def initialize(self, params: InitializeRequest) -> InitializeResponse: ... async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: ... - async def loadSession(self, params: LoadSessionRequest) -> None: ... + async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse | None: ... + + async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse | None: ... + + async def setSessionModel(self, params: SetSessionModelRequest) -> SetSessionModelResponse | None: ... async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse | None: ... @@ -261,8 +297,6 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: ... async def cancel(self, params: CancelNotification) -> None: ... - async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse | None: ... - # Extension hooks (optional) async def extMethod(self, method: str, params: dict) -> dict: ... @@ -332,7 +366,10 @@ async def _handle_agent_session_methods(self, agent: Agent, method: str, params: if not hasattr(agent, "loadSession"): raise RequestError.method_not_found(method) p = LoadSessionRequest.model_validate(params) - return await agent.loadSession(p) + result = await agent.loadSession(p) + if isinstance(result, BaseModel): + return result.model_dump() + return result or {} if method == AGENT_METHODS["session_set_mode"]: if not hasattr(agent, "setSessionMode"): raise RequestError.method_not_found(method) @@ -342,6 +379,12 @@ async def _handle_agent_session_methods(self, agent: Agent, method: str, params: if method == AGENT_METHODS["session_prompt"]: p = PromptRequest.model_validate(params) return await agent.prompt(p) + if method == AGENT_METHODS["session_set_model"]: + if not hasattr(agent, "setSessionModel"): + raise RequestError.method_not_found(method) + p = SetSessionModelRequest.model_validate(params) + result = await agent.setSessionModel(p) + return result.model_dump() if isinstance(result, BaseModel) else (result or {}) if method == AGENT_METHODS["session_cancel"]: p = CancelNotification.model_validate(params) return await agent.cancel(p) @@ -548,26 +591,37 @@ async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: ) return NewSessionResponse.model_validate(resp) - async def loadSession(self, params: LoadSessionRequest) -> None: - await self._conn.send_request( + async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse: + resp = await self._conn.send_request( AGENT_METHODS["session_load"], params.model_dump(exclude_none=True, exclude_defaults=True), ) + payload = resp if isinstance(resp, dict) else {} + return LoadSessionResponse.model_validate(payload) - async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse | None: + async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse: resp = await self._conn.send_request( AGENT_METHODS["session_set_mode"], params.model_dump(exclude_none=True, exclude_defaults=True), ) - # May be empty object - return SetSessionModeResponse.model_validate(resp) if isinstance(resp, dict) else None + payload = resp if isinstance(resp, dict) else {} + return SetSessionModeResponse.model_validate(payload) + + async def setSessionModel(self, params: SetSessionModelRequest) -> SetSessionModelResponse: + resp = await self._conn.send_request( + AGENT_METHODS["session_set_model"], + params.model_dump(exclude_none=True, exclude_defaults=True), + ) + payload = resp if isinstance(resp, dict) else {} + return SetSessionModelResponse.model_validate(payload) - async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse | None: + async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse: resp = await self._conn.send_request( AGENT_METHODS["authenticate"], params.model_dump(exclude_none=True, exclude_defaults=True), ) - return AuthenticateResponse.model_validate(resp) if isinstance(resp, dict) else None + payload = resp if isinstance(resp, dict) else {} + return AuthenticateResponse.model_validate(payload) async def prompt(self, params: PromptRequest) -> PromptResponse: resp = await self._conn.send_request( @@ -609,16 +663,18 @@ async def wait_for_exit(self) -> WaitForTerminalExitResponse: ) return WaitForTerminalExitResponse.model_validate(resp) - async def kill(self) -> KillTerminalCommandResponse | None: + async def kill(self) -> KillTerminalCommandResponse: resp = await self._conn.send_request( CLIENT_METHODS["terminal_kill"], {"sessionId": self._session_id, "terminalId": self.id}, ) - return KillTerminalCommandResponse.model_validate(resp) if isinstance(resp, dict) else None + payload = resp if isinstance(resp, dict) else {} + return KillTerminalCommandResponse.model_validate(payload) - async def release(self) -> ReleaseTerminalResponse | None: + async def release(self) -> ReleaseTerminalResponse: resp = await self._conn.send_request( CLIENT_METHODS["terminal_release"], {"sessionId": self._session_id, "terminalId": self.id}, ) - return ReleaseTerminalResponse.model_validate(resp) if isinstance(resp, dict) else None + payload = resp if isinstance(resp, dict) else {} + return ReleaseTerminalResponse.model_validate(payload) diff --git a/src/acp/meta.py b/src/acp/meta.py index 779597e..95db6c0 100644 --- a/src/acp/meta.py +++ b/src/acp/meta.py @@ -1,4 +1,5 @@ -# This file is generated from schema/meta.json. Do not edit by hand. -AGENT_METHODS = {'authenticate': 'authenticate', 'initialize': 'initialize', 'session_cancel': 'session/cancel', 'session_load': 'session/load', 'session_new': 'session/new', 'session_prompt': 'session/prompt', 'session_set_mode': 'session/set_mode'} +# Generated from schema/meta.json. Do not edit by hand. +# Schema ref: refs/tags/v0.4.5 +AGENT_METHODS = {'authenticate': 'authenticate', 'initialize': 'initialize', 'session_cancel': 'session/cancel', 'session_load': 'session/load', 'session_new': 'session/new', 'session_prompt': 'session/prompt', 'session_set_mode': 'session/set_mode', 'session_set_model': 'session/set_model'} CLIENT_METHODS = {'fs_read_text_file': 'fs/read_text_file', 'fs_write_text_file': 'fs/write_text_file', 'session_request_permission': 'session/request_permission', 'session_update': 'session/update', 'terminal_create': 'terminal/create', 'terminal_kill': 'terminal/kill', 'terminal_output': 'terminal/output', 'terminal_release': 'terminal/release', 'terminal_wait_for_exit': 'terminal/wait_for_exit'} PROTOCOL_VERSION = 1 diff --git a/src/acp/schema.py b/src/acp/schema.py index b9600b1..13f1c66 100644 --- a/src/acp/schema.py +++ b/src/acp/schema.py @@ -1,6 +1,5 @@ -# generated by datamodel-codegen: -# filename: schema.json -# timestamp: 2025-09-13T16:38:08+00:00 +# Generated from schema/schema.json. Do not edit by hand. +# Schema ref: refs/tags/v0.4.5 from __future__ import annotations @@ -13,12 +12,12 @@ class AuthenticateRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None methodId: Annotated[ str, Field( - description='The ID of the authentication method to use.\nMust be one of the methods advertised in the initialize response.' + description="The ID of the authentication method to use.\nMust be one of the methods advertised in the initialize response." ), ] @@ -26,28 +25,28 @@ class AuthenticateRequest(BaseModel): class AuthenticateResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None -class AvailableCommandInput1(BaseModel): +class CommandInputHint(BaseModel): hint: Annotated[ str, Field(description="A hint to display when the input hasn't been provided yet"), ] -class AvailableCommandInput(RootModel[AvailableCommandInput1]): +class AvailableCommandInput(RootModel[CommandInputHint]): root: Annotated[ - AvailableCommandInput1, - Field(description='The input specification for a command.'), + CommandInputHint, + Field(description="The input specification for a command."), ] class BlobResourceContents(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None blob: str mimeType: Optional[str] = None @@ -57,155 +56,137 @@ class BlobResourceContents(BaseModel): class CreateTerminalResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - terminalId: Annotated[ - str, Field(description='The unique identifier for the created terminal.') - ] + terminalId: Annotated[str, Field(description="The unique identifier for the created terminal.")] class EnvVariable(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - name: Annotated[str, Field(description='The name of the environment variable.')] - value: Annotated[ - str, Field(description='The value to set for the environment variable.') - ] + name: Annotated[str, Field(description="The name of the environment variable.")] + value: Annotated[str, Field(description="The value to set for the environment variable.")] class FileSystemCapability(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None readTextFile: Annotated[ Optional[bool], - Field(description='Whether the Client supports `fs/read_text_file` requests.'), + Field(description="Whether the Client supports `fs/read_text_file` requests."), ] = False writeTextFile: Annotated[ Optional[bool], - Field(description='Whether the Client supports `fs/write_text_file` requests.'), + Field(description="Whether the Client supports `fs/write_text_file` requests."), ] = False class HttpHeader(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - name: Annotated[str, Field(description='The name of the HTTP header.')] - value: Annotated[str, Field(description='The value to set for the HTTP header.')] + name: Annotated[str, Field(description="The name of the HTTP header.")] + value: Annotated[str, Field(description="The value to set for the HTTP header.")] class KillTerminalCommandResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None class McpCapabilities(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - http: Annotated[ - Optional[bool], Field(description='Agent supports [`McpServer::Http`].') - ] = False - sse: Annotated[ - Optional[bool], Field(description='Agent supports [`McpServer::Sse`].') - ] = False + http: Annotated[Optional[bool], Field(description="Agent supports [`McpServer::Http`].")] = False + sse: Annotated[Optional[bool], Field(description="Agent supports [`McpServer::Sse`].")] = False -class McpServer1(BaseModel): +class HttpMcpServer(BaseModel): headers: Annotated[ List[HttpHeader], - Field( - description='HTTP headers to set when making requests to the MCP server.' - ), - ] - name: Annotated[ - str, Field(description='Human-readable name identifying this MCP server.') + Field(description="HTTP headers to set when making requests to the MCP server."), ] - type: Literal['http'] - url: Annotated[str, Field(description='URL to the MCP server.')] + name: Annotated[str, Field(description="Human-readable name identifying this MCP server.")] + type: Literal["http"] + url: Annotated[str, Field(description="URL to the MCP server.")] -class McpServer2(BaseModel): +class SseMcpServer(BaseModel): headers: Annotated[ List[HttpHeader], - Field( - description='HTTP headers to set when making requests to the MCP server.' - ), + Field(description="HTTP headers to set when making requests to the MCP server."), ] - name: Annotated[ - str, Field(description='Human-readable name identifying this MCP server.') - ] - type: Literal['sse'] - url: Annotated[str, Field(description='URL to the MCP server.')] + name: Annotated[str, Field(description="Human-readable name identifying this MCP server.")] + type: Literal["sse"] + url: Annotated[str, Field(description="URL to the MCP server.")] -class McpServer3(BaseModel): +class StdioMcpServer(BaseModel): args: Annotated[ List[str], - Field(description='Command-line arguments to pass to the MCP server.'), + Field(description="Command-line arguments to pass to the MCP server."), ] - command: Annotated[str, Field(description='Path to the MCP server executable.')] + command: Annotated[str, Field(description="Path to the MCP server executable.")] env: Annotated[ List[EnvVariable], - Field( - description='Environment variables to set when launching the MCP server.' - ), - ] - name: Annotated[ - str, Field(description='Human-readable name identifying this MCP server.') + Field(description="Environment variables to set when launching the MCP server."), ] + name: Annotated[str, Field(description="Human-readable name identifying this MCP server.")] + + +class ModelInfo(BaseModel): + field_meta: Annotated[ + Optional[Any], + Field(alias="_meta", description="Extension point for implementations"), + ] = None + description: Annotated[Optional[str], Field(description="Optional description of the model.")] = None + modelId: Annotated[str, Field(description="Unique identifier for the model.")] + name: Annotated[str, Field(description="Human-readable name of the model.")] class NewSessionRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None cwd: Annotated[ str, - Field( - description='The working directory for this session. Must be an absolute path.' - ), + Field(description="The working directory for this session. Must be an absolute path."), ] mcpServers: Annotated[ - List[Union[McpServer1, McpServer2, McpServer3]], - Field( - description='List of MCP (Model Context Protocol) servers the agent should connect to.' - ), + List[Union[HttpMcpServer, SseMcpServer, StdioMcpServer]], + Field(description="List of MCP (Model Context Protocol) servers the agent should connect to."), ] class PromptCapabilities(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - audio: Annotated[ - Optional[bool], Field(description='Agent supports [`ContentBlock::Audio`].') - ] = False + audio: Annotated[Optional[bool], Field(description="Agent supports [`ContentBlock::Audio`].")] = False embeddedContext: Annotated[ Optional[bool], Field( - description='Agent supports embedded context in `session/prompt` requests.\n\nWhen enabled, the Client is allowed to include [`ContentBlock::Resource`]\nin prompt requests for pieces of context that are referenced in the message.' + description="Agent supports embedded context in `session/prompt` requests.\n\nWhen enabled, the Client is allowed to include [`ContentBlock::Resource`]\nin prompt requests for pieces of context that are referenced in the message." ), ] = False - image: Annotated[ - Optional[bool], Field(description='Agent supports [`ContentBlock::Image`].') - ] = False + image: Annotated[Optional[bool], Field(description="Agent supports [`ContentBlock::Image`].")] = False class ReadTextFileResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None content: str @@ -213,212 +194,217 @@ class ReadTextFileResponse(BaseModel): class ReleaseTerminalResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None -class RequestPermissionOutcome1(BaseModel): - outcome: Literal['cancelled'] +class DeniedOutcome(BaseModel): + outcome: Literal["cancelled"] -class RequestPermissionOutcome2(BaseModel): - optionId: Annotated[ - str, Field(description='The ID of the option the user selected.') - ] - outcome: Literal['selected'] +class AllowedOutcome(BaseModel): + optionId: Annotated[str, Field(description="The ID of the option the user selected.")] + outcome: Literal["selected"] class RequestPermissionResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None outcome: Annotated[ - Union[RequestPermissionOutcome1, RequestPermissionOutcome2], + Union[DeniedOutcome, AllowedOutcome], Field(description="The user's decision on the permission request."), ] class Role(Enum): - assistant = 'assistant' - user = 'user' + assistant = "assistant" + user = "user" -class SessionUpdate8(BaseModel): - currentModeId: Annotated[ - str, Field(description='Unique identifier for a Session Mode.') - ] - sessionUpdate: Literal['current_mode_update'] +class SessionModelState(BaseModel): + field_meta: Annotated[ + Optional[Any], + Field(alias="_meta", description="Extension point for implementations"), + ] = None + availableModels: Annotated[List[ModelInfo], Field(description="The set of models that the Agent can use")] + currentModelId: Annotated[str, Field(description="The current model the Agent is in.")] + + +class CurrentModeUpdate(BaseModel): + currentModeId: Annotated[str, Field(description="Unique identifier for a Session Mode.")] + sessionUpdate: Literal["current_mode_update"] class SetSessionModeRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - modeId: Annotated[str, Field(description='The ID of the mode to set.')] - sessionId: Annotated[ - str, Field(description='The ID of the session to set the mode for.') - ] + modeId: Annotated[str, Field(description="The ID of the mode to set.")] + sessionId: Annotated[str, Field(description="The ID of the session to set the mode for.")] class SetSessionModeResponse(BaseModel): meta: Optional[Any] = None +class SetSessionModelRequest(BaseModel): + field_meta: Annotated[ + Optional[Any], + Field(alias="_meta", description="Extension point for implementations"), + ] = None + modelId: Annotated[str, Field(description="The ID of the model to set.")] + sessionId: Annotated[str, Field(description="The ID of the session to set the model for.")] + + +class SetSessionModelResponse(BaseModel): + field_meta: Annotated[ + Optional[Any], + Field(alias="_meta", description="Extension point for implementations"), + ] = None + + class TerminalExitStatus(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None exitCode: Annotated[ Optional[int], Field( - description='The process exit code (may be null if terminated by signal).', + description="The process exit code (may be null if terminated by signal).", ge=0, ), ] = None signal: Annotated[ Optional[str], - Field( - description='The signal that terminated the process (may be null if exited normally).' - ), + Field(description="The signal that terminated the process (may be null if exited normally)."), ] = None class TerminalOutputRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - sessionId: Annotated[str, Field(description='The session ID for this request.')] - terminalId: Annotated[ - str, Field(description='The ID of the terminal to get output from.') - ] + sessionId: Annotated[str, Field(description="The session ID for this request.")] + terminalId: Annotated[str, Field(description="The ID of the terminal to get output from.")] class TerminalOutputResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None exitStatus: Annotated[ Optional[TerminalExitStatus], - Field(description='Exit status if the command has completed.'), + Field(description="Exit status if the command has completed."), ] = None - output: Annotated[str, Field(description='The terminal output captured so far.')] - truncated: Annotated[ - bool, Field(description='Whether the output was truncated due to byte limits.') - ] + output: Annotated[str, Field(description="The terminal output captured so far.")] + truncated: Annotated[bool, Field(description="Whether the output was truncated due to byte limits.")] class TextResourceContents(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None mimeType: Optional[str] = None text: str uri: str -class ToolCallContent2(BaseModel): +class FileEditToolCallContent(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), - ] = None - newText: Annotated[str, Field(description='The new content after modification.')] - oldText: Annotated[ - Optional[str], Field(description='The original content (None for new files).') + Field(alias="_meta", description="Extension point for implementations"), ] = None - path: Annotated[str, Field(description='The file path being modified.')] - type: Literal['diff'] + newText: Annotated[str, Field(description="The new content after modification.")] + oldText: Annotated[Optional[str], Field(description="The original content (None for new files).")] = None + path: Annotated[str, Field(description="The file path being modified.")] + type: Literal["diff"] -class ToolCallContent3(BaseModel): +class TerminalToolCallContent(BaseModel): terminalId: str - type: Literal['terminal'] + type: Literal["terminal"] class ToolCallLocation(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - line: Annotated[ - Optional[int], Field(description='Optional line number within the file.', ge=0) - ] = None - path: Annotated[str, Field(description='The file path being accessed or modified.')] + line: Annotated[Optional[int], Field(description="Optional line number within the file.", ge=0)] = None + path: Annotated[str, Field(description="The file path being accessed or modified.")] class WaitForTerminalExitRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - sessionId: Annotated[str, Field(description='The session ID for this request.')] - terminalId: Annotated[str, Field(description='The ID of the terminal to wait for.')] + sessionId: Annotated[str, Field(description="The session ID for this request.")] + terminalId: Annotated[str, Field(description="The ID of the terminal to wait for.")] class WaitForTerminalExitResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None exitCode: Annotated[ Optional[int], Field( - description='The process exit code (may be null if terminated by signal).', + description="The process exit code (may be null if terminated by signal).", ge=0, ), ] = None signal: Annotated[ Optional[str], - Field( - description='The signal that terminated the process (may be null if exited normally).' - ), + Field(description="The signal that terminated the process (may be null if exited normally)."), ] = None class WriteTextFileRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - content: Annotated[str, Field(description='The text content to write to the file.')] - path: Annotated[str, Field(description='Absolute path to the file to write.')] - sessionId: Annotated[str, Field(description='The session ID for this request.')] + content: Annotated[str, Field(description="The text content to write to the file.")] + path: Annotated[str, Field(description="Absolute path to the file to write.")] + sessionId: Annotated[str, Field(description="The session ID for this request.")] class WriteTextFileResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None class AgentCapabilities(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - loadSession: Annotated[ - Optional[bool], Field(description='Whether the agent supports `session/load`.') - ] = False + loadSession: Annotated[Optional[bool], Field(description="Whether the agent supports `session/load`.")] = False mcpCapabilities: Annotated[ Optional[McpCapabilities], - Field(description='MCP capabilities supported by the agent.'), - ] = {'http': False, 'sse': False} + Field(description="MCP capabilities supported by the agent."), + ] = {"http": False, "sse": False} promptCapabilities: Annotated[ Optional[PromptCapabilities], - Field(description='Prompt capabilities supported by the agent.'), - ] = {'audio': False, 'embeddedContext': False, 'image': False} + Field(description="Prompt capabilities supported by the agent."), + ] = {"audio": False, "embeddedContext": False, "image": False} class Annotations(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None audience: Optional[List[Role]] = None lastModified: Optional[str] = None @@ -428,7 +414,7 @@ class Annotations(BaseModel): class AudioContent(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None data: str @@ -438,104 +424,94 @@ class AudioContent(BaseModel): class AuthMethod(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None description: Annotated[ Optional[str], - Field( - description='Optional description providing more details about this authentication method.' - ), + Field(description="Optional description providing more details about this authentication method."), ] = None - id: Annotated[ - str, Field(description='Unique identifier for this authentication method.') - ] - name: Annotated[ - str, Field(description='Human-readable name of the authentication method.') - ] + id: Annotated[str, Field(description="Unique identifier for this authentication method.")] + name: Annotated[str, Field(description="Human-readable name of the authentication method.")] class AvailableCommand(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - description: Annotated[ - str, Field(description='Human-readable description of what the command does.') - ] + description: Annotated[str, Field(description="Human-readable description of what the command does.")] input: Annotated[ Optional[AvailableCommandInput], - Field(description='Input for the command if required'), + Field(description="Input for the command if required"), ] = None name: Annotated[ str, - Field(description='Command name (e.g., "create_plan", "research_codebase").'), + Field(description="Command name (e.g., `create_plan`, `research_codebase`)."), ] class CancelNotification(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - sessionId: Annotated[ - str, Field(description='The ID of the session to cancel operations for.') - ] + sessionId: Annotated[str, Field(description="The ID of the session to cancel operations for.")] class ClientCapabilities(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None fs: Annotated[ Optional[FileSystemCapability], Field( - description='File system capabilities supported by the client.\nDetermines which file operations the agent can request.' + description="File system capabilities supported by the client.\nDetermines which file operations the agent can request." ), - ] = {'readTextFile': False, 'writeTextFile': False} + ] = {"readTextFile": False, "writeTextFile": False} terminal: Annotated[ Optional[bool], - Field(description='Whether the Client support all `terminal/*` methods.'), + Field(description="Whether the Client support all `terminal/*` methods."), ] = False -class ContentBlock1(BaseModel): +class TextContentBlock(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None text: str - type: Literal['text'] + type: Literal["text"] -class ContentBlock2(BaseModel): +class ImageContentBlock(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None data: str mimeType: str - type: Literal['image'] + type: Literal["image"] uri: Optional[str] = None -class ContentBlock3(BaseModel): +class AudioContentBlock(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None data: str mimeType: str - type: Literal['audio'] + type: Literal["audio"] -class ContentBlock4(BaseModel): +class ResourceContentBlock(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None description: Optional[str] = None @@ -543,41 +519,39 @@ class ContentBlock4(BaseModel): name: str size: Optional[int] = None title: Optional[str] = None - type: Literal['resource_link'] + type: Literal["resource_link"] uri: str class CreateTerminalRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - args: Annotated[ - Optional[List[str]], Field(description='Array of command arguments.') - ] = None - command: Annotated[str, Field(description='The command to execute.')] + args: Annotated[Optional[List[str]], Field(description="Array of command arguments.")] = None + command: Annotated[str, Field(description="The command to execute.")] cwd: Annotated[ Optional[str], - Field(description='Working directory for the command (absolute path).'), + Field(description="Working directory for the command (absolute path)."), ] = None env: Annotated[ Optional[List[EnvVariable]], - Field(description='Environment variables for the command.'), + Field(description="Environment variables for the command."), ] = None outputByteLimit: Annotated[ Optional[int], Field( - description='Maximum number of output bytes to retain.\n\nWhen the limit is exceeded, the Client truncates from the beginning of the output\nto stay within the limit.\n\nThe Client MUST ensure truncation happens at a character boundary to maintain valid\nstring output, even if this means the retained output is slightly less than the\nspecified limit.', + description="Maximum number of output bytes to retain.\n\nWhen the limit is exceeded, the Client truncates from the beginning of the output\nto stay within the limit.\n\nThe Client MUST ensure truncation happens at a character boundary to maintain valid\nstring output, even if this means the retained output is slightly less than the\nspecified limit.", ge=0, ), ] = None - sessionId: Annotated[str, Field(description='The session ID for this request.')] + sessionId: Annotated[str, Field(description="The session ID for this request.")] class ImageContent(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None data: str @@ -588,16 +562,16 @@ class ImageContent(BaseModel): class InitializeRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None clientCapabilities: Annotated[ Optional[ClientCapabilities], - Field(description='Capabilities supported by the client.'), - ] = {'fs': {'readTextFile': False, 'writeTextFile': False}, 'terminal': False} + Field(description="Capabilities supported by the client."), + ] = {"fs": {"readTextFile": False, "writeTextFile": False}, "terminal": False} protocolVersion: Annotated[ int, Field( - description='The latest protocol version supported by the client.', + description="The latest protocol version supported by the client.", ge=0, le=65535, ), @@ -607,23 +581,23 @@ class InitializeRequest(BaseModel): class InitializeResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None agentCapabilities: Annotated[ Optional[AgentCapabilities], - Field(description='Capabilities supported by the agent.'), + Field(description="Capabilities supported by the agent."), ] = { - 'loadSession': False, - 'mcpCapabilities': {'http': False, 'sse': False}, - 'promptCapabilities': { - 'audio': False, - 'embeddedContext': False, - 'image': False, + "loadSession": False, + "mcpCapabilities": {"http": False, "sse": False}, + "promptCapabilities": { + "audio": False, + "embeddedContext": False, + "image": False, }, } authMethods: Annotated[ Optional[List[AuthMethod]], - Field(description='Authentication methods supported by the agent.'), + Field(description="Authentication methods supported by the agent."), ] = [] protocolVersion: Annotated[ int, @@ -638,100 +612,88 @@ class InitializeResponse(BaseModel): class KillTerminalCommandRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - sessionId: Annotated[str, Field(description='The session ID for this request.')] - terminalId: Annotated[str, Field(description='The ID of the terminal to kill.')] + sessionId: Annotated[str, Field(description="The session ID for this request.")] + terminalId: Annotated[str, Field(description="The ID of the terminal to kill.")] class LoadSessionRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - cwd: Annotated[str, Field(description='The working directory for this session.')] + cwd: Annotated[str, Field(description="The working directory for this session.")] mcpServers: Annotated[ - List[Union[McpServer1, McpServer2, McpServer3]], - Field(description='List of MCP servers to connect to for this session.'), + List[Union[HttpMcpServer, SseMcpServer, StdioMcpServer]], + Field(description="List of MCP servers to connect to for this session."), ] - sessionId: Annotated[str, Field(description='The ID of the session to load.')] + sessionId: Annotated[str, Field(description="The ID of the session to load.")] class PermissionOption(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - kind: Annotated[ - str, Field(description='Hint about the nature of this permission option.') - ] - name: Annotated[ - str, Field(description='Human-readable label to display to the user.') - ] - optionId: Annotated[ - str, Field(description='Unique identifier for this permission option.') - ] + kind: Annotated[str, Field(description="Hint about the nature of this permission option.")] + name: Annotated[str, Field(description="Human-readable label to display to the user.")] + optionId: Annotated[str, Field(description="Unique identifier for this permission option.")] class PlanEntry(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None content: Annotated[ str, - Field( - description='Human-readable description of what this task aims to accomplish.' - ), + Field(description="Human-readable description of what this task aims to accomplish."), ] priority: Annotated[ str, Field( - description='The relative importance of this task.\nUsed to indicate which tasks are most critical to the overall goal.' + description="The relative importance of this task.\nUsed to indicate which tasks are most critical to the overall goal." ), ] - status: Annotated[str, Field(description='Current execution status of this task.')] + status: Annotated[str, Field(description="Current execution status of this task.")] class PromptResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - stopReason: Annotated[ - str, Field(description='Indicates why the agent stopped processing the turn.') - ] + stopReason: Annotated[str, Field(description="Indicates why the agent stopped processing the turn.")] class ReadTextFileRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), - ] = None - limit: Annotated[ - Optional[int], Field(description='Maximum number of lines to read.', ge=0) + Field(alias="_meta", description="Extension point for implementations"), ] = None + limit: Annotated[Optional[int], Field(description="Maximum number of lines to read.", ge=0)] = None line: Annotated[ Optional[int], - Field(description='Line number to start reading from (1-based).', ge=0), + Field(description="Line number to start reading from (1-based).", ge=0), ] = None - path: Annotated[str, Field(description='Absolute path to the file to read.')] - sessionId: Annotated[str, Field(description='The session ID for this request.')] + path: Annotated[str, Field(description="Absolute path to the file to read.")] + sessionId: Annotated[str, Field(description="The session ID for this request.")] class ReleaseTerminalRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - sessionId: Annotated[str, Field(description='The session ID for this request.')] - terminalId: Annotated[str, Field(description='The ID of the terminal to release.')] + sessionId: Annotated[str, Field(description="The session ID for this request.")] + terminalId: Annotated[str, Field(description="The ID of the terminal to release.")] class ResourceLink(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None description: Optional[str] = None @@ -745,89 +707,93 @@ class ResourceLink(BaseModel): class SessionMode(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None description: Optional[str] = None - id: Annotated[str, Field(description='Unique identifier for a Session Mode.')] + id: Annotated[str, Field(description="Unique identifier for a Session Mode.")] name: str class SessionModeState(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None availableModes: Annotated[ List[SessionMode], - Field(description='The set of modes that the Agent can operate in'), - ] - currentModeId: Annotated[ - str, Field(description='The current mode the Agent is in.') + Field(description="The set of modes that the Agent can operate in"), ] + currentModeId: Annotated[str, Field(description="The current mode the Agent is in.")] -class SessionUpdate6(BaseModel): +class AgentPlanUpdate(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None entries: Annotated[ List[PlanEntry], Field( - description='The list of tasks to be accomplished.\n\nWhen updating a plan, the agent must send a complete list of all entries\nwith their current status. The client replaces the entire plan with each update.' + description="The list of tasks to be accomplished.\n\nWhen updating a plan, the agent must send a complete list of all entries\nwith their current status. The client replaces the entire plan with each update." ), ] - sessionUpdate: Literal['plan'] + sessionUpdate: Literal["plan"] -class SessionUpdate7(BaseModel): +class AvailableCommandsUpdate(BaseModel): availableCommands: List[AvailableCommand] - sessionUpdate: Literal['available_commands_update'] + sessionUpdate: Literal["available_commands_update"] class TextContent(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None text: str -class ContentBlock5(BaseModel): +class EmbeddedResourceContentBlock(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None resource: Annotated[ Union[TextResourceContents, BlobResourceContents], - Field(description='Resource content that can be embedded in a message.'), + Field(description="Resource content that can be embedded in a message."), ] - type: Literal['resource'] + type: Literal["resource"] class EmbeddedResource(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None annotations: Optional[Annotations] = None resource: Annotated[ Union[TextResourceContents, BlobResourceContents], - Field(description='Resource content that can be embedded in a message.'), + Field(description="Resource content that can be embedded in a message."), ] class LoadSessionResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), + ] = None + models: Annotated[ + Optional[SessionModelState], + Field( + description="**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nInitial model state if supported by the Agent" + ), ] = None modes: Annotated[ Optional[SessionModeState], Field( - description='Initial mode state if supported by the Agent\n\nSee protocol docs: [Session Modes](https://agentclientprotocol.com/protocol/session-modes)' + description="Initial mode state if supported by the Agent\n\nSee protocol docs: [Session Modes](https://agentclientprotocol.com/protocol/session-modes)" ), ] = None @@ -835,18 +801,24 @@ class LoadSessionResponse(BaseModel): class NewSessionResponse(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), + ] = None + models: Annotated[ + Optional[SessionModelState], + Field( + description="**UNSTABLE**\n\nThis capability is not part of the spec yet, and may be removed or changed at any point.\n\nInitial model state if supported by the Agent" + ), ] = None modes: Annotated[ Optional[SessionModeState], Field( - description='Initial mode state if supported by the Agent\n\nSee protocol docs: [Session Modes](https://agentclientprotocol.com/protocol/session-modes)' + description="Initial mode state if supported by the Agent\n\nSee protocol docs: [Session Modes](https://agentclientprotocol.com/protocol/session-modes)" ), ] = None sessionId: Annotated[ str, Field( - description='Unique identifier for the created session.\n\nUsed in all subsequent requests for this conversation.' + description="Unique identifier for the created session.\n\nUsed in all subsequent requests for this conversation." ), ] @@ -854,12 +826,12 @@ class NewSessionResponse(BaseModel): class Plan(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None entries: Annotated[ List[PlanEntry], Field( - description='The list of tasks to be accomplished.\n\nWhen updating a plan, the agent must send a complete list of all entries\nwith their current status. The client replaces the entire plan with each update.' + description="The list of tasks to be accomplished.\n\nWhen updating a plan, the agent must send a complete list of all entries\nwith their current status. The client replaces the entire plan with each update." ), ] @@ -867,253 +839,213 @@ class Plan(BaseModel): class PromptRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None prompt: Annotated[ List[ Union[ - ContentBlock1, - ContentBlock2, - ContentBlock3, - ContentBlock4, - ContentBlock5, + TextContentBlock, + ImageContentBlock, + AudioContentBlock, + ResourceContentBlock, + EmbeddedResourceContentBlock, ] ], Field( description="The blocks of content that compose the user's message.\n\nAs a baseline, the Agent MUST support [`ContentBlock::Text`] and [`ContentBlock::ResourceLink`],\nwhile other variants are optionally enabled via [`PromptCapabilities`].\n\nThe Client MUST adapt its interface according to [`PromptCapabilities`].\n\nThe client MAY include referenced pieces of context as either\n[`ContentBlock::Resource`] or [`ContentBlock::ResourceLink`].\n\nWhen available, [`ContentBlock::Resource`] is preferred\nas it avoids extra round-trips and allows the message to include\npieces of context from sources the agent may not have access to." ), ] - sessionId: Annotated[ - str, Field(description='The ID of the session to send this user message to') - ] + sessionId: Annotated[str, Field(description="The ID of the session to send this user message to")] -class SessionUpdate1(BaseModel): +class UserMessageChunk(BaseModel): content: Annotated[ Union[ - ContentBlock1, ContentBlock2, ContentBlock3, ContentBlock4, ContentBlock5 + TextContentBlock, ImageContentBlock, AudioContentBlock, ResourceContentBlock, EmbeddedResourceContentBlock ], Field( description="Content blocks represent displayable information in the Agent Client Protocol.\n\nThey provide a structured way to handle various types of user-facing content—whether\nit's text from language models, images for analysis, or embedded resources for context.\n\nContent blocks appear in:\n- User prompts sent via `session/prompt`\n- Language model output streamed through `session/update` notifications\n- Progress updates and results from tool calls\n\nThis structure is compatible with the Model Context Protocol (MCP), enabling\nagents to seamlessly forward content from MCP tool outputs without transformation.\n\nSee protocol docs: [Content](https://agentclientprotocol.com/protocol/content)" ), ] - sessionUpdate: Literal['user_message_chunk'] + sessionUpdate: Literal["user_message_chunk"] -class SessionUpdate2(BaseModel): +class AgentMessageChunk(BaseModel): content: Annotated[ Union[ - ContentBlock1, ContentBlock2, ContentBlock3, ContentBlock4, ContentBlock5 + TextContentBlock, ImageContentBlock, AudioContentBlock, ResourceContentBlock, EmbeddedResourceContentBlock ], Field( description="Content blocks represent displayable information in the Agent Client Protocol.\n\nThey provide a structured way to handle various types of user-facing content—whether\nit's text from language models, images for analysis, or embedded resources for context.\n\nContent blocks appear in:\n- User prompts sent via `session/prompt`\n- Language model output streamed through `session/update` notifications\n- Progress updates and results from tool calls\n\nThis structure is compatible with the Model Context Protocol (MCP), enabling\nagents to seamlessly forward content from MCP tool outputs without transformation.\n\nSee protocol docs: [Content](https://agentclientprotocol.com/protocol/content)" ), ] - sessionUpdate: Literal['agent_message_chunk'] + sessionUpdate: Literal["agent_message_chunk"] -class SessionUpdate3(BaseModel): +class AgentThoughtChunk(BaseModel): content: Annotated[ Union[ - ContentBlock1, ContentBlock2, ContentBlock3, ContentBlock4, ContentBlock5 + TextContentBlock, ImageContentBlock, AudioContentBlock, ResourceContentBlock, EmbeddedResourceContentBlock ], Field( description="Content blocks represent displayable information in the Agent Client Protocol.\n\nThey provide a structured way to handle various types of user-facing content—whether\nit's text from language models, images for analysis, or embedded resources for context.\n\nContent blocks appear in:\n- User prompts sent via `session/prompt`\n- Language model output streamed through `session/update` notifications\n- Progress updates and results from tool calls\n\nThis structure is compatible with the Model Context Protocol (MCP), enabling\nagents to seamlessly forward content from MCP tool outputs without transformation.\n\nSee protocol docs: [Content](https://agentclientprotocol.com/protocol/content)" ), ] - sessionUpdate: Literal['agent_thought_chunk'] + sessionUpdate: Literal["agent_thought_chunk"] -class ToolCallContent1(BaseModel): +class ContentToolCallContent(BaseModel): content: Annotated[ Union[ - ContentBlock1, ContentBlock2, ContentBlock3, ContentBlock4, ContentBlock5 + TextContentBlock, ImageContentBlock, AudioContentBlock, ResourceContentBlock, EmbeddedResourceContentBlock ], - Field(description='The actual content block.'), + Field(description="The actual content block."), ] - type: Literal['content'] + type: Literal["content"] class ToolCallUpdate(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None content: Annotated[ - Optional[List[Union[ToolCallContent1, ToolCallContent2, ToolCallContent3]]], - Field(description='Replace the content collection.'), + Optional[List[Union[ContentToolCallContent, FileEditToolCallContent, TerminalToolCallContent]]], + Field(description="Replace the content collection."), ] = None - kind: Annotated[Optional[str], Field(description='Update the tool kind.')] = None + kind: Annotated[Optional[str], Field(description="Update the tool kind.")] = None locations: Annotated[ Optional[List[ToolCallLocation]], - Field(description='Replace the locations collection.'), - ] = None - rawInput: Annotated[Optional[Any], Field(description='Update the raw input.')] = ( - None - ) - rawOutput: Annotated[Optional[Any], Field(description='Update the raw output.')] = ( - None - ) - status: Annotated[ - Optional[str], Field(description='Update the execution status.') - ] = None - title: Annotated[ - Optional[str], Field(description='Update the human-readable title.') + Field(description="Replace the locations collection."), ] = None - toolCallId: Annotated[ - str, Field(description='The ID of the tool call being updated.') - ] + rawInput: Annotated[Optional[Any], Field(description="Update the raw input.")] = None + rawOutput: Annotated[Optional[Any], Field(description="Update the raw output.")] = None + status: Annotated[Optional[str], Field(description="Update the execution status.")] = None + title: Annotated[Optional[str], Field(description="Update the human-readable title.")] = None + toolCallId: Annotated[str, Field(description="The ID of the tool call being updated.")] class RequestPermissionRequest(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None options: Annotated[ List[PermissionOption], - Field(description='Available permission options for the user to choose from.'), + Field(description="Available permission options for the user to choose from."), ] - sessionId: Annotated[str, Field(description='The session ID for this request.')] + sessionId: Annotated[str, Field(description="The session ID for this request.")] toolCall: Annotated[ ToolCallUpdate, - Field(description='Details about the tool call requiring permission.'), + Field(description="Details about the tool call requiring permission."), ] -class SessionUpdate4(BaseModel): +class ToolCallStart(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None content: Annotated[ - Optional[List[Union[ToolCallContent1, ToolCallContent2, ToolCallContent3]]], - Field(description='Content produced by the tool call.'), + Optional[List[Union[ContentToolCallContent, FileEditToolCallContent, TerminalToolCallContent]]], + Field(description="Content produced by the tool call."), ] = None kind: Annotated[ Optional[str], Field( - description='The category of tool being invoked.\nHelps clients choose appropriate icons and UI treatment.' + description="The category of tool being invoked.\nHelps clients choose appropriate icons and UI treatment." ), ] = None locations: Annotated[ Optional[List[ToolCallLocation]], - Field( - description='File locations affected by this tool call.\nEnables "follow-along" features in clients.' - ), - ] = None - rawInput: Annotated[ - Optional[Any], Field(description='Raw input parameters sent to the tool.') - ] = None - rawOutput: Annotated[ - Optional[Any], Field(description='Raw output returned by the tool.') - ] = None - sessionUpdate: Literal['tool_call'] - status: Annotated[ - Optional[str], Field(description='Current execution status of the tool call.') + Field(description='File locations affected by this tool call.\nEnables "follow-along" features in clients.'), ] = None + rawInput: Annotated[Optional[Any], Field(description="Raw input parameters sent to the tool.")] = None + rawOutput: Annotated[Optional[Any], Field(description="Raw output returned by the tool.")] = None + sessionUpdate: Literal["tool_call"] + status: Annotated[Optional[str], Field(description="Current execution status of the tool call.")] = None title: Annotated[ str, - Field(description='Human-readable title describing what the tool is doing.'), + Field(description="Human-readable title describing what the tool is doing."), ] toolCallId: Annotated[ str, - Field(description='Unique identifier for this tool call within the session.'), + Field(description="Unique identifier for this tool call within the session."), ] -class SessionUpdate5(BaseModel): +class ToolCallProgress(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None content: Annotated[ - Optional[List[Union[ToolCallContent1, ToolCallContent2, ToolCallContent3]]], - Field(description='Replace the content collection.'), + Optional[List[Union[ContentToolCallContent, FileEditToolCallContent, TerminalToolCallContent]]], + Field(description="Replace the content collection."), ] = None - kind: Annotated[Optional[str], Field(description='Update the tool kind.')] = None + kind: Annotated[Optional[str], Field(description="Update the tool kind.")] = None locations: Annotated[ Optional[List[ToolCallLocation]], - Field(description='Replace the locations collection.'), - ] = None - rawInput: Annotated[Optional[Any], Field(description='Update the raw input.')] = ( - None - ) - rawOutput: Annotated[Optional[Any], Field(description='Update the raw output.')] = ( - None - ) - sessionUpdate: Literal['tool_call_update'] - status: Annotated[ - Optional[str], Field(description='Update the execution status.') + Field(description="Replace the locations collection."), ] = None - title: Annotated[ - Optional[str], Field(description='Update the human-readable title.') - ] = None - toolCallId: Annotated[ - str, Field(description='The ID of the tool call being updated.') - ] + rawInput: Annotated[Optional[Any], Field(description="Update the raw input.")] = None + rawOutput: Annotated[Optional[Any], Field(description="Update the raw output.")] = None + sessionUpdate: Literal["tool_call_update"] + status: Annotated[Optional[str], Field(description="Update the execution status.")] = None + title: Annotated[Optional[str], Field(description="Update the human-readable title.")] = None + toolCallId: Annotated[str, Field(description="The ID of the tool call being updated.")] class ToolCall(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None content: Annotated[ - Optional[List[Union[ToolCallContent1, ToolCallContent2, ToolCallContent3]]], - Field(description='Content produced by the tool call.'), + Optional[List[Union[ContentToolCallContent, FileEditToolCallContent, TerminalToolCallContent]]], + Field(description="Content produced by the tool call."), ] = None kind: Annotated[ Optional[str], Field( - description='The category of tool being invoked.\nHelps clients choose appropriate icons and UI treatment.' + description="The category of tool being invoked.\nHelps clients choose appropriate icons and UI treatment." ), ] = None locations: Annotated[ Optional[List[ToolCallLocation]], - Field( - description='File locations affected by this tool call.\nEnables "follow-along" features in clients.' - ), - ] = None - rawInput: Annotated[ - Optional[Any], Field(description='Raw input parameters sent to the tool.') - ] = None - rawOutput: Annotated[ - Optional[Any], Field(description='Raw output returned by the tool.') - ] = None - status: Annotated[ - Optional[str], Field(description='Current execution status of the tool call.') + Field(description='File locations affected by this tool call.\nEnables "follow-along" features in clients.'), ] = None + rawInput: Annotated[Optional[Any], Field(description="Raw input parameters sent to the tool.")] = None + rawOutput: Annotated[Optional[Any], Field(description="Raw output returned by the tool.")] = None + status: Annotated[Optional[str], Field(description="Current execution status of the tool call.")] = None title: Annotated[ str, - Field(description='Human-readable title describing what the tool is doing.'), + Field(description="Human-readable title describing what the tool is doing."), ] toolCallId: Annotated[ str, - Field(description='Unique identifier for this tool call within the session.'), + Field(description="Unique identifier for this tool call within the session."), ] class SessionNotification(BaseModel): field_meta: Annotated[ Optional[Any], - Field(alias='_meta', description='Extension point for implementations'), + Field(alias="_meta", description="Extension point for implementations"), ] = None - sessionId: Annotated[ - str, Field(description='The ID of the session this update pertains to.') - ] + sessionId: Annotated[str, Field(description="The ID of the session this update pertains to.")] update: Annotated[ Union[ - SessionUpdate1, - SessionUpdate2, - SessionUpdate3, - SessionUpdate4, - SessionUpdate5, - SessionUpdate6, - SessionUpdate7, - SessionUpdate8, + UserMessageChunk, + AgentMessageChunk, + AgentThoughtChunk, + ToolCallStart, + ToolCallProgress, + AgentPlanUpdate, + AvailableCommandsUpdate, + CurrentModeUpdate, ], - Field(description='The actual update content.'), + Field(description="The actual update content."), ] @@ -1150,6 +1082,7 @@ class Model( LoadSessionRequest, SetSessionModeRequest, PromptRequest, + SetSessionModelRequest, Any, ], Union[ @@ -1159,6 +1092,7 @@ class Model( LoadSessionResponse, SetSessionModeResponse, PromptResponse, + SetSessionModelResponse, Any, ], Union[SessionNotification, Any], @@ -1196,6 +1130,7 @@ class Model( LoadSessionRequest, SetSessionModeRequest, PromptRequest, + SetSessionModelRequest, Any, ], Union[ @@ -1205,7 +1140,33 @@ class Model( LoadSessionResponse, SetSessionModeResponse, PromptResponse, + SetSessionModelResponse, Any, ], Union[SessionNotification, Any], ] + + +# Backwards compatibility aliases +AvailableCommandInput1 = CommandInputHint +ContentBlock1 = TextContentBlock +ContentBlock2 = ImageContentBlock +ContentBlock3 = AudioContentBlock +ContentBlock4 = ResourceContentBlock +ContentBlock5 = EmbeddedResourceContentBlock +McpServer1 = HttpMcpServer +McpServer2 = SseMcpServer +McpServer3 = StdioMcpServer +RequestPermissionOutcome1 = DeniedOutcome +RequestPermissionOutcome2 = AllowedOutcome +SessionUpdate1 = UserMessageChunk +SessionUpdate2 = AgentMessageChunk +SessionUpdate3 = AgentThoughtChunk +SessionUpdate4 = ToolCallStart +SessionUpdate5 = ToolCallProgress +SessionUpdate6 = AgentPlanUpdate +SessionUpdate7 = AvailableCommandsUpdate +SessionUpdate8 = CurrentModeUpdate +ToolCallContent1 = ContentToolCallContent +ToolCallContent2 = FileEditToolCallContent +ToolCallContent3 = TerminalToolCallContent diff --git a/tests/test_cancel_prompt_flow.py b/tests/test_cancel_prompt_flow.py new file mode 100644 index 0000000..c43263e --- /dev/null +++ b/tests/test_cancel_prompt_flow.py @@ -0,0 +1,62 @@ +import asyncio + +import pytest + +from acp import ( + AgentSideConnection, + CancelNotification, + ClientSideConnection, + PromptRequest, + PromptResponse, +) +from acp.schema import TextContentBlock +from tests.test_rpc import TestAgent, TestClient, _Server + + +class LongRunningAgent(TestAgent): + """Agent variant whose prompt waits for a cancel notification.""" + + def __init__(self) -> None: + super().__init__() + self.prompt_started = asyncio.Event() + self.cancel_received = asyncio.Event() + + async def prompt(self, params: PromptRequest) -> PromptResponse: + self.prompts.append(params) + self.prompt_started.set() + try: + await asyncio.wait_for(self.cancel_received.wait(), timeout=1.0) + except asyncio.TimeoutError as exc: + msg = "Cancel notification did not arrive while prompt pending" + raise AssertionError(msg) from exc + return PromptResponse(stopReason="cancelled") + + async def cancel(self, params: CancelNotification) -> None: + await super().cancel(params) + self.cancel_received.set() + + +@pytest.mark.asyncio +async def test_cancel_reaches_agent_during_prompt() -> None: + async with _Server() as server: + agent = LongRunningAgent() + client = TestClient() + agent_conn = ClientSideConnection(lambda _conn: client, server.client_writer, server.client_reader) + _client_conn = AgentSideConnection(lambda _conn: agent, server.server_writer, server.server_reader) + + prompt_request = PromptRequest( + sessionId="sess-xyz", + prompt=[TextContentBlock(type="text", text="hello")], + ) + prompt_task = asyncio.create_task(agent_conn.prompt(prompt_request)) + + await agent.prompt_started.wait() + assert not prompt_task.done(), "Prompt finished before cancel was sent" + + await agent_conn.cancel(CancelNotification(sessionId="sess-xyz")) + + await asyncio.wait_for(agent.cancel_received.wait(), timeout=1.0) + + response = await asyncio.wait_for(prompt_task, timeout=1.0) + assert response.stopReason == "cancelled" + assert agent.cancellations == ["sess-xyz"] diff --git a/tests/test_permission_flow.py b/tests/test_permission_flow.py new file mode 100644 index 0000000..a4478b3 --- /dev/null +++ b/tests/test_permission_flow.py @@ -0,0 +1,71 @@ +import asyncio + +import pytest + +from acp import ( + AgentSideConnection, + ClientSideConnection, + PromptRequest, + PromptResponse, + RequestPermissionRequest, + RequestPermissionResponse, +) +from acp.schema import PermissionOption, TextContentBlock, ToolCallUpdate +from tests.test_rpc import TestAgent, TestClient, _Server + + +class PermissionRequestAgent(TestAgent): + """Agent that asks the client for permission during a prompt.""" + + def __init__(self, conn: AgentSideConnection) -> None: + super().__init__() + self._conn = conn + self.permission_responses: list[RequestPermissionResponse] = [] + + async def prompt(self, params: PromptRequest) -> PromptResponse: + permission = await self._conn.requestPermission( + RequestPermissionRequest( + sessionId=params.sessionId, + options=[ + PermissionOption(optionId="allow", name="Allow", kind="allow"), + PermissionOption(optionId="deny", name="Deny", kind="deny"), + ], + toolCall=ToolCallUpdate(toolCallId="call-1", title="Write File"), + ) + ) + self.permission_responses.append(permission) + return await super().prompt(params) + + +@pytest.mark.asyncio +async def test_agent_request_permission_roundtrip() -> None: + async with _Server() as server: + client = TestClient() + client.queue_permission_selected("allow") + + captured_agent: list[PermissionRequestAgent] = [] + + agent_conn = ClientSideConnection(lambda _conn: client, server.client_writer, server.client_reader) + _agent_conn = AgentSideConnection( + lambda conn: captured_agent.append(PermissionRequestAgent(conn)) or captured_agent[-1], + server.server_writer, + server.server_reader, + ) + + response = await asyncio.wait_for( + agent_conn.prompt( + PromptRequest( + sessionId="sess-perm", + prompt=[TextContentBlock(type="text", text="needs approval")], + ) + ), + timeout=1.0, + ) + assert response.stopReason == "end_turn" + + assert captured_agent, "Agent was not constructed" + [agent] = captured_agent + assert agent.permission_responses, "Agent did not receive permission response" + permission_response = agent.permission_responses[0] + assert permission_response.outcome.outcome == "selected" + assert permission_response.outcome.optionId == "allow" diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 09aa8ad..ea6fb6e 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -7,25 +7,39 @@ from acp import ( Agent, AgentSideConnection, + AuthenticateRequest, + AuthenticateResponse, CancelNotification, Client, ClientSideConnection, InitializeRequest, InitializeResponse, LoadSessionRequest, + LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse, ReadTextFileRequest, ReadTextFileResponse, + RequestError, RequestPermissionRequest, RequestPermissionResponse, SessionNotification, + SetSessionModelRequest, + SetSessionModelResponse, SetSessionModeRequest, + SetSessionModeResponse, WriteTextFileRequest, + WriteTextFileResponse, +) +from acp.schema import ( + AgentMessageChunk, + AllowedOutcome, + DeniedOutcome, + TextContentBlock, + UserMessageChunk, ) -from acp.schema import ContentBlock1, SessionUpdate1, SessionUpdate2 # --------------------- Test Utilities --------------------- @@ -77,18 +91,28 @@ class TestClient(Client): __test__ = False # prevent pytest from collecting this class def __init__(self) -> None: - self.permission_outcomes: list[dict] = [] + self.permission_outcomes: list[RequestPermissionResponse] = [] self.files: dict[str, str] = {} self.notifications: list[SessionNotification] = [] self.ext_calls: list[tuple[str, dict]] = [] self.ext_notes: list[tuple[str, dict]] = [] + def queue_permission_cancelled(self) -> None: + self.permission_outcomes.append(RequestPermissionResponse(outcome=DeniedOutcome(outcome="cancelled"))) + + def queue_permission_selected(self, option_id: str) -> None: + self.permission_outcomes.append( + RequestPermissionResponse(outcome=AllowedOutcome(optionId=option_id, outcome="selected")) + ) + async def requestPermission(self, params: RequestPermissionRequest) -> RequestPermissionResponse: - outcome = self.permission_outcomes.pop() if self.permission_outcomes else {"outcome": "cancelled"} - return RequestPermissionResponse.model_validate({"outcome": outcome}) + if self.permission_outcomes: + return self.permission_outcomes.pop() + return RequestPermissionResponse(outcome=DeniedOutcome(outcome="cancelled")) - async def writeTextFile(self, params: WriteTextFileRequest) -> None: + async def writeTextFile(self, params: WriteTextFileRequest) -> WriteTextFileResponse: self.files[str(params.path)] = params.content + return WriteTextFileResponse() async def readTextFile(self, params: ReadTextFileRequest) -> ReadTextFileResponse: content = self.files.get(str(params.path), "default content") @@ -98,24 +122,26 @@ async def sessionUpdate(self, params: SessionNotification) -> None: self.notifications.append(params) # Optional terminal methods (not implemented in this test client) - async def createTerminal(self, params) -> None: # pragma: no cover - placeholder - pass + async def createTerminal(self, params): # pragma: no cover - placeholder + raise NotImplementedError - async def terminalOutput(self, params) -> None: # pragma: no cover - placeholder - pass + async def terminalOutput(self, params): # pragma: no cover - placeholder + raise NotImplementedError - async def releaseTerminal(self, params) -> None: # pragma: no cover - placeholder - pass + async def releaseTerminal(self, params): # pragma: no cover - placeholder + raise NotImplementedError - async def waitForTerminalExit(self, params) -> None: # pragma: no cover - placeholder - pass + async def waitForTerminalExit(self, params): # pragma: no cover - placeholder + raise NotImplementedError - async def killTerminal(self, params) -> None: # pragma: no cover - placeholder - pass + async def killTerminal(self, params): # pragma: no cover - placeholder + raise NotImplementedError async def extMethod(self, method: str, params: dict) -> dict: self.ext_calls.append((method, params)) - return {"ok": True, "method": method} + if method == "example.com/ping": + return {"response": "pong", "params": params} + raise RequestError.method_not_found(method) async def extNotification(self, method: str, params: dict) -> None: self.ext_notes.append((method, params)) @@ -137,11 +163,11 @@ async def initialize(self, params: InitializeRequest) -> InitializeResponse: async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: return NewSessionResponse(sessionId="test-session-123") - async def loadSession(self, params: LoadSessionRequest) -> None: - return None + async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse: + return LoadSessionResponse() - async def authenticate(self, params) -> None: - return None + async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse: + return AuthenticateResponse() async def prompt(self, params: PromptRequest) -> PromptResponse: self.prompts.append(params) @@ -150,12 +176,17 @@ async def prompt(self, params: PromptRequest) -> PromptResponse: async def cancel(self, params: CancelNotification) -> None: self.cancellations.append(params.sessionId) - async def setSessionMode(self, params): - return {} + async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse: + return SetSessionModeResponse() + + async def setSessionModel(self, params: SetSessionModelRequest) -> SetSessionModelResponse: + return SetSessionModelResponse() async def extMethod(self, method: str, params: dict) -> dict: self.ext_calls.append((method, params)) - return {"ok": True, "method": method} + if method == "example.com/echo": + return {"echo": params} + raise RequestError.method_not_found(method) async def extNotification(self, method: str, params: dict) -> None: self.ext_notes.append((method, params)) @@ -180,6 +211,22 @@ async def test_initialize_and_new_session(): new_sess = await agent_conn.newSession(NewSessionRequest(mcpServers=[], cwd="/test")) assert new_sess.sessionId == "test-session-123" + load_resp = await agent_conn.loadSession( + LoadSessionRequest(sessionId=new_sess.sessionId, cwd="/test", mcpServers=[]) + ) + assert isinstance(load_resp, LoadSessionResponse) + + auth_resp = await agent_conn.authenticate(AuthenticateRequest(methodId="password")) + assert isinstance(auth_resp, AuthenticateResponse) + + mode_resp = await agent_conn.setSessionMode(SetSessionModeRequest(sessionId=new_sess.sessionId, modeId="ask")) + assert isinstance(mode_resp, SetSessionModeResponse) + + model_resp = await agent_conn.setSessionModel( + SetSessionModelRequest(sessionId=new_sess.sessionId, modelId="gpt-4o") + ) + assert isinstance(model_resp, SetSessionModelResponse) + @pytest.mark.asyncio async def test_bidirectional_file_ops(): @@ -195,9 +242,10 @@ async def test_bidirectional_file_ops(): assert res.content == "Hello, World!" # Agent asks client to write - await client_conn.writeTextFile( + write_result = await client_conn.writeTextFile( WriteTextFileRequest(sessionId="sess", path="/test/file.txt", content="Updated") ) + assert isinstance(write_result, WriteTextFileResponse) assert client.files["/test/file.txt"] == "Updated" @@ -234,18 +282,18 @@ async def test_session_notifications_flow(): await client_conn.sessionUpdate( SessionNotification( sessionId="sess", - update=SessionUpdate2( + update=AgentMessageChunk( sessionUpdate="agent_message_chunk", - content=ContentBlock1(type="text", text="Hello"), + content=TextContentBlock(type="text", text="Hello"), ), ) ) await client_conn.sessionUpdate( SessionNotification( sessionId="sess", - update=SessionUpdate1( + update=UserMessageChunk( sessionUpdate="user_message_chunk", - content=ContentBlock1(type="text", text="World"), + content=TextContentBlock(type="text", text="World"), ), ) ) @@ -319,16 +367,18 @@ async def test_set_session_mode_and_extensions(): agent = TestAgent() client = TestClient() agent_conn = ClientSideConnection(lambda _conn: client, s.client_writer, s.client_reader) - _client_conn = AgentSideConnection(lambda _conn: agent, s.server_writer, s.server_reader) + client_conn = AgentSideConnection(lambda _conn: agent, s.server_writer, s.server_reader) # setSessionMode resp = await agent_conn.setSessionMode(SetSessionModeRequest(sessionId="sess", modeId="yolo")) - # Either empty object or typed response depending on implementation - assert resp is None or resp.__class__.__name__ == "SetSessionModeResponse" + assert isinstance(resp, SetSessionModeResponse) + + model_resp = await agent_conn.setSessionModel(SetSessionModelRequest(sessionId="sess", modelId="gpt-4o-mini")) + assert isinstance(model_resp, SetSessionModelResponse) # extMethod - res = await agent_conn.extMethod("ping", {"x": 1}) - assert res.get("ok") is True + echo = await agent_conn.extMethod("example.com/echo", {"x": 1}) + assert echo == {"echo": {"x": 1}} # extNotification await agent_conn.extNotification("note", {"y": 2}) @@ -336,6 +386,11 @@ async def test_set_session_mode_and_extensions(): await asyncio.sleep(0.05) assert agent.ext_notes and agent.ext_notes[-1][0] == "note" + # client extension method + ping = await client_conn.extMethod("example.com/ping", {"k": 3}) + assert ping == {"response": "pong", "params": {"k": 3}} + assert client.ext_calls and client.ext_calls[-1] == ("example.com/ping", {"k": 3}) + @pytest.mark.asyncio async def test_ignore_invalid_messages(): diff --git a/uv.lock b/uv.lock index 48d77b3..33dffe2 100644 --- a/uv.lock +++ b/uv.lock @@ -4,7 +4,7 @@ requires-python = ">=3.10, <4.0" [[package]] name = "agent-client-protocol" -version = "0.3.0" +version = "0.4.5" source = { editable = "." } dependencies = [ { name = "pydantic" },