Skip to content

Commit 95adcda

Browse files
committed
stream: use finished for async iteration
1 parent 70cf0dc commit 95adcda

File tree

1 file changed

+18
-43
lines changed

1 file changed

+18
-43
lines changed

lib/internal/streams/readable.js

Lines changed: 18 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const { Buffer } = require('buffer');
4545
const {
4646
addAbortSignalNoValidate,
4747
} = require('internal/streams/add-abort-signal');
48+
const eos = require('internal/streams/end-of-stream');
4849

4950
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
5051
debug = fn;
@@ -69,6 +70,7 @@ const kPaused = Symbol('kPaused');
6970

7071
const { StringDecoder } = require('string_decoder');
7172
const from = require('internal/streams/from');
73+
const console = require('console');
7274

7375
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
7476
ObjectSetPrototypeOf(Readable, Stream);
@@ -1090,12 +1092,6 @@ function streamToAsyncIterator(stream, options) {
10901092
async function* createAsyncIterator(stream, options) {
10911093
let callback = nop;
10921094

1093-
const opts = {
1094-
destroyOnReturn: true,
1095-
destroyOnError: true,
1096-
...options,
1097-
};
1098-
10991095
function next(resolve) {
11001096
if (this === stream) {
11011097
callback();
@@ -1105,54 +1101,33 @@ async function* createAsyncIterator(stream, options) {
11051101
}
11061102
}
11071103

1108-
const state = stream._readableState;
1104+
stream.on('readable', next);
1105+
1106+
let error;
1107+
eos(stream, { writable: false }, (err) => {
1108+
error = err || null;
1109+
callback();
1110+
callback = nop;
1111+
});
11091112

1110-
let error = state.errored;
1111-
let errorEmitted = state.errorEmitted;
1112-
let endEmitted = state.endEmitted;
1113-
let closeEmitted = state.closeEmitted;
1114-
1115-
stream
1116-
.on('readable', next)
1117-
.on('error', function(err) {
1118-
error = err;
1119-
errorEmitted = true;
1120-
next.call(this);
1121-
})
1122-
.on('end', function() {
1123-
endEmitted = true;
1124-
next.call(this);
1125-
})
1126-
.on('close', function() {
1127-
closeEmitted = true;
1128-
next.call(this);
1129-
});
1130-
1131-
let errorThrown = false;
11321113
try {
11331114
while (true) {
1134-
const chunk = stream.destroyed ? null : stream.read();
1115+
const chunk = stream.read();
11351116
if (chunk !== null) {
11361117
yield chunk;
1137-
} else if (errorEmitted) {
1138-
throw error;
1139-
} else if (endEmitted) {
1118+
} else if (error !== undefined) {
11401119
break;
1141-
} else if (closeEmitted) {
1142-
throw new ERR_STREAM_PREMATURE_CLOSE();
11431120
} else {
11441121
await new Promise(next);
11451122
}
11461123
}
1147-
} catch (err) {
1148-
if (opts.destroyOnError) {
1149-
destroyImpl.destroyer(stream, err);
1150-
}
1151-
errorThrown = true;
1152-
throw err;
11531124
} finally {
1154-
if (!errorThrown && opts.destroyOnReturn) {
1155-
if (state.autoDestroy || !endEmitted) {
1125+
if (error) {
1126+
if (options?.destroyOnError !== false) {
1127+
destroyImpl.destroyer(stream, error);
1128+
}
1129+
} else if (options?.destroyOnReturn !== false) {
1130+
if (stream._readableState.autoDestroy) {
11561131
destroyImpl.destroyer(stream, null);
11571132
}
11581133
}

0 commit comments

Comments
 (0)