Skip to content

Commit 533cafc

Browse files
committed
stream: duplexify
PR-URL: #39519 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 51cd4a8 commit 533cafc

File tree

11 files changed

+560
-111
lines changed

11 files changed

+560
-111
lines changed

doc/api/stream.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2081,6 +2081,34 @@ added: REPLACEME
20812081
* `streamWritable` {stream.Writable}
20822082
* Returns: {WritableStream}
20832083

2084+
### `stream.Duplex.from(src)`
2085+
<!-- YAML
2086+
added: REPLACEME
2087+
-->
2088+
2089+
* `src` {Stream|Blob|ArrayBuffer|string|Iterable|AsyncIterable|
2090+
AsyncGeneratorFunction|AsyncFunction|Promise|Object}
2091+
2092+
A utility method for creating duplex streams.
2093+
2094+
* `Stream` converts writable stream into writable `Duplex` and readable stream
2095+
to `Duplex`.
2096+
* `Blob` converts into readable `Duplex`.
2097+
* `string` converts into readable `Duplex`.
2098+
* `ArrayBuffer` converts into readable `Duplex`.
2099+
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
2100+
`null`.
2101+
* `AsyncGeneratorFunction` converts into a readable/writable transform
2102+
`Duplex`. Must take a source `AsyncIterable` as first parameter. Cannot yield
2103+
`null`.
2104+
* `AsyncFunction` converts into a writable `Duplex`. Must return
2105+
either `null` or `undefined`
2106+
* `Object ({ writable, readable })` converts `readable` and
2107+
`writable` into `Stream` and then combines them into `Duplex` where the
2108+
`Duplex` will write to the `writable` and read from the `readable`.
2109+
* `Promise` converts into readable `Duplex`. Value `null` is ignored.
2110+
* Returns: {stream.Duplex}
2111+
20842112
### `stream.Duplex.fromWeb(pair[, options])`
20852113
<!-- YAML
20862114
added: REPLACEME

lib/internal/streams/compose.js

Lines changed: 4 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,19 @@
22

33
const pipeline = require('internal/streams/pipeline');
44
const Duplex = require('internal/streams/duplex');
5-
const { createDeferredPromise } = require('internal/util');
65
const { destroyer } = require('internal/streams/destroy');
7-
const from = require('internal/streams/from');
86
const {
97
isNodeStream,
10-
isIterable,
118
isReadable,
129
isWritable,
1310
} = require('internal/streams/utils');
14-
const {
15-
PromiseResolve,
16-
} = primordials;
1711
const {
1812
AbortError,
1913
codes: {
20-
ERR_INVALID_ARG_TYPE,
2114
ERR_INVALID_ARG_VALUE,
22-
ERR_INVALID_RETURN_VALUE,
2315
ERR_MISSING_ARGS,
2416
},
2517
} = require('internal/errors');
26-
const assert = require('internal/assert');
2718

2819
// This is needed for pre node 17.
2920
class ComposeDuplex extends Duplex {
@@ -53,18 +44,18 @@ module.exports = function compose(...streams) {
5344
}
5445

5546
if (streams.length === 1) {
56-
return makeDuplex(streams[0], 'streams[0]');
47+
return Duplex.from(streams[0]);
5748
}
5849

5950
const orgStreams = [...streams];
6051

6152
if (typeof streams[0] === 'function') {
62-
streams[0] = makeDuplex(streams[0], 'streams[0]');
53+
streams[0] = Duplex.from(streams[0]);
6354
}
6455

6556
if (typeof streams[streams.length - 1] === 'function') {
6657
const idx = streams.length - 1;
67-
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
58+
streams[idx] = Duplex.from(streams[idx]);
6859
}
6960

7061
for (let n = 0; n < streams.length; ++n) {
@@ -117,7 +108,7 @@ module.exports = function compose(...streams) {
117108
// Implement Writable/Readable/Duplex traits.
118109
// See, https://github.com/nodejs/node/pull/33515.
119110
d = new ComposeDuplex({
120-
highWaterMark: 1,
111+
// TODO (ronag): highWaterMark?
121112
writableObjectMode: !!head?.writableObjectMode,
122113
readableObjectMode: !!tail?.writableObjectMode,
123114
writable,
@@ -203,82 +194,3 @@ module.exports = function compose(...streams) {
203194

204195
return d;
205196
};
206-
207-
function makeDuplex(stream, name) {
208-
let ret;
209-
if (typeof stream === 'function') {
210-
assert(stream.length > 0);
211-
212-
const { value, write, final } = fromAsyncGen(stream);
213-
214-
if (isIterable(value)) {
215-
ret = from(ComposeDuplex, value, {
216-
objectMode: true,
217-
highWaterMark: 1,
218-
write,
219-
final
220-
});
221-
} else if (typeof value?.then === 'function') {
222-
const promise = PromiseResolve(value)
223-
.then((val) => {
224-
if (val != null) {
225-
throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
226-
}
227-
})
228-
.catch((err) => {
229-
destroyer(ret, err);
230-
});
231-
232-
ret = new ComposeDuplex({
233-
objectMode: true,
234-
highWaterMark: 1,
235-
readable: false,
236-
write,
237-
final(cb) {
238-
final(() => promise.then(cb, cb));
239-
}
240-
});
241-
} else {
242-
throw new ERR_INVALID_RETURN_VALUE(
243-
'Iterable, AsyncIterable or AsyncFunction', name, value);
244-
}
245-
} else if (isNodeStream(stream)) {
246-
ret = stream;
247-
} else if (isIterable(stream)) {
248-
ret = from(ComposeDuplex, stream, {
249-
objectMode: true,
250-
highWaterMark: 1,
251-
writable: false
252-
});
253-
} else {
254-
throw new ERR_INVALID_ARG_TYPE(
255-
name,
256-
['Stream', 'Iterable', 'AsyncIterable', 'Function'],
257-
stream)
258-
;
259-
}
260-
return ret;
261-
}
262-
263-
function fromAsyncGen(fn) {
264-
let { promise, resolve } = createDeferredPromise();
265-
const value = fn(async function*() {
266-
while (true) {
267-
const { chunk, done, cb } = await promise;
268-
process.nextTick(cb);
269-
if (done) return;
270-
yield chunk;
271-
({ promise, resolve } = createDeferredPromise());
272-
}
273-
}());
274-
275-
return {
276-
value,
277-
write(chunk, encoding, cb) {
278-
resolve({ chunk, done: false, cb });
279-
},
280-
final(cb) {
281-
resolve({ done: true, cb });
282-
}
283-
};
284-
}

lib/internal/streams/destroy.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ function emitErrorCloseLegacy(stream, err) {
323323

324324
// Normalize destroy for legacy.
325325
function destroyer(stream, err) {
326-
if (isDestroyed(stream)) {
326+
if (!stream || isDestroyed(stream)) {
327327
return;
328328
}
329329

lib/internal/streams/duplex.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,12 @@ Duplex.fromWeb = function(pair, options) {
133133
Duplex.toWeb = function(duplex) {
134134
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
135135
};
136+
137+
let duplexify;
138+
139+
Duplex.from = function(body) {
140+
if (!duplexify) {
141+
duplexify = require('internal/streams/duplexify');
142+
}
143+
return duplexify(body, 'body');
144+
};

0 commit comments

Comments
 (0)