diff --git a/lib/message-queue.js b/lib/message-queue.js index 17c398b..b93f695 100644 --- a/lib/message-queue.js +++ b/lib/message-queue.js @@ -1,7 +1,5 @@ const _ = require('lodash'); -// TODO... what indexes should be created and should this be responsible for creating them? - function MessageQueue() { const self = this; @@ -9,6 +7,8 @@ function MessageQueue() { self.databasePromise = null; self.collectionName = '_queue'; + self.createIndex = true; + self.indexName = 'poll-index'; self.pollingInterval = 1000; self.processingTimeout = 30 * 1000; @@ -20,7 +20,7 @@ function MessageQueue() { self.registerWorker = function(type, promise) { _workers[type] = promise; - _startPolling(); + return _startPolling(); }; self.stopPolling = function() { @@ -80,13 +80,37 @@ function MessageQueue() { //region Private Helper Methods - function _startPolling() { + async function _startPolling() { if (!_pollingIntervalId) { + // Temporarily assigning polling interval to prevent multiple polls starting while index is being created + _pollingIntervalId = -1; + + if (self.createIndex) { + await _createQueueIndex(); + } + // Try and find work at least once every pollingInterval _pollingIntervalId = setInterval(_poll, self.pollingInterval); } } + async function _createQueueIndex() { + const collection = await _getCollection(); + + await collection.createIndex( + { + type: 1, + rejectedTime: 1, + nextReceivableTime: 1, + receivedTime: 1 + }, + { + sparse: false, + name: self.indexName + } + ); + } + function _stopPolling() { if (_pollingIntervalId) { clearInterval(_pollingIntervalId);