Skip to content

Commit 50c3f17

Browse files
committed
feat: let TimeQueue#push() return a promise when no callback is given
BREAKING CHANGE: `error` event no longer emitted when a callback isn't given to `push()`. Instead, `push()` will return a promise that can reject.
1 parent c844876 commit 50c3f17

File tree

4 files changed

+50
-40
lines changed

4 files changed

+50
-40
lines changed

README.md

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ const worker = (arg1, arg2, callback) => {
1818
// create a queue with max 5 concurrency every second
1919
let q = new TimeQueue(worker, { concurrency: 5, every: 1000 });
2020

21-
// push tasks onto the queue
21+
// Push tasks onto the queue.
2222
q.push(42, 24);
2323
q.push(2, 74);
2424

25-
// optional callback when pushing tasks
26-
q.push(2, 2, (err) => {
25+
// Optional callback when pushing tasks.
26+
q.push(2, 2, (err, result) => {
2727
// task finished
2828
});
29+
30+
// Can use promise/await syntax instead.
31+
let result = await q.push(3, 5);
2932
```
3033

3134

@@ -40,17 +43,17 @@ Creates a new instance of a queue. Worker must be a function with a callback for
4043

4144
// How much time in milliseconds to allow no more than
4245
// the max number of concurrent tasks to run.
43-
// If the max amount of concurrent tasks are finished faster than the limit,
44-
// they will be queued.
46+
// If the max amount of concurrent tasks finish faster than this time limit,
47+
// additional tasks will wait until enough time has passed before starting.
4548
, every: 0
4649

4750
// Maximum number of tasks to keep in the queue.
4851
// While full, pushed tasks will be ignored.
4952
, maxQueued: Infinity
5053

51-
// If set, will emit an `error` event if a tasks takes too much time.
52-
// if callback was given to that task,
53-
// it will be called with the error instead.
54+
// If set, tasks will error if they take too much time.
55+
// if callback was given to that task, it will be called with the error,
56+
// otherwise, the returned promise should be `caught`.
5457
, timeout: 0
5558
}
5659
```
@@ -86,11 +89,6 @@ Empties queue and clears the timeouts TimeQueue sets to keep track of running ta
8689

8790
# Events
8891

89-
### Event: 'error'
90-
* `Error`
91-
92-
Emitted when there is an error processing a task and a callback isn't given to the `push` method.
93-
9492
### Event: 'full'
9593

9694
Queue is full.

lib/index.js

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,23 @@ module.exports = class TimeQueue extends EventEmitter {
4242
*
4343
* @param {Object} ...args
4444
* @param {Function(!Error, ...)} callback
45+
* @return {Promise?}
4546
*/
4647
push(...args) {
48+
// Returns a promise if no `callback` is given.
49+
if (args.length < this.worker.length) {
50+
return new Promise((resolve, reject) => {
51+
// Add any missing arguments.
52+
while (args.length < this.worker.length - 1) {
53+
args.push(undefined);
54+
}
55+
this.push(...args, (err, results) => {
56+
if (err) return reject(err);
57+
resolve(results);
58+
});
59+
});
60+
}
61+
4762
if (this.isFull()) {
4863
return;
4964
}
@@ -78,7 +93,7 @@ module.exports = class TimeQueue extends EventEmitter {
7893
* @param {Array.<Object>} args
7994
*/
8095
_process(args) {
81-
const callback = args.splice(this.worker.length - 1, 1)[0];
96+
const callback = args.pop();
8297
let finished = false;
8398
let every = ~~this.every;
8499
let timedOut;
@@ -104,7 +119,7 @@ module.exports = class TimeQueue extends EventEmitter {
104119
let timeout = ~~this.timeout;
105120
let tid;
106121

107-
const taskCallback = (err, ...args) => {
122+
const taskCallback = (err, result) => {
108123
// If this task has timed out, and the callback is called again
109124
// from the worker, ignore it.
110125
if (!taskTimedOut) {
@@ -118,19 +133,9 @@ module.exports = class TimeQueue extends EventEmitter {
118133
throw Error('Callback from worker should only be called once');
119134
}
120135
callbackCalled = true;
121-
122136
this.finished++;
123137
this.active--;
124-
125-
if (typeof callback === 'function') {
126-
// If a callback was given with the task,
127-
// call it when the task is finished.
128-
callback(err, ...args);
129-
130-
} else if (err) {
131-
// Otherwise emit an `error` event if there was an error with the task.
132-
this.emit('error', err);
133-
}
138+
callback(err, result);
134139

135140
finished = true;
136141
if (timedOut) {
@@ -147,11 +152,6 @@ module.exports = class TimeQueue extends EventEmitter {
147152
}, timeout);
148153
}
149154

150-
// Add missing arguments.
151-
while (args.length < this.worker.length - 1) {
152-
args.push(undefined);
153-
}
154-
155155
// Add custom callback to args.
156156
const args2 = args.slice();
157157
args2.push(taskCallback);

test/main-test.js

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const assert = require('assert');
44

55
describe('Create a queue and add to it', () => {
66
let full = false;
7-
const q = new TimeQueue(process.nextTick, { concurrency: 3 });
7+
const q = new TimeQueue(callback => process.nextTick(callback), { concurrency: 3 });
88

99
q.on('full', () => {
1010
full = true;
@@ -79,6 +79,16 @@ describe('Create a queue with variable number of arguments', () => {
7979
assert.equal(lastC, 'hello');
8080
});
8181

82+
it('Returns a promise that fulfills', async () => {
83+
const q = new TimeQueue((a, b, c, callback) => {
84+
process.nextTick(() => {
85+
callback(null, a + b + c);
86+
});
87+
}, { concurrency: 10, every: 1000 });
88+
let result = await q.push(1, 2, 3);
89+
assert.equal(result, 6);
90+
});
91+
8292
describe('Push with callback', () => {
8393
it('Calls callback when task finishes', (done) => {
8494
q.push(3, 2, 1, done);
@@ -125,12 +135,14 @@ describe('Create a queue with a worker that always errors', () => {
125135
});
126136
}, { concurrency: 10 });
127137

128-
it('Emits an `error` event', (done) => {
129-
q.push();
130-
q.once('error', (err) => {
138+
it('Trhows an error', async () => {
139+
try {
140+
await q.push();
141+
} catch (err) {
131142
assert.equal(err.message, 'gotcha');
132-
done();
133-
});
143+
return;
144+
}
145+
throw Error('should have thrown');
134146
});
135147

136148
describe('Push task with callback', () => {

test/timeout-test.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ describe('Create a queue with a timeout', () => {
2626
setTimeout(callback, 100);
2727
}, { timeout: 50 });
2828

29-
it('Should throw an error', (done) => {
30-
q.push(3, 4);
31-
q.on('error', (err) => {
29+
it('Should reject', (done) => {
30+
let p = q.push(3, 4);
31+
p.catch((err) => {
3232
assert(err);
3333
assert.equal(err.message, 'Task timed out');
3434
assert.equal(err.args[0], 3);

0 commit comments

Comments
 (0)