From f90da175c8226c55cc35bb997a94f4c67f5ac901 Mon Sep 17 00:00:00 2001
From: Robert Nagy
Date: Sat, 20 Aug 2022 12:58:33 +0200
Subject: [PATCH 1/2] stream: only cleanup error listener if not readable

Only cleanup the error listener if we know for sure that the last
stream is still readable after the pipeline has completed.
---
 environment_test.log                  |  0
 lib/internal/streams/pipeline.js      | 50 ++++++++-------------------
 test/parallel/test-stream-pipeline.js | 41 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 36 deletions(-)
 create mode 100644 environment_test.log

diff --git a/environment_test.log b/environment_test.log
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 77520a14d50a6f..6257a19b626790 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -32,8 +32,8 @@ const {
 
 const {
   isIterable,
-  isReadable,
   isReadableNodeStream,
+  isReadableFinished,
   isNodeStream,
 } = require('internal/streams/utils');
 const { AbortController } = require('internal/abort_controller');
@@ -41,7 +41,7 @@ const { AbortController } = require('internal/abort_controller');
 let PassThrough;
 let Readable;
 
-function destroyer(stream, reading, writing) {
+function destroyer(stream, reading, writing, isLast) {
   let finished = false;
   stream.on('close', () => {
     finished = true;
@@ -51,13 +51,15 @@ function destroyer(stream, reading, writing) {
     finished = !err;
   });
 
-  return {
-    destroy: (err) => {
-      if (finished) return;
-      finished = true;
-      destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
-    },
-    cleanup
+  return (err) => {
+    if (isLast && isReadableFinished(stream) === false) {
+      cleanup();
+    }
+    if (finished) {
+      return;
+    }
+    finished = true;
+    destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
   };
 }
 
@@ -164,10 +166,6 @@ function pipelineImpl(streams, callback, opts) {
   const signal = ac.signal;
   const outerSignal = opts?.signal;
 
-  // Need to cleanup event listeners if last stream is readable
-  // https://github.com/nodejs/node/issues/35452
-  const lastStreamCleanup = [];
-
   validateAbortSignal(outerSignal, 'options.signal');
 
   function abort() {
@@ -203,9 +201,6 @@ function pipelineImpl(streams, callback, opts) {
     outerSignal?.removeEventListener('abort', abort);
     ac.abort();
     if (final) {
-      if (!error) {
-        lastStreamCleanup.forEach((fn) => fn());
-      }
       process.nextTick(callback, error, value);
     }
   }
@@ -220,12 +215,7 @@ function pipelineImpl(streams, callback, opts) {
 
     if (isNodeStream(stream)) {
      if (end) {
-        const { destroy, cleanup } = destroyer(stream, reading, writing);
-        destroys.push(destroy);
-
-        if (isReadable(stream) && isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        destroys.push(destroyer(stream, reading, writing, isLastStream));
      }
 
       // Catch stream errors that occur after pipe/pump has completed.
@@ -239,11 +229,6 @@ function pipelineImpl(streams, callback, opts) {
         }
       }
       stream.on('error', onError);
-      if (isReadable(stream) && isLastStream) {
-        lastStreamCleanup.push(() => {
-          stream.removeListener('error', onError);
-        });
-      }
     }
 
     if (i === 0) {
@@ -311,19 +296,12 @@ function pipelineImpl(streams, callback, opts) {
 
         ret = pt;
 
-        const { destroy, cleanup } = destroyer(ret, false, true);
-        destroys.push(destroy);
-        if (isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        destroys.push(destroyer(ret, false, true));
       }
     } else if (isNodeStream(stream)) {
       if (isReadableNodeStream(ret)) {
         finishCount += 2;
-        const cleanup = pipe(ret, stream, finish, { end });
-        if (isReadable(stream) && isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        pipe(ret, stream, finish, { end });
      } else if (isIterable(ret)) {
        finishCount++;
        pump(ret, stream, finish, { end });
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index 529b18386e25a6..426e33a582f445 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -1556,3 +1556,44 @@ const tsp = require('timers/promises');
     })
   );
 }
+
+{
+  const r = new Readable({
+    read() {
+      this.push('asd');
+      this.push(null);
+    }
+  });
+  const s = new Duplex({
+    readable: false,
+    read() {},
+    write(chunk, encoding, callback) {
+      callback();
+    }
+  });
+  pipeline(r, s, common.mustCall((err) => {
+    assert(!err);
+    assert(s.listenerCount('error'), 1);
+  }));
+}
+
+{
+  const r = new Readable({
+    read() {
+      this.push('asd');
+      this.push(null);
+    }
+  });
+  const s = new Duplex({
+    read() {
+      this.push(null);
+    },
+    write(chunk, encoding, callback) {
+      callback();
+    }
+  });
+  pipeline(r, s, common.mustCall((err) => {
+    assert(!err);
+    assert(s.listenerCount('error'), 1);
+  }));
+}

From 69dc082fc6ec7bf3ebc89476f636a7b2a5f451eb Mon Sep 17 00:00:00 2001
From: Robert Nagy
Date: Sun, 21 Aug 2022 13:04:49 +0200
Subject: [PATCH 2/2] fixup

---
 environment_test.log                          |  0
 lib/internal/streams/pipeline.js              |  9 ++++++++-
 .../test-stream-pipeline-listeners.js         | 26 +------------------
 3 files changed, 9 insertions(+), 26 deletions(-)
 delete mode 100644 environment_test.log

diff --git a/environment_test.log b/environment_test.log
deleted file mode 100644
index e69de29bb2d1d6..00000000000000
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 6257a19b626790..821bd7373b5135 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -177,6 +177,7 @@ function pipelineImpl(streams, callback, opts) {
   let error;
   let value;
   const destroys = [];
+  const cleanups = [];
 
   let finishCount = 0;
 
@@ -197,6 +198,10 @@ function pipelineImpl(streams, callback, opts) {
       destroys.shift()(error);
     }
 
+    for (const cleanup of cleanups) {
+      cleanup();
+    }
+
     outerSignal?.removeEventListener('abort', abort);
     ac.abort();
     if (final) {
@@ -229,6 +234,7 @@ function pipelineImpl(streams, callback, opts) {
         }
       }
       stream.on('error', onError);
+      cleanups.push(() => stream.off('error', onError));
     }
 
     if (i === 0) {
@@ -301,7 +307,8 @@ function pipelineImpl(streams, callback, opts) {
     } else if (isNodeStream(stream)) {
       if (isReadableNodeStream(ret)) {
         finishCount += 2;
-        pipe(ret, stream, finish, { end });
+        const cleanup = pipe(ret, stream, finish, { end });
+        cleanups.push(cleanup);
      } else if (isIterable(ret)) {
        finishCount++;
        pump(ret, stream, finish, { end });
diff --git a/test/parallel/test-stream-pipeline-listeners.js b/test/parallel/test-stream-pipeline-listeners.js
index 81e287b77c7589..69e876e10afa81 100644
--- a/test/parallel/test-stream-pipeline-listeners.js
+++ b/test/parallel/test-stream-pipeline-listeners.js
@@ -6,7 +6,7 @@ const assert = require('assert');
 
 process.on('uncaughtException', common.mustCall((err) => {
   assert.strictEqual(err.message, 'no way');
-}, 2));
+}, 1));
 
 // Ensure that listeners is removed if last stream is readable
 // And other stream's listeners unchanged
@@ -30,30 +30,6 @@ pipeline(a, b, common.mustCall((error) => {
   }, 100);
 }));
 
-// Async generators
-const c = new PassThrough();
-c.end('foobar');
-const d = pipeline(
-  c,
-  async function* (source) {
-    for await (const chunk of source) {
-      yield String(chunk).toUpperCase();
-    }
-  },
-  common.mustCall((error) => {
-    if (error) {
-      assert.ifError(error);
-    }
-
-    assert(c.listenerCount('error') > 0);
-    assert.strictEqual(d.listenerCount('error'), 0);
-    setTimeout(() => {
-      assert.strictEqual(b.listenerCount('error'), 0);
-      d.destroy(new Error('no way'));
-    }, 100);
-  })
-);
-
 // If last stream is not readable, will not throw and remove listeners
 const e = new PassThrough();
 e.end('foobar');
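
The scenario pinned down by the new test-stream-pipeline.js cases in PATCH 1 can also be exercised outside the test harness. The snippet below is an illustrative sketch only, not part of the patch series; it uses just the public 'stream' API and plain console output in place of the test helpers, and simply inspects how many 'error' listeners remain on the last stream once the pipeline callback runs.

'use strict';
// Illustrative sketch only -- not part of the patches above.
const { pipeline, Readable, Duplex } = require('stream');

// Source pushes one chunk and then ends, mirroring the added test cases.
const r = new Readable({
  read() {
    this.push('asd');
    this.push(null);
  }
});

// Destination is a Duplex whose readable side ends immediately and whose
// writable side simply consumes chunks.
const s = new Duplex({
  read() {
    this.push(null);
  },
  write(chunk, encoding, callback) {
    callback();
  }
});

pipeline(r, s, (err) => {
  if (err) throw err;
  // The patched tests make an assertion about this count for the last stream.
  console.log('error listeners on last stream:', s.listenerCount('error'));
});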