From 125ba870ee16f4fa3faaaeca0eef3458db9b5e8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Wed, 15 Apr 2020 01:18:09 +0300 Subject: [PATCH] stream: close iterator in Readable.from Use for-of loop to traverse iterator and properly close it if not all of its values are consumed. Fixes: https://github.com/nodejs/node/issues/32842 --- lib/internal/streams/from.js | 88 ++++++++----- .../test-readable-from-iterator-closing.js | 117 ++++++++++++++++++ 2 files changed, 177 insertions(+), 28 deletions(-) create mode 100644 test/parallel/test-readable-from-iterator-closing.js diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index ab6db00a125a0b..bb4bb603243092 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -2,7 +2,8 @@ const { SymbolAsyncIterator, - SymbolIterator + SymbolIterator, + Promise } = primordials; const { Buffer } = require('buffer'); @@ -11,7 +12,6 @@ const { } = require('internal/errors').codes; function from(Readable, iterable, opts) { - let iterator; if (typeof iterable === 'string' || iterable instanceof Buffer) { return new Readable({ objectMode: true, @@ -23,41 +23,73 @@ function from(Readable, iterable, opts) { }); } - if (iterable && iterable[SymbolAsyncIterator]) - iterator = iterable[SymbolAsyncIterator](); - else if (iterable && iterable[SymbolIterator]) - iterator = iterable[SymbolIterator](); - else - throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); + let onDataNeeded; const readable = new Readable({ objectMode: true, - ...opts + ...opts, + read() { + onDataNeeded && onDataNeeded(); + }, + async destroy(error, cb) { + onDataNeeded && onDataNeeded(); + try { + await pumping; + } catch (e) { + // Do not hide present error + if (!error) error = e; + } + cb(error); + }, }); - // Reading boolean to protect against _read - // being called before last iteration completion. - let reading = false; - readable._read = function() { - if (!reading) { - reading = true; - next(); - } - }; - async function next() { + + if (!iterable[SymbolAsyncIterator] && !iterable[SymbolIterator]) + throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); + + const pumping = pump(); + + return readable; + + async function pump() { + /* + We're iterating over sync or async iterator with the appropriate sync + or async version of the `for-of` loop. + + `for-await-of` loop has an edge case when looping over synchronous + iterator. + + It does not close synchronous iterator with .return() if that iterator + yields rejected Promise, so finally blocks within such an iterator are + never executed. + + In the application code developers can choose between async and sync + forms of the loop depending on their needs, but in the library code we + have to handle such edge cases properly and close iterators anyway. + */ try { - const { value, done } = await iterator.next(); - if (done) { - readable.push(null); - } else if (readable.push(await value)) { - next(); + if (iterable[SymbolAsyncIterator]) { + for await (const data of iterable) { + if (readable.destroyed) return; + if (!readable.push(data)) { + await new Promise((resolve) => { onDataNeeded = resolve; }); + if (readable.destroyed) return; + } + } } else { - reading = false; + for (const data of iterable) { + const value = await data; + if (readable.destroyed) return; + if (!readable.push(value)) { + await new Promise((resolve) => { onDataNeeded = resolve; }); + if (readable.destroyed) return; + } + } } - } catch (err) { - readable.destroy(err); + if (!readable.destroyed) readable.push(null); + } catch (error) { + if (!readable.destroyed) readable.destroy(error); } } - return readable; } module.exports = from; diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js new file mode 100644 index 00000000000000..aed7a8e02166a6 --- /dev/null +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -0,0 +1,117 @@ +'use strict'; + +const { mustCall, mustNotCall } = require('../common'); +const { Readable } = require('stream'); +const { strictEqual } = require('assert'); + +async function asyncSupport() { + const finallyMustCall = mustCall(); + async function* generate() { + try { + yield 'a'; + mustNotCall('only first item is read'); + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(generate()); + + for await (const chunk of stream) { + strictEqual(chunk, 'a'); + break; + } +} + +asyncSupport().then(mustCall()); + +async function syncSupport() { + const finallyMustCall = mustCall(); + function* generate() { + try { + yield 'a'; + mustNotCall('only first item is read'); + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(generate()); + + for await (const chunk of stream) { + strictEqual(chunk, 'a'); + break; + } +} + +syncSupport().then(mustCall()); + +async function syncPromiseSupport() { + const finallyMustCall = mustCall(); + function* generate() { + try { + yield Promise.resolve('a'); + mustNotCall('only first item is read'); + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(generate()); + + for await (const chunk of stream) { + strictEqual(chunk, 'a'); + break; + } +} + +syncPromiseSupport().then(mustCall()); + +async function syncRejectedSupport() { + const finallyMustCall = mustCall(); + const noBodyCall = mustNotCall(); + const catchMustCall = mustCall(); + + function* generate() { + try { + yield Promise.reject('a'); + mustNotCall(); + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(generate()); + + try { + for await (const chunk of stream) { + noBodyCall(chunk); + } + } catch { + catchMustCall(); + } +} + +syncRejectedSupport().then(mustCall()); + +async function noReturnAfterThrow() { + const returnMustNotCall = mustNotCall(); + const noBodyCall = mustNotCall(); + const catchMustCall = mustCall(); + + const stream = Readable.from({ + [Symbol.asyncIterator]() { return this; }, + async next() { throw new Error('a'); }, + async return() { returnMustNotCall(); return { done: true }; }, + }); + + try { + for await (const chunk of stream) { + noBodyCall(chunk); + } + } catch { + catchMustCall(); + } +} + +noReturnAfterThrow().then(mustCall());