DEFAULT_TIMEOUT = 30


class McpError(Exception):
    """Raised when the MCP server returns a JSON-RPC error.

    ``McpAdapter.call`` also raises it when the HTTP response carries an
    error status.
    """

    def __init__(self, message: str, code: int | None = None) -> None:
        # JSON-RPC error code when one was supplied, otherwise None.
        self.code = code
        super().__init__(message)


class McpAdapter:
    """Thin JSON-RPC client for a running MCP server."""

    def __init__(self, url: str | None = None, timeout: int = DEFAULT_TIMEOUT) -> None:
        """Create an adapter.

        Args:
            url: Full MCP endpoint URL.  When ``None`` the URL is built from
                ``global_config.mcp_port`` on localhost.
            timeout: Per-request timeout (seconds) used by :meth:`call`.
        """
        if url is None:
            # Imported lazily so an explicit URL never pulls in global config.
            from dimos.core.global_config import global_config

            url = f"http://localhost:{global_config.mcp_port}/mcp"
        self.url = url
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Low-level JSON-RPC
    # ------------------------------------------------------------------

    def call(self, method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Send a JSON-RPC request and return the parsed response.

        The whole JSON-RPC envelope is returned; a JSON-RPC ``error`` member
        is *not* raised here (the ``list_tools`` / ``call_tool`` helpers do
        that through ``_unwrap``).

        Raises:
            McpError: the server answered with an HTTP error status.
            requests.ConnectionError: the server is unreachable.
        """
        payload: dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
        }
        # Empty or missing params are left out of the request entirely.
        if params:
            payload["params"] = params

        resp = requests.post(self.url, json=payload, timeout=self.timeout)
        try:
            resp.raise_for_status()
        except requests.HTTPError as e:
            raise McpError(f"HTTP {resp.status_code}: {e}") from e
        return resp.json()  # type: ignore[no-any-return]

    # ------------------------------------------------------------------
    # MCP standard methods
    # ------------------------------------------------------------------

    def initialize(self) -> dict[str, Any]:
        """Send ``initialize`` and return server info."""
        return self.call("initialize")

    def list_tools(self) -> list[dict[str, Any]]:
        """Return the list of available tools (empty when none are reported)."""
        result = self._unwrap(self.call("tools/list"))
        return result.get("tools", [])  # type: ignore[no-any-return]

    def call_tool(self, name: str, arguments: dict[str, Any] | None = None) -> dict[str, Any]:
        """Call a tool by name and return the result dict.

        ``arguments=None`` is sent as an empty object.
        """
        return self._unwrap(self.call("tools/call", {"name": name, "arguments": arguments or {}}))

    def call_tool_text(self, name: str, arguments: dict[str, Any] | None = None) -> str:
        """Call a tool and return just the first text content item.

        Returns ``""`` when the result has no content; when the first item
        has no ``"text"`` key its ``str()`` form is returned instead.
        """
        result = self.call_tool(name, arguments)
        content = result.get("content", [])
        if not content:
            return ""
        return content[0].get("text", str(content[0]))  # type: ignore[no-any-return]

    # ------------------------------------------------------------------
    # Readiness probes
    # ------------------------------------------------------------------

    def _probe(self, timeout: float) -> requests.Response:
        """POST a minimal ``initialize`` request with a short *timeout*."""
        return requests.post(
            self.url,
            json={"jsonrpc": "2.0", "id": "probe", "method": "initialize"},
            timeout=timeout,
        )

    def wait_for_ready(self, timeout: float = 10.0, interval: float = 0.5) -> bool:
        """Poll until the MCP server responds, or return False on timeout.

        A probe counts as success only when it returns HTTP 200.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                if self._probe(timeout=2).status_code == 200:
                    return True
            except (requests.ConnectionError, requests.Timeout):
                # Not listening yet, or accepted the socket but too slow to
                # answer: both mean "not ready", so keep polling instead of
                # letting a read timeout abort the wait.
                pass
            time.sleep(interval)
        return False

    def wait_for_down(self, timeout: float = 10.0, interval: float = 0.5) -> bool:
        """Poll until the MCP server stops responding.

        Returns True as soon as a probe fails to connect or times out while
        reading; returns False if the server still answers at the deadline.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                self._probe(timeout=1)
            except (requests.ConnectionError, requests.ReadTimeout):
                return True
            time.sleep(interval)
        return False

    # ------------------------------------------------------------------
    # Class methods for discovery
    # ------------------------------------------------------------------

    @classmethod
    def from_run_entry(cls, entry: Any | None = None, timeout: int = DEFAULT_TIMEOUT) -> McpAdapter:
        """Create an adapter from a RunEntry, or discover a live one.

        When *entry* is ``None`` the first item of
        ``list_runs(alive_only=True)`` is used (presumably the most recent
        run -- confirm against ``run_registry``).  Falls back to the default
        URL if no entry is found or the entry has no ``mcp_url``.
        """
        if entry is None:
            from dimos.core.run_registry import list_runs

            runs = list_runs(alive_only=True)
            entry = runs[0] if runs else None

        mcp_url = getattr(entry, "mcp_url", None)
        if mcp_url:
            return cls(url=mcp_url, timeout=timeout)

        # Default URL (GlobalConfig port) is resolved by __init__.
        return cls(timeout=timeout)

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    @staticmethod
    def _unwrap(response: dict[str, Any]) -> dict[str, Any]:
        """Extract the ``result`` from a JSON-RPC response, raising on error.

        Raises:
            McpError: the response contains an ``error`` member.  The message
                and code are taken from it when it is a dict; otherwise its
                ``str()`` form is the message and the code is ``None``.
        """
        if "error" in response:
            err = response["error"]
            if isinstance(err, dict):
                raise McpError(err.get("message", str(err)), code=err.get("code"))
            raise McpError(str(err), code=None)
        return response.get("result", {})  # type: ignore[no-any-return]

    def __repr__(self) -> str:
        return f"McpAdapter(url={self.url!r})"
dimos.core.module import Module -from dimos.core.rpc_client import RpcCall, RPCClient - if TYPE_CHECKING: import concurrent.futures @@ -51,6 +51,11 @@ app.state.rpc_calls = {} +# --------------------------------------------------------------------------- +# JSON-RPC helpers +# --------------------------------------------------------------------------- + + def _jsonrpc_result(req_id: Any, result: Any) -> dict[str, Any]: return {"jsonrpc": "2.0", "id": req_id, "result": result} @@ -63,6 +68,11 @@ def _jsonrpc_error(req_id: Any, code: int, message: str) -> dict[str, Any]: return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}} +# --------------------------------------------------------------------------- +# JSON-RPC handlers (standard MCP protocol only) +# --------------------------------------------------------------------------- + + def _handle_initialize(req_id: Any) -> dict[str, Any]: return _jsonrpc_result( req_id, @@ -77,11 +87,11 @@ def _handle_initialize(req_id: Any) -> dict[str, Any]: def _handle_tools_list(req_id: Any, skills: list[SkillInfo]) -> dict[str, Any]: tools = [] - for skill in skills: - schema = json.loads(skill.args_schema) + for s in skills: + schema = json.loads(s.args_schema) description = schema.pop("description", None) schema.pop("title", None) - tool = {"name": skill.func_name, "inputSchema": schema} + tool: dict[str, Any] = {"name": s.func_name, "inputSchema": schema} if description: tool["description"] = description tools.append(tool) @@ -128,7 +138,7 @@ async def handle_request( params = request.get("params", {}) or {} req_id = request.get("id") - # JSON-RPC notifications have no "id" – the server must not reply. + # JSON-RPC notifications have no "id" -- the server must not reply. 
@rpc
def on_system_modules(self, modules: list[RPCClient]) -> None:
    """Rebuild the skill list and the tool-name -> ``RpcCall`` table.

    Skills are gathered from every module's ``get_skills()`` (a ``None``
    result is treated as no skills) and stored on ``app.state``.  The
    call table is keyed by ``func_name`` only, so if two modules expose a
    skill with the same name the later one wins.
    """
    assert self.rpc is not None
    app.state.skills = [
        skill_info for module in modules for skill_info in (module.get_skills() or [])
    ]
    app.state.rpc_calls = {
        skill_info.func_name: RpcCall(
            None, self.rpc, skill_info.func_name, skill_info.class_name, []
        )
        for skill_info in app.state.skills
    }

# ------------------------------------------------------------------
# Introspection skills (exposed as MCP tools via tools/list)
# ------------------------------------------------------------------

@skill
def server_status(self) -> str:
    """Get MCP server status: main process PID, deployed modules, and skill count.

    Returns a JSON object with ``pid``, ``modules`` (class names in first-seen
    order, de-duplicated), ``skills`` (function names) and ``skill_count``.
    The PID comes from the most recent run-registry entry when there is one,
    otherwise from the current process.
    """
    from dimos.core.run_registry import get_most_recent

    skills: list[SkillInfo] = app.state.skills
    # dict.fromkeys de-duplicates while keeping first-seen order.
    modules = list(dict.fromkeys(s.class_name for s in skills))
    entry = get_most_recent()
    pid = entry.pid if entry else os.getpid()
    return json.dumps(
        {
            "pid": pid,
            "modules": modules,
            "skills": [s.func_name for s in skills],
            # Promised by the docstring and read by status consumers.
            "skill_count": len(skills),
        }
    )

@skill
def list_modules(self) -> str:
    """List deployed modules and their skills.

    Returns a JSON object ``{"modules": {class_name: [func_name, ...]}}``.
    """
    skills: list[SkillInfo] = app.state.skills
    modules: dict[str, list[str]] = {}
    for s in skills:
        modules.setdefault(s.class_name, []).append(s.func_name)
    return json.dumps({"modules": modules})

@skill
def agent_send(self, message: str) -> str:
    """Send a message to the running DimOS agent via LCM.

    The message is published on the ``/human_input`` topic through a
    short-lived transport that is always stopped afterwards.

    Raises:
        ValueError: *message* is empty.
    """
    if not message:
        raise ValueError("Message cannot be empty")

    from dimos.core.transport import pLCMTransport

    transport: pLCMTransport[str] = pLCMTransport("/human_input")
    try:
        transport.start()
        transport.publish(message)
        # Only the first 100 characters are echoed back to the caller.
        return f"Message sent to agent: {message[:100]}"
    finally:
        transport.stop()
MCP_URL = f"http://localhost:{global_config.mcp_port}/mcp"


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture(autouse=True)
def _ci_env(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``CI=1`` for every test."""
    monkeypatch.setenv("CI", "1")


