Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
node_modules
coverage

dist
dist

.github/copilot-instructions.md
.gitconfig
6 changes: 6 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"trailingComma": "all",
"tabWidth": 2,
"singleQuote": true,
"semi": false
}
9 changes: 6 additions & 3 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// @ts-check

import eslint from '@eslint/js';
import tseslint from 'typescript-eslint';
import eslint from '@eslint/js'
import tseslint from 'typescript-eslint'

export default tseslint.config(
eslint.configs.recommended,
...tseslint.configs.recommended,
);
{
ignores: ['node_modules/**', 'dist/**'],
},
)
16 changes: 10 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "",
"description": "Queue over Nats",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"repository": "https://github.com/arusakov/node-nats-queue.git",
"author": "Aleksandr Rusakov <aleksandr.s.rusakov@gmail.com>",
"license": "MIT",
Expand All @@ -13,18 +14,15 @@
"build": "tsc -p .",
"compile": "tsc --noEmit -p ./test",
"lint": "eslint .",
"test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/*.test.ts",
"test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/**/*.test.ts",
"test:coverage:html": "c8 --reporter=html --reporter=text yarn test:all",
"test:coverage": "c8 --reporter=lcovonly --reporter=text yarn test:all",
"test": "node --test --test-concurrency=1 --require=ts-node/register"
},
"devDependencies": {
"@eslint/js": "9.11.0",
"@nats-io/jetstream": "3.0.0-10",
"@nats-io/nats-core": "3.0.0-27",
"@nats-io/transport-node": "3.0.0-12",
"@types/eslint__js": "8.42.3",
"@types/node": "^20.0.0",
"@types/node": "22.0.0",
"c8": "10.1.2",
"eslint": "9.11.0",
"ts-node": "10.9.2",
Expand All @@ -33,5 +31,11 @@
},
"engines": {
"node": ">=20.0.0"
},
"dependencies": {
"@nats-io/kv": "3.0.2",
"@nats-io/jetstream": "3.0.2",
"@nats-io/nats-core": "3.0.2",
"@nats-io/transport-node": "3.0.2"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот эти зависимости надо указывать в peerDependecies.
И в дев вернуть.

}
}
}
209 changes: 3 additions & 206 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,206 +1,3 @@
import EventEmitter from 'events'

import { AckPolicy } from '@nats-io/jetstream'
import { nanos, NatsError } from '@nats-io/nats-core'
import type { JetStreamClient, Consumer, JsMsg} from '@nats-io/jetstream'
import type { MsgHdrs } from '@nats-io/nats-core'

import { createSubject, sleep } from './utils'
import { FixedWindowLimiter, IntervalLimiter, type Limiter } from './limiter'


export type QueueOpts
= {
client: JetStreamClient
name: string

/**
* ms
*/
deduplicateWindow?: number
}

export type Job = {
id?: number | string
name: string
data: unknown
}

export type AddOptions = {
id?: string
headers?: MsgHdrs
}

export const DEFAULT_DEDUPLICATE_WINDOW = 2_000

export class Queue {
protected readonly client: JetStreamClient
protected readonly name: string

protected readonly deduplicateWindow: number

constructor(opts: QueueOpts) {
this.client = opts.client
this.name = opts.name

this.deduplicateWindow = opts.deduplicateWindow || DEFAULT_DEDUPLICATE_WINDOW
}

async setup() {
const manager = await this.client.jetstreamManager()

try {
await manager.streams.add({
name: this.name,
subjects: [`${this.name}.*`],
duplicate_window: nanos(this.deduplicateWindow)
})
} catch (e) {
// TODO smart error handling
if (!(e instanceof NatsError)) {
throw e
}
await manager.streams.update(
this.name,
{
subjects: [`${this.name}.*`],
duplicate_window: nanos(this.deduplicateWindow)
}
)
}

}

async add(name: string, data?: unknown, options?: AddOptions) {
const payload = JSON.stringify(data)
return this.client.publish(`${this.name}.${name}`, payload, options && {
msgID: options.id,
headers: options.headers
})
}
}

export type RateLimit = {
duration: number
max: number
}

