From d512a9699def4a1540fcf388fa102b6ef0143f09 Mon Sep 17 00:00:00 2001 From: "Jacob V. Gardner" Date: Wed, 9 Oct 2019 10:36:23 -0500 Subject: [PATCH 1/5] Add index to queue when starting polling --- lib/message-queue.js | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/message-queue.js b/lib/message-queue.js index c6d7a8f..71f136c 100644 --- a/lib/message-queue.js +++ b/lib/message-queue.js @@ -1,7 +1,5 @@ var _ = require('lodash'); -// TODO... what indexes should be created and should this be responsible for creating them? - function MessageQueue() { var self = this; @@ -20,7 +18,7 @@ function MessageQueue() { self.registerWorker = function(type, promise) { _workers[type] = promise; - _startPolling(); + return _startPolling(); }; self.stopPolling = function() { @@ -85,8 +83,19 @@ function MessageQueue() { //region Private Helper Methods - function _startPolling() { + async function _startPolling() { if (!_pollingIntervalId) { + // Temporarily assigning polling interval to prevent multiple polls starting while index is being created + _pollingIntervalId = -1; + + const db = await self.databasePromise(); + await db.collection(self.collectionName).createIndex({ + type: 1, + rejectedTime: 1, + nextReceivableTime: 1, + receivedTime: 1 + }); + // Try and find work at least once every pollingInterval _pollingIntervalId = setInterval(_poll, self.pollingInterval); } From ea48302ac42ffacc6e1621e38a2a163597ba3622 Mon Sep 17 00:00:00 2001 From: "Jacob V. Gardner" Date: Wed, 9 Oct 2019 10:42:38 -0500 Subject: [PATCH 2/5] Hide queue creation behind flag --- lib/message-queue.js | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/message-queue.js b/lib/message-queue.js index 71f136c..706463e 100644 --- a/lib/message-queue.js +++ b/lib/message-queue.js @@ -7,6 +7,7 @@ function MessageQueue() { self.databasePromise = null; self.collectionName = '_queue'; + self.createIndex = false; self.pollingInterval = 1000; self.processingTimeout = 30 * 1000; @@ -88,13 +89,15 @@ function MessageQueue() { // Temporarily assigning polling interval to prevent multiple polls starting while index is being created _pollingIntervalId = -1; - const db = await self.databasePromise(); - await db.collection(self.collectionName).createIndex({ - type: 1, - rejectedTime: 1, - nextReceivableTime: 1, - receivedTime: 1 - }); + if (self.createIndex) { + const db = await self.databasePromise(); + await db.collection(self.collectionName).createIndex({ + type: 1, + rejectedTime: 1, + nextReceivableTime: 1, + receivedTime: 1 + }); + } // Try and find work at least once every pollingInterval _pollingIntervalId = setInterval(_poll, self.pollingInterval); From a4022bb8755eb43851beb59bc60e85a23879466b Mon Sep 17 00:00:00 2001 From: "Jacob V. Gardner" Date: Wed, 9 Oct 2019 10:44:38 -0500 Subject: [PATCH 3/5] Extract index creation code into its own private method --- lib/message-queue.js | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/message-queue.js b/lib/message-queue.js index 706463e..85e4340 100644 --- a/lib/message-queue.js +++ b/lib/message-queue.js @@ -90,13 +90,7 @@ function MessageQueue() { _pollingIntervalId = -1; if (self.createIndex) { - const db = await self.databasePromise(); - await db.collection(self.collectionName).createIndex({ - type: 1, - rejectedTime: 1, - nextReceivableTime: 1, - receivedTime: 1 - }); + await _createQueueIndex(); } // Try and find work at least once every pollingInterval @@ -104,6 +98,16 @@ function MessageQueue() { } } + async function _createQueueIndex() { + const db = await self.databasePromise(); + await db.collection(self.collectionName).createIndex({ + type: 1, + rejectedTime: 1, + nextReceivableTime: 1, + receivedTime: 1 + }); + } + function _stopPolling() { if (_pollingIntervalId) { clearInterval(_pollingIntervalId); From 215c2b11a0d857456046dd36a56bd50b87353ab0 Mon Sep 17 00:00:00 2001 From: "Jacob V. Gardner" Date: Wed, 9 Oct 2019 11:01:38 -0500 Subject: [PATCH 4/5] Use _getCollection in _createQueueIndex --- lib/message-queue.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/message-queue.js b/lib/message-queue.js index 132fa50..f66157b 100644 --- a/lib/message-queue.js +++ b/lib/message-queue.js @@ -99,8 +99,9 @@ function MessageQueue() { } async function _createQueueIndex() { - const db = await self.databasePromise(); - await db.collection(self.collectionName).createIndex({ + const collection = await _getCollection(); + + await collection.createIndex({ type: 1, rejectedTime: 1, nextReceivableTime: 1, From 25b2bc3fc74444dd74e86ca2b0e139928bc378f2 Mon Sep 17 00:00:00 2001 From: "Jacob V. Gardner" Date: Thu, 10 Oct 2019 10:05:55 -0500 Subject: [PATCH 5/5] Address some review feedback --- lib/message-queue.js | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/message-queue.js b/lib/message-queue.js index cc6a5c2..b93f695 100644 --- a/lib/message-queue.js +++ b/lib/message-queue.js @@ -7,7 +7,8 @@ function MessageQueue() { self.databasePromise = null; self.collectionName = '_queue'; - self.createIndex = false; + self.createIndex = true; + self.indexName = 'poll-index'; self.pollingInterval = 1000; self.processingTimeout = 30 * 1000; @@ -96,12 +97,18 @@ function MessageQueue() { async function _createQueueIndex() { const collection = await _getCollection(); - await collection.createIndex({ - type: 1, - rejectedTime: 1, - nextReceivableTime: 1, - receivedTime: 1 - }); + await collection.createIndex( + { + type: 1, + rejectedTime: 1, + nextReceivableTime: 1, + receivedTime: 1 + }, + { + sparse: false, + name: self.indexName + } + ); } function _stopPolling() {