diff --git a/lib/stream.js b/lib/stream.js index a26cc0b81c557e..953169537b5013 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -39,6 +39,7 @@ const { const { codes: { ERR_ILLEGAL_CONSTRUCTOR, + ERR_INVALID_ARG_VALUE, }, } = require('internal/errors'); const compose = require('internal/streams/compose'); @@ -69,7 +70,14 @@ for (let i = 0; i < streamKeys.length; i++) { if (new.target) { throw new ERR_ILLEGAL_CONSTRUCTOR(); } - return Stream.Readable.from(ReflectApply(op, this, args)); + const result = (ReflectApply(op, this, args)); + if (result instanceof Stream.Duplex) { + return result; + } + if (utils.isReadable(result) || utils.isWritable(result)) { + return Stream.compose(result); + } + throw new ERR_INVALID_ARG_VALUE('result', result, 'must be a Duplex, Readable, or Writable stream'); } ObjectDefineProperty(fn, 'name', { __proto__: null, value: op.name }); ObjectDefineProperty(fn, 'length', { __proto__: null, value: op.length }); diff --git a/test/parallel/test-stream-duplex.js b/test/parallel/test-stream-duplex.js index 490744910cb1e1..8cd60435fd9341 100644 --- a/test/parallel/test-stream-duplex.js +++ b/test/parallel/test-stream-duplex.js @@ -23,6 +23,7 @@ const common = require('../common'); const assert = require('assert'); const Duplex = require('stream').Duplex; +const { Readable, Writable } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const stream = new Duplex({ objectMode: true }); @@ -131,3 +132,47 @@ process.on('exit', () => { assert.deepStrictEqual(Buffer.from(result.value), dataToRead); })); } + +const assertMessages = { + isDuplex: 'Expected a Duplex stream', + duplexUnchanged: 'Expected the Duplex stream to be returned unchanged', + notDuplex: 'Expected a non-Duplex stream', + isReadable: 'Expected a Readable stream' +}; + +// readable.compose - Duplex stream unchanged +{ + const readable = new Readable({ + read() {} + }); + + const duplex = new Duplex({ + read() {}, + write(chunk, encoding, callback) { + callback(); + } + }); + + const composedStream = readable.compose(duplex); + + assert(composedStream instanceof Duplex, assertMessages.isDuplex); + assert.strictEqual(composedStream, duplex, assertMessages.duplexUnchanged); +} + +// readable.compose - wraps non-Duplex streams +{ + const readable = new Readable({ + read() {} + }); + + const writable = new Writable({ + write(chunk, encoding, callback) { + callback(); + } + }); + + const composedStream = readable.compose(writable); + + assert(!(composedStream instanceof Duplex), assertMessages.notDuplex); + assert(composedStream instanceof Readable, assertMessages.isReadable); +}