@pytest.fixture(autouse=True)
def _clean_registry(
    tmp_path: object, monkeypatch: pytest.MonkeyPatch
) -> Generator[object, None, None]:
    """Point the run registry at a fresh per-test directory under ``tmp_path``."""
    from pathlib import Path

    import dimos.core.run_registry as _reg

    test_dir = Path(str(tmp_path)) / "runs"
    test_dir.mkdir()
    monkeypatch.setattr(_reg, "REGISTRY_DIR", test_dir)
    yield test_dir


@pytest.fixture(scope="class")
def mcp_shared(request: pytest.FixtureRequest) -> Generator[ModuleCoordinator, None, None]:
    """Build a shared StressTestModule + McpServer.  Class-scoped -- started
    once, torn down after every test in the class finishes.  Use for
    read-only tests that don't stop/restart the server.

    Readiness is asserted *before* the coordinator is handed to the tests,
    so a server that never comes up is reported as a fixture error instead
    of as unrelated connection failures in each test.  The coordinator is
    stopped in all cases.
    """
    global_config.update(viewer="none", n_workers=1)
    bp = autoconnect(
        StressTestModule.blueprint(),
        McpServer.blueprint(),
    )
    coord = bp.build()
    try:
        assert _adapter().wait_for_ready(), "MCP server did not start within timeout"
        yield coord
    finally:
        coord.stop()


