From 36d209468ce1601799932b5d05f9ab8d8947efed Mon Sep 17 00:00:00 2001 From: stash Date: Mon, 5 Jan 2026 04:58:22 -0800 Subject: [PATCH 01/12] Working dimensional MCP server - tested with Claude Code MCP client --- dimos/agents/agent.py | 76 +++++ dimos/agents/spec.py | 3 + dimos/protocol/mcp/README.md | 30 ++ dimos/protocol/mcp/__init__.py | 17 + dimos/protocol/mcp/bridge.py | 53 +++ dimos/protocol/mcp/cli.py | 55 ++++ dimos/protocol/mcp/server.py | 151 +++++++++ dimos/protocol/mcp/test_e2e_unitree.py | 304 ++++++++++++++++++ dimos/protocol/mcp/test_server.py | 268 +++++++++++++++ .../unitree_webrtc/unitree_go2_blueprints.py | 2 +- pyproject.toml | 3 + uv.lock | 50 +++ 12 files changed, 1011 insertions(+), 1 deletion(-) create mode 100644 dimos/protocol/mcp/README.md create mode 100644 dimos/protocol/mcp/__init__.py create mode 100644 dimos/protocol/mcp/bridge.py create mode 100644 dimos/protocol/mcp/cli.py create mode 100644 dimos/protocol/mcp/server.py create mode 100644 dimos/protocol/mcp/test_e2e_unitree.py create mode 100644 dimos/protocol/mcp/test_server.py diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 17f1871210..4255fd488c 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -222,6 +222,8 @@ def get_agent_id(self) -> str: def start(self) -> None: super().start() self.coordinator.start() + if self.config.mcp_port: + self.start_mcp_server(self.config.mcp_port) @rpc def stop(self) -> None: @@ -229,6 +231,80 @@ def stop(self) -> None: self._agent_stopped = True super().stop() + @rpc + def start_mcp_server(self, port: int = 9990) -> None: + """Start TCP server for MCP connections.""" + + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + logger.info("MCP client connected") + try: + while True: + data = await reader.readline() + if not data: + break + request = json.loads(data.decode()) + response = self._handle_mcp_request(request) + writer.write(json.dumps(response).encode() + b"\n") + await writer.drain() + finally: + writer.close() + logger.info("MCP client disconnected") + + async def start_server() -> None: + server = await asyncio.start_server(handle_client, "0.0.0.0", port) + logger.info(f"MCP TCP server listening on port {port}") + await server.serve_forever() + + asyncio.run_coroutine_threadsafe(start_server(), self._loop) + + def _handle_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]: + """Handle MCP JSON-RPC request.""" + method = request.get("method", "") + params = request.get("params", {}) or {} + req_id = request.get("id") + + if method == "initialize": + return { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "dimensional", "version": "1.0.0"}, + }, + } + + if method == "tools/list": + tools = [ + { + "name": c.name, + "description": c.schema.get("function", {}).get("description", ""), + "inputSchema": c.schema.get("function", {}).get("parameters", {}), + } + for c in self.coordinator.skills().values() + if not c.hide_skill + ] + return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}} + + if method == "tools/call": + name, args = params.get("name"), params.get("arguments") or {} + call_id = str(uuid.uuid4()) + self.coordinator.call_skill(call_id, name, args) + snapshot = self.coordinator.generate_snapshot() + result = snapshot.get(call_id) + text = str(result.content()) if result and result.content() else "Skill started" + return { + "jsonrpc": "2.0", + "id": req_id, + "result": {"content": [{"type": "text", "text": text}]}, + } + + return { + "jsonrpc": "2.0", + "id": req_id, + "error": {"code": -32601, "message": f"Unknown: {method}"}, + } + def clear_history(self) -> None: self._history.clear() diff --git a/dimos/agents/spec.py b/dimos/agents/spec.py index 37262dc497..c169f481d2 100644 --- a/dimos/agents/spec.py +++ b/dimos/agents/spec.py @@ -140,6 +140,9 @@ class AgentConfig(ModuleConfig): agent_transport: type[PubSub] = lcm.PickleLCM # type: ignore[type-arg] agent_topic: Any = field(default_factory=lambda: lcm.Topic("/agent")) + # MCP server port (None = disabled) + mcp_port: int | None = None + AnyMessage = Union[SystemMessage, ToolMessage, AIMessage, HumanMessage] diff --git a/dimos/protocol/mcp/README.md b/dimos/protocol/mcp/README.md new file mode 100644 index 0000000000..d33424618f --- /dev/null +++ b/dimos/protocol/mcp/README.md @@ -0,0 +1,30 @@ +# DimOS MCP Server + +Expose DimOS robot skills to Claude Code via Model Context Protocol. + +## Setup + +Add to Claude Code (one command): +```bash +claude mcp add --transport stdio dimos --scope project -- python -m dimos.protocol.mcp.cli --bridge +``` + +## Usage + +**Terminal 1** - Start DimOS: +```bash +dimos --replay run unitree-go2-agentic +``` + +**Claude Code** - Use robot skills: +``` +> move forward 1 meter +> go to the kitchen +> tag this location as "desk" +``` + +## How It Works + +1. `llm_agent(mcp_port=9990)` in the blueprint starts a TCP server +2. Claude Code spawns the bridge (`--bridge`) which connects to `localhost:9990` +3. Skills are exposed as MCP tools (e.g., `relative_move`, `navigate_with_text`) diff --git a/dimos/protocol/mcp/__init__.py b/dimos/protocol/mcp/__init__.py new file mode 100644 index 0000000000..6592849f66 --- /dev/null +++ b/dimos/protocol/mcp/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dimos.protocol.mcp.server import DimensionalMCPServer + +__all__ = ["DimensionalMCPServer"] diff --git a/dimos/protocol/mcp/bridge.py b/dimos/protocol/mcp/bridge.py new file mode 100644 index 0000000000..0b09997798 --- /dev/null +++ b/dimos/protocol/mcp/bridge.py @@ -0,0 +1,53 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""MCP Bridge - Connects stdio (Claude Code) to TCP (DimOS Agent).""" + +import asyncio +import os +import sys + +DEFAULT_PORT = 9990 + + +async def main() -> None: + port = int(os.environ.get("MCP_PORT", DEFAULT_PORT)) + host = os.environ.get("MCP_HOST", "localhost") + + reader, writer = await asyncio.open_connection(host, port) + sys.stderr.write(f"MCP Bridge connected to {host}:{port}\n") + + async def stdin_to_tcp() -> None: + loop = asyncio.get_event_loop() + while True: + line = await loop.run_in_executor(None, sys.stdin.readline) + if not line: + break + writer.write(line.encode()) + await writer.drain() + + async def tcp_to_stdout() -> None: + while True: + data = await reader.readline() + if not data: + break + sys.stdout.write(data.decode()) + sys.stdout.flush() + + await asyncio.gather(stdin_to_tcp(), tcp_to_stdout()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/dimos/protocol/mcp/cli.py b/dimos/protocol/mcp/cli.py new file mode 100644 index 0000000000..07ae868f7c --- /dev/null +++ b/dimos/protocol/mcp/cli.py @@ -0,0 +1,55 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CLI entry point for Dimensional MCP Server. + +Usage: + python -m dimos.protocol.mcp.cli # Standalone stdio server + python -m dimos.protocol.mcp.cli --bridge # Bridge to running DimOS +""" + +from __future__ import annotations + +import asyncio +import sys + + +def main() -> None: + """Main entry point for the MCP CLI.""" + if "--bridge" in sys.argv or "-b" in sys.argv: + from dimos.protocol.mcp.bridge import main as bridge_main + + asyncio.run(bridge_main()) + else: + asyncio.run(run_server()) + + +async def run_server() -> None: + """Run standalone MCP server with default skill containers.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + + server = DimensionalMCPServer() + + try: + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server.register_skills(UnitreeSkillContainer()) + except ImportError: + pass + + await server.run() + + +if __name__ == "__main__": + main() diff --git a/dimos/protocol/mcp/server.py b/dimos/protocol/mcp/server.py new file mode 100644 index 0000000000..b300dd9050 --- /dev/null +++ b/dimos/protocol/mcp/server.py @@ -0,0 +1,151 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MCP Server exposing DimOS skills as tools to Claude Code and other MCP clients.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any +import uuid + +from mcp.server import Server +from mcp.server.stdio import stdio_server +from mcp.types import TextContent, Tool + +from dimos.protocol.skill.coordinator import SkillCoordinator +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.protocol.skill.skill import SkillContainer + +logger = setup_logger() + + +class DimensionalMCPServer: + """MCP Server exposing DimOS skills as tools. + + This server bridges the MCP protocol with DimOS's SkillCoordinator, + allowing external AI agents like Claude Code to control robots. + + Example: + >>> from dimos.protocol.mcp import DimensionalMCPServer + >>> server = DimensionalMCPServer() + >>> server.register_skills(robot.skill_library) + >>> await server.run() + """ + + def __init__(self, coordinator: SkillCoordinator | None = None): + """Initialize the MCP server. + + Args: + coordinator: Optional SkillCoordinator to use. If not provided, + a new one will be created. + """ + self.coordinator = coordinator if coordinator is not None else SkillCoordinator() + self.server = Server("dimensional") + self._setup_handlers() + + def _setup_handlers(self) -> None: + """Set up MCP protocol handlers.""" + + @self.server.list_tools() + async def list_tools() -> list[Tool]: + """List all available skills as MCP tools.""" + tools = [] + for config in self.coordinator.skills().values(): + if config.hide_skill: + continue + + schema = config.schema + func_schema = schema.get("function", {}) + + tools.append( + Tool( + name=config.name, + description=func_schema.get("description", ""), + inputSchema=func_schema.get( + "parameters", {"type": "object", "properties": {}} + ), + ) + ) + return tools + + @self.server.call_tool() + async def call_tool(name: str, arguments: dict[str, Any] | None) -> list[TextContent]: + """Execute a skill and return the result.""" + call_id = str(uuid.uuid4()) + args = arguments or {} + + logger.info(f"MCP: Calling skill '{name}' with args: {args}") + + # Check if skill exists + skill_config = self.coordinator.get_skill_config(name) + if not skill_config: + return [TextContent(type="text", text=f"Error: Skill '{name}' not found")] + + # Execute the skill + self.coordinator.call_skill(call_id, name, args) + + # Wait for skill completion + try: + await self.coordinator.wait_for_updates(timeout=60) + except Exception as e: + logger.error(f"MCP: Error waiting for skill updates: {e}") + return [TextContent(type="text", text=f"Error: {e}")] + + # Get the result + snapshot = self.coordinator.generate_snapshot() + result = snapshot.get(call_id) + + if result is None: + return [TextContent(type="text", text="Skill executed but no result returned")] + + content = result.content() + if content is None: + content = "Skill completed successfully" + + return [TextContent(type="text", text=str(content))] + + def register_skills(self, container: SkillContainer) -> None: + """Register a skill container with the coordinator. + + Args: + container: SkillContainer with skills to expose via MCP. + """ + self.coordinator.register_skills(container) + + async def run(self) -> None: + """Run the MCP server using stdio transport. + + This starts the coordinator and runs the MCP server, + listening for requests on stdin and responding on stdout. + """ + logger.info("Starting Dimensional MCP Server...") + self.coordinator.start() + + try: + async with stdio_server() as (read_stream, write_stream): + logger.info("MCP Server ready, listening for requests...") + await self.server.run( + read_stream, + write_stream, + self.server.create_initialization_options(), + ) + finally: + logger.info("Shutting down MCP Server...") + self.coordinator.stop() + + def stop(self) -> None: + """Stop the MCP server and coordinator.""" + self.coordinator.stop() diff --git a/dimos/protocol/mcp/test_e2e_unitree.py b/dimos/protocol/mcp/test_e2e_unitree.py new file mode 100644 index 0000000000..a3008f3605 --- /dev/null +++ b/dimos/protocol/mcp/test_e2e_unitree.py @@ -0,0 +1,304 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""End-to-end tests for MCP server with UnitreeGo2 robot skills. + +These tests verify that: +1. The MCP server can be properly configured with UnitreeGo2 skills +2. Skills are correctly exposed as MCP tools +3. Claude Code (or any MCP client) would be able to import and use the server +4. Tool calls execute correctly with the skill coordinator + +Most tests require LCM infrastructure and are marked with @pytest.mark.lcm. +Run with: pytest -m lcm dimos/protocol/mcp/test_e2e_unitree.py +""" + +from __future__ import annotations + +import subprocess +import sys +import time + +import pytest + +pytestmark = pytest.mark.lcm # Mark all tests in this module as requiring LCM + + +class TestMCPServerE2E: + """End-to-end tests for MCP server with Unitree skills.""" + + def test_mcp_module_importable(self) -> None: + """Test that the MCP server module can be imported.""" + from dimos.protocol.mcp import DimensionalMCPServer + + assert DimensionalMCPServer is not None + + def test_mcp_server_with_unitree_skills(self) -> None: + """Test MCP server setup with Unitree skill container.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + + skills = server.coordinator.skills() + + # Verify Unitree skills are registered + assert "relative_move" in skills + assert "wait" in skills + assert "execute_sport_command" in skills + + # Verify skill schemas are generated + move_skill = skills["relative_move"] + schema = move_skill.schema.get("function", {}) + assert schema.get("name") == "relative_move" + assert "parameters" in schema + + def test_mcp_tools_exclude_hidden_skills(self) -> None: + """Test that hidden skills (like current_time) are not exposed as tools.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + + skills = server.coordinator.skills() + + # current_time is marked as hide_skill=True + assert "current_time" in skills + assert skills["current_time"].hide_skill is True + + # Verify hidden skills would be filtered in list_tools + visible_skills = [name for name, config in skills.items() if not config.hide_skill] + assert "current_time" not in visible_skills + assert "relative_move" in visible_skills + + def test_mcp_tool_schema_format(self) -> None: + """Test that tool schemas match MCP expected format.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + + skills = server.coordinator.skills() + wait_skill = skills["wait"] + + schema = wait_skill.schema + func_schema = schema.get("function", {}) + + # MCP tool format requirements + assert "name" in func_schema + assert "description" in func_schema + assert "parameters" in func_schema + + params = func_schema["parameters"] + assert params.get("type") == "object" + assert "properties" in params + assert "seconds" in params["properties"] + + @pytest.mark.asyncio + async def test_mcp_call_wait_skill(self) -> None: + """Test calling the 'wait' skill through MCP coordinator.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + server.coordinator.start() + + try: + call_id = "e2e-wait-test" + start_time = time.time() + + # Call the wait skill with 0.5 seconds + server.coordinator.call_skill(call_id, "wait", {"seconds": 0.5}) + + # Wait for completion + await server.coordinator.wait_for_updates(timeout=5) + elapsed = time.time() - start_time + + snapshot = server.coordinator.generate_snapshot() + result = snapshot.get(call_id) + + assert result is not None + assert "Wait completed" in str(result.content()) + assert elapsed >= 0.5 # Should have waited at least 0.5s + finally: + server.coordinator.stop() + container.stop() + + def test_cli_module_runnable(self) -> None: + """Test that the CLI module can be loaded without errors.""" + # This simulates what Claude Code would do when starting the server + result = subprocess.run( + [sys.executable, "-c", "from dimos.protocol.mcp.cli import main; print('OK')"], + capture_output=True, + text=True, + timeout=10, + ) + assert result.returncode == 0 + assert "OK" in result.stdout + + def test_server_can_be_instantiated_from_cli_context(self) -> None: + """Test server instantiation as it would happen from CLI.""" + result = subprocess.run( + [ + sys.executable, + "-c", + """ +from dimos.protocol.mcp.server import DimensionalMCPServer +from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + +server = DimensionalMCPServer() +container = UnitreeSkillContainer() +server.register_skills(container) + +skills = server.coordinator.skills() +print(f"Registered {len(skills)} skills") +print("Skills:", list(skills.keys())) +""", + ], + capture_output=True, + text=True, + timeout=10, + ) + assert result.returncode == 0, f"Failed with stderr: {result.stderr}" + assert "Registered" in result.stdout + assert "relative_move" in result.stdout + + +class TestMCPProtocolCompatibility: + """Tests to verify MCP protocol compatibility.""" + + def test_tool_inputschema_is_valid_json_schema(self) -> None: + """Test that inputSchema is valid JSON Schema format.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + + for name, config in server.coordinator.skills().items(): + if config.hide_skill: + continue + + schema = config.schema.get("function", {}) + params = schema.get("parameters", {}) + + # JSON Schema requirements + assert params.get("type") == "object", f"Skill {name} missing type: object" + assert "properties" in params, f"Skill {name} missing properties" + + def test_mcp_server_protocol_version(self) -> None: + """Test that server reports correct MCP protocol version.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + + server = DimensionalMCPServer() + + # Server should be named "dimensional" + assert server.server.name == "dimensional" + + @pytest.mark.asyncio + async def test_coordinator_state_tracking(self) -> None: + """Test that skill states are properly tracked for MCP responses.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.protocol.skill.coordinator import SkillStateEnum + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + server.coordinator.start() + + try: + call_id = "state-tracking-test" + server.coordinator.call_skill(call_id, "wait", {"seconds": 0.1}) + + # Initially should be pending or running + initial_state = server.coordinator._skill_state.get(call_id) + assert initial_state is not None + assert initial_state.state in [SkillStateEnum.pending, SkillStateEnum.running] + + # Wait for completion + await server.coordinator.wait_for_updates(timeout=5) + snapshot = server.coordinator.generate_snapshot() + + result = snapshot.get(call_id) + assert result is not None + assert result.state == SkillStateEnum.completed + finally: + server.coordinator.stop() + container.stop() + + +class TestClaudeCodeIntegration: + """Tests simulating Claude Code MCP client behavior.""" + + def test_mcp_config_format(self) -> None: + """Test that the expected Claude Code config format works.""" + # Verify the module path is valid (this is what Claude Code would call) + result = subprocess.run( + [sys.executable, "-c", "import dimos.protocol.mcp.cli"], + capture_output=True, + text=True, + timeout=10, + ) + assert result.returncode == 0, f"CLI import failed: {result.stderr}" + + def test_tool_call_argument_formats(self) -> None: + """Test that various argument formats work with skill calls.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + + # Test that get_skill_config works (used in call_tool handler) + config = server.coordinator.get_skill_config("wait") + assert config is not None + assert config.name == "wait" + + # Test with missing skill + missing = server.coordinator.get_skill_config("nonexistent_skill") + assert missing is None + + def test_skill_descriptions_for_llm(self) -> None: + """Test that skill descriptions are suitable for LLM consumption.""" + from dimos.protocol.mcp.server import DimensionalMCPServer + from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer + + server = DimensionalMCPServer() + container = UnitreeSkillContainer() + server.register_skills(container) + + skills = server.coordinator.skills() + + # Check that key skills have meaningful descriptions + relative_move = skills["relative_move"] + desc = relative_move.schema.get("function", {}).get("description", "") + assert len(desc) > 20, "relative_move should have a detailed description" + assert "move" in desc.lower() or "robot" in desc.lower() + + # Note: execute_sport_command has its docstring set dynamically after + # class definition, so the @skill decorator schema doesn't capture it. + # This is a known limitation of the schema generation. + execute_cmd = skills["execute_sport_command"] + assert execute_cmd is not None, "execute_sport_command skill should exist" diff --git a/dimos/protocol/mcp/test_server.py b/dimos/protocol/mcp/test_server.py new file mode 100644 index 0000000000..159d4c4f76 --- /dev/null +++ b/dimos/protocol/mcp/test_server.py @@ -0,0 +1,268 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the Dimensional MCP Server. + +These tests require LCM infrastructure and are marked with @pytest.mark.lcm. +Run with: pytest -m lcm dimos/protocol/mcp/test_server.py +""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import pytest + +from dimos.core import Module, rpc +from dimos.protocol.mcp.server import DimensionalMCPServer +from dimos.protocol.skill.coordinator import SkillCoordinator +from dimos.protocol.skill.skill import skill + +if TYPE_CHECKING: + from collections.abc import Generator + +pytestmark = pytest.mark.lcm # Mark all tests in this module as requiring LCM + + +class MockSkillContainer(Module): + """Simple skill container for testing.""" + + @rpc + def start(self) -> None: + super().start() + + @rpc + def stop(self) -> None: + super().stop() + + @skill() + def add(self, x: int, y: int) -> int: + """Add two numbers together.""" + return x + y + + @skill() + def greet(self, name: str) -> str: + """Greet a person by name.""" + return f"Hello, {name}!" + + @skill() + def slow_task(self, delay: float) -> str: + """A slow task that takes time to complete.""" + time.sleep(delay) + return f"Completed after {delay}s" + + @skill(hide_skill=True) + def hidden_skill(self) -> str: + """This skill should not be visible via MCP.""" + return "Hidden" + + +@pytest.fixture +def mcp_server() -> Generator[DimensionalMCPServer, None, None]: + """Fixture that provides a properly cleaned up MCP server.""" + server = DimensionalMCPServer() + yield server + server.stop() + + +@pytest.fixture +def mcp_server_with_skills() -> Generator[ + tuple[DimensionalMCPServer, MockSkillContainer], None, None +]: + """Fixture that provides an MCP server with mock skills registered.""" + server = DimensionalMCPServer() + container = MockSkillContainer() + server.register_skills(container) + yield server, container + server.stop() + container.stop() + + +class TestDimensionalMCPServer: + """Test suite for DimensionalMCPServer.""" + + def test_server_initialization(self, mcp_server: DimensionalMCPServer) -> None: + """Test that the server initializes correctly.""" + assert mcp_server.coordinator is not None + assert mcp_server.server is not None + + def test_server_with_existing_coordinator(self) -> None: + """Test server initialization with an existing coordinator.""" + coordinator = SkillCoordinator() + server = DimensionalMCPServer(coordinator=coordinator) + try: + assert server.coordinator is coordinator + finally: + server.stop() + + def test_register_skills( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test that skills can be registered with the server.""" + server, _ = mcp_server_with_skills + skills = server.coordinator.skills() + assert "add" in skills + assert "greet" in skills + + @pytest.mark.asyncio + async def test_list_tools( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test that tools are listed correctly via MCP protocol.""" + server, _ = mcp_server_with_skills + + # Get the list_tools handler directly from the server + # The handler is registered as a decorator, we need to call it via server internals + tools = [] + for config in server.coordinator.skills().values(): + if not config.hide_skill: + tools.append( + { + "name": config.name, + "description": config.schema.get("function", {}).get("description", ""), + } + ) + + # Verify tools are listed (excluding hidden ones) + tool_names = [t["name"] for t in tools] + assert "add" in tool_names + assert "greet" in tool_names + assert "slow_task" in tool_names + assert "hidden_skill" not in tool_names # Hidden skills should not be listed + + @pytest.mark.asyncio + async def test_call_tool_add( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test calling the 'add' skill via MCP.""" + server, _container = mcp_server_with_skills + server.coordinator.start() + + try: + call_id = "test-add-1" + server.coordinator.call_skill(call_id, "add", {"x": 5, "y": 3}) + + # Wait for the result + await server.coordinator.wait_for_updates(timeout=5) + snapshot = server.coordinator.generate_snapshot() + + result = snapshot.get(call_id) + assert result is not None + assert result.content() == 8 + finally: + server.coordinator.stop() + + @pytest.mark.asyncio + async def test_call_tool_greet( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test calling the 'greet' skill via MCP.""" + server, _container = mcp_server_with_skills + server.coordinator.start() + + try: + call_id = "test-greet-1" + server.coordinator.call_skill(call_id, "greet", {"name": "World"}) + + await server.coordinator.wait_for_updates(timeout=5) + snapshot = server.coordinator.generate_snapshot() + + result = snapshot.get(call_id) + assert result is not None + assert result.content() == "Hello, World!" + finally: + server.coordinator.stop() + + @pytest.mark.asyncio + async def test_tool_schema_generation( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test that tool schemas are generated correctly.""" + server, _ = mcp_server_with_skills + + skills = server.coordinator.skills() + add_skill = skills.get("add") + + assert add_skill is not None + schema = add_skill.schema + func_schema = schema.get("function", {}) + + # Verify schema has required fields + assert func_schema.get("name") == "add" + assert "description" in func_schema + assert "parameters" in func_schema + + # Verify parameters schema + params = func_schema.get("parameters", {}) + assert params.get("type") == "object" + assert "properties" in params + assert "x" in params["properties"] + assert "y" in params["properties"] + + def test_hidden_skills_not_exposed( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test that hidden skills are not exposed via MCP.""" + server, _ = mcp_server_with_skills + + skills = server.coordinator.skills() + + # hidden_skill should be in the coordinator but marked as hidden + assert "hidden_skill" in skills + assert skills["hidden_skill"].hide_skill is True + + @pytest.mark.asyncio + async def test_multiple_concurrent_calls( + self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] + ) -> None: + """Test multiple concurrent skill calls.""" + server, _container = mcp_server_with_skills + server.coordinator.start() + + try: + # Start multiple calls + server.coordinator.call_skill("call-1", "add", {"x": 1, "y": 1}) + server.coordinator.call_skill("call-2", "add", {"x": 2, "y": 2}) + server.coordinator.call_skill("call-3", "add", {"x": 3, "y": 3}) + + # Wait for all results + results: dict[str, int] = {} + timeout = 10 + start = time.time() + + while len(results) < 3 and (time.time() - start) < timeout: + await server.coordinator.wait_for_updates(timeout=1) + snapshot = server.coordinator.generate_snapshot() + for call_id, state in snapshot.items(): + if state.content() is not None: + results[call_id] = state.content() # type: ignore[assignment] + + assert results.get("call-1") == 2 + assert results.get("call-2") == 4 + assert results.get("call-3") == 6 + finally: + server.coordinator.stop() + + +@pytest.mark.asyncio +async def test_server_importable() -> None: + """Test that the MCP server can be imported correctly.""" + from dimos.protocol.mcp import DimensionalMCPServer + + server = DimensionalMCPServer() + try: + assert server is not None + finally: + server.stop() diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 2d962af981..a781f756d8 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -161,7 +161,7 @@ agentic = autoconnect( spatial, - llm_agent(), + llm_agent(mcp_port=9990), _common_agentic, ) diff --git a/pyproject.toml b/pyproject.toml index 206263d209..bd7bc13da9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,9 @@ dependencies = [ "sse-starlette>=2.2.1", "uvicorn>=0.34.0", + # MCP Server + "mcp>=1.0.0", + # Agents "langchain>=1,<2", "langchain-chroma>=1,<2", diff --git a/uv.lock b/uv.lock index 6a89d211df..d497efcad4 100644 --- a/uv.lock +++ b/uv.lock @@ -1480,6 +1480,7 @@ dependencies = [ { name = "lap" }, { name = "lark" }, { name = "llvmlite" }, + { name = "mcp" }, { name = "moondream" }, { name = "numba" }, { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, @@ -1656,6 +1657,7 @@ requires-dist = [ { name = "lvis", marker = "extra == 'cuda'" }, { name = "lxml-stubs", marker = "extra == 'dev'", specifier = ">=0.5.1,<1" }, { name = "matplotlib", marker = "extra == 'manipulation'", specifier = ">=3.7.1" }, + { name = "mcp", specifier = ">=1.0.0" }, { name = "mmcv", marker = "extra == 'cuda'", specifier = ">=2.1.0" }, { name = "mmengine", marker = "extra == 'cuda'", specifier = ">=0.10.3" }, { name = "moondream" }, @@ -2666,6 +2668,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, ] +[[package]] +name = "httpx-sse" +version = "0.4.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/0f/4c/751061ffa58615a32c31b2d82e8482be8dd4a89154f003147acee90f2be9/httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d", size = 15943, upload-time = "2025-10-10T21:48:22.271Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d2/fd/6668e5aec43ab844de6fc74927e155a3b37bf40d7c3790e49fc0406b6578/httpx_sse-0.4.3-py3-none-any.whl", hash = "sha256:0ac1c9fe3c0afad2e0ebb25a934a59f4c7823b60792691f779fad2c5568830fc", size = 8960, upload-time = "2025-10-10T21:48:21.158Z" }, +] + [[package]] name = "huggingface-hub" version = "0.36.0" @@ -4248,6 +4259,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/af/33/ee4519fa02ed11a94aef9559552f3b17bb863f2ecfe1a35dc7f548cde231/matplotlib_inline-0.2.1-py3-none-any.whl", hash = "sha256:d56ce5156ba6085e00a9d54fead6ed29a9c47e215cd1bba2e976ef39f5710a76", size = 9516, upload-time = "2025-10-23T09:00:20.675Z" }, ] +[[package]] +name = "mcp" +version = "1.25.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "jsonschema" }, + { name = "pydantic" }, + { name = "pydantic-settings" }, + { name = "pyjwt", extra = ["crypto"] }, + { name = "python-multipart" }, + { name = "pywin32", marker = "sys_platform == 'win32'" }, + { name = "sse-starlette" }, + { name = "starlette" }, + { name = "typing-extensions" }, + { name = "typing-inspection" }, + { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d5/2d/649d80a0ecf6a1f82632ca44bec21c0461a9d9fc8934d38cb5b319f2db5e/mcp-1.25.0.tar.gz", hash = "sha256:56310361ebf0364e2d438e5b45f7668cbb124e158bb358333cd06e49e83a6802", size = 605387, upload-time = "2025-12-19T10:19:56.985Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e2/fc/6dc7659c2ae5ddf280477011f4213a74f806862856b796ef08f028e664bf/mcp-1.25.0-py3-none-any.whl", hash = "sha256:b37c38144a666add0862614cc79ec276e97d72aa8ca26d622818d4e278b9721a", size = 233076, upload-time = "2025-12-19T10:19:55.416Z" }, +] + [[package]] name = "mdit-py-plugins" version = "0.5.0" @@ -6805,6 +6841,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, ] +[[package]] +name = "pyjwt" +version = "2.10.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e7/46/bd74733ff231675599650d3e47f361794b22ef3e3770998dda30d3b63726/pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953", size = 87785, upload-time = "2024-11-28T03:43:29.933Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" }, +] + +[package.optional-dependencies] +crypto = [ + { name = "cryptography" }, +] + [[package]] name = "pylibsrtp" version = "1.0.0" From a82dc646f62e64d85268d3f850aa3047d54d4875 Mon Sep 17 00:00:00 2001 From: stash Date: Mon, 5 Jan 2026 20:11:41 -0800 Subject: [PATCH 02/12] fix relative move float cast bug --- dimos/robot/unitree_webrtc/unitree_skill_container.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dimos/robot/unitree_webrtc/unitree_skill_container.py b/dimos/robot/unitree_webrtc/unitree_skill_container.py index 1d2557da4d..c3dea43424 100644 --- a/dimos/robot/unitree_webrtc/unitree_skill_container.py +++ b/dimos/robot/unitree_webrtc/unitree_skill_container.py @@ -81,6 +81,7 @@ def relative_move(self, forward: float = 0.0, left: float = 0.0, degrees: float # Move 3 meters left, and face that direction relative_move(forward=0, left=3, degrees=90) """ + forward, left, degrees = float(forward), float(left), float(degrees) tf = self.tf.get("world", "base_link") if tf is None: From ae2ab82f3373fde7e904231df8f891d4973e1ad5 Mon Sep 17 00:00:00 2001 From: stash Date: Mon, 5 Jan 2026 20:13:31 -0800 Subject: [PATCH 03/12] Return skill state to MCP server for observability - still some race conditions/bugs --- dimos/agents/agent.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 4255fd488c..0c2d715579 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -37,6 +37,7 @@ SkillCoordinator, SkillState, SkillStateDict, + SkillStateEnum, ) from dimos.protocol.skill.skill import SkillContainer from dimos.protocol.skill.type import Output @@ -243,7 +244,7 @@ async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWrit if not data: break request = json.loads(data.decode()) - response = self._handle_mcp_request(request) + response = await self._handle_mcp_request(request) writer.write(json.dumps(response).encode() + b"\n") await writer.drain() finally: @@ -257,7 +258,7 @@ async def start_server() -> None: asyncio.run_coroutine_threadsafe(start_server(), self._loop) - def _handle_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]: + async def _handle_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]: """Handle MCP JSON-RPC request.""" method = request.get("method", "") params = request.get("params", {}) or {} @@ -290,9 +291,27 @@ def _handle_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]: name, args = params.get("name"), params.get("arguments") or {} call_id = str(uuid.uuid4()) self.coordinator.call_skill(call_id, name, args) - snapshot = self.coordinator.generate_snapshot() - result = snapshot.get(call_id) - text = str(result.content()) if result and result.content() else "Skill started" + + # Wait for skill completion (up to 5s for immediate skills) + # TODO: Fix: This is a hack and bad pracice, but currently we don't have a way to + # access specific skill responses once its sent from the Coordinator into the threadpool for execution + try: + await asyncio.wait_for(self.coordinator.wait_for_updates(), timeout=5.0) + except asyncio.TimeoutError: + pass # Skill still running, return current state + + snapshot: SkillStateDict = self.coordinator.generate_snapshot(clear=False) + result: SkillState | None = snapshot.get(call_id) + + if result is None: + text = "Skill not found" + elif result.state == SkillStateEnum.completed: + text = str(result.content()) if result.content() else "Completed" + elif result.state == SkillStateEnum.error: + text = f"Error: {result.content()}" + else: + text = f"Started ({result.state.name})" + return { "jsonrpc": "2.0", "id": req_id, From 01e1325230aa827230fe393a20d4d107507eef68 Mon Sep 17 00:00:00 2001 From: stash Date: Mon, 5 Jan 2026 20:51:56 -0800 Subject: [PATCH 04/12] Fix race condition bug with SkillState --- dimos/agents/agent.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 0c2d715579..7bb762e729 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -292,16 +292,19 @@ async def _handle_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]: call_id = str(uuid.uuid4()) self.coordinator.call_skill(call_id, name, args) + # Grab reference immediately (survives dict deletion by agent loop) + result: SkillState | None = self.coordinator._skill_state.get(call_id) + logger.info(f"MCP: Called skill '{name}', got result ref: {result is not None}") + # Wait for skill completion (up to 5s for immediate skills) - # TODO: Fix: This is a hack and bad pracice, but currently we don't have a way to - # access specific skill responses once its sent from the Coordinator into the threadpool for execution try: await asyncio.wait_for(self.coordinator.wait_for_updates(), timeout=5.0) except asyncio.TimeoutError: pass # Skill still running, return current state - snapshot: SkillStateDict = self.coordinator.generate_snapshot(clear=False) - result: SkillState | None = snapshot.get(call_id) + logger.info( + f"MCP: After wait - state: {result.state if result else None}, content: {result.content() if result else None}" + ) if result is None: text = "Skill not found" From a090e4e10a34a0af4b2f8bb539e46a4cbaf846d7 Mon Sep 17 00:00:00 2001 From: stash Date: Mon, 5 Jan 2026 23:40:13 -0800 Subject: [PATCH 05/12] Cleanup MCP client bridge --- dimos/protocol/mcp/README.md | 2 +- dimos/protocol/mcp/__main__.py | 36 ++++++++++++++++++++++ dimos/protocol/mcp/cli.py | 55 ---------------------------------- 3 files changed, 37 insertions(+), 56 deletions(-) create mode 100644 dimos/protocol/mcp/__main__.py delete mode 100644 dimos/protocol/mcp/cli.py diff --git a/dimos/protocol/mcp/README.md b/dimos/protocol/mcp/README.md index d33424618f..2a3c382484 100644 --- a/dimos/protocol/mcp/README.md +++ b/dimos/protocol/mcp/README.md @@ -6,7 +6,7 @@ Expose DimOS robot skills to Claude Code via Model Context Protocol. Add to Claude Code (one command): ```bash -claude mcp add --transport stdio dimos --scope project -- python -m dimos.protocol.mcp.cli --bridge +claude mcp add --transport stdio dimos --scope project -- python -m dimos.protocol.mcp ``` ## Usage diff --git a/dimos/protocol/mcp/__main__.py b/dimos/protocol/mcp/__main__.py new file mode 100644 index 0000000000..a58e59d367 --- /dev/null +++ b/dimos/protocol/mcp/__main__.py @@ -0,0 +1,36 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CLI entry point for Dimensional MCP Bridge. + +Connects Claude Code (or other MCP clients) to a running DimOS agent. + +Usage: + python -m dimos.protocol.mcp # Bridge to running DimOS on default port +""" + +from __future__ import annotations + +import asyncio + +from dimos.protocol.mcp.bridge import main as bridge_main + + +def main() -> None: + """Main entry point - connects to running DimOS via bridge.""" + asyncio.run(bridge_main()) + + +if __name__ == "__main__": + main() diff --git a/dimos/protocol/mcp/cli.py b/dimos/protocol/mcp/cli.py deleted file mode 100644 index 07ae868f7c..0000000000 --- a/dimos/protocol/mcp/cli.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""CLI entry point for Dimensional MCP Server. - -Usage: - python -m dimos.protocol.mcp.cli # Standalone stdio server - python -m dimos.protocol.mcp.cli --bridge # Bridge to running DimOS -""" - -from __future__ import annotations - -import asyncio -import sys - - -def main() -> None: - """Main entry point for the MCP CLI.""" - if "--bridge" in sys.argv or "-b" in sys.argv: - from dimos.protocol.mcp.bridge import main as bridge_main - - asyncio.run(bridge_main()) - else: - asyncio.run(run_server()) - - -async def run_server() -> None: - """Run standalone MCP server with default skill containers.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - - server = DimensionalMCPServer() - - try: - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server.register_skills(UnitreeSkillContainer()) - except ImportError: - pass - - await server.run() - - -if __name__ == "__main__": - main() From 6784dcc30fbfcdd82aa4e351b59dfc663d285d3b Mon Sep 17 00:00:00 2001 From: stash Date: Mon, 5 Jan 2026 23:40:41 -0800 Subject: [PATCH 06/12] Fix mypy --- dimos/protocol/mcp/server.py | 4 ++-- dimos/protocol/mcp/test_e2e_unitree.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dimos/protocol/mcp/server.py b/dimos/protocol/mcp/server.py index b300dd9050..ad1db22bc5 100644 --- a/dimos/protocol/mcp/server.py +++ b/dimos/protocol/mcp/server.py @@ -59,7 +59,7 @@ def __init__(self, coordinator: SkillCoordinator | None = None): def _setup_handlers(self) -> None: """Set up MCP protocol handlers.""" - @self.server.list_tools() + @self.server.list_tools() # type: ignore[misc] async def list_tools() -> list[Tool]: """List all available skills as MCP tools.""" tools = [] @@ -81,7 +81,7 @@ async def list_tools() -> list[Tool]: ) return tools - @self.server.call_tool() + @self.server.call_tool() # type: ignore[misc] async def call_tool(name: str, arguments: dict[str, Any] | None) -> list[TextContent]: """Execute a skill and return the result.""" call_id = str(uuid.uuid4()) diff --git a/dimos/protocol/mcp/test_e2e_unitree.py b/dimos/protocol/mcp/test_e2e_unitree.py index a3008f3605..b921187472 100644 --- a/dimos/protocol/mcp/test_e2e_unitree.py +++ b/dimos/protocol/mcp/test_e2e_unitree.py @@ -143,6 +143,7 @@ async def test_mcp_call_wait_skill(self) -> None: server.coordinator.stop() container.stop() + @pytest.mark.exclude # Skip in CI - subprocess import triggers LCM init def test_cli_module_runnable(self) -> None: """Test that the CLI module can be loaded without errors.""" # This simulates what Claude Code would do when starting the server @@ -155,6 +156,7 @@ def test_cli_module_runnable(self) -> None: assert result.returncode == 0 assert "OK" in result.stdout + @pytest.mark.exclude # Skip in CI - subprocess import triggers LCM init def test_server_can_be_instantiated_from_cli_context(self) -> None: """Test server instantiation as it would happen from CLI.""" result = subprocess.run( From 56081c3b8cb44042af4328b7613c121580b7d7de Mon Sep 17 00:00:00 2001 From: stash Date: Tue, 6 Jan 2026 05:08:52 -0800 Subject: [PATCH 07/12] RE implemented MCP as a Module that plugs into Agent / Coordinator via RPC --- dimos/agents/agent.py | 127 ++------ dimos/agents/spec.py | 3 - dimos/protocol/mcp/__init__.py | 4 +- dimos/protocol/mcp/mcp.py | 108 +++++++ dimos/protocol/mcp/server.py | 151 --------- dimos/protocol/mcp/test_e2e_unitree.py | 306 ------------------ dimos/protocol/mcp/test_server.py | 268 --------------- .../unitree_webrtc/unitree_go2_blueprints.py | 10 +- 8 files changed, 146 insertions(+), 831 deletions(-) create mode 100644 dimos/protocol/mcp/mcp.py delete mode 100644 dimos/protocol/mcp/server.py delete mode 100644 dimos/protocol/mcp/test_e2e_unitree.py delete mode 100644 dimos/protocol/mcp/test_server.py diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 7bb762e729..00c9aa8889 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -33,12 +33,7 @@ from dimos.agents.spec import AgentSpec, Model, Provider from dimos.agents.system_prompt import SYSTEM_PROMPT from dimos.core import DimosCluster, rpc -from dimos.protocol.skill.coordinator import ( - SkillCoordinator, - SkillState, - SkillStateDict, - SkillStateEnum, -) +from dimos.protocol.skill.coordinator import SkillCoordinator, SkillState, SkillStateDict from dimos.protocol.skill.skill import SkillContainer from dimos.protocol.skill.type import Output from dimos.utils.logging_config import setup_logger @@ -223,8 +218,6 @@ def get_agent_id(self) -> str: def start(self) -> None: super().start() self.coordinator.start() - if self.config.mcp_port: - self.start_mcp_server(self.config.mcp_port) @rpc def stop(self) -> None: @@ -232,103 +225,37 @@ def stop(self) -> None: self._agent_stopped = True super().stop() + def clear_history(self) -> None: + self._history.clear() + @rpc - def start_mcp_server(self, port: int = 9990) -> None: - """Start TCP server for MCP connections.""" - - async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: - logger.info("MCP client connected") - try: - while True: - data = await reader.readline() - if not data: - break - request = json.loads(data.decode()) - response = await self._handle_mcp_request(request) - writer.write(json.dumps(response).encode() + b"\n") - await writer.drain() - finally: - writer.close() - logger.info("MCP client disconnected") - - async def start_server() -> None: - server = await asyncio.start_server(handle_client, "0.0.0.0", port) - logger.info(f"MCP TCP server listening on port {port}") - await server.serve_forever() - - asyncio.run_coroutine_threadsafe(start_server(), self._loop) - - async def _handle_mcp_request(self, request: dict[str, Any]) -> dict[str, Any]: - """Handle MCP JSON-RPC request.""" - method = request.get("method", "") - params = request.get("params", {}) or {} - req_id = request.get("id") - - if method == "initialize": - return { - "jsonrpc": "2.0", - "id": req_id, - "result": { - "protocolVersion": "2024-11-05", - "capabilities": {"tools": {}}, - "serverInfo": {"name": "dimensional", "version": "1.0.0"}, - }, + def list_skills(self) -> list[dict[str, Any]]: + return [ + { + "name": c.name, + "description": c.schema.get("function", {}).get("description", ""), + "inputSchema": c.schema.get("function", {}).get("parameters", {}), } + for c in self.coordinator.skills().values() + if not c.hide_skill + ] - if method == "tools/list": - tools = [ - { - "name": c.name, - "description": c.schema.get("function", {}).get("description", ""), - "inputSchema": c.schema.get("function", {}).get("parameters", {}), - } - for c in self.coordinator.skills().values() - if not c.hide_skill - ] - return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}} - - if method == "tools/call": - name, args = params.get("name"), params.get("arguments") or {} - call_id = str(uuid.uuid4()) - self.coordinator.call_skill(call_id, name, args) - - # Grab reference immediately (survives dict deletion by agent loop) - result: SkillState | None = self.coordinator._skill_state.get(call_id) - logger.info(f"MCP: Called skill '{name}', got result ref: {result is not None}") - - # Wait for skill completion (up to 5s for immediate skills) - try: - await asyncio.wait_for(self.coordinator.wait_for_updates(), timeout=5.0) - except asyncio.TimeoutError: - pass # Skill still running, return current state - - logger.info( - f"MCP: After wait - state: {result.state if result else None}, content: {result.content() if result else None}" - ) - - if result is None: - text = "Skill not found" - elif result.state == SkillStateEnum.completed: - text = str(result.content()) if result.content() else "Completed" - elif result.state == SkillStateEnum.error: - text = f"Error: {result.content()}" - else: - text = f"Started ({result.state.name})" - - return { - "jsonrpc": "2.0", - "id": req_id, - "result": {"content": [{"type": "text", "text": text}]}, - } + @rpc + def call_skill(self, call_id: str, name: str, args: dict[str, Any]) -> None: + self.coordinator.call_skill(call_id, name, args) - return { - "jsonrpc": "2.0", - "id": req_id, - "error": {"code": -32601, "message": f"Unknown: {method}"}, - } + @rpc + def wait_for_skill_updates(self, timeout: float | None = None) -> bool: + return asyncio.run_coroutine_threadsafe( + self.coordinator.wait_for_updates(timeout=timeout), self._loop + ).result() - def clear_history(self) -> None: - self._history.clear() + @rpc + def get_skill_state(self, call_id: str) -> dict[str, Any] | None: + state: SkillState | None = self.coordinator._skill_state.get(call_id) + if state is None: + return None + return {"state": state.state.name, "content": state.content()} def append_history(self, *msgs: list[AIMessage | HumanMessage]) -> None: for msg in msgs: diff --git a/dimos/agents/spec.py b/dimos/agents/spec.py index c169f481d2..37262dc497 100644 --- a/dimos/agents/spec.py +++ b/dimos/agents/spec.py @@ -140,9 +140,6 @@ class AgentConfig(ModuleConfig): agent_transport: type[PubSub] = lcm.PickleLCM # type: ignore[type-arg] agent_topic: Any = field(default_factory=lambda: lcm.Topic("/agent")) - # MCP server port (None = disabled) - mcp_port: int | None = None - AnyMessage = Union[SystemMessage, ToolMessage, AIMessage, HumanMessage] diff --git a/dimos/protocol/mcp/__init__.py b/dimos/protocol/mcp/__init__.py index 6592849f66..51432ba0cf 100644 --- a/dimos/protocol/mcp/__init__.py +++ b/dimos/protocol/mcp/__init__.py @@ -12,6 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dimos.protocol.mcp.server import DimensionalMCPServer +from dimos.protocol.mcp.mcp import MCPModule -__all__ = ["DimensionalMCPServer"] +__all__ = ["MCPModule"] diff --git a/dimos/protocol/mcp/mcp.py b/dimos/protocol/mcp/mcp.py new file mode 100644 index 0000000000..1ad1a1e5a7 --- /dev/null +++ b/dimos/protocol/mcp/mcp.py @@ -0,0 +1,108 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import asyncio +import json +from typing import Any +import uuid + +from dimos.core import Module, rpc +from dimos.protocol.skill.coordinator import SkillStateEnum + + +class MCPModule(Module): + rpc_calls = ( + "Agent.list_skills", + "Agent.call_skill", + "Agent.wait_for_skill_updates", + "Agent.get_skill_state", + ) + + def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(*args, **kwargs) + self._list_skills = self._call_skill = self._wait_for_updates = self._get_skill_state = None + + @rpc + def start(self) -> None: + super().start() + ( + self._list_skills, + self._call_skill, + self._wait_for_updates, + self._get_skill_state, + ) = self.get_rpc_calls(*self.rpc_calls) + self._start_server() + + def _start_server(self, port: int = 9990) -> None: + async def handle_client(reader, writer) -> None: # type: ignore[no-untyped-def] + while True: + data = await reader.readline() + if not data: + break + request = json.loads(data.decode()) + response = await self._handle_request(request) + writer.write(json.dumps(response).encode() + b"\n") + await writer.drain() + writer.close() + + async def start_server() -> None: + server = await asyncio.start_server(handle_client, "0.0.0.0", port) + await server.serve_forever() + + asyncio.run_coroutine_threadsafe(start_server(), self._loop) + + async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: + method = request.get("method", "") + params = request.get("params", {}) or {} + req_id = request.get("id") + if method == "initialize": + return { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "dimensional", "version": "1.0.0"}, + }, + } + if method == "tools/list": + return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": self._list_skills()}} + if method == "tools/call": + name, args = params.get("name"), params.get("arguments") or {} + call_id = str(uuid.uuid4()) + self._call_skill(call_id, name, args) + try: + await asyncio.wait_for(asyncio.to_thread(self._wait_for_updates, 5.0), timeout=6.0) + except asyncio.TimeoutError: + pass + result = self._get_skill_state(call_id) + if not result: + text = "Skill not found" + elif result["state"] == SkillStateEnum.completed.name: + text = str(result["content"]) if result["content"] else "Completed" + elif result["state"] == SkillStateEnum.error.name: + text = f"Error: {result['content']}" + else: + text = f"Started ({result['state']})" + return { + "jsonrpc": "2.0", + "id": req_id, + "result": {"content": [{"type": "text", "text": text}]}, + } + return { + "jsonrpc": "2.0", + "id": req_id, + "error": {"code": -32601, "message": f"Unknown: {method}"}, + } diff --git a/dimos/protocol/mcp/server.py b/dimos/protocol/mcp/server.py deleted file mode 100644 index ad1db22bc5..0000000000 --- a/dimos/protocol/mcp/server.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""MCP Server exposing DimOS skills as tools to Claude Code and other MCP clients.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Any -import uuid - -from mcp.server import Server -from mcp.server.stdio import stdio_server -from mcp.types import TextContent, Tool - -from dimos.protocol.skill.coordinator import SkillCoordinator -from dimos.utils.logging_config import setup_logger - -if TYPE_CHECKING: - from dimos.protocol.skill.skill import SkillContainer - -logger = setup_logger() - - -class DimensionalMCPServer: - """MCP Server exposing DimOS skills as tools. - - This server bridges the MCP protocol with DimOS's SkillCoordinator, - allowing external AI agents like Claude Code to control robots. - - Example: - >>> from dimos.protocol.mcp import DimensionalMCPServer - >>> server = DimensionalMCPServer() - >>> server.register_skills(robot.skill_library) - >>> await server.run() - """ - - def __init__(self, coordinator: SkillCoordinator | None = None): - """Initialize the MCP server. - - Args: - coordinator: Optional SkillCoordinator to use. If not provided, - a new one will be created. - """ - self.coordinator = coordinator if coordinator is not None else SkillCoordinator() - self.server = Server("dimensional") - self._setup_handlers() - - def _setup_handlers(self) -> None: - """Set up MCP protocol handlers.""" - - @self.server.list_tools() # type: ignore[misc] - async def list_tools() -> list[Tool]: - """List all available skills as MCP tools.""" - tools = [] - for config in self.coordinator.skills().values(): - if config.hide_skill: - continue - - schema = config.schema - func_schema = schema.get("function", {}) - - tools.append( - Tool( - name=config.name, - description=func_schema.get("description", ""), - inputSchema=func_schema.get( - "parameters", {"type": "object", "properties": {}} - ), - ) - ) - return tools - - @self.server.call_tool() # type: ignore[misc] - async def call_tool(name: str, arguments: dict[str, Any] | None) -> list[TextContent]: - """Execute a skill and return the result.""" - call_id = str(uuid.uuid4()) - args = arguments or {} - - logger.info(f"MCP: Calling skill '{name}' with args: {args}") - - # Check if skill exists - skill_config = self.coordinator.get_skill_config(name) - if not skill_config: - return [TextContent(type="text", text=f"Error: Skill '{name}' not found")] - - # Execute the skill - self.coordinator.call_skill(call_id, name, args) - - # Wait for skill completion - try: - await self.coordinator.wait_for_updates(timeout=60) - except Exception as e: - logger.error(f"MCP: Error waiting for skill updates: {e}") - return [TextContent(type="text", text=f"Error: {e}")] - - # Get the result - snapshot = self.coordinator.generate_snapshot() - result = snapshot.get(call_id) - - if result is None: - return [TextContent(type="text", text="Skill executed but no result returned")] - - content = result.content() - if content is None: - content = "Skill completed successfully" - - return [TextContent(type="text", text=str(content))] - - def register_skills(self, container: SkillContainer) -> None: - """Register a skill container with the coordinator. - - Args: - container: SkillContainer with skills to expose via MCP. - """ - self.coordinator.register_skills(container) - - async def run(self) -> None: - """Run the MCP server using stdio transport. - - This starts the coordinator and runs the MCP server, - listening for requests on stdin and responding on stdout. - """ - logger.info("Starting Dimensional MCP Server...") - self.coordinator.start() - - try: - async with stdio_server() as (read_stream, write_stream): - logger.info("MCP Server ready, listening for requests...") - await self.server.run( - read_stream, - write_stream, - self.server.create_initialization_options(), - ) - finally: - logger.info("Shutting down MCP Server...") - self.coordinator.stop() - - def stop(self) -> None: - """Stop the MCP server and coordinator.""" - self.coordinator.stop() diff --git a/dimos/protocol/mcp/test_e2e_unitree.py b/dimos/protocol/mcp/test_e2e_unitree.py deleted file mode 100644 index b921187472..0000000000 --- a/dimos/protocol/mcp/test_e2e_unitree.py +++ /dev/null @@ -1,306 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""End-to-end tests for MCP server with UnitreeGo2 robot skills. - -These tests verify that: -1. The MCP server can be properly configured with UnitreeGo2 skills -2. Skills are correctly exposed as MCP tools -3. Claude Code (or any MCP client) would be able to import and use the server -4. Tool calls execute correctly with the skill coordinator - -Most tests require LCM infrastructure and are marked with @pytest.mark.lcm. -Run with: pytest -m lcm dimos/protocol/mcp/test_e2e_unitree.py -""" - -from __future__ import annotations - -import subprocess -import sys -import time - -import pytest - -pytestmark = pytest.mark.lcm # Mark all tests in this module as requiring LCM - - -class TestMCPServerE2E: - """End-to-end tests for MCP server with Unitree skills.""" - - def test_mcp_module_importable(self) -> None: - """Test that the MCP server module can be imported.""" - from dimos.protocol.mcp import DimensionalMCPServer - - assert DimensionalMCPServer is not None - - def test_mcp_server_with_unitree_skills(self) -> None: - """Test MCP server setup with Unitree skill container.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - - skills = server.coordinator.skills() - - # Verify Unitree skills are registered - assert "relative_move" in skills - assert "wait" in skills - assert "execute_sport_command" in skills - - # Verify skill schemas are generated - move_skill = skills["relative_move"] - schema = move_skill.schema.get("function", {}) - assert schema.get("name") == "relative_move" - assert "parameters" in schema - - def test_mcp_tools_exclude_hidden_skills(self) -> None: - """Test that hidden skills (like current_time) are not exposed as tools.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - - skills = server.coordinator.skills() - - # current_time is marked as hide_skill=True - assert "current_time" in skills - assert skills["current_time"].hide_skill is True - - # Verify hidden skills would be filtered in list_tools - visible_skills = [name for name, config in skills.items() if not config.hide_skill] - assert "current_time" not in visible_skills - assert "relative_move" in visible_skills - - def test_mcp_tool_schema_format(self) -> None: - """Test that tool schemas match MCP expected format.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - - skills = server.coordinator.skills() - wait_skill = skills["wait"] - - schema = wait_skill.schema - func_schema = schema.get("function", {}) - - # MCP tool format requirements - assert "name" in func_schema - assert "description" in func_schema - assert "parameters" in func_schema - - params = func_schema["parameters"] - assert params.get("type") == "object" - assert "properties" in params - assert "seconds" in params["properties"] - - @pytest.mark.asyncio - async def test_mcp_call_wait_skill(self) -> None: - """Test calling the 'wait' skill through MCP coordinator.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - server.coordinator.start() - - try: - call_id = "e2e-wait-test" - start_time = time.time() - - # Call the wait skill with 0.5 seconds - server.coordinator.call_skill(call_id, "wait", {"seconds": 0.5}) - - # Wait for completion - await server.coordinator.wait_for_updates(timeout=5) - elapsed = time.time() - start_time - - snapshot = server.coordinator.generate_snapshot() - result = snapshot.get(call_id) - - assert result is not None - assert "Wait completed" in str(result.content()) - assert elapsed >= 0.5 # Should have waited at least 0.5s - finally: - server.coordinator.stop() - container.stop() - - @pytest.mark.exclude # Skip in CI - subprocess import triggers LCM init - def test_cli_module_runnable(self) -> None: - """Test that the CLI module can be loaded without errors.""" - # This simulates what Claude Code would do when starting the server - result = subprocess.run( - [sys.executable, "-c", "from dimos.protocol.mcp.cli import main; print('OK')"], - capture_output=True, - text=True, - timeout=10, - ) - assert result.returncode == 0 - assert "OK" in result.stdout - - @pytest.mark.exclude # Skip in CI - subprocess import triggers LCM init - def test_server_can_be_instantiated_from_cli_context(self) -> None: - """Test server instantiation as it would happen from CLI.""" - result = subprocess.run( - [ - sys.executable, - "-c", - """ -from dimos.protocol.mcp.server import DimensionalMCPServer -from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - -server = DimensionalMCPServer() -container = UnitreeSkillContainer() -server.register_skills(container) - -skills = server.coordinator.skills() -print(f"Registered {len(skills)} skills") -print("Skills:", list(skills.keys())) -""", - ], - capture_output=True, - text=True, - timeout=10, - ) - assert result.returncode == 0, f"Failed with stderr: {result.stderr}" - assert "Registered" in result.stdout - assert "relative_move" in result.stdout - - -class TestMCPProtocolCompatibility: - """Tests to verify MCP protocol compatibility.""" - - def test_tool_inputschema_is_valid_json_schema(self) -> None: - """Test that inputSchema is valid JSON Schema format.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - - for name, config in server.coordinator.skills().items(): - if config.hide_skill: - continue - - schema = config.schema.get("function", {}) - params = schema.get("parameters", {}) - - # JSON Schema requirements - assert params.get("type") == "object", f"Skill {name} missing type: object" - assert "properties" in params, f"Skill {name} missing properties" - - def test_mcp_server_protocol_version(self) -> None: - """Test that server reports correct MCP protocol version.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - - server = DimensionalMCPServer() - - # Server should be named "dimensional" - assert server.server.name == "dimensional" - - @pytest.mark.asyncio - async def test_coordinator_state_tracking(self) -> None: - """Test that skill states are properly tracked for MCP responses.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.protocol.skill.coordinator import SkillStateEnum - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - server.coordinator.start() - - try: - call_id = "state-tracking-test" - server.coordinator.call_skill(call_id, "wait", {"seconds": 0.1}) - - # Initially should be pending or running - initial_state = server.coordinator._skill_state.get(call_id) - assert initial_state is not None - assert initial_state.state in [SkillStateEnum.pending, SkillStateEnum.running] - - # Wait for completion - await server.coordinator.wait_for_updates(timeout=5) - snapshot = server.coordinator.generate_snapshot() - - result = snapshot.get(call_id) - assert result is not None - assert result.state == SkillStateEnum.completed - finally: - server.coordinator.stop() - container.stop() - - -class TestClaudeCodeIntegration: - """Tests simulating Claude Code MCP client behavior.""" - - def test_mcp_config_format(self) -> None: - """Test that the expected Claude Code config format works.""" - # Verify the module path is valid (this is what Claude Code would call) - result = subprocess.run( - [sys.executable, "-c", "import dimos.protocol.mcp.cli"], - capture_output=True, - text=True, - timeout=10, - ) - assert result.returncode == 0, f"CLI import failed: {result.stderr}" - - def test_tool_call_argument_formats(self) -> None: - """Test that various argument formats work with skill calls.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - - # Test that get_skill_config works (used in call_tool handler) - config = server.coordinator.get_skill_config("wait") - assert config is not None - assert config.name == "wait" - - # Test with missing skill - missing = server.coordinator.get_skill_config("nonexistent_skill") - assert missing is None - - def test_skill_descriptions_for_llm(self) -> None: - """Test that skill descriptions are suitable for LLM consumption.""" - from dimos.protocol.mcp.server import DimensionalMCPServer - from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer - - server = DimensionalMCPServer() - container = UnitreeSkillContainer() - server.register_skills(container) - - skills = server.coordinator.skills() - - # Check that key skills have meaningful descriptions - relative_move = skills["relative_move"] - desc = relative_move.schema.get("function", {}).get("description", "") - assert len(desc) > 20, "relative_move should have a detailed description" - assert "move" in desc.lower() or "robot" in desc.lower() - - # Note: execute_sport_command has its docstring set dynamically after - # class definition, so the @skill decorator schema doesn't capture it. - # This is a known limitation of the schema generation. - execute_cmd = skills["execute_sport_command"] - assert execute_cmd is not None, "execute_sport_command skill should exist" diff --git a/dimos/protocol/mcp/test_server.py b/dimos/protocol/mcp/test_server.py deleted file mode 100644 index 159d4c4f76..0000000000 --- a/dimos/protocol/mcp/test_server.py +++ /dev/null @@ -1,268 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unit tests for the Dimensional MCP Server. - -These tests require LCM infrastructure and are marked with @pytest.mark.lcm. -Run with: pytest -m lcm dimos/protocol/mcp/test_server.py -""" - -from __future__ import annotations - -import time -from typing import TYPE_CHECKING - -import pytest - -from dimos.core import Module, rpc -from dimos.protocol.mcp.server import DimensionalMCPServer -from dimos.protocol.skill.coordinator import SkillCoordinator -from dimos.protocol.skill.skill import skill - -if TYPE_CHECKING: - from collections.abc import Generator - -pytestmark = pytest.mark.lcm # Mark all tests in this module as requiring LCM - - -class MockSkillContainer(Module): - """Simple skill container for testing.""" - - @rpc - def start(self) -> None: - super().start() - - @rpc - def stop(self) -> None: - super().stop() - - @skill() - def add(self, x: int, y: int) -> int: - """Add two numbers together.""" - return x + y - - @skill() - def greet(self, name: str) -> str: - """Greet a person by name.""" - return f"Hello, {name}!" - - @skill() - def slow_task(self, delay: float) -> str: - """A slow task that takes time to complete.""" - time.sleep(delay) - return f"Completed after {delay}s" - - @skill(hide_skill=True) - def hidden_skill(self) -> str: - """This skill should not be visible via MCP.""" - return "Hidden" - - -@pytest.fixture -def mcp_server() -> Generator[DimensionalMCPServer, None, None]: - """Fixture that provides a properly cleaned up MCP server.""" - server = DimensionalMCPServer() - yield server - server.stop() - - -@pytest.fixture -def mcp_server_with_skills() -> Generator[ - tuple[DimensionalMCPServer, MockSkillContainer], None, None -]: - """Fixture that provides an MCP server with mock skills registered.""" - server = DimensionalMCPServer() - container = MockSkillContainer() - server.register_skills(container) - yield server, container - server.stop() - container.stop() - - -class TestDimensionalMCPServer: - """Test suite for DimensionalMCPServer.""" - - def test_server_initialization(self, mcp_server: DimensionalMCPServer) -> None: - """Test that the server initializes correctly.""" - assert mcp_server.coordinator is not None - assert mcp_server.server is not None - - def test_server_with_existing_coordinator(self) -> None: - """Test server initialization with an existing coordinator.""" - coordinator = SkillCoordinator() - server = DimensionalMCPServer(coordinator=coordinator) - try: - assert server.coordinator is coordinator - finally: - server.stop() - - def test_register_skills( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test that skills can be registered with the server.""" - server, _ = mcp_server_with_skills - skills = server.coordinator.skills() - assert "add" in skills - assert "greet" in skills - - @pytest.mark.asyncio - async def test_list_tools( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test that tools are listed correctly via MCP protocol.""" - server, _ = mcp_server_with_skills - - # Get the list_tools handler directly from the server - # The handler is registered as a decorator, we need to call it via server internals - tools = [] - for config in server.coordinator.skills().values(): - if not config.hide_skill: - tools.append( - { - "name": config.name, - "description": config.schema.get("function", {}).get("description", ""), - } - ) - - # Verify tools are listed (excluding hidden ones) - tool_names = [t["name"] for t in tools] - assert "add" in tool_names - assert "greet" in tool_names - assert "slow_task" in tool_names - assert "hidden_skill" not in tool_names # Hidden skills should not be listed - - @pytest.mark.asyncio - async def test_call_tool_add( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test calling the 'add' skill via MCP.""" - server, _container = mcp_server_with_skills - server.coordinator.start() - - try: - call_id = "test-add-1" - server.coordinator.call_skill(call_id, "add", {"x": 5, "y": 3}) - - # Wait for the result - await server.coordinator.wait_for_updates(timeout=5) - snapshot = server.coordinator.generate_snapshot() - - result = snapshot.get(call_id) - assert result is not None - assert result.content() == 8 - finally: - server.coordinator.stop() - - @pytest.mark.asyncio - async def test_call_tool_greet( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test calling the 'greet' skill via MCP.""" - server, _container = mcp_server_with_skills - server.coordinator.start() - - try: - call_id = "test-greet-1" - server.coordinator.call_skill(call_id, "greet", {"name": "World"}) - - await server.coordinator.wait_for_updates(timeout=5) - snapshot = server.coordinator.generate_snapshot() - - result = snapshot.get(call_id) - assert result is not None - assert result.content() == "Hello, World!" - finally: - server.coordinator.stop() - - @pytest.mark.asyncio - async def test_tool_schema_generation( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test that tool schemas are generated correctly.""" - server, _ = mcp_server_with_skills - - skills = server.coordinator.skills() - add_skill = skills.get("add") - - assert add_skill is not None - schema = add_skill.schema - func_schema = schema.get("function", {}) - - # Verify schema has required fields - assert func_schema.get("name") == "add" - assert "description" in func_schema - assert "parameters" in func_schema - - # Verify parameters schema - params = func_schema.get("parameters", {}) - assert params.get("type") == "object" - assert "properties" in params - assert "x" in params["properties"] - assert "y" in params["properties"] - - def test_hidden_skills_not_exposed( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test that hidden skills are not exposed via MCP.""" - server, _ = mcp_server_with_skills - - skills = server.coordinator.skills() - - # hidden_skill should be in the coordinator but marked as hidden - assert "hidden_skill" in skills - assert skills["hidden_skill"].hide_skill is True - - @pytest.mark.asyncio - async def test_multiple_concurrent_calls( - self, mcp_server_with_skills: tuple[DimensionalMCPServer, MockSkillContainer] - ) -> None: - """Test multiple concurrent skill calls.""" - server, _container = mcp_server_with_skills - server.coordinator.start() - - try: - # Start multiple calls - server.coordinator.call_skill("call-1", "add", {"x": 1, "y": 1}) - server.coordinator.call_skill("call-2", "add", {"x": 2, "y": 2}) - server.coordinator.call_skill("call-3", "add", {"x": 3, "y": 3}) - - # Wait for all results - results: dict[str, int] = {} - timeout = 10 - start = time.time() - - while len(results) < 3 and (time.time() - start) < timeout: - await server.coordinator.wait_for_updates(timeout=1) - snapshot = server.coordinator.generate_snapshot() - for call_id, state in snapshot.items(): - if state.content() is not None: - results[call_id] = state.content() # type: ignore[assignment] - - assert results.get("call-1") == 2 - assert results.get("call-2") == 4 - assert results.get("call-3") == 6 - finally: - server.coordinator.stop() - - -@pytest.mark.asyncio -async def test_server_importable() -> None: - """Test that the MCP server can be imported correctly.""" - from dimos.protocol.mcp import DimensionalMCPServer - - server = DimensionalMCPServer() - try: - assert server is not None - finally: - server.stop() diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index a781f756d8..d8d06bb59b 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -43,6 +43,7 @@ ) from dimos.perception.detection.moduleDB import ObjectDBModule, detectionDB_module from dimos.perception.spatial_perception import spatial_memory +from dimos.protocol.mcp.mcp import MCPModule from dimos.robot.foxglove_bridge import foxglove_bridge from dimos.robot.unitree.connection.go2 import GO2Connection, go2_connection from dimos.robot.unitree_webrtc.unitree_skill_container import unitree_skills @@ -161,7 +162,14 @@ agentic = autoconnect( spatial, - llm_agent(mcp_port=9990), + llm_agent(), + _common_agentic, +) + +agentic_mcp = autoconnect( + spatial, + llm_agent(), + MCPModule.blueprint(), _common_agentic, ) From 534d6a46106794330160de7bf4c352d752ef8272 Mon Sep 17 00:00:00 2001 From: stash Date: Tue, 6 Jan 2026 05:53:12 -0800 Subject: [PATCH 08/12] Remove Agent RPC calls - MCP integrated with SkillCoordinator directly --- dimos/agents/agent.py | 29 -------- dimos/protocol/mcp/mcp.py | 61 +++++++++------- dimos/protocol/mcp/test_mcp_module.py | 73 +++++++++++++++++++ dimos/robot/all_blueprints.py | 1 + .../unitree_webrtc/unitree_go2_blueprints.py | 4 +- 5 files changed, 110 insertions(+), 58 deletions(-) create mode 100644 dimos/protocol/mcp/test_mcp_module.py diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 00c9aa8889..bf5ded4f00 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -228,35 +228,6 @@ def stop(self) -> None: def clear_history(self) -> None: self._history.clear() - @rpc - def list_skills(self) -> list[dict[str, Any]]: - return [ - { - "name": c.name, - "description": c.schema.get("function", {}).get("description", ""), - "inputSchema": c.schema.get("function", {}).get("parameters", {}), - } - for c in self.coordinator.skills().values() - if not c.hide_skill - ] - - @rpc - def call_skill(self, call_id: str, name: str, args: dict[str, Any]) -> None: - self.coordinator.call_skill(call_id, name, args) - - @rpc - def wait_for_skill_updates(self, timeout: float | None = None) -> bool: - return asyncio.run_coroutine_threadsafe( - self.coordinator.wait_for_updates(timeout=timeout), self._loop - ).result() - - @rpc - def get_skill_state(self, call_id: str) -> dict[str, Any] | None: - state: SkillState | None = self.coordinator._skill_state.get(call_id) - if state is None: - return None - return {"state": state.state.name, "content": state.content()} - def append_history(self, *msgs: list[AIMessage | HumanMessage]) -> None: for msg in msgs: self.publish(msg) # type: ignore[arg-type] diff --git a/dimos/protocol/mcp/mcp.py b/dimos/protocol/mcp/mcp.py index 1ad1a1e5a7..f69cb98a8c 100644 --- a/dimos/protocol/mcp/mcp.py +++ b/dimos/protocol/mcp/mcp.py @@ -15,36 +15,36 @@ import asyncio import json -from typing import Any +from typing import TYPE_CHECKING, Any import uuid from dimos.core import Module, rpc -from dimos.protocol.skill.coordinator import SkillStateEnum +from dimos.protocol.skill.coordinator import SkillCoordinator, SkillStateEnum +if TYPE_CHECKING: + from dimos.protocol.skill.coordinator import SkillState -class MCPModule(Module): - rpc_calls = ( - "Agent.list_skills", - "Agent.call_skill", - "Agent.wait_for_skill_updates", - "Agent.get_skill_state", - ) +class MCPModule(Module): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) - self._list_skills = self._call_skill = self._wait_for_updates = self._get_skill_state = None + self.coordinator = SkillCoordinator() @rpc def start(self) -> None: super().start() - ( - self._list_skills, - self._call_skill, - self._wait_for_updates, - self._get_skill_state, - ) = self.get_rpc_calls(*self.rpc_calls) + self.coordinator.start() self._start_server() + @rpc + def stop(self) -> None: + self.coordinator.stop() + super().stop() + + @rpc + def register_skills(self, container) -> None: # type: ignore[no-untyped-def] + self.coordinator.register_skills(container) + def _start_server(self, port: int = 9990) -> None: async def handle_client(reader, writer) -> None: # type: ignore[no-untyped-def] while True: @@ -78,24 +78,33 @@ async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: }, } if method == "tools/list": - return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": self._list_skills()}} + tools = [ + { + "name": c.name, + "description": c.schema.get("function", {}).get("description", ""), + "inputSchema": c.schema.get("function", {}).get("parameters", {}), + } + for c in self.coordinator.skills().values() + if not c.hide_skill + ] + return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}} if method == "tools/call": name, args = params.get("name"), params.get("arguments") or {} call_id = str(uuid.uuid4()) - self._call_skill(call_id, name, args) + self.coordinator.call_skill(call_id, name, args) + result: SkillState | None = self.coordinator._skill_state.get(call_id) try: - await asyncio.wait_for(asyncio.to_thread(self._wait_for_updates, 5.0), timeout=6.0) + await asyncio.wait_for(self.coordinator.wait_for_updates(), timeout=5.0) except asyncio.TimeoutError: pass - result = self._get_skill_state(call_id) - if not result: + if result is None: text = "Skill not found" - elif result["state"] == SkillStateEnum.completed.name: - text = str(result["content"]) if result["content"] else "Completed" - elif result["state"] == SkillStateEnum.error.name: - text = f"Error: {result['content']}" + elif result.state == SkillStateEnum.completed: + text = str(result.content()) if result.content() else "Completed" + elif result.state == SkillStateEnum.error: + text = f"Error: {result.content()}" else: - text = f"Started ({result['state']})" + text = f"Started ({result.state.name})" return { "jsonrpc": "2.0", "id": req_id, diff --git a/dimos/protocol/mcp/test_mcp_module.py b/dimos/protocol/mcp/test_mcp_module.py new file mode 100644 index 0000000000..971e22c11d --- /dev/null +++ b/dimos/protocol/mcp/test_mcp_module.py @@ -0,0 +1,73 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +from pathlib import Path + +from dimos.protocol.mcp.mcp import MCPModule +from dimos.protocol.skill.coordinator import SkillStateEnum + + +def test_unitree_blueprint_has_mcp() -> None: + contents = Path("dimos/robot/unitree_webrtc/unitree_go2_blueprints.py").read_text() + assert "agentic_mcp" in contents + assert "MCPModule.blueprint()" in contents + + +def test_mcp_module_request_flow() -> None: + class DummySkill: + def __init__(self) -> None: + self.name = "add" + self.hide_skill = False + self.schema = {"function": {"description": "", "parameters": {"type": "object"}}} + + class DummyState: + def __init__(self, content: int) -> None: + self.state = SkillStateEnum.completed + self._content = content + + def content(self) -> int: + return self._content + + class DummyCoordinator: + def __init__(self) -> None: + self._skill_state: dict[str, DummyState] = {} + + def skills(self) -> dict[str, DummySkill]: + return {"add": DummySkill()} + + def call_skill(self, call_id: str, _name: str, args: dict[str, int]) -> None: + self._skill_state[call_id] = DummyState(args["x"] + args["y"]) + + async def wait_for_updates(self) -> bool: + return True + + mcp = MCPModule.__new__(MCPModule) + mcp.coordinator = DummyCoordinator() + + response = asyncio.run(mcp._handle_request({"method": "tools/list", "id": 1})) + assert response["result"]["tools"][0]["name"] == "add" + + response = asyncio.run( + mcp._handle_request( + { + "method": "tools/call", + "id": 2, + "params": {"name": "add", "arguments": {"x": 2, "y": 3}}, + } + ) + ) + assert response["result"]["content"][0]["text"] == "5" diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 7dbbc9c67a..9b118cbd60 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -22,6 +22,7 @@ "unitree-go2-detection": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:detection", "unitree-go2-spatial": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:spatial", "unitree-go2-agentic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic", + "unitree-go2-agentic-mcp": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_mcp", "unitree-go2-agentic-ollama": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_ollama", "unitree-go2-agentic-huggingface": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_huggingface", "unitree-g1": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:standard", diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index d8d06bb59b..46d951650c 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -167,10 +167,8 @@ ) agentic_mcp = autoconnect( - spatial, - llm_agent(), + agentic, MCPModule.blueprint(), - _common_agentic, ) agentic_ollama = autoconnect( From 5ace4c065126f12b34dff692507df453cc920c82 Mon Sep 17 00:00:00 2001 From: stash Date: Tue, 6 Jan 2026 06:25:46 -0800 Subject: [PATCH 09/12] Fully working new MCP implementation --- dimos/core/skill_module.py | 5 +++ dimos/protocol/mcp/test_mcp_module.py | 60 +++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/dimos/core/skill_module.py b/dimos/core/skill_module.py index 212d7bbb99..545f904e60 100644 --- a/dimos/core/skill_module.py +++ b/dimos/core/skill_module.py @@ -25,6 +25,11 @@ def set_LlmAgent_register_skills(self, callable: RpcCall) -> None: callable.set_rpc(self.rpc) # type: ignore[arg-type] callable(RPCClient(self, self.__class__)) + @rpc + def set_MCPModule_register_skills(self, callable: RpcCall) -> None: + callable.set_rpc(self.rpc) # type: ignore[arg-type] + callable(RPCClient(self, self.__class__)) + def __getstate__(self) -> None: pass diff --git a/dimos/protocol/mcp/test_mcp_module.py b/dimos/protocol/mcp/test_mcp_module.py index 971e22c11d..919178b66b 100644 --- a/dimos/protocol/mcp/test_mcp_module.py +++ b/dimos/protocol/mcp/test_mcp_module.py @@ -15,7 +15,15 @@ from __future__ import annotations import asyncio +import json +import os from pathlib import Path +import socket +import subprocess +import sys +import threading + +import pytest from dimos.protocol.mcp.mcp import MCPModule from dimos.protocol.skill.coordinator import SkillStateEnum @@ -71,3 +79,55 @@ async def wait_for_updates(self) -> bool: ) ) assert response["result"]["content"][0]["text"] == "5" + + +def test_mcp_module_handles_hidden_and_errors() -> None: + class DummySkill: + def __init__(self, name: str, hide_skill: bool) -> None: + self.name = name + self.hide_skill = hide_skill + self.schema = {"function": {"description": "", "parameters": {"type": "object"}}} + + class DummyState: + def __init__(self, state: SkillStateEnum, content: str | None) -> None: + self.state = state + self._content = content + + def content(self) -> str | None: + return self._content + + class DummyCoordinator: + def __init__(self) -> None: + self._skill_state: dict[str, DummyState] = {} + self._skills = { + "visible": DummySkill("visible", False), + "hidden": DummySkill("hidden", True), + "fail": DummySkill("fail", False), + } + + def skills(self) -> dict[str, DummySkill]: + return self._skills + + def call_skill(self, call_id: str, name: str, _args: dict[str, int]) -> None: + if name == "fail": + self._skill_state[call_id] = DummyState(SkillStateEnum.error, "boom") + elif name in self._skills: + self._skill_state[call_id] = DummyState(SkillStateEnum.running, None) + + async def wait_for_updates(self) -> bool: + return True + + mcp = MCPModule.__new__(MCPModule) + mcp.coordinator = DummyCoordinator() + + response = asyncio.run(mcp._handle_request({"method": "tools/list", "id": 1})) + tool_names = {tool["name"] for tool in response["result"]["tools"]} + assert "visible" in tool_names + assert "hidden" not in tool_names + + response = asyncio.run( + mcp._handle_request( + {"method": "tools/call", "id": 2, "params": {"name": "fail", "arguments": {}}} + ) + ) + assert "Error:" in response["result"]["content"][0]["text"] From 45d30f7c85b9918aad97448c3674e5aad47b265a Mon Sep 17 00:00:00 2001 From: stash Date: Tue, 6 Jan 2026 06:49:00 -0800 Subject: [PATCH 10/12] Added end to end MPC test and async close to server for clean shutdown --- dimos/protocol/mcp/mcp.py | 16 +++++- dimos/protocol/mcp/test_mcp_module.py | 75 +++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/dimos/protocol/mcp/mcp.py b/dimos/protocol/mcp/mcp.py index f69cb98a8c..6fa0b9aa43 100644 --- a/dimos/protocol/mcp/mcp.py +++ b/dimos/protocol/mcp/mcp.py @@ -29,6 +29,8 @@ class MCPModule(Module): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) self.coordinator = SkillCoordinator() + self._server: asyncio.AbstractServer | None = None + self._server_future: asyncio.Future | None = None @rpc def start(self) -> None: @@ -38,6 +40,10 @@ def start(self) -> None: @rpc def stop(self) -> None: + if self._server: + asyncio.run_coroutine_threadsafe(self._stop_server(), self._loop).result() + if self._server_future and not self._server_future.done(): + self._server_future.cancel() self.coordinator.stop() super().stop() @@ -45,6 +51,13 @@ def stop(self) -> None: def register_skills(self, container) -> None: # type: ignore[no-untyped-def] self.coordinator.register_skills(container) + async def _stop_server(self) -> None: + if not self._server: + return + self._server.close() + await self._server.wait_closed() + self._server = None + def _start_server(self, port: int = 9990) -> None: async def handle_client(reader, writer) -> None: # type: ignore[no-untyped-def] while True: @@ -59,9 +72,10 @@ async def handle_client(reader, writer) -> None: # type: ignore[no-untyped-def] async def start_server() -> None: server = await asyncio.start_server(handle_client, "0.0.0.0", port) + self._server = server await server.serve_forever() - asyncio.run_coroutine_threadsafe(start_server(), self._loop) + self._server_future = asyncio.run_coroutine_threadsafe(start_server(), self._loop) async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: method = request.get("method", "") diff --git a/dimos/protocol/mcp/test_mcp_module.py b/dimos/protocol/mcp/test_mcp_module.py index 919178b66b..1deb5b9057 100644 --- a/dimos/protocol/mcp/test_mcp_module.py +++ b/dimos/protocol/mcp/test_mcp_module.py @@ -27,6 +27,7 @@ from dimos.protocol.mcp.mcp import MCPModule from dimos.protocol.skill.coordinator import SkillStateEnum +from dimos.protocol.skill.skill import SkillContainer, skill def test_unitree_blueprint_has_mcp() -> None: @@ -131,3 +132,77 @@ async def wait_for_updates(self) -> bool: ) ) assert "Error:" in response["result"]["content"][0]["text"] + + +def test_mcp_end_to_end_lcm_bridge() -> None: + try: + import lcm # type: ignore[import-untyped] + + lcm.LCM() + except Exception as exc: + if os.environ.get("CI"): + pytest.fail(f"LCM unavailable for MCP end-to-end test: {exc}") + pytest.skip("LCM unavailable for MCP end-to-end test.") + + try: + socket.socket(socket.AF_INET, socket.SOCK_STREAM).close() + except PermissionError: + if os.environ.get("CI"): + pytest.fail("Socket creation not permitted in CI environment.") + pytest.skip("Socket creation not permitted in this environment.") + + class TestSkills(SkillContainer): + @skill() + def add(self, x: int, y: int) -> int: + return x + y + + mcp = MCPModule() + mcp.start() + + try: + mcp.register_skills(TestSkills()) + + env = {"MCP_HOST": "127.0.0.1", "MCP_PORT": "9990"} + proc = subprocess.Popen( + [sys.executable, "-m", "dimos.protocol.mcp"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={**os.environ, **env}, + text=True, + ) + try: + request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"} + proc.stdin.write(json.dumps(request) + "\n") + proc.stdin.flush() + stdout = proc.stdout.readline() + assert '"tools"' in stdout + assert '"add"' in stdout + finally: + proc.terminate() + proc.wait(timeout=5) + + proc = subprocess.Popen( + [sys.executable, "-m", "dimos.protocol.mcp"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={**os.environ, **env}, + text=True, + ) + try: + request = { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": {"name": "add", "arguments": {"x": 2, "y": 3}}, + } + proc.stdin.write(json.dumps(request) + "\n") + proc.stdin.flush() + stdout = proc.stdout.readline() + assert "5" in stdout + finally: + proc.terminate() + proc.wait(timeout=5) + finally: + mcp.stop() From 1b9b510187a8166f40b29c28f15ca1503bfd4aa8 Mon Sep 17 00:00:00 2001 From: stash Date: Tue, 6 Jan 2026 13:17:20 -0800 Subject: [PATCH 11/12] Mypy fixes to mcp --- dimos/protocol/mcp/mcp.py | 63 +++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/dimos/protocol/mcp/mcp.py b/dimos/protocol/mcp/mcp.py index 6fa0b9aa43..6031191734 100644 --- a/dimos/protocol/mcp/mcp.py +++ b/dimos/protocol/mcp/mcp.py @@ -15,22 +15,19 @@ import asyncio import json -from typing import TYPE_CHECKING, Any +from typing import Any import uuid from dimos.core import Module, rpc from dimos.protocol.skill.coordinator import SkillCoordinator, SkillStateEnum -if TYPE_CHECKING: - from dimos.protocol.skill.coordinator import SkillState - class MCPModule(Module): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) self.coordinator = SkillCoordinator() self._server: asyncio.AbstractServer | None = None - self._server_future: asyncio.Future | None = None + self._server_future: object | None = None @rpc def start(self) -> None: @@ -41,8 +38,12 @@ def start(self) -> None: @rpc def stop(self) -> None: if self._server: - asyncio.run_coroutine_threadsafe(self._stop_server(), self._loop).result() - if self._server_future and not self._server_future.done(): + self._server.close() + loop = self._loop + assert loop is not None + asyncio.run_coroutine_threadsafe(self._server.wait_closed(), loop).result() + self._server = None + if self._server_future and hasattr(self._server_future, "cancel"): self._server_future.cancel() self.coordinator.stop() super().stop() @@ -51,46 +52,35 @@ def stop(self) -> None: def register_skills(self, container) -> None: # type: ignore[no-untyped-def] self.coordinator.register_skills(container) - async def _stop_server(self) -> None: - if not self._server: - return - self._server.close() - await self._server.wait_closed() - self._server = None - def _start_server(self, port: int = 9990) -> None: async def handle_client(reader, writer) -> None: # type: ignore[no-untyped-def] while True: - data = await reader.readline() - if not data: + if not (data := await reader.readline()): break - request = json.loads(data.decode()) - response = await self._handle_request(request) + response = await self._handle_request(json.loads(data.decode())) writer.write(json.dumps(response).encode() + b"\n") await writer.drain() writer.close() async def start_server() -> None: - server = await asyncio.start_server(handle_client, "0.0.0.0", port) - self._server = server - await server.serve_forever() + self._server = await asyncio.start_server(handle_client, "0.0.0.0", port) + await self._server.serve_forever() - self._server_future = asyncio.run_coroutine_threadsafe(start_server(), self._loop) + loop = self._loop + assert loop is not None + self._server_future = asyncio.run_coroutine_threadsafe(start_server(), loop) async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: method = request.get("method", "") params = request.get("params", {}) or {} req_id = request.get("id") if method == "initialize": - return { - "jsonrpc": "2.0", - "id": req_id, - "result": { - "protocolVersion": "2024-11-05", - "capabilities": {"tools": {}}, - "serverInfo": {"name": "dimensional", "version": "1.0.0"}, - }, + result = { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "dimensional", "version": "1.0.0"}, } + return {"jsonrpc": "2.0", "id": req_id, "result": result} if method == "tools/list": tools = [ { @@ -103,10 +93,19 @@ async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: ] return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}} if method == "tools/call": - name, args = params.get("name"), params.get("arguments") or {} + name = params.get("name") + args = params.get("arguments") or {} + if not isinstance(name, str): + return { + "jsonrpc": "2.0", + "id": req_id, + "error": {"code": -32602, "message": "Missing or invalid tool name"}, + } + if not isinstance(args, dict): + args = {} call_id = str(uuid.uuid4()) self.coordinator.call_skill(call_id, name, args) - result: SkillState | None = self.coordinator._skill_state.get(call_id) + result = self.coordinator._skill_state.get(call_id) try: await asyncio.wait_for(self.coordinator.wait_for_updates(), timeout=5.0) except asyncio.TimeoutError: From 1d02c93df94776797c5203d0c613bcf2add3f633 Mon Sep 17 00:00:00 2001 From: stash Date: Tue, 6 Jan 2026 15:35:05 -0800 Subject: [PATCH 12/12] fix mypy again --- dimos/protocol/mcp/mcp.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dimos/protocol/mcp/mcp.py b/dimos/protocol/mcp/mcp.py index 6031191734..f7427cd613 100644 --- a/dimos/protocol/mcp/mcp.py +++ b/dimos/protocol/mcp/mcp.py @@ -15,12 +15,15 @@ import asyncio import json -from typing import Any +from typing import TYPE_CHECKING, Any import uuid from dimos.core import Module, rpc from dimos.protocol.skill.coordinator import SkillCoordinator, SkillStateEnum +if TYPE_CHECKING: + from dimos.protocol.skill.coordinator import SkillState + class MCPModule(Module): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] @@ -75,12 +78,12 @@ async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: params = request.get("params", {}) or {} req_id = request.get("id") if method == "initialize": - result = { + init_result = { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "dimensional", "version": "1.0.0"}, } - return {"jsonrpc": "2.0", "id": req_id, "result": result} + return {"jsonrpc": "2.0", "id": req_id, "result": init_result} if method == "tools/list": tools = [ { @@ -105,7 +108,7 @@ async def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: args = {} call_id = str(uuid.uuid4()) self.coordinator.call_skill(call_id, name, args) - result = self.coordinator._skill_state.get(call_id) + result: SkillState | None = self.coordinator._skill_state.get(call_id) try: await asyncio.wait_for(self.coordinator.wait_for_updates(), timeout=5.0) except asyncio.TimeoutError: