diff --git a/README.md b/README.md index 08aca9c421..28ccae1616 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ npm install --save readable-stream This package is a mirror of the streams implementations in Node.js. -Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.2/docs/api/stream.html). +Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.3/docs/api/stream.html). If you want to guarantee a stable streams base, regardless of what version of Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). diff --git a/build/build.js b/build/build.js index cdb6f736e8..3a2cf0f435 100755 --- a/build/build.js +++ b/build/build.js @@ -146,6 +146,7 @@ pump( file !== 'test-stream-base-prototype-accessors.js' && file !== 'test-stream-base-prototype-accessors-enumerability.js' && file !== 'test-stream-wrap-drain.js' && + file !== 'test-stream-pipeline-http2.js' && file !== 'test-stream-base-typechecking.js') { processTestFile(file) } diff --git a/build/files.js b/build/files.js index e9b721afba..7276dfc25a 100644 --- a/build/files.js +++ b/build/files.js @@ -337,6 +337,17 @@ module.exports['internal/streams/async_iterator.js'] = [ module.exports['internal/streams/end-of-stream.js'] = [ , errorsTwoLevel + , [ + /const \{ once \} = require\('internal\/util'\);/, + `function once(callback) { + let called = false; + return function(...args) { + if (called) return; + called = true; + callback.apply(this, args); + }; +}` + ] ] module.exports['internal/streams/pipeline.js'] = [ diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 42e16bc8b2..33f478d7e8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -544,13 +544,35 @@ function maybeReadMore(stream, state) { } function maybeReadMore_(stream, state) { - var len = state.length; - - while (!state.reading && !state.ended && state.length < state.highWaterMark) { + // Attempt to read more data if we should. + // + // The conditions for reading more data are (one of): + // - Not enough data buffered (state.length < state.highWaterMark). The loop + // is responsible for filling the buffer with enough data if such data + // is available. If highWaterMark is 0 and we are not in the flowing mode + // we should _not_ attempt to buffer any extra data. We'll get more data + // when the stream consumer calls read() instead. + // - No data in the buffer, and the stream is in flowing mode. In this mode + // the loop below is responsible for ensuring read() is called. Failing to + // call read here would abort the flow and there's no other mechanism for + // continuing the flow if the stream consumer has just subscribed to the + // 'data' event. + // + // In addition to the above conditions to keep reading data, the following + // conditions prevent the data from being read: + // - The stream has ended (state.ended). + // - There is already a pending 'read' operation (state.reading). This is a + // case where the the stream has called the implementation defined _read() + // method, but they are processing the call asynchronously and have _not_ + // called push() with new data. In this case we skip performing more + // read()s. The execution ends in this method again after the _read() ends + // up calling push() with more data. + while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) { + var len = state.length; debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) // didn't get any data, stop spinning. - break;else len = state.length; + break; } state.readingMore = false; diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 79a6e1e11e..9fb615a2f3 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -47,6 +47,11 @@ function onReadable(iter) { function wrapForNext(lastPromise, iter) { return function (resolve, reject) { lastPromise.then(function () { + if (iter[kEnded]) { + resolve(createIterResult(undefined, true)); + return; + } + iter[kHandlePromise](resolve, reject); }, reject); }; @@ -70,7 +75,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro } if (this[kEnded]) { - return Promise.resolve(createIterResult(null, true)); + return Promise.resolve(createIterResult(undefined, true)); } if (this[kStream].destroyed) { @@ -83,7 +88,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro if (_this[kError]) { reject(_this[kError]); } else { - resolve(createIterResult(null, true)); + resolve(createIterResult(undefined, true)); } }); }); @@ -128,7 +133,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro return; } - resolve(createIterResult(null, true)); + resolve(createIterResult(undefined, true)); }); }); }), _Object$setPrototypeO), AsyncIteratorPrototype); @@ -151,9 +156,6 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat }), _defineProperty(_Object$create, kEnded, { value: stream._readableState.endEmitted, writable: true - }), _defineProperty(_Object$create, kLastPromise, { - value: null, - writable: true }), _defineProperty(_Object$create, kHandlePromise, { value: function value(resolve, reject) { var data = iterator[kStream].read(); @@ -170,6 +172,7 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat }, writable: true }), _Object$create)); + iterator[kLastPromise] = null; finished(stream, function (err) { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise @@ -192,7 +195,7 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat iterator[kLastPromise] = null; iterator[kLastResolve] = null; iterator[kLastReject] = null; - resolve(createIterResult(null, true)); + resolve(createIterResult(undefined, true)); } iterator[kEnded] = true; diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index cf23d53b5a..831f286d98 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -4,27 +4,30 @@ var ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE; -function noop() {} - -function isRequest(stream) { - return stream.setHeader && typeof stream.abort === 'function'; -} - function once(callback) { var called = false; - return function (err) { + return function () { if (called) return; called = true; - callback.call(this, err); + + for (var _len = arguments.length, args = new Array(_len), _key = 0; _key < _len; _key++) { + args[_key] = arguments[_key]; + } + + callback.apply(this, args); }; } +function noop() {} + +function isRequest(stream) { + return stream.setHeader && typeof stream.abort === 'function'; +} + function eos(stream, opts, callback) { if (typeof opts === 'function') return eos(stream, null, opts); if (!opts) opts = {}; callback = once(callback || noop); - var ws = stream._writableState; - var rs = stream._readableState; var readable = opts.readable || opts.readable !== false && stream.readable; var writable = opts.writable || opts.writable !== false && stream.writable; @@ -32,13 +35,19 @@ function eos(stream, opts, callback) { if (!stream.writable) onfinish(); }; + var writableEnded = stream._writableState && stream._writableState.finished; + var onfinish = function onfinish() { writable = false; + writableEnded = true; if (!readable) callback.call(stream); }; + var readableEnded = stream._readableState && stream._readableState.endEmitted; + var onend = function onend() { readable = false; + readableEnded = true; if (!writable) callback.call(stream); }; @@ -47,12 +56,16 @@ function eos(stream, opts, callback) { }; var onclose = function onclose() { - if (readable && !(rs && rs.ended)) { - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + var err; + + if (readable && !readableEnded) { + if (!stream._readableState || !stream._readableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); } - if (writable && !(ws && ws.ended)) { - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + if (writable && !writableEnded) { + if (!stream._writableState || !stream._writableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); } }; @@ -64,7 +77,7 @@ function eos(stream, opts, callback) { stream.on('complete', onfinish); stream.on('abort', onclose); if (stream.req) onrequest();else stream.on('request', onrequest); - } else if (writable && !ws) { + } else if (writable && !stream._writableState) { // legacy streams stream.on('end', onlegacyfinish); stream.on('close', onlegacyfinish); diff --git a/test/parallel/test-stream-pipeline-queued-end-in-destroy.js b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js new file mode 100644 index 0000000000..1bf85834d9 --- /dev/null +++ b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js @@ -0,0 +1,57 @@ +"use strict"; + +/**/ +var bufferShim = require('safe-buffer').Buffer; +/**/ + + +var common = require('../common'); + +var assert = require('assert/'); + +var _require = require('../../'), + Readable = _require.Readable, + Duplex = _require.Duplex, + pipeline = _require.pipeline; // Test that the callback for pipeline() is called even when the ._destroy() +// method of the stream places an .end() request to itself that does not +// get processed before the destruction of the stream (i.e. the 'close' event). +// Refs: https://github.com/nodejs/node/issues/24456 + + +var readable = new Readable({ + read: common.mustCall(function () {}) +}); +var duplex = new Duplex({ + write: function write(chunk, enc, cb) {// Simulate messages queueing up. + }, + read: function read() {}, + destroy: function destroy(err, cb) { + // Call end() from inside the destroy() method, like HTTP/2 streams + // do at the time of writing. + this.end(); + cb(err); + } +}); +duplex.on('finished', common.mustNotCall()); +pipeline(readable, duplex, common.mustCall(function (err) { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); +})); // Write one chunk of data, and destroy the stream later. +// That should trigger the pipeline destruction. + +readable.push('foo'); +setImmediate(function () { + readable.destroy(); +}); +; + +require('tap').pass('sync run'); + +var _list = process.listeners('uncaughtException'); + +process.removeAllListeners('uncaughtException'); + +_list.pop(); + +_list.forEach(function (e) { + return process.on('uncaughtException', e); +}); \ No newline at end of file diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 160cdc4ed9..565dbe8425 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -11,8 +11,6 @@ var bufferShim = require('safe-buffer').Buffer; var common = require('../common'); -if (!common.hasCrypto) common.skip('missing crypto'); - var _require = require('../../'), Stream = _require.Stream, Writable = _require.Writable, @@ -24,14 +22,6 @@ var assert = require('assert/'); var http = require('http'); -var http2 = { - createServer: function createServer() { - return { - listen: function listen() {} - }; - } -}; - var promisify = require('util-promisify'); { @@ -281,34 +271,6 @@ var promisify = require('util-promisify'); }); }); } -{ - var _server4 = http2.createServer(function (req, res) { - pipeline(req, res, common.mustCall()); - }); - - _server4.listen(0, function () { - var url = "http://localhost:".concat(_server4.address().port); - var client = http2.connect(url); - var req = client.request({ - ':method': 'POST' - }); - var rs = new Readable({ - read: function read() { - rs.push('hello'); - } - }); - pipeline(rs, req, common.mustCall(function (err) { - _server4.close(); - - client.close(); - })); - var cnt = 10; - req.on('data', function (data) { - cnt--; - if (cnt === 0) rs.destroy(); - }); - }); -} { var makeTransform = function makeTransform() { var tr = new Transform({ diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index cc28b09a17..6ef4b5d867 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -32,7 +32,7 @@ function _tests() { var rs = new Readable({}); assert.strictEqual(Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())), AsyncIteratorPrototype); } - yield _asyncToGenerator(function* () { + { var readable = new Readable({ objectMode: true, read: function read() {} @@ -66,20 +66,23 @@ function _tests() { } } } - })(); - yield _asyncToGenerator(function* () { + } + { console.log('read without for..await'); var max = 5; - var readable = new Readable({ + + var _readable = new Readable({ objectMode: true, read: function read() {} }); - var iter = readable[Symbol.asyncIterator](); - assert.strictEqual(iter.stream, readable); + + var _iter = _readable[Symbol.asyncIterator](); + + assert.strictEqual(_iter.stream, _readable); var values = []; for (var i = 0; i < max; i++) { - values.push(iter.next()); + values.push(_iter.next()); } Promise.all(values).then(common.mustCall(function (values) { @@ -87,96 +90,127 @@ function _tests() { return assert.strictEqual(item.value, 'hello-' + i); }, 5)); })); - readable.push('hello-0'); - readable.push('hello-1'); - readable.push('hello-2'); - readable.push('hello-3'); - readable.push('hello-4'); - readable.push(null); - var last = yield iter.next(); + + _readable.push('hello-0'); + + _readable.push('hello-1'); + + _readable.push('hello-2'); + + _readable.push('hello-3'); + + _readable.push('hello-4'); + + _readable.push(null); + + var last = yield _iter.next(); assert.strictEqual(last.done, true); - })(); - yield _asyncToGenerator(function* () { + } + { console.log('read without for..await deferred'); - var readable = new Readable({ + + var _readable2 = new Readable({ objectMode: true, read: function read() {} }); - var iter = readable[Symbol.asyncIterator](); - assert.strictEqual(iter.stream, readable); - var values = []; - for (var i = 0; i < 3; i++) { - values.push(iter.next()); + var _iter2 = _readable2[Symbol.asyncIterator](); + + assert.strictEqual(_iter2.stream, _readable2); + var _values = []; + + for (var _i = 0; _i < 3; _i++) { + _values.push(_iter2.next()); } - readable.push('hello-0'); - readable.push('hello-1'); - readable.push('hello-2'); + _readable2.push('hello-0'); + + _readable2.push('hello-1'); + + _readable2.push('hello-2'); + var k = 0; - var results1 = yield Promise.all(values); + var results1 = yield Promise.all(_values); results1.forEach(common.mustCall(function (item) { return assert.strictEqual(item.value, 'hello-' + k++); }, 3)); - values = []; + _values = []; - for (var _i = 0; _i < 2; _i++) { - values.push(iter.next()); + for (var _i2 = 0; _i2 < 2; _i2++) { + _values.push(_iter2.next()); } - readable.push('hello-3'); - readable.push('hello-4'); - readable.push(null); - var results2 = yield Promise.all(values); + _readable2.push('hello-3'); + + _readable2.push('hello-4'); + + _readable2.push(null); + + var results2 = yield Promise.all(_values); results2.forEach(common.mustCall(function (item) { return assert.strictEqual(item.value, 'hello-' + k++); }, 2)); - var last = yield iter.next(); - assert.strictEqual(last.done, true); - })(); - yield _asyncToGenerator(function* () { + + var _last = yield _iter2.next(); + + assert.strictEqual(_last.done, true); + } + { console.log('read without for..await with errors'); - var max = 3; - var readable = new Readable({ + var _max = 3; + + var _readable3 = new Readable({ objectMode: true, read: function read() {} }); - var iter = readable[Symbol.asyncIterator](); - assert.strictEqual(iter.stream, readable); - var values = []; + + var _iter3 = _readable3[Symbol.asyncIterator](); + + assert.strictEqual(_iter3.stream, _readable3); + var _values2 = []; var errors = []; - var i; - for (i = 0; i < max; i++) { - values.push(iter.next()); + var _i3; + + for (_i3 = 0; _i3 < _max; _i3++) { + _values2.push(_iter3.next()); } - for (i = 0; i < 2; i++) { - errors.push(iter.next()); + for (_i3 = 0; _i3 < 2; _i3++) { + errors.push(_iter3.next()); } - readable.push('hello-0'); - readable.push('hello-1'); - readable.push('hello-2'); - var resolved = yield Promise.all(values); + _readable3.push('hello-0'); + + _readable3.push('hello-1'); + + _readable3.push('hello-2'); + + var resolved = yield Promise.all(_values2); resolved.forEach(common.mustCall(function (item, i) { return assert.strictEqual(item.value, 'hello-' + i); - }, max)); + }, _max)); errors.forEach(function (promise) { promise.catch(common.mustCall(function (err) { assert.strictEqual(err.message, 'kaboom'); })); }); - readable.destroy(new Error('kaboom')); - })(); - yield _asyncToGenerator(function* () { + + _readable3.destroy(new Error('kaboom')); + } + { console.log('call next() after error'); - var readable = new Readable({ + + var _readable4 = new Readable({ read: function read() {} }); - var iterator = readable[Symbol.asyncIterator](); + + var iterator = _readable4[Symbol.asyncIterator](); + var err = new Error('kaboom'); - readable.destroy(new Error('kaboom')); + + _readable4.destroy(new Error('kaboom')); + yield function (f, e) { var success = false; f().then(function () { @@ -190,32 +224,34 @@ function _tests() { assert.strictEqual(e.message, e2.message); }); }(iterator.next.bind(iterator), err); - })(); - yield _asyncToGenerator(function* () { + } + { console.log('read object mode'); - var max = 42; + var _max2 = 42; var readed = 0; var received = 0; - var readable = new Readable({ + + var _readable5 = new Readable({ objectMode: true, read: function read() { this.push('hello'); - if (++readed === max) { + if (++readed === _max2) { this.push(null); } } }); + var _iteratorNormalCompletion2 = true; var _didIteratorError2 = false; var _iteratorError2; try { - for (var _iterator2 = _asyncIterator(readable), _step2, _value2; _step2 = yield _iterator2.next(), _iteratorNormalCompletion2 = _step2.done, _value2 = yield _step2.value, !_iteratorNormalCompletion2; _iteratorNormalCompletion2 = true) { - var k = _value2; + for (var _iterator2 = _asyncIterator(_readable5), _step2, _value2; _step2 = yield _iterator2.next(), _iteratorNormalCompletion2 = _step2.done, _value2 = yield _step2.value, !_iteratorNormalCompletion2; _iteratorNormalCompletion2 = true) { + var _k = _value2; received++; - assert.strictEqual(k, 'hello'); + assert.strictEqual(_k, 'hello'); } } catch (err) { _didIteratorError2 = true; @@ -233,16 +269,18 @@ function _tests() { } assert.strictEqual(readed, received); - })(); - yield _asyncToGenerator(function* () { + } + { console.log('destroy sync'); - var readable = new Readable({ + + var _readable6 = new Readable({ objectMode: true, read: function read() { this.destroy(new Error('kaboom from read')); } }); - var err; + + var _err; try { // eslint-disable-next-line no-unused-vars @@ -252,8 +290,8 @@ function _tests() { var _iteratorError3; try { - for (var _iterator3 = _asyncIterator(readable), _step3, _value3; _step3 = yield _iterator3.next(), _iteratorNormalCompletion3 = _step3.done, _value3 = yield _step3.value, !_iteratorNormalCompletion3; _iteratorNormalCompletion3 = true) { - var k = _value3; + for (var _iterator3 = _asyncIterator(_readable6), _step3, _value3; _step3 = yield _iterator3.next(), _iteratorNormalCompletion3 = _step3.done, _value3 = yield _step3.value, !_iteratorNormalCompletion3; _iteratorNormalCompletion3 = true) { + var _k2 = _value3; } } catch (err) { _didIteratorError3 = true; @@ -270,14 +308,15 @@ function _tests() { } } } catch (e) { - err = e; + _err = e; } - assert.strictEqual(err.message, 'kaboom from read'); - })(); - yield _asyncToGenerator(function* () { + assert.strictEqual(_err.message, 'kaboom from read'); + } + { console.log('destroy async'); - var readable = new Readable({ + + var _readable7 = new Readable({ objectMode: true, read: function read() { var _this = this; @@ -291,8 +330,9 @@ function _tests() { } } }); - var received = 0; - var err = null; + + var _received = 0; + var _err2 = null; try { // eslint-disable-next-line no-unused-vars @@ -302,9 +342,9 @@ function _tests() { var _iteratorError4; try { - for (var _iterator4 = _asyncIterator(readable), _step4, _value4; _step4 = yield _iterator4.next(), _iteratorNormalCompletion4 = _step4.done, _value4 = yield _step4.value, !_iteratorNormalCompletion4; _iteratorNormalCompletion4 = true) { - var k = _value4; - received++; + for (var _iterator4 = _asyncIterator(_readable7), _step4, _value4; _step4 = yield _iterator4.next(), _iteratorNormalCompletion4 = _step4.done, _value4 = yield _step4.value, !_iteratorNormalCompletion4; _iteratorNormalCompletion4 = true) { + var _k3 = _value4; + _received++; } } catch (err) { _didIteratorError4 = true; @@ -321,21 +361,23 @@ function _tests() { } } } catch (e) { - err = e; + _err2 = e; } - assert.strictEqual(err.message, 'kaboom'); - assert.strictEqual(received, 1); - })(); - yield _asyncToGenerator(function* () { + assert.strictEqual(_err2.message, 'kaboom'); + assert.strictEqual(_received, 1); + } + { console.log('destroyed by throw'); - var readable = new Readable({ + + var _readable8 = new Readable({ objectMode: true, read: function read() { this.push('hello'); } }); - var err = null; + + var _err3 = null; try { var _iteratorNormalCompletion5 = true; @@ -344,9 +386,9 @@ function _tests() { var _iteratorError5; try { - for (var _iterator5 = _asyncIterator(readable), _step5, _value5; _step5 = yield _iterator5.next(), _iteratorNormalCompletion5 = _step5.done, _value5 = yield _step5.value, !_iteratorNormalCompletion5; _iteratorNormalCompletion5 = true) { - var k = _value5; - assert.strictEqual(k, 'hello'); + for (var _iterator5 = _asyncIterator(_readable8), _step5, _value5; _step5 = yield _iterator5.next(), _iteratorNormalCompletion5 = _step5.done, _value5 = yield _step5.value, !_iteratorNormalCompletion5; _iteratorNormalCompletion5 = true) { + var _k4 = _value5; + assert.strictEqual(_k4, 'hello'); throw new Error('kaboom'); } } catch (err) { @@ -364,23 +406,25 @@ function _tests() { } } } catch (e) { - err = e; + _err3 = e; } - assert.strictEqual(err.message, 'kaboom'); - assert.strictEqual(readable.destroyed, true); - })(); - yield _asyncToGenerator(function* () { + assert.strictEqual(_err3.message, 'kaboom'); + assert.strictEqual(_readable8.destroyed, true); + } + { console.log('destroyed sync after push'); - var readable = new Readable({ + + var _readable9 = new Readable({ objectMode: true, read: function read() { this.push('hello'); this.destroy(new Error('kaboom')); } }); - var received = 0; - var err = null; + + var _received2 = 0; + var _err4 = null; try { var _iteratorNormalCompletion6 = true; @@ -389,10 +433,10 @@ function _tests() { var _iteratorError6; try { - for (var _iterator6 = _asyncIterator(readable), _step6, _value6; _step6 = yield _iterator6.next(), _iteratorNormalCompletion6 = _step6.done, _value6 = yield _step6.value, !_iteratorNormalCompletion6; _iteratorNormalCompletion6 = true) { - var k = _value6; - assert.strictEqual(k, 'hello'); - received++; + for (var _iterator6 = _asyncIterator(_readable9), _step6, _value6; _step6 = yield _iterator6.next(), _iteratorNormalCompletion6 = _step6.done, _value6 = yield _step6.value, !_iteratorNormalCompletion6; _iteratorNormalCompletion6 = true) { + var _k5 = _value6; + assert.strictEqual(_k5, 'hello'); + _received2++; } } catch (err) { _didIteratorError6 = true; @@ -409,18 +453,19 @@ function _tests() { } } } catch (e) { - err = e; + _err4 = e; } - assert.strictEqual(err.message, 'kaboom'); - assert.strictEqual(received, 1); - })(); - yield _asyncToGenerator(function* () { + assert.strictEqual(_err4.message, 'kaboom'); + assert.strictEqual(_received2, 1); + } + { console.log('push async'); - var max = 42; - var readed = 0; - var received = 0; - var readable = new Readable({ + var _max3 = 42; + var _readed = 0; + var _received3 = 0; + + var _readable10 = new Readable({ objectMode: true, read: function read() { var _this2 = this; @@ -428,22 +473,23 @@ function _tests() { setImmediate(function () { _this2.push('hello'); - if (++readed === max) { + if (++_readed === _max3) { _this2.push(null); } }); } }); + var _iteratorNormalCompletion7 = true; var _didIteratorError7 = false; var _iteratorError7; try { - for (var _iterator7 = _asyncIterator(readable), _step7, _value7; _step7 = yield _iterator7.next(), _iteratorNormalCompletion7 = _step7.done, _value7 = yield _step7.value, !_iteratorNormalCompletion7; _iteratorNormalCompletion7 = true) { - var k = _value7; - received++; - assert.strictEqual(k, 'hello'); + for (var _iterator7 = _asyncIterator(_readable10), _step7, _value7; _step7 = yield _iterator7.next(), _iteratorNormalCompletion7 = _step7.done, _value7 = yield _step7.value, !_iteratorNormalCompletion7; _iteratorNormalCompletion7 = true) { + var _k6 = _value7; + _received3++; + assert.strictEqual(_k6, 'hello'); } } catch (err) { _didIteratorError7 = true; @@ -460,31 +506,37 @@ function _tests() { } } - assert.strictEqual(readed, received); - })(); - yield _asyncToGenerator(function* () { + assert.strictEqual(_readed, _received3); + } + { console.log('push binary async'); - var max = 42; - var readed = 0; - var readable = new Readable({ + var _max4 = 42; + var _readed2 = 0; + + var _readable11 = new Readable({ read: function read() { var _this3 = this; setImmediate(function () { _this3.push('hello'); - if (++readed === max) { + if (++_readed2 === _max4) { _this3.push(null); } }); } }); + var expected = ''; - readable.setEncoding('utf8'); - readable.pause(); - readable.on('data', function (chunk) { + + _readable11.setEncoding('utf8'); + + _readable11.pause(); + + _readable11.on('data', function (chunk) { expected += chunk; }); + var data = ''; var _iteratorNormalCompletion8 = true; var _didIteratorError8 = false; @@ -492,9 +544,9 @@ function _tests() { var _iteratorError8; try { - for (var _iterator8 = _asyncIterator(readable), _step8, _value8; _step8 = yield _iterator8.next(), _iteratorNormalCompletion8 = _step8.done, _value8 = yield _step8.value, !_iteratorNormalCompletion8; _iteratorNormalCompletion8 = true) { - var k = _value8; - data += k; + for (var _iterator8 = _asyncIterator(_readable11), _step8, _value8; _step8 = yield _iterator8.next(), _iteratorNormalCompletion8 = _step8.done, _value8 = yield _step8.value, !_iteratorNormalCompletion8; _iteratorNormalCompletion8 = true) { + var _k7 = _value8; + data += _k7; } } catch (err) { _didIteratorError8 = true; @@ -512,40 +564,47 @@ function _tests() { } assert.strictEqual(data, expected); - })(); - yield _asyncToGenerator(function* () { + } + { console.log('.next() on destroyed stream'); - var readable = new Readable({ + + var _readable12 = new Readable({ read: function read() {// no-op } }); - readable.destroy(); - var _ref14 = yield readable[Symbol.asyncIterator]().next(), - done = _ref14.done; + _readable12.destroy(); + + var _ref = yield _readable12[Symbol.asyncIterator]().next(), + done = _ref.done; assert.strictEqual(done, true); - })(); - yield _asyncToGenerator(function* () { + } + { console.log('.next() on pipelined stream'); - var readable = new Readable({ + + var _readable13 = new Readable({ read: function read() {// no-op } }); + var passthrough = new PassThrough(); - var err = new Error('kaboom'); - pipeline(readable, passthrough, common.mustCall(function (e) { - assert.strictEqual(e, err); + + var _err5 = new Error('kaboom'); + + pipeline(_readable13, passthrough, common.mustCall(function (e) { + assert.strictEqual(e, _err5); })); - readable.destroy(err); + + _readable13.destroy(_err5); try { - yield readable[Symbol.asyncIterator]().next(); + yield _readable13[Symbol.asyncIterator]().next(); } catch (e) { - assert.strictEqual(e, err); + assert.strictEqual(e, _err5); } - })(); - yield _asyncToGenerator(function* () { + } + { console.log('iterating on an ended stream completes'); var r = new Readable({ objectMode: true, @@ -604,10 +663,11 @@ function _tests() { } } } - })(); - yield _asyncToGenerator(function* () { + } + { console.log('destroy mid-stream does not error'); - var r = new Readable({ + + var _r = new Readable({ objectMode: true, read: function read() { this.push('asdf'); @@ -615,15 +675,17 @@ function _tests() { } }); // eslint-disable-next-line no-unused-vars + var _iteratorNormalCompletion11 = true; var _didIteratorError11 = false; var _iteratorError11; try { - for (var _iterator11 = _asyncIterator(r), _step11, _value11; _step11 = yield _iterator11.next(), _iteratorNormalCompletion11 = _step11.done, _value11 = yield _step11.value, !_iteratorNormalCompletion11; _iteratorNormalCompletion11 = true) { - var a = _value11; - r.destroy(null); + for (var _iterator11 = _asyncIterator(_r), _step11, _value11; _step11 = yield _iterator11.next(), _iteratorNormalCompletion11 = _step11.done, _value11 = yield _step11.value, !_iteratorNormalCompletion11; _iteratorNormalCompletion11 = true) { + var _a = _value11; + + _r.destroy(null); } } catch (err) { _didIteratorError11 = true; @@ -639,7 +701,97 @@ function _tests() { } } } - })(); + } + { + console.log('all next promises must be resolved on end'); + + var _r2 = new Readable({ + objectMode: true, + read: function read() {} + }); + + var _b = _r2[Symbol.asyncIterator](); + + var c = _b.next(); + + var _d = _b.next(); + + _r2.push(null); + + assert.deepStrictEqual((yield c), { + done: true, + value: undefined + }); + assert.deepStrictEqual((yield _d), { + done: true, + value: undefined + }); + } + { + console.log('all next promises must be resolved on destroy'); + + var _r3 = new Readable({ + objectMode: true, + read: function read() {} + }); + + var _b2 = _r3[Symbol.asyncIterator](); + + var _c = _b2.next(); + + var _d2 = _b2.next(); + + _r3.destroy(); + + assert.deepStrictEqual((yield _c), { + done: true, + value: undefined + }); + assert.deepStrictEqual((yield _d2), { + done: true, + value: undefined + }); + } + { + console.log('all next promises must be resolved on destroy with error'); + + var _r4 = new Readable({ + objectMode: true, + read: function read() {} + }); + + var _b3 = _r4[Symbol.asyncIterator](); + + var _c2 = _b3.next(); + + var _d3 = _b3.next(); + + var _err6 = new Error('kaboom'); + + _r4.destroy(_err6); + + yield Promise.all([_asyncToGenerator(function* () { + var e; + + try { + yield _c2; + } catch (_e) { + e = _e; + } + + assert.strictEqual(e, _err6); + })(), _asyncToGenerator(function* () { + var e; + + try { + yield _d3; + } catch (_e) { + e = _e; + } + + assert.strictEqual(e, _err6); + })()]); + } }); return _tests.apply(this, arguments); } diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 590fb21365..d375c0747d 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -195,7 +195,7 @@ var _require2 = require('util'), _read8.on('close', common.mustCall()); _read8.destroy(_expected4, common.mustCall(function (err) { - assert.strictEqual(_expected4, err); + assert.strictEqual(err, _expected4); })); } { diff --git a/test/parallel/test-stream-readable-hwm-0-async.js b/test/parallel/test-stream-readable-hwm-0-async.js new file mode 100644 index 0000000000..553ce6f755 --- /dev/null +++ b/test/parallel/test-stream-readable-hwm-0-async.js @@ -0,0 +1,40 @@ +"use strict"; + +/**/ +var bufferShim = require('safe-buffer').Buffer; +/**/ + + +var common = require('../common'); // This test ensures that Readable stream will continue to call _read +// for streams with highWaterMark === 0 once the stream returns data +// by calling push() asynchronously. + + +var _require = require('../../'), + Readable = _require.Readable; + +var count = 5; +var r = new Readable({ + // Called 6 times: First 5 return data, last one signals end of stream. + read: common.mustCall(function () { + process.nextTick(common.mustCall(function () { + if (count--) r.push('a');else r.push(null); + })); + }, 6), + highWaterMark: 0 +}); +r.on('end', common.mustCall()); +r.on('data', common.mustCall(5)); +; + +require('tap').pass('sync run'); + +var _list = process.listeners('uncaughtException'); + +process.removeAllListeners('uncaughtException'); + +_list.pop(); + +_list.forEach(function (e) { + return process.on('uncaughtException', e); +}); \ No newline at end of file diff --git a/test/parallel/test-stream-readable-hwm-0-no-flow-data.js b/test/parallel/test-stream-readable-hwm-0-no-flow-data.js new file mode 100644 index 0000000000..93864ae89a --- /dev/null +++ b/test/parallel/test-stream-readable-hwm-0-no-flow-data.js @@ -0,0 +1,102 @@ +"use strict"; + +/**/ +var bufferShim = require('safe-buffer').Buffer; +/**/ + + +var common = require('../common'); // Ensure that subscribing the 'data' event will not make the stream flow. +// The 'data' event will require calling read() by hand. +// +// The test is written for the (somewhat rare) highWaterMark: 0 streams to +// specifically catch any regressions that might occur with these streams. + + +var assert = require('assert/'); + +var _require = require('../../'), + Readable = _require.Readable; + +var streamData = ['a', null]; // Track the calls so we can assert their order later. + +var calls = []; +var r = new Readable({ + read: common.mustCall(function () { + calls.push('_read:' + streamData[0]); + process.nextTick(function () { + calls.push('push:' + streamData[0]); + r.push(streamData.shift()); + }); + }, streamData.length), + highWaterMark: 0, + // Object mode is used here just for testing convenience. It really + // shouldn't affect the order of events. Just the data and its format. + objectMode: true +}); +assert.strictEqual(r.readableFlowing, null); +r.on('readable', common.mustCall(function () { + calls.push('readable'); +}, 2)); +assert.strictEqual(r.readableFlowing, false); +r.on('data', common.mustCall(function (data) { + calls.push('data:' + data); +}, 1)); +r.on('end', common.mustCall(function () { + calls.push('end'); +})); +assert.strictEqual(r.readableFlowing, false); // The stream emits the events asynchronously but that's not guaranteed to +// happen on the next tick (especially since the _read implementation above +// uses process.nextTick). +// +// We use setImmediate here to give the stream enough time to emit all the +// events it's about to emit. + +setImmediate(function () { + // Only the _read, push, readable calls have happened. No data must be + // emitted yet. + assert.deepStrictEqual(calls, ['_read:a', 'push:a', 'readable']); // Calling 'r.read()' should trigger the data event. + + assert.strictEqual(r.read(), 'a'); + assert.deepStrictEqual(calls, ['_read:a', 'push:a', 'readable', 'data:a']); // The next 'read()' will return null because hwm: 0 does not buffer any + // data and the _read implementation above does the push() asynchronously. + // + // Note: This 'null' signals "no data available". It isn't the end-of-stream + // null value as the stream doesn't know yet that it is about to reach the + // end. + // + // Using setImmediate again to give the stream enough time to emit all the + // events it wants to emit. + + assert.strictEqual(r.read(), null); + setImmediate(function () { + // There's a new 'readable' event after the data has been pushed. + // The 'end' event will be emitted only after a 'read()'. + // + // This is somewhat special for the case where the '_read' implementation + // calls 'push' asynchronously. If 'push' was synchronous, the 'end' event + // would be emitted here _before_ we call read(). + assert.deepStrictEqual(calls, ['_read:a', 'push:a', 'readable', 'data:a', '_read:null', 'push:null', 'readable']); + assert.strictEqual(r.read(), null); // While it isn't really specified whether the 'end' event should happen + // synchronously with read() or not, we'll assert the current behavior + // ('end' event happening on the next tick after read()) so any changes + // to it are noted and acknowledged in the future. + + assert.deepStrictEqual(calls, ['_read:a', 'push:a', 'readable', 'data:a', '_read:null', 'push:null', 'readable']); + process.nextTick(function () { + assert.deepStrictEqual(calls, ['_read:a', 'push:a', 'readable', 'data:a', '_read:null', 'push:null', 'readable', 'end']); + }); + }); +}); +; + +require('tap').pass('sync run'); + +var _list = process.listeners('uncaughtException'); + +process.removeAllListeners('uncaughtException'); + +_list.pop(); + +_list.forEach(function (e) { + return process.on('uncaughtException', e); +}); \ No newline at end of file diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 0d7e85b693..94beb839b9 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -226,7 +226,7 @@ var _require2 = require('util'), var _expected4 = new Error('kaboom'); _write9.destroy(_expected4, common.mustCall(function (err) { - assert.strictEqual(_expected4, err); + assert.strictEqual(err, _expected4); })); } { diff --git a/test/parallel/test-stream-writable-null.js b/test/parallel/test-stream-writable-null.js index d8e8f7b59b..da64b5cf7e 100644 --- a/test/parallel/test-stream-writable-null.js +++ b/test/parallel/test-stream-writable-null.js @@ -18,8 +18,8 @@ var MyWritable = function (_stream$Writable) { _inheritsLoose(MyWritable, _stream$Writable); - function MyWritable(opt) { - return _stream$Writable.call(this, opt) || this; + function MyWritable() { + return _stream$Writable.apply(this, arguments) || this; } var _proto = MyWritable.prototype; diff --git a/test/parallel/test-stream-writev.js b/test/parallel/test-stream-writev.js index 3e92d17e61..e0ee22553e 100644 --- a/test/parallel/test-stream-writev.js +++ b/test/parallel/test-stream-writev.js @@ -125,7 +125,7 @@ function test(decode, uncork, multi, next) { w.on('finish', function () { // make sure finish comes after all the write cb cnt('finish')(); - assert.deepStrictEqual(expectChunks, actualChunks); + assert.deepStrictEqual(actualChunks, expectChunks); next(); }); }