From ff564fd459b779513bb23b3c844bb7dc973e12af Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 00:34:10 +0200 Subject: [PATCH 01/16] stream: allow stream to stay open after take fix #46980 --- lib/internal/streams/operators.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 9841723622418b..43f24a705e50e5 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -397,13 +397,16 @@ function take(number, options = undefined) { if (options?.signal != null) { validateAbortSignal(options.signal, 'options.signal'); } + if (options?.destroyStream != null) { + validateBoolean(options.destroyStream, 'options.destroyStream'); + } number = toIntegerOrInfinity(number); return async function* take() { if (options?.signal?.aborted) { throw new AbortError(); } - for await (const val of this) { + for await (const val of this.iterator({ destroyOnReturn: options.destroyStream ?? true })) { if (options?.signal?.aborted) { throw new AbortError(); } From abce0a7387581682ba40761bc7e770272ea14a55 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 08:34:58 +0200 Subject: [PATCH 02/16] stream: close stream on error --- lib/internal/streams/operators.js | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 43f24a705e50e5..ee98ef1c65978e 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -406,15 +406,21 @@ function take(number, options = undefined) { if (options?.signal?.aborted) { throw new AbortError(); } - for await (const val of this.iterator({ destroyOnReturn: options.destroyStream ?? true })) { - if (options?.signal?.aborted) { - throw new AbortError(); - } - if (number-- > 0) { - yield val; - } else { - return; + try { + for await (const val of this.iterator({ destroyOnReturn: options.destroyStream ?? true })) { + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (number-- > 0) { + yield val; + } else { + return; + } } + } catch (err) { + this.destroy(err); + + throw err; } }.call(this); } From df52ac12e9217ac39e1893e2d57ac214905e6d84 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 08:41:38 +0200 Subject: [PATCH 03/16] stream: remove unnecessary code --- lib/internal/streams/operators.js | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index ee98ef1c65978e..43f24a705e50e5 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -406,21 +406,15 @@ function take(number, options = undefined) { if (options?.signal?.aborted) { throw new AbortError(); } - try { - for await (const val of this.iterator({ destroyOnReturn: options.destroyStream ?? true })) { - if (options?.signal?.aborted) { - throw new AbortError(); - } - if (number-- > 0) { - yield val; - } else { - return; - } + for await (const val of this.iterator({ destroyOnReturn: options.destroyStream ?? true })) { + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (number-- > 0) { + yield val; + } else { + return; } - } catch (err) { - this.destroy(err); - - throw err; } }.call(this); } From 2f3846082e55f1ad028e38258eb8bc67e11d7e27 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 08:45:50 +0200 Subject: [PATCH 04/16] stream: add import --- lib/internal/streams/operators.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 43f24a705e50e5..e93b95d348c54e 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -15,6 +15,7 @@ const { validateAbortSignal, validateInteger, validateObject, + validateBoolean, } = require('internal/validators'); const { kWeakHandler } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); From d52c8ab326e6db1edcd062cb918368b8dd976d7f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 10:55:39 +0200 Subject: [PATCH 05/16] stream: fix when options is missing --- lib/internal/streams/operators.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index e93b95d348c54e..26a404fb44386b 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -407,7 +407,7 @@ function take(number, options = undefined) { if (options?.signal?.aborted) { throw new AbortError(); } - for await (const val of this.iterator({ destroyOnReturn: options.destroyStream ?? true })) { + for await (const val of this.iterator({ destroyOnReturn: options?.destroyStream ?? true })) { if (options?.signal?.aborted) { throw new AbortError(); } From c8a4849cc315418603e094aaf2b64b55b81a82b4 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 14:58:50 +0200 Subject: [PATCH 06/16] stream: added tests --- test/parallel/test-stream-drop-take.js | 33 ++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js index cb55a4f7ee1813..31aeeb8afed7fb 100644 --- a/test/parallel/test-stream-drop-take.js +++ b/test/parallel/test-stream-drop-take.js @@ -5,6 +5,7 @@ const { Readable, } = require('stream'); const { deepStrictEqual, rejects, throws } = require('assert'); +const assert = require('assert'); const { from } = Readable; @@ -100,3 +101,35 @@ const naturals = () => from(async function*() { throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/); throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/); } + +{ + (async () => { + const streamShouldCloseWithoutOption = from([1, 2, 3, 4, 5]); + + // Close stream by default + await streamShouldCloseWithoutOption.take(2).toArray(); + assert.strictEqual(streamShouldCloseWithoutOption.destroyed, true); + })().then(common.mustCall()); +} + +{ + (async () => { + const streamShouldCloseWithOption = from([1, 2, 3, 4, 5]); + + await streamShouldCloseWithOption.take(2, { destroyStream: true }).toArray(); + assert.strictEqual(streamShouldCloseWithOption.destroyed, true); + })().then(common.mustCall()); +} + +{ + (async () => { + const streamShouldNotClose = from([1, 2, 3, 4, 5]); + + // Do not close stream + await streamShouldNotClose.take(2, { destroyStream: false }).toArray(); + assert.strictEqual(streamShouldNotClose.destroyed, false); + + deepStrictEqual(await streamShouldNotClose.toArray(), [3, 4, 5]); + assert.strictEqual(streamShouldNotClose.destroyed, true); + })().then(common.mustCall()); +} From 8af907f82ee2fc1ba002778b4d92a2a963f8fe04 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 15:24:33 +0200 Subject: [PATCH 07/16] stream: fail in stream will close the stream --- lib/internal/streams/operators.js | 22 +++++++++++++++------- test/parallel/test-stream-drop-take.js | 17 +++++++++++++++++ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 26a404fb44386b..99c3ffe8a4eb65 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -407,15 +407,23 @@ function take(number, options = undefined) { if (options?.signal?.aborted) { throw new AbortError(); } - for await (const val of this.iterator({ destroyOnReturn: options?.destroyStream ?? true })) { - if (options?.signal?.aborted) { - throw new AbortError(); + try { + for await (const val of this.iterator({ destroyOnReturn: options?.destroyStream ?? true })) { + if (options?.signal?.aborted) { + throw new AbortError(); + } + if (number-- > 0) { + yield val; + } else { + return; + } } - if (number-- > 0) { - yield val; - } else { - return; + } catch (e) { + if(!this.destroyed) { + this.destroy(e); } + + throw e; } }.call(this); } diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js index 31aeeb8afed7fb..5a7f9a56c47ced 100644 --- a/test/parallel/test-stream-drop-take.js +++ b/test/parallel/test-stream-drop-take.js @@ -133,3 +133,20 @@ const naturals = () => from(async function*() { assert.strictEqual(streamShouldNotClose.destroyed, true); })().then(common.mustCall()); } + +{ + const errorToThrow = new Error('should close'); + + const streamShouldNotClose = from((function *() { + yield 1; + throw errorToThrow; + })()); + + streamShouldNotClose.take(3, { destroyStream: false }) + .toArray() + .then(common.mustNotCall()) + .catch(common.mustCall((error) => { + assert.strictEqual(streamShouldNotClose.destroyed, true); + assert.strictEqual(error, errorToThrow); + })) +} From 444cf4085177d02ddee392427a42a26ff5126688 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 15:31:55 +0200 Subject: [PATCH 08/16] stream: fix taking item from next iteration --- lib/internal/streams/operators.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 99c3ffe8a4eb65..e8b4aa01878e9e 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -414,6 +414,11 @@ function take(number, options = undefined) { } if (number-- > 0) { yield val; + + // Don't get another item from iterator in case we reached the end + if(number === 0) { + return; + } } else { return; } From 6e09304a42a981b8165beb27832a11a03015bbfd Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 15:34:54 +0200 Subject: [PATCH 09/16] stream: format --- lib/internal/streams/operators.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index e8b4aa01878e9e..5d9155d3b33478 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -416,7 +416,7 @@ function take(number, options = undefined) { yield val; // Don't get another item from iterator in case we reached the end - if(number === 0) { + if (number === 0) { return; } } else { @@ -424,7 +424,7 @@ function take(number, options = undefined) { } } } catch (e) { - if(!this.destroyed) { + if (!this.destroyed) { this.destroy(e); } From 25424019e45e40f14e34c45fd902badba3128e48 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 15:37:14 +0200 Subject: [PATCH 10/16] stream: use destructure --- test/parallel/test-stream-drop-take.js | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js index 5a7f9a56c47ced..3f7b90eac9f5cd 100644 --- a/test/parallel/test-stream-drop-take.js +++ b/test/parallel/test-stream-drop-take.js @@ -4,8 +4,7 @@ const common = require('../common'); const { Readable, } = require('stream'); -const { deepStrictEqual, rejects, throws } = require('assert'); -const assert = require('assert'); +const { deepStrictEqual, rejects, throws, strictEqual } = require('assert'); const { from } = Readable; @@ -108,7 +107,7 @@ const naturals = () => from(async function*() { // Close stream by default await streamShouldCloseWithoutOption.take(2).toArray(); - assert.strictEqual(streamShouldCloseWithoutOption.destroyed, true); + strictEqual(streamShouldCloseWithoutOption.destroyed, true); })().then(common.mustCall()); } @@ -117,7 +116,7 @@ const naturals = () => from(async function*() { const streamShouldCloseWithOption = from([1, 2, 3, 4, 5]); await streamShouldCloseWithOption.take(2, { destroyStream: true }).toArray(); - assert.strictEqual(streamShouldCloseWithOption.destroyed, true); + strictEqual(streamShouldCloseWithOption.destroyed, true); })().then(common.mustCall()); } @@ -127,10 +126,10 @@ const naturals = () => from(async function*() { // Do not close stream await streamShouldNotClose.take(2, { destroyStream: false }).toArray(); - assert.strictEqual(streamShouldNotClose.destroyed, false); + strictEqual(streamShouldNotClose.destroyed, false); deepStrictEqual(await streamShouldNotClose.toArray(), [3, 4, 5]); - assert.strictEqual(streamShouldNotClose.destroyed, true); + strictEqual(streamShouldNotClose.destroyed, true); })().then(common.mustCall()); } @@ -146,7 +145,7 @@ const naturals = () => from(async function*() { .toArray() .then(common.mustNotCall()) .catch(common.mustCall((error) => { - assert.strictEqual(streamShouldNotClose.destroyed, true); - assert.strictEqual(error, errorToThrow); - })) + strictEqual(streamShouldNotClose.destroyed, true); + strictEqual(error, errorToThrow); + })); } From 1ae2061a307180b6ca4423ced32742cf413ca77d Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 9 Mar 2023 17:49:01 +0200 Subject: [PATCH 11/16] stream: trigger ci From d3fc3c66902017b53bf9e163cc0a38c5f1444990 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Fri, 10 Mar 2023 00:16:16 +0200 Subject: [PATCH 12/16] stream: add docs --- doc/api/stream.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index d3719dc640f6c8..94d733afa15d02 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2437,6 +2437,9 @@ added: * `options` {Object} * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. + * `destroyStream` {boolean} When set to `false`, the stream will not be + closed after take finished unless the stream had an error. + **Default:** `true`. * Returns: {Readable} a stream with `limit` chunks taken. This method returns a new stream with the first `limit` chunks. @@ -2447,6 +2450,25 @@ import { Readable } from 'node:stream'; await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] ``` +Using the `destroyStream: false` you can use it start parsing + +```mjs +import fs from 'node:fs'; +import { Readable } from 'node:stream'; + +const csvParsedStream = fs + .createReadStream('file.csv') + .compose(myAwesomeParseCSV()); + +const [columns] = await csvParsedStream + .take(1) + .toArray(); + +const parsed = await csvParsedStream + .map((row) => parseRowByColumns(row, columns)) + .toArray(); +``` + ##### `readable.asIndexedPairs([options])`