diff --git a/.gitignore b/.gitignore index 06ab5e5..7540984 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ node_modules/* *.log *~ package-lock.json +*.d.ts +*.js.map +*.js +!test/*.js diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..a09524a --- /dev/null +++ b/.npmignore @@ -0,0 +1,3 @@ +**/* +!*.d.ts +!*.js.map diff --git a/mongodb-queue.js b/mongodb-queue.ts similarity index 55% rename from mongodb-queue.js rename to mongodb-queue.ts index fb781d7..71ad175 100644 --- a/mongodb-queue.js +++ b/mongodb-queue.ts @@ -10,32 +10,74 @@ * **/ -const crypto = require('crypto') +import {randomBytes} from 'crypto'; +import {Collection, Db, Filter, FindOneAndUpdateOptions, Sort, UpdateFilter, WithId} from 'mongodb'; -function id() { - return crypto.randomBytes(16).toString('hex') +function id(): string { + return randomBytes(16).toString('hex') } -function now() { +function now(): string { return (new Date()).toISOString() } -function nowPlusSecs(secs) { +function nowPlusSecs(secs: number): string { return (new Date(Date.now() + secs * 1000)).toISOString() } -module.exports = class Queue { - constructor(db, name, opts) { +export type QueueOptions = { + visibility?: number; + delay?: number; + deadQueue?: Queue; + maxRetries?: number; +} + +export type AddOptions = { + delay?: number; +} + +export type GetOptions = { + visibility?: number; +} + +export type PingOptions = { + visibility?: number; + resetTries?: boolean; +} + +export type BaseMessage = { + payload: T; + visible: string; +} + +export type Message = BaseMessage & { + ack: string; + tries: number; + deleted?: string; +} + +export type ExternalMessage = { + id: string; + ack: string; + payload: T; + tries: number; +} + +export default class Queue { + private readonly col: Collection>>; + private readonly visibility: number; + private readonly delay: number; + private readonly maxRetries: number; + private readonly deadQueue: Queue; + + constructor(db: Db, name: string, opts: QueueOptions = {}) { if (!db) { throw new Error("mongodb-queue: provide a mongodb.MongoClient.db") } if (!name) { throw new Error("mongodb-queue: provide a queue name") } - opts = opts || {} - this.db = db - this.name = name this.col = db.collection(name) this.visibility = opts.visibility || 30 this.delay = opts.delay || 0 @@ -46,7 +88,7 @@ module.exports = class Queue { } } - async createIndexes() { + async createIndexes(): Promise { await Promise.all([ this.col.createIndex({deleted: 1, visible: 1}), this.col.createIndex({ack: 1}, {unique: true, sparse: true}), @@ -54,11 +96,11 @@ module.exports = class Queue { ]) } - async add(payload, opts = {}) { + async add(payload: T | T[], opts: AddOptions = {}): Promise { const delay = opts.delay || this.delay const visible = delay ? nowPlusSecs(delay) : now() - const msgs = [] + const msgs: BaseMessage[] = [] if (payload instanceof Array) { if (payload.length === 0) { throw new Error('Queue.add(): Array payload length must be greater than 0') @@ -81,33 +123,33 @@ module.exports = class Queue { return '' + results.insertedIds[0] } - async get(opts = {}) { + async get(opts: GetOptions = {}): Promise | null> { const visibility = opts.visibility || this.visibility - const query = { - deleted: null, + const query: Filter>> = { + deleted: {$exists: false}, visible: {$lte: now()}, } - const sort = { + const sort: Sort = { _id: 1 } - const update = { + const update: UpdateFilter> = { $inc: {tries: 1}, $set: { ack: id(), visible: nowPlusSecs(visibility), } } - const options = { + const options: FindOneAndUpdateOptions = { sort: sort, returnDocument: 'after' } const result = await this.col.findOneAndUpdate(query, update, options) - let msg = result.value - if (!msg) return + const msg = result.value as WithId>; + if (!msg) return null // convert to an external representation - msg = { + const externalMessage: ExternalMessage = { // convert '_id' to an 'id' string id: '' + msg._id, ack: msg.ack, @@ -121,32 +163,35 @@ module.exports = class Queue { // 1) add this message to the deadQueue // 2) ack this message from the regular queue // 3) call ourself to return a new message (if exists) - await this.deadQueue.add(msg) + await this.deadQueue.add(externalMessage) await this.ack(msg.ack) return this.get() } - return msg + return externalMessage } - async ping(ack, opts = {}) { + async ping(ack: string, opts: PingOptions = {}): Promise { const visibility = opts.visibility || this.visibility - const query = { + const query: Filter>> = { ack: ack, visible: {$gt: now()}, - deleted: null, + deleted: {$exists: false}, } - const update = { + const update: UpdateFilter> = { $set: { visible: nowPlusSecs(visibility) } } - const options = { + const options: FindOneAndUpdateOptions = { returnDocument: 'after' } if (opts.resetTries) { - update.$set.tries = 0 + update.$set = { + ...update.$set, + tries: 0, + } } const msg = await this.col.findOneAndUpdate(query, update, options) @@ -156,18 +201,18 @@ module.exports = class Queue { return '' + msg.value._id } - async ack(ack) { - const query = { + async ack(ack: string): Promise { + const query: Filter>> = { ack: ack, visible: {$gt: now()}, - deleted: null, + deleted: {$exists: false}, } - const update = { + const update: UpdateFilter> = { $set: { deleted: now(), } } - const options = { + const options: FindOneAndUpdateOptions = { returnDocument: 'after' } const msg = await this.col.findOneAndUpdate(query, update, options) @@ -177,42 +222,36 @@ module.exports = class Queue { return '' + msg.value._id } - async clean() { + async clean(): Promise { const query = { deleted: {$exists: true}, } - return this.col.deleteMany(query) + await this.col.deleteMany(query) } - async total() { + async total(): Promise { return this.col.countDocuments() } - async size() { - const query = { - deleted: null, + async size(): Promise { + return this.col.countDocuments({ + deleted: {$exists: false}, visible: {$lte: now()}, - } - - return this.col.countDocuments(query) + }) } - async inFlight() { - const query = { + async inFlight(): Promise { + return this.col.countDocuments({ ack: {$exists: true}, visible: {$gt: now()}, - deleted: null, - } - - return this.col.countDocuments(query) + deleted: {$exists: false}, + }) } - async done() { - const query = { + async done(): Promise { + return this.col.countDocuments({ deleted: {$exists: true}, - } - - return this.col.countDocuments(query) + }) } } diff --git a/package.json b/package.json index a0cd8a7..db139d9 100644 --- a/package.json +++ b/package.json @@ -1,15 +1,18 @@ { "name": "@reedsy/mongodb-queue", - "version": "4.0.0-reedsy-3.0.0", + "version": "5.0.0", "description": "Message queues which uses MongoDB.", "main": "mongodb-queue.js", "scripts": { + "build": "tsc --pretty", + "prepublish": "npm run build", + "pretest": "npm run build", "test": "set -e; for FILE in test/*.js; do echo --- $FILE ---; node $FILE; done" }, - "dependencies": {}, "devDependencies": { "mongodb": "^5.0.0", - "tape": "^4.10.1" + "tape": "^4.10.1", + "typescript": "^4.9.5" }, "peerDependencies": { "mongodb": "^4.0.0 || ^5.0.0" diff --git a/tag.sh b/tag.sh index eee381f..9f544d5 100755 --- a/tag.sh +++ b/tag.sh @@ -16,11 +16,5 @@ else echo "Deploying version $VERSION" fi -echo '!/dist' >> .gitignore - -git checkout -b release-$VERSION -git add .gitignore -git add --all dist/ -git commit --message "Release version $VERSION" git tag $VERSION git push origin refs/tags/$VERSION diff --git a/test/clean.js b/test/clean.js index 1624e65..5be1523 100644 --- a/test/clean.js +++ b/test/clean.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/test/dead-queue.js b/test/dead-queue.js index 13286c4..e49097c 100644 --- a/test/dead-queue.js +++ b/test/dead-queue.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/test/default.js b/test/default.js index 67a8d37..8d1b6ee 100644 --- a/test/default.js +++ b/test/default.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/test/delay.js b/test/delay.js index 61a4efa..582b20d 100644 --- a/test/delay.js +++ b/test/delay.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default const {timeout} = require('./_timeout.js') setup().then(({client, db}) => { diff --git a/test/indexes.js b/test/indexes.js index e250b7c..fddaaa8 100644 --- a/test/indexes.js +++ b/test/indexes.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/test/many.js b/test/many.js index 39c7659..6fc79b8 100644 --- a/test/many.js +++ b/test/many.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default const total = 250 diff --git a/test/multi.js b/test/multi.js index f34d6cb..9a4f0af 100644 --- a/test/multi.js +++ b/test/multi.js @@ -1,7 +1,7 @@ const test = require('tape') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default const total = 250 diff --git a/test/ping.js b/test/ping.js index 84cb5f4..7451a37 100644 --- a/test/ping.js +++ b/test/ping.js @@ -2,7 +2,7 @@ const test = require('tape') const {timeout} = require('./_timeout') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/test/stats.js b/test/stats.js index 6679321..424984f 100644 --- a/test/stats.js +++ b/test/stats.js @@ -2,7 +2,7 @@ const test = require('tape') const {timeout} = require('./_timeout') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/test/visibility.js b/test/visibility.js index 5fa6153..2596dac 100644 --- a/test/visibility.js +++ b/test/visibility.js @@ -2,7 +2,7 @@ const test = require('tape') const {timeout} = require('./_timeout') const setup = require('./setup.js') -const MongoDbQueue = require('../') +const MongoDbQueue = require('../').default setup().then(({client, db}) => { diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..77858df --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "module": "commonjs", + "target": "es6", + "noImplicitAny": true, + "noUnusedLocals": true, + "moduleResolution": "node", + "sourceMap": true, + "declaration": true, + }, + "exclude": [ + "test/" + ], +}