diff --git a/src/middleware/web-incoming.ts b/src/middleware/web-incoming.ts index b82a142..cb9e574 100644 --- a/src/middleware/web-incoming.ts +++ b/src/middleware/web-incoming.ts @@ -270,6 +270,20 @@ export const stream = defineProxyMiddleware((req, res, options, server, head, ca res.on("close", function () { proxyRes.destroy(); }); + proxyRes.on("close", function () { + if (!proxyRes.complete && !res.destroyed) { + res.destroy(); + } + }); + proxyRes.on("error", function (err) { + if (!res.destroyed) { + res.destroy(err); + } + + if (server.listenerCount("error") > 0) { + server.emit("error", err, req, res, currentUrl); + } + }); proxyRes.on("end", function () { if (server) { server.emit("end", req, res, proxyRes); diff --git a/test/http-proxy.test.ts b/test/http-proxy.test.ts index d5b8c77..acabfba 100644 --- a/test/http-proxy.test.ts +++ b/test/http-proxy.test.ts @@ -117,6 +117,72 @@ describe("http-proxy", () => { await promise; }); + it("should close downstream SSE stream when upstream aborts", async () => { + const source = http.createServer((_, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.write(":ok\n\n"); + + setTimeout(() => { + res.socket?.destroy(); + }, 20); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + let gotFirstChunk = false; + let requestError: Error | undefined; + + const finish = () => { + source.close(); + proxy.close(resolve); + }; + + const timeout = setTimeout(() => { + requestError = new Error("Timed out waiting for downstream SSE close"); + finish(); + }, 1000); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + (res) => { + res.on("data", (chunk) => { + if (chunk.toString("utf8").includes(":ok")) { + gotFirstChunk = true; + } + }); + + res.once("close", () => { + clearTimeout(timeout); + finish(); + }); + }, + ) + .on("error", (error) => { + clearTimeout(timeout); + requestError = error; + finish(); + }) + .end(); + + await promise; + expect(requestError).toBeUndefined(); + expect(gotFirstChunk).toBe(true); + }); + it("should make the request on pipe and finish it", async () => { const source = http.createServer(); const sourcePort = await listenOn(source);