Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-connection-closed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@modelcontextprotocol/core': patch
'@modelcontextprotocol/server': patch
---

Fix unhandled promise rejections on transport close and detect stdin EOF in StdioServerTransport. Pending request promises are now rejected asynchronously via microtask deferral, and the server transport listens for stdin `end` events to trigger a clean shutdown when the client process exits.
11 changes: 10 additions & 1 deletion packages/core/src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,17 @@ export abstract class Protocol<ContextT extends BaseContext> {
try {
this.onclose?.();
} finally {
// Reject pending response handlers on the next microtask to allow
// callers time to attach .catch() handlers and prevent unhandled
// promise rejections (see #1049, #392).
for (const handler of responseHandlers.values()) {
handler(error);
void Promise.resolve().then(() => {
try {
handler(error);
} catch (handlerError) {
this._onerror(handlerError instanceof Error ? handlerError : new Error(String(handlerError)));
}
});
}

for (const controller of requestHandlerAbortControllers.values()) {
Expand Down
47 changes: 47 additions & 0 deletions packages/core/test/shared/protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,53 @@
expect(oncloseMock).toHaveBeenCalled();
});

test('should reject pending requests with ConnectionClosed when transport closes', async () => {
await protocol.connect(transport);
const mockSchema = z.object({ result: z.string() });
const requestPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema);

// Close transport while request is pending
await transport.close();

// The pending request should reject with ConnectionClosed
await expect(requestPromise).rejects.toThrow('Connection closed');
await expect(requestPromise).rejects.toMatchObject({
code: SdkErrorCode.ConnectionClosed
});
});

test('should not cause unhandled promise rejections when transport closes with pending requests', async () => {
await protocol.connect(transport);
const mockSchema = z.object({ result: z.string() });

// Track unhandled rejections
const unhandledRejections: unknown[] = [];
const processHandler = (reason: unknown) => {
unhandledRejections.push(reason);
};
process.on('unhandledRejection', processHandler);

try {
// Create a pending request and attach .catch() to prevent the expected rejection
// from triggering the handler
const requestPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema);
requestPromise.catch(() => {
// Expected — the request was rejected due to connection close
});

// Close transport
await transport.close();

// Wait for microtasks to flush
await new Promise(resolve => setTimeout(resolve, 50));

// No unhandled rejections should have occurred
expect(unhandledRejections).toHaveLength(0);
} finally {
process.off('unhandledRejection', processHandler);
}
});

Check warning on line 265 in packages/core/test/shared/protocol.test.ts

View check run for this annotation

Claude / Claude Code Review

Unhandled-rejection test pre-attaches .catch() before close, making it a no-op validation

The test pre-attaches `requestPromise.catch()` synchronously **before** calling `transport.close()`, so the rejection handler is already in place when the rejection fires regardless of whether it fires synchronously (old code) or via deferred microtask (new code). This means the test passes equally well with the old synchronous rejection code and does not actually validate that the microtask deferral prevents unhandled rejections. Consider restructuring the test to close the transport first and
Comment on lines +247 to +265
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The test pre-attaches requestPromise.catch() synchronously before calling transport.close(), so the rejection handler is already in place when the rejection fires regardless of whether it fires synchronously (old code) or via deferred microtask (new code). This means the test passes equally well with the old synchronous rejection code and does not actually validate that the microtask deferral prevents unhandled rejections. Consider restructuring the test to close the transport first and then observe that a request made immediately afterward does not produce an unhandled rejection without an explicit .catch().

Extended reasoning...

What the test does vs. what it should validate

The test is intended to verify that deferring handler(error) calls to the next microtask (via Promise.resolve().then()) prevents unhandled promise rejections. The race condition the PR fixes is: a transport closes and synchronously rejects all pending promises before the caller has had a chance to attach a .catch() handler. On Node.js 24 with --unhandled-rejections=throw, such an unhandled rejection kills the process.

Why the test gives false assurance

The test attaches requestPromise.catch(() => {}) synchronously and immediately after creating the request promise — well before transport.close() is called. Because .catch() is already registered, no unhandled rejection can occur regardless of when the underlying reject() call fires:

  • Old synchronous code path: transport.close()_onclose()handler(error)reject(error) fires synchronously. But .catch() was attached two lines earlier, so Node.js sees the rejection as handled. ✓ test passes.
  • New deferred code path: same, except reject(error) fires in the next microtask. .catch() was still attached before close, so the rejection is still handled. ✓ test passes.

