diff --git a/serversocket.js b/serversocket.js index 5ddd2c8..e255369 100644 --- a/serversocket.js +++ b/serversocket.js @@ -543,12 +543,11 @@ AGServerSocket.prototype._processSubscribeRequest = async function (request) { }; AGServerSocket.prototype._unsubscribeFromAllChannels = function () { - Object.keys(this.channelSubscriptions).forEach((channelName) => { - this._unsubscribe(channelName); - }); + const channels = Object.keys(this.channelSubscriptions); + return Promise.all(channels.map((channel) => this._unsubscribe(channel))); }; -AGServerSocket.prototype._unsubscribe = function (channel) { +AGServerSocket.prototype._unsubscribe = async function (channel) { if (typeof channel !== 'string') { throw new InvalidActionError( `Socket ${this.id} tried to unsubscribe from an invalid channel name` @@ -560,21 +559,26 @@ AGServerSocket.prototype._unsubscribe = function (channel) { ); } - delete this.channelSubscriptions[channel]; - if (this.channelSubscriptionsCount != null) { - this.channelSubscriptionsCount--; + try { + await this.server.brokerEngine.unsubscribeSocket(this, channel); + delete this.channelSubscriptions[channel]; + if (this.channelSubscriptionsCount != null) { + this.channelSubscriptionsCount--; + } + this.emit('unsubscribe', {channel}); + this.server.emit('unsubscription', {socket: this, channel}); + } catch (err) { + const error = new BrokerError( + `Failed to unsubscribe socket from the ${channel} channel - ${err}` + ); + this.emitError(error); } - - this.server.brokerEngine.unsubscribeSocket(this, channel); - - this.emit('unsubscribe', {channel}); - this.server.emit('unsubscription', {socket: this, channel}); }; AGServerSocket.prototype._processUnsubscribePacket = async function (packet) { let channel = packet.data; try { - this._unsubscribe(channel); + await this._unsubscribe(channel); } catch (err) { let error = new BrokerError( `Failed to unsubscribe socket from the ${channel} channel - ${err}` @@ -586,7 +590,7 @@ AGServerSocket.prototype._processUnsubscribePacket = async function (packet) { AGServerSocket.prototype._processUnsubscribeRequest = async function (request) { let channel = request.data; try { - this._unsubscribe(channel); + await this._unsubscribe(channel); } catch (err) { let error = new BrokerError( `Failed to unsubscribe socket from the ${channel} channel - ${err}` @@ -1427,10 +1431,17 @@ AGServerSocket.prototype.deauthenticate = async function (options) { }; AGServerSocket.prototype.kickOut = function (channel, message) { - delete this.channelSubscriptions[channel]; - this.channelSubscriptionsCount--; - this.transmit('#kickOut', {channel, message}); - return this.server.brokerEngine.unsubscribeSocket(this, channel); + let channels = channel; + if (!channels) { + channels = Object.keys(this.channelSubscriptions); + } + if (!Array.isArray(channels)) { + channels = [channel]; + } + return Promise.all(channels.map((channelName) => { + this.transmit('#kickOut', {channel: channelName, message}); + return this._unsubscribe(channelName); + })); }; AGServerSocket.prototype.subscriptions = function () {