From 963f536b673fd932d305d39ffb567093d77760d6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 11:15:45 +0200 Subject: [PATCH 01/13] stream: fix pipeline pump Refs: https://github.com/nodejs/node/issues/39005 --- lib/internal/streams/pipeline.js | 48 ++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 441fcb471858bc..8b44c41426d280 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -16,18 +16,21 @@ const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED + ERR_STREAM_DESTROYED, + ERR_STREAM_PREMATURE_CLOSE } = require('internal/errors').codes; const { validateCallback } = require('internal/validators'); +function noop() {} + const { isIterable, isReadable, isStream, } = require('internal/streams/utils'); +const assert = require('internal/assert'); -let EE; let PassThrough; let Readable; @@ -102,25 +105,52 @@ async function* fromReadable(val) { } async function pump(iterable, writable, finish) { - if (!EE) { - EE = require('events'); - } let error; + let callback = noop; + const resume = (err) => { + if (!error && err) { + error = err; + } + const _callback = callback; + callback = noop; + _callback(); + }; + const onClose = () => { + resume(new ERR_STREAM_PREMATURE_CLOSE()); + }; + + const waitForDrain = () => new Promise((resolve) => { + assert.strictEqual(callback, noop); + if (error || writable.destroyed) { + resolve(); + } else { + callback = resolve; + } + }); + + writable + .on('drain', resume) + .on('error', resume) + .on('close', onClose); + try { - if (writable.writableNeedDrain === true) { - await EE.once(writable, 'drain'); + if (writable.writableNeedDrain) { + await waitForDrain(); } for await (const chunk of iterable) { if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); + await waitForDrain(); } } writable.end(); } catch (err) { error = err; } finally { + writable + .off('drain', resume) + .off('error', resume) + .off('close', onClose); finish(error); } } From 5f05fd5d631f1c1eb2c3292aa6ec583badd76139 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 13:07:45 +0200 Subject: [PATCH 02/13] fixup --- lib/internal/streams/pipeline.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 8b44c41426d280..f16ad9a4f5027b 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,6 +6,7 @@ const { ArrayIsArray, SymbolAsyncIterator, + Promise, } = primordials; let eos; From f29bb2d092a28f2f6cb09e1d7da03515f585c309 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 14:19:31 +0200 Subject: [PATCH 03/13] fixup --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index f16ad9a4f5027b..bcc4be3d68221d 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -121,7 +121,7 @@ async function pump(iterable, writable, finish) { }; const waitForDrain = () => new Promise((resolve) => { - assert.strictEqual(callback, noop); + assert(callback === noop); if (error || writable.destroyed) { resolve(); } else { From 1cd6a1cf59e1071115ba9a5979dab582acb7835c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 15:05:05 +0200 Subject: [PATCH 04/13] Update lib/internal/streams/pipeline.js Co-authored-by: Darshan Sen --- lib/internal/streams/pipeline.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index bcc4be3d68221d..58ebf33319cda0 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -109,9 +109,7 @@ async function pump(iterable, writable, finish) { let error; let callback = noop; const resume = (err) => { - if (!error && err) { - error = err; - } + error ||= err; const _callback = callback; callback = noop; _callback(); From 33c4e21daa3ca320446728bb83dacec0e12bdf95 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 15:24:35 +0200 Subject: [PATCH 05/13] Apply suggestions from code review Co-authored-by: Antoine du Hamel --- lib/internal/streams/pipeline.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 58ebf33319cda0..60cb9be1faf8a2 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -5,8 +5,8 @@ const { ArrayIsArray, - SymbolAsyncIterator, Promise, + SymbolAsyncIterator, } = primordials; let eos; @@ -18,7 +18,7 @@ const { ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, - ERR_STREAM_PREMATURE_CLOSE + ERR_STREAM_PREMATURE_CLOSE, } = require('internal/errors').codes; const { validateCallback } = require('internal/validators'); @@ -109,7 +109,7 @@ async function pump(iterable, writable, finish) { let error; let callback = noop; const resume = (err) => { - error ||= err; + error = aggregateTwoErrors(error, err); const _callback = callback; callback = noop; _callback(); From 30b7e8dc692866ef40e789800787f0eff0164d83 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 15:45:36 +0200 Subject: [PATCH 06/13] fixup --- lib/internal/streams/pipeline.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 60cb9be1faf8a2..a3cd57e1e5c8da 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -19,6 +19,7 @@ const { ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, ERR_STREAM_PREMATURE_CLOSE, + aggregateTwoErrors, } = require('internal/errors').codes; const { validateCallback } = require('internal/validators'); From e6a62f46bdd6da7bd423f11692275f245d61c937 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 16:27:01 +0200 Subject: [PATCH 07/13] Apply suggestions from code review Co-authored-by: Antoine du Hamel --- lib/internal/streams/pipeline.js | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index a3cd57e1e5c8da..e9244fd5eaa846 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -14,13 +14,15 @@ let eos; const { once } = require('internal/util'); const destroyImpl = require('internal/streams/destroy'); const { - ERR_INVALID_ARG_TYPE, - ERR_INVALID_RETURN_VALUE, - ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED, - ERR_STREAM_PREMATURE_CLOSE, aggregateTwoErrors, -} = require('internal/errors').codes; + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, + ERR_MISSING_ARGS, + ERR_STREAM_DESTROYED, + ERR_STREAM_PREMATURE_CLOSE, + }, +} = require('internal/errors'); const { validateCallback } = require('internal/validators'); From 2b453dce458116fefb1afdfd07e641ae4c32b8e9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 16:47:56 +0200 Subject: [PATCH 08/13] fixup: test --- test/parallel/test-stream-pipeline.js | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..a729e77218b183 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1387,3 +1387,23 @@ const net = require('net'); assert.strictEqual(res, content); })); } + +{ + const writableLike = new EE(); + writableLike.writableNeedDrain = true; + + pipeline(async function *() {}, writableLike, + common.expectsError({ code: 'ERR_STREAM_PREMATURE_CLOSE' })); + + writableLike.emit('close'); +} + +{ + const writableLike = new EE(); + writableLike.write = () => false; + + pipeline(async function *() { yield null }, writableLike, + common.expectsError({ code: 'ERR_STREAM_PREMATURE_CLOSE' })); + + writableLike.emit('close'); +} From f12cc3dc1be7bee675f4bfb215c08ef214171ef9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 16:48:53 +0200 Subject: [PATCH 09/13] fixup: test --- test/parallel/test-stream-pipeline.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index a729e77218b183..a6f4bffb46e5ee 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1402,8 +1402,14 @@ const net = require('net'); const writableLike = new EE(); writableLike.write = () => false; - pipeline(async function *() { yield null }, writableLike, - common.expectsError({ code: 'ERR_STREAM_PREMATURE_CLOSE' })); + pipeline( + async function *() { + yield null; + yield null; + }, + writableLike, + common.expectsError({ code: 'ERR_STREAM_PREMATURE_CLOSE' }) + ); writableLike.emit('close'); } From 4e6be168be2756fa37f040fbb93125aa313e5677 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Jun 2021 19:43:00 +0200 Subject: [PATCH 10/13] Update lib/internal/streams/pipeline.js Co-authored-by: Antoine du Hamel --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e9244fd5eaa846..c9178349e39ad7 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -147,7 +147,7 @@ async function pump(iterable, writable, finish) { } writable.end(); } catch (err) { - error = err; + error = aggregateTwoErrors(error, err); } finally { writable .off('drain', resume) From 225681814ab5500ffd08e0ac3793b3e20e774fe8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 12 Jun 2021 11:04:56 +0200 Subject: [PATCH 11/13] fixup --- test/parallel/test-stream-pipeline.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index a6f4bffb46e5ee..07810f01fa105c 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -15,6 +15,7 @@ const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); const net = require('net'); +const EE = require('events'); { let finished = false; From da4623a3c2a820ec992bdd5eba7c7cd3c5af9895 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 12 Jun 2021 12:17:16 +0200 Subject: [PATCH 12/13] fixup --- lib/internal/streams/pipeline.js | 5 ++++- test/parallel/test-stream-pipeline.js | 18 ++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c9178349e39ad7..830629759943ff 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -145,7 +145,10 @@ async function pump(iterable, writable, finish) { await waitForDrain(); } } - writable.end(); + + if (!error) { + writable.end(); + } } catch (err) { error = aggregateTwoErrors(error, err); } finally { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 07810f01fa105c..aaf726ea5a0350 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -15,7 +15,6 @@ const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); const net = require('net'); -const EE = require('events'); { let finished = false; @@ -1390,17 +1389,22 @@ const EE = require('events'); } { - const writableLike = new EE(); + const writableLike = new Stream(); writableLike.writableNeedDrain = true; - pipeline(async function *() {}, writableLike, - common.expectsError({ code: 'ERR_STREAM_PREMATURE_CLOSE' })); + pipeline( + async function *() {}, + writableLike, + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + }) + ); writableLike.emit('close'); } { - const writableLike = new EE(); + const writableLike = new Stream(); writableLike.write = () => false; pipeline( @@ -1409,7 +1413,9 @@ const EE = require('events'); yield null; }, writableLike, - common.expectsError({ code: 'ERR_STREAM_PREMATURE_CLOSE' }) + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + }) ); writableLike.emit('close'); From c23ab6439bca2cf79b0dfbf8e1deb00e45da29ec Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 12 Jun 2021 12:19:23 +0200 Subject: [PATCH 13/13] fixup --- lib/internal/streams/pipeline.js | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 830629759943ff..5f63e1c7d29b76 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -140,15 +140,24 @@ async function pump(iterable, writable, finish) { await waitForDrain(); } + if (error) { + return; + } + for await (const chunk of iterable) { if (!writable.write(chunk)) { await waitForDrain(); } + if (error) { + return; + } } - if (!error) { - writable.end(); + if (error) { + return; } + + writable.end(); } catch (err) { error = aggregateTwoErrors(error, err); } finally {