diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 441fcb471858bc..341c9619467a28 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -258,6 +258,11 @@ function pipeline(...streams) { throw new ERR_INVALID_ARG_TYPE( name, ['Stream', 'Function'], stream); } + + if (isStream(ret) && ret.destroyed) { + finish(new ERR_STREAM_DESTROYED(reading ? 'pipe' : 'write')); + break; + } } // TODO(ronag): Consider returning a Duplex proxy if the first argument diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..d35218d1d51230 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1372,6 +1372,37 @@ const net = require('net'); })); } +{ + const r = new Readable(); + const t = new PassThrough(); + const w = new Writable(); + + w.destroy(); + + pipeline([r, t, w], common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + assert.strictEqual(err.message, + 'Cannot call write after a stream was destroyed'); + assert.strictEqual(r.destroyed, true); + assert.strictEqual(t.destroyed, true); + })); +} + +{ + const r = new Readable(); + const t = new PassThrough(); + const w = new Writable(); + + t.destroy(); + + pipeline([r, t, w], common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + assert.strictEqual(err.message, + 'Cannot call pipe after a stream was destroyed'); + assert.strictEqual(r.destroyed, true); + })); +} + { const content = 'abc'; pipeline(Buffer.from(content), PassThrough({ objectMode: true }),