diff --git a/.gitignore b/.gitignore index 5e9bc25..e66c271 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ node_modules coverage -dist \ No newline at end of file +dist + +.github/copilot-instructions.md +.gitconfig \ No newline at end of file diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 0000000..0b26ab2 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,6 @@ +{ + "trailingComma": "all", + "tabWidth": 2, + "singleQuote": true, + "semi": false +} diff --git a/eslint.config.mjs b/eslint.config.mjs index 7d3e80e..d5215d3 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -1,9 +1,12 @@ // @ts-check -import eslint from '@eslint/js'; -import tseslint from 'typescript-eslint'; +import eslint from '@eslint/js' +import tseslint from 'typescript-eslint' export default tseslint.config( eslint.configs.recommended, ...tseslint.configs.recommended, -); \ No newline at end of file + { + ignores: ['node_modules/**', 'dist/**'], + }, +) diff --git a/package.json b/package.json index c5c548f..2f7196f 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "version": "", "description": "Queue over Nats", "main": "dist/index.js", + "types": "dist/index.d.ts", "repository": "https://github.com/arusakov/node-nats-queue.git", "author": "Aleksandr Rusakov ", "license": "MIT", @@ -13,18 +14,15 @@ "build": "tsc -p .", "compile": "tsc --noEmit -p ./test", "lint": "eslint .", - "test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/*.test.ts", + "test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/**/*.test.ts", "test:coverage:html": "c8 --reporter=html --reporter=text yarn test:all", "test:coverage": "c8 --reporter=lcovonly --reporter=text yarn test:all", "test": "node --test --test-concurrency=1 --require=ts-node/register" }, "devDependencies": { "@eslint/js": "9.11.0", - "@nats-io/jetstream": "3.0.0-10", - "@nats-io/nats-core": "3.0.0-27", - "@nats-io/transport-node": "3.0.0-12", "@types/eslint__js": "8.42.3", - "@types/node": "^20.0.0", + "@types/node": "22.0.0", "c8": "10.1.2", "eslint": "9.11.0", "ts-node": "10.9.2", @@ -33,5 +31,11 @@ }, "engines": { "node": ">=20.0.0" + }, + "dependencies": { + "@nats-io/kv": "3.0.2", + "@nats-io/jetstream": "3.0.2", + "@nats-io/nats-core": "3.0.2", + "@nats-io/transport-node": "3.0.2" } -} \ No newline at end of file +} diff --git a/src/index.ts b/src/index.ts index 7715c7f..6997e71 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,206 +1,3 @@ -import EventEmitter from 'events' - -import { AckPolicy } from '@nats-io/jetstream' -import { nanos, NatsError } from '@nats-io/nats-core' -import type { JetStreamClient, Consumer, JsMsg} from '@nats-io/jetstream' -import type { MsgHdrs } from '@nats-io/nats-core' - -import { createSubject, sleep } from './utils' -import { FixedWindowLimiter, IntervalLimiter, type Limiter } from './limiter' - - -export type QueueOpts - = { - client: JetStreamClient - name: string - - /** - * ms - */ - deduplicateWindow?: number -} - -export type Job = { - id?: number | string - name: string - data: unknown -} - -export type AddOptions = { - id?: string - headers?: MsgHdrs -} - -export const DEFAULT_DEDUPLICATE_WINDOW = 2_000 - -export class Queue { - protected readonly client: JetStreamClient - protected readonly name: string - - protected readonly deduplicateWindow: number - - constructor(opts: QueueOpts) { - this.client = opts.client - this.name = opts.name - - this.deduplicateWindow = opts.deduplicateWindow || DEFAULT_DEDUPLICATE_WINDOW - } - - async setup() { - const manager = await 
this.client.jetstreamManager() - - try { - await manager.streams.add({ - name: this.name, - subjects: [`${this.name}.*`], - duplicate_window: nanos(this.deduplicateWindow) - }) - } catch (e) { - // TODO smart error handling - if (!(e instanceof NatsError)) { - throw e - } - await manager.streams.update( - this.name, - { - subjects: [`${this.name}.*`], - duplicate_window: nanos(this.deduplicateWindow) - } - ) - } - - } - - async add(name: string, data?: unknown, options?: AddOptions) { - const payload = JSON.stringify(data) - return this.client.publish(`${this.name}.${name}`, payload, options && { - msgID: options.id, - headers: options.headers - }) - } -} - -export type RateLimit = { - duration: number - max: number -} - -export type WorkerOpts = { - client: JetStreamClient - name: string - processor: (job: JsMsg) => Promise - concurrency?: number - rateLimit?: RateLimit -} - -export class Worker extends EventEmitter { - protected readonly client: JetStreamClient - protected readonly name: string - protected readonly processor: (job: JsMsg) => Promise - protected readonly concurrency: number - protected readonly limiter: Limiter - protected readonly fetchInterval: number - protected readonly fetchTimeout: number - - protected consumer: Consumer | null = null - protected running = false - protected processingNow = 0 - protected loopPromise: Promise | null = null - - constructor(opts: WorkerOpts) { - super() - - this.client = opts.client - this.name = opts.name - this.processor = opts.processor - this.concurrency = opts.concurrency || 1 - - this.fetchInterval = 150 - this.fetchTimeout = 3_000 - this.limiter = opts.rateLimit ? - new FixedWindowLimiter(opts.rateLimit.max, opts.rateLimit.duration, this.fetchInterval) : - new IntervalLimiter(this.fetchInterval) - } - - async setup() { - const manager = await this.client.jetstreamManager() - - try { - await manager.streams.add({ - name: this.name, - subjects: [createSubject(this.name)], - }) - } catch (e) { - if (!(e instanceof NatsError && e.api_error?.err_code === 10058)) { - throw e - } - } - - await manager.consumers.add(this.name, { - durable_name: this.name, - ack_policy: AckPolicy.All, - }) - - this.consumer = await this.client.consumers.get(this.name, this.name) - } - - async stop() { - this.running = false - - if (this.loopPromise) { - await this.loopPromise - } - while (this.processingNow > 0) { - await sleep(this.fetchInterval) - } - } - - start() { - if (!this.consumer) { - throw new Error('call setup() before start()') - } - - if (!this.loopPromise) { - this.running = true - this.loopPromise = this.loop() - } - } - - protected async loop() { - while (this.running) { - const max = this.limiter.get(this.concurrency - this.processingNow) - const jobs = await this.fetch(max) - - for await (const j of jobs) { - this.limiter.inc() - this.process(j) // without await! 
- } - - await sleep(this.limiter.timeout()) - } - } - - protected async process(j: JsMsg) { - this.processingNow += 1 - try { - this.process(j) - await j.ackAck() - } catch (e) { - await j.term() - } finally { - this.processingNow -= 1 - } - } - - protected async fetch(count: number) { - try { - return this.consumer!.fetch({ - max_messages: count, - expires: this.fetchTimeout - }) - } catch (e) { - // TODO - return [] - } - } -} \ No newline at end of file +export * from './queue' +export * from './job' +export * from './worker' diff --git a/src/job.ts b/src/job.ts new file mode 100644 index 0000000..92a3d92 --- /dev/null +++ b/src/job.ts @@ -0,0 +1,29 @@ +import { JobCreateData } from './types' + +export class Job { + id: string + name: string + meta: { + failed: boolean + startTime: number + retryCount: number + timeout: number + // parentId?: string + } + data: unknown + queueName: string + + constructor(data: JobCreateData) { + this.id = data.id ?? `${crypto.randomUUID()}_${Date.now()}` + this.queueName = data.queueName + this.name = data.name + this.data = data.data + this.meta = { + retryCount: 0, + startTime: Date.now() + (data.delay ?? 0), + failed: false, + // TODO: Is this correct? + timeout: data.timeout ?? 0, + } + } +} diff --git a/src/jobEvent.ts b/src/jobEvent.ts new file mode 100644 index 0000000..a7703c3 --- /dev/null +++ b/src/jobEvent.ts @@ -0,0 +1,20 @@ +type Event = { + event: T + data: D +} + +export type JobCompletedEvent = Event<'JOB_COMPLETED', { jobId: string }> +export type JobFailedEvent = Event<'JOB_FAILED', { jobId: string }> +// export type JobChildCompletedEvent = Event< +// 'JOB_CHILD_COMPLETED', +// { childId: string; parentId: string } +// > +// export type JobChildFailedEvent = Event< +// 'JOB_CHILD_FAILED', +// { childId: string; parentId: string } +// > + +export type JobEvent = + // | JobChildFailedEvent + // | JobChildCompletedEvent + JobCompletedEvent | JobFailedEvent diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..42fe023 --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,148 @@ +import { + jetstream, + JetStreamApiError, + JetStreamClient, + JetStreamManager, +} from '@nats-io/jetstream' +import { Job } from './job' +import { headers, nanos, NatsConnection } from '@nats-io/nats-core' + +const DEFAULT_DEDUPLICATE_WINDOW = 2000 +const MIN_DUPLICATE_WINDOW = 100 + +export type QueueOpts = { + connection: NatsConnection + name: string + priorities: number + duplicateWindow: number + client: JetStreamClient +} + +export class Queue { + protected name: string + private priorities: number + protected connection: NatsConnection + private client: JetStreamClient + protected manager: JetStreamManager | null = null + protected duplicateWindow: number + + constructor({ + client, + name, + priorities = 1, + connection, + duplicateWindow = DEFAULT_DEDUPLICATE_WINDOW, + }: QueueOpts) { + if (!name) { + throw new Error("Parameter 'name' cannot be empty") + } + if (priorities <= 0) { + throw new Error("Parameter 'priorities' must be greater than 0") + } + if (duplicateWindow < MIN_DUPLICATE_WINDOW) { + throw new Error( + `Parameter 'duplicateWindow' must be more than or equal to ${MIN_DUPLICATE_WINDOW}`, + ) + } + + this.client = client + this.name = name + this.priorities = priorities + this.connection = connection + this.duplicateWindow = duplicateWindow + + console.log( + `Queue initialized with name=${this.name}, priorities=${this.priorities}`, + ) + } + + public async setup(): Promise { + try { + const jetstreamClient = 
jetstream(this.connection) + this.manager = await jetstreamClient.jetstreamManager() + + const subjects = [`${this.name}.*`] + await this.manager.streams.add({ + name: this.name, + subjects: subjects, + duplicate_window: nanos(this.duplicateWindow), + }) + console.log(`Stream '${this.name}' created successfully.`) + } catch (e) { + if (e instanceof JetStreamApiError) { + const jsError = e.apiError() + if (jsError.err_code !== 10058) { + throw e + } + + console.error( + `Stream '${this.name}' already exists. Attempting to update`, + ) + await this.manager!.streams.update(this.name, { + subjects: [`${this.name}.*`], + duplicate_window: nanos(this.duplicateWindow), + }) + console.log(`Stream '${this.name}' updated successfully.`) + } else { + console.error(`Error connecting to JetStream: ${e}`) + throw e + } + } + } + + // TODO: I think this is not needed + public async close(): Promise { + try { + if (this.connection.isClosed()) { + console.error('Connection to NATS already closed.') + return + } + await this.connection.close() + console.log('Connection to NATS closed.') + } catch (e) { + // TODO: Connection closed error + if (e instanceof Error) { + console.error('Connection to NATS already closed.') + } else { + throw e + } + } + } + + public async addJob(job: Job, priority: number = 1): Promise { + if (this.connection.isClosed()) { + throw new Error('Cannot add job when NATS connection is closed.') + } + if (!this.manager) { + throw new Error('Call setup before creating a new job') + } + + if (priority >= this.priorities) { + priority = this.priorities + } else if (priority <= 0) { + priority = 1 + } + + try { + const jobData = JSON.stringify(job) + const msgHeaders = headers() + msgHeaders.set('Nats-Msg-Id', job.id) + + await this.client.publish(`${job.queueName}.${priority}`, jobData, { + headers: msgHeaders, + }) + console.log( + `JobData ID=${job.id} added successfully. 
Subject: ${job.queueName}.${priority}`, + ) + } catch (e) { + console.error(`Failed to add job ID=${job.id}: ${e}`) + throw e + } + } + + public async addJobs(jobs: Job[], priority: number = 1): Promise { + for (const job of jobs) { + await this.addJob(job, priority) + } + } +} diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..d758aa8 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,30 @@ +// import { Job } from './job' + +// export type ParentJob = Job & { +// childrenCount: number +// } + +export type JobCreateData = { + name: string + data: unknown + queueName: string + id?: string + delay?: number + timeout?: number +} + +// export type FlowJobCreateData = { +// job: JobCreateData +// children?: FlowJobCreateData[] +// } + +export type RateLimit = { + duration: number + max: number +} + +// export type DependenciesKVValue = ParentJob + +// export type ChildToParentsKVValue = { +// parentIds: string[] +// } diff --git a/src/worker.ts b/src/worker.ts new file mode 100644 index 0000000..53f0ce9 --- /dev/null +++ b/src/worker.ts @@ -0,0 +1,487 @@ +import { + JetStreamClient, + JsMsg, + Consumer, + AckPolicy, + JetStreamManager, +} from '@nats-io/jetstream' +import { RateLimit } from './types' +import { Limiter, FixedWindowLimiter, IntervalLimiter } from './limiter' +import { sleep } from './utils' +import { headers, TimeoutError } from '@nats-io/nats-core' +import { Job } from './job' +import { + // JobChildCompletedEvent, + // JobChildFailedEvent, + JobCompletedEvent, + JobEvent, + JobFailedEvent, +} from './jobEvent' + +// TODO: Maybe add Pino logger + +export type WorkerOpts = { + client: JetStreamClient + name: string + processor: (job: JsMsg, timeout: number) => Promise + concurrency?: number + rateLimit?: RateLimit + priorityQuota?: Map< + number, + { + quota: number + } + > + maxRetries?: number + priorities?: number + fetchInterval?: number + fetchTimeout?: number +} + +export class Worker { + protected readonly client: JetStreamClient + protected readonly name: string + protected readonly processor: (job: JsMsg, timeout: number) => Promise + protected readonly concurrency: number + protected readonly limiter: Limiter + protected readonly fetchInterval: number + protected readonly fetchTimeout: number + protected readonly maxRetries: number + protected readonly priorities: number + + protected manager: JetStreamManager | null = null + protected consumers: Consumer[] = [] + protected consumer: Consumer | null = null + protected running = false + protected processingNow = 0 + protected loopPromise: Promise | null = null + protected workerEventsLoopPromise: Promise | null = null + protected priorityQuota?: Map< + number, + { + quota: number + counter: number + } + > = new Map() + protected jobCompletedConsumer: Consumer | null = null + protected jobFailedConsumer: Consumer | null = null + + constructor(opts: WorkerOpts) { + this.client = opts.client + this.name = opts.name + this.processor = opts.processor + this.concurrency = opts.concurrency || 1 + this.maxRetries = opts.maxRetries || 3 + this.priorities = opts.priorities || 1 + + this.fetchInterval = opts.fetchInterval ?? 150 + this.fetchTimeout = opts.fetchTimeout ?? 3_000 + this.limiter = opts.rateLimit + ? new FixedWindowLimiter( + opts.rateLimit.max, + opts.rateLimit.duration, + this.fetchInterval, + ) + : new IntervalLimiter(this.fetchInterval) + + this.priorityQuota = opts.priorityQuota + ? 
new Map( + Array.from(opts.priorityQuota.entries()).map(([priority, item]) => { + return [ + priority, + { + quota: item.quota, + counter: 0, + }, + ] + }), + ) + : undefined + } + + public async setup() { + try { + this.manager = await this.client.jetstreamManager() + // TODO: Rename + this.consumers = await this.setupConsumers() + + await this.setupInternalQueues(this.manager) + await this.setupInternalQueuesConsumers(this.manager) + } catch (e) { + // TODO: Error handling? + console.error( + `Error while setting up worker: ${this.name} with error: ${e}`, + ) + throw e + } + } + + private async setupInternalQueues(manager: JetStreamManager) { + // TODO: What about deduplication + // Create streams for completed jobs and failed jobs + + // Completed queue + await manager.streams.add({ + name: `${this.name}_completed`, + subjects: [`${this.name}_completed`], + }) + + // Failed queue + await manager.streams.add({ + name: `${this.name}_failed`, + subjects: [`${this.name}_failed`], + }) + } + + private async setupInternalQueuesConsumers(manager: JetStreamManager) { + // TODO: What about deduplication + // Create streams for completed jobs and failed jobs + + await manager.consumers.add(`${this.name}_completed`, { + filter_subject: `${this.name}_completed`, + name: `job_completed_consumer`, + durable_name: `job_completed_consumer`, + ack_policy: AckPolicy.All, + }) + + await manager.consumers.add(`${this.name}_failed`, { + filter_subject: `${this.name}_failed`, + name: `job_failed_consumer`, + durable_name: `job_failed_consumer`, + ack_policy: AckPolicy.All, + }) + + this.jobCompletedConsumer = await this.client.consumers.get( + `${this.name}_completed`, + `job_completed_consumer`, + ) + this.jobFailedConsumer = await this.client.consumers.get( + `${this.name}_failed`, + `job_failed_consumer`, + ) + } + + private async publishJobCompletedEvent(job: Job) { + const subject = `${this.name}_completed` + const messageHeaders = headers() + messageHeaders.set('Nats-Msg-Id', crypto.randomUUID()) + const event: JobCompletedEvent = { + event: 'JOB_COMPLETED', + data: { + jobId: job.id, + }, + } + await this.client.publish(subject, JSON.stringify(event), { + headers: messageHeaders, + }) + } + + private async publishJobFailedEvent(job: Job) { + const subject = `${this.name}_failed` + const messageHeaders = headers() + messageHeaders.set('Nats-Msg-Id', crypto.randomUUID()) + const event: JobFailedEvent = { + event: 'JOB_FAILED', + data: { + jobId: job.id, + }, + } + await this.client.publish(subject, JSON.stringify(event), { + headers: messageHeaders, + }) + } + + private async setupConsumers(): Promise { + const consumers: Consumer[] = [] + for (let i = 1; i <= this.priorities; i++) { + // TODO: Naming might be wrong, independent of the queue name + const consumerName = `worker_group_${i}` + const subject = `${this.name}.${i}` + try { + await this.manager!.consumers.add(this.name, { + filter_subject: subject, + name: consumerName, + durable_name: consumerName, + ack_policy: AckPolicy.All, + }) + try { + const consumer = await this.client.consumers.get( + this.name, + consumerName, + ) + console.log( + `Consumer: name=${consumerName} successfully subscribed to topic ${subject} in queue ${this.name}`, + ) + consumers.push(consumer) + } catch (e) { + console.error('Error while getting consumer:', e) + throw e + } + } catch (e) { + console.error( + `Consumer: name=${this.name} error while subscribing to topic ${subject}: ${e}`, + ) + throw e + } + } + return consumers + } + + public async stop() { + 
this.running = false + + if (this.loopPromise) { + await this.loopPromise + await this.workerEventsLoopPromise + } + while (this.processingNow > 0) { + await sleep(this.fetchInterval) + } + } + + start() { + if (this.consumers.length === 0) { + throw new Error('call setup() before start()') + } + + if (!this.loopPromise) { + this.running = true + this.loopPromise = this.loop() + this.workerEventsLoopPromise = this.workerEventsLoop() + } + } + + private resetQuotesCounter() { + for (const [, item] of this.priorityQuota!.entries()) { + item.counter = 0 + } + } + + private handleQuota(consumerPriority: number) { + const priorityItem = this.priorityQuota!.get(consumerPriority) + const currentQuota = priorityItem!.quota + const currentCounter = priorityItem!.counter + + if (currentQuota - currentCounter <= 0) { + if (consumerPriority < this.priorities) { + console.debug( + `Skip consumer_priority=${consumerPriority} due to quota overruns`, + ) + return true + } else { + console.debug( + `Reset counters when quota is exceeded for consumer_priority=${consumerPriority}`, + ) + this.resetQuotesCounter() + return false + } + } + return null + } + + protected async loop() { + while (this.running) { + let jobs: JsMsg[] = [] + let consumerPriority = 0 + for (let i = 0; i < this.consumers.length; i++) { + consumerPriority = i + 1 + + if (this.priorityQuota) { + const isQuotaMet = this.handleQuota(consumerPriority) + if (isQuotaMet !== null) { + if (isQuotaMet) continue + else break + } + } + + const maxJobs = this.limiter.get(this.concurrency - this.processingNow) + if (maxJobs <= 0) break + + jobs = await this.fetch(this.consumers[i], maxJobs) + if (jobs.length > 0) break + } + + for (const j of jobs) { + if (this.priorityQuota) + this.priorityQuota.get(consumerPriority)!.counter += 1 + + this.limiter.inc() + this.processTask(j) + } + + await sleep(this.limiter.timeout()) + } + } + + protected async workerEventsLoop() { + while (this.running) { + const [jobCompletedEvents, jobFailedEvents] = await Promise.all([ + this.fetch(this.jobCompletedConsumer!, 1), + this.fetch(this.jobFailedConsumer!, 1), + ]) + + jobCompletedEvents.forEach((j) => this.processEventMessage(j)) + jobFailedEvents.forEach((j) => this.processEventMessage(j)) + await sleep(100) + } + } + + protected async processEventMessage(j: JsMsg) { + const event: JobEvent = j.json() + console.log('Processing event:', event) + + if (event.event === 'JOB_COMPLETED') { + // await this.processJobCompletedEvent(event) + } else if (event.event === 'JOB_FAILED') { + // await this.processJobFailedEvent(event) + } + // else if (event.event === 'JOB_CHILD_COMPLETED') { + // // await this.processChildJobCompletedEvent(event) + // } else if (event.event === 'JOB_CHILD_FAILED') { + // // await this.processChildJobFailedEvent(event) + // } + else { + console.error('Unknown event:', event) + } + + await j.ackAck() + console.log('Processing event finished:', event) + } + + // protected async processJobCompletedEvent( + // _jobCompletedEvent: JobCompletedEvent, + // ) { + // return + // } + + // protected async processJobFailedEvent(_event: JobFailedEvent) { + // return + // } + + // protected async processChildJobCompletedEvent( + // _childJobCompletedEvent: JobChildCompletedEvent, + // ) { + // return + // } + + // protected async processChildJobFailedEvent( + // _childJobCompletedEvent: JobChildFailedEvent, + // ) { + // return + // } + + protected async processTask(j: JsMsg) { + this.processingNow += 1 + const data: Job = j.json() + try { + if 
(data.meta.failed) { + await j.term() + } + + const jobStartTime = data.meta.startTime + const now = Date.now() + if (jobStartTime > now) { + const delay = jobStartTime - now + await j.nak(delay) + console.debug( + `Job: name=${data.name} id=${data.id} is scheduled later. Requeueing in ${delay} seconds`, + ) + return + } + + if (data.meta.retryCount >= this.maxRetries) { + await j.term() + console.error( + `Job: name=${data.name} id=${data.id} failed max retries exceeded`, + ) + + await this.publishJobFailedEvent(data) + return + } + + console.log( + `Job: name=${data.name} id=${data.id} is started with data=${data.data} in queue=${data.queueName}`, + ) + + const timeout = data.meta.timeout + await this.processor(j, timeout) + + await j.ackAck() + console.log(`Job: name=${data.name} id=${data.id} is completed`) + + await this.publishJobCompletedEvent(data) + } catch (e) { + if (e instanceof TimeoutError) { + console.error( + `Job: name=${data.name} id=${data.id} TimeoutError start retry`, + ) + } else { + console.error( + `Error while processing job id=${data.id}: "${e}" start retry`, + ) + } + + const newId = `${crypto.randomUUID()}_${Date.now()}` + data.meta.retryCount += 1 + data.id = newId + + const jobBytes = JSON.stringify(data) + await j.term() + const messageHeaders = headers() + messageHeaders.set('Nats-Msg-Id', newId) + await this.client.publish(j.subject, jobBytes, { + headers: messageHeaders, + }) + } finally { + this.processingNow -= 1 + } + } + + // protected async publishParentJob(parentJobData: Job): Promise { + // const subject = `${parentJobData.queueName}.1` + // const jobBytes = JSON.stringify(parentJobData) + // const msgHeaders = headers() + // msgHeaders.set('Nats-Msg-Id', parentJobData.id) + // await this.client.publish(subject, jobBytes, { + // headers: msgHeaders, + // }) + // console.log( + // `ParentJob: name=${parentJobData.name} id=${parentJobData.id} added to topic=${subject} successfully`, + // ) + // } + + protected async fetch(consumer: Consumer, count: number): Promise { + // TODO: Maybe fail to fetch consumer info + const consumerInfo = await consumer.info() + try { + const msgs = await consumer.fetch({ + max_messages: count, + expires: this.fetchTimeout, + }) + const awaitedMessages: JsMsg[] = [] + + for await (const msg of msgs) { + awaitedMessages.push(msg) + } + + console.debug( + `Consumer: name=${consumerInfo.name} fetched ${awaitedMessages.length} messages from queue=${this.name}`, + ) + + return awaitedMessages + } catch (e) { + if (e instanceof TimeoutError) { + console.debug( + `Consumer: name=${consumerInfo.name} timeout while fetching messages`, + ) + return [] + } + + console.error( + `Consumer: name=${consumerInfo.name} error while fetching messages from queue=${this.name}: ${e}`, + ) + // TODO: Handle other errors + throw e + } + } +} diff --git a/test/helpers/deleteAllKV.ts b/test/helpers/deleteAllKV.ts new file mode 100644 index 0000000..abb31ed --- /dev/null +++ b/test/helpers/deleteAllKV.ts @@ -0,0 +1,16 @@ +import { Kvm } from '@nats-io/kv' +import { NatsConnection } from '@nats-io/nats-core' + +export const deleteAllKV = async (nc: NatsConnection) => { + const kvm = new Kvm(nc) + const kvStores = await kvm.list() + for await (const store of kvStores) { + try { + const kv = await kvm.open(store.bucket) + await kv.destroy() + console.log(`Deleted KV store: ${store.bucket}`) + } catch { + console.log(`Failed to delete KV store: ${store.bucket}`) + } + } +} diff --git a/test/helpers/deleteAllStreams.ts b/test/helpers/deleteAllStreams.ts 
new file mode 100644 index 0000000..224e328 --- /dev/null +++ b/test/helpers/deleteAllStreams.ts @@ -0,0 +1,13 @@ +import { JetStreamManager } from '@nats-io/jetstream' + +export const deleteAllStreams = async (jsm: JetStreamManager) => { + const streams = await jsm.streams.list() + for await (const stream of streams) { + try { + await jsm.streams.delete(stream.config.name) + console.log(`Deleted stream: ${stream.config.name}`) + } catch { + console.log(`Failed to delete stream: ${stream.config.name}`) + } + } +} diff --git a/test/queue-add.test.ts b/test/queue-add.test.ts deleted file mode 100644 index b728add..0000000 --- a/test/queue-add.test.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { equal } from 'assert/strict' -import { describe, it, before, after, afterEach, beforeEach } from 'node:test' - -import { connect } from '@nats-io/transport-node' -import { jetstream } from '@nats-io/jetstream' -import { NatsConnection } from '@nats-io/nats-core' -import type { JetStreamClient, JetStreamManager } from '@nats-io/jetstream' - -import { Queue, Worker } from '../src' - -describe('Queue.add()', () => { - let connection: NatsConnection - let client: JetStreamClient - let manager: JetStreamManager - let queue: Queue - - const QUEUE_NAME_1 = 'queue1' - const JOB_NAME_1 = 'job1' - - before(async () => { - connection = await connect({ - servers: '127.0.0.1:4222', - }) - client = jetstream(connection) - manager = await client.jetstreamManager() - }) - - beforeEach(async () => { - queue = new Queue({ - client, - name: QUEUE_NAME_1, - }) - - await queue.setup() - }) - - afterEach(async () => { - await manager.streams.delete(QUEUE_NAME_1) - }) - - after(async () => { - await connection.close() - }) - - it('OK', async () => { - const ack = await queue.add(JOB_NAME_1) - - equal(ack.duplicate, false) - equal(ack.seq, 1) - - const stream = await client.streams.get(QUEUE_NAME_1) - - const { state: { messages } } = await stream.info() - equal(messages, 1) - }) - - it('OK with payload', async () => { - const ack = await queue.add(JOB_NAME_1, { x: 1 }) - - equal(ack.duplicate, false) - equal(ack.seq, 1) - }) - - it('OK with different IDs', async () => { - const ack1 = await queue.add(JOB_NAME_1, 'data', { - id: 'id1', - }) - - equal(ack1.duplicate, false) - equal(ack1.seq, 1) - - const ack2 = await queue.add(JOB_NAME_1, 'data', { - id: 'id2', - }) - - equal(ack2.duplicate, false) - equal(ack2.seq, 2) - }) - - it('OK duplicated IDs', async () => { - const id = '12345' - - const ack1 = await queue.add(JOB_NAME_1, 'data', { - id, - }) - - equal(ack1.duplicate, false) - equal(ack1.seq, 1) - - const ack2 = await queue.add(JOB_NAME_1, 'data', { - id, - }) - - equal(ack2.duplicate, true) - equal(ack2.seq, 1) - }) -}) \ No newline at end of file diff --git a/test/queue.test.ts b/test/queue.test.ts deleted file mode 100644 index 06da21b..0000000 --- a/test/queue.test.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { equal, deepEqual } from 'assert/strict' -import { describe, it, before, after, afterEach } from 'node:test' - -import { connect, nanos } from '@nats-io/transport-node' -import { jetstream } from '@nats-io/jetstream' -import { NatsConnection } from '@nats-io/nats-core' -import type { JetStreamClient, JetStreamManager } from '@nats-io/jetstream' - - -import { DEFAULT_DEDUPLICATE_WINDOW, Queue, Worker } from '../src' - - -describe('Queue', () => { - let connection: NatsConnection - let client: JetStreamClient - let manager: JetStreamManager - - const NAME_1 = 'queue1' - - before(async () => { - connection = await 
connect({ - servers: '127.0.0.1:4222', - }) - client = jetstream(connection) - manager = await client.jetstreamManager() - }) - - afterEach(async () => { - await manager.streams.delete(NAME_1) - }) - - after(async () => { - await connection.close() - }) - - it('create', async () => { - const queue1 = new Queue({ - client, - name: NAME_1, - }) - await queue1.setup() - - const stream = await client.streams.get(NAME_1) - - const { config: { name, subjects, duplicate_window }, state: { messages } } = await stream.info() - - equal(name, NAME_1) - deepEqual(subjects, [`${name}.*`]) - equal(duplicate_window, nanos(DEFAULT_DEDUPLICATE_WINDOW)) - equal(messages, 0) - }) - - it('create multiple', async () => { - const queue1 = new Queue({ - client, - name: NAME_1, - }) - await queue1.setup() - - const deduplicateWindow = 3_000 - const queue2 = new Queue({ - client, - name: NAME_1, - deduplicateWindow, - }) - await queue2.setup() - - const stream = await client.streams.get(NAME_1) - - const { config: { name, subjects, duplicate_window } } = await stream.info() - - equal(duplicate_window, nanos(deduplicateWindow)) - }) - - it('create after Worker', async () => { - const worker = new Worker({ - client, - name: NAME_1, - processor: async () => {}, - }) - await worker.setup() - - const queue = new Queue({ - client, - name: NAME_1, - deduplicateWindow: 3_000, - }) - await queue.setup() - }) -}) \ No newline at end of file diff --git a/test/queue/addJob.test.ts b/test/queue/addJob.test.ts new file mode 100644 index 0000000..a220b2e --- /dev/null +++ b/test/queue/addJob.test.ts @@ -0,0 +1,148 @@ +import { describe, it, before, after, beforeEach, afterEach } from 'node:test' +import { + jetstream, + JetStreamClient, + JetStreamManager, +} from '@nats-io/jetstream' +import { connect } from '@nats-io/transport-node' +import { NatsConnection } from '@nats-io/nats-core' +import assert from 'node:assert' +import { Queue } from '../../src/queue' +import { Job } from '../../src/job' + +describe('Queue.addJob()', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + const queueMaxPriority = 3 + let queue: Queue | undefined = undefined + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + beforeEach(async () => { + queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: queueMaxPriority, + duplicateWindow: 2000, + }) + await queue.setup() + }) + + afterEach(async () => { + await jsm.streams.delete(queueName).catch(() => {}) + }) + + after(async () => { + await nc.close() + }) + + it('should add job', async () => { + const job = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + await queue!.addJob(job) + + const stream = await jsm.streams.get(queueName) + const streamInfo = await stream.info() + assert.strictEqual(streamInfo.state.messages, 1) + }) + + it('should add job with priority', async () => { + const job = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + await queue!.addJob(job, 2) + + const message = await jsm.streams.getMessage(queueName, { + seq: 1, + }) + assert.strictEqual(message?.subject, `${queueName}.2`) + }) + + it('should limit job priority to queues max priority', async () => { + const job = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + await queue!.addJob(job, queueMaxPriority + 1) + + const message = 
await jsm.streams.getMessage(queueName, { + seq: 1, + }) + assert.strictEqual(message?.subject, `${queueName}.${queueMaxPriority}`) + }) + + it('should add job with data', async () => { + const data = { + test1: 25, + test2: { + test: 'test42', + }, + } + const job = new Job({ + name: 'test', + queueName: queueName, + data: data, + }) + await queue!.addJob(job, queueMaxPriority + 1) + + const message = await jsm.streams.getMessage(queueName, { + seq: 1, + }) + if (!message) throw new Error('Message not found') + const parsedData: Job = message.json() + assert.deepStrictEqual(parsedData.data, data) + }) + + it('should deduplicate jobs', async () => { + const job = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + await queue!.addJob(job) + await queue!.addJob(job, queueMaxPriority) + + const stream = await jsm.streams.get(queueName) + const streamInfo = await stream.info() + assert.strictEqual(streamInfo.state.messages, 1) + }) + + it('should add job with correct meta', async () => { + const job = new Job({ + name: 'test', + queueName: queueName, + timeout: 1000, + data: { + test1: 25, + test2: { + test: 'test42', + }, + }, + }) + await queue!.addJob(job, queueMaxPriority + 1) + + const message = await jsm.streams.getMessage(queueName, { + seq: 1, + }) + if (!message) throw new Error('Message not found') + + const parsedData: Job = message.json() + assert.strictEqual(parsedData.meta.retryCount, 0) + assert.strictEqual(parsedData.meta.failed, false) + assert.strictEqual(parsedData.meta.timeout, 1000) + }) +}) diff --git a/test/queue/addJobs.test.ts b/test/queue/addJobs.test.ts new file mode 100644 index 0000000..bd29d64 --- /dev/null +++ b/test/queue/addJobs.test.ts @@ -0,0 +1,69 @@ +import { describe, it, before, after, beforeEach, afterEach } from 'node:test' +import { + jetstream, + JetStreamClient, + JetStreamManager, +} from '@nats-io/jetstream' +import { connect } from '@nats-io/transport-node' +import { NatsConnection } from '@nats-io/nats-core' +import assert from 'node:assert' +import { Queue } from '../../src/queue' +import { Job } from '../../src/job' + +describe('Queue.addJobs()', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + const queueMaxPriority = 3 + let queue: Queue | undefined = undefined + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + beforeEach(async () => { + queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: queueMaxPriority, + duplicateWindow: 2000, + }) + await queue.setup() + }) + + afterEach(async () => { + await jsm.streams.delete(queueName).catch(() => {}) + }) + + after(async () => { + await nc.close() + }) + + it('should add multiple jobs', async () => { + const job1 = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + const job2 = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + const job3 = new Job({ + name: 'test', + queueName: queueName, + data: {}, + }) + await queue!.addJobs([job1, job2, job3], 2) + + const stream = await jsm.streams.get(queueName) + const streamInfo = await stream.info() + assert.strictEqual(streamInfo.state.messages, 3) + }) +}) diff --git a/test/queue/constructor.test.ts b/test/queue/constructor.test.ts new file mode 100644 index 0000000..5975cbb --- /dev/null +++ b/test/queue/constructor.test.ts @@ -0,0 +1,75 @@ +import { 
describe, it, before, after } from 'node:test' +import { jetstream, JetStreamClient } from '@nats-io/jetstream' +import { connect } from '@nats-io/transport-node' +import { NatsConnection } from '@nats-io/nats-core' +import assert from 'node:assert' +import { Queue } from '../../src/queue' + +describe('Queue.constructor()', () => { + let nc: NatsConnection + let js: JetStreamClient + const queueName = 'queue' + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + }) + + after(async () => { + await nc.close() + }) + + it('should fail if duplicateWindow < 100ms', async () => { + try { + new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: 1, + duplicateWindow: 99, + }) + assert.fail('Expected error') + } catch (e) { + assert.strictEqual( + (e as Error).message, + "Parameter 'duplicateWindow' must be more than or equal to 100", + ) + } + }) + + it('should fail if priorites <= 0', async () => { + try { + new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: 0, + duplicateWindow: 100, + }) + assert.fail('Expected error') + } catch (e) { + assert.strictEqual( + (e as Error).message, + "Parameter 'priorities' must be greater than 0", + ) + } + }) + + it('should fail if name is empty', async () => { + try { + new Queue({ + name: '', + client: js, + connection: nc, + priorities: 0, + duplicateWindow: 100, + }) + assert.fail('Expected error') + } catch (e) { + assert.strictEqual( + (e as Error).message, + "Parameter 'name' cannot be empty", + ) + } + }) +}) diff --git a/test/queue/setup.test.ts b/test/queue/setup.test.ts new file mode 100644 index 0000000..ffd1a9d --- /dev/null +++ b/test/queue/setup.test.ts @@ -0,0 +1,66 @@ +import { describe, it, before, after, afterEach } from 'node:test' +import { + jetstream, + JetStreamClient, + JetStreamManager, +} from '@nats-io/jetstream' +import { connect } from '@nats-io/transport-node' +import { NatsConnection } from '@nats-io/nats-core' +import assert from 'node:assert' +import { Queue } from '../../src/queue' + +describe('Queue.setup()', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + afterEach(async () => { + await jsm.streams.delete(queueName) + }) + + after(async () => { + await nc.close() + }) + + it('should create queue', async () => { + const queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: 1, + duplicateWindow: 2000, + }) + + await queue.setup() + + const streamInfo = await jsm.streams.get(queueName) + assert.strictEqual(streamInfo.name, queueName) + }) + + it('should update queue', async () => { + await jsm.streams.add({ + name: queueName, + }) + + const queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: 1, + duplicateWindow: 2000, + }) + + await queue.setup() + + const streamInfo = await jsm.streams.get(queueName) + assert.strictEqual(streamInfo.name, queueName) + }) +}) diff --git a/test/worker-start-stop.test.ts b/test/worker-start-stop.test.ts deleted file mode 100644 index a3beb31..0000000 --- a/test/worker-start-stop.test.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { equal, deepEqual } from 'assert/strict' -import { describe, it, before, after, afterEach, beforeEach } from 'node:test' - -import { connect, nanos } from 
'@nats-io/transport-node' -import { jetstream } from '@nats-io/jetstream' -import { NatsConnection } from '@nats-io/nats-core' -import type { ConsumerInfo, JetStreamClient, JetStreamManager } from '@nats-io/jetstream' - - -import { Queue, Worker } from '../src' - -describe('Worker start() & stop()', () => { - let connection: NatsConnection - let client: JetStreamClient - let manager: JetStreamManager - let queue: Queue - - const NAME_1 = 'queue1' - - before(async () => { - connection = await connect({ - servers: '127.0.0.1:4222', - }) - client = jetstream(connection) - manager = await client.jetstreamManager() - }) - - beforeEach(async () => { - queue = new Queue({ - name: NAME_1, - client, - }) - await queue.setup() - }) - - afterEach(async () => { - await manager.consumers.delete(NAME_1, NAME_1) - await manager.streams.delete(NAME_1) - }) - - after(async () => { - await connection.close() - }) - - it('start and stop', async () => { - const worker = new Worker({ - client, - name: NAME_1, - processor: async () => {}, - }) - await worker.setup() - - worker.start() - - await worker.stop() - }) -}) \ No newline at end of file diff --git a/test/worker.test.ts b/test/worker.test.ts deleted file mode 100644 index 40ea223..0000000 --- a/test/worker.test.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { equal, deepEqual } from 'assert/strict' -import { describe, it, before, after, afterEach } from 'node:test' - -import { connect, nanos } from '@nats-io/transport-node' -import { jetstream } from '@nats-io/jetstream' -import { NatsConnection } from '@nats-io/nats-core' -import type { ConsumerInfo, JetStreamClient, JetStreamManager } from '@nats-io/jetstream' - - -import { Queue, Worker } from '../src' - -describe('Worker', () => { - let connection: NatsConnection - let client: JetStreamClient - let manager: JetStreamManager - - const NAME_1 = 'queue1' - - before(async () => { - connection = await connect({ - servers: '127.0.0.1:4222', - }) - client = jetstream(connection) - manager = await client.jetstreamManager() - }) - - afterEach(async () => { - await manager.consumers.delete(NAME_1, NAME_1) - await manager.streams.delete(NAME_1) - }) - - after(async () => { - await connection.close() - }) - - it('create', async () => { - const worker1 = new Worker({ - client, - name: NAME_1, - processor: async () => {}, - }) - await worker1.setup() - - const info = await manager.consumers.list(NAME_1) - - const consumers: ConsumerInfo[] = [] - for await (const c of info) { - consumers.push(c) - } - - equal(consumers.length, 1) - }) - - it('create multiple', async () => { - const worker1 = new Worker({ - client, - name: NAME_1, - processor: async () => {}, - }) - await worker1.setup() - - const worker2 = new Worker({ - client, - name: NAME_1, - processor: async () => {}, - }) - await worker2.setup() - - const info = await manager.consumers.list(NAME_1) - - const consumers: ConsumerInfo[] = [] - for await (const c of info) { - consumers.push(c) - } - - equal(consumers.length, 1) - }) - - it('create after Queue', async () => { - const queue = new Queue({ - name: NAME_1, - client, - deduplicateWindow: 3000, - }) - await queue.setup() - - const worker = new Worker({ - name: NAME_1, - client, - processor: async () => {}, - }) - await worker.setup() - }) -}) \ No newline at end of file diff --git a/test/worker/process/common/test.ts b/test/worker/process/common/test.ts new file mode 100644 index 0000000..786ac09 --- /dev/null +++ b/test/worker/process/common/test.ts @@ -0,0 +1,103 @@ +import { + after, + afterEach, + 
before, + beforeEach, + describe, + it, + mock, +} from 'node:test' +import { Queue } from '../../../../src/queue' +import { Worker } from '../../../../src/worker' +import { connect, NatsConnection } from '@nats-io/transport-node' +import { + jetstream, + JetStreamClient, + JetStreamManager, + JsMsg, +} from '@nats-io/jetstream' +import { Job } from '../../../../src/job' +import assert from 'assert' +import { sleep } from '../../../../src/utils' +import { deleteAllKV } from '../../../helpers/deleteAllKV' +import { deleteAllStreams } from '../../../helpers/deleteAllStreams' + +describe('Worker.process(): common', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + const queueMaxPriority = 3 + const maxRetries = 2 + let queue: Queue | undefined = undefined + let worker: Worker | undefined = undefined + let processorMock: ReturnType< + typeof mock.fn<(job: JsMsg, timeout: number) => Promise> + > + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + }) + + beforeEach(async () => { + await deleteAllKV(nc) + await deleteAllStreams(jsm) + + // Create a mock function for the processor + processorMock = mock.fn<(job: JsMsg, timeout: number) => Promise>( + async () => {}, + ) + + queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: queueMaxPriority, + duplicateWindow: 2000, + }) + await queue.setup() + + worker = new Worker({ + name: queueName, + client: js, + processor: processorMock, + maxRetries, + }) + + await worker.setup() + }) + + afterEach(async () => { + await worker!.stop() + }) + + after(async () => { + await nc.close() + }) + + it('should process job', async () => { + await worker!.start() + const job = new Job({ + id: 'job1', + name: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + + await queue!.addJob(job) + + await sleep(100) // Wait for job to be processed + + assert.strictEqual(processorMock.mock.calls.length, 1) + + const processedJob: Job = processorMock.mock.calls[0].arguments[0].json() + assert(processedJob.id === job.id) + assert(processedJob.name === job.name) + assert(processedJob.queueName === job.queueName) + assert.deepStrictEqual(processedJob.data, job.data) + }) +}) diff --git a/test/worker/process/priority/test.ts b/test/worker/process/priority/test.ts new file mode 100644 index 0000000..733f155 --- /dev/null +++ b/test/worker/process/priority/test.ts @@ -0,0 +1,202 @@ +import { + after, + afterEach, + before, + beforeEach, + describe, + it, + mock, +} from 'node:test' +import { Queue } from '../../../../src/queue' +import { Worker } from '../../../../src/worker' +import { connect, NatsConnection } from '@nats-io/transport-node' +import { + jetstream, + JetStreamClient, + JetStreamManager, + JsMsg, +} from '@nats-io/jetstream' +import { Job } from '../../../../src/job' +import assert from 'assert' +import { sleep } from '../../../../src/utils' + +describe('Worker.process(): priority', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + const queueMaxPriority = 3 + const maxRetries = 2 + let queue: Queue | undefined = undefined + let worker: Worker | undefined = undefined + let processorMock: ReturnType< + typeof mock.fn<(job: JsMsg, timeout: number) => Promise> + > + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + await 
jsm.streams.delete(queueName).catch(() => {}) + }) + + beforeEach(async () => { + // Create a mock function for the processor + processorMock = mock.fn<(job: JsMsg, timeout: number) => Promise>( + async () => {}, + ) + + queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: queueMaxPriority, + duplicateWindow: 2000, + }) + await queue.setup() + + worker = new Worker({ + name: queueName, + client: js, + processor: processorMock, + maxRetries, + priorities: queueMaxPriority, + priorityQuota: new Map([ + [1, { quota: 2 }], + [2, { quota: 1 }], + [3, { quota: 1 }], + ]), + rateLimit: { + duration: 1000, + max: 1, + }, + fetchInterval: 50, + fetchTimeout: 1000, + }) + + await worker.setup() + }) + + afterEach(async () => { + await worker!.stop() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + after(async () => { + await nc.close() + }) + + it('should process jobs from priority quota first', async () => { + const job = new Job({ + id: 'job1', + name: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + const job2 = new Job({ + id: 'job2', + name: 'job2', + queueName: queueName, + data: { + message: 42, + }, + }) + const job3 = new Job({ + id: 'job3', + name: 'job3', + queueName: queueName, + data: { + message: 42, + }, + }) + + await queue!.addJobs([job2, job3], 1) + await queue!.addJobs([job], 3) + + await worker!.start() + await sleep(200) // Wait for job to be processed + + assert.strictEqual(processorMock.mock.calls.length, 2) + + const job2Data: Job = processorMock.mock.calls[0].arguments[0].json() + assert.strictEqual(job2Data.name, job2.name) + + const job3Data: Job = processorMock.mock.calls[1].arguments[0].json() + assert.strictEqual(job3Data.name, job3.name) + }) + + it('should process no more jobs than available in quota', async () => { + const job = new Job({ + id: 'job1', + name: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + const job2 = new Job({ + id: 'job2', + name: 'job2', + queueName: queueName, + data: { + message: 42, + }, + }) + const job3 = new Job({ + id: 'job3', + name: 'job3', + queueName: queueName, + data: { + message: 42, + }, + }) + + await queue!.addJobs([job2, job3, job], 1) + + await worker!.start() + await sleep(200) // Wait for job to be processed + + assert.strictEqual(processorMock.mock.calls.length, 2) + + const job2Data: Job = processorMock.mock.calls[0].arguments[0].json() + assert.strictEqual(job2Data.name, job2.name) + + const job3Data: Job = processorMock.mock.calls[1].arguments[0].json() + assert.strictEqual(job3Data.name, job3.name) + }) + + it('should process from next quota if previous does not have enough jobs', async () => { + const job = new Job({ + id: 'job1', + name: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + const job2 = new Job({ + id: 'job2', + name: 'job2', + queueName: queueName, + data: { + message: 42, + }, + }) + + await queue!.addJobs([job], 1) + await queue!.addJobs([job2], 2) + + await worker!.start() + await sleep(2000) // Wait for job to be processed + + assert.strictEqual(processorMock.mock.calls.length, 2) + + const jobData: Job = processorMock.mock.calls[0].arguments[0].json() + assert.strictEqual(jobData.name, job.name) + + const job2Data: Job = processorMock.mock.calls[1].arguments[0].json() + assert.strictEqual(job2Data.name, job2.name) + }) +}) diff --git a/test/worker/process/rateLimit/test.ts b/test/worker/process/rateLimit/test.ts new file mode 100644 index 0000000..d875d05 --- /dev/null +++ 
b/test/worker/process/rateLimit/test.ts @@ -0,0 +1,114 @@ +import { + after, + afterEach, + before, + beforeEach, + describe, + it, + mock, +} from 'node:test' +import { Queue } from '../../../../src/queue' +import { Worker } from '../../../../src/worker' +import { connect, NatsConnection } from '@nats-io/transport-node' +import { + jetstream, + JetStreamClient, + JetStreamManager, + JsMsg, +} from '@nats-io/jetstream' +import { Job } from '../../../../src/job' +import assert from 'assert' +import { sleep } from '../../../../src/utils' + +describe('Worker.process(): rateLimit', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + const queueMaxPriority = 3 + const maxRetries = 2 + let queue: Queue | undefined = undefined + let worker: Worker | undefined = undefined + let processorMock: ReturnType< + typeof mock.fn<(job: JsMsg, timeout: number) => Promise> + > + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + beforeEach(async () => { + // Create a mock function for the processor + processorMock = mock.fn<(job: JsMsg, timeout: number) => Promise>( + async () => {}, + ) + + queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: queueMaxPriority, + duplicateWindow: 2000, + }) + await queue.setup() + + worker = new Worker({ + name: queueName, + client: js, + processor: processorMock, + maxRetries, + rateLimit: { + duration: 1000, + max: 2, + }, + }) + + await worker.setup() + }) + + afterEach(async () => { + await worker!.stop() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + after(async () => { + await nc.close() + }) + + it('should process no more than rate limit', async () => { + await worker!.start() + const job1 = new Job({ + name: 'job1', + id: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + const job2 = new Job({ + name: 'job1', + id: 'job2', + queueName: queueName, + data: { + message: 42, + }, + }) + const job3 = new Job({ + name: 'job1', + id: 'job3', + queueName: queueName, + data: { + message: 42, + }, + }) + + await queue!.addJobs([job1, job2, job3]) + + await sleep(200) + + assert.strictEqual(processorMock.mock.calls.length, 2) + }) +}) diff --git a/test/worker/process/retries/test.ts b/test/worker/process/retries/test.ts new file mode 100644 index 0000000..ce08e70 --- /dev/null +++ b/test/worker/process/retries/test.ts @@ -0,0 +1,132 @@ +import { + after, + afterEach, + before, + beforeEach, + describe, + it, + mock, +} from 'node:test' +import { Queue } from '../../../../src/queue' +import { Worker } from '../../../../src/worker' +import { connect, NatsConnection } from '@nats-io/transport-node' +import { + jetstream, + JetStreamClient, + JetStreamManager, + JsMsg, +} from '@nats-io/jetstream' +import { Job } from '../../../../src/job' +import assert from 'assert' +import { sleep } from '../../../../src/utils' + +describe('Worker.process(): retries', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + const queueName = 'queue' + const queueMaxPriority = 3 + const maxRetries = 2 + let queue: Queue | undefined = undefined + let worker: Worker | undefined = undefined + let processorMock: ReturnType< + typeof mock.fn<(job: JsMsg, timeout: number) => Promise> + > + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await 
js.jetstreamManager() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + beforeEach(async () => { + // Create a mock function for the processor + processorMock = mock.fn<(job: JsMsg, timeout: number) => Promise>( + async () => {}, + ) + + queue = new Queue({ + name: queueName, + client: js, + connection: nc, + priorities: queueMaxPriority, + duplicateWindow: 2000, + }) + await queue.setup() + + worker = new Worker({ + name: queueName, + client: js, + processor: processorMock, + maxRetries, + }) + + await worker.setup() + }) + + afterEach(async () => { + await worker!.stop() + await jsm.streams.delete(queueName).catch(() => {}) + }) + + after(async () => { + await nc.close() + }) + + it('should retry job', async () => { + await worker!.start() + const job = new Job({ + id: 'job1', + name: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + + processorMock.mock.mockImplementationOnce(async () => { + throw new Error('Processing failed') + }) + + await queue!.addJob(job) + + await sleep(500) // Wait for job to be processed + + assert.strictEqual(processorMock.mock.calls.length, 2) + + const processedJob: Job = processorMock.mock.calls[0].arguments[0].json() + assert(processedJob.id === job.id) + assert(processedJob.name === job.name) + assert(processedJob.queueName === job.queueName) + assert.deepStrictEqual(processedJob.data, job.data) + }) + + it('should retry job not more than maxAttempts times', async () => { + await worker!.start() + const job = new Job({ + id: 'job1', + name: 'job1', + queueName: queueName, + data: { + message: 42, + }, + }) + + processorMock.mock.mockImplementation(async () => { + throw new Error('Processing failed') + }) + + await queue!.addJob(job) + + await sleep(500) // Wait for job to be processed + + assert.strictEqual(processorMock.mock.calls.length, maxRetries) + + const processedJob: Job = processorMock.mock.calls[0].arguments[0].json() + assert(processedJob.id === job.id) + assert(processedJob.name === job.name) + assert(processedJob.queueName === job.queueName) + assert.deepStrictEqual(processedJob.data, job.data) + }) +}) diff --git a/test/worker/setup.test.ts b/test/worker/setup.test.ts new file mode 100644 index 0000000..957204d --- /dev/null +++ b/test/worker/setup.test.ts @@ -0,0 +1,123 @@ +import { describe, it, before, after, beforeEach, afterEach } from 'node:test' +import { + jetstream, + JetStreamApiError, + JetStreamClient, + JetStreamManager, +} from '@nats-io/jetstream' +import { connect } from '@nats-io/transport-node' +import { NatsConnection } from '@nats-io/nats-core' +import { Worker } from '../../src/worker' +import assert from 'node:assert' +import { Kvm } from '@nats-io/kv' + +describe('Worker.setup()', () => { + let nc: NatsConnection + let js: JetStreamClient + let jsm: JetStreamManager + let kvm: Kvm + const queueName = 'queue' + + before(async () => { + nc = await connect({ servers: '127.0.0.1:4222' }) + js = jetstream(nc) + jsm = await js.jetstreamManager() + kvm = await new Kvm(nc) + }) + + beforeEach(async () => { + await jsm.streams.add({ + name: queueName, + subjects: [`${queueName}.*.*`], + }) + }) + + afterEach(async () => { + await jsm.streams.delete(queueName) + }) + + after(async () => { + await nc.close() + }) + + it('should initialize successfully', async () => { + const worker = new Worker({ + client: js, + name: queueName, + processor: async () => {}, + concurrency: 1, + maxRetries: 1, + priorities: 1, + }) + + await worker.setup() + }) + + it('should create 1 consumer for each priority', 
+  it('should create 1 consumer for each priority', async () => {
+    const worker = new Worker({
+      client: js,
+      name: queueName,
+      processor: async () => {},
+      concurrency: 1,
+      maxRetries: 1,
+      priorities: 3,
+    })
+
+    await worker.setup()
+
+    const consumerName1 = `worker_group_1`
+    const consumerName2 = `worker_group_2`
+    const consumerName3 = `worker_group_3`
+
+    try {
+      await js.consumers.get(queueName, consumerName1)
+      await js.consumers.get(queueName, consumerName2)
+      await js.consumers.get(queueName, consumerName3)
+    } catch {
+      assert.fail('Consumer not found')
+    }
+  })
+
+  it('should create parent_id bucket for stream', async () => {
+    const worker = new Worker({
+      client: js,
+      name: queueName,
+      processor: async () => {},
+      concurrency: 1,
+      maxRetries: 1,
+      priorities: 1,
+    })
+
+    await worker.setup()
+
+    const bucketName = `${queueName}_parent_id`
+
+    try {
+      await kvm.open(bucketName)
+    } catch {
+      assert.fail('Bucket not found')
+    }
+  })
+
+  it('should fail if stream not found', async () => {
+    const worker = new Worker({
+      client: js,
+      name: 'non_existent_stream',
+      processor: async () => {},
+      concurrency: 1,
+      maxRetries: 1,
+      priorities: 1,
+    })
+
+    try {
+      await worker.setup()
+      assert.fail('Expected error')
+    } catch (e) {
+      if (e instanceof JetStreamApiError) {
+        assert(e.name === 'StreamNotFoundError', 'StreamNotFoundError expected')
+      } else {
+        assert.fail('Unexpected error')
+      }
+    }
+  })
+})
diff --git a/test/worker/start-stop.test.ts b/test/worker/start-stop.test.ts
new file mode 100644
index 0000000..5821cca
--- /dev/null
+++ b/test/worker/start-stop.test.ts
@@ -0,0 +1,73 @@
+import { describe, it, before, after, beforeEach, afterEach } from 'node:test'
+import {
+  jetstream,
+  JetStreamClient,
+  JetStreamManager,
+} from '@nats-io/jetstream'
+import { connect } from '@nats-io/transport-node'
+import { NatsConnection } from '@nats-io/nats-core'
+import { Worker } from '../../src/worker'
+import assert from 'node:assert'
+
+describe('Worker.start(), Worker.stop()', () => {
+  let nc: NatsConnection
+  let js: JetStreamClient
+  let jsm: JetStreamManager
+  const queueName = 'queue'
+
+  before(async () => {
+    nc = await connect({ servers: '127.0.0.1:4222' })
+    js = jetstream(nc)
+    jsm = await js.jetstreamManager()
+  })
+
+  beforeEach(async () => {
+    await jsm.streams.add({
+      name: queueName,
+      subjects: [`${queueName}.*.*`],
+    })
+  })
+
+  afterEach(async () => {
+    await jsm.streams.delete(queueName)
+  })
+
+  after(async () => {
+    await nc.close()
+  })
+
+  it('should start and stop successfully', async () => {
+    const worker = new Worker({
+      client: js,
+      name: queueName,
+      processor: async () => {},
+      concurrency: 1,
+      maxRetries: 1,
+      priorities: 1,
+    })
+
+    await worker.setup()
+
+    worker.start()
+
+    await worker.stop()
+  })
+
+  it('should fail if called before setup()', async () => {
+    const worker = new Worker({
+      client: js,
+      name: queueName,
+      processor: async () => {},
+      concurrency: 1,
+      maxRetries: 1,
+      priorities: 1,
+    })
+
+    try {
+      worker.start()
+      assert.fail('Expected error')
+    } catch (e) {
+      assert.strictEqual((e as Error).message, 'call setup() before start()')
+    }
+  })
+})
diff --git a/yarn.lock b/yarn.lock
index c64026a..9e2da08 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -120,41 +120,49 @@
     "@jridgewell/resolve-uri" "^3.1.0"
     "@jridgewell/sourcemap-codec" "^1.4.14"

-"@nats-io/jetstream@3.0.0-10":
-  version "3.0.0-10"
-  resolved "https://registry.yarnpkg.com/@nats-io/jetstream/-/jetstream-3.0.0-10.tgz#82beb86ef85404c3f09bf3eda5a21902be335a7e"
-  integrity 
sha512-FOdwBximucQszBAsSAckqhXxh0WZozKbRTMzODCudcHpOEv0jUwnoqFLG/8r607pJDX1gdiYADF5hpeV3gvh7A== +"@nats-io/jetstream@3.0.2": + version "3.0.2" + resolved "https://registry.yarnpkg.com/@nats-io/jetstream/-/jetstream-3.0.2.tgz#f6baa09b46e27e8e779d5391d54cd7d76be8db23" + integrity sha512-dR789x1xhMyEukXILPWDo6ODW53nHpBD9LnsnnywA+pTkwYFydEkYdyhv9HzFc6s6Nfrb2TpibrT4znijlg2dg== dependencies: - "@nats-io/nats-core" "~3.0.0-26" + "@nats-io/nats-core" "3.0.2" -"@nats-io/nats-core@3.0.0-27", "@nats-io/nats-core@~3.0.0-26": - version "3.0.0-27" - resolved "https://registry.yarnpkg.com/@nats-io/nats-core/-/nats-core-3.0.0-27.tgz#e6ccd3d7acd9786cb9b3219ccb942d721586c201" - integrity sha512-xJvUWoRvUTG/3K4Ppf5KXTZezicbvd2o/psMyI7YPtIxKZsZALXL/bLoNb7FB/Hw4jECioZE7LZCSK41npFD2A== +"@nats-io/kv@3.0.2": + version "3.0.2" + resolved "https://registry.yarnpkg.com/@nats-io/kv/-/kv-3.0.2.tgz#7407bae502308bbf730ebe563da434df81153b5c" + integrity sha512-/4YvVlgmS6HEtukJE66kelJOHovB411GFo8kTUnUr6FYwws9eNMhsOwvBC4HEZXy9m8VjlA0t4nPPoH66bj2lg== dependencies: - "@nats-io/nkeys" "1.2.0-4" - "@nats-io/nuid" "2.0.1-2" + "@nats-io/jetstream" "3.0.2" + "@nats-io/nats-core" "3.0.2" -"@nats-io/nkeys@1.2.0-4", "@nats-io/nkeys@^1.2.0-4": - version "1.2.0-4" - resolved "https://registry.yarnpkg.com/@nats-io/nkeys/-/nkeys-1.2.0-4.tgz#3e16d9076b2f936181dd9d6123dc705daccf714b" - integrity sha512-61R4SO9059AyZBhoZRECHobHsY6mBOGwB5ewBrbnDKVtTehnFwyMNaORHhksFmu9K0zyUtwBWyffzKpibp2Cew== +"@nats-io/nats-core@3.0.2": + version "3.0.2" + resolved "https://registry.yarnpkg.com/@nats-io/nats-core/-/nats-core-3.0.2.tgz#e4dcfa440583e9077673d00f33995f57f040d7dc" + integrity sha512-gACbRIhY3yQTO+5C4E1LY/qO2e5EhGUBeDsnPDT8Hd3qY9mfhNtluY6R7d7WX7/JMGsXRPbYdSLSADr/ECSy3w== dependencies: - tweetnacl "1.0.3" + "@nats-io/nkeys" "2.0.3" + "@nats-io/nuid" "2.0.3" -"@nats-io/nuid@2.0.1-2", "@nats-io/nuid@^2.0.1-2": - version "2.0.1-2" - resolved "https://registry.yarnpkg.com/@nats-io/nuid/-/nuid-2.0.1-2.tgz#ec2d9bd0879fecfea1996dcc1f820af6860ea52f" - integrity sha512-oXJiWXH87FjcPAXVVkoY+o7z9YFuOvsVlUlvB4h/QsoC1hp63lB18YOMHmKxVNO7BzvwzJh3SKLlIlsQrEwaxg== +"@nats-io/nkeys@2.0.3": + version "2.0.3" + resolved "https://registry.yarnpkg.com/@nats-io/nkeys/-/nkeys-2.0.3.tgz#37ce8725e075033202c9ca6093bb22c4a1989430" + integrity sha512-JVt56GuE6Z89KUkI4TXUbSI9fmIfAmk6PMPknijmuL72GcD+UgIomTcRWiNvvJKxA01sBbmIPStqJs5cMRBC3A== + dependencies: + tweetnacl "^1.0.3" + +"@nats-io/nuid@2.0.3": + version "2.0.3" + resolved "https://registry.yarnpkg.com/@nats-io/nuid/-/nuid-2.0.3.tgz#3afe66825dcb2921386d9a43ba1175a2e36d528c" + integrity sha512-TpA3HEBna/qMVudy+3HZr5M3mo/L1JPofpVT4t0HkFGkz2Cn9wrlrQC8tvR8Md5Oa9//GtGG26eN0qEWF5Vqew== -"@nats-io/transport-node@3.0.0-12": - version "3.0.0-12" - resolved "https://registry.yarnpkg.com/@nats-io/transport-node/-/transport-node-3.0.0-12.tgz#714ee323089a0c0e58c17897131f943719beeb46" - integrity sha512-TWBlZULn02iY3A3+MrC5CztQ4YclFQPiQrH56fToC68NA1rfqWw54RPWaUVrbjgg5u6dX1CH6yuNZImdZV5HGA== +"@nats-io/transport-node@3.0.2": + version "3.0.2" + resolved "https://registry.yarnpkg.com/@nats-io/transport-node/-/transport-node-3.0.2.tgz#58125d6e4ce0eba8b5f9ef64c43cf45ff491ad54" + integrity sha512-hPuep/ObVepjAM1dFy+XhqMlIzga3bQpbgQiOWM5AxwNZ9CF7GHAtfaxG7WY6+t057tBW8axC0nBL71Q8chQBQ== dependencies: - "@nats-io/nats-core" "~3.0.0-26" - "@nats-io/nkeys" "^1.2.0-4" - "@nats-io/nuid" "^2.0.1-2" + "@nats-io/nats-core" "3.0.2" + "@nats-io/nkeys" "2.0.3" + "@nats-io/nuid" "2.0.3" "@nodelib/fs.scandir@2.1.5": version "2.1.5" @@ -232,12 +240,12 @@ 
resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.15.tgz#596a1747233694d50f6ad8a7869fcb6f56cf5841" integrity sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA== -"@types/node@^20.0.0": - version "20.16.10" - resolved "https://registry.yarnpkg.com/@types/node/-/node-20.16.10.tgz#0cc3fdd3daf114a4776f54ba19726a01c907ef71" - integrity sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA== +"@types/node@22.0.0": + version "22.0.0" + resolved "https://registry.yarnpkg.com/@types/node/-/node-22.0.0.tgz#04862a2a71e62264426083abe1e27e87cac05a30" + integrity sha512-VT7KSYudcPOzP5Q0wfbowyNLaVR8QWUdw+088uFWwfvpY6uCWaXpqV6ieLAu9WBcnTa7H4Z5RLK8I5t2FuOcqw== dependencies: - undici-types "~6.19.2" + undici-types "~6.11.1" "@typescript-eslint/eslint-plugin@8.6.0": version "8.6.0" @@ -1155,7 +1163,7 @@ ts-node@10.9.2: v8-compile-cache-lib "^3.0.1" yn "3.1.1" -tweetnacl@1.0.3: +tweetnacl@^1.0.3: version "1.0.3" resolved "https://registry.yarnpkg.com/tweetnacl/-/tweetnacl-1.0.3.tgz#ac0af71680458d8a6378d0d0d050ab1407d35596" integrity sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw== @@ -1181,10 +1189,10 @@ typescript@5.6.2: resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.6.2.tgz#d1de67b6bef77c41823f822df8f0b3bcff60a5a0" integrity sha512-NW8ByodCSNCwZeghjN3o+JX5OFH0Ojg6sadjEKY4huZ52TqbJTJnDo5+Tw98lSy63NZvi4n+ez5m2u5d4PkZyw== -undici-types@~6.19.2: - version "6.19.8" - resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-6.19.8.tgz#35111c9d1437ab83a7cdc0abae2f26d88eda0a02" - integrity sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw== +undici-types@~6.11.1: + version "6.11.1" + resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-6.11.1.tgz#432ea6e8efd54a48569705a699e62d8f4981b197" + integrity sha512-mIDEX2ek50x0OlRgxryxsenE5XaQD4on5U2inY7RApK3SOJpofyw7uW2AyfMKkhAxXIceo2DeWGVGwyvng1GNQ== uri-js@^4.2.2: version "4.4.1"