From 04ec458cf52002da2395cfbbdd18f1b296c8a092 Mon Sep 17 00:00:00 2001 From: Gabor Koos Date: Sat, 21 Mar 2026 03:19:33 +0000 Subject: [PATCH 1/3] fix(web-incoming): close downstream stream when upstream SSE aborts --- src/middleware/web-incoming.ts | 602 ++++++------- test/http-proxy.test.ts | 1476 +++++++++++++++++--------------- 2 files changed, 1079 insertions(+), 999 deletions(-) diff --git a/src/middleware/web-incoming.ts b/src/middleware/web-incoming.ts index b82a142..fbffc6f 100644 --- a/src/middleware/web-incoming.ts +++ b/src/middleware/web-incoming.ts @@ -1,294 +1,308 @@ -import type { ClientRequest, IncomingMessage, ServerResponse } from "node:http"; -import type { ProxyTargetDetailed } from "../types.ts"; -import nodeHTTP from "node:http"; -import nodeHTTPS from "node:https"; -import { getPort, hasEncryptedConnection, setupOutgoing } from "../_utils.ts"; -import { webOutgoingMiddleware } from "./web-outgoing.ts"; -import { type ProxyMiddleware, defineProxyMiddleware } from "./_utils.ts"; - -const nativeAgents = { http: nodeHTTP, https: nodeHTTPS }; -const redirectStatuses = new Set([301, 302, 303, 307, 308]); - -/** - * Sets `content-length` to '0' if request is of DELETE type. - */ -export const deleteLength = defineProxyMiddleware((req) => { - if ((req.method === "DELETE" || req.method === "OPTIONS") && !req.headers["content-length"]) { - req.headers["content-length"] = "0"; - delete req.headers["transfer-encoding"]; - } -}); - -/** - * Sets timeout in request socket if it was specified in options. - */ -export const timeout = defineProxyMiddleware((req, res, options) => { - if (options.timeout) { - req.socket.setTimeout(options.timeout, () => { - req.socket.destroy(); - }); - } -}); - -/** - * Sets `x-forwarded-*` headers if specified in config. - */ -export const XHeaders = defineProxyMiddleware((req, res, options) => { - if (!options.xfwd) { - return; - } - - const encrypted = (req as any).isSpdy || hasEncryptedConnection(req); - const values = { - for: req.connection.remoteAddress || req.socket.remoteAddress, - port: getPort(req), - proto: encrypted ? "https" : "http", - }; - - for (const header of ["for", "port", "proto"] as const) { - req.headers["x-forwarded-" + header] = - (req.headers["x-forwarded-" + header] || "") + - (req.headers["x-forwarded-" + header] ? "," : "") + - values[header]; - } - - req.headers["x-forwarded-host"] = req.headers["x-forwarded-host"] || req.headers.host || ""; -}); - -/** - * Does the actual proxying. If `forward` is enabled fires up - * a ForwardStream, same happens for ProxyStream. The request - * just dies otherwise. - * - */ -export const stream = defineProxyMiddleware((req, res, options, server, head, callback) => { - // And we begin! - server.emit("start", req, res, options.target || options.forward); - - const http = nativeAgents.http; - const https = nativeAgents.https; - - const maxRedirects = - typeof options.followRedirects === "number" - ? options.followRedirects - : options.followRedirects - ? 5 - : 0; - - if (options.forward) { - // If forward enable, so just pipe the request - const forwardReq = (options.forward.protocol === "https:" ? https : http).request( - setupOutgoing(options.ssl || {}, options, req, "forward"), - ); - - // error handler (e.g. ECONNRESET, ECONNREFUSED) - // Handle errors on incoming request as well as it makes sense to - const forwardError = createErrorHandler(forwardReq, options.forward); - req.on("error", forwardError); - forwardReq.on("error", forwardError); - - (options.buffer || req).pipe(forwardReq); - if (!options.target) { - res.end(); - return; - } - } - - // Request initalization - const proxyReq = (options.target.protocol === "https:" ? https : http).request( - setupOutgoing(options.ssl || {}, options, req), - ); - - // Enable developers to modify the proxyReq before headers are sent - proxyReq.on("socket", (_socket) => { - if (server && !proxyReq.getHeader("expect")) { - server.emit("proxyReq", proxyReq, req, res, options); - } - }); - - // allow outgoing socket to timeout so that we could - // show an error page at the initial request - if (options.proxyTimeout) { - proxyReq.setTimeout(options.proxyTimeout, function () { - proxyReq.abort(); - }); - } - - // Ensure we abort proxy if request is aborted - req.on("aborted", function () { - proxyReq.abort(); - }); - - // Abort proxy request when client disconnects - res.on("close", function () { - if (!res.writableFinished) { - proxyReq.destroy(); - } - }); - - // handle errors in proxy and incoming request, just like for forward proxy - const proxyError = createErrorHandler(proxyReq, options.target); - req.on("error", proxyError); - proxyReq.on("error", proxyError); - - function createErrorHandler(proxyReq: ClientRequest, url: URL | ProxyTargetDetailed) { - return function proxyError(err: Error) { - if (req.socket.destroyed && (err as NodeJS.ErrnoException).code === "ECONNRESET") { - server.emit("econnreset", err, req, res, url); - return proxyReq.abort(); - } - - if (callback) { - callback(err, req, res, url); - } else { - server.emit("error", err, req, res, url); - } - }; - } - - // Buffer request body when following redirects (needed for 307/308 replay) - let bodyBuffer: Buffer | undefined; - if (maxRedirects > 0) { - const chunks: Buffer[] = []; - const source = options.buffer || req; - source.on("data", (chunk: Buffer) => { - chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); - proxyReq.write(chunk); - }); - source.on("end", () => { - bodyBuffer = Buffer.concat(chunks); - proxyReq.end(); - }); - source.on("error", (err: Error) => { - proxyReq.destroy(err); - }); - } else { - (options.buffer || req).pipe(proxyReq); - } - - function handleResponse(proxyRes: IncomingMessage, redirectCount: number, currentUrl: URL) { - const statusCode = proxyRes.statusCode!; - - if ( - maxRedirects > 0 && - redirectStatuses.has(statusCode) && - redirectCount < maxRedirects && - proxyRes.headers.location - ) { - // Drain the redirect response body - proxyRes.resume(); - - const location = new URL(proxyRes.headers.location, currentUrl); - - // 301/302/303 → GET without body; 307/308 → preserve method and body - const preserveMethod = statusCode === 307 || statusCode === 308; - const redirectMethod = preserveMethod ? req.method || "GET" : "GET"; - - const isHTTPS = location.protocol === "https:"; - const agent = isHTTPS ? https : http; - - // Build headers from original request - const redirectHeaders: Record = { ...req.headers }; - if (options.headers) { - Object.assign(redirectHeaders, options.headers); - } - redirectHeaders.host = location.host; - - // Strip sensitive headers on cross-origin redirects - if (location.host !== currentUrl.host) { - delete redirectHeaders.authorization; - delete redirectHeaders.cookie; - } - - // Drop body-related headers when method changes to GET - if (!preserveMethod) { - delete redirectHeaders["content-length"]; - delete redirectHeaders["content-type"]; - delete redirectHeaders["transfer-encoding"]; - } - - const redirectOpts: nodeHTTP.RequestOptions = { - hostname: location.hostname, - port: location.port || (isHTTPS ? 443 : 80), - path: location.pathname + location.search, - method: redirectMethod, - headers: redirectHeaders, - agent: options.agent || false, - }; - - if (isHTTPS) { - (redirectOpts as nodeHTTPS.RequestOptions).rejectUnauthorized = - options.secure === undefined ? true : options.secure; - } - - const redirectReq = agent.request(redirectOpts); - - if (server && !redirectReq.getHeader("expect")) { - server.emit("proxyReq", redirectReq, req, res, options); - } - - if (options.proxyTimeout) { - redirectReq.setTimeout(options.proxyTimeout, () => { - redirectReq.abort(); - }); - } - - const redirectError = createErrorHandler(redirectReq, location); - redirectReq.on("error", redirectError); - - redirectReq.on("response", (nextRes: IncomingMessage) => { - handleResponse(nextRes, redirectCount + 1, location); - }); - - if (preserveMethod && bodyBuffer && bodyBuffer.length > 0) { - redirectReq.end(bodyBuffer); - } else { - redirectReq.end(); - } - - return; - } - - // Non-redirect response (or max redirects exceeded) - if (server) { - server.emit("proxyRes", proxyRes, req, res); - } - - if (!res.headersSent && !options.selfHandleResponse) { - for (const pass of webOutgoingMiddleware) { - if (pass(req, res, proxyRes, options)) { - break; - } - } - } - - if (res.finished) { - if (server) { - server.emit("end", req, res, proxyRes); - } - } else { - res.on("close", function () { - proxyRes.destroy(); - }); - proxyRes.on("end", function () { - if (server) { - server.emit("end", req, res, proxyRes); - } - }); - if (!options.selfHandleResponse) { - proxyRes.pipe(res); - } - } - } - - proxyReq.on("response", function (proxyRes) { - handleResponse(proxyRes, 0, options.target as URL); - }); -}); - -export const webIncomingMiddleware: readonly ProxyMiddleware[] = [ - deleteLength, - timeout, - XHeaders, - stream, -] as const; +import type { ClientRequest, IncomingMessage, ServerResponse } from "node:http"; +import type { ProxyTargetDetailed } from "../types.ts"; +import nodeHTTP from "node:http"; +import nodeHTTPS from "node:https"; +import { getPort, hasEncryptedConnection, setupOutgoing } from "../_utils.ts"; +import { webOutgoingMiddleware } from "./web-outgoing.ts"; +import { type ProxyMiddleware, defineProxyMiddleware } from "./_utils.ts"; + +const nativeAgents = { http: nodeHTTP, https: nodeHTTPS }; +const redirectStatuses = new Set([301, 302, 303, 307, 308]); + +/** + * Sets `content-length` to '0' if request is of DELETE type. + */ +export const deleteLength = defineProxyMiddleware((req) => { + if ((req.method === "DELETE" || req.method === "OPTIONS") && !req.headers["content-length"]) { + req.headers["content-length"] = "0"; + delete req.headers["transfer-encoding"]; + } +}); + +/** + * Sets timeout in request socket if it was specified in options. + */ +export const timeout = defineProxyMiddleware((req, res, options) => { + if (options.timeout) { + req.socket.setTimeout(options.timeout, () => { + req.socket.destroy(); + }); + } +}); + +/** + * Sets `x-forwarded-*` headers if specified in config. + */ +export const XHeaders = defineProxyMiddleware((req, res, options) => { + if (!options.xfwd) { + return; + } + + const encrypted = (req as any).isSpdy || hasEncryptedConnection(req); + const values = { + for: req.connection.remoteAddress || req.socket.remoteAddress, + port: getPort(req), + proto: encrypted ? "https" : "http", + }; + + for (const header of ["for", "port", "proto"] as const) { + req.headers["x-forwarded-" + header] = + (req.headers["x-forwarded-" + header] || "") + + (req.headers["x-forwarded-" + header] ? "," : "") + + values[header]; + } + + req.headers["x-forwarded-host"] = req.headers["x-forwarded-host"] || req.headers.host || ""; +}); + +/** + * Does the actual proxying. If `forward` is enabled fires up + * a ForwardStream, same happens for ProxyStream. The request + * just dies otherwise. + * + */ +export const stream = defineProxyMiddleware((req, res, options, server, head, callback) => { + // And we begin! + server.emit("start", req, res, options.target || options.forward); + + const http = nativeAgents.http; + const https = nativeAgents.https; + + const maxRedirects = + typeof options.followRedirects === "number" + ? options.followRedirects + : options.followRedirects + ? 5 + : 0; + + if (options.forward) { + // If forward enable, so just pipe the request + const forwardReq = (options.forward.protocol === "https:" ? https : http).request( + setupOutgoing(options.ssl || {}, options, req, "forward"), + ); + + // error handler (e.g. ECONNRESET, ECONNREFUSED) + // Handle errors on incoming request as well as it makes sense to + const forwardError = createErrorHandler(forwardReq, options.forward); + req.on("error", forwardError); + forwardReq.on("error", forwardError); + + (options.buffer || req).pipe(forwardReq); + if (!options.target) { + res.end(); + return; + } + } + + // Request initalization + const proxyReq = (options.target.protocol === "https:" ? https : http).request( + setupOutgoing(options.ssl || {}, options, req), + ); + + // Enable developers to modify the proxyReq before headers are sent + proxyReq.on("socket", (_socket) => { + if (server && !proxyReq.getHeader("expect")) { + server.emit("proxyReq", proxyReq, req, res, options); + } + }); + + // allow outgoing socket to timeout so that we could + // show an error page at the initial request + if (options.proxyTimeout) { + proxyReq.setTimeout(options.proxyTimeout, function () { + proxyReq.abort(); + }); + } + + // Ensure we abort proxy if request is aborted + req.on("aborted", function () { + proxyReq.abort(); + }); + + // Abort proxy request when client disconnects + res.on("close", function () { + if (!res.writableFinished) { + proxyReq.destroy(); + } + }); + + // handle errors in proxy and incoming request, just like for forward proxy + const proxyError = createErrorHandler(proxyReq, options.target); + req.on("error", proxyError); + proxyReq.on("error", proxyError); + + function createErrorHandler(proxyReq: ClientRequest, url: URL | ProxyTargetDetailed) { + return function proxyError(err: Error) { + if (req.socket.destroyed && (err as NodeJS.ErrnoException).code === "ECONNRESET") { + server.emit("econnreset", err, req, res, url); + return proxyReq.abort(); + } + + if (callback) { + callback(err, req, res, url); + } else { + server.emit("error", err, req, res, url); + } + }; + } + + // Buffer request body when following redirects (needed for 307/308 replay) + let bodyBuffer: Buffer | undefined; + if (maxRedirects > 0) { + const chunks: Buffer[] = []; + const source = options.buffer || req; + source.on("data", (chunk: Buffer) => { + chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); + proxyReq.write(chunk); + }); + source.on("end", () => { + bodyBuffer = Buffer.concat(chunks); + proxyReq.end(); + }); + source.on("error", (err: Error) => { + proxyReq.destroy(err); + }); + } else { + (options.buffer || req).pipe(proxyReq); + } + + function handleResponse(proxyRes: IncomingMessage, redirectCount: number, currentUrl: URL) { + const statusCode = proxyRes.statusCode!; + + if ( + maxRedirects > 0 && + redirectStatuses.has(statusCode) && + redirectCount < maxRedirects && + proxyRes.headers.location + ) { + // Drain the redirect response body + proxyRes.resume(); + + const location = new URL(proxyRes.headers.location, currentUrl); + + // 301/302/303 → GET without body; 307/308 → preserve method and body + const preserveMethod = statusCode === 307 || statusCode === 308; + const redirectMethod = preserveMethod ? req.method || "GET" : "GET"; + + const isHTTPS = location.protocol === "https:"; + const agent = isHTTPS ? https : http; + + // Build headers from original request + const redirectHeaders: Record = { ...req.headers }; + if (options.headers) { + Object.assign(redirectHeaders, options.headers); + } + redirectHeaders.host = location.host; + + // Strip sensitive headers on cross-origin redirects + if (location.host !== currentUrl.host) { + delete redirectHeaders.authorization; + delete redirectHeaders.cookie; + } + + // Drop body-related headers when method changes to GET + if (!preserveMethod) { + delete redirectHeaders["content-length"]; + delete redirectHeaders["content-type"]; + delete redirectHeaders["transfer-encoding"]; + } + + const redirectOpts: nodeHTTP.RequestOptions = { + hostname: location.hostname, + port: location.port || (isHTTPS ? 443 : 80), + path: location.pathname + location.search, + method: redirectMethod, + headers: redirectHeaders, + agent: options.agent || false, + }; + + if (isHTTPS) { + (redirectOpts as nodeHTTPS.RequestOptions).rejectUnauthorized = + options.secure === undefined ? true : options.secure; + } + + const redirectReq = agent.request(redirectOpts); + + if (server && !redirectReq.getHeader("expect")) { + server.emit("proxyReq", redirectReq, req, res, options); + } + + if (options.proxyTimeout) { + redirectReq.setTimeout(options.proxyTimeout, () => { + redirectReq.abort(); + }); + } + + const redirectError = createErrorHandler(redirectReq, location); + redirectReq.on("error", redirectError); + + redirectReq.on("response", (nextRes: IncomingMessage) => { + handleResponse(nextRes, redirectCount + 1, location); + }); + + if (preserveMethod && bodyBuffer && bodyBuffer.length > 0) { + redirectReq.end(bodyBuffer); + } else { + redirectReq.end(); + } + + return; + } + + // Non-redirect response (or max redirects exceeded) + if (server) { + server.emit("proxyRes", proxyRes, req, res); + } + + if (!res.headersSent && !options.selfHandleResponse) { + for (const pass of webOutgoingMiddleware) { + if (pass(req, res, proxyRes, options)) { + break; + } + } + } + + if (res.finished) { + if (server) { + server.emit("end", req, res, proxyRes); + } + } else { + res.on("close", function () { + proxyRes.destroy(); + }); + proxyRes.on("aborted", function () { + if (!res.destroyed) { + res.destroy(); + } + }); + proxyRes.on("error", function (err) { + if (!res.destroyed) { + res.destroy(err); + } + + if (server.listenerCount("error") > 0) { + server.emit("error", err, req, res, currentUrl); + } + }); + proxyRes.on("end", function () { + if (server) { + server.emit("end", req, res, proxyRes); + } + }); + if (!options.selfHandleResponse) { + proxyRes.pipe(res); + } + } + } + + proxyReq.on("response", function (proxyRes) { + handleResponse(proxyRes, 0, options.target as URL); + }); +}); + +export const webIncomingMiddleware: readonly ProxyMiddleware[] = [ + deleteLength, + timeout, + XHeaders, + stream, +] as const; diff --git a/test/http-proxy.test.ts b/test/http-proxy.test.ts index d5b8c77..e4ca9df 100644 --- a/test/http-proxy.test.ts +++ b/test/http-proxy.test.ts @@ -1,705 +1,771 @@ -import { describe, it, expect } from "vitest"; -import * as httpProxy from "../src/index.ts"; -import http from "node:http"; -import net from "node:net"; -import * as ws from "ws"; -import * as io from "socket.io"; -import SSE from "sse"; -import ioClient from "socket.io-client"; -import type { AddressInfo } from "node:net"; - -// Source: https://github.com/http-party/node-http-proxy/blob/master/test/lib-http-proxy-test.js - -function listenOn(server: http.Server | net.Server): Promise { - return new Promise((resolve, reject) => { - server.once("error", reject); - server.listen(0, "127.0.0.1", () => { - resolve((server.address() as AddressInfo).port); - }); - }); -} - -function proxyListen(proxy: ReturnType): Promise { - return new Promise((resolve, reject) => { - proxy.listen(0, "127.0.0.1"); - const server = (proxy as any)._server as net.Server; - server.once("error", reject); - server.once("listening", () => { - resolve((server.address() as AddressInfo).port); - }); - }); -} - -describe("http-proxy", () => { - describe("#createProxyServer", () => { - it.skip("should throw without options", () => { - let error; - try { - httpProxy.createProxyServer(); - } catch (error_) { - error = error_; - } - - expect(error).to.toBeInstanceOf(Error); - }); - - it("should return an object otherwise", () => { - const obj = httpProxy.createProxyServer({ - target: "http://www.google.com:80", - }); - - expect(obj.web).to.toBeInstanceOf(Function); - expect(obj.ws).to.instanceOf(Function); - expect(obj.listen).to.instanceOf(Function); - }); - }); - - describe("#createProxyServer with forward options and using web-incoming passes", () => { - it("should pipe the request using web-incoming#stream method", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - forward: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - source.on("request", (req, res) => { - expect(req.method).to.eql("GET"); - expect(Number.parseInt(req.headers.host!.split(":")[1]!)).toBe(proxyPort); - source.close(); - proxy.close(resolve); - }); - - http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); - - await promise; - }); - }); - - describe("#createProxyServer using the web-incoming passes", () => { - it("should proxy sse", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const sse = new SSE(source, { path: "/" }); - sse.on("connection", (client) => { - client.send("Hello over SSE"); - client.close(); - }); - - const options = { - hostname: "127.0.0.1", - port: proxyPort, - }; - - const { promise, resolve } = Promise.withResolvers(); - const req = http - .request(options, (res) => { - let streamData = ""; - res.on("data", (chunk) => { - streamData += chunk.toString("utf8"); - }); - res.on("end", () => { - expect(streamData).to.equal(":ok\n\ndata: Hello over SSE\n\n"); - source.close(); - proxy.close(resolve); - }); - }) - .end(); - - await promise; - }); - - it("should make the request on pipe and finish it", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - source.on("request", (req, res) => { - expect(req.method).to.eql("POST"); - expect(req.headers["x-forwarded-for"]).to.eql("127.0.0.1"); - expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); - source.close(); - proxy.close(() => {}); - resolve(); - }); - - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "POST", - headers: { - "x-forwarded-for": "127.0.0.1", - }, - }, - () => {}, - ) - .end(); - - await promise; - }); - }); - - describe("#createProxyServer using the web-incoming passes", () => { - it("should make the request, handle response and finish it", async () => { - const source = http.createServer((req, res) => { - expect(req.method).to.eql("GET"); - expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); - res.writeHead(200, { "Content-Type": "text/plain" }); - res.end("Hello from " + (source.address()! as any).port); - }); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - preserveHeaderKeyCase: true, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - (res) => { - expect(res.statusCode).to.eql(200); - expect(res.headers["content-type"]).to.eql("text/plain"); - if (res.rawHeaders != undefined) { - expect(res.rawHeaders.indexOf("Content-Type")).not.to.eql(-1); - expect(res.rawHeaders.indexOf("text/plain")).not.to.eql(-1); - } - - res.on("data", (data) => { - expect(data.toString()).to.eql("Hello from " + sourcePort); - }); - - res.on("end", () => { - source.close(); - proxy.close(resolve); - }); - }, - ) - .end(); - await promise; - }); - }); - - describe("#createProxyServer() method with error response", () => { - it("should make the request and emit the error event", async () => { - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:1", - }); - - const { promise, resolve } = Promise.withResolvers(); - proxy.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNREFUSED"); - proxy.close(() => {}); - resolve(); - }); - - const proxyPort = await proxyListen(proxy); - - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - () => {}, - ) - .end(); - - await promise.catch(() => {}); - }); - }); - - describe("#createProxyServer setting the correct timeout value", () => { - it("should hang up the socket at the timeout", async () => { - const { promise, resolve } = Promise.withResolvers(); - - const source = http.createServer(function (_req, res) { - setTimeout(() => { - res.end("At this point the socket should be closed"); - }, 5); - }); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - timeout: 3, - }); - const proxyPort = await proxyListen(proxy); - - proxy.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNRESET"); - }); - - const testReq = http.request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - () => {}, - ); - - testReq.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNRESET"); - proxy.close(() => {}); - source.close(); - resolve(); - }); - - testReq.end(); - await promise; - }); - }); - - describe("#createProxyServer with xfwd option", () => { - it("should not throw on empty http host header", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - forward: "http://127.0.0.1:" + sourcePort, - xfwd: true, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - source.on("request", function (req, _res) { - expect(req.method).to.eql("GET"); - // Host header is forwarded from the original request (not changed to source) - expect(req.headers["x-forwarded-for"]).toBeDefined(); - source.close(); - proxy.close(resolve); - }); - - const socket = net.connect({ port: proxyPort }, () => { - socket.write("GET / HTTP/1.0\r\n\r\n"); - }); - - socket.on("data", () => { - socket.end(); - }); - - // Ignore socket errors during teardown (server may close before socket drains) - socket.on("error", () => {}); - - http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); - await promise; - }); - }); - - describe("#createProxyServer using the ws-incoming passes", () => { - it("should proxy the websockets stream", async () => { - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const { promise, resolve } = Promise.withResolvers(); - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - client.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", (socket) => { - socket.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("hello there"); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should emit error on proxy error", async () => { - const { promise, resolve } = Promise.withResolvers(); - - const proxy = httpProxy.createProxyServer({ - // Note: we don't ever listen on this port - target: "ws://127.0.0.1:1", - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - let count = 0; - function maybe_done() { - count += 1; - if (count === 2) resolve(); - } - - client.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNRESET"); - maybe_done(); - }); - - proxy.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNREFUSED"); - proxyServer.close(() => {}); - maybe_done(); - }); - await promise; - }); - - it("should close client socket if upstream is closed before upgrade", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const server = http.createServer(); - server.on("upgrade", function (req, socket, head) { - const response = ["HTTP/1.1 404 Not Found", "Content-type: text/html", "", ""]; - socket.write(response.join("\r\n")); - socket.end(); - }); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - // note: we don't ever listen on this port - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - client.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - proxyServer.close(resolve); - }); - - await promise; - }); - - it("should proxy a socket.io stream", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const server = http.createServer(); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const destiny = new io.Server(server); - - function startSocketIo() { - const client = ioClient("ws://127.0.0.1:" + proxyPort); - client.on("connect", () => { - client.emit("incoming", "hello there"); - }); - - client.on("outgoing", (data: any) => { - expect(data).toBe("Hello over websockets"); - client.disconnect(); - destiny.close(); - server.close(); - proxyServer.close(resolve); - }); - } - startSocketIo(); - - destiny.on("connection", (socket) => { - socket.on("incoming", (msg) => { - expect(msg).toBe("hello there"); - socket.emit("outgoing", "Hello over websockets"); - }); - }); - - await promise; - }); - - it("should emit open and close events when socket.io client connects and disconnects", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const server = http.createServer(); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const destiny = new io.Server(server); - - function startSocketIo() { - const client = ioClient("ws://127.0.0.1:" + proxyPort); - client.on("connect", () => { - client.disconnect(); - }); - } - let count = 0; - - proxyServer.on("open", () => { - count += 1; - }); - - proxyServer.on("close", () => { - destiny.close(); - server.close(); - proxyServer.close(() => {}); - expect(count).toBe(1); - resolve(); - }); - - startSocketIo(); - await promise; - }); - - it("should pass all set-cookie headers to client", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("upgrade", (res) => { - expect(res.headers["set-cookie"]).toHaveLength(2); - }); - - client.on("open", () => { - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("headers", (headers) => { - headers.push("Set-Cookie: test1=test1", "Set-Cookie: test2=test2"); - }); - - await promise; - }); - - it("should detect a proxyReq event and modify headers", async () => { - const { promise, resolve } = Promise.withResolvers(); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - - proxy.on("proxyReqWs", function (proxyReq, req, socket, options, head) { - proxyReq.setHeader("X-Special-Proxy-Header", "foobar"); - }); - - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - client.on("message", (msg: any) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", function (socket, upgradeReq) { - expect(upgradeReq.headers["x-special-proxy-header"]).to.eql("foobar"); - - socket.on("message", (msg: any) => { - expect(msg.toString("utf8")).toBe("hello there"); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should forward frames with single frame payload (including on node 4.x)", async () => { - const { resolve, promise } = await Promise.withResolvers(); - const payload = Array.from({ length: 65_529 }).join("0"); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send(payload); - }); - - client.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", (socket) => { - socket.on("message", (msg) => { - expect(msg.toString("utf8")).toBe(payload); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should forward continuation frames with big payload (including on node 4.x)", async () => { - const { promise, resolve } = Promise.withResolvers(); - const payload = Array.from({ length: 65_530 }).join("0"); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send(payload); - }); - - client.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", (socket) => { - socket.on("message", (msg) => { - expect(msg.toString("utf8")).toBe(payload); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should not crash when client socket errors before upstream upgrade (issue #79)", async () => { - const { promise, resolve } = Promise.withResolvers(); - - // Backend that delays responding to the upgrade request - const server = http.createServer(); - server.on("upgrade", (_req, socket) => { - // Never respond — simulate a slow/hanging backend - socket.on("error", () => {}); - setTimeout(() => socket.destroy(), 500); - }); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - - // Intercept the ws stream pass to inject an error on the client socket - // before the upstream upgrade response arrives - proxy.before("ws", "", ((_req: any, socket: any) => { - // After the proxy sets up the upstream request but before the - // upgrade callback fires, simulate a client disconnect (ECONNRESET) - setTimeout(() => { - socket.destroy(new Error("read ECONNRESET")); - }, 50); - }) as any); - - const proxyPort = await proxyListen(proxy); - - proxy.on("error", () => { - // The error should be caught here, not crash the process - proxy.close(() => {}); - server.close(); - resolve(); - }); - - // Use a raw TCP socket to send a WebSocket upgrade request - const client = net.connect(proxyPort, "127.0.0.1", () => { - client.write( - "GET / HTTP/1.1\r\n" + - "Host: 127.0.0.1\r\n" + - "Upgrade: websocket\r\n" + - "Connection: Upgrade\r\n" + - "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + - "Sec-WebSocket-Version: 13\r\n" + - "\r\n", - ); - }); - client.on("error", () => {}); - - await promise; - }); - }); -}); +import { describe, it, expect } from "vitest"; +import * as httpProxy from "../src/index.ts"; +import http from "node:http"; +import net from "node:net"; +import * as ws from "ws"; +import * as io from "socket.io"; +import SSE from "sse"; +import ioClient from "socket.io-client"; +import type { AddressInfo } from "node:net"; + +// Source: https://github.com/http-party/node-http-proxy/blob/master/test/lib-http-proxy-test.js + +function listenOn(server: http.Server | net.Server): Promise { + return new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + resolve((server.address() as AddressInfo).port); + }); + }); +} + +function proxyListen(proxy: ReturnType): Promise { + return new Promise((resolve, reject) => { + proxy.listen(0, "127.0.0.1"); + const server = (proxy as any)._server as net.Server; + server.once("error", reject); + server.once("listening", () => { + resolve((server.address() as AddressInfo).port); + }); + }); +} + +describe("http-proxy", () => { + describe("#createProxyServer", () => { + it.skip("should throw without options", () => { + let error; + try { + httpProxy.createProxyServer(); + } catch (error_) { + error = error_; + } + + expect(error).to.toBeInstanceOf(Error); + }); + + it("should return an object otherwise", () => { + const obj = httpProxy.createProxyServer({ + target: "http://www.google.com:80", + }); + + expect(obj.web).to.toBeInstanceOf(Function); + expect(obj.ws).to.instanceOf(Function); + expect(obj.listen).to.instanceOf(Function); + }); + }); + + describe("#createProxyServer with forward options and using web-incoming passes", () => { + it("should pipe the request using web-incoming#stream method", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + forward: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + source.on("request", (req, res) => { + expect(req.method).to.eql("GET"); + expect(Number.parseInt(req.headers.host!.split(":")[1]!)).toBe(proxyPort); + source.close(); + proxy.close(resolve); + }); + + http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); + + await promise; + }); + }); + + describe("#createProxyServer using the web-incoming passes", () => { + it("should proxy sse", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const sse = new SSE(source, { path: "/" }); + sse.on("connection", (client) => { + client.send("Hello over SSE"); + client.close(); + }); + + const options = { + hostname: "127.0.0.1", + port: proxyPort, + }; + + const { promise, resolve } = Promise.withResolvers(); + const req = http + .request(options, (res) => { + let streamData = ""; + res.on("data", (chunk) => { + streamData += chunk.toString("utf8"); + }); + res.on("end", () => { + expect(streamData).to.equal(":ok\n\ndata: Hello over SSE\n\n"); + source.close(); + proxy.close(resolve); + }); + }) + .end(); + + await promise; + }); + + it("should close downstream SSE stream when upstream aborts", async () => { + const source = http.createServer((_, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.write(":ok\n\n"); + + setTimeout(() => { + res.socket?.destroy(); + }, 20); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + let gotFirstChunk = false; + let requestError: Error | undefined; + + const finish = () => { + source.close(); + proxy.close(resolve); + }; + + const timeout = setTimeout(() => { + requestError = new Error("Timed out waiting for downstream SSE close"); + finish(); + }, 1000); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + (res) => { + res.on("data", (chunk) => { + if (chunk.toString("utf8").includes(":ok")) { + gotFirstChunk = true; + } + }); + + res.once("close", () => { + clearTimeout(timeout); + finish(); + }); + }, + ) + .on("error", (error) => { + clearTimeout(timeout); + requestError = error; + finish(); + }) + .end(); + + await promise; + expect(requestError).toBeUndefined(); + expect(gotFirstChunk).toBe(true); + }); + + it("should make the request on pipe and finish it", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + source.on("request", (req, res) => { + expect(req.method).to.eql("POST"); + expect(req.headers["x-forwarded-for"]).to.eql("127.0.0.1"); + expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); + source.close(); + proxy.close(() => {}); + resolve(); + }); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "POST", + headers: { + "x-forwarded-for": "127.0.0.1", + }, + }, + () => {}, + ) + .end(); + + await promise; + }); + }); + + describe("#createProxyServer using the web-incoming passes", () => { + it("should make the request, handle response and finish it", async () => { + const source = http.createServer((req, res) => { + expect(req.method).to.eql("GET"); + expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("Hello from " + (source.address()! as any).port); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + preserveHeaderKeyCase: true, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + (res) => { + expect(res.statusCode).to.eql(200); + expect(res.headers["content-type"]).to.eql("text/plain"); + if (res.rawHeaders != undefined) { + expect(res.rawHeaders.indexOf("Content-Type")).not.to.eql(-1); + expect(res.rawHeaders.indexOf("text/plain")).not.to.eql(-1); + } + + res.on("data", (data) => { + expect(data.toString()).to.eql("Hello from " + sourcePort); + }); + + res.on("end", () => { + source.close(); + proxy.close(resolve); + }); + }, + ) + .end(); + await promise; + }); + }); + + describe("#createProxyServer() method with error response", () => { + it("should make the request and emit the error event", async () => { + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:1", + }); + + const { promise, resolve } = Promise.withResolvers(); + proxy.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNREFUSED"); + proxy.close(() => {}); + resolve(); + }); + + const proxyPort = await proxyListen(proxy); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + () => {}, + ) + .end(); + + await promise.catch(() => {}); + }); + }); + + describe("#createProxyServer setting the correct timeout value", () => { + it("should hang up the socket at the timeout", async () => { + const { promise, resolve } = Promise.withResolvers(); + + const source = http.createServer(function (_req, res) { + setTimeout(() => { + res.end("At this point the socket should be closed"); + }, 5); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + timeout: 3, + }); + const proxyPort = await proxyListen(proxy); + + proxy.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNRESET"); + }); + + const testReq = http.request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + () => {}, + ); + + testReq.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNRESET"); + proxy.close(() => {}); + source.close(); + resolve(); + }); + + testReq.end(); + await promise; + }); + }); + + describe("#createProxyServer with xfwd option", () => { + it("should not throw on empty http host header", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + forward: "http://127.0.0.1:" + sourcePort, + xfwd: true, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + source.on("request", function (req, _res) { + expect(req.method).to.eql("GET"); + // Host header is forwarded from the original request (not changed to source) + expect(req.headers["x-forwarded-for"]).toBeDefined(); + source.close(); + proxy.close(resolve); + }); + + const socket = net.connect({ port: proxyPort }, () => { + socket.write("GET / HTTP/1.0\r\n\r\n"); + }); + + socket.on("data", () => { + socket.end(); + }); + + // Ignore socket errors during teardown (server may close before socket drains) + socket.on("error", () => {}); + + http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); + await promise; + }); + }); + + describe("#createProxyServer using the ws-incoming passes", () => { + it("should proxy the websockets stream", async () => { + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const { promise, resolve } = Promise.withResolvers(); + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + client.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", (socket) => { + socket.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("hello there"); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should emit error on proxy error", async () => { + const { promise, resolve } = Promise.withResolvers(); + + const proxy = httpProxy.createProxyServer({ + // Note: we don't ever listen on this port + target: "ws://127.0.0.1:1", + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + let count = 0; + function maybe_done() { + count += 1; + if (count === 2) resolve(); + } + + client.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNRESET"); + maybe_done(); + }); + + proxy.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNREFUSED"); + proxyServer.close(() => {}); + maybe_done(); + }); + await promise; + }); + + it("should close client socket if upstream is closed before upgrade", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const server = http.createServer(); + server.on("upgrade", function (req, socket, head) { + const response = ["HTTP/1.1 404 Not Found", "Content-type: text/html", "", ""]; + socket.write(response.join("\r\n")); + socket.end(); + }); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + // note: we don't ever listen on this port + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + client.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + proxyServer.close(resolve); + }); + + await promise; + }); + + it("should proxy a socket.io stream", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const server = http.createServer(); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const destiny = new io.Server(server); + + function startSocketIo() { + const client = ioClient("ws://127.0.0.1:" + proxyPort); + client.on("connect", () => { + client.emit("incoming", "hello there"); + }); + + client.on("outgoing", (data: any) => { + expect(data).toBe("Hello over websockets"); + client.disconnect(); + destiny.close(); + server.close(); + proxyServer.close(resolve); + }); + } + startSocketIo(); + + destiny.on("connection", (socket) => { + socket.on("incoming", (msg) => { + expect(msg).toBe("hello there"); + socket.emit("outgoing", "Hello over websockets"); + }); + }); + + await promise; + }); + + it("should emit open and close events when socket.io client connects and disconnects", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const server = http.createServer(); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const destiny = new io.Server(server); + + function startSocketIo() { + const client = ioClient("ws://127.0.0.1:" + proxyPort); + client.on("connect", () => { + client.disconnect(); + }); + } + let count = 0; + + proxyServer.on("open", () => { + count += 1; + }); + + proxyServer.on("close", () => { + destiny.close(); + server.close(); + proxyServer.close(() => {}); + expect(count).toBe(1); + resolve(); + }); + + startSocketIo(); + await promise; + }); + + it("should pass all set-cookie headers to client", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("upgrade", (res) => { + expect(res.headers["set-cookie"]).toHaveLength(2); + }); + + client.on("open", () => { + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("headers", (headers) => { + headers.push("Set-Cookie: test1=test1", "Set-Cookie: test2=test2"); + }); + + await promise; + }); + + it("should detect a proxyReq event and modify headers", async () => { + const { promise, resolve } = Promise.withResolvers(); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + + proxy.on("proxyReqWs", function (proxyReq, req, socket, options, head) { + proxyReq.setHeader("X-Special-Proxy-Header", "foobar"); + }); + + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + client.on("message", (msg: any) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", function (socket, upgradeReq) { + expect(upgradeReq.headers["x-special-proxy-header"]).to.eql("foobar"); + + socket.on("message", (msg: any) => { + expect(msg.toString("utf8")).toBe("hello there"); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should forward frames with single frame payload (including on node 4.x)", async () => { + const { resolve, promise } = await Promise.withResolvers(); + const payload = Array.from({ length: 65_529 }).join("0"); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send(payload); + }); + + client.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", (socket) => { + socket.on("message", (msg) => { + expect(msg.toString("utf8")).toBe(payload); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should forward continuation frames with big payload (including on node 4.x)", async () => { + const { promise, resolve } = Promise.withResolvers(); + const payload = Array.from({ length: 65_530 }).join("0"); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send(payload); + }); + + client.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", (socket) => { + socket.on("message", (msg) => { + expect(msg.toString("utf8")).toBe(payload); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should not crash when client socket errors before upstream upgrade (issue #79)", async () => { + const { promise, resolve } = Promise.withResolvers(); + + // Backend that delays responding to the upgrade request + const server = http.createServer(); + server.on("upgrade", (_req, socket) => { + // Never respond — simulate a slow/hanging backend + socket.on("error", () => {}); + setTimeout(() => socket.destroy(), 500); + }); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + + // Intercept the ws stream pass to inject an error on the client socket + // before the upstream upgrade response arrives + proxy.before("ws", "", ((_req: any, socket: any) => { + // After the proxy sets up the upstream request but before the + // upgrade callback fires, simulate a client disconnect (ECONNRESET) + setTimeout(() => { + socket.destroy(new Error("read ECONNRESET")); + }, 50); + }) as any); + + const proxyPort = await proxyListen(proxy); + + proxy.on("error", () => { + // The error should be caught here, not crash the process + proxy.close(() => {}); + server.close(); + resolve(); + }); + + // Use a raw TCP socket to send a WebSocket upgrade request + const client = net.connect(proxyPort, "127.0.0.1", () => { + client.write( + "GET / HTTP/1.1\r\n" + + "Host: 127.0.0.1\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + + "Sec-WebSocket-Version: 13\r\n" + + "\r\n", + ); + }); + client.on("error", () => {}); + + await promise; + }); + }); +}); From 65fc8600cdf6d55a2c895b6317e3c6c8c315e45b Mon Sep 17 00:00:00 2001 From: gkoos Date: Sat, 21 Mar 2026 03:53:44 +0000 Subject: [PATCH 2/3] fix(web-incoming): close downstream on upstream SSE abort/error --- src/middleware/web-incoming.ts | 616 ++++++------- test/http-proxy.test.ts | 1542 ++++++++++++++++---------------- 2 files changed, 1079 insertions(+), 1079 deletions(-) diff --git a/src/middleware/web-incoming.ts b/src/middleware/web-incoming.ts index fbffc6f..953a655 100644 --- a/src/middleware/web-incoming.ts +++ b/src/middleware/web-incoming.ts @@ -1,308 +1,308 @@ -import type { ClientRequest, IncomingMessage, ServerResponse } from "node:http"; -import type { ProxyTargetDetailed } from "../types.ts"; -import nodeHTTP from "node:http"; -import nodeHTTPS from "node:https"; -import { getPort, hasEncryptedConnection, setupOutgoing } from "../_utils.ts"; -import { webOutgoingMiddleware } from "./web-outgoing.ts"; -import { type ProxyMiddleware, defineProxyMiddleware } from "./_utils.ts"; - -const nativeAgents = { http: nodeHTTP, https: nodeHTTPS }; -const redirectStatuses = new Set([301, 302, 303, 307, 308]); - -/** - * Sets `content-length` to '0' if request is of DELETE type. - */ -export const deleteLength = defineProxyMiddleware((req) => { - if ((req.method === "DELETE" || req.method === "OPTIONS") && !req.headers["content-length"]) { - req.headers["content-length"] = "0"; - delete req.headers["transfer-encoding"]; - } -}); - -/** - * Sets timeout in request socket if it was specified in options. - */ -export const timeout = defineProxyMiddleware((req, res, options) => { - if (options.timeout) { - req.socket.setTimeout(options.timeout, () => { - req.socket.destroy(); - }); - } -}); - -/** - * Sets `x-forwarded-*` headers if specified in config. - */ -export const XHeaders = defineProxyMiddleware((req, res, options) => { - if (!options.xfwd) { - return; - } - - const encrypted = (req as any).isSpdy || hasEncryptedConnection(req); - const values = { - for: req.connection.remoteAddress || req.socket.remoteAddress, - port: getPort(req), - proto: encrypted ? "https" : "http", - }; - - for (const header of ["for", "port", "proto"] as const) { - req.headers["x-forwarded-" + header] = - (req.headers["x-forwarded-" + header] || "") + - (req.headers["x-forwarded-" + header] ? "," : "") + - values[header]; - } - - req.headers["x-forwarded-host"] = req.headers["x-forwarded-host"] || req.headers.host || ""; -}); - -/** - * Does the actual proxying. If `forward` is enabled fires up - * a ForwardStream, same happens for ProxyStream. The request - * just dies otherwise. - * - */ -export const stream = defineProxyMiddleware((req, res, options, server, head, callback) => { - // And we begin! - server.emit("start", req, res, options.target || options.forward); - - const http = nativeAgents.http; - const https = nativeAgents.https; - - const maxRedirects = - typeof options.followRedirects === "number" - ? options.followRedirects - : options.followRedirects - ? 5 - : 0; - - if (options.forward) { - // If forward enable, so just pipe the request - const forwardReq = (options.forward.protocol === "https:" ? https : http).request( - setupOutgoing(options.ssl || {}, options, req, "forward"), - ); - - // error handler (e.g. ECONNRESET, ECONNREFUSED) - // Handle errors on incoming request as well as it makes sense to - const forwardError = createErrorHandler(forwardReq, options.forward); - req.on("error", forwardError); - forwardReq.on("error", forwardError); - - (options.buffer || req).pipe(forwardReq); - if (!options.target) { - res.end(); - return; - } - } - - // Request initalization - const proxyReq = (options.target.protocol === "https:" ? https : http).request( - setupOutgoing(options.ssl || {}, options, req), - ); - - // Enable developers to modify the proxyReq before headers are sent - proxyReq.on("socket", (_socket) => { - if (server && !proxyReq.getHeader("expect")) { - server.emit("proxyReq", proxyReq, req, res, options); - } - }); - - // allow outgoing socket to timeout so that we could - // show an error page at the initial request - if (options.proxyTimeout) { - proxyReq.setTimeout(options.proxyTimeout, function () { - proxyReq.abort(); - }); - } - - // Ensure we abort proxy if request is aborted - req.on("aborted", function () { - proxyReq.abort(); - }); - - // Abort proxy request when client disconnects - res.on("close", function () { - if (!res.writableFinished) { - proxyReq.destroy(); - } - }); - - // handle errors in proxy and incoming request, just like for forward proxy - const proxyError = createErrorHandler(proxyReq, options.target); - req.on("error", proxyError); - proxyReq.on("error", proxyError); - - function createErrorHandler(proxyReq: ClientRequest, url: URL | ProxyTargetDetailed) { - return function proxyError(err: Error) { - if (req.socket.destroyed && (err as NodeJS.ErrnoException).code === "ECONNRESET") { - server.emit("econnreset", err, req, res, url); - return proxyReq.abort(); - } - - if (callback) { - callback(err, req, res, url); - } else { - server.emit("error", err, req, res, url); - } - }; - } - - // Buffer request body when following redirects (needed for 307/308 replay) - let bodyBuffer: Buffer | undefined; - if (maxRedirects > 0) { - const chunks: Buffer[] = []; - const source = options.buffer || req; - source.on("data", (chunk: Buffer) => { - chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); - proxyReq.write(chunk); - }); - source.on("end", () => { - bodyBuffer = Buffer.concat(chunks); - proxyReq.end(); - }); - source.on("error", (err: Error) => { - proxyReq.destroy(err); - }); - } else { - (options.buffer || req).pipe(proxyReq); - } - - function handleResponse(proxyRes: IncomingMessage, redirectCount: number, currentUrl: URL) { - const statusCode = proxyRes.statusCode!; - - if ( - maxRedirects > 0 && - redirectStatuses.has(statusCode) && - redirectCount < maxRedirects && - proxyRes.headers.location - ) { - // Drain the redirect response body - proxyRes.resume(); - - const location = new URL(proxyRes.headers.location, currentUrl); - - // 301/302/303 → GET without body; 307/308 → preserve method and body - const preserveMethod = statusCode === 307 || statusCode === 308; - const redirectMethod = preserveMethod ? req.method || "GET" : "GET"; - - const isHTTPS = location.protocol === "https:"; - const agent = isHTTPS ? https : http; - - // Build headers from original request - const redirectHeaders: Record = { ...req.headers }; - if (options.headers) { - Object.assign(redirectHeaders, options.headers); - } - redirectHeaders.host = location.host; - - // Strip sensitive headers on cross-origin redirects - if (location.host !== currentUrl.host) { - delete redirectHeaders.authorization; - delete redirectHeaders.cookie; - } - - // Drop body-related headers when method changes to GET - if (!preserveMethod) { - delete redirectHeaders["content-length"]; - delete redirectHeaders["content-type"]; - delete redirectHeaders["transfer-encoding"]; - } - - const redirectOpts: nodeHTTP.RequestOptions = { - hostname: location.hostname, - port: location.port || (isHTTPS ? 443 : 80), - path: location.pathname + location.search, - method: redirectMethod, - headers: redirectHeaders, - agent: options.agent || false, - }; - - if (isHTTPS) { - (redirectOpts as nodeHTTPS.RequestOptions).rejectUnauthorized = - options.secure === undefined ? true : options.secure; - } - - const redirectReq = agent.request(redirectOpts); - - if (server && !redirectReq.getHeader("expect")) { - server.emit("proxyReq", redirectReq, req, res, options); - } - - if (options.proxyTimeout) { - redirectReq.setTimeout(options.proxyTimeout, () => { - redirectReq.abort(); - }); - } - - const redirectError = createErrorHandler(redirectReq, location); - redirectReq.on("error", redirectError); - - redirectReq.on("response", (nextRes: IncomingMessage) => { - handleResponse(nextRes, redirectCount + 1, location); - }); - - if (preserveMethod && bodyBuffer && bodyBuffer.length > 0) { - redirectReq.end(bodyBuffer); - } else { - redirectReq.end(); - } - - return; - } - - // Non-redirect response (or max redirects exceeded) - if (server) { - server.emit("proxyRes", proxyRes, req, res); - } - - if (!res.headersSent && !options.selfHandleResponse) { - for (const pass of webOutgoingMiddleware) { - if (pass(req, res, proxyRes, options)) { - break; - } - } - } - - if (res.finished) { - if (server) { - server.emit("end", req, res, proxyRes); - } - } else { - res.on("close", function () { - proxyRes.destroy(); - }); - proxyRes.on("aborted", function () { - if (!res.destroyed) { - res.destroy(); - } - }); - proxyRes.on("error", function (err) { - if (!res.destroyed) { - res.destroy(err); - } - - if (server.listenerCount("error") > 0) { - server.emit("error", err, req, res, currentUrl); - } - }); - proxyRes.on("end", function () { - if (server) { - server.emit("end", req, res, proxyRes); - } - }); - if (!options.selfHandleResponse) { - proxyRes.pipe(res); - } - } - } - - proxyReq.on("response", function (proxyRes) { - handleResponse(proxyRes, 0, options.target as URL); - }); -}); - -export const webIncomingMiddleware: readonly ProxyMiddleware[] = [ - deleteLength, - timeout, - XHeaders, - stream, -] as const; +import type { ClientRequest, IncomingMessage, ServerResponse } from "node:http"; +import type { ProxyTargetDetailed } from "../types.ts"; +import nodeHTTP from "node:http"; +import nodeHTTPS from "node:https"; +import { getPort, hasEncryptedConnection, setupOutgoing } from "../_utils.ts"; +import { webOutgoingMiddleware } from "./web-outgoing.ts"; +import { type ProxyMiddleware, defineProxyMiddleware } from "./_utils.ts"; + +const nativeAgents = { http: nodeHTTP, https: nodeHTTPS }; +const redirectStatuses = new Set([301, 302, 303, 307, 308]); + +/** + * Sets `content-length` to '0' if request is of DELETE type. + */ +export const deleteLength = defineProxyMiddleware((req) => { + if ((req.method === "DELETE" || req.method === "OPTIONS") && !req.headers["content-length"]) { + req.headers["content-length"] = "0"; + delete req.headers["transfer-encoding"]; + } +}); + +/** + * Sets timeout in request socket if it was specified in options. + */ +export const timeout = defineProxyMiddleware((req, res, options) => { + if (options.timeout) { + req.socket.setTimeout(options.timeout, () => { + req.socket.destroy(); + }); + } +}); + +/** + * Sets `x-forwarded-*` headers if specified in config. + */ +export const XHeaders = defineProxyMiddleware((req, res, options) => { + if (!options.xfwd) { + return; + } + + const encrypted = (req as any).isSpdy || hasEncryptedConnection(req); + const values = { + for: req.connection.remoteAddress || req.socket.remoteAddress, + port: getPort(req), + proto: encrypted ? "https" : "http", + }; + + for (const header of ["for", "port", "proto"] as const) { + req.headers["x-forwarded-" + header] = + (req.headers["x-forwarded-" + header] || "") + + (req.headers["x-forwarded-" + header] ? "," : "") + + values[header]; + } + + req.headers["x-forwarded-host"] = req.headers["x-forwarded-host"] || req.headers.host || ""; +}); + +/** + * Does the actual proxying. If `forward` is enabled fires up + * a ForwardStream, same happens for ProxyStream. The request + * just dies otherwise. + * + */ +export const stream = defineProxyMiddleware((req, res, options, server, head, callback) => { + // And we begin! + server.emit("start", req, res, options.target || options.forward); + + const http = nativeAgents.http; + const https = nativeAgents.https; + + const maxRedirects = + typeof options.followRedirects === "number" + ? options.followRedirects + : options.followRedirects + ? 5 + : 0; + + if (options.forward) { + // If forward enable, so just pipe the request + const forwardReq = (options.forward.protocol === "https:" ? https : http).request( + setupOutgoing(options.ssl || {}, options, req, "forward"), + ); + + // error handler (e.g. ECONNRESET, ECONNREFUSED) + // Handle errors on incoming request as well as it makes sense to + const forwardError = createErrorHandler(forwardReq, options.forward); + req.on("error", forwardError); + forwardReq.on("error", forwardError); + + (options.buffer || req).pipe(forwardReq); + if (!options.target) { + res.end(); + return; + } + } + + // Request initalization + const proxyReq = (options.target.protocol === "https:" ? https : http).request( + setupOutgoing(options.ssl || {}, options, req), + ); + + // Enable developers to modify the proxyReq before headers are sent + proxyReq.on("socket", (_socket) => { + if (server && !proxyReq.getHeader("expect")) { + server.emit("proxyReq", proxyReq, req, res, options); + } + }); + + // allow outgoing socket to timeout so that we could + // show an error page at the initial request + if (options.proxyTimeout) { + proxyReq.setTimeout(options.proxyTimeout, function () { + proxyReq.abort(); + }); + } + + // Ensure we abort proxy if request is aborted + req.on("aborted", function () { + proxyReq.abort(); + }); + + // Abort proxy request when client disconnects + res.on("close", function () { + if (!res.writableFinished) { + proxyReq.destroy(); + } + }); + + // handle errors in proxy and incoming request, just like for forward proxy + const proxyError = createErrorHandler(proxyReq, options.target); + req.on("error", proxyError); + proxyReq.on("error", proxyError); + + function createErrorHandler(proxyReq: ClientRequest, url: URL | ProxyTargetDetailed) { + return function proxyError(err: Error) { + if (req.socket.destroyed && (err as NodeJS.ErrnoException).code === "ECONNRESET") { + server.emit("econnreset", err, req, res, url); + return proxyReq.abort(); + } + + if (callback) { + callback(err, req, res, url); + } else if (server.listenerCount("error") > 0) { + server.emit("error", err, req, res, url); + } + }; + } + + // Buffer request body when following redirects (needed for 307/308 replay) + let bodyBuffer: Buffer | undefined; + if (maxRedirects > 0) { + const chunks: Buffer[] = []; + const source = options.buffer || req; + source.on("data", (chunk: Buffer) => { + chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); + proxyReq.write(chunk); + }); + source.on("end", () => { + bodyBuffer = Buffer.concat(chunks); + proxyReq.end(); + }); + source.on("error", (err: Error) => { + proxyReq.destroy(err); + }); + } else { + (options.buffer || req).pipe(proxyReq); + } + + function handleResponse(proxyRes: IncomingMessage, redirectCount: number, currentUrl: URL) { + const statusCode = proxyRes.statusCode!; + + if ( + maxRedirects > 0 && + redirectStatuses.has(statusCode) && + redirectCount < maxRedirects && + proxyRes.headers.location + ) { + // Drain the redirect response body + proxyRes.resume(); + + const location = new URL(proxyRes.headers.location, currentUrl); + + // 301/302/303 → GET without body; 307/308 → preserve method and body + const preserveMethod = statusCode === 307 || statusCode === 308; + const redirectMethod = preserveMethod ? req.method || "GET" : "GET"; + + const isHTTPS = location.protocol === "https:"; + const agent = isHTTPS ? https : http; + + // Build headers from original request + const redirectHeaders: Record = { ...req.headers }; + if (options.headers) { + Object.assign(redirectHeaders, options.headers); + } + redirectHeaders.host = location.host; + + // Strip sensitive headers on cross-origin redirects + if (location.host !== currentUrl.host) { + delete redirectHeaders.authorization; + delete redirectHeaders.cookie; + } + + // Drop body-related headers when method changes to GET + if (!preserveMethod) { + delete redirectHeaders["content-length"]; + delete redirectHeaders["content-type"]; + delete redirectHeaders["transfer-encoding"]; + } + + const redirectOpts: nodeHTTP.RequestOptions = { + hostname: location.hostname, + port: location.port || (isHTTPS ? 443 : 80), + path: location.pathname + location.search, + method: redirectMethod, + headers: redirectHeaders, + agent: options.agent || false, + }; + + if (isHTTPS) { + (redirectOpts as nodeHTTPS.RequestOptions).rejectUnauthorized = + options.secure === undefined ? true : options.secure; + } + + const redirectReq = agent.request(redirectOpts); + + if (server && !redirectReq.getHeader("expect")) { + server.emit("proxyReq", redirectReq, req, res, options); + } + + if (options.proxyTimeout) { + redirectReq.setTimeout(options.proxyTimeout, () => { + redirectReq.abort(); + }); + } + + const redirectError = createErrorHandler(redirectReq, location); + redirectReq.on("error", redirectError); + + redirectReq.on("response", (nextRes: IncomingMessage) => { + handleResponse(nextRes, redirectCount + 1, location); + }); + + if (preserveMethod && bodyBuffer && bodyBuffer.length > 0) { + redirectReq.end(bodyBuffer); + } else { + redirectReq.end(); + } + + return; + } + + // Non-redirect response (or max redirects exceeded) + if (server) { + server.emit("proxyRes", proxyRes, req, res); + } + + if (!res.headersSent && !options.selfHandleResponse) { + for (const pass of webOutgoingMiddleware) { + if (pass(req, res, proxyRes, options)) { + break; + } + } + } + + if (res.finished) { + if (server) { + server.emit("end", req, res, proxyRes); + } + } else { + res.on("close", function () { + proxyRes.destroy(); + }); + proxyRes.on("close", function () { + if (!proxyRes.complete && !res.destroyed) { + res.destroy(); + } + }); + proxyRes.on("error", function (err) { + if (!res.destroyed) { + res.destroy(err); + } + + if (server.listenerCount("error") > 0) { + server.emit("error", err, req, res, currentUrl); + } + }); + proxyRes.on("end", function () { + if (server) { + server.emit("end", req, res, proxyRes); + } + }); + if (!options.selfHandleResponse) { + proxyRes.pipe(res); + } + } + } + + proxyReq.on("response", function (proxyRes) { + handleResponse(proxyRes, 0, options.target as URL); + }); +}); + +export const webIncomingMiddleware: readonly ProxyMiddleware[] = [ + deleteLength, + timeout, + XHeaders, + stream, +] as const; diff --git a/test/http-proxy.test.ts b/test/http-proxy.test.ts index e4ca9df..acabfba 100644 --- a/test/http-proxy.test.ts +++ b/test/http-proxy.test.ts @@ -1,771 +1,771 @@ -import { describe, it, expect } from "vitest"; -import * as httpProxy from "../src/index.ts"; -import http from "node:http"; -import net from "node:net"; -import * as ws from "ws"; -import * as io from "socket.io"; -import SSE from "sse"; -import ioClient from "socket.io-client"; -import type { AddressInfo } from "node:net"; - -// Source: https://github.com/http-party/node-http-proxy/blob/master/test/lib-http-proxy-test.js - -function listenOn(server: http.Server | net.Server): Promise { - return new Promise((resolve, reject) => { - server.once("error", reject); - server.listen(0, "127.0.0.1", () => { - resolve((server.address() as AddressInfo).port); - }); - }); -} - -function proxyListen(proxy: ReturnType): Promise { - return new Promise((resolve, reject) => { - proxy.listen(0, "127.0.0.1"); - const server = (proxy as any)._server as net.Server; - server.once("error", reject); - server.once("listening", () => { - resolve((server.address() as AddressInfo).port); - }); - }); -} - -describe("http-proxy", () => { - describe("#createProxyServer", () => { - it.skip("should throw without options", () => { - let error; - try { - httpProxy.createProxyServer(); - } catch (error_) { - error = error_; - } - - expect(error).to.toBeInstanceOf(Error); - }); - - it("should return an object otherwise", () => { - const obj = httpProxy.createProxyServer({ - target: "http://www.google.com:80", - }); - - expect(obj.web).to.toBeInstanceOf(Function); - expect(obj.ws).to.instanceOf(Function); - expect(obj.listen).to.instanceOf(Function); - }); - }); - - describe("#createProxyServer with forward options and using web-incoming passes", () => { - it("should pipe the request using web-incoming#stream method", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - forward: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - source.on("request", (req, res) => { - expect(req.method).to.eql("GET"); - expect(Number.parseInt(req.headers.host!.split(":")[1]!)).toBe(proxyPort); - source.close(); - proxy.close(resolve); - }); - - http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); - - await promise; - }); - }); - - describe("#createProxyServer using the web-incoming passes", () => { - it("should proxy sse", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const sse = new SSE(source, { path: "/" }); - sse.on("connection", (client) => { - client.send("Hello over SSE"); - client.close(); - }); - - const options = { - hostname: "127.0.0.1", - port: proxyPort, - }; - - const { promise, resolve } = Promise.withResolvers(); - const req = http - .request(options, (res) => { - let streamData = ""; - res.on("data", (chunk) => { - streamData += chunk.toString("utf8"); - }); - res.on("end", () => { - expect(streamData).to.equal(":ok\n\ndata: Hello over SSE\n\n"); - source.close(); - proxy.close(resolve); - }); - }) - .end(); - - await promise; - }); - - it("should close downstream SSE stream when upstream aborts", async () => { - const source = http.createServer((_, res) => { - res.writeHead(200, { - "content-type": "text/event-stream", - "cache-control": "no-cache", - connection: "keep-alive", - }); - res.write(":ok\n\n"); - - setTimeout(() => { - res.socket?.destroy(); - }, 20); - }); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - let gotFirstChunk = false; - let requestError: Error | undefined; - - const finish = () => { - source.close(); - proxy.close(resolve); - }; - - const timeout = setTimeout(() => { - requestError = new Error("Timed out waiting for downstream SSE close"); - finish(); - }, 1000); - - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - (res) => { - res.on("data", (chunk) => { - if (chunk.toString("utf8").includes(":ok")) { - gotFirstChunk = true; - } - }); - - res.once("close", () => { - clearTimeout(timeout); - finish(); - }); - }, - ) - .on("error", (error) => { - clearTimeout(timeout); - requestError = error; - finish(); - }) - .end(); - - await promise; - expect(requestError).toBeUndefined(); - expect(gotFirstChunk).toBe(true); - }); - - it("should make the request on pipe and finish it", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - source.on("request", (req, res) => { - expect(req.method).to.eql("POST"); - expect(req.headers["x-forwarded-for"]).to.eql("127.0.0.1"); - expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); - source.close(); - proxy.close(() => {}); - resolve(); - }); - - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "POST", - headers: { - "x-forwarded-for": "127.0.0.1", - }, - }, - () => {}, - ) - .end(); - - await promise; - }); - }); - - describe("#createProxyServer using the web-incoming passes", () => { - it("should make the request, handle response and finish it", async () => { - const source = http.createServer((req, res) => { - expect(req.method).to.eql("GET"); - expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); - res.writeHead(200, { "Content-Type": "text/plain" }); - res.end("Hello from " + (source.address()! as any).port); - }); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - preserveHeaderKeyCase: true, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - (res) => { - expect(res.statusCode).to.eql(200); - expect(res.headers["content-type"]).to.eql("text/plain"); - if (res.rawHeaders != undefined) { - expect(res.rawHeaders.indexOf("Content-Type")).not.to.eql(-1); - expect(res.rawHeaders.indexOf("text/plain")).not.to.eql(-1); - } - - res.on("data", (data) => { - expect(data.toString()).to.eql("Hello from " + sourcePort); - }); - - res.on("end", () => { - source.close(); - proxy.close(resolve); - }); - }, - ) - .end(); - await promise; - }); - }); - - describe("#createProxyServer() method with error response", () => { - it("should make the request and emit the error event", async () => { - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:1", - }); - - const { promise, resolve } = Promise.withResolvers(); - proxy.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNREFUSED"); - proxy.close(() => {}); - resolve(); - }); - - const proxyPort = await proxyListen(proxy); - - http - .request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - () => {}, - ) - .end(); - - await promise.catch(() => {}); - }); - }); - - describe("#createProxyServer setting the correct timeout value", () => { - it("should hang up the socket at the timeout", async () => { - const { promise, resolve } = Promise.withResolvers(); - - const source = http.createServer(function (_req, res) { - setTimeout(() => { - res.end("At this point the socket should be closed"); - }, 5); - }); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - target: "http://127.0.0.1:" + sourcePort, - timeout: 3, - }); - const proxyPort = await proxyListen(proxy); - - proxy.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNRESET"); - }); - - const testReq = http.request( - { - hostname: "127.0.0.1", - port: proxyPort, - method: "GET", - }, - () => {}, - ); - - testReq.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNRESET"); - proxy.close(() => {}); - source.close(); - resolve(); - }); - - testReq.end(); - await promise; - }); - }); - - describe("#createProxyServer with xfwd option", () => { - it("should not throw on empty http host header", async () => { - const source = http.createServer(); - const sourcePort = await listenOn(source); - - const proxy = httpProxy.createProxyServer({ - forward: "http://127.0.0.1:" + sourcePort, - xfwd: true, - }); - const proxyPort = await proxyListen(proxy); - - const { promise, resolve } = Promise.withResolvers(); - source.on("request", function (req, _res) { - expect(req.method).to.eql("GET"); - // Host header is forwarded from the original request (not changed to source) - expect(req.headers["x-forwarded-for"]).toBeDefined(); - source.close(); - proxy.close(resolve); - }); - - const socket = net.connect({ port: proxyPort }, () => { - socket.write("GET / HTTP/1.0\r\n\r\n"); - }); - - socket.on("data", () => { - socket.end(); - }); - - // Ignore socket errors during teardown (server may close before socket drains) - socket.on("error", () => {}); - - http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); - await promise; - }); - }); - - describe("#createProxyServer using the ws-incoming passes", () => { - it("should proxy the websockets stream", async () => { - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const { promise, resolve } = Promise.withResolvers(); - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - client.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", (socket) => { - socket.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("hello there"); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should emit error on proxy error", async () => { - const { promise, resolve } = Promise.withResolvers(); - - const proxy = httpProxy.createProxyServer({ - // Note: we don't ever listen on this port - target: "ws://127.0.0.1:1", - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - let count = 0; - function maybe_done() { - count += 1; - if (count === 2) resolve(); - } - - client.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNRESET"); - maybe_done(); - }); - - proxy.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - expect((err as any).code).toBe("ECONNREFUSED"); - proxyServer.close(() => {}); - maybe_done(); - }); - await promise; - }); - - it("should close client socket if upstream is closed before upgrade", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const server = http.createServer(); - server.on("upgrade", function (req, socket, head) { - const response = ["HTTP/1.1 404 Not Found", "Content-type: text/html", "", ""]; - socket.write(response.join("\r\n")); - socket.end(); - }); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - // note: we don't ever listen on this port - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - client.on("error", (err) => { - expect(err).toBeInstanceOf(Error); - proxyServer.close(resolve); - }); - - await promise; - }); - - it("should proxy a socket.io stream", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const server = http.createServer(); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const destiny = new io.Server(server); - - function startSocketIo() { - const client = ioClient("ws://127.0.0.1:" + proxyPort); - client.on("connect", () => { - client.emit("incoming", "hello there"); - }); - - client.on("outgoing", (data: any) => { - expect(data).toBe("Hello over websockets"); - client.disconnect(); - destiny.close(); - server.close(); - proxyServer.close(resolve); - }); - } - startSocketIo(); - - destiny.on("connection", (socket) => { - socket.on("incoming", (msg) => { - expect(msg).toBe("hello there"); - socket.emit("outgoing", "Hello over websockets"); - }); - }); - - await promise; - }); - - it("should emit open and close events when socket.io client connects and disconnects", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const server = http.createServer(); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - const destiny = new io.Server(server); - - function startSocketIo() { - const client = ioClient("ws://127.0.0.1:" + proxyPort); - client.on("connect", () => { - client.disconnect(); - }); - } - let count = 0; - - proxyServer.on("open", () => { - count += 1; - }); - - proxyServer.on("close", () => { - destiny.close(); - server.close(); - proxyServer.close(() => {}); - expect(count).toBe(1); - resolve(); - }); - - startSocketIo(); - await promise; - }); - - it("should pass all set-cookie headers to client", async () => { - const { resolve, promise } = Promise.withResolvers(); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("upgrade", (res) => { - expect(res.headers["set-cookie"]).toHaveLength(2); - }); - - client.on("open", () => { - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("headers", (headers) => { - headers.push("Set-Cookie: test1=test1", "Set-Cookie: test2=test2"); - }); - - await promise; - }); - - it("should detect a proxyReq event and modify headers", async () => { - const { promise, resolve } = Promise.withResolvers(); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - - proxy.on("proxyReqWs", function (proxyReq, req, socket, options, head) { - proxyReq.setHeader("X-Special-Proxy-Header", "foobar"); - }); - - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send("hello there"); - }); - - client.on("message", (msg: any) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", function (socket, upgradeReq) { - expect(upgradeReq.headers["x-special-proxy-header"]).to.eql("foobar"); - - socket.on("message", (msg: any) => { - expect(msg.toString("utf8")).toBe("hello there"); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should forward frames with single frame payload (including on node 4.x)", async () => { - const { resolve, promise } = await Promise.withResolvers(); - const payload = Array.from({ length: 65_529 }).join("0"); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send(payload); - }); - - client.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", (socket) => { - socket.on("message", (msg) => { - expect(msg.toString("utf8")).toBe(payload); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should forward continuation frames with big payload (including on node 4.x)", async () => { - const { promise, resolve } = Promise.withResolvers(); - const payload = Array.from({ length: 65_530 }).join("0"); - - const destiny = new ws.WebSocketServer({ port: 0 }); - await new Promise((r) => destiny.on("listening", r)); - const sourcePort = (destiny.address() as AddressInfo).port; - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - const proxyPort = await proxyListen(proxy); - const proxyServer = proxy; - - const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); - - client.on("open", () => { - client.send(payload); - }); - - client.on("message", (msg) => { - expect(msg.toString("utf8")).toBe("Hello over websockets"); - client.close(); - destiny.close(); - proxyServer.close(resolve); - }); - - destiny.on("connection", (socket) => { - socket.on("message", (msg) => { - expect(msg.toString("utf8")).toBe(payload); - socket.send("Hello over websockets"); - }); - }); - - await promise; - }); - - it("should not crash when client socket errors before upstream upgrade (issue #79)", async () => { - const { promise, resolve } = Promise.withResolvers(); - - // Backend that delays responding to the upgrade request - const server = http.createServer(); - server.on("upgrade", (_req, socket) => { - // Never respond — simulate a slow/hanging backend - socket.on("error", () => {}); - setTimeout(() => socket.destroy(), 500); - }); - const sourcePort = await listenOn(server); - - const proxy = httpProxy.createProxyServer({ - target: "ws://127.0.0.1:" + sourcePort, - ws: true, - }); - - // Intercept the ws stream pass to inject an error on the client socket - // before the upstream upgrade response arrives - proxy.before("ws", "", ((_req: any, socket: any) => { - // After the proxy sets up the upstream request but before the - // upgrade callback fires, simulate a client disconnect (ECONNRESET) - setTimeout(() => { - socket.destroy(new Error("read ECONNRESET")); - }, 50); - }) as any); - - const proxyPort = await proxyListen(proxy); - - proxy.on("error", () => { - // The error should be caught here, not crash the process - proxy.close(() => {}); - server.close(); - resolve(); - }); - - // Use a raw TCP socket to send a WebSocket upgrade request - const client = net.connect(proxyPort, "127.0.0.1", () => { - client.write( - "GET / HTTP/1.1\r\n" + - "Host: 127.0.0.1\r\n" + - "Upgrade: websocket\r\n" + - "Connection: Upgrade\r\n" + - "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + - "Sec-WebSocket-Version: 13\r\n" + - "\r\n", - ); - }); - client.on("error", () => {}); - - await promise; - }); - }); -}); +import { describe, it, expect } from "vitest"; +import * as httpProxy from "../src/index.ts"; +import http from "node:http"; +import net from "node:net"; +import * as ws from "ws"; +import * as io from "socket.io"; +import SSE from "sse"; +import ioClient from "socket.io-client"; +import type { AddressInfo } from "node:net"; + +// Source: https://github.com/http-party/node-http-proxy/blob/master/test/lib-http-proxy-test.js + +function listenOn(server: http.Server | net.Server): Promise { + return new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + resolve((server.address() as AddressInfo).port); + }); + }); +} + +function proxyListen(proxy: ReturnType): Promise { + return new Promise((resolve, reject) => { + proxy.listen(0, "127.0.0.1"); + const server = (proxy as any)._server as net.Server; + server.once("error", reject); + server.once("listening", () => { + resolve((server.address() as AddressInfo).port); + }); + }); +} + +describe("http-proxy", () => { + describe("#createProxyServer", () => { + it.skip("should throw without options", () => { + let error; + try { + httpProxy.createProxyServer(); + } catch (error_) { + error = error_; + } + + expect(error).to.toBeInstanceOf(Error); + }); + + it("should return an object otherwise", () => { + const obj = httpProxy.createProxyServer({ + target: "http://www.google.com:80", + }); + + expect(obj.web).to.toBeInstanceOf(Function); + expect(obj.ws).to.instanceOf(Function); + expect(obj.listen).to.instanceOf(Function); + }); + }); + + describe("#createProxyServer with forward options and using web-incoming passes", () => { + it("should pipe the request using web-incoming#stream method", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + forward: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + source.on("request", (req, res) => { + expect(req.method).to.eql("GET"); + expect(Number.parseInt(req.headers.host!.split(":")[1]!)).toBe(proxyPort); + source.close(); + proxy.close(resolve); + }); + + http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); + + await promise; + }); + }); + + describe("#createProxyServer using the web-incoming passes", () => { + it("should proxy sse", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const sse = new SSE(source, { path: "/" }); + sse.on("connection", (client) => { + client.send("Hello over SSE"); + client.close(); + }); + + const options = { + hostname: "127.0.0.1", + port: proxyPort, + }; + + const { promise, resolve } = Promise.withResolvers(); + const req = http + .request(options, (res) => { + let streamData = ""; + res.on("data", (chunk) => { + streamData += chunk.toString("utf8"); + }); + res.on("end", () => { + expect(streamData).to.equal(":ok\n\ndata: Hello over SSE\n\n"); + source.close(); + proxy.close(resolve); + }); + }) + .end(); + + await promise; + }); + + it("should close downstream SSE stream when upstream aborts", async () => { + const source = http.createServer((_, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.write(":ok\n\n"); + + setTimeout(() => { + res.socket?.destroy(); + }, 20); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + let gotFirstChunk = false; + let requestError: Error | undefined; + + const finish = () => { + source.close(); + proxy.close(resolve); + }; + + const timeout = setTimeout(() => { + requestError = new Error("Timed out waiting for downstream SSE close"); + finish(); + }, 1000); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + (res) => { + res.on("data", (chunk) => { + if (chunk.toString("utf8").includes(":ok")) { + gotFirstChunk = true; + } + }); + + res.once("close", () => { + clearTimeout(timeout); + finish(); + }); + }, + ) + .on("error", (error) => { + clearTimeout(timeout); + requestError = error; + finish(); + }) + .end(); + + await promise; + expect(requestError).toBeUndefined(); + expect(gotFirstChunk).toBe(true); + }); + + it("should make the request on pipe and finish it", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + source.on("request", (req, res) => { + expect(req.method).to.eql("POST"); + expect(req.headers["x-forwarded-for"]).to.eql("127.0.0.1"); + expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); + source.close(); + proxy.close(() => {}); + resolve(); + }); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "POST", + headers: { + "x-forwarded-for": "127.0.0.1", + }, + }, + () => {}, + ) + .end(); + + await promise; + }); + }); + + describe("#createProxyServer using the web-incoming passes", () => { + it("should make the request, handle response and finish it", async () => { + const source = http.createServer((req, res) => { + expect(req.method).to.eql("GET"); + expect(Number.parseInt(req.headers.host!.split(":")[1]!)).to.eql(proxyPort); + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("Hello from " + (source.address()! as any).port); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + preserveHeaderKeyCase: true, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + (res) => { + expect(res.statusCode).to.eql(200); + expect(res.headers["content-type"]).to.eql("text/plain"); + if (res.rawHeaders != undefined) { + expect(res.rawHeaders.indexOf("Content-Type")).not.to.eql(-1); + expect(res.rawHeaders.indexOf("text/plain")).not.to.eql(-1); + } + + res.on("data", (data) => { + expect(data.toString()).to.eql("Hello from " + sourcePort); + }); + + res.on("end", () => { + source.close(); + proxy.close(resolve); + }); + }, + ) + .end(); + await promise; + }); + }); + + describe("#createProxyServer() method with error response", () => { + it("should make the request and emit the error event", async () => { + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:1", + }); + + const { promise, resolve } = Promise.withResolvers(); + proxy.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNREFUSED"); + proxy.close(() => {}); + resolve(); + }); + + const proxyPort = await proxyListen(proxy); + + http + .request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + () => {}, + ) + .end(); + + await promise.catch(() => {}); + }); + }); + + describe("#createProxyServer setting the correct timeout value", () => { + it("should hang up the socket at the timeout", async () => { + const { promise, resolve } = Promise.withResolvers(); + + const source = http.createServer(function (_req, res) { + setTimeout(() => { + res.end("At this point the socket should be closed"); + }, 5); + }); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + target: "http://127.0.0.1:" + sourcePort, + timeout: 3, + }); + const proxyPort = await proxyListen(proxy); + + proxy.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNRESET"); + }); + + const testReq = http.request( + { + hostname: "127.0.0.1", + port: proxyPort, + method: "GET", + }, + () => {}, + ); + + testReq.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNRESET"); + proxy.close(() => {}); + source.close(); + resolve(); + }); + + testReq.end(); + await promise; + }); + }); + + describe("#createProxyServer with xfwd option", () => { + it("should not throw on empty http host header", async () => { + const source = http.createServer(); + const sourcePort = await listenOn(source); + + const proxy = httpProxy.createProxyServer({ + forward: "http://127.0.0.1:" + sourcePort, + xfwd: true, + }); + const proxyPort = await proxyListen(proxy); + + const { promise, resolve } = Promise.withResolvers(); + source.on("request", function (req, _res) { + expect(req.method).to.eql("GET"); + // Host header is forwarded from the original request (not changed to source) + expect(req.headers["x-forwarded-for"]).toBeDefined(); + source.close(); + proxy.close(resolve); + }); + + const socket = net.connect({ port: proxyPort }, () => { + socket.write("GET / HTTP/1.0\r\n\r\n"); + }); + + socket.on("data", () => { + socket.end(); + }); + + // Ignore socket errors during teardown (server may close before socket drains) + socket.on("error", () => {}); + + http.request("http://127.0.0.1:" + proxyPort, () => {}).end(); + await promise; + }); + }); + + describe("#createProxyServer using the ws-incoming passes", () => { + it("should proxy the websockets stream", async () => { + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const { promise, resolve } = Promise.withResolvers(); + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + client.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", (socket) => { + socket.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("hello there"); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should emit error on proxy error", async () => { + const { promise, resolve } = Promise.withResolvers(); + + const proxy = httpProxy.createProxyServer({ + // Note: we don't ever listen on this port + target: "ws://127.0.0.1:1", + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + let count = 0; + function maybe_done() { + count += 1; + if (count === 2) resolve(); + } + + client.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNRESET"); + maybe_done(); + }); + + proxy.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + expect((err as any).code).toBe("ECONNREFUSED"); + proxyServer.close(() => {}); + maybe_done(); + }); + await promise; + }); + + it("should close client socket if upstream is closed before upgrade", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const server = http.createServer(); + server.on("upgrade", function (req, socket, head) { + const response = ["HTTP/1.1 404 Not Found", "Content-type: text/html", "", ""]; + socket.write(response.join("\r\n")); + socket.end(); + }); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + // note: we don't ever listen on this port + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + client.on("error", (err) => { + expect(err).toBeInstanceOf(Error); + proxyServer.close(resolve); + }); + + await promise; + }); + + it("should proxy a socket.io stream", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const server = http.createServer(); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const destiny = new io.Server(server); + + function startSocketIo() { + const client = ioClient("ws://127.0.0.1:" + proxyPort); + client.on("connect", () => { + client.emit("incoming", "hello there"); + }); + + client.on("outgoing", (data: any) => { + expect(data).toBe("Hello over websockets"); + client.disconnect(); + destiny.close(); + server.close(); + proxyServer.close(resolve); + }); + } + startSocketIo(); + + destiny.on("connection", (socket) => { + socket.on("incoming", (msg) => { + expect(msg).toBe("hello there"); + socket.emit("outgoing", "Hello over websockets"); + }); + }); + + await promise; + }); + + it("should emit open and close events when socket.io client connects and disconnects", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const server = http.createServer(); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + const destiny = new io.Server(server); + + function startSocketIo() { + const client = ioClient("ws://127.0.0.1:" + proxyPort); + client.on("connect", () => { + client.disconnect(); + }); + } + let count = 0; + + proxyServer.on("open", () => { + count += 1; + }); + + proxyServer.on("close", () => { + destiny.close(); + server.close(); + proxyServer.close(() => {}); + expect(count).toBe(1); + resolve(); + }); + + startSocketIo(); + await promise; + }); + + it("should pass all set-cookie headers to client", async () => { + const { resolve, promise } = Promise.withResolvers(); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("upgrade", (res) => { + expect(res.headers["set-cookie"]).toHaveLength(2); + }); + + client.on("open", () => { + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("headers", (headers) => { + headers.push("Set-Cookie: test1=test1", "Set-Cookie: test2=test2"); + }); + + await promise; + }); + + it("should detect a proxyReq event and modify headers", async () => { + const { promise, resolve } = Promise.withResolvers(); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + + proxy.on("proxyReqWs", function (proxyReq, req, socket, options, head) { + proxyReq.setHeader("X-Special-Proxy-Header", "foobar"); + }); + + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send("hello there"); + }); + + client.on("message", (msg: any) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", function (socket, upgradeReq) { + expect(upgradeReq.headers["x-special-proxy-header"]).to.eql("foobar"); + + socket.on("message", (msg: any) => { + expect(msg.toString("utf8")).toBe("hello there"); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should forward frames with single frame payload (including on node 4.x)", async () => { + const { resolve, promise } = await Promise.withResolvers(); + const payload = Array.from({ length: 65_529 }).join("0"); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send(payload); + }); + + client.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", (socket) => { + socket.on("message", (msg) => { + expect(msg.toString("utf8")).toBe(payload); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should forward continuation frames with big payload (including on node 4.x)", async () => { + const { promise, resolve } = Promise.withResolvers(); + const payload = Array.from({ length: 65_530 }).join("0"); + + const destiny = new ws.WebSocketServer({ port: 0 }); + await new Promise((r) => destiny.on("listening", r)); + const sourcePort = (destiny.address() as AddressInfo).port; + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + const proxyPort = await proxyListen(proxy); + const proxyServer = proxy; + + const client = new ws.WebSocket("ws://127.0.0.1:" + proxyPort); + + client.on("open", () => { + client.send(payload); + }); + + client.on("message", (msg) => { + expect(msg.toString("utf8")).toBe("Hello over websockets"); + client.close(); + destiny.close(); + proxyServer.close(resolve); + }); + + destiny.on("connection", (socket) => { + socket.on("message", (msg) => { + expect(msg.toString("utf8")).toBe(payload); + socket.send("Hello over websockets"); + }); + }); + + await promise; + }); + + it("should not crash when client socket errors before upstream upgrade (issue #79)", async () => { + const { promise, resolve } = Promise.withResolvers(); + + // Backend that delays responding to the upgrade request + const server = http.createServer(); + server.on("upgrade", (_req, socket) => { + // Never respond — simulate a slow/hanging backend + socket.on("error", () => {}); + setTimeout(() => socket.destroy(), 500); + }); + const sourcePort = await listenOn(server); + + const proxy = httpProxy.createProxyServer({ + target: "ws://127.0.0.1:" + sourcePort, + ws: true, + }); + + // Intercept the ws stream pass to inject an error on the client socket + // before the upstream upgrade response arrives + proxy.before("ws", "", ((_req: any, socket: any) => { + // After the proxy sets up the upstream request but before the + // upgrade callback fires, simulate a client disconnect (ECONNRESET) + setTimeout(() => { + socket.destroy(new Error("read ECONNRESET")); + }, 50); + }) as any); + + const proxyPort = await proxyListen(proxy); + + proxy.on("error", () => { + // The error should be caught here, not crash the process + proxy.close(() => {}); + server.close(); + resolve(); + }); + + // Use a raw TCP socket to send a WebSocket upgrade request + const client = net.connect(proxyPort, "127.0.0.1", () => { + client.write( + "GET / HTTP/1.1\r\n" + + "Host: 127.0.0.1\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + + "Sec-WebSocket-Version: 13\r\n" + + "\r\n", + ); + }); + client.on("error", () => {}); + + await promise; + }); + }); +}); From 9a7906bdf7d8b9f004c9d090420bcdc93bf6f947 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Wed, 25 Mar 2026 20:03:06 +0100 Subject: [PATCH 3/3] fix(web-incoming): revert silent error swallowing in createErrorHandler Restore unconditional server.emit("error") so unhandled proxy errors throw instead of being silently dropped when no listener exists. --- src/middleware/web-incoming.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/middleware/web-incoming.ts b/src/middleware/web-incoming.ts index 953a655..cb9e574 100644 --- a/src/middleware/web-incoming.ts +++ b/src/middleware/web-incoming.ts @@ -140,7 +140,7 @@ export const stream = defineProxyMiddleware((req, res, options, server, head, ca if (callback) { callback(err, req, res, url); - } else if (server.listenerCount("error") > 0) { + } else { server.emit("error", err, req, res, url); } };