From 0d7cad284fea09c76b8ecc0a6b4960e84716730e Mon Sep 17 00:00:00 2001 From: Noah Baertsch Date: Fri, 27 Feb 2026 18:51:35 -0500 Subject: [PATCH 1/3] fixed signature mis-match in streamable_http_client --- docs/tools.md | 3 +- src/py_code_mode/tools/adapters/mcp.py | 13 ++- src/py_code_mode/tools/registry.py | 1 - tests/test_mcp_adapter.py | 50 ++++++---- tests/test_mcp_server.py | 133 +++++++++++++++++++++++++ tests/test_registry.py | 44 +++++++- 6 files changed, 218 insertions(+), 26 deletions(-) diff --git a/docs/tools.md b/docs/tools.md index 27f8778..1a09d18 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -172,8 +172,7 @@ name: mythic type: mcp transport: streamable_http url: http://localhost:3333/mcp -timeout: 10 -sse_read_timeout: 120 +timeout: 10 # optional, 30 seconds if not specified ``` ### Agent Usage diff --git a/src/py_code_mode/tools/adapters/mcp.py b/src/py_code_mode/tools/adapters/mcp.py index e9b348d..d3cba19 100644 --- a/src/py_code_mode/tools/adapters/mcp.py +++ b/src/py_code_mode/tools/adapters/mcp.py @@ -178,7 +178,6 @@ async def connect_streamable_http( url: str, headers: dict[str, str] | None = None, timeout: float = 5.0, - sse_read_timeout: float = 300.0, *, namespace: str, ) -> MCPAdapter: @@ -191,7 +190,6 @@ async def connect_streamable_http( url: Streamable HTTP endpoint URL (e.g., "http://localhost:3333/mcp"). headers: Optional HTTP headers (e.g., for authentication). timeout: Connection timeout in seconds. - sse_read_timeout: Read timeout for server-sent events in seconds. namespace: Name for the tool namespace (e.g., "c2", "web"). Returns: @@ -203,8 +201,10 @@ async def connect_streamable_http( try: from contextlib import AsyncExitStack + import httpx from mcp import ClientSession from mcp.client.streamable_http import streamable_http_client + from mcp.shared._httpx_utils import create_mcp_http_client except ImportError as e: raise ImportError( "MCP package required for streamable HTTP connection. Install with: pip install mcp" @@ -212,12 +212,15 @@ async def connect_streamable_http( exit_stack = AsyncExitStack() + http_timeout = httpx.Timeout(timeout) + http_client = await exit_stack.enter_async_context( + create_mcp_http_client(headers=headers, timeout=http_timeout) + ) + transport = await exit_stack.enter_async_context( streamable_http_client( url, - headers=headers, - timeout=timeout, - sse_read_timeout=sse_read_timeout, + http_client=http_client, terminate_on_close=True, ) ) diff --git a/src/py_code_mode/tools/registry.py b/src/py_code_mode/tools/registry.py index 6386293..a27ff48 100644 --- a/src/py_code_mode/tools/registry.py +++ b/src/py_code_mode/tools/registry.py @@ -58,7 +58,6 @@ async def _load_mcp_adapter( url=mcp_config["url"], headers=mcp_config.get("headers"), timeout=float(mcp_config.get("timeout", 5.0)), - sse_read_timeout=float(mcp_config.get("sse_read_timeout", 300.0)), namespace=tool_name, ) else: diff --git a/tests/test_mcp_adapter.py b/tests/test_mcp_adapter.py index 1d37781..8fddb6a 100644 --- a/tests/test_mcp_adapter.py +++ b/tests/test_mcp_adapter.py @@ -305,13 +305,16 @@ class TestMCPAdapterStreamableHTTPTransport: """Tests for Streamable HTTP transport connection.""" @pytest.mark.asyncio - async def test_connect_streamable_http_uses_client(self) -> None: - """connect_streamable_http uses mcp.client.streamable_http.* client.""" + async def test_connect_streamable_http_passes_headers(self) -> None: + """connect_streamable_http forwards headers via http_client.""" from py_code_mode.tools.adapters.mcp import MCPAdapter with ( - patch("mcp.client.streamable_http.streamable_http_client") as mock_http_client, - patch("mcp.ClientSession") as mock_session_class, + patch( + "mcp.client.streamable_http.streamable_http_client", + autospec=True, + ) as mock_http_client, + patch("mcp.ClientSession", autospec=True) as mock_session_class, ): mock_read = AsyncMock() mock_write = AsyncMock() @@ -327,19 +330,29 @@ async def test_connect_streamable_http_uses_client(self) -> None: mock_session_cm.__aexit__.return_value = None mock_session_class.return_value = mock_session_cm - await MCPAdapter.connect_streamable_http("http://localhost:3333/mcp", namespace="test") + headers = {"Authorization": "Bearer token123"} + await MCPAdapter.connect_streamable_http( + "http://localhost:3333/mcp", + headers=headers, + namespace="test", + ) - mock_http_client.assert_called_once() assert mock_http_client.call_args[0][0] == "http://localhost:3333/mcp" + call_kwargs = mock_http_client.call_args[1] + assert set(call_kwargs) == {"http_client", "terminate_on_close"} + assert call_kwargs["http_client"].headers.get("Authorization") == "Bearer token123" @pytest.mark.asyncio - async def test_connect_streamable_http_passes_headers(self) -> None: - """connect_streamable_http forwards headers to the HTTP client.""" + async def test_connect_streamable_http_maps_timeout_to_http_client(self) -> None: + """connect_streamable_http maps timeout to http_client.""" from py_code_mode.tools.adapters.mcp import MCPAdapter with ( - patch("mcp.client.streamable_http.streamable_http_client") as mock_http_client, - patch("mcp.ClientSession") as mock_session_class, + patch( + "mcp.client.streamable_http.streamable_http_client", + autospec=True, + ) as mock_http_client, + patch("mcp.ClientSession", autospec=True) as mock_session_class, ): mock_read = AsyncMock() mock_write = AsyncMock() @@ -355,15 +368,15 @@ async def test_connect_streamable_http_passes_headers(self) -> None: mock_session_cm.__aexit__.return_value = None mock_session_class.return_value = mock_session_cm - headers = {"Authorization": "Bearer token123"} await MCPAdapter.connect_streamable_http( "http://localhost:3333/mcp", - headers=headers, + timeout=7.0, namespace="test", ) - call_kwargs = mock_http_client.call_args[1] - assert call_kwargs.get("headers") == headers + http_client = mock_http_client.call_args[1]["http_client"] + assert http_client.timeout.connect == 7.0 + assert http_client.timeout.read == 7.0 @pytest.mark.asyncio async def test_connect_streamable_http_initializes_session(self) -> None: @@ -371,8 +384,11 @@ async def test_connect_streamable_http_initializes_session(self) -> None: from py_code_mode.tools.adapters.mcp import MCPAdapter with ( - patch("mcp.client.streamable_http.streamable_http_client") as mock_http_client, - patch("mcp.ClientSession") as mock_session_class, + patch( + "mcp.client.streamable_http.streamable_http_client", + autospec=True, + ) as mock_http_client, + patch("mcp.ClientSession", autospec=True) as mock_session_class, ): mock_read = AsyncMock() mock_write = AsyncMock() @@ -524,7 +540,7 @@ async def test_connect_streamable_http_with_namespace(self) -> None: from py_code_mode.tools.adapters.mcp import MCPAdapter sig = inspect.signature(MCPAdapter.connect_streamable_http) - assert "namespace" in sig.parameters + assert set(sig.parameters) == {"url", "headers", "timeout", "namespace"} class TestMCPAdapterWithRegistry: diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 52c10c6..573da2c 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -15,6 +15,7 @@ from pathlib import Path import pytest +from aiohttp import web from mcp.client.stdio import StdioServerParameters # ============================================================================= @@ -106,6 +107,91 @@ async def run(url: str) -> str: return storage, tools_dir +@pytest.fixture +async def streamable_http_mcp_url(unused_tcp_port: int) -> str: + """Start a minimal local Streamable HTTP MCP server and return its URL.""" + + async def handle_mcp_post(request: web.Request) -> web.Response: + payload = await request.json() + method = payload.get("method") + request_id = payload.get("id") + + if method == "initialize": + return web.json_response( + { + "jsonrpc": "2.0", + "id": request_id, + "result": { + "protocolVersion": "2025-06-18", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "test-streamable-http", "version": "0.1.0"}, + }, + } + ) + + if method == "notifications/initialized": + return web.Response(status=202) + + if method == "tools/list": + return web.json_response( + { + "jsonrpc": "2.0", + "id": request_id, + "result": { + "tools": [ + { + "name": "hello", + "description": "Return a greeting", + "inputSchema": { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + }, + } + ] + }, + } + ) + + if method == "tools/call": + arguments = (payload.get("params") or {}).get("arguments") or {} + name = arguments.get("name", "world") + return web.json_response( + { + "jsonrpc": "2.0", + "id": request_id, + "result": { + "content": [{"type": "text", "text": f"hello {name}"}], + "isError": False, + }, + } + ) + + if request_id is None: + return web.Response(status=202) + + return web.json_response( + { + "jsonrpc": "2.0", + "id": request_id, + "error": {"code": -32601, "message": f"Unknown method: {method}"}, + } + ) + + app = web.Application() + app.router.add_post("/mcp", handle_mcp_post) + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port) + await site.start() + + try: + yield f"http://127.0.0.1:{unused_tcp_port}/mcp" + finally: + await runner.cleanup() + + # ============================================================================= # E2E Tests via Stdio - Production Path # ============================================================================= @@ -247,6 +333,53 @@ async def test_mcp_server_search_workflows( workflow_names = {s["name"] for s in workflows_data} assert "double" in workflow_names + @pytest.mark.asyncio + async def test_mcp_server_loads_streamable_http_tool_e2e( + self, + tmp_path: Path, + streamable_http_mcp_url: str, + ) -> None: + """E2E: streamable_http MCP tools load and are callable from run_code.""" + from mcp import ClientSession + from mcp.client.stdio import stdio_client + + storage_path = tmp_path / "storage" + storage_path.mkdir() + (storage_path / "workflows").mkdir() + (storage_path / "artifacts").mkdir() + + tools_path = tmp_path / "tools" + tools_path.mkdir() + (tools_path / "mythic.yaml").write_text( + f""" +name: mythic +type: mcp +transport: streamable_http +url: {streamable_http_mcp_url} +timeout: 5 +""".strip() + + "\n" + ) + + server_params = StdioServerParameters( + command="py-code-mode-mcp", + args=["--storage", str(storage_path), "--tools", str(tools_path)], + ) + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + + list_result = await session.call_tool("list_tools", {}) + tools_data = json.loads(list_result.content[0].text) + tool_names = {t["name"] for t in tools_data} + assert "mythic" in tool_names + + run_result = await session.call_tool( + "run_code", {"code": 'tools.mythic.hello(name="Noah")'} + ) + assert "hello Noah" in run_result.content[0].text + # ------------------------------------------------------------------------- # Runtime Workflow Creation Tests # ------------------------------------------------------------------------- diff --git a/tests/test_registry.py b/tests/test_registry.py index 068a429..1a52e21 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,9 +1,12 @@ """Tests for ToolRegistry with flat namespace.""" +import logging +from unittest.mock import AsyncMock, patch + import pytest from py_code_mode.errors import ToolNotFoundError -from py_code_mode.tools.registry import ToolRegistry +from py_code_mode.tools.registry import ToolRegistry, _load_mcp_adapter from tests.conftest import ControllableEmbedder, MockAdapter @@ -265,6 +268,45 @@ async def test_close_all(self, network_adapter: MockAdapter, web_adapter: MockAd assert len(registry.list_tools()) == 0 +class TestLoadMCPAdapter: + """Tests for MCP adapter loading helper.""" + + @pytest.mark.asyncio + async def test_streamable_http_forwards_supported_kwargs_only(self) -> None: + """streamable_http transport forwards only supported kwargs.""" + logger = logging.getLogger("test") + + mock_adapter = AsyncMock() + mock_adapter._refresh_tools = AsyncMock() + + with patch( + "py_code_mode.tools.adapters.mcp.MCPAdapter.connect_streamable_http", + autospec=True, + ) as connect: + connect.return_value = mock_adapter + + await _load_mcp_adapter( + { + "name": "mythic", + "type": "mcp", + "transport": "streamable_http", + "url": "http://localhost:3333/mcp", + "headers": {"Authorization": "Bearer token"}, + "timeout": 12, + "unused_option": 999, + }, + logger, + ) + + call_kwargs = connect.call_args.kwargs + assert call_kwargs == { + "url": "http://localhost:3333/mcp", + "headers": {"Authorization": "Bearer token"}, + "timeout": 12.0, + "namespace": "mythic", + } + + # TestToolRegistryFromDir removed - use CLIAdapter(tools_path=...) for loading tools From d2ef88b3173d01ccb1f15d65b2932a67df04a0db Mon Sep 17 00:00:00 2001 From: Noah Baertsch Date: Fri, 27 Feb 2026 18:56:48 -0500 Subject: [PATCH 2/3] 30s default timeout --- src/py_code_mode/tools/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py_code_mode/tools/registry.py b/src/py_code_mode/tools/registry.py index a27ff48..0331371 100644 --- a/src/py_code_mode/tools/registry.py +++ b/src/py_code_mode/tools/registry.py @@ -57,7 +57,7 @@ async def _load_mcp_adapter( adapter = await MCPAdapter.connect_streamable_http( url=mcp_config["url"], headers=mcp_config.get("headers"), - timeout=float(mcp_config.get("timeout", 5.0)), + timeout=float(mcp_config.get("timeout", 30.0)), namespace=tool_name, ) else: From 0e3d660e9de14e79221f62643edb1a4672e05818 Mon Sep 17 00:00:00 2001 From: actae0n <19864268+xpcmdshell@users.noreply.github.com> Date: Fri, 27 Feb 2026 22:50:06 -0800 Subject: [PATCH 3/3] Replace hand-rolled aiohttp mock with real FastMCP server in E2E test The previous fixture only handled POST with plain JSON responses, missing SSE framing, GET notification stream, and session management. Replace with a real FastMCP server running over uvicorn for proper streamable HTTP coverage. --- tests/test_mcp_server.py | 92 ++++++++-------------------------------- 1 file changed, 17 insertions(+), 75 deletions(-) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 573da2c..8c8ca77 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -11,12 +11,14 @@ - Negative tests (error handling) """ +import asyncio import json from pathlib import Path import pytest -from aiohttp import web +import uvicorn from mcp.client.stdio import StdioServerParameters +from mcp.server.fastmcp import FastMCP # ============================================================================= # Test Fixtures @@ -109,87 +111,27 @@ async def run(url: str) -> str: @pytest.fixture async def streamable_http_mcp_url(unused_tcp_port: int) -> str: - """Start a minimal local Streamable HTTP MCP server and return its URL.""" - - async def handle_mcp_post(request: web.Request) -> web.Response: - payload = await request.json() - method = payload.get("method") - request_id = payload.get("id") - - if method == "initialize": - return web.json_response( - { - "jsonrpc": "2.0", - "id": request_id, - "result": { - "protocolVersion": "2025-06-18", - "capabilities": {"tools": {}}, - "serverInfo": {"name": "test-streamable-http", "version": "0.1.0"}, - }, - } - ) - - if method == "notifications/initialized": - return web.Response(status=202) - - if method == "tools/list": - return web.json_response( - { - "jsonrpc": "2.0", - "id": request_id, - "result": { - "tools": [ - { - "name": "hello", - "description": "Return a greeting", - "inputSchema": { - "type": "object", - "properties": {"name": {"type": "string"}}, - "required": ["name"], - }, - } - ] - }, - } - ) - - if method == "tools/call": - arguments = (payload.get("params") or {}).get("arguments") or {} - name = arguments.get("name", "world") - return web.json_response( - { - "jsonrpc": "2.0", - "id": request_id, - "result": { - "content": [{"type": "text", "text": f"hello {name}"}], - "isError": False, - }, - } - ) + """Start a real FastMCP server over streamable HTTP and return its URL.""" + mcp = FastMCP("test-streamable-http") - if request_id is None: - return web.Response(status=202) - - return web.json_response( - { - "jsonrpc": "2.0", - "id": request_id, - "error": {"code": -32601, "message": f"Unknown method: {method}"}, - } - ) + @mcp.tool() + def hello(name: str) -> str: + """Return a greeting.""" + return f"hello {name}" - app = web.Application() - app.router.add_post("/mcp", handle_mcp_post) + app = mcp.streamable_http_app() + config = uvicorn.Config(app, host="127.0.0.1", port=unused_tcp_port, log_level="warning") + server = uvicorn.Server(config) - runner = web.AppRunner(app) - await runner.setup() - site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port) - await site.start() + task = asyncio.create_task(server.serve()) + while not server.started: + await asyncio.sleep(0.5) try: yield f"http://127.0.0.1:{unused_tcp_port}/mcp" finally: - await runner.cleanup() + server.should_exit = True + await task # =============================================================================