Skip to content

Commit 463282c

Browse files
committed
fixup: remove compose
1 parent 6c91184 commit 463282c

File tree

4 files changed

+212
-251
lines changed

4 files changed

+212
-251
lines changed

lib/internal/streams/body.js

Lines changed: 168 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const { createDeferredPromise } = require('internal/util');
99
const { destroyer } = require('internal/streams/destroy');
1010
const from = require('internal/streams/from');
1111
const assert = require('internal/assert');
12+
const pipeline = require('internal/streams/pipeline');
1213

1314
const {
1415
isBlob
@@ -25,23 +26,30 @@ const isWritableStream =
2526

2627
const {
2728
isIterable,
29+
isNodeStream,
30+
isReadable,
31+
isWritable,
2832
isDuplexNodeStream,
2933
isReadableNodeStream,
3034
isWritableNodeStream,
3135
} = require('internal/streams/utils');
3236

3337
const {
38+
ArrayIsArray,
3439
JSONParse,
3540
PromiseResolve,
3641
Symbol,
3742
SymbolAsyncIterator
3843
} = primordials;
3944

4045
const {
46+
AbortError,
4147
codes: {
4248
ERR_INVALID_ARG_TYPE,
4349
ERR_INVALID_RETURN_VALUE,
4450
ERR_INVALID_STATE,
51+
ERR_MISSING_ARGS,
52+
ERR_INVALID_ARG_VALUE,
4553
},
4654
} = require('internal/errors');
4755

@@ -73,7 +81,10 @@ class Body {
7381
constructor(body, options) {
7482
// TODO (ronag): What about TransformStream?
7583

76-
if (body[kState]) {
84+
if (ArrayIsArray(body)) {
85+
const d = compose(...body);
86+
this[kState] = { readable: d, writable: d };
87+
} else if (body[kState]) {
7788
this[kState] = body[kState];
7889
} else if (
7990
isReadableStream(body?.readable) &&
@@ -322,4 +333,160 @@ function fromAsyncGen(fn) {
322333
};
323334
}
324335

336+
function compose(...streams) {
337+
if (streams.length === 0) {
338+
throw new ERR_MISSING_ARGS('streams');
339+
}
340+
341+
if (streams.length === 1) {
342+
return new Body(streams[0]);
343+
}
344+
345+
const orgStreams = [...streams];
346+
347+
if (typeof streams[0] === 'function') {
348+
streams[0] = new Body(streams[0]).nodeStream();
349+
}
350+
351+
if (typeof streams[streams.length - 1] === 'function') {
352+
const idx = streams.length - 1;
353+
streams[idx] = new Body(streams[idx]);
354+
}
355+
356+
for (let n = 0; n < streams.length; ++n) {
357+
// TODO(ronag): Relax requirements.
358+
359+
if (!isNodeStream(streams[n])) {
360+
// TODO(ronag): Add checks for non streams.
361+
continue;
362+
}
363+
if (n < streams.length - 1 && !isReadable(streams[n])) {
364+
throw new ERR_INVALID_ARG_VALUE(
365+
`streams[${n}]`,
366+
orgStreams[n],
367+
'must be readable'
368+
);
369+
}
370+
if (n > 0 && !isWritable(streams[n])) {
371+
throw new ERR_INVALID_ARG_VALUE(
372+
`streams[${n}]`,
373+
orgStreams[n],
374+
'must be writable'
375+
);
376+
}
377+
}
378+
379+
let ondrain;
380+
let onfinish;
381+
let onreadable;
382+
let onclose;
383+
let d;
384+
385+
function onfinished(err) {
386+
const cb = onclose;
387+
onclose = null;
388+
389+
if (cb) {
390+
cb(err);
391+
} else if (err) {
392+
d.destroy(err);
393+
} else if (!readable && !writable) {
394+
d.destroy();
395+
}
396+
}
397+
398+
const head = streams[0];
399+
const tail = pipeline(streams, onfinished);
400+
401+
const writable = !!isWritable(head);
402+
const readable = !!isReadable(tail);
403+
404+
d = new BodyDuplex({
405+
// TODO (ronag): highWaterMark?
406+
readableObjectMode: !!tail?.readableObjectMode,
407+
writableObjectMode: !!head?.writableObjectMode,
408+
readable,
409+
writable,
410+
});
411+
412+
if (writable) {
413+
d._write = function(chunk, encoding, callback) {
414+
if (head.write(chunk, encoding)) {
415+
callback();
416+
} else {
417+
ondrain = callback;
418+
}
419+
};
420+
421+
d._final = function(callback) {
422+
head.end();
423+
onfinish = callback;
424+
};
425+
426+
head.on('drain', function() {
427+
if (ondrain) {
428+
const cb = ondrain;
429+
ondrain = null;
430+
cb();
431+
}
432+
});
433+
434+
tail.on('finish', function() {
435+
if (onfinish) {
436+
const cb = onfinish;
437+
onfinish = null;
438+
cb();
439+
}
440+
});
441+
}
442+
443+
if (readable) {
444+
tail.on('readable', function() {
445+
if (onreadable) {
446+
const cb = onreadable;
447+
onreadable = null;
448+
cb();
449+
}
450+
});
451+
452+
tail.on('end', function() {
453+
d.push(null);
454+
});
455+
456+
d._read = function() {
457+
while (true) {
458+
const buf = tail.read();
459+
460+
if (buf === null) {
461+
onreadable = d._read;
462+
return;
463+
}
464+
465+
if (!d.push(buf)) {
466+
return;
467+
}
468+
}
469+
};
470+
}
471+
472+
d._destroy = function(err, callback) {
473+
if (!err && onclose !== null) {
474+
err = new AbortError();
475+
}
476+
477+
onreadable = null;
478+
ondrain = null;
479+
onfinish = null;
480+
481+
if (onclose === null) {
482+
callback(err);
483+
} else {
484+
onclose = callback;
485+
destroyer(tail, err);
486+
}
487+
};
488+
489+
return d;
490+
}
491+
325492
module.exports = Body;

0 commit comments

Comments
 (0)