diff --git a/src/acp.test.ts b/src/acp.test.ts index c76bcb4..b1ac9e4 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -1013,6 +1013,49 @@ describe("Connection", () => { } } + it("propagates input stream errors through ndJsonStream", async () => { + const inputStream = new ReadableStream({ + start(controller) { + // Simulate a process crash after partial data + controller.error(new Error("process exited with code 1")); + }, + }); + const outputStream = new WritableStream(); + + const connection = new ClientSideConnection( + () => new MinimalTestClient(), + ndJsonStream(outputStream, inputStream), + ); + + await expect(connection.closed).resolves.toBeUndefined(); + expect(connection.signal.aborted).toBe(true); + }); + + it("rejects pending requests when input stream errors via ndJsonStream", async () => { + let errorController!: ReadableStreamDefaultController; + + const inputStream = new ReadableStream({ + start(controller) { + errorController = controller; + }, + }); + const outputStream = new WritableStream(); + + const connection = new ClientSideConnection( + () => new MinimalTestClient(), + ndJsonStream(outputStream, inputStream), + ); + + const requestPromise = connection.newSession({ + cwd: "/test", + mcpServers: [], + }); + + errorController.error(new Error("process exited with code 1")); + + await expect(requestPromise).rejects.toThrow("process exited with code 1"); + }); + it("rejects pending requests when the stream errors", async () => { let readableController!: ReadableStreamDefaultController; diff --git a/src/stream.ts b/src/stream.ts index d7f08c7..0f63602 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -63,10 +63,13 @@ export function ndJsonStream( } } } + } catch (err) { + controller.error(err); + return; } finally { reader.releaseLock(); - controller.close(); } + controller.close(); }, });