diff --git a/src/dispatch-queue.ts b/src/dispatch-queue.ts index 30fbe4a..65872cd 100644 --- a/src/dispatch-queue.ts +++ b/src/dispatch-queue.ts @@ -1,13 +1,14 @@ import { DispatchQueueEventMap } from "./events/events-map.ts"; import { DispatchQueueEvents } from "./events/events.ts"; -import { DispatchQueueStartupErrorEvent } from "./events/startup-error-event.ts"; +import { DispatchQueueRuntimeErrorEvent } from "./events/runtime-error-event.ts"; import { DispatchQueueWorkerErrorEvent } from "./events/worker-error-event.ts"; import { Queue } from "./queue.ts"; import { Worker } from "./worker.ts"; interface DispatchQueueOptions { - processor(value: T, workerId: string): Promise; + autoStart?: boolean; concurrentWorkers?: number; + processor(value: T, workerId: string): Promise; } /** @@ -20,7 +21,11 @@ export class DispatchQueue { private readonly _queue: Queue = new Queue(); private readonly _readyWorkerQueue: Queue> = new Queue>(); - constructor({ processor, concurrentWorkers = 2 }: DispatchQueueOptions) { + constructor({ + autoStart = true, + concurrentWorkers = 2, + processor, + }: DispatchQueueOptions) { for (let i = 0; i < concurrentWorkers; i++) { this._readyWorkerQueue.enque( new Worker(`worker-${i}`, { @@ -35,6 +40,10 @@ export class DispatchQueue { ); } + if (!autoStart) { + return; + } + this.startProcessing(); } @@ -64,8 +73,8 @@ export class DispatchQueue { * the DispatchQueue */ startProcessing() { - this.start().catch((error) => { - this._events.dispatchEvent(new DispatchQueueStartupErrorEvent(error)); + this.run().catch((error) => { + this._events.dispatchEvent(new DispatchQueueRuntimeErrorEvent(error)); }); } @@ -76,7 +85,7 @@ export class DispatchQueue { this._abortController?.abort(); } - private async start(): Promise { + private async run(): Promise { this._abortController = new AbortController(); while (!this._abortController.signal.aborted) { diff --git a/src/events/events-map.ts b/src/events/events-map.ts index 081c978..c8d356a 100644 --- a/src/events/events-map.ts +++ b/src/events/events-map.ts @@ -1,8 +1,8 @@ -import { DispatchQueueStartupErrorEvent } from "./startup-error-event.ts"; +import { DispatchQueueRuntimeErrorEvent } from "./runtime-error-event.ts"; import { DispatchQueueWorkerErrorEvent } from "./worker-error-event.ts"; import { DispatchQueueEvents } from "./events.ts"; export interface DispatchQueueEventMap { [DispatchQueueEvents.WorkerError]: DispatchQueueWorkerErrorEvent; - [DispatchQueueEvents.StartupError]: DispatchQueueStartupErrorEvent; + [DispatchQueueEvents.RuntimeError]: DispatchQueueRuntimeErrorEvent; } diff --git a/src/events/events.ts b/src/events/events.ts index 3cbc05d..1bca95b 100644 --- a/src/events/events.ts +++ b/src/events/events.ts @@ -1,4 +1,4 @@ export enum DispatchQueueEvents { WorkerError = "worker-error", - StartupError = "startup-error", + RuntimeError = "runtime-error", } diff --git a/src/events/startup-error-event.ts b/src/events/runtime-error-event.ts similarity index 62% rename from src/events/startup-error-event.ts rename to src/events/runtime-error-event.ts index bbf2504..a0dcb29 100644 --- a/src/events/startup-error-event.ts +++ b/src/events/runtime-error-event.ts @@ -1,10 +1,10 @@ import { DispatchQueueEvents } from "./events.ts"; -export class DispatchQueueStartupErrorEvent extends Event { +export class DispatchQueueRuntimeErrorEvent extends Event { constructor( public readonly error: unknown, public readonly message: string = "Startup exception", ) { - super(DispatchQueueEvents.StartupError); + super(DispatchQueueEvents.RuntimeError); } } diff --git a/tests/dispatch-queue.test.ts b/tests/dispatch-queue.test.ts index 5d805de..33830de 100644 --- a/tests/dispatch-queue.test.ts +++ b/tests/dispatch-queue.test.ts @@ -15,6 +15,14 @@ import { import { DispatchQueue } from "../src/dispatch-queue.ts"; import { DispatchQueueEvents } from "../src/events/events.ts"; import { DispatchQueueWorkerErrorEvent } from "../src/events/worker-error-event.ts"; +import { + assertSpyCalls, + returnsNext, + Stub, + stub, +} from "https://deno.land/std@0.158.0/testing/mock.ts"; +import { Queue } from "../src/queue.ts"; +import { DispatchQueueRuntimeErrorEvent } from "../src/events/runtime-error-event.ts"; describe("Dispatch", () => { let dispatcher: DispatchQueue; @@ -156,4 +164,85 @@ describe("Dispatch", () => { assertSpyCall(mockedProcessor, 0); }); }); + + describe("when removeEventListener()", () => { + let eventListener: Spy< + unknown, + [_evt: DispatchQueueWorkerErrorEvent], + void + >; + + beforeEach(() => { + mockedProcessor = spy((_value, _workerId) => + Promise.reject("this is an error") + ); + dispatcher = new DispatchQueue({ + processor: mockedProcessor, + }); + + eventListener = spy((_evt: DispatchQueueWorkerErrorEvent) => {}); + + dispatcher.addEventListener( + DispatchQueueEvents.WorkerError, + eventListener, + ); + }); + + it("does not handle event", async () => { + // Arrange + dispatcher.removeEventListener( + DispatchQueueEvents.WorkerError, + eventListener, + ); + + // Act + dispatcher.process("test1"); + await delay(10); + + // Assert + assertSpyCalls(eventListener, 0); + }); + }); + + describe("when unhandle runtime error occurs", () => { + let stubbedDequeue: Stub, [], Promise>; + + beforeEach(() => { + stubbedDequeue = stub( + Queue.prototype, + "deque", + returnsNext([Promise.reject()]), + ); + + mockedProcessor = spy((_value, _workerId) => Promise.resolve()); + + dispatcher = new DispatchQueue({ + processor: mockedProcessor, + }); + }); + + afterEach(() => { + stubbedDequeue.restore(); + }); + + it("it handles exception", async () => { + const deferredPromise = new Deferred(); + + const eventListener = spy((_evt: DispatchQueueRuntimeErrorEvent) => { + deferredPromise.resolve(); + }); + + dispatcher.addEventListener( + DispatchQueueEvents.RuntimeError, + eventListener, + ); + + dispatcher.process("test"); + dispatcher.startProcessing(); + + await deferredPromise; + + assertSpyCalls(eventListener, 2); + }); + }); });