Skip to content
32 changes: 28 additions & 4 deletions lib/message-queue.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
const _ = require('lodash');

// TODO... what indexes should be created and should this be responsible for creating them?

function MessageQueue() {
const self = this;

self.errorHandler = console.error;

self.databasePromise = null;
self.collectionName = '_queue';
self.createIndex = true;
self.indexName = 'poll-index';

self.pollingInterval = 1000;
self.processingTimeout = 30 * 1000;
Expand All @@ -20,7 +20,7 @@ function MessageQueue() {

self.registerWorker = function(type, promise) {
_workers[type] = promise;
_startPolling();
return _startPolling();
Comment thread
jacobgardner marked this conversation as resolved.
};

self.stopPolling = function() {
Expand Down Expand Up @@ -80,13 +80,37 @@ function MessageQueue() {

//region Private Helper Methods

function _startPolling() {
async function _startPolling() {
if (!_pollingIntervalId) {
// Temporarily assigning polling interval to prevent multiple polls starting while index is being created
_pollingIntervalId = -1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand this. Does the await on line 93 not actually keep line 97 from starting the polling?

Copy link
Copy Markdown
Contributor Author

@jacobgardner jacobgardner Oct 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to avoid a "race condition" when registerWorker is called twice in quick succession, which is what everyone will probably be doing anyway, including us.

// _pollingIntervalId is undefined
mq.registerWorker('typeOne', (queueItem) => {...});
// _startPolling called => !_pollingIntervalId === true => createIndex awaiting
mq.registerWorker('typeTwo', (queueItem) => {...});
// _startPolling called => !_pollingIntervalId === true => createIndex awaiting again
// first createIndex finishes => _pollingIntervalId updated and setInterval called
// second createIndex finishes => _pollingIntervalId changed (first one lost) and setInterval called again. 

This is a pretty awkward place to be creating the index as it is...
I think I would rather do a major version bump, have the database promise passed in through the constructor and do a createIndex once that gets fulfilled and then start the poller (if register worker has been called).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that makes more sense now. Let's sleep on it and discuss further tomorrow.


if (self.createIndex) {
await _createQueueIndex();
}

// Try and find work at least once every pollingInterval
_pollingIntervalId = setInterval(_poll, self.pollingInterval);
}
}

async function _createQueueIndex() {
const collection = await _getCollection();

await collection.createIndex(
{
type: 1,
rejectedTime: 1,
nextReceivableTime: 1,
receivedTime: 1
},
{
sparse: false,
name: self.indexName
}
);
}

function _stopPolling() {
if (_pollingIntervalId) {
clearInterval(_pollingIntervalId);
Expand Down