-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Fix CI failures for proxy tests #1763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0357258
e1cff6c
f740e75
719c724
75d8114
e5bdd4c
a2deae1
5de4410
6c4f1bc
2e2b8e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,144 @@ | ||||||||||||||||
| """ | ||||||||||||||||
| MCP Proxy Module | ||||||||||||||||
|
|
||||||||||||||||
| This module provides utilities for proxying messages between two MCP transports, | ||||||||||||||||
| enabling bidirectional message forwarding with proper error handling and cleanup. | ||||||||||||||||
| """ | ||||||||||||||||
|
Comment on lines
+1
to
+6
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||
|
|
||||||||||||||||
| import logging | ||||||||||||||||
| from collections.abc import AsyncGenerator, Awaitable, Callable | ||||||||||||||||
| from contextlib import asynccontextmanager | ||||||||||||||||
|
|
||||||||||||||||
| import anyio | ||||||||||||||||
| from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream | ||||||||||||||||
|
|
||||||||||||||||
| from mcp.shared.message import SessionMessage | ||||||||||||||||
|
|
||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||
|
|
||||||||||||||||
| MessageStream = tuple[ | ||||||||||||||||
| MemoryObjectReceiveStream[SessionMessage | Exception], | ||||||||||||||||
| MemoryObjectSendStream[SessionMessage], | ||||||||||||||||
| ] | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| async def _handle_error( | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move the private methods below the main function of this file? |
||||||||||||||||
| error: Exception, | ||||||||||||||||
| onerror: Callable[[Exception], None | Awaitable[None]] | None, | ||||||||||||||||
| ) -> None: | ||||||||||||||||
| """Handle an error by calling the error callback if provided.""" | ||||||||||||||||
| if onerror: | ||||||||||||||||
| try: | ||||||||||||||||
| result = onerror(error) | ||||||||||||||||
| if isinstance(result, Awaitable): | ||||||||||||||||
| await result | ||||||||||||||||
| except Exception as callback_error: # pragma: no cover | ||||||||||||||||
| logger.exception("Error in onerror callback", exc_info=callback_error) | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment for the others in this file. |
||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| async def _forward_message( | ||||||||||||||||
| message: SessionMessage | Exception, | ||||||||||||||||
| write_stream: MemoryObjectSendStream[SessionMessage], | ||||||||||||||||
| onerror: Callable[[Exception], None | Awaitable[None]] | None, | ||||||||||||||||
| source: str, | ||||||||||||||||
| ) -> None: | ||||||||||||||||
| """Forward a single message, handling exceptions appropriately.""" | ||||||||||||||||
| if isinstance(message, SessionMessage): | ||||||||||||||||
| await write_stream.send(message) | ||||||||||||||||
| else: | ||||||||||||||||
| # message is Exception (type narrowing) | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||
| logger.debug(f"Exception received from {source}: {message}") | ||||||||||||||||
| await _handle_error(message, onerror) | ||||||||||||||||
| # Exceptions are not forwarded as messages (write streams only accept SessionMessage) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| async def _forward_loop( | ||||||||||||||||
| read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], | ||||||||||||||||
| write_stream: MemoryObjectSendStream[SessionMessage], | ||||||||||||||||
| onerror: Callable[[Exception], None | Awaitable[None]] | None, | ||||||||||||||||
| source: str, | ||||||||||||||||
| ) -> None: | ||||||||||||||||
| """Forward messages from read_stream to write_stream.""" | ||||||||||||||||
| try: | ||||||||||||||||
| async with read_stream: | ||||||||||||||||
| async for message in read_stream: | ||||||||||||||||
| try: | ||||||||||||||||
| await _forward_message(message, write_stream, onerror, source) | ||||||||||||||||
| except anyio.ClosedResourceError: | ||||||||||||||||
| logger.debug(f"{source} write stream closed") | ||||||||||||||||
| break | ||||||||||||||||
| except Exception as exc: # pragma: no cover | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a test to hit this rather than adding no cover? General rule after #1553 the idea is to not allow any new |
||||||||||||||||
| # This covers non-ClosedResourceError exceptions during message forwarding | ||||||||||||||||
| # (e.g., from custom stream implementations) | ||||||||||||||||
| logger.exception(f"Error forwarding message from {source}", exc_info=exc) | ||||||||||||||||
| await _handle_error(exc, onerror) | ||||||||||||||||
| except anyio.ClosedResourceError: | ||||||||||||||||
| logger.debug(f"{source} read stream closed") | ||||||||||||||||
| except Exception as exc: # pragma: no cover | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add tests for this as per https://github.com/modelcontextprotocol/python-sdk/pull/1763/files#r2607144408 |
||||||||||||||||
| # This covers exceptions during stream iteration setup | ||||||||||||||||
| # (e.g., from custom stream implementations) | ||||||||||||||||
| logger.exception(f"Error in forward loop from {source}", exc_info=exc) | ||||||||||||||||
| await _handle_error(exc, onerror) | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be a bit cleaner if this Also, can we drop the |
||||||||||||||||
| finally: | ||||||||||||||||
| # Close write stream when read stream closes | ||||||||||||||||
| try: | ||||||||||||||||
| await write_stream.aclose() | ||||||||||||||||
| except Exception: # pragma: no cover | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add tests for this as per https://github.com/modelcontextprotocol/python-sdk/pull/1763/files#r2607144408
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What exception is this one exactly? This repo has too many |
||||||||||||||||
| # Stream might already be closed | ||||||||||||||||
| pass | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| @asynccontextmanager | ||||||||||||||||
| async def mcp_proxy( | ||||||||||||||||
| transport_to_client: MessageStream, | ||||||||||||||||
| transport_to_server: MessageStream, | ||||||||||||||||
| onerror: Callable[[Exception], None | Awaitable[None]] | None = None, | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||
| ) -> AsyncGenerator[None, None]: | ||||||||||||||||
| """ | ||||||||||||||||
| Proxy messages bidirectionally between two MCP transports. | ||||||||||||||||
|
Comment on lines
+97
to
+98
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||
|
|
||||||||||||||||
| This function sets up bidirectional message forwarding between two transport pairs. | ||||||||||||||||
| When one transport closes, the other is also closed. Errors are forwarded to the | ||||||||||||||||
| error callback if provided. | ||||||||||||||||
|
|
||||||||||||||||
| Args: | ||||||||||||||||
| transport_to_client: A tuple of (read_stream, write_stream) for the client-facing transport. | ||||||||||||||||
| transport_to_server: A tuple of (read_stream, write_stream) for the server-facing transport. | ||||||||||||||||
| onerror: Optional callback function for handling errors. Can be sync or async. | ||||||||||||||||
| Called with the Exception object when an error occurs. | ||||||||||||||||
|
|
||||||||||||||||
| Example: | ||||||||||||||||
| ```python | ||||||||||||||||
| async with mcp_proxy( | ||||||||||||||||
| transport_to_client=(client_read, client_write), | ||||||||||||||||
| transport_to_server=(server_read, server_write), | ||||||||||||||||
| onerror=lambda e: logger.error(f"Proxy error: {e}"), | ||||||||||||||||
| ): | ||||||||||||||||
| # Proxy is active, forwarding messages bidirectionally | ||||||||||||||||
| await some_operation() | ||||||||||||||||
| # Both transports are closed when exiting the context | ||||||||||||||||
| ``` | ||||||||||||||||
|
|
||||||||||||||||
| Yields: | ||||||||||||||||
| None: The context manager yields control while the proxy is active. | ||||||||||||||||
| """ | ||||||||||||||||
| client_read, client_write = transport_to_client | ||||||||||||||||
| server_read, server_write = transport_to_server | ||||||||||||||||
|
|
||||||||||||||||
| async with anyio.create_task_group() as tg: | ||||||||||||||||
| tg.start_soon(_forward_loop, client_read, server_write, onerror, "client") | ||||||||||||||||
| tg.start_soon(_forward_loop, server_read, client_write, onerror, "server") | ||||||||||||||||
| try: | ||||||||||||||||
| yield | ||||||||||||||||
| finally: | ||||||||||||||||
| # Cancel the task group to stop forwarding | ||||||||||||||||
| tg.cancel_scope.cancel() | ||||||||||||||||
| # Close both write streams | ||||||||||||||||
| try: | ||||||||||||||||
| await client_write.aclose() | ||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we do |
||||||||||||||||
| except Exception: # pragma: no cover | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||
| pass | ||||||||||||||||
| try: | ||||||||||||||||
| await server_write.aclose() | ||||||||||||||||
| except Exception: # pragma: no cover | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||
| pass | ||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this to
mcp/proxy.pyinstead?