fix: improved better health check and stream URL check on MCP, improved JSON recognition#8982
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe update enhances session management and error handling for MCP client sessions, introducing improved health checks, session recreation on server switch, robust retry logic for tool execution, and more accurate SSE endpoint validation. New internal methods and modifications to existing ones ensure better detection of session issues and more reliable client-server interactions. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant MCPSessionManager
participant MCPClient (Stdio/SSE)
participant Server
User->>MCPSessionManager: get_session(context_id, params, transport_type)
MCPSessionManager->>MCPClient: Check background task and stream health
alt Session healthy
MCPSessionManager->>MCPClient: _validate_session_connectivity()
alt Connectivity OK
MCPSessionManager-->>User: Return session
else Connectivity failed or server switched
MCPSessionManager->>MCPClient: disconnect()
MCPSessionManager->>MCPSessionManager: Create new session
MCPSessionManager-->>User: Return new session
end
else Session unhealthy
MCPSessionManager->>MCPClient: disconnect()
MCPSessionManager->>MCPSessionManager: Create new session
MCPSessionManager-->>User: Return new session
end
User->>MCPClient: run_tool(tool_name, arguments)
MCPClient->>Server: Call tool (with 30s timeout)
alt Success
MCPClient-->>User: Return result
else Connection/Timeout error
MCPClient->>MCPSessionManager: Clear session cache, reset state
MCPClient->>Server: Retry call tool (max 2 attempts)
alt Success
MCPClient-->>User: Return result
else Failure
MCPClient-->>User: Raise error
end
end
Possibly related PRs
Suggested labels
Suggested reviewers
✨ Finishing Touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Actionable comments posted: 0
🔭 Outside diff range comments (1)
src/backend/base/langflow/base/mcp/util.py (1)
776-887: Critical: Extract common retry logic to eliminate code duplication.The
run_toolimplementations in bothMCPStdioClientandMCPSseClientare nearly identical (100+ lines of duplicated code). This violates the DRY principle and makes maintenance difficult.Additionally, the error detection using string matching (e.g.,
"ClosedResourceError" in str(type(e))) is fragile and could break with error message changes.Create a base class or mixin with the common retry logic:
class MCPClientBase: """Base class for MCP clients with common retry logic.""" async def _run_tool_with_retry( self, tool_name: str, arguments: dict[str, Any], get_session_func: Callable, session_context: str, cleanup_func: Callable ) -> Any: """Common retry logic for running tools.""" max_retries = 2 last_error_type = None for attempt in range(max_retries): try: logger.debug(f"Attempting to run tool '{tool_name}' (attempt {attempt + 1}/{max_retries})") session = await get_session_func() result = await asyncio.wait_for( session.call_tool(tool_name, arguments=arguments), timeout=30.0 ) logger.debug(f"Tool '{tool_name}' completed successfully") return result except Exception as e: current_error_type = type(e).__name__ logger.warning(f"Tool '{tool_name}' failed on attempt {attempt + 1}: {current_error_type} - {e}") # Better error detection is_connection_error = self._is_connection_error(e) is_timeout_error = isinstance(e, (asyncio.TimeoutError, TimeoutError)) if last_error_type == current_error_type and attempt > 0: logger.error(f"Repeated {current_error_type} error for tool '{tool_name}', not retrying") break last_error_type = current_error_type if is_connection_error and attempt < max_retries - 1: logger.warning(f"MCP session connection issue for tool '{tool_name}', retrying...") await cleanup_func(session_context) await asyncio.sleep(0.5) continue if is_timeout_error and attempt < max_retries - 1: logger.warning(f"Tool '{tool_name}' timed out, retrying...") await asyncio.sleep(1.0) continue # Handle final error if is_connection_error or is_timeout_error: msg = f"Failed to run tool '{tool_name}' after {attempt + 1} attempts: {e}" logger.error(msg) self._connected = False raise ValueError(msg) from e raise msg = f"Failed to run tool '{tool_name}': Maximum retries exceeded" logger.error(msg) raise ValueError(msg) def _is_connection_error(self, e: Exception) -> bool: """Check if exception is a connection-related error.""" # Standard connection errors if isinstance(e, (ConnectionError, OSError, ValueError)): return True # MCP-specific errors - check type instead of string error_type_name = type(e).__name__ if error_type_name in ("ClosedResourceError", "McpError"): return True # Check error message as fallback error_str = str(e) connection_keywords = [ "Connection closed", "Connection lost", "Transport closed", "Stream closed" ] return any(keyword in error_str for keyword in connection_keywords)Then simplify the client implementations:
class MCPStdioClient(MCPClientBase): async def run_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: # ... validation code ... return await self._run_tool_with_retry( tool_name=tool_name, arguments=arguments, get_session_func=self._get_or_create_session, session_context=self._session_context, cleanup_func=lambda ctx: self._get_session_manager()._cleanup_session(ctx) )Also applies to: 1063-1174
🧹 Nitpick comments (3)
src/backend/base/langflow/base/mcp/util.py (3)
26-30: Consider using httpx status code constants directly.The file already imports
httpx_codes(line 13). Instead of defining duplicate constants, use the existing httpx constants for consistency.-# HTTP status codes used in validation -HTTP_NOT_FOUND = 404 -HTTP_BAD_REQUEST = 400 -HTTP_INTERNAL_SERVER_ERROR = 500Then update the usage in lines 950, 957-958 to:
-if response.status_code == HTTP_NOT_FOUND: +if response.status_code == httpx_codes.NOT_FOUND: -if ( - HTTP_BAD_REQUEST <= response.status_code < HTTP_INTERNAL_SERVER_ERROR - and response.status_code != HTTP_NOT_FOUND -): +if ( + httpx_codes.BAD_REQUEST <= response.status_code < httpx_codes.INTERNAL_SERVER_ERROR + and response.status_code != httpx_codes.NOT_FOUND +):
461-489: Consider extracting stream health check logic for better maintainability.The stream health check logic is comprehensive but complex with multiple conditional branches. Consider extracting this into a separate method for better readability and testability.
async def _check_stream_health(self, session) -> bool: """Check if the session's write stream is still healthy.""" try: if not hasattr(session, "_write_stream"): return True # Can't check, assume healthy write_stream = session._write_stream # Check for explicit closed state if hasattr(write_stream, "_closed") and write_stream._closed: return False # Check anyio stream state for send channels if hasattr(write_stream, "_state") and hasattr(write_stream._state, "open_send_channels"): return write_stream._state.open_send_channels > 0 # Check for other stream closed indicators if hasattr(write_stream, "is_closing") and callable(write_stream.is_closing): return not write_stream.is_closing() # Default to healthy if we can't determine state return True except (AttributeError, TypeError) as e: logger.debug(f"Could not check stream health: {e}") return TrueThen use it in the main method:
-# Additional check for stream health -stream_is_healthy = True -try: - # Check if the session's write stream is still open - if hasattr(session, "_write_stream"): - write_stream = session._write_stream - # ... (rest of the checks) -except (AttributeError, TypeError) as e: - # ... - stream_is_healthy = True +stream_is_healthy = await self._check_stream_health(session)
888-899: Consider including disconnect in the base class refactor.The
disconnectmethod is also duplicated betweenMCPStdioClientandMCPSseClient. When creating the base class suggested earlier, include this common cleanup logic.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/backend/base/langflow/base/mcp/util.py(10 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`src/backend/**/*.py`: Run make format_backend to format Python code early and often Run make lint to check for linting issues in backend Python code
src/backend/**/*.py: Run make format_backend to format Python code early and often
Run make lint to check for linting issues in backend Python code
📄 Source: CodeRabbit Inference Engine (.cursor/rules/backend_development.mdc)
List of files the instruction was applied to:
src/backend/base/langflow/base/mcp/util.py
🧠 Learnings (1)
src/backend/base/langflow/base/mcp/util.py (1)
Learnt from: CR
PR: langflow-ai/langflow#0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-06-30T14:39:17.464Z
Learning: Applies to src/backend/base/langflow/components/**/*.py : Use asyncio.create_task for background work in async components and ensure proper cleanup on cancellation
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Update Starter Projects
- GitHub Check: Run Ruff Check and Format
- GitHub Check: Ruff Style Check (3.13)
- GitHub Check: Optimize new Python code in this PR
🔇 Additional comments (4)
src/backend/base/langflow/base/mcp/util.py (4)
388-429: Well-implemented session connectivity validation.The method provides a robust lightweight connectivity test with:
- Appropriate 3-second timeout for fast failure
- Comprehensive error handling for both standard and MCP-specific errors
- Proper response validation
- Clear debug logging
580-598: Excellent timeout handling for session creation.The 10-second timeout with proper cleanup ensures sessions don't hang indefinitely. The error handling and task cancellation logic is well-implemented in both STDIO and SSE session creation methods.
Also applies to: 643-661
931-973: Excellent SSE endpoint validation improvements.The changes properly handle SSE server compatibility:
- Using GET with
Accept: text/event-streamheader instead of HEAD- Treating 404 as potentially valid (many SSE servers don't support non-SSE requests)
- Interpreting timeouts as expected behavior for streaming endpoints
This should significantly improve compatibility with various SSE server implementations.
1-1174: Reminder: Run backend formatting and linting.As per the coding guidelines for
src/backend/**/*.pyfiles, ensure you run:
make format_backendto format the codemake lintto check for any linting issues
…ed JSON recognition (langflow-ai#8982) * Improved health check and stream URL check on MCP * Improved health check by validating session connectivity * Changed mcp servers from json checks * Fixed imports * Fixed mcp server tab test
This pull request introduces enhancements to error handling, session management, and validation logic in the
src/backend/base/langflow/base/mcp/util.pyfile. Key changes include improved session health checks, retry logic for closed resources, and adjustments to HTTP validation for SSE endpoints.This fixes the Figma MCP server, both in STDIO and SSE.
Session Management Improvements:
ClosedResourceErrorduring tool execution, with automatic session cleanup and retries for up to two attempts. [1] [2]Error Handling Enhancements:
run_toolby distinguishing between expected errors (e.g.,ConnectionError,TimeoutError) and unexpected ones, with proper re-raising and logging. [1] [2]HTTP Validation for SSE Endpoints:
GETrequests with SSE headers instead ofHEAD, accommodating servers that don't supportHEADrequests. [1] [2]404,400,500) in SSE validation, allowing graceful fallback for unsupported endpoints.Miscellaneous:
HTTP_NOT_FOUND,HTTP_BAD_REQUEST,HTTP_INTERNAL_SERVER_ERROR) to improve code readability and maintainability.Summary by CodeRabbit
New Features
Bug Fixes