From 2955d849df9bdcdf87022971dd0877be27d6a50c Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Sun, 21 Sep 2025 14:08:14 +0800 Subject: [PATCH 01/11] fix: handle ClosedResourceError in StreamableHTTP message router --- src/mcp/server/streamable_http.py | 5 + ...est_1363_race_condition_streamable_http.py | 234 ++++++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 tests/issues/test_1363_race_condition_streamable_http.py diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index b45d742b00..4f5944b8ab 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -875,6 +875,11 @@ async def message_router(): for message. Still processing message as the client might reconnect and replay.""" ) + except anyio.ClosedResourceError: + if self._terminated: + logging.debug("Read stream closed by client") + else: + logging.exception("Unexpected closure of read stream in message router") except Exception: logger.exception("Error in message router") diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py new file mode 100644 index 0000000000..6dbcf6699b --- /dev/null +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -0,0 +1,234 @@ +"""Test for issue #1363 - Race condition in StreamableHTTP transport causes ClosedResourceError. + +This test reproduces the race condition described in issue #1363 where MCP servers +in HTTP Streamable mode experience ClosedResourceError exceptions when requests +fail validation early (e.g., due to incorrect Accept headers). + +The race condition occurs because: +1. Transport setup creates a message_router task +2. Message router enters async for write_stream_reader loop +3. write_stream_reader calls checkpoint() in receive(), yielding control +4. Request handling processes HTTP request +5. If validation fails early, request returns immediately +6. Transport termination closes all streams including write_stream_reader +7. Message router may still be in checkpoint() yield and hasn't returned to check stream state +8. When message router resumes, it encounters a closed stream, raising ClosedResourceError +""" + +import socket +import subprocess +import sys +import time +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager + +import httpx +import pytest +import uvicorn +from starlette.applications import Starlette +from starlette.routing import Mount +from starlette.types import Receive, Scope, Send + +from mcp.server import Server +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from mcp.types import Tool + +SERVER_NAME = "test_race_condition_server" + + +def check_server_logs_for_errors(process, test_name: str): + """ + Check server logs for ClosedResourceError and other race condition errors. + + Args: + process: The server process + test_name: Name of the test for better error messages + """ + # Get logs from the process + try: + stdout, stderr = process.communicate(timeout=10) + server_logs = stderr + stdout + except Exception: + server_logs = "" + + # Check for specific race condition errors + errors_found = [] + + if "ClosedResourceError" in server_logs: + errors_found.append("ClosedResourceError") + + if "Error in message router" in server_logs: + errors_found.append("Error in message router") + + if "anyio.ClosedResourceError" in server_logs: + errors_found.append("anyio.ClosedResourceError") + + # Assert no race condition errors occurred + if errors_found: + error_msg = f"Test '{test_name}' found race condition errors: {', '.join(errors_found)}\n" + error_msg += f"Server logs:\n{server_logs}" + pytest.fail(error_msg) + + # If we get here, no race condition errors were found + print(f"✓ Test '{test_name}' passed: No race condition errors detected") + + +@pytest.fixture +def server_port() -> int: + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +@pytest.fixture +def server_url(server_port: int) -> str: + return f"http://127.0.0.1:{server_port}" + + +class RaceConditionTestServer(Server): + def __init__(self): + super().__init__(SERVER_NAME) + + async def on_list_tools(self) -> list[Tool]: + return [] + + +def run_server_with_logging(port: int): + """Run the StreamableHTTP server with logging to capture race condition errors.""" + app = RaceConditionTestServer() + + # Create session manager + session_manager = StreamableHTTPSessionManager( + app=app, + json_response=False, + stateless=True, # Use stateless mode to trigger the race condition + ) + + # Create the ASGI handler + async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None: + await session_manager.handle_request(scope, receive, send) + + # Create Starlette app with lifespan + @asynccontextmanager + async def lifespan(app: Starlette) -> AsyncGenerator[None, None]: + async with session_manager.run(): + yield + + routes = [ + Mount("/", app=handle_streamable_http), + ] + + starlette_app = Starlette(routes=routes, lifespan=lifespan) + uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="debug") + + +def start_server_process(port: int): + """Start server in a separate process.""" + # Create a temporary script to run the server + import os + import tempfile + + script_content = f""" +import sys +sys.path.insert(0, '{os.getcwd()}') +from tests.issues.test_1363_race_condition_streamable_http import run_server_with_logging +run_server_with_logging({port}) +""" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: + f.write(script_content) + script_path = f.name + + process = subprocess.Popen([sys.executable, script_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + # Give server time to start + time.sleep(1) + return process + + +@pytest.mark.anyio +async def test_race_condition_invalid_accept_headers(server_port: int): + """ + Test the race condition with invalid Accept headers. + + This test reproduces the exact scenario described in issue #1363: + - Send POST request with incorrect Accept headers (missing either application/json or text/event-stream) + - Request fails validation early and returns quickly + - This should trigger the race condition where message_router encounters ClosedResourceError + """ + process = start_server_process(server_port) + + try: + # Test with missing text/event-stream in Accept header + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.post( + f"http://127.0.0.1:{server_port}/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "application/json", # Missing text/event-stream + "Content-Type": "application/json", + }, + ) + # Should get 406 Not Acceptable due to missing text/event-stream + assert response.status_code == 406 + + # Test with missing application/json in Accept header + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.post( + f"http://127.0.0.1:{server_port}/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "text/event-stream", # Missing application/json + "Content-Type": "application/json", + }, + ) + # Should get 406 Not Acceptable due to missing application/json + assert response.status_code == 406 + + # Test with completely invalid Accept header + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.post( + f"http://127.0.0.1:{server_port}/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "text/plain", # Invalid Accept header + "Content-Type": "application/json", + }, + ) + # Should get 406 Not Acceptable + assert response.status_code == 406 + + finally: + process.terminate() + process.wait() + # Check server logs for race condition errors + check_server_logs_for_errors(process, "test_race_condition_invalid_accept_headers") + + +@pytest.mark.anyio +async def test_race_condition_invalid_content_type(server_port: int): + """ + Test the race condition with invalid Content-Type headers. + + This test reproduces the race condition scenario with Content-Type validation failure. + """ + process = start_server_process(server_port) + + try: + # Test with invalid Content-Type + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.post( + f"http://127.0.0.1:{server_port}/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "text/plain", # Invalid Content-Type + }, + ) + assert response.status_code == 400 + + finally: + process.terminate() + process.wait() + # Check server logs for race condition errors + check_server_logs_for_errors(process, "test_race_condition_invalid_content_type") From f8072c90f0932d6de3b5904d2853671a69e7d3b0 Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Sun, 21 Sep 2025 14:14:41 +0800 Subject: [PATCH 02/11] Add type annotations to race condition test functions --- tests/issues/test_1363_race_condition_streamable_http.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index 6dbcf6699b..7146a83859 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -36,7 +36,7 @@ SERVER_NAME = "test_race_condition_server" -def check_server_logs_for_errors(process, test_name: str): +def check_server_logs_for_errors(process: subprocess.Popen[str], test_name: str) -> None: """ Check server logs for ClosedResourceError and other race condition errors. @@ -52,7 +52,7 @@ def check_server_logs_for_errors(process, test_name: str): server_logs = "" # Check for specific race condition errors - errors_found = [] + errors_found: list[str] = [] if "ClosedResourceError" in server_logs: errors_found.append("ClosedResourceError") @@ -93,7 +93,7 @@ async def on_list_tools(self) -> list[Tool]: return [] -def run_server_with_logging(port: int): +def run_server_with_logging(port: int) -> None: """Run the StreamableHTTP server with logging to capture race condition errors.""" app = RaceConditionTestServer() @@ -122,7 +122,7 @@ async def lifespan(app: Starlette) -> AsyncGenerator[None, None]: uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="debug") -def start_server_process(port: int): +def start_server_process(port: int) -> subprocess.Popen[str]: """Start server in a separate process.""" # Create a temporary script to run the server import os From 6a7ecd5e5eb49a8347e12390ead610cf8b93236d Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Sun, 21 Sep 2025 14:24:27 +0800 Subject: [PATCH 03/11] fix: add connection waiting mechanism for Windows compatibility --- ...est_1363_race_condition_streamable_http.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index 7146a83859..ddf288faf2 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -141,8 +141,23 @@ def start_server_process(port: int) -> subprocess.Popen[str]: process = subprocess.Popen([sys.executable, script_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - # Give server time to start - time.sleep(1) + # Wait for server to be running with connection testing (like other tests) + max_attempts = 20 + attempt = 0 + while attempt < max_attempts: + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect(("127.0.0.1", port)) + break + except ConnectionRefusedError: + time.sleep(0.1) + attempt += 1 + else: + # If server failed to start, terminate the process and raise an error + process.terminate() + process.wait() + raise RuntimeError(f"Server failed to start after {max_attempts} attempts") + return process From 3f3c8ae5560c2be74f391371b0af660a81ed4fee Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Sun, 21 Sep 2025 14:54:14 +0800 Subject: [PATCH 04/11] fix: escape path in test script content using repr() --- tests/issues/test_1363_race_condition_streamable_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index ddf288faf2..e9e7e603d3 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -130,7 +130,7 @@ def start_server_process(port: int) -> subprocess.Popen[str]: script_content = f""" import sys -sys.path.insert(0, '{os.getcwd()}') +sys.path.insert(0, {repr(os.getcwd())}) from tests.issues.test_1363_race_condition_streamable_http import run_server_with_logging run_server_with_logging({port}) """ From c76b28d3ab0735adb65514e699e5e3fc2971adce Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Sun, 21 Sep 2025 15:04:08 +0800 Subject: [PATCH 05/11] refactor(test_1363): eliminate temporary files by using python -c --- .../test_1363_race_condition_streamable_http.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index e9e7e603d3..088db0acac 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -126,20 +126,17 @@ def start_server_process(port: int) -> subprocess.Popen[str]: """Start server in a separate process.""" # Create a temporary script to run the server import os - import tempfile - script_content = f""" + server_code = f""" import sys sys.path.insert(0, {repr(os.getcwd())}) from tests.issues.test_1363_race_condition_streamable_http import run_server_with_logging run_server_with_logging({port}) """ - with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: - f.write(script_content) - script_path = f.name - - process = subprocess.Popen([sys.executable, script_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + process = subprocess.Popen( + [sys.executable, "-c", server_code], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) # Wait for server to be running with connection testing (like other tests) max_attempts = 20 From 1b8b779312e8b5807d5eb9f7a2625dcb13057c5b Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Sun, 21 Sep 2025 17:14:42 +0800 Subject: [PATCH 06/11] fix: use logger instance instead of logging module in streamable_http --- src/mcp/server/streamable_http.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 4f5944b8ab..8d3b4104c7 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -870,16 +870,16 @@ async def message_router(): # Stream might be closed, remove from registry self._request_streams.pop(request_stream_id, None) else: - logging.debug( + logger.debug( f"""Request stream {request_stream_id} not found for message. Still processing message as the client might reconnect and replay.""" ) except anyio.ClosedResourceError: if self._terminated: - logging.debug("Read stream closed by client") + logger.debug("Read stream closed by client") else: - logging.exception("Unexpected closure of read stream in message router") + logger.exception("Unexpected closure of read stream in message router") except Exception: logger.exception("Error in message router") From 2c02f06ae8c80f9a579bd71a0de544e62b8b164c Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Tue, 23 Sep 2025 13:48:02 +0800 Subject: [PATCH 07/11] refactor: inline server code in test_1363 to avoid external imports --- ...est_1363_race_condition_streamable_http.py | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index 088db0acac..09cbd8db8c 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -19,19 +19,9 @@ import subprocess import sys import time -from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager import httpx import pytest -import uvicorn -from starlette.applications import Starlette -from starlette.routing import Mount -from starlette.types import Receive, Scope, Send - -from mcp.server import Server -from mcp.server.streamable_http_manager import StreamableHTTPSessionManager -from mcp.types import Tool SERVER_NAME = "test_race_condition_server" @@ -85,6 +75,32 @@ def server_url(server_port: int) -> str: return f"http://127.0.0.1:{server_port}" +def start_server_process(port: int) -> subprocess.Popen[str]: + """Start server in a separate process.""" + # Create a temporary script to run the server + import os + + server_code = f""" +import sys +import os +sys.path.insert(0, {repr(os.getcwd())}) + +import socket +import time +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager + +import uvicorn +from starlette.applications import Starlette +from starlette.routing import Mount +from starlette.types import Receive, Scope, Send + +from mcp.server import Server +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from mcp.types import Tool + +SERVER_NAME = "test_race_condition_server" + class RaceConditionTestServer(Server): def __init__(self): super().__init__(SERVER_NAME) @@ -92,9 +108,7 @@ def __init__(self): async def on_list_tools(self) -> list[Tool]: return [] - def run_server_with_logging(port: int) -> None: - """Run the StreamableHTTP server with logging to capture race condition errors.""" app = RaceConditionTestServer() # Create session manager @@ -121,16 +135,6 @@ async def lifespan(app: Starlette) -> AsyncGenerator[None, None]: starlette_app = Starlette(routes=routes, lifespan=lifespan) uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="debug") - -def start_server_process(port: int) -> subprocess.Popen[str]: - """Start server in a separate process.""" - # Create a temporary script to run the server - import os - - server_code = f""" -import sys -sys.path.insert(0, {repr(os.getcwd())}) -from tests.issues.test_1363_race_condition_streamable_http import run_server_with_logging run_server_with_logging({port}) """ From a1632e1d9b4fba29ec23f262f33ee46aab5646ee Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Wed, 29 Oct 2025 17:08:23 +0800 Subject: [PATCH 08/11] test(test_1363): add race condition test for json_response=True scenario --- ...est_1363_race_condition_streamable_http.py | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index 09cbd8db8c..677f5a63bd 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -75,7 +75,7 @@ def server_url(server_port: int) -> str: return f"http://127.0.0.1:{server_port}" -def start_server_process(port: int) -> subprocess.Popen[str]: +def start_server_process(port: int, json_response: bool | None = None) -> subprocess.Popen[str]: """Start server in a separate process.""" # Create a temporary script to run the server import os @@ -114,7 +114,7 @@ def run_server_with_logging(port: int) -> None: # Create session manager session_manager = StreamableHTTPSessionManager( app=app, - json_response=False, + json_response={json_response}, stateless=True, # Use stateless mode to trigger the race condition ) @@ -248,3 +248,30 @@ async def test_race_condition_invalid_content_type(server_port: int): process.wait() # Check server logs for race condition errors check_server_logs_for_errors(process, "test_race_condition_invalid_content_type") + + +@pytest.mark.anyio +async def test_race_condition_message_router_async_for(server_port: int): + """ + Uses json_response=True to trigger the `if self.is_json_response_enabled` branch, + which reproduces the ClosedResourceError when message_router is suspended + in async for loop while transport cleanup closes streams concurrently. + """ + process = start_server_process(server_port, json_response=True) + + try: + # use standard mcp client to send requests + from mcp.client.session import ClientSession + from mcp.client.streamable_http import streamablehttp_client + + for _ in range(1): + async with streamablehttp_client(f"http://127.0.0.1:{server_port}") as (read_stream, write_stream, _): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + + finally: + process.terminate() + process.wait() + # Check server logs for race condition errors in message router + check_server_logs_for_errors(process, "test_race_condition_message_router_async_for") \ No newline at end of file From 02ae5e62a5d2764518a3f83e8ae30d70d28b96b9 Mon Sep 17 00:00:00 2001 From: zhangn661 Date: Wed, 29 Oct 2025 21:34:33 +0800 Subject: [PATCH 09/11] chore(test_1363): remove extra blank line and add newline --- tests/issues/test_1363_race_condition_streamable_http.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index 677f5a63bd..cad3073b4c 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -269,9 +269,8 @@ async def test_race_condition_message_router_async_for(server_port: int): async with ClientSession(read_stream, write_stream) as session: await session.initialize() - finally: process.terminate() process.wait() # Check server logs for race condition errors in message router - check_server_logs_for_errors(process, "test_race_condition_message_router_async_for") \ No newline at end of file + check_server_logs_for_errors(process, "test_race_condition_message_router_async_for") From 2f9ac440327ef0ab604bd567bd3a670fd69ce124 Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Thu, 4 Dec 2025 17:20:33 +0800 Subject: [PATCH 10/11] refactor(test_1363): use in-process ASGI testing instead of subprocess --- ...est_1363_race_condition_streamable_http.py | 372 +++++++++--------- 1 file changed, 191 insertions(+), 181 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index cad3073b4c..e8a2875320 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -15,85 +15,16 @@ 8. When message router resumes, it encounters a closed stream, raising ClosedResourceError """ -import socket -import subprocess -import sys -import time - -import httpx -import pytest - -SERVER_NAME = "test_race_condition_server" - - -def check_server_logs_for_errors(process: subprocess.Popen[str], test_name: str) -> None: - """ - Check server logs for ClosedResourceError and other race condition errors. - - Args: - process: The server process - test_name: Name of the test for better error messages - """ - # Get logs from the process - try: - stdout, stderr = process.communicate(timeout=10) - server_logs = stderr + stdout - except Exception: - server_logs = "" - - # Check for specific race condition errors - errors_found: list[str] = [] - - if "ClosedResourceError" in server_logs: - errors_found.append("ClosedResourceError") - - if "Error in message router" in server_logs: - errors_found.append("Error in message router") - - if "anyio.ClosedResourceError" in server_logs: - errors_found.append("anyio.ClosedResourceError") - - # Assert no race condition errors occurred - if errors_found: - error_msg = f"Test '{test_name}' found race condition errors: {', '.join(errors_found)}\n" - error_msg += f"Server logs:\n{server_logs}" - pytest.fail(error_msg) - - # If we get here, no race condition errors were found - print(f"✓ Test '{test_name}' passed: No race condition errors detected") - - -@pytest.fixture -def server_port() -> int: - with socket.socket() as s: - s.bind(("127.0.0.1", 0)) - return s.getsockname()[1] - - -@pytest.fixture -def server_url(server_port: int) -> str: - return f"http://127.0.0.1:{server_port}" - - -def start_server_process(port: int, json_response: bool | None = None) -> subprocess.Popen[str]: - """Start server in a separate process.""" - # Create a temporary script to run the server - import os - - server_code = f""" -import sys -import os -sys.path.insert(0, {repr(os.getcwd())}) - -import socket -import time +import logging +import threading from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -import uvicorn +import anyio +import httpx +import pytest from starlette.applications import Starlette from starlette.routing import Mount -from starlette.types import Receive, Scope, Send from mcp.server import Server from mcp.server.streamable_http_manager import StreamableHTTPSessionManager @@ -101,6 +32,7 @@ def start_server_process(port: int, json_response: bool | None = None) -> subpro SERVER_NAME = "test_race_condition_server" + class RaceConditionTestServer(Server): def __init__(self): super().__init__(SERVER_NAME) @@ -108,20 +40,18 @@ def __init__(self): async def on_list_tools(self) -> list[Tool]: return [] -def run_server_with_logging(port: int) -> None: + +def create_app(json_response: bool = False) -> Starlette: + """Create a Starlette application for testing.""" app = RaceConditionTestServer() # Create session manager session_manager = StreamableHTTPSessionManager( app=app, - json_response={json_response}, + json_response=json_response, stateless=True, # Use stateless mode to trigger the race condition ) - # Create the ASGI handler - async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None: - await session_manager.handle_request(scope, receive, send) - # Create Starlette app with lifespan @asynccontextmanager async def lifespan(app: Starlette) -> AsyncGenerator[None, None]: @@ -129,41 +59,76 @@ async def lifespan(app: Starlette) -> AsyncGenerator[None, None]: yield routes = [ - Mount("/", app=handle_streamable_http), + Mount("/", app=session_manager.handle_request), ] - starlette_app = Starlette(routes=routes, lifespan=lifespan) - uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="debug") + return Starlette(routes=routes, lifespan=lifespan) -run_server_with_logging({port}) -""" - process = subprocess.Popen( - [sys.executable, "-c", server_code], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True - ) +class ServerThread(threading.Thread): + """Thread that runs the ASGI application lifespan in a separate event loop.""" + + def __init__(self, app: Starlette): + super().__init__(daemon=True) + self.app = app + self._stop_event = threading.Event() + + def run(self) -> None: + """Run the lifespan in a new event loop.""" + + # Create a new event loop for this thread + async def run_lifespan(): + # Use the lifespan context if it exists + lifespan_context = getattr(self.app.router, "lifespan_context", None) + if lifespan_context is not None: + async with lifespan_context(self.app): + # Wait until stop is requested + while not self._stop_event.is_set(): + await anyio.sleep(0.1) + else: + # If no lifespan, just wait + while not self._stop_event.is_set(): + await anyio.sleep(0.1) + + anyio.run(run_lifespan) - # Wait for server to be running with connection testing (like other tests) - max_attempts = 20 - attempt = 0 - while attempt < max_attempts: - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect(("127.0.0.1", port)) - break - except ConnectionRefusedError: - time.sleep(0.1) - attempt += 1 - else: - # If server failed to start, terminate the process and raise an error - process.terminate() - process.wait() - raise RuntimeError(f"Server failed to start after {max_attempts} attempts") - - return process + def stop(self) -> None: + """Signal the thread to stop.""" + self._stop_event.set() + + +def check_logs_for_race_condition_errors(caplog: pytest.LogCaptureFixture, test_name: str) -> None: + """ + Check logs for ClosedResourceError and other race condition errors. + + Args: + caplog: pytest log capture fixture + test_name: Name of the test for better error messages + """ + # Check for specific race condition errors in logs + errors_found: list[str] = [] + + for record in caplog.records: + message = record.getMessage() + if "ClosedResourceError" in message: + errors_found.append("ClosedResourceError") + if "Error in message router" in message: + errors_found.append("Error in message router") + if "anyio.ClosedResourceError" in message: + errors_found.append("anyio.ClosedResourceError") + + # Assert no race condition errors occurred + if errors_found: + error_msg = f"Test '{test_name}' found race condition errors in logs: {', '.join(set(errors_found))}\n" + error_msg += "Log records:\n" + for record in caplog.records: + if any(err in record.getMessage() for err in ["ClosedResourceError", "Error in message router"]): + error_msg += f" {record.levelname}: {record.getMessage()}\n" + pytest.fail(error_msg) @pytest.mark.anyio -async def test_race_condition_invalid_accept_headers(server_port: int): +async def test_race_condition_invalid_accept_headers(caplog: pytest.LogCaptureFixture): """ Test the race condition with invalid Accept headers. @@ -172,105 +137,150 @@ async def test_race_condition_invalid_accept_headers(server_port: int): - Request fails validation early and returns quickly - This should trigger the race condition where message_router encounters ClosedResourceError """ - process = start_server_process(server_port) + app = create_app() + server_thread = ServerThread(app) + server_thread.start() try: - # Test with missing text/event-stream in Accept header - async with httpx.AsyncClient(timeout=5.0) as client: - response = await client.post( - f"http://127.0.0.1:{server_port}/", - json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, - headers={ - "Accept": "application/json", # Missing text/event-stream - "Content-Type": "application/json", - }, - ) - # Should get 406 Not Acceptable due to missing text/event-stream - assert response.status_code == 406 - - # Test with missing application/json in Accept header - async with httpx.AsyncClient(timeout=5.0) as client: - response = await client.post( - f"http://127.0.0.1:{server_port}/", - json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, - headers={ - "Accept": "text/event-stream", # Missing application/json - "Content-Type": "application/json", - }, - ) - # Should get 406 Not Acceptable due to missing application/json - assert response.status_code == 406 - - # Test with completely invalid Accept header - async with httpx.AsyncClient(timeout=5.0) as client: - response = await client.post( - f"http://127.0.0.1:{server_port}/", - json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, - headers={ - "Accept": "text/plain", # Invalid Accept header - "Content-Type": "application/json", - }, - ) - # Should get 406 Not Acceptable - assert response.status_code == 406 + # Give the server thread a moment to start + await anyio.sleep(0.1) + + # Suppress WARNING logs (expected validation errors) and capture ERROR logs + with caplog.at_level(logging.ERROR): + # Test with missing text/event-stream in Accept header + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0 + ) as client: + response = await client.post( + "/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "application/json", # Missing text/event-stream + "Content-Type": "application/json", + }, + ) + # Should get 406 Not Acceptable due to missing text/event-stream + assert response.status_code == 406 + + # Test with missing application/json in Accept header + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0 + ) as client: + response = await client.post( + "/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "text/event-stream", # Missing application/json + "Content-Type": "application/json", + }, + ) + # Should get 406 Not Acceptable due to missing application/json + assert response.status_code == 406 + + # Test with completely invalid Accept header + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0 + ) as client: + response = await client.post( + "/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "text/plain", # Invalid Accept header + "Content-Type": "application/json", + }, + ) + # Should get 406 Not Acceptable + assert response.status_code == 406 + + # Give background tasks time to complete + await anyio.sleep(0.2) finally: - process.terminate() - process.wait() - # Check server logs for race condition errors - check_server_logs_for_errors(process, "test_race_condition_invalid_accept_headers") + server_thread.stop() + server_thread.join(timeout=5.0) + # Check logs for race condition errors + check_logs_for_race_condition_errors(caplog, "test_race_condition_invalid_accept_headers") @pytest.mark.anyio -async def test_race_condition_invalid_content_type(server_port: int): +async def test_race_condition_invalid_content_type(caplog: pytest.LogCaptureFixture): """ Test the race condition with invalid Content-Type headers. This test reproduces the race condition scenario with Content-Type validation failure. """ - process = start_server_process(server_port) + app = create_app() + server_thread = ServerThread(app) + server_thread.start() try: - # Test with invalid Content-Type - async with httpx.AsyncClient(timeout=5.0) as client: - response = await client.post( - f"http://127.0.0.1:{server_port}/", - json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, - headers={ - "Accept": "application/json, text/event-stream", - "Content-Type": "text/plain", # Invalid Content-Type - }, - ) - assert response.status_code == 400 + # Give the server thread a moment to start + await anyio.sleep(0.1) + + # Suppress WARNING logs (expected validation errors) and capture ERROR logs + with caplog.at_level(logging.ERROR): + # Test with invalid Content-Type + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0 + ) as client: + response = await client.post( + "/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "text/plain", # Invalid Content-Type + }, + ) + assert response.status_code == 400 + + # Give background tasks time to complete + await anyio.sleep(0.2) finally: - process.terminate() - process.wait() - # Check server logs for race condition errors - check_server_logs_for_errors(process, "test_race_condition_invalid_content_type") + server_thread.stop() + server_thread.join(timeout=5.0) + # Check logs for race condition errors + check_logs_for_race_condition_errors(caplog, "test_race_condition_invalid_content_type") @pytest.mark.anyio -async def test_race_condition_message_router_async_for(server_port: int): +async def test_race_condition_message_router_async_for(caplog: pytest.LogCaptureFixture): """ Uses json_response=True to trigger the `if self.is_json_response_enabled` branch, which reproduces the ClosedResourceError when message_router is suspended in async for loop while transport cleanup closes streams concurrently. """ - process = start_server_process(server_port, json_response=True) + app = create_app(json_response=True) + server_thread = ServerThread(app) + server_thread.start() try: - # use standard mcp client to send requests - from mcp.client.session import ClientSession - from mcp.client.streamable_http import streamablehttp_client - - for _ in range(1): - async with streamablehttp_client(f"http://127.0.0.1:{server_port}") as (read_stream, write_stream, _): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() + # Give the server thread a moment to start + await anyio.sleep(0.1) + + # Suppress WARNING logs (expected validation errors) and capture ERROR logs + with caplog.at_level(logging.ERROR): + # Use httpx.ASGITransport to test the ASGI app directly + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0 + ) as client: + # Send a valid initialize request + response = await client.post( + "/", + json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + }, + ) + # Should get a successful response + assert response.status_code in (200, 201) + + # Give background tasks time to complete + await anyio.sleep(0.2) finally: - process.terminate() - process.wait() - # Check server logs for race condition errors in message router - check_server_logs_for_errors(process, "test_race_condition_message_router_async_for") + server_thread.stop() + server_thread.join(timeout=5.0) + # Check logs for race condition errors in message router + check_logs_for_race_condition_errors(caplog, "test_race_condition_message_router_async_for") From cfa36f7c2458d9343e3c959da4f742593faccf09 Mon Sep 17 00:00:00 2001 From: Edison-A-N Date: Thu, 4 Dec 2025 17:31:47 +0800 Subject: [PATCH 11/11] test(test_1363): remove unused code to improve coverage --- ...est_1363_race_condition_streamable_http.py | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/tests/issues/test_1363_race_condition_streamable_http.py b/tests/issues/test_1363_race_condition_streamable_http.py index e8a2875320..49242d6d8b 100644 --- a/tests/issues/test_1363_race_condition_streamable_http.py +++ b/tests/issues/test_1363_race_condition_streamable_http.py @@ -28,7 +28,6 @@ from mcp.server import Server from mcp.server.streamable_http_manager import StreamableHTTPSessionManager -from mcp.types import Tool SERVER_NAME = "test_race_condition_server" @@ -37,9 +36,6 @@ class RaceConditionTestServer(Server): def __init__(self): super().__init__(SERVER_NAME) - async def on_list_tools(self) -> list[Tool]: - return [] - def create_app(json_response: bool = False) -> Starlette: """Create a Starlette application for testing.""" @@ -78,15 +74,11 @@ def run(self) -> None: # Create a new event loop for this thread async def run_lifespan(): - # Use the lifespan context if it exists + # Use the lifespan context (always present in our tests) lifespan_context = getattr(self.app.router, "lifespan_context", None) - if lifespan_context is not None: - async with lifespan_context(self.app): - # Wait until stop is requested - while not self._stop_event.is_set(): - await anyio.sleep(0.1) - else: - # If no lifespan, just wait + assert lifespan_context is not None # Tests always create apps with lifespan + async with lifespan_context(self.app): + # Wait until stop is requested while not self._stop_event.is_set(): await anyio.sleep(0.1) @@ -108,7 +100,7 @@ def check_logs_for_race_condition_errors(caplog: pytest.LogCaptureFixture, test_ # Check for specific race condition errors in logs errors_found: list[str] = [] - for record in caplog.records: + for record in caplog.records: # pragma: no cover message = record.getMessage() if "ClosedResourceError" in message: errors_found.append("ClosedResourceError") @@ -118,7 +110,7 @@ def check_logs_for_race_condition_errors(caplog: pytest.LogCaptureFixture, test_ errors_found.append("anyio.ClosedResourceError") # Assert no race condition errors occurred - if errors_found: + if errors_found: # pragma: no cover error_msg = f"Test '{test_name}' found race condition errors in logs: {', '.join(set(errors_found))}\n" error_msg += "Log records:\n" for record in caplog.records: