From d5056fdaec859f6c933cceb3bbc6d17c6d8f20e5 Mon Sep 17 00:00:00 2001 From: Mega Date: Mon, 30 Jan 2023 08:08:30 +0500 Subject: [PATCH 1/2] fix socket.kickOut() --- serversocket.js | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/serversocket.js b/serversocket.js index 5ddd2c8..96498b0 100644 --- a/serversocket.js +++ b/serversocket.js @@ -1427,10 +1427,19 @@ AGServerSocket.prototype.deauthenticate = async function (options) { }; AGServerSocket.prototype.kickOut = function (channel, message) { - delete this.channelSubscriptions[channel]; - this.channelSubscriptionsCount--; - this.transmit('#kickOut', {channel, message}); - return this.server.brokerEngine.unsubscribeSocket(this, channel); + let channels = channel; + if (!channels) { + channels = Object.keys(this.channelSubscriptions); + } + if (!Array.isArray(channels)) { + channels = [channel]; + } + for (const channelName of channels) { + delete this.channelSubscriptions[channelName]; + this.channelSubscriptionsCount--; + this.transmit('#kickOut', {channel: channelName, message}); + this.server.brokerEngine.unsubscribeSocket(this, channelName); + } }; AGServerSocket.prototype.subscriptions = function () { From 6119d81eff3d371e6c68a9b15b592490a12729be Mon Sep 17 00:00:00 2001 From: Mega Date: Thu, 2 Feb 2023 03:26:58 +0500 Subject: [PATCH 2/2] account for broker errors when unsubscribing a socket --- serversocket.js | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/serversocket.js b/serversocket.js index 96498b0..e255369 100644 --- a/serversocket.js +++ b/serversocket.js @@ -543,12 +543,11 @@ AGServerSocket.prototype._processSubscribeRequest = async function (request) { }; AGServerSocket.prototype._unsubscribeFromAllChannels = function () { - Object.keys(this.channelSubscriptions).forEach((channelName) => { - this._unsubscribe(channelName); - }); + const channels = Object.keys(this.channelSubscriptions); + return Promise.all(channels.map((channel) => this._unsubscribe(channel))); }; -AGServerSocket.prototype._unsubscribe = function (channel) { +AGServerSocket.prototype._unsubscribe = async function (channel) { if (typeof channel !== 'string') { throw new InvalidActionError( `Socket ${this.id} tried to unsubscribe from an invalid channel name` @@ -560,21 +559,26 @@ AGServerSocket.prototype._unsubscribe = function (channel) { ); } - delete this.channelSubscriptions[channel]; - if (this.channelSubscriptionsCount != null) { - this.channelSubscriptionsCount--; + try { + await this.server.brokerEngine.unsubscribeSocket(this, channel); + delete this.channelSubscriptions[channel]; + if (this.channelSubscriptionsCount != null) { + this.channelSubscriptionsCount--; + } + this.emit('unsubscribe', {channel}); + this.server.emit('unsubscription', {socket: this, channel}); + } catch (err) { + const error = new BrokerError( + `Failed to unsubscribe socket from the ${channel} channel - ${err}` + ); + this.emitError(error); } - - this.server.brokerEngine.unsubscribeSocket(this, channel); - - this.emit('unsubscribe', {channel}); - this.server.emit('unsubscription', {socket: this, channel}); }; AGServerSocket.prototype._processUnsubscribePacket = async function (packet) { let channel = packet.data; try { - this._unsubscribe(channel); + await this._unsubscribe(channel); } catch (err) { let error = new BrokerError( `Failed to unsubscribe socket from the ${channel} channel - ${err}` @@ -586,7 +590,7 @@ AGServerSocket.prototype._processUnsubscribePacket = async function (packet) { AGServerSocket.prototype._processUnsubscribeRequest = async function (request) { let channel = request.data; try { - this._unsubscribe(channel); + await this._unsubscribe(channel); } catch (err) { let error = new BrokerError( `Failed to unsubscribe socket from the ${channel} channel - ${err}` @@ -1434,12 +1438,10 @@ AGServerSocket.prototype.kickOut = function (channel, message) { if (!Array.isArray(channels)) { channels = [channel]; } - for (const channelName of channels) { - delete this.channelSubscriptions[channelName]; - this.channelSubscriptionsCount--; + return Promise.all(channels.map((channelName) => { this.transmit('#kickOut', {channel: channelName, message}); - this.server.brokerEngine.unsubscribeSocket(this, channelName); - } + return this._unsubscribe(channelName); + })); }; AGServerSocket.prototype.subscriptions = function () {