Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@ node_modules/*
*.log
*~
package-lock.json
*.d.ts
*.js.map
*.js
!test/*.js
3 changes: 3 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
**/*
!*.d.ts
!*.js.map
149 changes: 94 additions & 55 deletions mongodb-queue.js → mongodb-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,74 @@
*
**/

const crypto = require('crypto')
import {randomBytes} from 'crypto';
import {Collection, Db, Filter, FindOneAndUpdateOptions, Sort, UpdateFilter, WithId} from 'mongodb';

function id() {
return crypto.randomBytes(16).toString('hex')
function id(): string {
return randomBytes(16).toString('hex')
}

function now() {
function now(): string {
return (new Date()).toISOString()
}

function nowPlusSecs(secs) {
function nowPlusSecs(secs: number): string {
return (new Date(Date.now() + secs * 1000)).toISOString()
}

module.exports = class Queue {
constructor(db, name, opts) {
export type QueueOptions = {
visibility?: number;
delay?: number;
deadQueue?: Queue;
maxRetries?: number;
}

export type AddOptions = {
delay?: number;
}

export type GetOptions = {
visibility?: number;
}

export type PingOptions = {
visibility?: number;
resetTries?: boolean;
}

export type BaseMessage<T extends any = any> = {
payload: T;
visible: string;
}

export type Message<T extends any = any> = BaseMessage<T> & {
ack: string;
tries: number;
deleted?: string;
}

export type ExternalMessage<T extends any = any> = {
id: string;
ack: string;
payload: T;
tries: number;
}

export default class Queue<T extends any = any> {
private readonly col: Collection<Partial<Message<T>>>;
private readonly visibility: number;
private readonly delay: number;
private readonly maxRetries: number;
private readonly deadQueue: Queue;

constructor(db: Db, name: string, opts: QueueOptions = {}) {
if (!db) {
throw new Error("mongodb-queue: provide a mongodb.MongoClient.db")
}
if (!name) {
throw new Error("mongodb-queue: provide a queue name")
}
opts = opts || {}

this.db = db
this.name = name
this.col = db.collection(name)
this.visibility = opts.visibility || 30
this.delay = opts.delay || 0
Expand All @@ -46,19 +88,19 @@ module.exports = class Queue {
}
}

async createIndexes() {
async createIndexes(): Promise<void> {
await Promise.all([
this.col.createIndex({deleted: 1, visible: 1}),
this.col.createIndex({ack: 1}, {unique: true, sparse: true}),
this.col.createIndex({deleted: 1}, {sparse: true})
])
}

async add(payload, opts = {}) {
async add(payload: T | T[], opts: AddOptions = {}): Promise<string> {
const delay = opts.delay || this.delay
const visible = delay ? nowPlusSecs(delay) : now()

const msgs = []
const msgs: BaseMessage<T>[] = []
if (payload instanceof Array) {
if (payload.length === 0) {
throw new Error('Queue.add(): Array payload length must be greater than 0')
Expand All @@ -81,33 +123,33 @@ module.exports = class Queue {
return '' + results.insertedIds[0]
}

async get(opts = {}) {
async get(opts: GetOptions = {}): Promise<ExternalMessage<T> | null> {
const visibility = opts.visibility || this.visibility
const query = {
deleted: null,
const query: Filter<Partial<Message<T>>> = {
deleted: {$exists: false},
visible: {$lte: now()},
}
const sort = {
const sort: Sort = {
_id: 1
}
const update = {
const update: UpdateFilter<Message<T>> = {
$inc: {tries: 1},
$set: {
ack: id(),
visible: nowPlusSecs(visibility),
}
}
const options = {
const options: FindOneAndUpdateOptions = {
sort: sort,
returnDocument: 'after'
}

const result = await this.col.findOneAndUpdate(query, update, options)
let msg = result.value
if (!msg) return
const msg = result.value as WithId<Message<T>>;
if (!msg) return null

// convert to an external representation
msg = {
const externalMessage: ExternalMessage<T> = {
// convert '_id' to an 'id' string
id: '' + msg._id,
ack: msg.ack,
Expand All @@ -121,32 +163,35 @@ module.exports = class Queue {
// 1) add this message to the deadQueue
// 2) ack this message from the regular queue
// 3) call ourself to return a new message (if exists)
await this.deadQueue.add(msg)
await this.deadQueue.add(externalMessage)
await this.ack(msg.ack)
return this.get()
}

return msg
return externalMessage
}

async ping(ack, opts = {}) {
async ping(ack: string, opts: PingOptions = {}): Promise<string> {
const visibility = opts.visibility || this.visibility
const query = {
const query: Filter<Partial<Message<T>>> = {
ack: ack,
visible: {$gt: now()},
deleted: null,
deleted: {$exists: false},
}
const update = {
const update: UpdateFilter<Message<T>> = {
$set: {
visible: nowPlusSecs(visibility)
}
}
const options = {
const options: FindOneAndUpdateOptions = {
returnDocument: 'after'
}

if (opts.resetTries) {
update.$set.tries = 0
update.$set = {
...update.$set,
tries: 0,
}
}

const msg = await this.col.findOneAndUpdate(query, update, options)
Expand All @@ -156,18 +201,18 @@ module.exports = class Queue {
return '' + msg.value._id
}

async ack(ack) {
const query = {
async ack(ack: string): Promise<string> {
const query: Filter<Partial<Message<T>>> = {
ack: ack,
visible: {$gt: now()},
deleted: null,
deleted: {$exists: false},
}
const update = {
const update: UpdateFilter<Message<T>> = {
$set: {
deleted: now(),
}
}
const options = {
const options: FindOneAndUpdateOptions = {
returnDocument: 'after'
}
const msg = await this.col.findOneAndUpdate(query, update, options)
Expand All @@ -177,42 +222,36 @@ module.exports = class Queue {
return '' + msg.value._id
}

async clean() {
async clean(): Promise<void> {
const query = {
deleted: {$exists: true},
}

return this.col.deleteMany(query)
await this.col.deleteMany(query)
}

async total() {
async total(): Promise<number> {
return this.col.countDocuments()
}

async size() {
const query = {
deleted: null,
async size(): Promise<number> {
return this.col.countDocuments({
deleted: {$exists: false},
visible: {$lte: now()},
}

return this.col.countDocuments(query)
})
}

async inFlight() {
const query = {
async inFlight(): Promise<number> {
return this.col.countDocuments({
ack: {$exists: true},
visible: {$gt: now()},
deleted: null,
}

return this.col.countDocuments(query)
deleted: {$exists: false},
})
}

async done() {
const query = {
async done(): Promise<number> {
return this.col.countDocuments({
deleted: {$exists: true},
}

return this.col.countDocuments(query)
})
}
}
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
{
"name": "@reedsy/mongodb-queue",
"version": "4.0.0-reedsy-3.0.0",
"version": "5.0.0",
"description": "Message queues which uses MongoDB.",
"main": "mongodb-queue.js",
"scripts": {
"build": "tsc --pretty",
"prepublish": "npm run build",
"pretest": "npm run build",
"test": "set -e; for FILE in test/*.js; do echo --- $FILE ---; node $FILE; done"
},
"dependencies": {},
"devDependencies": {
"mongodb": "^5.0.0",
"tape": "^4.10.1"
"tape": "^4.10.1",
"typescript": "^4.9.5"
},
"peerDependencies": {
"mongodb": "^4.0.0 || ^5.0.0"
Expand Down
6 changes: 0 additions & 6 deletions tag.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,5 @@ else
echo "Deploying version $VERSION"
fi

echo '!/dist' >> .gitignore

git checkout -b release-$VERSION
git add .gitignore
git add --all dist/
git commit --message "Release version $VERSION"
git tag $VERSION
git push origin refs/tags/$VERSION
2 changes: 1 addition & 1 deletion test/clean.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

setup().then(({client, db}) => {

Expand Down
2 changes: 1 addition & 1 deletion test/dead-queue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

setup().then(({client, db}) => {

Expand Down
2 changes: 1 addition & 1 deletion test/default.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

setup().then(({client, db}) => {

Expand Down
2 changes: 1 addition & 1 deletion test/delay.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default
const {timeout} = require('./_timeout.js')

setup().then(({client, db}) => {
Expand Down
2 changes: 1 addition & 1 deletion test/indexes.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

setup().then(({client, db}) => {

Expand Down
2 changes: 1 addition & 1 deletion test/many.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

const total = 250

Expand Down
2 changes: 1 addition & 1 deletion test/multi.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const test = require('tape')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

const total = 250

Expand Down
2 changes: 1 addition & 1 deletion test/ping.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const test = require('tape')
const {timeout} = require('./_timeout')

const setup = require('./setup.js')
const MongoDbQueue = require('../')
const MongoDbQueue = require('../').default

setup().then(({client, db}) => {

Expand Down
Loading