diff --git a/app/models/server/raw/OmnichannelQueue.ts b/app/models/server/raw/OmnichannelQueue.ts index 74a2e1d9f8029..bca8878d86098 100644 --- a/app/models/server/raw/OmnichannelQueue.ts +++ b/app/models/server/raw/OmnichannelQueue.ts @@ -32,12 +32,22 @@ export class OmnichannelQueueRaw extends BaseRaw { } async lockQueue() { + const date = new Date(); const result = await this.col.findOneAndUpdate({ _id: UNIQUE_QUEUE_ID, - locked: false, + $or: [{ + locked: true, + lockedAt: { + $lte: new Date(date.getTime() - 5000), + }, + }, { + locked: false, + }], }, { $set: { locked: true, + // apply 5 secs lock lifetime + lockedAt: new Date(), }, }, { sort: { @@ -55,6 +65,9 @@ export class OmnichannelQueueRaw extends BaseRaw { $set: { locked: false, }, + $unset: { + lockedAt: 1, + }, }, { sort: { _id: 1, diff --git a/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js b/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js index 1da3c3ef1e381..0109eceb75fde 100644 --- a/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js +++ b/ee/app/livechat-enterprise/server/lib/LivechatEnterprise.js @@ -249,13 +249,22 @@ const queueWorker = { async checkQueue(queue) { queueLogger.debug(`Processing items for queue ${ queue || 'Public' }`); - if (await OmnichannelQueue.lockQueue()) { - await processWaitingQueue(queue); - queueLogger.debug(`Queue ${ queue || 'Public' } processed. Unlocking`); - await OmnichannelQueue.unlockQueue(); + try { + if (await OmnichannelQueue.lockQueue()) { + await processWaitingQueue(queue); + queueLogger.debug(`Queue ${ queue || 'Public' } processed. Unlocking`); + await OmnichannelQueue.unlockQueue(); + } else { + queueLogger.debug('Queue locked. Waiting'); + } + } catch (e) { + queueLogger.error({ + msg: `Error processing queue ${ queue || 'public' }`, + err: e, + }); + } finally { + this.execute(); } - - this.execute(); }, };