Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ It ensures that the number of active requests does not exceed a specified limit,
* [.setConcurrent(concurrent)](#FetchQueue+setConcurrent)
* [.getDebug()](#FetchQueue+getDebug) ⇒
* [.setDebug(debug)](#FetchQueue+setDebug)
* [.emptyQueue(urlPattern)](#FetchQueue+emptyQueue)
* [.pauseQueue()](#FetchQueue+pauseQueue) ⇒ <code>void</code>
* [.startQueue()](#FetchQueue+startQueue) ⇒ <code>void</code>
* [.getQueueLength()](#FetchQueue+getQueueLength) ⇒
* [.getActiveRequests()](#FetchQueue+getActiveRequests) ⇒

<a name="new_FetchQueue_new"></a>

Expand Down Expand Up @@ -65,8 +69,36 @@ If no options are provided, the default concurrent value is set to 3.</p>
| --- | --- |
| debug | <code>boolean</code> |

<a name="FetchQueue+emptyQueue"></a>

### fetchQueue.emptyQueue(urlPattern)
<p>Empties the queue of fetch requests.</p>

**Kind**: instance method of [<code>FetchQueue</code>](#FetchQueue)

| Param | Description |
| --- | --- |
| urlPattern | <p>Optional regular expression to match against the URLs in the queue. If provided, only the requests with URLs that match the pattern will be aborted. If not provided, all requests in the queue will be aborted.</p> |

<a name="FetchQueue+pauseQueue"></a>

### fetchQueue.pauseQueue() ⇒ <code>void</code>
<p>Disables the queuing of fetch requests in the FetchQueue.</p>

**Kind**: instance method of [<code>FetchQueue</code>](#FetchQueue)
<a name="FetchQueue+startQueue"></a>

### fetchQueue.startQueue() ⇒ <code>void</code>
<p>Enables the queuing of fetch requests in the FetchQueue.</p>

**Kind**: instance method of [<code>FetchQueue</code>](#FetchQueue)
<a name="FetchQueue+getQueueLength"></a>

### fetchQueue.getQueueLength() ⇒
**Kind**: instance method of [<code>FetchQueue</code>](#FetchQueue)
**Returns**: <p>Length of queue</p>
<a name="FetchQueue+getActiveRequests"></a>

### fetchQueue.getActiveRequests() ⇒
**Kind**: instance method of [<code>FetchQueue</code>](#FetchQueue)
**Returns**: <p>Number of active requests</p>
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,18 @@ console.log(fetchedQueue.getConcurrent()); // output: 5
fetchQueue.setDebug(true);
console.log(fetchedQueue.getDebug()); // output: true
```

```js
// start and pause queue
const fetchQueue = new FetchQueue(concurrent: 3, pauseQueueOnInit: true);
const customFetch = fetchQueue.getFetchMethod();

// ...some calls
fetchQueue.emptyQueue();
fetchQueue.startQueue();

// ...some calls
fetchQueue.pauseQueue();
```

Note: See DOCUMENTATION.md for more information on methods.
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export default {
preset: "ts-jest",
testEnvironment: "node",
fakeTimers: { enableGlobally: true },
transform: {
"^.+\\.(ts|tsx)$": "ts-jest",
},
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fundwave/fetchq",
"version": "1.1.1",
"version": "1.1.2-disable-queue.4",
"description": "Queue for fetch requests",
"main": "dist/esm/index.js",
"types": "dist/esm/index.d.ts",
Expand All @@ -13,7 +13,8 @@
"build:esm": "tsc --module NodeNext --outDir ./dist/esm",
"build": "tsc --module commonjs --outDir ./dist/cjs",
"prepare": "npm run build && npm run build:esm",
"test": "jest",
"pretest": "jest --clearCache",
"test": "jest --detectOpenHandles",
"lint": "npx eslint src/index.ts",
"docs": "npx jsdoc-to-markdown ./src/index.ts --configure ./jsdoc2md.json > DOCUMENTATION.md"
},
Expand Down
176 changes: 109 additions & 67 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import fetch from "node-fetch";
import { RequestInfo, RequestInit, Response } from "node-fetch";
import fetch, { RequestInfo, RequestInit, Response } from "node-fetch";
import { FetchQueueConfig } from "./interfaces/index.js";
/**
* The `FetchQueue` class is a utility class that allows for managing and controlling concurrent fetch requests.
Expand All @@ -9,46 +8,52 @@ export class FetchQueue {
/**
* The maximum number of concurrent fetch requests allowed.
*/
private _concurrent: number;
#concurrent: number;

/**
* Indicates whether debugging is enabled or not.
*/
private _debug: boolean;
#debug: boolean;

/**
* An array of strings representing the URLs in the queue.
*/
private _urlsQueued: Array<string>;
#urlsQueued: Array<{ url: string; controller: AbortController }>;

/**
* An array of strings representing the URLs executing.
*/
private _urlsExecuting: Array<string>;
#urlsExecuting: Set<string>;

/**
* The current number of active fetch requests.
*/
private _activeRequests: number;
#activeRequests: number;
/**
* A queue of tasks to be executed when a slot becomes available for a new fetch request.
*/
private _queue: Array<() => void>;
#queue: Array<() => void>;

/**
* If true, Disables task executions but {@link #queue} gets populated.
*/
#pauseQueue: boolean;

/**
* Initializes a new instance of the FetchQueue class with an optional FetchQueueConfig object.
* If no options are provided, the default concurrent value is set to 3.
* @param {FetchQueueConfig} options - The FetchQueueConfig object containing the concurrent value.
*/
constructor(options?: FetchQueueConfig) {
this._concurrent = options?.concurrent || 3;
this._debug = options?.debug || false;
this._activeRequests = 0;
this._queue = [];
this._urlsQueued = [];
this._urlsExecuting = [];

if (typeof this._concurrent !== "number" || this._concurrent <= 0) {
this.#concurrent = options?.concurrent || 3;
this.#debug = options?.debug || false;
this.#activeRequests = 0;
this.#queue = [];
this.#urlsQueued = [];
this.#urlsExecuting = new Set<string>();
this.#pauseQueue = options?.pauseQueueOnInit || false;

if (typeof this.#concurrent !== "number" || this.#concurrent <= 0) {
throw new Error("Concurrent should be a number greater than zero.");
}
}
Expand All @@ -61,40 +66,40 @@ export class FetchQueue {
* @param options - The options for the fetch request.
* @returns A Promise that resolves to the fetch response.
*/
private _run = async (
url: URL | RequestInfo,
options?: RequestInit
): Promise<Response> => {
this._activeRequests++;
#run = async (url: URL | RequestInfo, options?: RequestInit, controller?: AbortController): Promise<Response> => {
try {
if (this._debug) {
this._urlsExecuting.push(url.toString());
console.log("executing", this._urlsExecuting);
if (this.#debug) {
this.#urlsExecuting.add(url.toString());
console.log("executing", this.#urlsExecuting);
}
const response: Response = await fetch(url, options);
if (this._debug) {
const index = this._urlsExecuting.indexOf(url.toString());
this._urlsExecuting.splice(index, 1);

if (!!controller && controller.signal.aborted) {
if (this.#debug) this.#urlsExecuting.delete(url.toString());
throw new Error("Aborted");
}

this.#activeRequests++;
const response: Response = await fetch(url, options);

if (this.#debug) this.#urlsExecuting.delete(url.toString());

return response;
} finally {
this._activeRequests--;
this._emitRequestCompletedEvent();
this.#activeRequests--;
this.#emitRequestCompletedEvent();
}
};

/**
* Executes the next task in the queue when a fetch request is completed.
*/
private _emitRequestCompletedEvent = (): void => {
if (this._debug) {
if (this._urlsQueued.length > 0) {
console.log("queue", this._urlsQueued);
this._urlsQueued.shift();
}
}
if (this._queue.length <= 0) return;
const nextTask = this._queue.shift();
#emitRequestCompletedEvent = (): void => {
if (this.#debug) console.log("queue", this.#urlsQueued);

if (this.#queue.length <= 0 || this.#pauseQueue) return;

this.#urlsQueued.shift();
const nextTask = this.#queue.shift();
nextTask!();
};

Expand All @@ -103,71 +108,108 @@ export class FetchQueue {
* @returns The custom fetch function.
*/
public getFetchMethod() {
return this._f_fetch;
return this.#f_fetch;
}

/**
* @returns value of concurrent property
*/
public getConcurrent() {
return this._concurrent;
return this.#concurrent;
}

/**
* set value of concurrent property
* @param {number} concurrent
*/
public setConcurrent(concurrent: number) {
this._concurrent = concurrent;
this.#concurrent = concurrent;
}

/**
* @returns value of debug property
*/
public getDebug() {
return this._debug;
return this.#debug;
}

/**
* set value of debug property
* @param {boolean} debug
*/
public setDebug(debug: boolean) {
this._debug = debug;
this.#debug = debug;
}

/**
* Empties the queue of fetch requests.
*
* @param urlPattern - Optional regular expression to match against the URLs in the queue.
* If provided, only the requests with URLs that match the pattern will be aborted.
* If not provided, all requests in the queue will be aborted.
*/
public emptyQueue(urlPattern?: RegExp) {
this.#urlsQueued.forEach((request) => {
if (!urlPattern) request.controller.abort();
else if (!!urlPattern && request.url.match(urlPattern)) request.controller.abort();
});

this.#queue.forEach((task) => task());

this.#urlsQueued = [];
this.#queue = [];
}

/**
* Disables the queuing of fetch requests in the FetchQueue.
* @returns {void}
*/
public pauseQueue(): void {
this.#pauseQueue = true;
this.#activeRequests = 0;
}

/**
* Enables the queuing of fetch requests in the FetchQueue.
* @returns {void}
*/
public startQueue(): void {
this.#pauseQueue = false;
this.#emitRequestCompletedEvent();
}

/**
* @returns Length of queue
*/
public getQueueLength() {
return this._queue.length;
public getQueueLength(): number {
return this.#queue.length;
}

/**
* @returns Number of active requests
*/
public getActiveRequests(): number {
return this.#activeRequests;
}

/**
* The internal fetch implementation that handles queuing of fetch requests.
*/
private _f_fetch = (() => {
return (
url: RequestInfo | URL,
options?: RequestInit
): Promise<Response> => {
const task = () => this._run(url, options);

if (this._activeRequests < this._concurrent) {
return task();
} else {
return new Promise((resolve, reject) => {
const queueTask = () => {
task().then(resolve).catch(reject);
};
this._queue.push(queueTask);
if (this._debug) {
this._urlsQueued.push(
url.toString().split("/").slice(-3).join("/")
);
}
});
#f_fetch = (() => {
return (url: RequestInfo | URL, options?: RequestInit): Promise<Response> => {
const controller = new AbortController();
const executeFetchRequest = (controller: AbortController) => this.#run(url, options, controller);

if (this.#activeRequests < this.#concurrent && !this.#pauseQueue) {
return executeFetchRequest(controller);
}
return new Promise((resolve, reject) => {
const queueTask = () => {
executeFetchRequest(controller).then(resolve).catch(reject);
};
this.#queue.push(queueTask);
this.#urlsQueued.push({ url: url.toString(), controller });
});
};
})();
}
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export type FetchQueueConfig = {
concurrent: number;
pauseQueueOnInit?: boolean
debug?: boolean;
};
Loading