export type WorkerOpts = {
client: JetStreamClient
name: string
processor: (job: JsMsg) => Promise<void>
concurrency?: number
rateLimit?: RateLimit
}

export class Worker extends EventEmitter {
protected readonly client: JetStreamClient
protected readonly name: string
protected readonly processor: (job: JsMsg) => Promise<void>
protected readonly concurrency: number
protected readonly limiter: Limiter
protected readonly fetchInterval: number
protected readonly fetchTimeout: number

protected consumer: Consumer | null = null
protected running = false
protected processingNow = 0
protected loopPromise: Promise<void> | null = null

constructor(opts: WorkerOpts) {
super()

this.client = opts.client
this.name = opts.name
this.processor = opts.processor
this.concurrency = opts.concurrency || 1

this.fetchInterval = 150
this.fetchTimeout = 3_000
this.limiter = opts.rateLimit ?
new FixedWindowLimiter(opts.rateLimit.max, opts.rateLimit.duration, this.fetchInterval) :
new IntervalLimiter(this.fetchInterval)
}

async setup() {
const manager = await this.client.jetstreamManager()

try {
await manager.streams.add({
name: this.name,
subjects: [createSubject(this.name)],
})
} catch (e) {
if (!(e instanceof NatsError && e.api_error?.err_code === 10058)) {
throw e
}
}

await manager.consumers.add(this.name, {
durable_name: this.name,
ack_policy: AckPolicy.All,
})

this.consumer = await this.client.consumers.get(this.name, this.name)
}

async stop() {
this.running = false

if (this.loopPromise) {
await this.loopPromise
}
while (this.processingNow > 0) {
await sleep(this.fetchInterval)
}
}

start() {
if (!this.consumer) {
throw new Error('call setup() before start()')
}

if (!this.loopPromise) {
this.running = true
this.loopPromise = this.loop()
}
}

protected async loop() {
while (this.running) {
const max = this.limiter.get(this.concurrency - this.processingNow)
const jobs = await this.fetch(max)

for await (const j of jobs) {
this.limiter.inc()
this.process(j) // without await!
}

await sleep(this.limiter.timeout())
}
}

protected async process(j: JsMsg) {
this.processingNow += 1
try {
this.process(j)
await j.ackAck()
} catch (e) {
await j.term()
} finally {
this.processingNow -= 1
}
}

protected async fetch(count: number) {
try {
return this.consumer!.fetch({
max_messages: count,
expires: this.fetchTimeout
})
} catch (e) {
// TODO
return []
}
}
}
export * from './queue'
export * from './job'
export * from './worker'
29 changes: 29 additions & 0 deletions src/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { JobCreateData } from './types'

export class Job {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Надо джобу иметь для консьюмера.
И это дает возможность что-то попытаться сделать lazy.

id: string
name: string
meta: {
failed: boolean
startTime: number
retryCount: number
timeout: number
// parentId?: string
}
data: unknown
queueName: string

constructor(data: JobCreateData) {
this.id = data.id ?? `${crypto.randomUUID()}_${Date.now()}`
this.queueName = data.queueName
this.name = data.name
this.data = data.data
this.meta = {
retryCount: 0,
startTime: Date.now() + (data.delay ?? 0),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше вообще не писать свойство, если это можно сделать.
И в целом нейминг свойст можно более ужатый

failed: false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это по принципу отсутствие failed === false

// TODO: Is this correct?
timeout: data.timeout ?? 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это нафиг

}
}
}
20 changes: 20 additions & 0 deletions src/jobEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type Event<T, D> = {
event: T
data: D
}

export type JobCompletedEvent = Event<'JOB_COMPLETED', { jobId: string }>
export type JobFailedEvent = Event<'JOB_FAILED', { jobId: string }>
// export type JobChildCompletedEvent = Event<
// 'JOB_CHILD_COMPLETED',
// { childId: string; parentId: string }
// >
// export type JobChildFailedEvent = Event<
// 'JOB_CHILD_FAILED',
// { childId: string; parentId: string }
// >

export type JobEvent =
// | JobChildFailedEvent
// | JobChildCompletedEvent
JobCompletedEvent | JobFailedEvent
Loading
Loading