Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
de485fd
feat(cli): add daemon mode for dimos run (DIM-681)
spomichter Mar 6, 2026
ff5094d
fix: address greptile review — fd leak, wrong PID, fabricated log path
spomichter Mar 6, 2026
9cde9ef
feat(cli): add dimos stop and dimos status commands (DIM-682, DIM-684)
spomichter Mar 6, 2026
21f16e7
test: add e2e daemon lifecycle tests with PingPong blueprint
spomichter Mar 6, 2026
3854c8a
fix: rename stderr.log to daemon.log (addresses greptile review)
spomichter Mar 6, 2026
d1fbc51
fix: resolve mypy type errors in stop command (DIM-681)
spomichter Mar 6, 2026
cf3560f
feat: per-run log directory with unified main.jsonl (DIM-685)
spomichter Mar 6, 2026
f1e5a01
fix: migrate existing FileHandlers when set_run_log_dir is called
spomichter Mar 6, 2026
1541993
chore: move daemon tests to dimos/core/ for CI discovery
spomichter Mar 6, 2026
50d456d
chore: mark e2e daemon tests as slow
spomichter Mar 6, 2026
7446f75
test: add CLI integration tests for dimos stop and dimos status (DIM-…
spomichter Mar 6, 2026
0b0caa2
test: add e2e CLI tests against real running blueprint (DIM-682, DIM-…
spomichter Mar 6, 2026
9587115
fix: address paul's review comments
spomichter Mar 6, 2026
5d8b76b
fix: drop daemon.log, redirect all stdio to /dev/null
spomichter Mar 6, 2026
8ce1a9d
fix: restore LOG_BASE_DIR import, remove duplicate set_run_log_dir im…
spomichter Mar 6, 2026
ec1a3f4
fix: address remaining paul review comments
spomichter Mar 6, 2026
c33bd9b
fix: address all remaining paul review comments
spomichter Mar 6, 2026
6f7c4c5
fix: remove module docstring from test_daemon.py
spomichter Mar 6, 2026
32f2661
feat: MCP server enhancements, dimos mcp CLI, agent context, stress t…
spomichter Mar 6, 2026
9caa3fb
fix: address greptile review on PR #1451
spomichter Mar 6, 2026
1858c66
feat: dimos agent-send CLI + MCP method
spomichter Mar 6, 2026
fa3a04f
fix: address greptile review round 2
spomichter Mar 6, 2026
459c09a
feat: module IO introspection via MCP + CLI
spomichter Mar 6, 2026
89364f1
fix: daemon context generation + standalone e2e stress tests
spomichter Mar 7, 2026
5c60adf
refactor: strip module-io, fix greptile review issues 7-13
spomichter Mar 7, 2026
7df0e64
cleanup: remove agent_context.py, fix final greptile nits
spomichter Mar 7, 2026
c3b6114
merge: dev into feat/dim-686-mcp-agent-cli
spomichter Mar 7, 2026
baa7036
fix: address latest greptile review round
spomichter Mar 7, 2026
0237032
fix: resolve mypy errors in worker.py and stress_test_module
spomichter Mar 7, 2026
3ef29ff
perf: class-scoped MCP fixtures, 125s → 51s test runtime
spomichter Mar 7, 2026
8c9ebd7
fix: resolve remaining CI failures (mypy + all_blueprints)
spomichter Mar 7, 2026
959736c
Merge remote-tracking branch 'origin/dev' into feat/dim-686-mcp-agent…
spomichter Mar 8, 2026
315b201
refactor: McpAdapter class + convert custom methods to @skill tools
spomichter Mar 8, 2026
45929a0
fix: alphabetical order in all_blueprints.py for demo-mcp-stress-test
spomichter Mar 8, 2026
1ebfb70
fix: catch HTTPError in McpAdapter, guard None pid in Worker
spomichter Mar 9, 2026
b4ae18a
fix: server_status returns main process PID, not worker PID
spomichter Mar 9, 2026
88bc77c
refactor: use click.ParamType for --arg parsing in mcp call
spomichter Mar 9, 2026
7dd5317
Merge remote-tracking branch 'origin/dev' into feat/dim-686-mcp-agent…
spomichter Mar 9, 2026
dd84280
fix: viewer_backend → viewer rename + KeyValueType test fix
spomichter Mar 9, 2026
6cf6f27
fix: mypy arg-type error for KeyValueType dict(args)
spomichter Mar 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 190 additions & 0 deletions dimos/agents/mcp/mcp_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Lightweight MCP JSON-RPC client adapter.

``McpAdapter`` provides a typed Python interface to a running MCP server.
It is used by:

* The ``dimos mcp`` CLI commands
* Integration / e2e tests
* Any code that needs to talk to a local MCP server

Usage::

adapter = McpAdapter("http://localhost:9990/mcp")
adapter.wait_for_ready(timeout=10)
tools = adapter.list_tools()
result = adapter.call_tool("echo", {"message": "hi"})
"""

from __future__ import annotations

import time
from typing import Any
import uuid

import requests

from dimos.utils.logging_config import setup_logger

logger = setup_logger()

DEFAULT_TIMEOUT = 30


class McpError(Exception):
"""Raised when the MCP server returns a JSON-RPC error."""

def __init__(self, message: str, code: int | None = None) -> None:
self.code = code
super().__init__(message)


class McpAdapter:
"""Thin JSON-RPC client for a running MCP server."""

def __init__(self, url: str | None = None, timeout: int = DEFAULT_TIMEOUT) -> None:
if url is None:
from dimos.core.global_config import global_config

url = f"http://localhost:{global_config.mcp_port}/mcp"
self.url = url
self.timeout = timeout

# ------------------------------------------------------------------
# Low-level JSON-RPC
# ------------------------------------------------------------------

def call(self, method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
"""Send a JSON-RPC request and return the parsed response.

Raises ``requests.ConnectionError`` if the server is unreachable.
"""
payload: dict[str, Any] = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": method,
}
if params:
payload["params"] = params

resp = requests.post(self.url, json=payload, timeout=self.timeout)
try:
resp.raise_for_status()
except requests.HTTPError as e:
raise McpError(f"HTTP {resp.status_code}: {e}") from e
return resp.json() # type: ignore[no-any-return]

# ------------------------------------------------------------------
# MCP standard methods
# ------------------------------------------------------------------

def initialize(self) -> dict[str, Any]:
"""Send ``initialize`` and return server info."""
return self.call("initialize")

def list_tools(self) -> list[dict[str, Any]]:
"""Return the list of available tools."""
result = self._unwrap(self.call("tools/list"))
return result.get("tools", []) # type: ignore[no-any-return]

def call_tool(self, name: str, arguments: dict[str, Any] | None = None) -> dict[str, Any]:
"""Call a tool by name and return the result dict."""
return self._unwrap(self.call("tools/call", {"name": name, "arguments": arguments or {}}))

def call_tool_text(self, name: str, arguments: dict[str, Any] | None = None) -> str:
"""Call a tool and return just the first text content item."""
result = self.call_tool(name, arguments)
content = result.get("content", [])
if not content:
return ""
return content[0].get("text", str(content[0])) # type: ignore[no-any-return]

# ------------------------------------------------------------------
# Readiness probes
# ------------------------------------------------------------------

def wait_for_ready(self, timeout: float = 10.0, interval: float = 0.5) -> bool:
"""Poll until the MCP server responds, or return False on timeout."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
resp = requests.post(
self.url,
json={"jsonrpc": "2.0", "id": "probe", "method": "initialize"},
timeout=2,
)
if resp.status_code == 200:
return True
except requests.ConnectionError:
pass
time.sleep(interval)
return False

def wait_for_down(self, timeout: float = 10.0, interval: float = 0.5) -> bool:
"""Poll until the MCP server stops responding."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
requests.post(
self.url,
json={"jsonrpc": "2.0", "id": "probe", "method": "initialize"},
timeout=1,
)
except (requests.ConnectionError, requests.ReadTimeout):
return True
time.sleep(interval)
return False

# ------------------------------------------------------------------
# Class methods for discovery
# ------------------------------------------------------------------

@classmethod
def from_run_entry(cls, entry: Any | None = None, timeout: int = DEFAULT_TIMEOUT) -> McpAdapter:
"""Create an adapter from a RunEntry, or discover the latest one.

Falls back to the default URL if no entry is found.
"""
if entry is None:
from dimos.core.run_registry import list_runs

runs = list_runs(alive_only=True)
entry = runs[0] if runs else None

if entry is not None and hasattr(entry, "mcp_url") and entry.mcp_url:
return cls(url=entry.mcp_url, timeout=timeout)

# Fall back to default URL using GlobalConfig port
from dimos.core.global_config import global_config

url = f"http://localhost:{global_config.mcp_port}/mcp"
return cls(url=url, timeout=timeout)

# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------

@staticmethod
def _unwrap(response: dict[str, Any]) -> dict[str, Any]:
"""Extract the ``result`` from a JSON-RPC response, raising on error."""
if "error" in response:
err = response["error"]
msg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
raise McpError(msg, code=err.get("code") if isinstance(err, dict) else None)
return response.get("result", {}) # type: ignore[no-any-return]

def __repr__(self) -> str:
return f"McpAdapter(url={self.url!r})"
91 changes: 78 additions & 13 deletions dimos/agents/mcp/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@

import asyncio
import json
import os
from typing import TYPE_CHECKING, Any

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from starlette.requests import Request # noqa: TC002
from starlette.responses import Response
import uvicorn

from dimos.agents.annotation import skill
from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.rpc_client import RpcCall, RPCClient
from dimos.utils.logging_config import setup_logger

logger = setup_logger()


from starlette.requests import Request # noqa: TC002

from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.rpc_client import RpcCall, RPCClient

if TYPE_CHECKING:
import concurrent.futures

Expand All @@ -51,6 +51,11 @@
app.state.rpc_calls = {}


# ---------------------------------------------------------------------------
# JSON-RPC helpers
# ---------------------------------------------------------------------------


def _jsonrpc_result(req_id: Any, result: Any) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": req_id, "result": result}

Expand All @@ -63,6 +68,11 @@ def _jsonrpc_error(req_id: Any, code: int, message: str) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}


# ---------------------------------------------------------------------------
# JSON-RPC handlers (standard MCP protocol only)
# ---------------------------------------------------------------------------


def _handle_initialize(req_id: Any) -> dict[str, Any]:
return _jsonrpc_result(
req_id,
Expand All @@ -77,11 +87,11 @@ def _handle_initialize(req_id: Any) -> dict[str, Any]:
def _handle_tools_list(req_id: Any, skills: list[SkillInfo]) -> dict[str, Any]:
tools = []

for skill in skills:
schema = json.loads(skill.args_schema)
for s in skills:
schema = json.loads(s.args_schema)
description = schema.pop("description", None)
schema.pop("title", None)
tool = {"name": skill.func_name, "inputSchema": schema}
tool: dict[str, Any] = {"name": s.func_name, "inputSchema": schema}
if description:
tool["description"] = description
tools.append(tool)
Expand Down Expand Up @@ -128,7 +138,7 @@ async def handle_request(
params = request.get("params", {}) or {}
req_id = request.get("id")

# JSON-RPC notifications have no "id" the server must not reply.
# JSON-RPC notifications have no "id" -- the server must not reply.
if "id" not in request:
return None

Expand Down Expand Up @@ -158,6 +168,11 @@ async def mcp_endpoint(request: Request) -> Response:
return JSONResponse(result)


# ---------------------------------------------------------------------------
# McpServer Module
# ---------------------------------------------------------------------------


class McpServer(Module):
def __init__(self) -> None:
super().__init__()
Expand All @@ -183,12 +198,62 @@ def stop(self) -> None:
@rpc
def on_system_modules(self, modules: list[RPCClient]) -> None:
assert self.rpc is not None
app.state.skills = [skill for module in modules for skill in (module.get_skills() or [])]
app.state.skills = [
skill_info for module in modules for skill_info in (module.get_skills() or [])
]
app.state.rpc_calls = {
skill.func_name: RpcCall(None, self.rpc, skill.func_name, skill.class_name, [])
for skill in app.state.skills
skill_info.func_name: RpcCall(
None, self.rpc, skill_info.func_name, skill_info.class_name, []
)
for skill_info in app.state.skills
}

# ------------------------------------------------------------------
# Introspection skills (exposed as MCP tools via tools/list)
# ------------------------------------------------------------------

@skill
def server_status(self) -> str:
"""Get MCP server status: main process PID, deployed modules, and skill count."""
from dimos.core.run_registry import get_most_recent

skills: list[SkillInfo] = app.state.skills
modules = list(dict.fromkeys(s.class_name for s in skills))
entry = get_most_recent()
pid = entry.pid if entry else os.getpid()
return json.dumps(
{
"pid": pid,
"modules": modules,
"skills": [s.func_name for s in skills],
}
)

@skill
def list_modules(self) -> str:
"""List deployed modules and their skills."""
skills: list[SkillInfo] = app.state.skills
modules: dict[str, list[str]] = {}
for s in skills:
modules.setdefault(s.class_name, []).append(s.func_name)
return json.dumps({"modules": modules})

@skill
def agent_send(self, message: str) -> str:
"""Send a message to the running DimOS agent via LCM."""
if not message:
raise ValueError("Message cannot be empty")

from dimos.core.transport import pLCMTransport

transport: pLCMTransport[str] = pLCMTransport("/human_input")
try:
transport.start()
transport.publish(message)
return f"Message sent to agent: {message[:100]}"
finally:
transport.stop()

def _start_server(self, port: int | None = None) -> None:
from dimos.core.global_config import global_config

Expand Down
Loading
Loading