From 1839a69cb625c409505c9eae4e435342e16c7a77 Mon Sep 17 00:00:00 2001 From: Vinayak Sarawagi Date: Mon, 5 Aug 2024 14:54:58 +0530 Subject: [PATCH 1/4] add database as a driver for queues --- lib/queue/core/payloadBuilder.ts | 17 ++++++++ lib/queue/drivers/database.ts | 66 ++++++++++++++++++++++++++++++++ lib/queue/drivers/index.ts | 6 ++- lib/queue/drivers/redis.ts | 9 ++--- lib/queue/interfaces/job.ts | 31 +++++++++++++++ lib/queue/interfaces/options.ts | 11 +++++- lib/queue/interfaces/redisJob.ts | 7 ---- lib/queue/interfaces/sqsJob.ts | 7 ---- lib/queue/metadata.ts | 2 +- lib/queue/service.ts | 7 +++- lib/queue/strategy/driverJob.ts | 2 + lib/queue/strategy/message.ts | 4 +- lib/queue/workers/pollQueue.ts | 4 +- package-lock.json | 33 ++++++++-------- package.json | 2 + 15 files changed, 167 insertions(+), 41 deletions(-) create mode 100644 lib/queue/drivers/database.ts create mode 100644 lib/queue/interfaces/job.ts delete mode 100644 lib/queue/interfaces/redisJob.ts delete mode 100644 lib/queue/interfaces/sqsJob.ts diff --git a/lib/queue/core/payloadBuilder.ts b/lib/queue/core/payloadBuilder.ts index c09c780..9469caf 100644 --- a/lib/queue/core/payloadBuilder.ts +++ b/lib/queue/core/payloadBuilder.ts @@ -1,5 +1,7 @@ +import { ulid } from "ulid"; import { QueueMetadata } from "../metadata"; import { Message, JobOptions, InternalMessage } from "../strategy"; +import ms from "ms"; type Complete = { [P in keyof Required]: Pick extends Required> @@ -7,6 +9,19 @@ type Complete = { : T[P] | undefined; }; +const calculateDelay = (delay: number | string | Date): number => { + const now = Date.now(); + if (delay instanceof Date) { + const time = delay.getTime(); + return now > time ? now : time; + } + + const delayInMs = typeof delay === "string" ? ms(delay) : delay * 1000; + const calculatedDelay = now + delayInMs; + if (calculatedDelay < now) return now; + return calculatedDelay; +}; + export class PayloadBuilder { static build( message: Message, @@ -14,6 +29,7 @@ export class PayloadBuilder { ): Complete { const defaultOptions = QueueMetadata.getDefaultOptions(); const payload = { + id: ulid(), attemptCount: 0, ...defaultOptions, queue: undefined, @@ -21,6 +37,7 @@ export class PayloadBuilder { ...message, } as Complete; + payload.delay = calculateDelay(payload.delay || 0); payload.connection = payload.connection || defaultOptions.connection; if (!payload.queue) { diff --git a/lib/queue/drivers/database.ts b/lib/queue/drivers/database.ts new file mode 100644 index 0000000..09b7fa4 --- /dev/null +++ b/lib/queue/drivers/database.ts @@ -0,0 +1,66 @@ +import { ObjectionService } from "../../database"; +import { DbQueueDriverOptions } from "../interfaces"; +import { DbJob } from "../interfaces/job"; +import { InternalMessage } from "../strategy"; +import { PollQueueDriver } from "../strategy/pollQueueDriver"; + +export class DatabaseQueueDriver implements PollQueueDriver { + private client: any; + + constructor(private options: DbQueueDriverOptions) { + this.client = ObjectionService.connection(options?.connection); + } + + async init(): Promise { + return; + } + + async push(message: string, rawPayload: InternalMessage): Promise { + const {} = this.options; + await this.client + .insert({ + id: rawPayload.id, + queue: rawPayload.queue, + payload: message, + scheduledAt: rawPayload.delay, + }) + .into(this.options.table); + return; + } + + async pull(options: Record): Promise { + const messages = await this.client + .select("*") + .where("queue", options.queue) + .where("scheduled_at", "<=", Date.now()) + .from(this.options.table); + + return messages.map((m) => new DbJob(m)); + } + + async remove(job: DbJob, options: Record): Promise { + await this.client + .del() + .where("id", job.getId()) + .where("queue", options.queue) + .from(this.options.table); + return; + } + + async purge(options: Record): Promise { + await this.client + .del() + .where("queue", options.queue) + .from(this.options.table); + return; + } + + async count(options: Record): Promise { + const count = await this.client + .count("1") + .where("queue", options.queue) + .from(this.options.table); + + return count; + } +} diff --git a/lib/queue/drivers/index.ts b/lib/queue/drivers/index.ts index ba3219f..0946608 100644 --- a/lib/queue/drivers/index.ts +++ b/lib/queue/drivers/index.ts @@ -1,2 +1,4 @@ -export * from './sync'; -export * from './sqs'; +export * from "./sync"; +export * from "./sqs"; +export * from "./redis"; +export * from "./database"; diff --git a/lib/queue/drivers/redis.ts b/lib/queue/drivers/redis.ts index 0bdf6e7..0f03f49 100644 --- a/lib/queue/drivers/redis.ts +++ b/lib/queue/drivers/redis.ts @@ -1,10 +1,9 @@ import { InternalMessage } from "../strategy"; -import { RedisJob } from "../interfaces/redisJob"; import { RedisQueueOptionsDto } from "../schema"; import { PollQueueDriver } from "../strategy/pollQueueDriver"; -import { validateOptions } from "../../utils/helpers"; -import { Package } from "../../utils"; import { ulid } from "ulid"; +import { Package, validateOptions } from "../../utils"; +import { RedisJob } from "../interfaces/job"; const FIND_DELAYED_JOB = ` local source_key = KEYS[1] @@ -52,7 +51,7 @@ export class RedisQueueDriver implements PollQueueDriver { async init(): Promise {} async push(message: string, rawPayload: InternalMessage): Promise { - if ((rawPayload.delay || 0) > 0) { + if (rawPayload.delay > Date.now()) { await this.pushToDelayedQueue(message, rawPayload); return; } @@ -89,7 +88,7 @@ export class RedisQueueDriver implements PollQueueDriver { ): Promise { await this.client.zadd( this.getDelayedQueue(`${rawPayload.queue}`), - Date.now() + rawPayload.delay * 1000, + rawPayload.delay, this.getProcessedMessage(message) ); return; diff --git a/lib/queue/interfaces/job.ts b/lib/queue/interfaces/job.ts new file mode 100644 index 0000000..5bf9b95 --- /dev/null +++ b/lib/queue/interfaces/job.ts @@ -0,0 +1,31 @@ +import { DriverJob } from "../strategy"; + +export class RedisJob extends DriverJob { + public getId(): string { + return ""; + } + + public getMessage(): string { + return this.data.message; + } +} + +export class SqsJob extends DriverJob { + public getId(): string { + return ""; + } + + public getMessage(): string { + return this.data.Body; + } +} + +export class DbJob extends DriverJob { + public getId(): string { + return this.data.id; + } + + public getMessage(): string { + return this.data.payload; + } +} diff --git a/lib/queue/interfaces/options.ts b/lib/queue/interfaces/options.ts index 2a9eded..293f5be 100644 --- a/lib/queue/interfaces/options.ts +++ b/lib/queue/interfaces/options.ts @@ -40,6 +40,14 @@ export interface RedisQueueDriverOptions { prefix: string; } +export interface DbQueueDriverOptions { + listenerType: "poll"; + driver: "db"; + connection?: string; + table: string; + queue: string; +} + export interface QueueOptions { isGlobal?: boolean; default: string; @@ -48,7 +56,8 @@ export interface QueueOptions { | SyncQueueDriverOptions | SqsQueueDriverOptions | RedisQueueDriverOptions - | QueueDriverOptions; + | QueueDriverOptions + | DbQueueDriverOptions; }; } diff --git a/lib/queue/interfaces/redisJob.ts b/lib/queue/interfaces/redisJob.ts deleted file mode 100644 index eafc721..0000000 --- a/lib/queue/interfaces/redisJob.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { DriverJob } from '../strategy'; - -export class RedisJob extends DriverJob { - public getMessage(): string { - return this.data.message; - } -} diff --git a/lib/queue/interfaces/sqsJob.ts b/lib/queue/interfaces/sqsJob.ts deleted file mode 100644 index 0a8afc7..0000000 --- a/lib/queue/interfaces/sqsJob.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { DriverJob } from '../strategy'; - -export class SqsJob extends DriverJob { - public getMessage(): string { - return this.data.Body; - } -} diff --git a/lib/queue/metadata.ts b/lib/queue/metadata.ts index 839dbcb..aceb365 100644 --- a/lib/queue/metadata.ts +++ b/lib/queue/metadata.ts @@ -27,7 +27,7 @@ export class QueueMetadata { delay: 10, tries: 5, timeout: 30, - sleep: 5000, + sleep: 10000, }; } diff --git a/lib/queue/service.ts b/lib/queue/service.ts index 3c7d0a8..cfce54e 100644 --- a/lib/queue/service.ts +++ b/lib/queue/service.ts @@ -2,7 +2,11 @@ import { Injectable, Type } from "@nestjs/common"; import { QueueDriverOptions, QueueOptions } from "./interfaces"; import { QueueMetadata } from "./metadata"; import { IntentConfig } from "../config/service"; -import { SqsQueueDriver, SyncQueueDriver } from "./drivers"; +import { + DatabaseQueueDriver, + SqsQueueDriver, + SyncQueueDriver, +} from "./drivers"; import { Str } from "../utils/string"; import { InternalLogger } from "../utils/logger"; import { QueueDrivers } from "./strategy"; @@ -15,6 +19,7 @@ export class QueueService { sync: SyncQueueDriver, sqs: SqsQueueDriver, redis: RedisQueueDriver, + db: DatabaseQueueDriver, }; private static connections: Record = {}; diff --git a/lib/queue/strategy/driverJob.ts b/lib/queue/strategy/driverJob.ts index 1273b06..f7b469c 100644 --- a/lib/queue/strategy/driverJob.ts +++ b/lib/queue/strategy/driverJob.ts @@ -1,5 +1,7 @@ export abstract class DriverJob { constructor(public data: Record) {} + public abstract getId(): string; + public abstract getMessage(): string; } diff --git a/lib/queue/strategy/message.ts b/lib/queue/strategy/message.ts index 8490a7f..e4957c0 100644 --- a/lib/queue/strategy/message.ts +++ b/lib/queue/strategy/message.ts @@ -1,7 +1,7 @@ export type Payload = Record | string | number; export interface JobOptions { - delay?: number; + delay?: number | string | Date; tries?: number; queue?: string; timeout?: number; @@ -15,4 +15,6 @@ export interface Message extends JobOptions { export interface InternalMessage extends Message { attemptCount: number; + id: string; + delay?: number; } diff --git a/lib/queue/workers/pollQueue.ts b/lib/queue/workers/pollQueue.ts index e83465a..8306be2 100644 --- a/lib/queue/workers/pollQueue.ts +++ b/lib/queue/workers/pollQueue.ts @@ -11,6 +11,7 @@ import { JobStatusEnum } from "../constants"; import { JobMaxRetriesExceeed } from "../events/jobMaxRetries"; import { EmitsEvent } from "../../events"; import { logTime } from "../../utils/helpers"; +import { Obj } from "../../utils"; export class PollQueueWorker extends BaseQueueWorker { protected options: ListenerOptions; @@ -186,6 +187,7 @@ export class PollQueueWorker extends BaseQueueWorker { * @param job */ fetchMessage(job: DriverJob): InternalMessage { - return JSON.parse(job.getMessage()); + const message = job.getMessage(); + return Obj.isObj(message) ? message : JSON.parse(job.getMessage()); } } diff --git a/package-lock.json b/package-lock.json index 0d555c4..233c37c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@intentjs/core", - "version": "0.0.1-beta6", + "version": "0.1.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@intentjs/core", - "version": "0.0.1-beta6", + "version": "0.1.2", "license": "MIT", "dependencies": { "@nestjs/config": "^3.2.0", @@ -24,6 +24,7 @@ "fs-extra": "^11.1.1", "ioredis": "^5.3.2", "knex": "^3.1.0", + "ms": "^2.1.3", "mute-stdout": "^2.0.0", "objection": "^3.1.4", "picocolors": "^1.0.0", @@ -42,6 +43,7 @@ "@types/inquirer": "^9.0.7", "@types/jest": "^29.5.12", "@types/lodash": "^4.14.202", + "@types/ms": "^0.7.34", "@types/node": "^18.14.6", "jest": "^29.7.0", "reflect-metadata": "^0.1.13", @@ -3491,6 +3493,12 @@ "integrity": "sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w==", "dev": true }, + "node_modules/@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "node_modules/@types/node": { "version": "18.19.41", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.41.tgz", @@ -4810,6 +4818,11 @@ "ms": "2.0.0" } }, + "node_modules/debug/node_modules/ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" + }, "node_modules/dedent": { "version": "1.5.3", "resolved": "https://registry.npmjs.org/dedent/-/dedent-1.5.3.tgz", @@ -7697,11 +7710,6 @@ "node": ">=0.1.90" } }, - "node_modules/logform/node_modules/ms": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", - "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" - }, "node_modules/loose-envify": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", @@ -7908,9 +7916,9 @@ } }, "node_modules/ms": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", - "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, "node_modules/multer": { "version": "1.4.4-lts.1", @@ -9450,11 +9458,6 @@ "node": ">= 0.8.0" } }, - "node_modules/send/node_modules/ms": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", - "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" - }, "node_modules/serialize-javascript": { "version": "6.0.2", "resolved": "https://registry.npmjs.org/serialize-javascript/-/serialize-javascript-6.0.2.tgz", diff --git a/package.json b/package.json index cbc0a2d..cffb7f3 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "@types/inquirer": "^9.0.7", "@types/jest": "^29.5.12", "@types/lodash": "^4.14.202", + "@types/ms": "^0.7.34", "@types/node": "^18.14.6", "jest": "^29.7.0", "reflect-metadata": "^0.1.13", @@ -57,6 +58,7 @@ "fs-extra": "^11.1.1", "ioredis": "^5.3.2", "knex": "^3.1.0", + "ms": "^2.1.3", "mute-stdout": "^2.0.0", "objection": "^3.1.4", "picocolors": "^1.0.0", From e687207d7673df72599f6b5d782225215ef5b3c3 Mon Sep 17 00:00:00 2001 From: Vinayak Sarawagi Date: Sun, 1 Sep 2024 00:45:59 +0530 Subject: [PATCH 2/4] formatting fix --- lib/queue/drivers/database.ts | 28 ++++++++++++++-------------- lib/queue/drivers/index.ts | 8 ++++---- lib/queue/interfaces/job.ts | 6 +++--- lib/queue/interfaces/options.ts | 4 ++-- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/lib/queue/drivers/database.ts b/lib/queue/drivers/database.ts index 09b7fa4..b13f79a 100644 --- a/lib/queue/drivers/database.ts +++ b/lib/queue/drivers/database.ts @@ -1,8 +1,8 @@ -import { ObjectionService } from "../../database"; -import { DbQueueDriverOptions } from "../interfaces"; -import { DbJob } from "../interfaces/job"; -import { InternalMessage } from "../strategy"; -import { PollQueueDriver } from "../strategy/pollQueueDriver"; +import { ObjectionService } from '../../database'; +import { DbQueueDriverOptions } from '../interfaces'; +import { DbJob } from '../interfaces/job'; +import { InternalMessage } from '../strategy'; +import { PollQueueDriver } from '../strategy/pollQueueDriver'; export class DatabaseQueueDriver implements PollQueueDriver { private client: any; @@ -30,19 +30,19 @@ export class DatabaseQueueDriver implements PollQueueDriver { async pull(options: Record): Promise { const messages = await this.client - .select("*") - .where("queue", options.queue) - .where("scheduled_at", "<=", Date.now()) + .select('*') + .where('queue', options.queue) + .where('scheduled_at', '<=', Date.now()) .from(this.options.table); - return messages.map((m) => new DbJob(m)); + return messages.map(m => new DbJob(m)); } async remove(job: DbJob, options: Record): Promise { await this.client .del() - .where("id", job.getId()) - .where("queue", options.queue) + .where('id', job.getId()) + .where('queue', options.queue) .from(this.options.table); return; } @@ -50,15 +50,15 @@ export class DatabaseQueueDriver implements PollQueueDriver { async purge(options: Record): Promise { await this.client .del() - .where("queue", options.queue) + .where('queue', options.queue) .from(this.options.table); return; } async count(options: Record): Promise { const count = await this.client - .count("1") - .where("queue", options.queue) + .count('1') + .where('queue', options.queue) .from(this.options.table); return count; diff --git a/lib/queue/drivers/index.ts b/lib/queue/drivers/index.ts index 0946608..add8456 100644 --- a/lib/queue/drivers/index.ts +++ b/lib/queue/drivers/index.ts @@ -1,4 +1,4 @@ -export * from "./sync"; -export * from "./sqs"; -export * from "./redis"; -export * from "./database"; +export * from './sync'; +export * from './sqs'; +export * from './redis'; +export * from './database'; diff --git a/lib/queue/interfaces/job.ts b/lib/queue/interfaces/job.ts index 5bf9b95..d480611 100644 --- a/lib/queue/interfaces/job.ts +++ b/lib/queue/interfaces/job.ts @@ -1,8 +1,8 @@ -import { DriverJob } from "../strategy"; +import { DriverJob } from '../strategy'; export class RedisJob extends DriverJob { public getId(): string { - return ""; + return ''; } public getMessage(): string { @@ -12,7 +12,7 @@ export class RedisJob extends DriverJob { export class SqsJob extends DriverJob { public getId(): string { - return ""; + return ''; } public getMessage(): string { diff --git a/lib/queue/interfaces/options.ts b/lib/queue/interfaces/options.ts index 75e1cdc..df8d0e3 100644 --- a/lib/queue/interfaces/options.ts +++ b/lib/queue/interfaces/options.ts @@ -41,8 +41,8 @@ export interface RedisQueueDriverOptions { } export interface DbQueueDriverOptions { - listenerType: "poll"; - driver: "db"; + listenerType: 'poll'; + driver: 'db'; connection?: string; table: string; queue: string; From fe30dcfc16c6ba0f667da76c3aca0938dc49f8d5 Mon Sep 17 00:00:00 2001 From: Vinayak Sarawagi Date: Sun, 1 Sep 2024 00:47:04 +0530 Subject: [PATCH 3/4] fix linting issues --- lib/queue/drivers/database.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/queue/drivers/database.ts b/lib/queue/drivers/database.ts index b13f79a..1a27d73 100644 --- a/lib/queue/drivers/database.ts +++ b/lib/queue/drivers/database.ts @@ -16,7 +16,6 @@ export class DatabaseQueueDriver implements PollQueueDriver { } async push(message: string, rawPayload: InternalMessage): Promise { - const {} = this.options; await this.client .insert({ id: rawPayload.id, @@ -25,7 +24,6 @@ export class DatabaseQueueDriver implements PollQueueDriver { scheduledAt: rawPayload.delay, }) .into(this.options.table); - return; } async pull(options: Record): Promise { @@ -34,7 +32,6 @@ export class DatabaseQueueDriver implements PollQueueDriver { .where('queue', options.queue) .where('scheduled_at', '<=', Date.now()) .from(this.options.table); - return messages.map(m => new DbJob(m)); } @@ -44,7 +41,6 @@ export class DatabaseQueueDriver implements PollQueueDriver { .where('id', job.getId()) .where('queue', options.queue) .from(this.options.table); - return; } async purge(options: Record): Promise { @@ -52,15 +48,12 @@ export class DatabaseQueueDriver implements PollQueueDriver { .del() .where('queue', options.queue) .from(this.options.table); - return; } async count(options: Record): Promise { - const count = await this.client + return await this.client .count('1') .where('queue', options.queue) .from(this.options.table); - - return count; } } From df1e8fb1cca39704681a5c2ff280620d4316c890 Mon Sep 17 00:00:00 2001 From: Vinayak Sarawagi Date: Mon, 16 Sep 2024 22:48:26 +0530 Subject: [PATCH 4/4] fix sqs driver bug --- lib/queue/drivers/sqs.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/queue/drivers/sqs.ts b/lib/queue/drivers/sqs.ts index 1614d8d..1531f89 100644 --- a/lib/queue/drivers/sqs.ts +++ b/lib/queue/drivers/sqs.ts @@ -1,6 +1,6 @@ import { Package } from '../../utils'; import { joinUrl, validateOptions } from '../../utils/helpers'; -import { SqsJob } from '../interfaces/sqsJob'; +import { SqsJob } from '../interfaces/job'; import { SqsQueueOptionsDto } from '../schema'; import { InternalMessage } from '../strategy'; import { PollQueueDriver } from '../strategy/pollQueueDriver';