From e9a2cc4f73c42c7e9b54559c5e83efe16b1dffb3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Jun 2021 11:55:12 +0200 Subject: [PATCH 1/4] streams: use finished for pump Re-use existing compay logic for pump by using finished. --- lib/internal/streams/pipeline.js | 74 +++++++++++++-------------- test/parallel/test-stream-pipeline.js | 33 ------------ 2 files changed, 36 insertions(+), 71 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 202a8cf9810d62..269607e1bf71b0 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -19,21 +19,17 @@ const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED, - ERR_STREAM_PREMATURE_CLOSE, + ERR_STREAM_DESTROYED }, } = require('internal/errors'); const { validateCallback } = require('internal/validators'); -function noop() {} - const { isIterable, isReadable, isStream, } = require('internal/streams/utils'); -const assert = require('internal/assert'); let PassThrough; let Readable; @@ -109,62 +105,64 @@ async function* fromReadable(val) { async function pump(iterable, writable, finish) { let error; - let callback = noop; + let onresolve = null; + const resume = (err) => { - error = aggregateTwoErrors(error, err); - const _callback = callback; - callback = noop; - _callback(); - }; - const onClose = () => { - resume(new ERR_STREAM_PREMATURE_CLOSE()); + if (err) { + error = err; + } + + if (onresolve) { + const callback = onresolve; + onresolve = null; + callback(); + } }; - const waitForDrain = () => new Promise((resolve) => { - assert(callback === noop); - if (error || writable.destroyed) { - resolve(); + const wait = () => new Promise((resolve, reject) => { + if (error) { + reject(error); } else { - callback = resolve; + onresolve = () => { + if (error) { + reject(error); + } else { + resolve(); + } + }; } }); - writable - .on('drain', resume) - .on('error', resume) - .on('close', onClose); + writable.on('drain', resume); + const cleanup = eos(writable, resume); try { if (writable.writableNeedDrain) { - await waitForDrain(); - } - - if (error) { - return; + await wait(); } for await (const chunk of iterable) { - if (!writable.write(chunk)) { - await waitForDrain(); - } if (error) { - return; + throw error; + } + + if (!writable.write(chunk)) { + await wait(); } } if (error) { - return; + throw error; } writable.end(); + + finish(); } catch (err) { - error = aggregateTwoErrors(error, err); + finish(error !== err ? aggregateTwoErrors(error, err) : err); } finally { - writable - .off('drain', resume) - .off('error', resume) - .off('close', onClose); - finish(error); + cleanup(); + writable.off('drain', resume); } } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index aaf726ea5a0350..e2e5fe2e0d561a 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1387,36 +1387,3 @@ const net = require('net'); assert.strictEqual(res, content); })); } - -{ - const writableLike = new Stream(); - writableLike.writableNeedDrain = true; - - pipeline( - async function *() {}, - writableLike, - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - }) - ); - - writableLike.emit('close'); -} - -{ - const writableLike = new Stream(); - writableLike.write = () => false; - - pipeline( - async function *() { - yield null; - yield null; - }, - writableLike, - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - }) - ); - - writableLike.emit('close'); -} From 882f4122b9a3d71ebd6c08737a61f8f063124373 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 30 Jun 2021 10:39:20 +0200 Subject: [PATCH 2/4] fixup --- lib/internal/streams/pipeline.js | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 269607e1bf71b0..c9179d02401ddc 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -142,19 +142,11 @@ async function pump(iterable, writable, finish) { } for await (const chunk of iterable) { - if (error) { - throw error; - } - - if (!writable.write(chunk)) { + if (writable.write(chunk) === false) { await wait(); } } - if (error) { - throw error; - } - writable.end(); finish(); From a4a04d00c099a48e3399d685d60b97532b87c8d0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 30 Jun 2021 10:45:45 +0200 Subject: [PATCH 3/4] fixup --- lib/internal/streams/pipeline.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c9179d02401ddc..960f697bb0b291 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -134,7 +134,7 @@ async function pump(iterable, writable, finish) { }); writable.on('drain', resume); - const cleanup = eos(writable, resume); + const cleanup = eos(writable, { readable: false }, resume); try { if (writable.writableNeedDrain) { @@ -142,13 +142,15 @@ async function pump(iterable, writable, finish) { } for await (const chunk of iterable) { - if (writable.write(chunk) === false) { + if (!writable.write(chunk)) { await wait(); } } writable.end(); + await wait(); + finish(); } catch (err) { finish(error !== err ? aggregateTwoErrors(error, err) : err); From bc3e7eadbba73426198a02331d40160a7795521a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 1 Jul 2021 17:00:40 +0200 Subject: [PATCH 4/4] Update lib/internal/streams/pipeline.js Co-authored-by: Antoine du Hamel --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 960f697bb0b291..5759dbd4a580a3 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -19,7 +19,7 @@ const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED + ERR_STREAM_DESTROYED, }, } = require('internal/errors');