From e7adbab9fac43fc44060fb72d7f7dac41191d36e Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sun, 31 Jan 2021 17:22:28 +0200 Subject: [PATCH 1/4] stream: improve Readable.from error handling --- lib/internal/streams/from.js | 33 +++++++++++++---------------- test/parallel/test-readable-from.js | 23 +++++++++++++++++++- 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index c4e06945260d5f..72415f06899bcc 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -43,9 +43,6 @@ function from(Readable, iterable, opts) { // being called before last iteration completion. let reading = false; - // Flag for when iterator needs to be explicitly closed. - let needToClose = false; - readable._read = function() { if (!reading) { reading = true; @@ -54,19 +51,23 @@ function from(Readable, iterable, opts) { }; readable._destroy = function(error, cb) { - if (needToClose) { - needToClose = false; - PromisePrototypeThen( - close(), - () => process.nextTick(cb, error), - (e) => process.nextTick(cb, error || e), - ); - } else { - cb(error); - } + PromisePrototypeThen( + close(error), + () => cb(error), + (e) => cb(error ?? e) + ) }; - async function close() { + async function close(error) { + const hadError = typeof error !== 'undefined'; + const hasThrow = typeof iterator.return === 'function'; + if (hadError && hasThrow) { + const { value, done } = await iterator.throw(error); + await value; + if (done) { + return; + } + } if (typeof iterator.return === 'function') { const { value } = await iterator.return(); await value; @@ -75,13 +76,9 @@ function from(Readable, iterable, opts) { async function next() { try { - needToClose = false; const { value, done } = await iterator.next(); - needToClose = !done; if (done) { readable.push(null); - } else if (readable.destroyed) { - await close(); } else { const res = await value; if (res === null) { diff --git a/test/parallel/test-readable-from.js b/test/parallel/test-readable-from.js index a4e0f1b566c32c..aca8f5548a7129 100644 --- a/test/parallel/test-readable-from.js +++ b/test/parallel/test-readable-from.js @@ -4,6 +4,7 @@ const { mustCall } = require('../common'); const { once } = require('events'); const { Readable } = require('stream'); const { strictEqual, throws } = require('assert'); +const common = require('../common'); { throws(() => { @@ -187,6 +188,25 @@ async function endWithError() { } } +async function destroyingStreamWithErrorThrowsInGenerator() { + const validateError = common.mustCall((e) => { + strictEqual(e, 'Boum'); + }); + async function* generate() { + try { + yield 1; + yield 2; + yield 3; + throw new Error(); + } catch (e) { + validateError(e); + } + } + const stream = Readable.from(generate()); + stream.read(); + stream.once('error', common.mustCall()); + stream.destroy('Boum'); +} Promise.all([ toReadableBasicSupport(), @@ -198,5 +218,6 @@ Promise.all([ toReadableOnDataNonObject(), destroysTheStreamWhenThrowing(), asTransformStream(), - endWithError() + endWithError(), + destroyingStreamWithErrorThrowsInGenerator(), ]).then(mustCall()); From 0f9324ae0b8eef500ce58c5bacecd2ab6294a188 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sun, 31 Jan 2021 17:40:01 +0200 Subject: [PATCH 2/4] fixup! lint --- lib/internal/streams/from.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 72415f06899bcc..8b5dca6753fe73 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -53,9 +53,9 @@ function from(Readable, iterable, opts) { readable._destroy = function(error, cb) { PromisePrototypeThen( close(error), - () => cb(error), + () => cb(error), (e) => cb(error ?? e) - ) + ); }; async function close(error) { From 11fe4f2735bdbf995cd9b03072480f44c1be6213 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sun, 31 Jan 2021 19:14:27 +0200 Subject: [PATCH 3/4] fixup! comment --- lib/internal/streams/from.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 8b5dca6753fe73..2cc0b45af9726e 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -53,8 +53,8 @@ function from(Readable, iterable, opts) { readable._destroy = function(error, cb) { PromisePrototypeThen( close(error), - () => cb(error), - (e) => cb(error ?? e) + () => process.nextTick(cb, error), // nextTick is here in case cb throws + (e) => process.nextTick(cb, error || e), ); }; From 0bce498e93981f87a58fc7584be5258b68fefeef Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Mon, 1 Feb 2021 12:09:52 +0200 Subject: [PATCH 4/4] fixup! code review --- lib/internal/streams/from.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 2cc0b45af9726e..6798411fa4b038 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -54,13 +54,13 @@ function from(Readable, iterable, opts) { PromisePrototypeThen( close(error), () => process.nextTick(cb, error), // nextTick is here in case cb throws - (e) => process.nextTick(cb, error || e), + (e) => process.nextTick(cb, e || error), ); }; async function close(error) { - const hadError = typeof error !== 'undefined'; - const hasThrow = typeof iterator.return === 'function'; + const hadError = (error !== undefined) && (error !== null); + const hasThrow = typeof iterator.throw === 'function'; if (hadError && hasThrow) { const { value, done } = await iterator.throw(error); await value;