@pytest.fixture()
def mcp_entry(mcp_shared: ModuleCoordinator, tmp_path: object) -> Generator[RunEntry, None, None]:
    """Create registry entry for the running blueprint."""
    from pathlib import Path

    log_dir = Path(str(tmp_path)) / "stress-logs"
    entry = RunEntry(
        run_id=f"stress-{datetime.now(timezone.utc).strftime('%H%M%S%f')}",
        pid=os.getpid(),
        blueprint="stress-test",
        started_at=datetime.now(timezone.utc).isoformat(),
        log_dir=str(log_dir),
        cli_args=["stress-test"],
        config_overrides={"n_workers": 1},
    )
    entry.save()
    yield entry
    entry.remove()


def _adapter() -> McpAdapter:
    """Return an McpAdapter using GlobalConfig port."""
    return McpAdapter()


# ---------------------------------------------------------------------------
# Tests -- read-only against a shared MCP server
# ---------------------------------------------------------------------------


@pytest.mark.slow
class TestMCPLifecycle:
    """MCP server lifecycle: start -> respond -> stop -> dead."""

    def test_mcp_responds_after_build(self, mcp_shared: ModuleCoordinator) -> None:
        """After blueprint.build(), MCP should accept requests."""
        result = _adapter().call("initialize")
        assert result["result"]["serverInfo"]["name"] == "dimensional"

    def test_tools_list_includes_stress_skills(self, mcp_shared: ModuleCoordinator) -> None:
        """tools/list should include echo, ping, slow, info from StressTestModule."""
        result = _adapter().call("tools/list")
        tool_names = {t["name"] for t in result["result"]["tools"]}
        assert "echo" in tool_names
        assert "ping" in tool_names
        assert "slow" in tool_names
        assert "info" in tool_names
        # McpServer introspection skills
        assert "server_status" in tool_names
        assert "list_modules" in tool_names
        assert "agent_send" in tool_names

    def test_echo_tool(self, mcp_shared: ModuleCoordinator) -> None:
        """Call echo tool -- should return the message."""
        result = _adapter().call(
            "tools/call",
            {
                "name": "echo",
                "arguments": {"message": "hello from stress test"},
            },
        )
        text = result["result"]["content"][0]["text"]
        assert text == "hello from stress test"

    def test_ping_tool(self, mcp_shared: ModuleCoordinator) -> None:
        """Call ping tool -- should return 'pong'."""
        result = _adapter().call("tools/call", {"name": "ping", "arguments": {}})
        text = result["result"]["content"][0]["text"]
        assert text == "pong"

    def test_info_tool_returns_pid(self, mcp_shared: ModuleCoordinator) -> None:
        """info tool should return process info."""
        result = _adapter().call("tools/call", {"name": "info", "arguments": {}})
        text = result["result"]["content"][0]["text"]
        assert "pid=" in text

    def test_server_status_tool(self, mcp_shared: ModuleCoordinator) -> None:
        """server_status tool should return module and skill info."""
        text = _adapter().call_tool_text("server_status")
        data = json.loads(text)
        assert "pid" in data
        assert "modules" in data
        assert "skills" in data
        assert "StressTestModule" in data["modules"]

    def test_list_modules_tool(self, mcp_shared: ModuleCoordinator) -> None:
        """list_modules tool should group skills by module."""
        text = _adapter().call_tool_text("list_modules")
        modules = json.loads(text)["modules"]
        assert "StressTestModule" in modules
        assert "echo" in modules["StressTestModule"]


