From e21ad8ddcee4e1720b6e47b9c60746db36bb16cb Mon Sep 17 00:00:00 2001 From: Nitzan Uziely Date: Sat, 17 Apr 2021 21:16:46 +0300 Subject: [PATCH 1/5] stream: add a non-destroying iterator to Readable add a non-destroying iterator to Readable fixes: https://github.com/nodejs/node/issues/38491 --- doc/api/stream.md | 58 +++++++++- lib/internal/streams/readable.js | 36 +++++-- .../test-stream-readable-async-iterators.js | 102 ++++++++++++++++++ 3 files changed, 186 insertions(+), 10 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 4e35340c04812d..dab0f81a65037b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1506,13 +1506,67 @@ async function print(readable) { print(fs.createReadStream('file')).catch(console.error); ``` -If the loop terminates with a `break` or a `throw`, the stream will be -destroyed. In other terms, iterating over a stream will consume the stream +If the loop terminates with a `break`, `return` or a `throw`, the stream will +be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the `highWaterMark` option. In the code example above, data will be in a single chunk if the file has less then 64KB of data because no `highWaterMark` option is provided to [`fs.createReadStream()`][]. +##### `readable.iterator([options])` + + +* `options` {Object} + * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the + async iterator, or exiting a `for await...of` iteration using a `break`, + `return` or `throw` will not destroy the stream. **Default:** `true`. + * `destroyOnError` {boolean} When set to `false`, if the stream emits an + error while it's being iterated, the iterator will not destroy the stream. + **Default:** `true`. +* Returns: {AsyncIterator} to consume the stream. + +The iterator created by this method gives users the option to cancel the +destruction the stream if the `for await...of` loop is exited by `return`, +`break` or `throw`, or if the iterator should destroy the stream if the stream +emitted an error during iteration. + +```js +const { Readable } = require('stream'); + +async function printIterator(readable) { + for await (const chunk of readable.iterator({ destroyOnReturn: false })) { + console.log(chunk); // 1 + break; + } + + console.log(readable.destroyed); // false + + for await (const chunk of readable.iterator({ destroyOnReturn: false })) { + console.log(chunk); // Will print 2 and then 3 + } + + console.log(readable.destroyed); // true, stream was totally consumed +} + +async function printSymbolAsyncIterator(readable) { + for await (const chunk of readable) { + console.log(chunk); // 1 + break; + } + + console.log(readable.destroyed); // true +} + +async function showBoth() { + await printIterator(Readable.from([1, 2, 3])); + await printSymbolAsyncIterator(Readable.from([1, 2, 3])); +} + +showBoth(); +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index aa0f5f94886427..3858a1e8899aa4 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1062,8 +1062,17 @@ Readable.prototype.wrap = function(stream) { }; Readable.prototype[SymbolAsyncIterator] = function() { - let stream = this; + return streamToAsyncIterator(this); +}; + +Readable.prototype.iterator = function (options = { + destroyOnReturn: true, + destroyOnError: true, +}) { + return streamToAsyncIterator(this, options); +}; +function streamToAsyncIterator(stream, options) { if (typeof stream.read !== 'function') { // v1 stream const src = stream; @@ -1076,14 +1085,19 @@ Readable.prototype[SymbolAsyncIterator] = function() { }).wrap(src); } - const iter = createAsyncIterator(stream); + const iter = createAsyncIterator(stream, options); iter.stream = stream; return iter; -}; +} -async function* createAsyncIterator(stream) { +async function* createAsyncIterator(stream, options) { let callback = nop; + const { + destroyOnReturn = true, + destroyOnError = true, + } = (options ?? {}); + function next(resolve) { if (this === stream) { callback(); @@ -1116,6 +1130,7 @@ async function* createAsyncIterator(stream) { next.call(this); }); + let errorThrown = false; try { while (true) { const chunk = stream.destroyed ? null : stream.read(); @@ -1132,12 +1147,17 @@ async function* createAsyncIterator(stream) { } } } catch (err) { - destroyImpl.destroyer(stream, err); + if (destroyOnError) { + destroyImpl.destroyer(stream, err); + } + errorThrown = true; throw err; } finally { - if (state.autoDestroy || !endEmitted) { - // TODO(ronag): ERR_PREMATURE_CLOSE? - destroyImpl.destroyer(stream, null); + if (!errorThrown && destroyOnReturn) { + if (state.autoDestroy || !endEmitted) { + // TODO(ronag): ERR_PREMATURE_CLOSE? + destroyImpl.destroyer(stream, null); + } } } } diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 7c457fdc3da24b..e49f4e86543057 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -693,6 +693,108 @@ async function tests() { }); } +// AsyncIterator non-destroying iterator +{ + function createReadable() { + return Readable.from((async function* () { + await Promise.resolve(); + yield 5; + await Promise.resolve(); + yield 7; + await Promise.resolve(); + })()); + } + + function createErrorReadable() { + const opts = { read() { throw new Error('inner'); } }; + return new Readable(opts); + } + + // Check default destroys on return + (async function() { + const readable = createReadable(); + for await (const chunk of readable.iterator()) { + assert.strictEqual(chunk, 5); + break; + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check explicit destroying on return + (async function() { + const readable = createReadable(); + for await (const chunk of readable.iterator({ destroyOnReturn: true })) { + assert.strictEqual(chunk, 5); + break; + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check default destroys on error + (async function() { + const readable = createErrorReadable(); + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable) { } + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'inner'); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check explicit destroys on error + (async function() { + const readable = createErrorReadable(); + const opts = { destroyOnError: true, destroyOnReturn: false }; + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable.iterator(opts)) { } + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'inner'); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check explicit non-destroy with return true + (async function() { + const readable = createErrorReadable(); + const opts = { destroyOnError: false, destroyOnReturn: true }; + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable.iterator(opts)) { } + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'inner'); + } + + assert.ok(!readable.destroyed); + })().then(common.mustCall()); + + // Check explicit non-destroy with return true + (async function() { + const readable = createReadable(); + const opts = { destroyOnReturn: false }; + for await (const chunk of readable.iterator(opts)) { + assert.strictEqual(chunk, 5); + break; + } + + assert.ok(!readable.destroyed); + + for await (const chunk of readable.iterator(opts)) { + assert.strictEqual(chunk, 7); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); +} + { let _req; const server = http.createServer((request, response) => { From 87f53168582bf4d3f7e4279d8aaede47737fce68 Mon Sep 17 00:00:00 2001 From: Nitzan Uziely Date: Tue, 4 May 2021 03:08:21 +0300 Subject: [PATCH 2/5] fixup! stream: add a non-destroying iterator to Readable --- doc/api/stream.md | 2 +- lib/internal/streams/readable.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index dab0f81a65037b..d016567421d7b6 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1547,7 +1547,7 @@ async function printIterator(readable) { console.log(chunk); // Will print 2 and then 3 } - console.log(readable.destroyed); // true, stream was totally consumed + console.log(readable.destroyed); // True, stream was totally consumed } async function printSymbolAsyncIterator(readable) { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 3858a1e8899aa4..9f7fa2828af4ae 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1065,7 +1065,7 @@ Readable.prototype[SymbolAsyncIterator] = function() { return streamToAsyncIterator(this); }; -Readable.prototype.iterator = function (options = { +Readable.prototype.iterator = function(options = { destroyOnReturn: true, destroyOnError: true, }) { From 468f9ea6dc61105a6da41fda0925b09d76a5d68e Mon Sep 17 00:00:00 2001 From: Nitzan Uziely Date: Tue, 4 May 2021 21:35:57 +0300 Subject: [PATCH 3/5] fixup! fixup! stream: add a non-destroying iterator to Readable --- doc/api/stream.md | 2 +- lib/internal/streams/readable.js | 22 ++++++++++--------- .../test-stream-readable-async-iterators.js | 14 ++++++++++++ 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index d016567421d7b6..7ac25bcd7c79df 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1528,7 +1528,7 @@ added: REPLACEME * Returns: {AsyncIterator} to consume the stream. The iterator created by this method gives users the option to cancel the -destruction the stream if the `for await...of` loop is exited by `return`, +destruction of the stream if the `for await...of` loop is exited by `return`, `break` or `throw`, or if the iterator should destroy the stream if the stream emitted an error during iteration. diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 9f7fa2828af4ae..2a818db22dfb1d 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -62,6 +62,7 @@ const { ERR_METHOD_NOT_IMPLEMENTED, ERR_STREAM_UNSHIFT_AFTER_END_EVENT } = require('internal/errors').codes; +const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); @@ -1065,10 +1066,10 @@ Readable.prototype[SymbolAsyncIterator] = function() { return streamToAsyncIterator(this); }; -Readable.prototype.iterator = function(options = { - destroyOnReturn: true, - destroyOnError: true, -}) { +Readable.prototype.iterator = function(options) { + if (options !== undefined) { + validateObject(options, 'options'); + } return streamToAsyncIterator(this, options); }; @@ -1093,10 +1094,11 @@ function streamToAsyncIterator(stream, options) { async function* createAsyncIterator(stream, options) { let callback = nop; - const { - destroyOnReturn = true, - destroyOnError = true, - } = (options ?? {}); + const opts = { + destroyOnReturn: true, + destroyOnError: true, + ...options, + }; function next(resolve) { if (this === stream) { @@ -1147,13 +1149,13 @@ async function* createAsyncIterator(stream, options) { } } } catch (err) { - if (destroyOnError) { + if (opts.destroyOnError) { destroyImpl.destroyer(stream, err); } errorThrown = true; throw err; } finally { - if (!errorThrown && destroyOnReturn) { + if (!errorThrown && opts.destroyOnReturn) { if (state.autoDestroy || !endEmitted) { // TODO(ronag): ERR_PREMATURE_CLOSE? destroyImpl.destroyer(stream, null); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index e49f4e86543057..a497317565fb4c 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -793,6 +793,20 @@ async function tests() { assert.ok(readable.destroyed); })().then(common.mustCall()); + + // Check non-object options. + { + const readable = createReadable(); + assert.throws( + () => readable.iterator(42), + { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + message: 'The "options" argument must be of type object. Received ' + + 'type number (42)', + } + ); + } } { From 4bb108b92379a1738c7f3bf41930021e8481119a Mon Sep 17 00:00:00 2001 From: Nitzan Uziely Date: Wed, 12 May 2021 18:40:24 +0300 Subject: [PATCH 4/5] fixup! fixup! fixup! stream: add a non-destroying iterator to Readable --- doc/api/stream.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 7ac25bcd7c79df..382f428e27820a 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1506,7 +1506,7 @@ async function print(readable) { print(fs.createReadStream('file')).catch(console.error); ``` -If the loop terminates with a `break`, `return` or a `throw`, the stream will +If the loop terminates with a `break`, `return`, or a `throw`, the stream will be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the `highWaterMark` option. In the code example above, data will be in a single chunk if the file @@ -1521,7 +1521,7 @@ added: REPLACEME * `options` {Object} * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the async iterator, or exiting a `for await...of` iteration using a `break`, - `return` or `throw` will not destroy the stream. **Default:** `true`. + `return`, or `throw` will not destroy the stream. **Default:** `true`. * `destroyOnError` {boolean} When set to `false`, if the stream emits an error while it's being iterated, the iterator will not destroy the stream. **Default:** `true`. @@ -1529,7 +1529,7 @@ added: REPLACEME The iterator created by this method gives users the option to cancel the destruction of the stream if the `for await...of` loop is exited by `return`, -`break` or `throw`, or if the iterator should destroy the stream if the stream +`break`, or `throw`, or if the iterator should destroy the stream if the stream emitted an error during iteration. ```js From f939af2fb5b8000309dd38c79ed6580119b4c041 Mon Sep 17 00:00:00 2001 From: Nitzan Uziely Date: Sun, 23 May 2021 00:18:04 +0300 Subject: [PATCH 5/5] fixup! fixup! fixup! fixup! stream: add a non-destroying iterator to Readable --- doc/api/stream.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 382f428e27820a..988790a6f097b8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1518,6 +1518,8 @@ has less then 64KB of data because no `highWaterMark` option is provided to added: REPLACEME --> +> Stability: 1 - Experimental + * `options` {Object} * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the async iterator, or exiting a `for await...of` iteration using a `break`,