Both implementations pass the test with identical results; the deferral change is entirely unobservable by this test.

The actual race condition is not exercised

The true race occurs when: (1) a request promise is created, (2) the transport closes synchronously in the same JS tick, and (3) the caller hasn't yet attached .catch() — e.g., because the close happens inside the same synchronous block before the caller returns to user code. The test eliminates this scenario by pre-attaching the handler.

Impact

The production code change is correct and the fix is real. However, the test provides false confidence that the fix is validated. If someone reverts the deferral change (or introduces a regression), this test would still pass, making it ineffective as a regression guard.

How to fix the test

A correct test would:

  1. Call transport.close() without any pending request.
  2. Then synchronously call protocol.request() to get a promise in a state where the transport is already closed.
  3. Do NOT attach .catch() before verifying the unhandledRejection listener count stays at zero.

Alternatively: capture the promise, do NOT attach .catch(), flush microtasks, then attach .catch() and assert no unhandledRejection fired. This directly reproduces the race because the rejection fires (in the next microtask) while no handler is attached.


test('should abort in-flight request handlers when the connection is closed', async () => {
await protocol.connect(transport);

Expand Down
10 changes: 10 additions & 0 deletions packages/server/src/server/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ export class StdioServerTransport implements Transport {
// Ignore errors during close — we're already in an error path
});
};
_onend = () => {
// stdin EOF means the client process has disconnected.
// Trigger a clean close so pending requests are properly rejected
// and the server can shut down gracefully (see #1049).
this.close().catch(() => {
// Ignore errors during close — we're already in a shutdown path
});
};

/**
* Starts listening for messages on `stdin`.
Expand All @@ -57,6 +65,7 @@ export class StdioServerTransport implements Transport {

this._started = true;
this._stdin.on('data', this._ondata);
this._stdin.on('end', this._onend);
this._stdin.on('error', this._onerror);
this._stdout.on('error', this._onstdouterror);
}
Expand Down Expand Up @@ -84,6 +93,7 @@ export class StdioServerTransport implements Transport {

// Remove our event listeners first
this._stdin.off('data', this._ondata);
this._stdin.off('end', this._onend);
this._stdin.off('error', this._onerror);
this._stdout.off('error', this._onstdouterror);

Expand Down
75 changes: 75 additions & 0 deletions packages/server/test/server/stdio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,78 @@ test('should fire onerror before onclose on stdout error', async () => {

expect(events).toEqual(['error', 'close']);
});

test('should close transport when stdin emits end (EOF)', async () => {
const server = new StdioServerTransport(input, output);
server.onerror = error => {
throw error;
};

let didClose = false;
server.onclose = () => {
didClose = true;
};

await server.start();
expect(didClose).toBeFalsy();

// Simulate client disconnecting (stdin EOF)
input.push(null);

// Wait for the async close() to complete
await new Promise(resolve => setTimeout(resolve, 50));
expect(didClose).toBeTruthy();
});

test('should not fire onclose twice when stdin EOF followed by explicit close()', async () => {
const server = new StdioServerTransport(input, output);
server.onerror = error => {
throw error;
};

let closeCount = 0;
server.onclose = () => {
closeCount++;
};

await server.start();

// stdin EOF triggers close
input.push(null);
await new Promise(resolve => setTimeout(resolve, 50));

// Explicit close should be idempotent
await server.close();

expect(closeCount).toBe(1);
});

test('should process remaining messages before closing on stdin EOF', async () => {
const server = new StdioServerTransport(input, output);
server.onerror = error => {
throw error;
};

const messages: JSONRPCMessage[] = [];
server.onmessage = message => {
messages.push(message);
};

await server.start();

const message: JSONRPCMessage = {
jsonrpc: '2.0',
id: 1,
method: 'ping'
};

// Push a message followed by EOF
input.push(serializeMessage(message));
input.push(null);

// Wait for processing
await new Promise(resolve => setTimeout(resolve, 50));

// The message should have been processed before close
expect(messages).toEqual([message]);
});
Loading