Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions lib/queue/core/payloadBuilder.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import ms from 'ms';
import { ulid } from 'ulid';
import { QueueMetadata } from '../metadata';
import { Message, JobOptions, InternalMessage } from '../strategy';

Expand All @@ -7,20 +9,35 @@ type Complete<T> = {
: T[P] | undefined;
};

const calculateDelay = (delay: number | string | Date): number => {
const now = Date.now();
if (delay instanceof Date) {
const time = delay.getTime();
return now > time ? now : time;
}

const delayInMs = typeof delay === 'string' ? ms(delay) : delay * 1000;
const calculatedDelay = now + delayInMs;
if (calculatedDelay < now) return now;
return calculatedDelay;
};

export class PayloadBuilder {
static build(
message: Message,
jobOptions: JobOptions,
): Complete<InternalMessage> {
const defaultOptions = QueueMetadata.getDefaultOptions();
const payload = {
id: ulid(),
attemptCount: 0,
...defaultOptions,
queue: undefined,
...jobOptions,
...message,
} as Complete<InternalMessage>;

payload.delay = calculateDelay(payload.delay || 0);
payload.connection = payload.connection || defaultOptions.connection;

if (!payload.queue) {
Expand Down
59 changes: 59 additions & 0 deletions lib/queue/drivers/database.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { ObjectionService } from '../../database';
import { DbQueueDriverOptions } from '../interfaces';
import { DbJob } from '../interfaces/job';
import { InternalMessage } from '../strategy';
import { PollQueueDriver } from '../strategy/pollQueueDriver';

export class DatabaseQueueDriver implements PollQueueDriver {
private client: any;

constructor(private options: DbQueueDriverOptions) {
this.client = ObjectionService.connection(options?.connection);
}

async init(): Promise<void> {
return;
}

async push(message: string, rawPayload: InternalMessage): Promise<void> {
await this.client
.insert({
id: rawPayload.id,
queue: rawPayload.queue,
payload: message,
scheduledAt: rawPayload.delay,
})
.into(this.options.table);
}

async pull(options: Record<string, any>): Promise<DbJob[] | null> {
const messages = await this.client
.select('*')
.where('queue', options.queue)
.where('scheduled_at', '<=', Date.now())
.from(this.options.table);
return messages.map(m => new DbJob(m));
}

async remove(job: DbJob, options: Record<string, any>): Promise<void> {
await this.client
.del()
.where('id', job.getId())
.where('queue', options.queue)
.from(this.options.table);
}

async purge(options: Record<string, any>): Promise<void> {
await this.client
.del()
.where('queue', options.queue)
.from(this.options.table);
}

async count(options: Record<string, any>): Promise<number> {
return await this.client
.count('1')
.where('queue', options.queue)
.from(this.options.table);
}
}
2 changes: 2 additions & 0 deletions lib/queue/drivers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from './sync';
export * from './sqs';
export * from './redis';
export * from './database';
6 changes: 2 additions & 4 deletions lib/queue/drivers/redis.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ulid } from 'ulid';
import { Package } from '../../utils';
import { validateOptions } from '../../utils/helpers';
import { RedisJob } from '../interfaces/redisJob';
import { RedisJob } from '../interfaces/job';
import { RedisQueueOptionsDto } from '../schema';
import { InternalMessage } from '../strategy';
import { PollQueueDriver } from '../strategy/pollQueueDriver';
Expand Down Expand Up @@ -52,7 +52,7 @@ export class RedisQueueDriver implements PollQueueDriver {
async init(): Promise<void> {}

async push(message: string, rawPayload: InternalMessage): Promise<void> {
if ((rawPayload.delay || 0) > 0) {
if (rawPayload.delay > Date.now()) {
await this.pushToDelayedQueue(message, rawPayload);
return;
}
Expand All @@ -75,7 +75,6 @@ export class RedisQueueDriver implements PollQueueDriver {
async purge(options: Record<string, any>): Promise<void> {
await this.client.del(this.getQueue(options.queue));
await this.client.del(this.getDelayedQueue(options.queue));
return;
}

async count(options: Record<string, any>): Promise<number> {
Expand All @@ -91,7 +90,6 @@ export class RedisQueueDriver implements PollQueueDriver {
Date.now() + rawPayload.delay * 1000,
this.getProcessedMessage(message),
);
return;
}

getProcessedMessage(message: string): string {
Expand Down
2 changes: 1 addition & 1 deletion lib/queue/drivers/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Package } from '../../utils';
import { joinUrl, validateOptions } from '../../utils/helpers';
import { SqsJob } from '../interfaces/sqsJob';
import { SqsJob } from '../interfaces/job';
import { SqsQueueOptionsDto } from '../schema';
import { InternalMessage } from '../strategy';
import { PollQueueDriver } from '../strategy/pollQueueDriver';
Expand Down
31 changes: 31 additions & 0 deletions lib/queue/interfaces/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { DriverJob } from '../strategy';

export class RedisJob extends DriverJob {
public getId(): string {
return '';
}

public getMessage(): string {
return this.data.message;
}
}

export class SqsJob extends DriverJob {
public getId(): string {
return '';
}

public getMessage(): string {
return this.data.Body;
}
}

export class DbJob extends DriverJob {
public getId(): string {
return this.data.id;
}

public getMessage(): string {
return this.data.payload;
}
}
11 changes: 10 additions & 1 deletion lib/queue/interfaces/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ export interface RedisQueueDriverOptions {
prefix: string;
}

export interface DbQueueDriverOptions {
listenerType: 'poll';
driver: 'db';
connection?: string;
table: string;
queue: string;
}

export interface QueueOptions {
isGlobal?: boolean;
default: string;
Expand All @@ -48,7 +56,8 @@ export interface QueueOptions {
| SyncQueueDriverOptions
| SqsQueueDriverOptions
| RedisQueueDriverOptions
| QueueDriverOptions;
| QueueDriverOptions
| DbQueueDriverOptions;
};
}

Expand Down
7 changes: 0 additions & 7 deletions lib/queue/interfaces/redisJob.ts

This file was deleted.

7 changes: 0 additions & 7 deletions lib/queue/interfaces/sqsJob.ts

This file was deleted.

2 changes: 1 addition & 1 deletion lib/queue/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class QueueMetadata {
delay: 10,
tries: 5,
timeout: 30,
sleep: 5000,
sleep: 10000,
};
}

Expand Down
7 changes: 6 additions & 1 deletion lib/queue/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import { IntentConfig } from '../config/service';
import { logTime } from '../utils/helpers';
import { InternalLogger } from '../utils/logger';
import { Str } from '../utils/string';
import { SqsQueueDriver, SyncQueueDriver } from './drivers';
import {
DatabaseQueueDriver,
SqsQueueDriver,
SyncQueueDriver,
} from './drivers';
import { RedisQueueDriver } from './drivers/redis';
import { QueueDriverOptions, QueueOptions } from './interfaces';
import { QueueMetadata } from './metadata';
Expand All @@ -15,6 +19,7 @@ export class QueueService {
sync: SyncQueueDriver,
sqs: SqsQueueDriver,
redis: RedisQueueDriver,
db: DatabaseQueueDriver,
};

private static connections: Record<string, any> = {};
Expand Down
2 changes: 2 additions & 0 deletions lib/queue/strategy/driverJob.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export abstract class DriverJob {
constructor(public data: Record<string, any>) {}

public abstract getId(): string;

public abstract getMessage(): string;
}
4 changes: 3 additions & 1 deletion lib/queue/strategy/message.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export type Payload = Record<string, any> | string | number;

export interface JobOptions {
delay?: number;
delay?: number | string | Date;
tries?: number;
queue?: string;
timeout?: number;
Expand All @@ -15,4 +15,6 @@ export interface Message extends JobOptions {

export interface InternalMessage extends Message {
attemptCount: number;
id: string;
delay?: number;
}
6 changes: 4 additions & 2 deletions lib/queue/workers/pollQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EmitsEvent } from '../../events';
import { Obj } from '../../utils';
import { logTime } from '../../utils/helpers';
import { JobStatusEnum } from '../constants';
import { JobFailed, JobProcessed, JobProcessing } from '../events';
Expand Down Expand Up @@ -58,6 +59,7 @@ export class PollQueueWorker extends BaseQueueWorker {
if (connection.scheduledTask) this.performScheduledTask(connection);

const runner = new JobRunner(this.options, connection);
// eslint-disable-next-line no-constant-condition
while (1) {
const jobs = await this.poll(connection);
if (!jobs.length) {
Expand Down Expand Up @@ -114,7 +116,6 @@ export class PollQueueWorker extends BaseQueueWorker {
`[${message.job}] Job Failed... | ${error['message']}`,
true,
);
return;
}
}

Expand Down Expand Up @@ -186,6 +187,7 @@ export class PollQueueWorker extends BaseQueueWorker {
* @param job
*/
fetchMessage(job: DriverJob): InternalMessage {
return JSON.parse(job.getMessage());
const message = job.getMessage();
return Obj.isObj(message) ? message : JSON.parse(job.getMessage());
}
}
Loading