@pytest.mark.slow
class TestMCPCLICommands:
    """Test dimos mcp CLI commands against real MCP server."""

    def test_cli_list_tools(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp list-tools should output JSON with tools."""
        result = CliRunner().invoke(main, ["mcp", "list-tools"])
        assert result.exit_code == 0
        tools = json.loads(result.output)
        tool_names = {t["name"] for t in tools}
        assert "echo" in tool_names
        assert "ping" in tool_names

    def test_cli_call_echo(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp call echo --arg message=hello should return hello."""
        result = CliRunner().invoke(main, ["mcp", "call", "echo", "--arg", "message=hello"])
        assert result.exit_code == 0
        assert "hello" in result.output

    def test_cli_mcp_status(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp status should return JSON with modules."""
        result = CliRunner().invoke(main, ["mcp", "status"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "StressTestModule" in data["modules"]

    def test_cli_mcp_modules(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp modules should show module-skill mapping."""
        result = CliRunner().invoke(main, ["mcp", "modules"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "StressTestModule" in data["modules"]
        assert "echo" in data["modules"]["StressTestModule"]


@pytest.mark.slow
class TestMCPErrorHandling:
    """Test MCP error handling and edge cases."""

    def test_call_nonexistent_tool(self, mcp_shared: ModuleCoordinator) -> None:
        """Calling a tool that doesn't exist should return an error message."""
        result = _adapter().call(
            "tools/call",
            {
                "name": "nonexistent_tool_xyz",
                "arguments": {},
            },
        )
        text = result["result"]["content"][0]["text"]
        assert "not found" in text.lower()

    def test_call_tool_wrong_args(self, mcp_shared: ModuleCoordinator) -> None:
        """Calling a tool with wrong argument types should still return."""
        result = _adapter().call(
            "tools/call",
            {
                "name": "echo",
                "arguments": {},
            },
        )
        assert "result" in result or "error" in result

    def test_unknown_jsonrpc_method(self, mcp_shared: ModuleCoordinator) -> None:
        """Unknown JSON-RPC method should return error."""
        result = _adapter().call("nonexistent/method")
        assert "error" in result
        assert result["error"]["code"] == -32601

    def test_mcp_handles_malformed_json(self, mcp_shared: ModuleCoordinator) -> None:
        """MCP should handle malformed JSON gracefully."""
        resp = requests.post(
            MCP_URL,
            data=b"not json{{{",
            headers={"Content-Type": "application/json"},
            timeout=5,
        )
        assert resp.status_code == 400

    def test_rapid_tool_calls(self, mcp_shared: ModuleCoordinator) -> None:
        """Fire 20 rapid echo calls -- all should succeed."""
        for i in range(20):
            result = _adapter().call(
                "tools/call",
                {
                    "name": "echo",
                    "arguments": {"message": f"rapid-{i}"},
                },
            )
            text = result["result"]["content"][0]["text"]
            assert text == f"rapid-{i}", f"Mismatch on call {i}"

    def test_cli_call_tool_wrong_arg_format(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp call with bad --arg format should error."""
        result = CliRunner().invoke(main, ["mcp", "call", "echo", "--arg", "no_equals_sign"])
        assert result.exit_code == 2  # click ParamType validation error
        assert "KEY=VALUE" in result.output

    def test_cli_call_json_args(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp call --json-args should work."""
        result = CliRunner().invoke(
            main, ["mcp", "call", "echo", "--json-args", '{"message": "json-test"}']
        )
        assert result.exit_code == 0
        assert "json-test" in result.output

    def test_cli_call_bad_json_args(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp call with invalid --json-args should error."""
        result = CliRunner().invoke(main, ["mcp", "call", "echo", "--json-args", "not json"])
        assert result.exit_code == 1
        assert "invalid JSON" in result.output


@pytest.mark.slow
class TestAgentSend:
    """Test dimos agent-send CLI and MCP method."""

    def test_agent_send_via_mcp(self, mcp_shared: ModuleCoordinator) -> None:
        """agent_send tool should route message via LCM."""
        text = _adapter().call_tool_text("agent_send", {"message": "hello agent"})
        assert "hello agent" in text

    def test_agent_send_empty_message(self, mcp_shared: ModuleCoordinator) -> None:
        """Empty message should return error text."""
        result = _adapter().call_tool("agent_send", {"message": ""})
        text = result.get("content", [{}])[0].get("text", "")
        assert "error" in text.lower() or "empty" in text.lower()

    def test_agent_send_cli(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos agent-send 'hello' should work."""
        result = CliRunner().invoke(main, ["agent-send", "hello from CLI"])
        assert result.exit_code == 0
        assert "hello from CLI" in result.output


# ---------------------------------------------------------------------------
# Tests -- lifecycle management (own setup/teardown per test)
# ---------------------------------------------------------------------------


@pytest.mark.slow
class TestDaemonMCPRecovery:
    """Test MCP recovery after daemon crashes and restarts."""

    def test_restart_after_clean_stop(self) -> None:
        """Stop then start again -- MCP should come back."""
        global_config.update(viewer="none", n_workers=1)

        # First run
        bp1 = autoconnect(StressTestModule.blueprint(), McpServer.blueprint())
        coord1 = bp1.build()
        try:
            assert _adapter().wait_for_ready(), "First MCP start failed"
        finally:
            # Always stop, so a failed start cannot leak a server into later tests.
            coord1.stop()
        assert _adapter().wait_for_down(), "MCP didn't stop"

        # Second run -- should work without port conflicts
        bp2 = autoconnect(StressTestModule.blueprint(), McpServer.blueprint())
        coord2 = bp2.build()
        try:
            assert _adapter().wait_for_ready(), "Second MCP start failed (port conflict?)"
        finally:
            coord2.stop()

    def test_registry_cleanup_after_stop(self, mcp_entry: RunEntry) -> None:
        """Registry entry should be removable after stop."""
        runs = list_runs(alive_only=True)
        assert len(runs) == 1
        assert mcp_entry.run_id in [r.run_id for r in runs]

        # Fixture teardown will call remove() -- just verify entry exists and is valid
        assert mcp_entry.pid > 0

    def test_stale_cleanup_after_crash(self) -> None:
        """Stale entries from crashed processes should be cleaned up."""
        stale = RunEntry(
            run_id="crashed-mcp-test",
            pid=99999999,
            blueprint="stress-test",
            started_at=datetime.now(timezone.utc).isoformat(),
            log_dir="/tmp/ghost",
            cli_args=[],
            config_overrides={},
        )
        stale.save()
        assert len(list_runs(alive_only=False)) == 1

        removed = cleanup_stale()
        assert removed == 1
        assert len(list_runs(alive_only=False)) == 0


@pytest.mark.slow
class TestMCPRapidRestart:
    """Test rapid stop/start cycles."""

    def test_three_restart_cycles(self) -> None:
        """Start -> stop -> start 3 times -- no port conflicts."""
        global_config.update(viewer="none", n_workers=1)

        for cycle in range(3):
            bp = autoconnect(StressTestModule.blueprint(), McpServer.blueprint())
            coord = bp.build()
            try:
                assert _adapter().wait_for_ready(timeout=15), (
                    f"MCP failed to start on cycle {cycle}"
                )

                result = _adapter().call("tools/call", {"name": "ping", "arguments": {}})
                assert result["result"]["content"][0]["text"] == "pong"
            finally:
                # Stop even when an assertion above fails, so the next cycle
                # (or the next test) does not inherit a running server.
                coord.stop()
            assert _adapter().wait_for_down(timeout=10), f"MCP failed to stop on cycle {cycle}"


@pytest.mark.slow
class TestMCPNoServer:
    """Tests that require NO MCP server running."""

    def test_mcp_dead_after_stop(self) -> None:
        """After coordinator.stop(), MCP should stop responding."""
        global_config.update(viewer="none", n_workers=1)
        bp = autoconnect(StressTestModule.blueprint(), McpServer.blueprint())
        coord = bp.build()
        try:
            assert _adapter().wait_for_ready(), "MCP server did not start"
        finally:
            coord.stop()
        assert _adapter().wait_for_down(), "MCP server did not stop"

    def test_cli_no_server_error(self) -> None:
        """dimos mcp list-tools with no server should exit with error."""
        result = CliRunner().invoke(main, ["mcp", "list-tools"])
        assert result.exit_code == 1
        assert "no running" in result.output.lower() or "error" in result.output.lower()

    def test_agent_send_cli_no_server(self) -> None:
        """dimos agent-send with no server should exit with error."""
        result = CliRunner().invoke(main, ["agent-send", "hello"])
        assert result.exit_code == 1
# Repository root: four directory levels above this file.
REPO_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
VENV_PYTHON = os.path.join(REPO_DIR, ".venv", "bin", "python")
# Use the repo's own python if venv exists, otherwise fall back to system
if not os.path.exists(VENV_PYTHON):
    VENV_PYTHON = sys.executable


def run_dimos(*args: str, timeout: float = 30) -> subprocess.CompletedProcess[str]:
    """Run a dimos CLI command.

    The CLI is started as ``<python> -m dimos.robot.cli.dimos <args>`` from
    the repository root with ``CI=1`` and ``PYTHONPATH`` set to the repo.
    The exit status is not checked here; callers inspect ``returncode``.

    Raises:
        subprocess.TimeoutExpired: the command ran longer than *timeout* seconds.
    """
    cmd = [VENV_PYTHON, "-m", "dimos.robot.cli.dimos", *args]
    env = {**os.environ, "CI": "1", "PYTHONPATH": REPO_DIR}
    return subprocess.run(
        cmd, capture_output=True, text=True, timeout=timeout, cwd=REPO_DIR, env=env
    )


def p(msg: str, ok: bool = True) -> None:
    """Print *msg* prefixed with a check mark (``ok``) or a cross (not ``ok``)."""
    icon = "\u2705" if ok else "\u274c"
    print(f" {icon} {msg}")


def section(title: str) -> None:
    """Print *title* framed by two 60-character ``=`` rules."""
    print(f"\n{'=' * 60}")
    print(f" {title}")
    print(f"{'=' * 60}")


def wait_for_mcp(
    timeout: float = 20.0,
    url: str = "http://localhost:9990/mcp",
    interval: float = 0.5,
) -> bool:
    """Poll MCP until responsive.

    Args:
        timeout: Total time to keep polling, in seconds.
        url: MCP endpoint to probe.
        interval: Pause between probes, in seconds.

    Returns:
        True as soon as an ``initialize`` probe returns HTTP 200, False if
        that never happens before *timeout* elapses.
    """
    import requests

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            resp = requests.post(
                url,
                json={"jsonrpc": "2.0", "id": 1, "method": "initialize"},
                timeout=2,
            )
            if resp.status_code == 200:
                return True
        except requests.RequestException:
            # Server not up (or not answering) yet -- retry until the deadline.
            pass
        time.sleep(interval)
    return False
timeout=60) + print(f" stdout: {result.stdout.strip()[:200]}") + if result.stderr: + # Filter out noisy log lines + err_lines = [ + l + for l in result.stderr.strip().split("\n") + if not any(x in l for x in ["[inf]", "[dbg]", "INFO:", "WARNING:"]) + ] + if err_lines: + print(f" stderr: {chr(10).join(err_lines[:5])}") + + if result.returncode == 0: + p("Daemon started successfully") + else: + p(f"Daemon failed to start (exit={result.returncode})", ok=False) + print(f" Full stderr:\n{result.stderr[:500]}") + failures += 1 + # Try to continue anyway — maybe foreground mode issue + + # Wait for MCP to be ready + if wait_for_mcp(timeout=20): + p("MCP server responding") + else: + p("MCP server not responding after 20s", ok=False) + failures += 1 + print(" Cannot continue without MCP. Exiting.") + sys.exit(1) + + # --------------------------------------------------------------- + # Step 2: dimos status + # --------------------------------------------------------------- + section("Step 2: dimos status") + result = run_dimos("status") + print(f" output: {result.stdout.strip()[:300]}") + if result.returncode == 0 and ( + "running" in result.stdout.lower() or "pid" in result.stdout.lower() + ): + p("Status shows running instance") + else: + p(f"Status unclear (exit={result.returncode})", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Step 3: dimos mcp list-tools + # --------------------------------------------------------------- + section("Step 3: dimos mcp list-tools") + result = run_dimos("mcp", "list-tools") + if result.returncode == 0: + try: + tools = json.loads(result.stdout) + tool_names = [t["name"] for t in tools] + p(f"Discovered {len(tools)} tools: {', '.join(tool_names)}") + if "echo" in tool_names and "ping" in tool_names: + p("Expected tools (echo, ping) present") + else: + p("Missing expected tools", ok=False) + failures += 1 + except json.JSONDecodeError: + p(f"Invalid JSON output: {result.stdout[:100]}", 
ok=False) + failures += 1 + else: + p(f"list-tools failed (exit={result.returncode}): {result.stdout[:100]}", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Step 4: dimos mcp call echo --arg message=hello + # --------------------------------------------------------------- + section("Step 4: dimos mcp call echo --arg message=hello") + result = run_dimos("mcp", "call", "echo", "--arg", "message=hello-from-devex-test") + if result.returncode == 0 and "hello-from-devex-test" in result.stdout: + p(f"echo returned: {result.stdout.strip()[:100]}") + else: + p(f"echo call failed (exit={result.returncode}): {result.stdout[:100]}", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Step 5: dimos mcp status + # --------------------------------------------------------------- + section("Step 5: dimos mcp status") + result = run_dimos("mcp", "status") + if result.returncode == 0: + try: + data = json.loads(result.stdout) + p( + f"Status: pid={data.get('pid')}, {data.get('skill_count', '?')} skills, modules={data.get('modules', [])}" + ) + except json.JSONDecodeError: + p(f"Non-JSON output: {result.stdout[:100]}", ok=False) + failures += 1 + else: + p(f"mcp status failed (exit={result.returncode})", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Step 6: dimos mcp modules + # --------------------------------------------------------------- + section("Step 6: dimos mcp modules") + result = run_dimos("mcp", "modules") + if result.returncode == 0: + try: + data = json.loads(result.stdout) + for mod_name, skills in data.get("modules", {}).items(): + p(f"Module {mod_name}: {', '.join(skills)}") + except json.JSONDecodeError: + p(f"Non-JSON output: {result.stdout[:100]}", ok=False) + failures += 1 + else: + p(f"mcp modules failed (exit={result.returncode})", ok=False) + failures += 1 + + # 
--------------------------------------------------------------- + # Step 7: dimos agent-send "hello" + # --------------------------------------------------------------- + section("Step 7: dimos agent-send 'what tools do you have?'") + result = run_dimos("agent-send", "what tools do you have?") + if result.returncode == 0: + p(f"agent-send response: {result.stdout.strip()[:200]}") + else: + p(f"agent-send failed (exit={result.returncode}): {result.stdout[:100]}", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Step 8: Check logs + # --------------------------------------------------------------- + section("Step 8: Check per-run logs") + log_base = os.path.expanduser("~/.local/state/dimos/logs") + if os.path.isdir(log_base): + runs = sorted(os.listdir(log_base), reverse=True) + if runs: + latest_run = runs[0] + log_file = os.path.join(log_base, latest_run, "main.jsonl") + if os.path.exists(log_file): + size = os.path.getsize(log_file) + with open(log_file) as f: + lines = f.readlines() + p(f"Log file: {log_file} ({size} bytes, {len(lines)} lines)") + if lines: + # Show last 3 lines + for line in lines[-3:]: + print(f" {line.strip()[:120]}") + else: + p(f"No main.jsonl found in {latest_run}", ok=False) + # Check what files exist + run_dir = os.path.join(log_base, latest_run) + files = os.listdir(run_dir) if os.path.isdir(run_dir) else [] + print(f" Files in run dir: {files}") + failures += 1 + else: + p("No run directories found", ok=False) + failures += 1 + else: + p(f"Log base dir not found: {log_base}", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Step 9: dimos stop + # --------------------------------------------------------------- + section("Step 9: dimos stop") + result = run_dimos("stop") + print(f" output: {result.stdout.strip()[:200]}") + if result.returncode == 0: + p("Stopped successfully") + else: + p(f"Stop failed (exit={result.returncode}): 
{result.stderr[:100]}", ok=False) + failures += 1 + + # Wait for shutdown + time.sleep(2) + + # --------------------------------------------------------------- + # Step 10: dimos status (verify stopped) + # --------------------------------------------------------------- + section("Step 10: dimos status (verify stopped)") + result = run_dimos("status") + print(f" output: {result.stdout.strip()[:200]}") + if ( + "no running" in result.stdout.lower() + or "no dimos" in result.stdout.lower() + or result.returncode == 0 + ): + p("Confirmed: no running instances") + else: + p(f"Unexpected status after stop (exit={result.returncode})", ok=False) + failures += 1 + + # --------------------------------------------------------------- + # Summary + # --------------------------------------------------------------- + print("\n" + "=" * 60) + if failures == 0: + print(" \u2705 FULL DEVELOPER EXPERIENCE TEST PASSED") + print(" All CLI commands work end-to-end!") + else: + print(f" \u274c {failures} FAILURES in developer experience test") + print("=" * 60 + "\n") + + sys.exit(1 if failures else 0) + + +if __name__ == "__main__": + main() diff --git a/dimos/core/tests/demo_mcp_killtest.py b/dimos/core/tests/demo_mcp_killtest.py new file mode 100644 index 0000000000..b933bb9a1c --- /dev/null +++ b/dimos/core/tests/demo_mcp_killtest.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Standalone MCP kill/restart stress test — no pytest.

Simulates what happens when DimOS crashes and restarts:
1. Start blueprint with MCP server
2. Verify MCP is responsive
3. Send burst of calls
4. Kill the process (SIGKILL)
5. Verify MCP is dead
6. Restart
7. Verify recovery
8. Repeat N cycles

Usage:
    python -m dimos.core.tests.demo_mcp_killtest
    python -m dimos.core.tests.demo_mcp_killtest --cycles 5
"""
def test_mcp_basic_ops() -> int:
    """Run the standard battery of MCP checks against the live server.

    Every check is executed even if an earlier one fails, so a single run
    reports all broken operations. A check fails when it raises for any
    reason (HTTP error, malformed response, unexpected value).

    Returns:
        The number of checks that failed; ``0`` means everything passed.
    """

    def expect(condition: bool, detail: str) -> None:
        # Raise explicitly instead of using ``assert`` so the checks are not
        # stripped under ``python -O`` and failures carry a readable message.
        if not condition:
            raise AssertionError(detail)

    def check_initialize() -> str:
        result = mcp_call("initialize")
        name = result["result"]["serverInfo"]["name"]
        expect(name == "dimensional", f"unexpected server name {name!r}")
        return "initialize \u2192 dimensional"

    def check_tools_list() -> str:
        result = mcp_call("tools/list")
        tools = {t["name"] for t in result["result"]["tools"]}
        missing = {"echo", "ping"} - tools
        expect(not missing, f"missing expected tools {sorted(missing)}")
        return f"tools/list \u2192 {len(tools)} tools"

    def check_echo() -> str:
        result = mcp_call("tools/call", {"name": "echo", "arguments": {"message": "killtest"}})
        text = result["result"]["content"][0]["text"]
        expect(text == "killtest", f"unexpected echo reply {text!r}")
        return "echo \u2192 killtest"

    def check_ping() -> str:
        result = mcp_call("tools/call", {"name": "ping", "arguments": {}})
        text = result["result"]["content"][0]["text"]
        expect(text == "pong", f"unexpected ping reply {text!r}")
        return "ping \u2192 pong"

    def check_status() -> str:
        result = mcp_call("dimos/status")
        modules = result["result"]["modules"]
        expect("StressTestModule" in modules, f"StressTestModule not in {modules!r}")
        return f"dimos/status \u2192 pid={result['result']['pid']}"

    def check_agent_send() -> str:
        result = mcp_call("dimos/agent_send", {"message": "hello from killtest"})
        text = result["result"]["content"][0]["text"]
        expect("hello from killtest" in text, f"message not echoed in {text!r}")
        return "agent_send \u2192 delivered"

    def check_burst() -> str:
        # Ten back-to-back echo calls; the first mismatch aborts the burst.
        for i in range(10):
            message = f"burst-{i}"
            reply = mcp_call("tools/call", {"name": "echo", "arguments": {"message": message}})
            text = reply["result"]["content"][0]["text"]
            expect(text == message, f"call {i} returned {text!r}")
        return "rapid burst \u2192 10/10 echo calls OK"

    def check_unknown_method() -> str:
        # An unknown method must come back as a JSON-RPC error object.
        result = mcp_call("nonexistent/method")
        expect("error" in result, f"no 'error' key in {result!r}")
        return "unknown method \u2192 error (correct)"

    # (label used in the failure message, check returning the success message)
    checks = [
        ("initialize", check_initialize),
        ("tools/list", check_tools_list),
        ("echo", check_echo),
        ("ping", check_ping),
        ("dimos/status", check_status),
        ("agent_send", check_agent_send),
        ("rapid burst", check_burst),
        ("error handling", check_unknown_method),
    ]

    failures = 0
    for label, check in checks:
        try:
            p(check())
        except Exception as e:
            # Deliberately broad: any exception counts as one failed check.
            p(f"{label} failed: {e}", ok=False)
            failures += 1
    return failures
def main() -> None:
    """Parse ``--cycles``, run that many kill/restart cycles, and exit.

    Prints a banner before and a summary after the cycles. The process exits
    with status 0 when no cycle reported a failure and 1 otherwise.
    """
    cli = argparse.ArgumentParser(description="MCP kill/restart stress test")
    cli.add_argument("--cycles", type=int, default=3, help="Number of kill/restart cycles")
    n_cycles = cli.parse_args().cycles

    rule = "=" * 60

    print("\n" + rule)
    print(" MCP KILL/RESTART STRESS TEST")
    print(f" Cycles: {n_cycles}")
    print(rule)

    # Cycles are numbered from 1 and run strictly one after another.
    total_failures = sum(run_kill_restart_cycle(number) for number in range(1, n_cycles + 1))

    print("\n" + rule)
    if total_failures != 0:
        print(f" \u274c {total_failures} FAILURES across {n_cycles} cycles")
    else:
        print(" \u2705 ALL CYCLES PASSED \u2014 MCP is resilient to kill/restart")
    print(rule + "\n")

    sys.exit(0 if total_failures == 0 else 1)
# Blueprint combining the hardware-free StressTestModule with an McpServer,
# joined through autoconnect().
demo_mcp_stress_test = autoconnect(
    StressTestModule.blueprint(),
    McpServer.blueprint(),
)

# The blueprint object is the only name this module exports.
__all__ = ["demo_mcp_stress_test"]
class StressTestModule(Module):
    """Minimal module exposing test skills via MCP."""

    # NOTE(review): the @skill docstrings and parameter names below are
    # presumably surfaced to MCP clients as tool descriptions/schemas —
    # confirm against the `skill` decorator before rewording or renaming.

    @skill
    def echo(self, message: str) -> str:
        """Echo back the given message. Used for latency and connectivity tests."""
        # Returns the input unchanged.
        return message

    @skill
    def ping(self) -> str:
        """Simple health check. Returns 'pong'."""
        return "pong"

    @skill
    def slow(self, seconds: float = 1.0) -> str:
        """Sleep for the given duration then return. Used for timeout tests."""
        # Blocks the calling thread for the whole duration; time.sleep()
        # raises ValueError for a negative value.
        time.sleep(seconds)
        return f"slept {seconds}s"

    @skill
    def info(self) -> str:
        """Return module runtime info."""
        # Local import: `os` is needed only by this skill.
        import os

        # Reports the PID of whichever process executes the skill.
        return f"pid={os.getpid()}, module={self.__class__.__name__}"

    def start(self) -> None:
        # No module-specific setup; defers entirely to Module.start().
        super().start()
from __future__ import annotations -import multiprocessing as mp +import multiprocessing +import os import threading import traceback from typing import TYPE_CHECKING, Any @@ -125,7 +126,7 @@ def __getattr__(self, name: str) -> Any: def get_forkserver_context() -> Any: global _forkserver_ctx if _forkserver_ctx is None: - _forkserver_ctx = mp.get_context("forkserver") + _forkserver_ctx = multiprocessing.get_context("forkserver") return _forkserver_ctx @@ -157,10 +158,17 @@ def module_count(self) -> int: @property def pid(self) -> int | None: """PID of the worker process, or ``None`` if not alive.""" - if self._process is not None and self._process.is_alive(): - p: int | None = self._process.pid - return p - return None + if self._process is None: + return None + try: + # Signal 0 just checks if the process is alive. + pid: int | None = self._process.pid + if pid is None: + return None + os.kill(pid, 0) + return pid + except OSError: + return None @property def worker_id(self) -> int: diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index d02abd61b9..f842230550 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -46,6 +46,7 @@ "demo-google-maps-skill": "dimos.agents.skills.demo_google_maps_skill:demo_google_maps_skill", "demo-gps-nav": "dimos.agents.skills.demo_gps_nav:demo_gps_nav", "demo-grasping": "dimos.manipulation.grasping.demo_grasping:demo_grasping", + "demo-mcp-stress-test": "dimos.core.tests.stress_test_blueprint:demo_mcp_stress_test", "demo-object-scene-registration": "dimos.perception.demo_object_scene_registration:demo_object_scene_registration", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", "demo-skill": "dimos.agents.skills.demo_skill:demo_skill", diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 0fc35c9801..1e95a65dcc 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -15,12 +15,16 @@ from __future__ import annotations import inspect +import json 
@mcp_app.command("call")
def mcp_call_tool(
    tool_name: str = typer.Argument(..., help="Tool name to call"),
    args: list[str] = typer.Option(
        [], "--arg", "-a", click_type=_KeyValueType(), help="Arguments as key=value"
    ),
    json_args: str = typer.Option("", "--json-args", "-j", help="Arguments as JSON string"),
) -> None:
    """Call an MCP tool by name."""
    # The docstring above is what Typer shows as the command help, so the
    # implementation notes live in comments.
    #
    # Arguments are assembled from --json-args first (must decode to a JSON
    # object), then each --arg KEY=VALUE is applied on top, so an explicit
    # --arg wins over the same key in --json-args. Exits with status 1 on bad
    # input, when no MCP server is reachable, or on an MCP error.
    arguments: dict[str, Any] = {}
    if json_args:
        try:
            decoded = json.loads(json_args)
        except json.JSONDecodeError as e:
            typer.echo(f"Error: invalid JSON in --json-args: {e}", err=True)
            raise typer.Exit(1)
        # Valid JSON that is not an object (list, string, null, ...) cannot be
        # used as a tool-argument mapping; reject it instead of forwarding it.
        if not isinstance(decoded, dict):
            typer.echo(
                f"Error: --json-args must be a JSON object, got {type(decoded).__name__}",
                err=True,
            )
            raise typer.Exit(1)
        arguments.update(decoded)

    # _KeyValueType.convert() returns (key, val) tuples at runtime
    arguments.update(dict(args))  # type: ignore[arg-type]

    try:
        result = _get_adapter().call_tool(tool_name, arguments)
    except requests.ConnectionError:
        typer.echo("Error: no running MCP server (is DimOS running?)", err=True)
        raise typer.Exit(1)
    except McpError as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1)

    content = result.get("content", [])
    if not content:
        typer.echo("(no output)")
        return
    for item in content:
        # Print the item's text when it has one; otherwise fall back to its
        # string form (this also covers items that are not dicts).
        if isinstance(item, dict):
            typer.echo(item.get("text", str(item)))
        else:
            typer.echo(str(item))
def agent_send_cmd(
    message: str = typer.Argument(..., help="Message to send to the running agent"),
) -> None:
    """Send a message to the running DimOS agent via MCP."""
    # Invokes the "agent_send" tool and prints its text reply. Any failure is
    # reported on stderr and ends the command with exit status 1.
    failure: str | None = None
    reply = ""
    try:
        reply = _get_adapter().call_tool_text("agent_send", {"message": message})
    except requests.ConnectionError:
        failure = "Error: no running MCP server (is DimOS running?)"
    except McpError as exc:
        failure = f"Error: {exc}"

    if failure is not None:
        typer.echo(failure, err=True)
        raise typer.Exit(1)

    typer.echo(reply)