fix: serialize concurrent MCP session access to prevent race conditions#12761
Conversation
Two MCPTools components pointing at the same SSE URL share a single MCPSessionManager via the component cache. Under concurrent flow execution (10+ runs against the same server), ~40-50% of runs failed with one of: Error updating tool list: 'streamable_http_<hash>_0' Timeout updating tool list: ... Error updating tool list: <connection error> Three races caused this: 1. get_session() was not serialized per-server. Concurrent callers iterating self.sessions_by_server[server_key]["sessions"] could both pass the health check, then both invoke _cleanup_session_by_id(). 2. _cleanup_session_by_id() used del sessions[session_id] in a finally block. Two callers that both passed the `if session_id not in sessions` guard would race on the delete — the loser raised KeyError: 'streamable_http_<hash>_0', matching the reported symptom. 3. Session ids were generated from len(sessions), so removing and re-adding sessions could silently produce colliding ids. Fixes: - Per-server asyncio.Lock (guarded by a module-level lock for creation) serializes session reuse/creation/cleanup. - _cleanup_session_by_id() now pops the session entry up front; only the winning caller runs teardown, the rest no-op. - Session ids come from a monotonic per-server counter. Regression tests cover all three races: 10 concurrent get_session calls must share a single created session; 10 concurrent cleanups must not raise; session ids must not recycle "_0" after cleanup. Fixes #9860
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
❌ Your patch status has failed because the patch coverage (8.77%) is below the target coverage (40.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## release-1.10.0 #12761 +/- ##
==================================================
- Coverage 52.90% 52.60% -0.30%
==================================================
Files 2025 2025
Lines 184223 183549 -674
Branches 28873 26179 -2694
==================================================
- Hits 97460 96559 -901
- Misses 85669 85895 +226
- Partials 1094 1095 +1
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Addresses review findings on the previous commit: HIGH: cleanup paths bypassed the per-server lock. Both `_cleanup_session(context_id)` (invoked from client disconnect) and `_cleanup_idle_sessions()` (invoked from the periodic background task) still mutated `sessions_by_server` without holding the lock. A concurrent `get_session()` could finish validating a session while idle-cleanup popped and cancelled its task; `get_session()` then returned a dead session and registered a dangling refcount entry. MEDIUM: per-server maps (`_server_locks`, `_session_id_counters`) grew unboundedly. The previous patch only reclaimed server entries from `sessions_by_server`; rotating auth/session headers (which change `server_key` via `_get_server_key`) leaked entries in the other two maps forever in long-lived processes. Changes: - Replace `_get_server_lock()` with `_server_lock()`, an async context manager that pin-counts the entry so reclamation can't race a task that holds or is about to acquire the lock. - Wrap the mutating regions of `_cleanup_session()` and `_cleanup_idle_sessions()` in `_server_lock(server_key)`. - On lock release, reclaim both `_server_locks[server_key]` and `_session_id_counters[server_key]` once pins drop to 0, the lock is unheld, and the server has no remaining sessions. - `cleanup_all()` also clears the two maps under `_locks_guard`. New regression tests: - `test_cleanup_idle_vs_get_session_are_serialized` — an in-flight `get_session` blocks a concurrent idle-cleanup pass; the session is returned live, not torn down underneath the caller. - `test_concurrent_cleanup_session_and_get_session_safe` — refcount transitions stay consistent across concurrent connect/disconnect. - `test_server_lock_and_counter_reclaimed_when_unused` — after all sessions for a `server_key` are removed, both `_server_locks` and `_session_id_counters` entries go away.
|
Thanks for the sharp review — both findings confirmed and fixed in HIGH — cleanup paths now honor the per-server lockBoth New regression tests exercise both angles:
MEDIUM —
|
Addresses the review finding on cross-server reuse of `context_id`: `_cleanup_session(context_id)` runs under the per-server lock of the server the mapping pointed at when cleanup started. That lock does NOT serialize a concurrent `get_session(context_id, different_server)` which runs under a *different* per-server lock and atomically re-points `_context_to_session[context_id]` at the new session. The old cleanup path then ran `self._context_to_session.pop(context_id, None)` unconditionally, wiping out the fresh mapping. The new session on the other server was left with refcount 1 and no context mapping, so the next disconnect was a no-op and the session leaked indefinitely. Fix: CAS the pop. After the refcount decrement / teardown, only drop the mapping if `_context_to_session[context_id]` still points at `(server_key, session_id)` — the pair we just cleaned up. The `get()` and `pop()` run synchronously (no `await` between them) so asyncio cannot interleave another coroutine between the check and the mutation. New regression test `test_cleanup_does_not_wipe_cross_server_handoff` slows down `_cleanup_session_by_id` for server A while a concurrent `get_session(ctx, serverB)` re-points the context at server B. Without the CAS, the assertion that `_context_to_session[ctx] == (server_B, ...)` fails. With the CAS, the fresh B mapping survives and its refcount is intact.
|
Good catch — fixed in The per-server lock does serialize same-server cleanup/reuse, but you're right that Fix: CAS the pop. After the refcount decrement and teardown for the session we were cleaning, only drop the mapping if New regression test |
Cristhianzl
left a comment
There was a problem hiding this comment.
🧭 TL;DR Verdict
| Dimension | Result |
|---|---|
| 🔴 CRITICAL blockers | None |
| 🟠 IMPORTANT (must-fix before merge) | 2 |
| 🟡 RECOMMENDED | 5 |
| 🟢 NICE TO HAVE | 3 |
| Overall | Approve with minor changes — the bug fix itself is surgically correct and well-tested; findings below are quality refinements, not design defects |
This is a high-quality concurrency fix. The root-cause analysis is sharp, the fix is minimal and targeted, the three commits form a clean progression (fix → extend lock coverage → close cross-server CAS), and the regression tests actually reproduce the races they’re guarding against. The PR exemplifies the security-mindset principles in DEVELOPMENT_RULE.md §Security Mindset: each commit explicitly names an assumption that was wrong and demonstrates the blast radius before fixing it.
🔴 CRITICAL — Security & PII Blockers
None found.
Verifications performed:
- No
print(...),console.log, or PII in any newlogger.*call. The only values logged aresession_idandserver_key— both internal, non-reversible identifiers (server_key = f"streamable_http_{hash(url|headers)}", seeutil.py:877, 885). - No secrets or tokens appear in any log statement, test fixture, or error message.
- No
eval,exec,os.system, subprocess injection, SQL string concat, orinnerHTML-equivalents. - Test fixtures use
http://example.test/sse— an IANA reserved test domain. No real endpoints. - The fix does not alter any authentication/authorization boundary; it only serializes already-authenticated session reuse.
🟠 IMPORTANT — Must Fix Before Merge
1. dict[str, Any] typing on _server_locks
File: src/lfx/src/lfx/base/mcp/util.py:758
self._server_locks: dict[str, dict[str, Any]] = {}The comment above documents the shape precisely — {"lock": asyncio.Lock(), "pins": int} — but the type system sees Any, defeating static checks on every callsite that touches entry["lock"] / entry["pins"].
- Why it violates the rules:
DEVELOPMENT_RULE.md §Code Quality – General Rules: "Use strong typing; avoid loose types like any, object, or dynamic." AndREVIEWER_RULE.md §Clean Code—Anyis the Python equivalent of TSany. - Fix: Introduce a
TypedDict(no runtime cost) next to the attribute:Or a slotted dataclass if mutation ergonomics matter. Either option preserves the pin-count invariant encoded in the comment.from typing import TypedDict class _ServerLockEntry(TypedDict): lock: asyncio.Lock pins: int self._server_locks: dict[str, _ServerLockEntry] = {}
2. Legacy file-size violation is being extended
File: src/lfx/src/lfx/base/mcp/util.py — 2,127 lines after this PR (was 2,112).
File: src/backend/tests/unit/base/mcp/test_mcp_util.py — 3,666 lines after this PR (was 3,465).
Both files massively exceed the hard 500-line limit documented in REVIEWER_RULE.md §File Structure Limits / DEVELOPMENT_RULE.md §1.
This PR did not create the problem and the fix legitimately must live on MCPSessionManager, so I am not blocking merge on this alone. However, REVIEWER_RULE.md §Legacy Code Awareness is explicit:
Do NOT prolong bad patterns — even if surrounding code is bad, write good code. Flag legacy patterns you encounter for future cleanup.
- Why it matters here: this PR adds four new methods (
_server_lock,_release_server_lock_if_idle,_next_session_id, plus a modified_cleanup_session) to an already-2k-line file. Each addition makes the eventual split harder. - Recommended action (not a blocker): open a follow-up ticket to extract
MCPSessionManagerinto its own module (mcp/session_manager.py+mcp/session_manager_test.py), grouping the new concurrency primitives in ahelpers/concurrency.pysubmodule. Reference this PR as the trigger. - For this PR specifically: add a
# TODO(#<ticket>):marker immediately above theclass MCPSessionManager:declaration so the debt is visible to the next reader.
🟡 RECOMMENDED — Should Fix
3. Time-based sleeps make the regression tests flaky-prone
File: src/backend/tests/unit/base/mcp/test_mcp_util.py
-
test_concurrent_get_session_same_server_reuses_one_session→await asyncio.sleep(0.05)insidefake_create(line 90 of diff) -
test_cleanup_idle_vs_get_session_are_serialized→await asyncio.sleep(0.02)(line 528 of diff) to "give it a chance to run if it were going to race" -
test_cleanup_does_not_wipe_cross_server_handoff→await asyncio.sleep(0)(line 936 of diff) -
Why:
REVIEWER_RULE.md §Test Quality— "Deterministic tests (no flaky tests)". On slow CI runners or under load, 20 ms can expire before the task reaches the awaited point. -
Fix: replace the sleep-based waits with
asyncio.Event/asyncio.Barriersynchronization. Example:cleaner_entered = asyncio.Event() # monkeypatch _cleanup_idle_sessions to set cleaner_entered.set() before blocking cleaner = asyncio.create_task(session_manager._cleanup_idle_sessions()) await cleaner_entered.wait() # deterministic rendezvous
The first test (
fake_createwith 0.05 s) can just use anasyncio.Event()per caller and gather — the point is to overlap, not to measure a latency.
4. Law-of-Demeter leaks of internal dict shape
File: src/lfx/src/lfx/base/mcp/util.py
New code repeatedly digs into shared dicts:
self.sessions_by_server[server_key]["sessions"]
self._server_locks[server_key]["lock"]
self._server_locks[server_key]["pins"]Also in the regression tests (test_mcp_util.py:104, 171, 587, 947 etc.).
- Why:
DEVELOPMENT_RULE.md §Law of DemeterandREVIEWER_RULE.md §KISS. The caller knows the full shape, which is exactly what thedict[str, Any]hides. Tests becoming coupled to internal representation is a well-known smell. - Fix: lightweight accessors on
MCPSessionManager:Combined with finding #1'sdef _sessions_for(self, server_key: str) -> dict[str, dict]: entry = self.sessions_by_server.get(server_key) return entry.get("sessions", {}) if entry else {}
TypedDict, this also removes["lock"]/["pins"]string-key access from the concurrency path.
5. Pre-existing except Exception: # noqa: BLE001
File: src/lfx/src/lfx/base/mcp/util.py — inside _cleanup_session_by_id (the generic-catch block the PR preserves).
This is not introduced by this PR — it already existed — but REVIEWER_RULE.md §Error Handling asks: "No generic exceptions (Exception, Error, object)".
- Recommended action: since you're already touching this function (the whole purpose of the PR), narrow the
exceptto the actual exceptions teardown can raise (asyncio.CancelledError,ConnectionError,OSError, the MCP-specific ones you already enumerate in_validate_session_connectivity), plusExceptionas a last-resort log-and-continue. At minimum, replace the bare# noqa: BLE001with a short comment saying why the broad catch is load-bearing. - If out of scope for this PR: fine — add a follow-up ticket.
6. _release_server_lock_if_idle reclamation has a subtle ordering contract not asserted
File: src/lfx/src/lfx/base/mcp/util.py:801-808
The reclamation uses three conditions:
if entry["pins"] <= 0 and not entry["lock"].locked() and server_key not in self.sessions_by_server:Why <= not == 0? If pins ever goes negative, the branch still fires — but a negative pin count would indicate a missing acquire / double-release bug that should fail loudly.
- Fix: Keep the
<= 0for safety but add anassert entry["pins"] >= 0with a log.warn path when it trips, so a regression shows up in telemetry instead of being silently swept.
7. _next_session_id is not thread/task-safe on its own
File: src/lfx/src/lfx/base/mcp/util.py:810-814
def _next_session_id(self, server_key: str) -> str:
current = self._session_id_counters.get(server_key, 0)
self._session_id_counters[server_key] = current + 1
return f"{server_key}_{current}"This is correct only because every caller of _next_session_id is already inside async with self._server_lock(server_key) (verified: the single call site is get_session at util.py:979 after the async with self._server_lock(...) block). That invariant is load-bearing and invisible.
- Fix: add a one-line docstring note: "Caller must hold
self._server_lock(server_key). Otherwise this increment races." This protects the next maintainer.
🟢 NICE TO HAVE — Polish
8. Type the async context manager return
File: src/lfx/src/lfx/base/mcp/util.py:766-767
@contextlib.asynccontextmanager
async def _server_lock(self, server_key: str):Annotate the yield for clarity and IDE support:
from collections.abc import AsyncIterator
@contextlib.asynccontextmanager
async def _server_lock(self, server_key: str) -> AsyncIterator[None]:9. Test naming convention
Tests follow test_<what>_<expected>. Most are descriptive, but test_concurrent_cleanup_same_session_is_idempotent could be test_cleanup_session_by_id_is_idempotent_when_called_concurrently to match should_[expected]_when_[condition] style from REVIEWER_RULE.md §Test Quality. Low priority — the intent is unambiguous either way.
10. Consider a centralized constants symbol
"streamable_http", "sse", "stdio" are string literals peppered across both files. A small class TransportType(StrEnum): ... would remove magic strings (DEVELOPMENT_RULE.md §General Rules – avoid magic strings). Probably out of scope for a bug-fix PR, but worth a follow-up.
✅ Compliant Aspects Worth Highlighting
- Root-cause discipline — each of the three commits opens with a clear statement of what the old code assumed, why that assumption was wrong, and what the blast radius was. Matches
DEVELOPMENT_RULE.md §Security Mindsetverbatim. - Defensive by default —
sessions.pop(session_id, None)replaces the racydel sessions[session_id];_context_to_session.get() == ... then .pop()CAS replaces the unconditional pop. - Reclamation is conservative — pin counting was added in response to review feedback to close a subtle reclaim-vs-acquire race. The
pins == 0 AND not locked AND server emptygate is the right conjunction. - Tests reproduce the races — not just assertion-of-happy-path.
test_cleanup_idle_vs_get_session_are_serializeduses anasyncio.Eventrendezvous to prove the lock is load-bearing, not incidental. - Idempotent cleanup —
test_concurrent_cleanup_same_session_is_idempotentfires 10 concurrent cleanups and assertscancel.assert_called_once(). Exactly the right shape. - Cross-server CAS — the third commit catches a race (
ctx_movereconnecting to a different server while the old disconnect is in flight) that most reviewers would miss. The reasoning in the commit message is first-rate. - Zero PII / zero secrets / zero eval.
- Comments explain WHY — every new block has a paragraph in the docstring describing the incident it prevents, not the mechanics of the code.
🧪 Security Mindset — The Five Questions (per REVIEWER_RULE.md and DEVELOPMENT_RULE.md §Security Mindset)
| # | Question | Answer for this PR |
|---|---|---|
| 1 | What is this code trusting without verifying? | The monotonic counter is only safe while held under _server_lock — invariant is documented in the docstring but not enforced; see finding #7. |
| 2 | Authoritative source for behavior? | Python asyncio primitives (Lock, Event, gather) — idiomatic and correct. ✅ |
| 3 | What happens in every failure path? | _cleanup_session_by_id preserves the pre-existing broad except Exception for teardown safety (finding #5). Lock-acquire failure does not hide underlying exceptions. ✅ |
| 4 | Who controls each value, and can they lie? | All inputs (server_key, session_id, context_id) are server-generated. No external control surface. ✅ |
| 5 | Blast radius if wrong? | Worst case: session leak or DoS via connection exhaustion on a long-lived process. The PR specifically closes the "leaks forever" case via pin-counted reclamation. Reduced, not amplified. ✅ |
📋 Final Pre-Merge Checklist
🔴 CRITICAL PASS
🟠 IMPORTANT 2 items — resolve before merge
[ ] 1. Replace dict[str, Any] with TypedDict for _ServerLockEntry
[ ] 2. Add TODO(#<follow-up-ticket>) marker flagging util.py file-size debt
🟡 RECOMMENDED 5 items — strongly encouraged
[ ] 3. Replace asyncio.sleep-based test rendezvous with asyncio.Event
[ ] 4. Encapsulate internal dict-shape access (Law of Demeter)
[ ] 5. Narrow (or justify) the broad `except Exception` in _cleanup_session_by_id
[ ] 6. Assert non-negative pin count in _release_server_lock_if_idle
[ ] 7. Document "caller must hold server lock" on _next_session_id
🟢 NICE TO HAVE 3 items — follow-up ticket OK
Testing
[x] Regression tests exist for all three race conditions (reuse, cleanup, id collision)
[x] Extra regression tests for follow-on findings (idle-vs-get, cross-server CAS, reclamation)
[x] Tests are adversarial, not happy-path only
[ ] Replace timing-based sleeps with Event rendezvous (finding #3)
[ ] Show pytest coverage output for the touched functions (≥ 75%, target 80%)
[x] `ruff check` / `ruff format` confirmed passing in PR description
IMPORTANT - Replace `dict[str, Any]` with `_ServerLockEntry` TypedDict so the lock/pins shape is visible to static analysis (finding #1). - Add TODO marker above `MCPSessionManager` flagging the module's file-size debt as follow-up work (finding #2). RECOMMENDED - Replace timing-based `asyncio.sleep(0.02)` / `asyncio.sleep(0.05)` in concurrency regression tests with deterministic Event-based rendezvous and pin-count polling (finding #3). - Introduce `_sessions_for(server_key)` helper and use it in `_cleanup_idle_sessions`, `_cleanup_session_by_id`, and `cleanup_all` to stop reaching through `sessions_by_server[server_key]["sessions"]` at every call site (finding #4). - Document why `_cleanup_session_by_id` keeps a broad `except Exception` (transport-layer teardown raises many different exception hierarchies — leak-on-cleanup is worse than a swallowed error) (finding #5). - Log a warning when pin count goes negative in `_release_server_lock_if_idle` so a missing acquire / double release surfaces in telemetry instead of being silently swept (finding #6). - Document the "caller must hold `_server_lock(server_key)`" invariant on `_next_session_id` (finding #7). NICE TO HAVE - Annotate the `_server_lock` async context manager yield type as `AsyncIterator[None]` for IDE support (finding #8). All 175 tests in `test_mcp_util.py` still pass; ruff clean.
# Conflicts: # src/lfx/src/lfx/base/mcp/util.py
…ns (#12761) * fix: serialize concurrent MCP session access to prevent race conditions Two MCPTools components pointing at the same SSE URL share a single MCPSessionManager via the component cache. Under concurrent flow execution (10+ runs against the same server), ~40-50% of runs failed with one of: Error updating tool list: 'streamable_http_<hash>_0' Timeout updating tool list: ... Error updating tool list: <connection error> Three races caused this: 1. get_session() was not serialized per-server. Concurrent callers iterating self.sessions_by_server[server_key]["sessions"] could both pass the health check, then both invoke _cleanup_session_by_id(). 2. _cleanup_session_by_id() used del sessions[session_id] in a finally block. Two callers that both passed the `if session_id not in sessions` guard would race on the delete — the loser raised KeyError: 'streamable_http_<hash>_0', matching the reported symptom. 3. Session ids were generated from len(sessions), so removing and re-adding sessions could silently produce colliding ids. Fixes: - Per-server asyncio.Lock (guarded by a module-level lock for creation) serializes session reuse/creation/cleanup. - _cleanup_session_by_id() now pops the session entry up front; only the winning caller runs teardown, the rest no-op. - Session ids come from a monotonic per-server counter. Regression tests cover all three races: 10 concurrent get_session calls must share a single created session; 10 concurrent cleanups must not raise; session ids must not recycle "_0" after cleanup. Fixes #9860 * fix: extend per-server lock to cleanup paths and reclaim per-key maps Addresses review findings on the previous commit: HIGH: cleanup paths bypassed the per-server lock. Both `_cleanup_session(context_id)` (invoked from client disconnect) and `_cleanup_idle_sessions()` (invoked from the periodic background task) still mutated `sessions_by_server` without holding the lock. A concurrent `get_session()` could finish validating a session while idle-cleanup popped and cancelled its task; `get_session()` then returned a dead session and registered a dangling refcount entry. MEDIUM: per-server maps (`_server_locks`, `_session_id_counters`) grew unboundedly. The previous patch only reclaimed server entries from `sessions_by_server`; rotating auth/session headers (which change `server_key` via `_get_server_key`) leaked entries in the other two maps forever in long-lived processes. Changes: - Replace `_get_server_lock()` with `_server_lock()`, an async context manager that pin-counts the entry so reclamation can't race a task that holds or is about to acquire the lock. - Wrap the mutating regions of `_cleanup_session()` and `_cleanup_idle_sessions()` in `_server_lock(server_key)`. - On lock release, reclaim both `_server_locks[server_key]` and `_session_id_counters[server_key]` once pins drop to 0, the lock is unheld, and the server has no remaining sessions. - `cleanup_all()` also clears the two maps under `_locks_guard`. New regression tests: - `test_cleanup_idle_vs_get_session_are_serialized` — an in-flight `get_session` blocks a concurrent idle-cleanup pass; the session is returned live, not torn down underneath the caller. - `test_concurrent_cleanup_session_and_get_session_safe` — refcount transitions stay consistent across concurrent connect/disconnect. - `test_server_lock_and_counter_reclaimed_when_unused` — after all sessions for a `server_key` are removed, both `_server_locks` and `_session_id_counters` entries go away. * fix: CAS context mapping pop to preserve cross-server handoffs Addresses the review finding on cross-server reuse of `context_id`: `_cleanup_session(context_id)` runs under the per-server lock of the server the mapping pointed at when cleanup started. That lock does NOT serialize a concurrent `get_session(context_id, different_server)` which runs under a *different* per-server lock and atomically re-points `_context_to_session[context_id]` at the new session. The old cleanup path then ran `self._context_to_session.pop(context_id, None)` unconditionally, wiping out the fresh mapping. The new session on the other server was left with refcount 1 and no context mapping, so the next disconnect was a no-op and the session leaked indefinitely. Fix: CAS the pop. After the refcount decrement / teardown, only drop the mapping if `_context_to_session[context_id]` still points at `(server_key, session_id)` — the pair we just cleaned up. The `get()` and `pop()` run synchronously (no `await` between them) so asyncio cannot interleave another coroutine between the check and the mutation. New regression test `test_cleanup_does_not_wipe_cross_server_handoff` slows down `_cleanup_session_by_id` for server A while a concurrent `get_session(ctx, serverB)` re-points the context at server B. Without the CAS, the assertion that `_context_to_session[ctx] == (server_B, ...)` fails. With the CAS, the fresh B mapping survives and its refcount is intact. * refactor(mcp): address review findings on concurrent-access fix IMPORTANT - Replace `dict[str, Any]` with `_ServerLockEntry` TypedDict so the lock/pins shape is visible to static analysis (finding #1). - Add TODO marker above `MCPSessionManager` flagging the module's file-size debt as follow-up work (finding #2). RECOMMENDED - Replace timing-based `asyncio.sleep(0.02)` / `asyncio.sleep(0.05)` in concurrency regression tests with deterministic Event-based rendezvous and pin-count polling (finding #3). - Introduce `_sessions_for(server_key)` helper and use it in `_cleanup_idle_sessions`, `_cleanup_session_by_id`, and `cleanup_all` to stop reaching through `sessions_by_server[server_key]["sessions"]` at every call site (finding #4). - Document why `_cleanup_session_by_id` keeps a broad `except Exception` (transport-layer teardown raises many different exception hierarchies — leak-on-cleanup is worse than a swallowed error) (finding #5). - Log a warning when pin count goes negative in `_release_server_lock_if_idle` so a missing acquire / double release surfaces in telemetry instead of being silently swept (finding #6). - Document the "caller must hold `_server_lock(server_key)`" invariant on `_next_session_id` (finding #7). NICE TO HAVE - Annotate the `_server_lock` async context manager yield type as `AsyncIterator[None]` for IDE support (finding #8). All 175 tests in `test_mcp_util.py` still pass; ruff clean.
Summary
Fixes intermittent failures (~40–50%) when two
MCPToolscomponents in the same flow point at the same SSE URL and multiple flow executions run concurrently, as reported in #9860 and re-confirmed in v1.9.0 / v1.10.0.Root Cause
Three concurrent-access bugs in
MCPSessionManager(src/lfx/src/lfx/base/mcp/util.py):get_session()— concurrent callers for the same server both iteratedsessions_by_server[server_key]["sessions"], both passed the health check, and both invoked_cleanup_session_by_id()on the same session._cleanup_session_by_id()useddel sessions[session_id]in afinallyblock. Two callers that both passed theif session_id not in sessionsguard raced on the delete; the loser raisedKeyError: 'streamable_http_<hash>_0'— matching the exact reported error.session_id = f"{server_key}_{len(sessions)}"could recycle IDs after cleanup, silently overwriting an existing session entry.Fix
asyncio.Lock(created under a module-level guard) to serialize the reuse / create / cleanup sections ofget_session()._cleanup_session_by_id()nowpop()s the session entry up front — only the winning caller tears the session down; the rest no-op._session_id_counters[server_key]), so recycled slots don't produce colliding IDs.Test plan
test_mcp_util.py::TestMCPSessionManager:test_concurrent_get_session_same_server_reuses_one_session— 10 concurrentget_session()calls share exactly one underlying session / one create call.test_concurrent_cleanup_same_session_is_idempotent— 10 concurrent_cleanup_session_by_id()calls on the same session do not raiseKeyError;task.cancel()runs exactly once.test_session_ids_are_monotonic_not_len_based— the counter keeps advancing after cleanup;{server_key}_0is not recycled for new sessions.test_session_caching,test_session_cleanup,test_server_switch_detectionstill pass.ruff check+ruff formatpass.MCPTools→ same SSE URL → 2 different Agents) with 10+ concurrent executions. No'streamable_http_..._0'KeyError, noTimeout updating tool list.Fixes #9860