From e743740b6d27bd6df1baa9f966121e5f1d844fff Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 10 Dec 2025 06:35:55 -0800 Subject: [PATCH 1/3] proxy crash with no abort controller --- src/examples/client/disconnectTestClient.ts | 34 +++++++++ src/examples/server/disconnectTestServer.ts | 78 +++++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 src/examples/client/disconnectTestClient.ts create mode 100644 src/examples/server/disconnectTestServer.ts diff --git a/src/examples/client/disconnectTestClient.ts b/src/examples/client/disconnectTestClient.ts new file mode 100644 index 000000000..0f8254672 --- /dev/null +++ b/src/examples/client/disconnectTestClient.ts @@ -0,0 +1,34 @@ +import { Client } from '../../client/index.js'; +import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; +import { CallToolResultSchema } from '../../types.js'; + +const client = new Client({ name: 'disconnect-test-client', version: '1.0.0' }); +const transport = new StreamableHTTPClientTransport(new URL('http://localhost:3000/mcp')); + +let progressCount = 0; + +client.onerror = e => console.error('Client error:', e); + +(async () => { + await client.connect(transport); + console.log('Connected, calling slow-task with steps=10...'); + + try { + const result = await client.request( + { method: 'tools/call', params: { name: 'slow-task', arguments: { steps: 10 } } }, + CallToolResultSchema, + { + onprogress: progress => { + console.log(`Progress ${++progressCount}: ${progress.progress}/${progress.total}`); + if (progressCount === 5) { + console.log('Abruptly killing process after 5 progress updates...'); + process.exit(1); + } + } + } + ); + console.log('Result:', result); + } catch (e) { + console.log('Request aborted (expected):', (e as Error).message); + } +})(); diff --git a/src/examples/server/disconnectTestServer.ts b/src/examples/server/disconnectTestServer.ts new file mode 100644 index 000000000..e65b66567 --- /dev/null +++ b/src/examples/server/disconnectTestServer.ts @@ -0,0 +1,78 @@ +import { Request, Response } from 'express'; +import { McpServer } from '../../server/mcp.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { createMcpExpressApp } from '../../server/express.js'; +import { CallToolResult, isInitializeRequest } from '../../types.js'; +import { randomUUID } from 'node:crypto'; +import * as z from 'zod/v4'; + +const server = new McpServer( + { name: 'disconnect-test', version: '1.0.0' }, + { capabilities: { logging: {} } } +); + +server.tool('slow-task', 'Task with progress notifications', { steps: z.number() }, + async ({ steps }, extra): Promise => { + for (let i = 1; i <= steps; i++) { + console.log(`Sending notification ${i}/${steps}`); + + // SIMULATING A PROXY RELAY: onprogress forwards with same progress token + const progressToken = extra._meta?.progressToken; + if (progressToken !== undefined) { + server.server.notification( + { + method: 'notifications/progress', + params: { progressToken, progress: i, total: steps } + }, + { relatedRequestId: extra.requestId } + ); + } + + await new Promise(r => setTimeout(r, 1000)); + } + return { content: [{ type: 'text', text: 'SUCCESS' }] }; + } +); + +const app = createMcpExpressApp(); +const transports: Record = {}; + +app.post('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport = sessionId ? transports[sessionId] : undefined; + + if (!transport && isInitializeRequest(req.body)) { + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: id => { + console.log(`Session initialized: ${id}`); + transports[id] = transport!; + } + }); + transport.onclose = () => { + console.log(`Transport closed for session: ${transport!.sessionId}`); + delete transports[transport!.sessionId!]; + }; + await server.connect(transport); + } + + if (transport) { + // Track if response finished normally + let finished = false; + res.on('finish', () => { finished = true; }); + res.on('close', () => { + if (!finished) { + console.log('Client disconnected - closing transport to reclaim resources'); + transport!.close(); + } + }); + await transport.handleRequest(req, res, req.body); + } else { + res.status(400).json({ jsonrpc: '2.0', error: { code: -32000, message: 'Bad request' }, id: null }); + } +}); + +// Return 405 for GET - we don't support standalone SSE stream +app.get('/mcp', (_req, res) => res.status(405).send('Method not allowed')); + +app.listen(3000, () => console.log('Disconnect test server listening on :3000')); From 3d4455cf078aed0dd182d58e6d5fd8201fd9201d Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 10 Dec 2025 07:25:19 -0800 Subject: [PATCH 2/3] add error handling for onprogress disconnects (stop server crashing) --- src/examples/server/disconnectTestServer.ts | 75 ++++++++++++++++----- src/shared/protocol.ts | 7 +- 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/src/examples/server/disconnectTestServer.ts b/src/examples/server/disconnectTestServer.ts index e65b66567..e674255ff 100644 --- a/src/examples/server/disconnectTestServer.ts +++ b/src/examples/server/disconnectTestServer.ts @@ -6,36 +6,68 @@ import { CallToolResult, isInitializeRequest } from '../../types.js'; import { randomUUID } from 'node:crypto'; import * as z from 'zod/v4'; +// Usage: npx tsx disconnectTestServer.ts [--abort] +const useAbort = process.argv.includes('--abort'); +console.log(`Abort on disconnect: ${useAbort ? 'enabled' : 'disabled'}`); + const server = new McpServer( { name: 'disconnect-test', version: '1.0.0' }, { capabilities: { logging: {} } } ); -server.tool('slow-task', 'Task with progress notifications', { steps: z.number() }, +server.server.onerror = err => console.log('[onerror]', err.message); + +server.registerTool( + 'slow-task', + { + description: 'Task with progress notifications', + inputSchema: { steps: z.number() } + }, async ({ steps }, extra): Promise => { - for (let i = 1; i <= steps; i++) { - console.log(`Sending notification ${i}/${steps}`); + // SIMULATING A PROXY: create abort controller for "upstream" request + const abortController = new AbortController(); + if (extra.sessionId) { + sessionAbortControllers[extra.sessionId] = abortController; + } - // SIMULATING A PROXY RELAY: onprogress forwards with same progress token - const progressToken = extra._meta?.progressToken; - if (progressToken !== undefined) { - server.server.notification( - { - method: 'notifications/progress', - params: { progressToken, progress: i, total: steps } - }, - { relatedRequestId: extra.requestId } - ); - } + try { + for (let i = 1; i <= steps; i++) { + // Check if aborted before each step + if (abortController.signal.aborted) { + console.log('Upstream request aborted - stopping work'); + break; + } - await new Promise(r => setTimeout(r, 1000)); + console.log(`Sending notification ${i}/${steps}`); + + // SIMULATING A PROXY RELAY: onprogress forwards with same progress token + const progressToken = extra._meta?.progressToken; + if (progressToken !== undefined) { + server.server.notification( + { + method: 'notifications/progress', + params: { progressToken, progress: i, total: steps } + }, + { relatedRequestId: extra.requestId } + ); + } + + await new Promise(r => setTimeout(r, 1000)); + } + return { content: [{ type: 'text', text: 'SUCCESS' }] }; + } finally { + // Cleanup abort controller + if (extra.sessionId) { + delete sessionAbortControllers[extra.sessionId]; + } } - return { content: [{ type: 'text', text: 'SUCCESS' }] }; } ); const app = createMcpExpressApp(); const transports: Record = {}; +// SIMULATING A PROXY: track abort controllers for upstream requests per session +const sessionAbortControllers: Record = {}; app.post('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -62,7 +94,16 @@ app.post('/mcp', async (req: Request, res: Response) => { res.on('finish', () => { finished = true; }); res.on('close', () => { if (!finished) { - console.log('Client disconnected - closing transport to reclaim resources'); + console.log('Client disconnected unexpectedly'); + if (useAbort) { + // Abort any in-flight upstream requests for this session + const abortController = sessionAbortControllers[transport!.sessionId!]; + if (abortController) { + console.log('Aborting upstream request'); + abortController.abort(); + delete sessionAbortControllers[transport!.sessionId!]; + } + } transport!.close(); } }); diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index aa242a647..ba738dea3 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -854,7 +854,9 @@ export abstract class Protocol handler(params)) + .catch(error => this._onerror(new Error(`Uncaught error in progress handler: ${error}`))); } private _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void { @@ -1277,7 +1279,8 @@ export abstract class Protocol { if (!this._transport) { - throw new Error('Not connected'); + this._onerror(new Error('Not connected')); + return; } this.assertNotificationCapability(notification.method); From 8a4103126e6774ae56106c184b6ddcd78e6934cb Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 10 Dec 2025 07:39:30 -0800 Subject: [PATCH 3/3] lint on test program --- src/examples/server/disconnectTestServer.ts | 9 ++++----- src/shared/protocol.ts | 4 +--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/examples/server/disconnectTestServer.ts b/src/examples/server/disconnectTestServer.ts index e674255ff..b452c418a 100644 --- a/src/examples/server/disconnectTestServer.ts +++ b/src/examples/server/disconnectTestServer.ts @@ -10,10 +10,7 @@ import * as z from 'zod/v4'; const useAbort = process.argv.includes('--abort'); console.log(`Abort on disconnect: ${useAbort ? 'enabled' : 'disabled'}`); -const server = new McpServer( - { name: 'disconnect-test', version: '1.0.0' }, - { capabilities: { logging: {} } } -); +const server = new McpServer({ name: 'disconnect-test', version: '1.0.0' }, { capabilities: { logging: {} } }); server.server.onerror = err => console.log('[onerror]', err.message); @@ -91,7 +88,9 @@ app.post('/mcp', async (req: Request, res: Response) => { if (transport) { // Track if response finished normally let finished = false; - res.on('finish', () => { finished = true; }); + res.on('finish', () => { + finished = true; + }); res.on('close', () => { if (!finished) { console.log('Client disconnected unexpectedly'); diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index ba738dea3..a690ccb5c 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -854,9 +854,7 @@ export abstract class Protocol handler(params)) - .catch(error => this._onerror(new Error(`Uncaught error in progress handler: ${error}`))); + handler(params); } private _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void {