diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 13cbce1005ece1..8bdb07ba84e61a 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -80,7 +80,9 @@ function map(fn, options) { concurrency = MathFloor(options.concurrency); } - validateInteger(concurrency, 'concurrency', 1); + if (concurrency !== Infinity) { + validateInteger(concurrency, 'concurrency', 1); + } return async function* map() { const ac = new AbortController(); diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js index ba0571fe3a7b95..9d793c94bd63e3 100644 --- a/test/parallel/test-stream-map.js +++ b/test/parallel/test-stream-map.js @@ -158,6 +158,31 @@ const { setTimeout } = require('timers/promises'); }); } +{ + // Allow Infinite concurrency + let resolve; + const promise = new Promise((res) => { + resolve = res; + }); + const stream = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(async (item, { signal }) => { + await setTimeout(10 - item, { signal }); + if (item === 10) { + resolve(); + } else { + await promise; + } + + return item; + }, { concurrency: Infinity }); + + (async () => { + const expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + for await (const item of stream) { + assert.strictEqual(item, expected.shift()); + } + })().then(common.mustCall()); +} + { // Concurrency result order const stream = Readable.from([1, 2]).map(async (item, { signal }) => {