diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 77520a14d50a6f..821bd7373b5135 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -32,8 +32,8 @@ const {
 
 const {
   isIterable,
-  isReadable,
   isReadableNodeStream,
+  isReadableFinished,
   isNodeStream,
 } = require('internal/streams/utils');
 const { AbortController } = require('internal/abort_controller');
@@ -41,7 +41,7 @@ const { AbortController } = require('internal/abort_controller');
 let PassThrough;
 let Readable;
 
-function destroyer(stream, reading, writing) {
+function destroyer(stream, reading, writing, isLast) {
   let finished = false;
   stream.on('close', () => {
     finished = true;
@@ -51,13 +51,15 @@ function destroyer(stream, reading, writing) {
     finished = !err;
   });
 
-  return {
-    destroy: (err) => {
-      if (finished) return;
-      finished = true;
-      destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
-    },
-    cleanup
+  return (err) => {
+    if (isLast && isReadableFinished(stream) === false) {
+      cleanup();
+    }
+    if (finished) {
+      return;
+    }
+    finished = true;
+    destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
   };
 }
 
@@ -164,10 +166,6 @@ function pipelineImpl(streams, callback, opts) {
   const signal = ac.signal;
   const outerSignal = opts?.signal;
 
-  // Need to cleanup event listeners if last stream is readable
-  // https://github.com/nodejs/node/issues/35452
-  const lastStreamCleanup = [];
-
   validateAbortSignal(outerSignal, 'options.signal');
 
   function abort() {
@@ -179,6 +177,7 @@ function pipelineImpl(streams, callback, opts) {
   let error;
   let value;
   const destroys = [];
+  const cleanups = [];
 
   let finishCount = 0;
 
@@ -199,13 +198,14 @@ function pipelineImpl(streams, callback, opts) {
       destroys.shift()(error);
     }
 
+    for (const cleanup of cleanups) {
+      cleanup();
+    }
+
     outerSignal?.removeEventListener('abort', abort);
     ac.abort();
 
     if (final) {
-      if (!error) {
-        lastStreamCleanup.forEach((fn) => fn());
-      }
       process.nextTick(callback, error, value);
     }
   }
@@ -220,12 +220,7 @@ function pipelineImpl(streams, callback, opts) {
 
     if (isNodeStream(stream)) {
      if (end) {
-        const { destroy, cleanup } = destroyer(stream, reading, writing);
-        destroys.push(destroy);
-
-        if (isReadable(stream) && isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        destroys.push(destroyer(stream, reading, writing, isLastStream));
      }
 
       // Catch stream errors that occur after pipe/pump has completed.
@@ -239,11 +234,7 @@ function pipelineImpl(streams, callback, opts) {
        }
      }
      stream.on('error', onError);
-      if (isReadable(stream) && isLastStream) {
-        lastStreamCleanup.push(() => {
-          stream.removeListener('error', onError);
-        });
-      }
+      cleanups.push(() => stream.off('error', onError));
    }
 
    if (i === 0) {
@@ -311,19 +302,13 @@ function pipelineImpl(streams, callback, opts) {
 
        ret = pt;
 
-        const { destroy, cleanup } = destroyer(ret, false, true);
-        destroys.push(destroy);
-        if (isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        destroys.push(destroyer(ret, false, true));
      }
    } else if (isNodeStream(stream)) {
      if (isReadableNodeStream(ret)) {
        finishCount += 2;
        const cleanup = pipe(ret, stream, finish, { end });
-        if (isReadable(stream) && isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        cleanups.push(cleanup);
      } else if (isIterable(ret)) {
        finishCount++;
        pump(ret, stream, finish, { end });
diff --git a/test/parallel/test-stream-pipeline-listeners.js b/test/parallel/test-stream-pipeline-listeners.js
index 81e287b77c7589..69e876e10afa81 100644
--- a/test/parallel/test-stream-pipeline-listeners.js
+++ b/test/parallel/test-stream-pipeline-listeners.js
@@ -6,7 +6,7 @@ const assert = require('assert');
 
 process.on('uncaughtException', common.mustCall((err) => {
   assert.strictEqual(err.message, 'no way');
-}, 2));
+}, 1));
 
 // Ensure that listeners is removed if last stream is readable
 // And other stream's listeners unchanged
@@ -30,30 +30,6 @@ pipeline(a, b, common.mustCall((error) => {
   }, 100);
 }));
 
-// Async generators
-const c = new PassThrough();
-c.end('foobar');
-const d = pipeline(
-  c,
-  async function* (source) {
-    for await (const chunk of source) {
-      yield String(chunk).toUpperCase();
-    }
-  },
-  common.mustCall((error) => {
-    if (error) {
-      assert.ifError(error);
-    }
-
-    assert(c.listenerCount('error') > 0);
-    assert.strictEqual(d.listenerCount('error'), 0);
-    setTimeout(() => {
-      assert.strictEqual(b.listenerCount('error'), 0);
-      d.destroy(new Error('no way'));
-    }, 100);
-  })
-);
-
 // If last stream is not readable, will not throw and remove listeners
 const e = new PassThrough();
 e.end('foobar');
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index 529b18386e25a6..426e33a582f445 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -1556,3 +1556,44 @@ const tsp = require('timers/promises');
    })
  );
 }
+
+{
+  const r = new Readable({
+    read() {
+      this.push('asd');
+      this.push(null);
+    }
+  });
+  const s = new Duplex({
+    readable: false,
+    read() {},
+    write(chunk, encoding, callback) {
+      callback();
+    }
+  });
+  pipeline(r, s, common.mustCall((err) => {
+    assert(!err);
+    assert(s.listenerCount('error'), 1);
+  }));
+}
+
+{
+  const r = new Readable({
+    read() {
+      this.push('asd');
+      this.push(null);
+    }
+  });
+  const s = new Duplex({
+    read() {
+      this.push(null);
+    },
+    write(chunk, encoding, callback) {
+      callback();
+    }
+  });
+  pipeline(r, s, common.mustCall((err) => {
+    assert(!err);
+    assert(s.listenerCount('error'), 1);
+  }));
+}