lib/internal/streams/pipeline.js (55 changes: 20 additions & 35 deletions)
@@ -32,16 +32,16 @@ const {
 
 const {
   isIterable,
-  isReadable,
   isReadableNodeStream,
+  isReadableFinished,
   isNodeStream,
 } = require('internal/streams/utils');
 const { AbortController } = require('internal/abort_controller');
 
 let PassThrough;
 let Readable;
 
-function destroyer(stream, reading, writing) {
+function destroyer(stream, reading, writing, isLast) {
   let finished = false;
   stream.on('close', () => {
     finished = true;
@@ -51,13 +51,15 @@ function destroyer(stream, reading, writing) {
     finished = !err;
   });
 
-  return {
-    destroy: (err) => {
-      if (finished) return;
-      finished = true;
-      destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
-    },
-    cleanup
+  return (err) => {
+    if (isLast && isReadableFinished(stream) === false) {
+      cleanup();
+    }
+    if (finished) {
+      return;
+    }
+    finished = true;
+    destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
   };
 }

@@ -164,10 +166,6 @@ function pipelineImpl(streams, callback, opts) {
   const signal = ac.signal;
   const outerSignal = opts?.signal;
 
-  // Need to cleanup event listeners if last stream is readable
-  // https://github.com/nodejs/node/issues/35452
-  const lastStreamCleanup = [];
-
   validateAbortSignal(outerSignal, 'options.signal');
 
   function abort() {
@@ -179,6 +177,7 @@
   let error;
   let value;
   const destroys = [];
+  const cleanups = [];
 
   let finishCount = 0;
 
@@ -199,13 +198,14 @@
       destroys.shift()(error);
     }
 
+    for (const cleanup of cleanups) {
+      cleanup();
+    }
+
     outerSignal?.removeEventListener('abort', abort);
     ac.abort();
 
     if (final) {
-      if (!error) {
-        lastStreamCleanup.forEach((fn) => fn());
-      }
       process.nextTick(callback, error, value);
     }
   }
@@ -220,12 +220,7 @@
 
     if (isNodeStream(stream)) {
       if (end) {
-        const { destroy, cleanup } = destroyer(stream, reading, writing);
-        destroys.push(destroy);
-
-        if (isReadable(stream) && isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        destroys.push(destroyer(stream, reading, writing, isLastStream));
       }
 
       // Catch stream errors that occur after pipe/pump has completed.
@@ -239,11 +234,7 @@ }
        }
      }
      stream.on('error', onError);
-     if (isReadable(stream) && isLastStream) {
-       lastStreamCleanup.push(() => {
-         stream.removeListener('error', onError);
-       });
-     }
+     cleanups.push(() => stream.off('error', onError));
    }
 
    if (i === 0) {
@@ -311,19 +302,13 @@
 
         ret = pt;
 
-        const { destroy, cleanup } = destroyer(ret, false, true);
-        destroys.push(destroy);
-        if (isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        destroys.push(destroyer(ret, false, true));
       }
     } else if (isNodeStream(stream)) {
       if (isReadableNodeStream(ret)) {
         finishCount += 2;
         const cleanup = pipe(ret, stream, finish, { end });
-        if (isReadable(stream) && isLastStream) {
-          lastStreamCleanup.push(cleanup);
-        }
+        cleanups.push(cleanup);
       } else if (isIterable(ret)) {
         finishCount++;
         pump(ret, stream, finish, { end });
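For context, here is a minimal sketch of the listener behavior this change and the tests below exercise (nodejs/node#35452): once a pipeline settles, the 'error' listeners that pipeline() itself attached to the last stream should be removed, so a later destroy(err) on that stream surfaces instead of being swallowed by a stale handler. This is illustrative usage of the public stream API, not part of the patch; the stream names are made up.

'use strict';
// Sketch only: demonstrates the cleanup behavior verified by the tests below.
const { pipeline, PassThrough } = require('stream');

const src = new PassThrough();
const dst = new PassThrough();
src.end('foobar');
dst.resume(); // drain the last stream so the pipeline can settle

pipeline(src, dst, (err) => {
  if (err) throw err;
  // The listeners test expects 0 here: pipeline's own 'error' listeners
  // on the last stream are removed once the pipeline has settled.
  console.log('error listeners on dst:', dst.listenerCount('error'));
  // A later dst.destroy(new Error('no way')) would now surface as an
  // uncaughtException rather than being swallowed by a stale handler.
});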
test/parallel/test-stream-pipeline-listeners.js (26 changes: 1 addition & 25 deletions)
@@ -6,7 +6,7 @@ const assert = require('assert');
 
 process.on('uncaughtException', common.mustCall((err) => {
   assert.strictEqual(err.message, 'no way');
-}, 2));
+}, 1));
 
 // Ensure that listeners is removed if last stream is readable
 // And other stream's listeners unchanged
@@ -30,30 +30,6 @@ pipeline(a, b, common.mustCall((error) => {
   }, 100);
 }));
 
-// Async generators
-const c = new PassThrough();
-c.end('foobar');
-const d = pipeline(
-  c,
-  async function* (source) {
-    for await (const chunk of source) {
-      yield String(chunk).toUpperCase();
-    }
-  },
-  common.mustCall((error) => {
-    if (error) {
-      assert.ifError(error);
-    }
-
-    assert(c.listenerCount('error') > 0);
-    assert.strictEqual(d.listenerCount('error'), 0);
-    setTimeout(() => {
-      assert.strictEqual(b.listenerCount('error'), 0);
-      d.destroy(new Error('no way'));
-    }, 100);
-  })
-);
-
 // If last stream is not readable, will not throw and remove listeners
 const e = new PassThrough();
 e.end('foobar');
test/parallel/test-stream-pipeline.js (41 changes: 41 additions & 0 deletions)
@@ -1556,3 +1556,44 @@ const tsp = require('timers/promises');
   })
 );
 }
+
+{
+  const r = new Readable({
+    read() {
+      this.push('asd');
+      this.push(null);
+    }
+  });
+  const s = new Duplex({
+    readable: false,
+    read() {},
+    write(chunk, encoding, callback) {
+      callback();
+    }
+  });
+  pipeline(r, s, common.mustCall((err) => {
+    assert(!err);
+    assert(s.listenerCount('error'), 1);
+  }));
+}
+
+{
+  const r = new Readable({
+    read() {
+      this.push('asd');
+      this.push(null);
+    }
+  });
+  const s = new Duplex({
+    read() {
+      this.push(null);
+    },
+    write(chunk, encoding, callback) {
+      callback();
+    }
+  });
+  pipeline(r, s, common.mustCall((err) => {
+    assert(!err);
+    assert(s.listenerCount('error'), 1);
+  }));
+}
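The same listener check can be written against the promise form of pipeline; a sketch, assuming require('stream/promises') is available (modern Node.js releases) and with illustrative stream names:

'use strict';
// Sketch only: promise-API variant of the listener-cleanup check above.
const { pipeline } = require('stream/promises');
const { PassThrough } = require('stream');

(async () => {
  const src = new PassThrough();
  const dst = new PassThrough();
  src.end('foobar');
  dst.resume(); // let the last stream drain so the pipeline can settle

  await pipeline(src, dst);
  // Expected: 0, matching the callback-form listeners test above.
  console.log('error listeners on dst:', dst.listenerCount('error'));
})();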