Skip to content

Commit 1839086

Browse files
authored
refactor: make routing/sending pipeline better and add optional telemetry (#9)
* refactor: better code style Signed-off-by: Chojan Shang <chojan.shang@vesoft.com> * refactor: split into files Signed-off-by: Chojan Shang <chojan.shang@vesoft.com> * feat: intro task module for better handle Signed-off-by: Chojan Shang <chojan.shang@vesoft.com> * feat: intro router to manager handlers Signed-off-by: Chojan Shang <chojan.shang@vesoft.com> * feat: intro telemetry Signed-off-by: Chojan Shang <chojan.shang@vesoft.com> * fix: make check happy Signed-off-by: Chojan Shang <chojan.shang@vesoft.com> --------- Signed-off-by: Chojan Shang <chojan.shang@vesoft.com>
1 parent f65295a commit 1839086

26 files changed

+1878
-698
lines changed

.github/actions/setup-python-env/action.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ runs:
2222
uses: astral-sh/setup-uv@v6
2323
with:
2424
version: ${{ inputs.uv-version }}
25-
enable-cache: 'true'
25+
enable-cache: "true"
2626
cache-suffix: ${{ matrix.python-version }}
2727

2828
- name: Install Python dependencies
29-
run: uv sync --frozen
29+
run: uv sync --all-extras --all-groups --frozen
3030
shell: bash

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ dev = [
4343
"python-dotenv>=1.1.1",
4444
]
4545

46+
[project.optional-dependencies]
47+
logfire = ["logfire>=0.14", "opentelemetry-sdk>=1.28.0"]
48+
4649
[build-system]
4750
requires = ["hatchling"]
4851
build-backend = "hatchling.build"

src/acp/agent/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .connection import AgentSideConnection
2+
3+
__all__ = ["AgentSideConnection"]

src/acp/agent/connection.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from collections.abc import Callable
5+
from typing import Any
6+
7+
from ..connection import Connection, MethodHandler
8+
from ..interfaces import Agent
9+
from ..meta import CLIENT_METHODS
10+
from ..schema import (
11+
CreateTerminalRequest,
12+
CreateTerminalResponse,
13+
KillTerminalCommandRequest,
14+
KillTerminalCommandResponse,
15+
ReadTextFileRequest,
16+
ReadTextFileResponse,
17+
ReleaseTerminalRequest,
18+
ReleaseTerminalResponse,
19+
RequestPermissionRequest,
20+
RequestPermissionResponse,
21+
SessionNotification,
22+
TerminalOutputRequest,
23+
TerminalOutputResponse,
24+
WaitForTerminalExitRequest,
25+
WaitForTerminalExitResponse,
26+
WriteTextFileRequest,
27+
WriteTextFileResponse,
28+
)
29+
from ..terminal import TerminalHandle
30+
from ..utils import notify_model, request_model, request_optional_model
31+
from .router import build_agent_router
32+
33+
__all__ = ["AgentSideConnection"]
34+
35+
_AGENT_CONNECTION_ERROR = "AgentSideConnection requires asyncio StreamWriter/StreamReader"
36+
37+
38+
class AgentSideConnection:
39+
"""Agent-side connection wrapper that dispatches JSON-RPC messages to a Client implementation."""
40+
41+
def __init__(
42+
self,
43+
to_agent: Callable[[AgentSideConnection], Agent],
44+
input_stream: Any,
45+
output_stream: Any,
46+
) -> None:
47+
agent = to_agent(self)
48+
handler = self._create_handler(agent)
49+
50+
if not isinstance(input_stream, asyncio.StreamWriter) or not isinstance(output_stream, asyncio.StreamReader):
51+
raise TypeError(_AGENT_CONNECTION_ERROR)
52+
self._conn = Connection(handler, input_stream, output_stream)
53+
54+
def _create_handler(self, agent: Agent) -> MethodHandler:
55+
router = build_agent_router(agent)
56+
57+
async def handler(method: str, params: Any | None, is_notification: bool) -> Any:
58+
if is_notification:
59+
await router.dispatch_notification(method, params)
60+
return None
61+
return await router.dispatch_request(method, params)
62+
63+
return handler
64+
65+
async def sessionUpdate(self, params: SessionNotification) -> None:
66+
await notify_model(self._conn, CLIENT_METHODS["session_update"], params)
67+
68+
async def requestPermission(self, params: RequestPermissionRequest) -> RequestPermissionResponse:
69+
return await request_model(
70+
self._conn,
71+
CLIENT_METHODS["session_request_permission"],
72+
params,
73+
RequestPermissionResponse,
74+
)
75+
76+
async def readTextFile(self, params: ReadTextFileRequest) -> ReadTextFileResponse:
77+
return await request_model(
78+
self._conn,
79+
CLIENT_METHODS["fs_read_text_file"],
80+
params,
81+
ReadTextFileResponse,
82+
)
83+
84+
async def writeTextFile(self, params: WriteTextFileRequest) -> WriteTextFileResponse | None:
85+
return await request_optional_model(
86+
self._conn,
87+
CLIENT_METHODS["fs_write_text_file"],
88+
params,
89+
WriteTextFileResponse,
90+
)
91+
92+
async def createTerminal(self, params: CreateTerminalRequest) -> TerminalHandle:
93+
create_response = await request_model(
94+
self._conn,
95+
CLIENT_METHODS["terminal_create"],
96+
params,
97+
CreateTerminalResponse,
98+
)
99+
return TerminalHandle(create_response.terminalId, params.sessionId, self._conn)
100+
101+
async def terminalOutput(self, params: TerminalOutputRequest) -> TerminalOutputResponse:
102+
return await request_model(
103+
self._conn,
104+
CLIENT_METHODS["terminal_output"],
105+
params,
106+
TerminalOutputResponse,
107+
)
108+
109+
async def releaseTerminal(self, params: ReleaseTerminalRequest) -> ReleaseTerminalResponse | None:
110+
return await request_optional_model(
111+
self._conn,
112+
CLIENT_METHODS["terminal_release"],
113+
params,
114+
ReleaseTerminalResponse,
115+
)
116+
117+
async def waitForTerminalExit(self, params: WaitForTerminalExitRequest) -> WaitForTerminalExitResponse:
118+
return await request_model(
119+
self._conn,
120+
CLIENT_METHODS["terminal_wait_for_exit"],
121+
params,
122+
WaitForTerminalExitResponse,
123+
)
124+
125+
async def killTerminal(self, params: KillTerminalCommandRequest) -> KillTerminalCommandResponse | None:
126+
return await request_optional_model(
127+
self._conn,
128+
CLIENT_METHODS["terminal_kill"],
129+
params,
130+
KillTerminalCommandResponse,
131+
)
132+
133+
async def extMethod(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
134+
return await self._conn.send_request(f"_{method}", params)
135+
136+
async def extNotification(self, method: str, params: dict[str, Any]) -> None:
137+
await self._conn.send_notification(f"_{method}", params)

src/acp/agent/router.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
from ..exceptions import RequestError
6+
from ..interfaces import Agent
7+
from ..meta import AGENT_METHODS
8+
from ..router import MessageRouter, RouterBuilder
9+
from ..schema import (
10+
AuthenticateRequest,
11+
CancelNotification,
12+
InitializeRequest,
13+
LoadSessionRequest,
14+
NewSessionRequest,
15+
PromptRequest,
16+
SetSessionModelRequest,
17+
SetSessionModeRequest,
18+
)
19+
from ..utils import normalize_result
20+
21+
__all__ = ["build_agent_router"]
22+
23+
24+
def build_agent_router(agent: Agent) -> MessageRouter:
25+
builder = RouterBuilder()
26+
27+
builder.request_attr(AGENT_METHODS["initialize"], InitializeRequest, agent, "initialize")
28+
builder.request_attr(AGENT_METHODS["session_new"], NewSessionRequest, agent, "newSession")
29+
builder.request_attr(
30+
AGENT_METHODS["session_load"],
31+
LoadSessionRequest,
32+
agent,
33+
"loadSession",
34+
adapt_result=normalize_result,
35+
)
36+
builder.request_attr(
37+
AGENT_METHODS["session_set_mode"],
38+
SetSessionModeRequest,
39+
agent,
40+
"setSessionMode",
41+
adapt_result=normalize_result,
42+
)
43+
builder.request_attr(AGENT_METHODS["session_prompt"], PromptRequest, agent, "prompt")
44+
builder.request_attr(
45+
AGENT_METHODS["session_set_model"],
46+
SetSessionModelRequest,
47+
agent,
48+
"setSessionModel",
49+
adapt_result=normalize_result,
50+
)
51+
builder.request_attr(
52+
AGENT_METHODS["authenticate"],
53+
AuthenticateRequest,
54+
agent,
55+
"authenticate",
56+
adapt_result=normalize_result,
57+
)
58+
59+
builder.notification_attr(AGENT_METHODS["session_cancel"], CancelNotification, agent, "cancel")
60+
61+
async def handle_extension_request(name: str, payload: dict[str, Any]) -> Any:
62+
ext = getattr(agent, "extMethod", None)
63+
if ext is None:
64+
raise RequestError.method_not_found(f"_{name}")
65+
return await ext(name, payload)
66+
67+
async def handle_extension_notification(name: str, payload: dict[str, Any]) -> None:
68+
ext = getattr(agent, "extNotification", None)
69+
if ext is None:
70+
return
71+
await ext(name, payload)
72+
73+
return builder.build(
74+
request_extensions=handle_extension_request,
75+
notification_extensions=handle_extension_notification,
76+
)

src/acp/client/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .connection import ClientSideConnection
2+
3+
__all__ = ["ClientSideConnection"]

src/acp/client/connection.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from collections.abc import Callable
5+
from typing import Any
6+
7+
from ..connection import Connection, MethodHandler
8+
from ..interfaces import Agent, Client
9+
from ..meta import AGENT_METHODS
10+
from ..schema import (
11+
AuthenticateRequest,
12+
AuthenticateResponse,
13+
CancelNotification,
14+
InitializeRequest,
15+
InitializeResponse,
16+
LoadSessionRequest,
17+
LoadSessionResponse,
18+
NewSessionRequest,
19+
NewSessionResponse,
20+
PromptRequest,
21+
PromptResponse,
22+
SetSessionModelRequest,
23+
SetSessionModelResponse,
24+
SetSessionModeRequest,
25+
SetSessionModeResponse,
26+
)
27+
from ..utils import (
28+
notify_model,
29+
request_model,
30+
request_model_from_dict,
31+
)
32+
from .router import build_client_router
33+
34+
__all__ = ["ClientSideConnection"]
35+
36+
_CLIENT_CONNECTION_ERROR = "ClientSideConnection requires asyncio StreamWriter/StreamReader"
37+
38+
39+
class ClientSideConnection:
40+
"""Client-side connection wrapper that dispatches JSON-RPC messages to an Agent implementation."""
41+
42+
def __init__(
43+
self,
44+
to_client: Callable[[Agent], Client],
45+
input_stream: Any,
46+
output_stream: Any,
47+
) -> None:
48+
if not isinstance(input_stream, asyncio.StreamWriter) or not isinstance(output_stream, asyncio.StreamReader):
49+
raise TypeError(_CLIENT_CONNECTION_ERROR)
50+
51+
client = to_client(self) # type: ignore[arg-type]
52+
handler = self._create_handler(client)
53+
self._conn = Connection(handler, input_stream, output_stream)
54+
55+
def _create_handler(self, client: Client) -> MethodHandler:
56+
router = build_client_router(client)
57+
58+
async def handler(method: str, params: Any | None, is_notification: bool) -> Any:
59+
if is_notification:
60+
await router.dispatch_notification(method, params)
61+
return None
62+
return await router.dispatch_request(method, params)
63+
64+
return handler
65+
66+
async def initialize(self, params: InitializeRequest) -> InitializeResponse:
67+
return await request_model(
68+
self._conn,
69+
AGENT_METHODS["initialize"],
70+
params,
71+
InitializeResponse,
72+
)
73+
74+
async def newSession(self, params: NewSessionRequest) -> NewSessionResponse:
75+
return await request_model(
76+
self._conn,
77+
AGENT_METHODS["session_new"],
78+
params,
79+
NewSessionResponse,
80+
)
81+
82+
async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse:
83+
return await request_model_from_dict(
84+
self._conn,
85+
AGENT_METHODS["session_load"],
86+
params,
87+
LoadSessionResponse,
88+
)
89+
90+
async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse:
91+
return await request_model_from_dict(
92+
self._conn,
93+
AGENT_METHODS["session_set_mode"],
94+
params,
95+
SetSessionModeResponse,
96+
)
97+
98+
async def setSessionModel(self, params: SetSessionModelRequest) -> SetSessionModelResponse:
99+
return await request_model_from_dict(
100+
self._conn,
101+
AGENT_METHODS["session_set_model"],
102+
params,
103+
SetSessionModelResponse,
104+
)
105+
106+
async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse:
107+
return await request_model_from_dict(
108+
self._conn,
109+
AGENT_METHODS["authenticate"],
110+
params,
111+
AuthenticateResponse,
112+
)
113+
114+
async def prompt(self, params: PromptRequest) -> PromptResponse:
115+
return await request_model(
116+
self._conn,
117+
AGENT_METHODS["session_prompt"],
118+
params,
119+
PromptResponse,
120+
)
121+
122+
async def cancel(self, params: CancelNotification) -> None:
123+
await notify_model(self._conn, AGENT_METHODS["session_cancel"], params)
124+
125+
async def extMethod(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
126+
return await self._conn.send_request(f"_{method}", params)
127+
128+
async def extNotification(self, method: str, params: dict[str, Any]) -> None:
129+
await self._conn.send_notification(f"_{method}", params)

0 commit comments

Comments
 (0)