From bd404106a154abf21e9fcee11bb78bd860b427b5 Mon Sep 17 00:00:00 2001 From: Branislav Katreniak Date: Fri, 30 Apr 2021 15:28:49 +0200 Subject: [PATCH] websocket: fix write back-pressure Websocket transport is eagerly writing to underlaying websocket without respecting back-pressure. When an event is emitted to multiple clients, socket.io adapter sends the same packet object to all socket clients. These packet objects are shared for all clients inside room. Once the packet is sent to transport, transport prepares buffer with transport headers and packet data and the sharing among clients is lost. This change significantly reduces memory usage when many packets are emitted to many clients in a burst. This change causes that buffered data is sent to clients more evenly packet by packet. --- lib/transports/websocket.js | 46 +++++++++++++++++++------------------ test/server.js | 4 +++- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 500d7ad44..304f5c641 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -63,33 +63,35 @@ class WebSocket extends Transport { * @api private */ send(packets) { - for (let i = 0; i < packets.length; i++) { - const packet = packets[i]; + const packet = packets.shift(); + if (typeof packet === "undefined") { + this.writable = true; + this.emit("drain"); + return; + } - // always creates a new object since ws modifies it - const opts = {}; - if (packet.options) { - opts.compress = packet.options.compress; - } + // always creates a new object since ws modifies it + const opts = {}; + if (packet.options) { + opts.compress = packet.options.compress; + } - this.parser.encodePacket(packet, this.supportsBinary, data => { - if (this.perMessageDeflate) { - const len = - "string" === typeof data ? Buffer.byteLength(data) : data.length; - if (len < this.perMessageDeflate.threshold) { - opts.compress = false; - } + this.parser.encodePacket(packet, this.supportsBinary, data => { + if (this.perMessageDeflate) { + const len = + "string" === typeof data ? Buffer.byteLength(data) : data.length; + if (len < this.perMessageDeflate.threshold) { + opts.compress = false; } - debug('writing "%s"', data); - this.writable = false; + } + debug('writing "%s"', data); + this.writable = false; - this.socket.send(data, opts, err => { - if (err) return this.onError("write error", err.stack); - this.writable = true; - this.emit("drain"); - }); + this.socket.send(data, opts, err => { + if (err) return this.onError("write error", err.stack); + this.send(packets); }); - } + }); } /** diff --git a/test/server.js b/test/server.js index c72ac0771..758897c49 100644 --- a/test/server.js +++ b/test/server.js @@ -1681,7 +1681,9 @@ describe("server", () => { conn.send("a"); conn.send("b"); conn.send("c"); - conn.close(); + setTimeout(() => { + conn.close(); + }, 50); }); socket.on("open", () => {