Skip to content

Commit 0eaa534

Browse files
committed
feat: allow worker to be an async function
1 parent 14d46e3 commit 0eaa534

File tree

3 files changed

+97
-10
lines changed

3 files changed

+97
-10
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ const worker = (arg1, arg2, callback) => {
1515
someAsyncFunction(calculation(arg1, arg2), callback);
1616
};
1717

18+
// Worker can be an async function.
19+
const worker = async (arg1) => {
20+
await anotherSyncFunction(arg1);
21+
return await andAnotherOne();
22+
};
23+
1824
// create a queue with max 5 concurrency every second
1925
let q = new TimeQueue(worker, { concurrency: 5, every: 1000 });
2026

lib/index.js

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ module.exports = class TimeQueue extends EventEmitter {
1414
super();
1515

1616
this.worker = worker;
17+
this._workerAsync = worker.constructor.name == 'AsyncFunction';
1718
options = options || {};
1819
this.concurrency = options.concurrency || 1;
1920
this.every = options.every || 0;
@@ -45,12 +46,15 @@ module.exports = class TimeQueue extends EventEmitter {
4546
* @return {Promise?}
4647
*/
4748
push(...args) {
48-
// Returns a promise if no `callback` is given.
49-
if (args.length < this.worker.length) {
49+
// Returns a promise no `callback` is given.
50+
if (this._workerAsync && args.length === this.worker.length ||
51+
!this._workerAsync && args.length < this.worker.length) {
5052
return new Promise((resolve, reject) => {
5153
// Add any missing arguments.
52-
while (args.length < this.worker.length - 1) {
53-
args.push(undefined);
54+
if (!this._workerAsync) {
55+
while (args.length < this.worker.length - 1) {
56+
args.push(undefined);
57+
}
5458
}
5559
this.push(...args, (err, results) => {
5660
if (err) return reject(err);
@@ -92,7 +96,7 @@ module.exports = class TimeQueue extends EventEmitter {
9296
*
9397
* @param {Array.<Object>} args
9498
*/
95-
_process(args) {
99+
async _process(args) {
96100
const callback = args.pop();
97101
let finished = false;
98102
let every = ~~this.every;
@@ -152,12 +156,19 @@ module.exports = class TimeQueue extends EventEmitter {
152156
}, timeout);
153157
}
154158

155-
// Add custom callback to args.
156-
const args2 = args.slice();
157-
args2.push(taskCallback);
158-
159159
// Call the worker.
160-
this.worker(...args2);
160+
if (this._workerAsync) {
161+
try {
162+
taskCallback(null, await this.worker(...args));
163+
} catch (err) {
164+
taskCallback(err);
165+
}
166+
} else {
167+
// Add custom callback to args.
168+
const args2 = args.slice();
169+
args2.push(taskCallback);
170+
this.worker(...args2);
171+
}
161172
}
162173

163174

test/main-test.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,76 @@ describe('Create a queue and add to it', () => {
5353
assert.equal(q.timeout, 0);
5454
});
5555
});
56+
57+
describe('with an async worker', () => {
58+
it('Still respects concurrency', (done) => {
59+
const q = new TimeQueue(async (a, b) => {
60+
return await new Promise((resolve) => {
61+
process.nextTick(() => resolve(a + b));
62+
});
63+
}, { concurrency: 2 });
64+
let result1, result2, result3;
65+
q.push(1, 2, (err, result) => {
66+
assert.ifError(err);
67+
result1 = result;
68+
});
69+
q.push(3, 4, (err, result) => {
70+
assert.ifError(err);
71+
result2 = result;
72+
});
73+
q.push(5, 6, (err, result) => {
74+
assert.ifError(err);
75+
result3 = result;
76+
});
77+
assert.equal(q.active, 2);
78+
q.on('drain', () => {
79+
assert.equal(result1, 3);
80+
assert.equal(result2, 7);
81+
assert.equal(result3, 11);
82+
done();
83+
});
84+
});
85+
it('Pushing to worker can be awaited', async () => {
86+
const q = new TimeQueue(async (a, b) => {
87+
return new Promise((resolve) => {
88+
process.nextTick(() => resolve(a + b));
89+
});
90+
}, { concurrency: 1 });
91+
let result1 = await q.push(1, 2);
92+
let result2 = await q.push(3, 4);
93+
assert.equal(result1, 3);
94+
assert.equal(result2, 7);
95+
});
96+
describe('that errors', () => {
97+
it('Calls callback with error', (done) => {
98+
const q = new TimeQueue(async (a) => {
99+
return new Promise((resolve, reject) => {
100+
process.nextTick(() => reject(Error('no: ' + a)));
101+
});
102+
});
103+
q.push('one', (err) => {
104+
assert.ok(err);
105+
assert.equal(err.message, 'no: one');
106+
done();
107+
});
108+
});
109+
it('Throws when awaited', async () => {
110+
const q = new TimeQueue(async (a) => {
111+
return new Promise((resolve, reject) => {
112+
process.nextTick(() => reject(Error('no: ' + a)));
113+
});
114+
});
115+
try {
116+
await q.push('okay');
117+
} catch (err) {
118+
assert.ok(err);
119+
assert.equal(err.message, 'no: okay');
120+
return;
121+
}
122+
throw Error('shoult not get here');
123+
});
124+
});
125+
});
56126
});
57127

58128

0 commit comments

Comments
 (0)