Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions serversocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -543,12 +543,11 @@ AGServerSocket.prototype._processSubscribeRequest = async function (request) {
};

AGServerSocket.prototype._unsubscribeFromAllChannels = function () {
Object.keys(this.channelSubscriptions).forEach((channelName) => {
this._unsubscribe(channelName);
});
const channels = Object.keys(this.channelSubscriptions);
return Promise.all(channels.map((channel) => this._unsubscribe(channel)));
};

AGServerSocket.prototype._unsubscribe = function (channel) {
AGServerSocket.prototype._unsubscribe = async function (channel) {
if (typeof channel !== 'string') {
throw new InvalidActionError(
`Socket ${this.id} tried to unsubscribe from an invalid channel name`
Expand All @@ -560,21 +559,26 @@ AGServerSocket.prototype._unsubscribe = function (channel) {
);
}

delete this.channelSubscriptions[channel];
if (this.channelSubscriptionsCount != null) {
this.channelSubscriptionsCount--;
try {
await this.server.brokerEngine.unsubscribeSocket(this, channel);
delete this.channelSubscriptions[channel];
if (this.channelSubscriptionsCount != null) {
this.channelSubscriptionsCount--;
}
this.emit('unsubscribe', {channel});
this.server.emit('unsubscription', {socket: this, channel});
} catch (err) {
const error = new BrokerError(
`Failed to unsubscribe socket from the ${channel} channel - ${err}`
);
this.emitError(error);
}

this.server.brokerEngine.unsubscribeSocket(this, channel);

this.emit('unsubscribe', {channel});
this.server.emit('unsubscription', {socket: this, channel});
};

AGServerSocket.prototype._processUnsubscribePacket = async function (packet) {
let channel = packet.data;
try {
this._unsubscribe(channel);
await this._unsubscribe(channel);
} catch (err) {
let error = new BrokerError(
`Failed to unsubscribe socket from the ${channel} channel - ${err}`
Expand All @@ -586,7 +590,7 @@ AGServerSocket.prototype._processUnsubscribePacket = async function (packet) {
AGServerSocket.prototype._processUnsubscribeRequest = async function (request) {
let channel = request.data;
try {
this._unsubscribe(channel);
await this._unsubscribe(channel);
} catch (err) {
let error = new BrokerError(
`Failed to unsubscribe socket from the ${channel} channel - ${err}`
Expand Down Expand Up @@ -1427,10 +1431,17 @@ AGServerSocket.prototype.deauthenticate = async function (options) {
};

AGServerSocket.prototype.kickOut = function (channel, message) {
delete this.channelSubscriptions[channel];
this.channelSubscriptionsCount--;
this.transmit('#kickOut', {channel, message});
return this.server.brokerEngine.unsubscribeSocket(this, channel);
let channels = channel;
if (!channels) {
channels = Object.keys(this.channelSubscriptions);
}
if (!Array.isArray(channels)) {
channels = [channel];
}
return Promise.all(channels.map((channelName) => {
this.transmit('#kickOut', {channel: channelName, message});
return this._unsubscribe(channelName);
}));
};

AGServerSocket.prototype.subscriptions = function () {
Expand Down