From 7be3f87eb941b71e380e51ea17a5c942cf8fe3dd Mon Sep 17 00:00:00 2001 From: cjihrig Date: Tue, 12 Mar 2019 16:02:59 -0400 Subject: [PATCH] Revert "stream: make sure 'readable' is emitted before ending the stream" This reverts commit e95e7f9af540c88e93701ff6ca2f38986deb8e99. --- lib/_stream_readable.js | 18 ++- lib/_stream_transform.js | 3 + .../parallel/test-http-readable-data-event.js | 4 +- ...eam-readable-emit-readable-short-stream.js | 146 ------------------ .../test-stream-readable-emittedReadable.js | 13 +- .../test-stream-readable-needReadable.js | 10 +- ...est-stream-readable-reading-readingMore.js | 10 +- .../test-stream2-httpclient-response-end.js | 7 +- test/parallel/test-stream2-transform.js | 13 +- 9 files changed, 33 insertions(+), 191 deletions(-) delete mode 100644 test/parallel/test-stream-readable-emit-readable-short-stream.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 968c7b258cc89a..3753c4c7b64cd6 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -512,12 +512,20 @@ function onEofChunk(stream, state) { } } state.ended = true; - state.needReadable = false; - // We are not protecting if emittedReadable = true, - // so 'readable' gets scheduled anyway. - state.emittedReadable = true; - process.nextTick(emitReadable_, stream); + if (state.sync) { + // If we are sync, wait until next tick to emit the data. + // Otherwise we risk emitting data in the flow() + // the readable code triggers during a read() call + emitReadable(stream); + } else { + // Emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + emitReadable_(stream); + } + } } // Don't emit readable right away in sync mode, because this can trigger diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 8745c713413035..9d8da0c5473611 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -116,6 +116,9 @@ function Transform(options) { writeencoding: null }; + // Start out asking for a readable event once data is transformed. + this._readableState.needReadable = true; + // We have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. diff --git a/test/parallel/test-http-readable-data-event.js b/test/parallel/test-http-readable-data-event.js index 21b1fa65c661c8..ddaeca5fe7103c 100644 --- a/test/parallel/test-http-readable-data-event.js +++ b/test/parallel/test-http-readable-data-event.js @@ -29,7 +29,7 @@ const server = http.createServer((req, res) => { }; const expectedData = [helloWorld, helloAgainLater]; - const expectedRead = [helloWorld, null, helloAgainLater, null, null]; + const expectedRead = [helloWorld, null, helloAgainLater, null]; const req = http.request(opts, (res) => { res.on('error', common.mustNotCall()); @@ -42,7 +42,7 @@ const server = http.createServer((req, res) => { assert.strictEqual(data, expectedRead.shift()); next(); } while (data !== null); - }, 3)); + }, 2)); res.setEncoding('utf8'); res.on('data', common.mustCall((data) => { diff --git a/test/parallel/test-stream-readable-emit-readable-short-stream.js b/test/parallel/test-stream-readable-emit-readable-short-stream.js deleted file mode 100644 index 2f4f43baf5a848..00000000000000 --- a/test/parallel/test-stream-readable-emit-readable-short-stream.js +++ /dev/null @@ -1,146 +0,0 @@ -'use strict'; - -const common = require('../common'); -const stream = require('stream'); -const assert = require('assert'); - -{ - const r = new stream.Readable({ - read: common.mustCall(function() { - this.push('content'); - this.push(null); - }) - }); - - const t = new stream.Transform({ - transform: common.mustCall(function(chunk, encoding, callback) { - this.push(chunk); - return callback(); - }), - flush: common.mustCall(function(callback) { - return callback(); - }) - }); - - r.pipe(t); - t.on('readable', common.mustCall(function() { - while (true) { - const chunk = t.read(); - if (!chunk) - break; - - assert.strictEqual(chunk.toString(), 'content'); - } - }, 2)); -} - -{ - const t = new stream.Transform({ - transform: common.mustCall(function(chunk, encoding, callback) { - this.push(chunk); - return callback(); - }), - flush: common.mustCall(function(callback) { - return callback(); - }) - }); - - t.end('content'); - - t.on('readable', common.mustCall(function() { - while (true) { - const chunk = t.read(); - if (!chunk) - break; - assert.strictEqual(chunk.toString(), 'content'); - } - }, 2)); -} - -{ - const t = new stream.Transform({ - transform: common.mustCall(function(chunk, encoding, callback) { - this.push(chunk); - return callback(); - }), - flush: common.mustCall(function(callback) { - return callback(); - }) - }); - - t.write('content'); - t.end(); - - t.on('readable', common.mustCall(function() { - while (true) { - const chunk = t.read(); - if (!chunk) - break; - assert.strictEqual(chunk.toString(), 'content'); - } - }, 2)); -} - -{ - const t = new stream.Readable({ - read() { - } - }); - - t.on('readable', common.mustCall(function() { - while (true) { - const chunk = t.read(); - if (!chunk) - break; - assert.strictEqual(chunk.toString(), 'content'); - } - }, 2)); - - t.push('content'); - t.push(null); -} - -{ - const t = new stream.Readable({ - read() { - } - }); - - t.on('readable', common.mustCall(function() { - while (true) { - const chunk = t.read(); - if (!chunk) - break; - assert.strictEqual(chunk.toString(), 'content'); - } - }, 2)); - - process.nextTick(() => { - t.push('content'); - t.push(null); - }); -} - -{ - const t = new stream.Transform({ - transform: common.mustCall(function(chunk, encoding, callback) { - this.push(chunk); - return callback(); - }), - flush: common.mustCall(function(callback) { - return callback(); - }) - }); - - t.on('readable', common.mustCall(function() { - while (true) { - const chunk = t.read(); - if (!chunk) - break; - assert.strictEqual(chunk.toString(), 'content'); - } - }, 2)); - - t.write('content'); - t.end(); -} diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js index 6580c36303e11e..ba613f9e9ff19d 100644 --- a/test/parallel/test-stream-readable-emittedReadable.js +++ b/test/parallel/test-stream-readable-emittedReadable.js @@ -43,23 +43,12 @@ const noRead = new Readable({ read: () => {} }); -noRead.once('readable', common.mustCall(() => { +noRead.on('readable', common.mustCall(() => { // emittedReadable should be true when the readable event is emitted assert.strictEqual(noRead._readableState.emittedReadable, true); noRead.read(0); // emittedReadable is not reset during read(0) assert.strictEqual(noRead._readableState.emittedReadable, true); - - noRead.on('readable', common.mustCall(() => { - // The second 'readable' is emitted because we are ending - - // emittedReadable should be true when the readable event is emitted - assert.strictEqual(noRead._readableState.emittedReadable, false); - noRead.read(0); - // emittedReadable is not reset during read(0) - assert.strictEqual(noRead._readableState.emittedReadable, false); - - })); })); noRead.push('foo'); diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js index 54618b5e8ab14c..7058e123f07823 100644 --- a/test/parallel/test-stream-readable-needReadable.js +++ b/test/parallel/test-stream-readable-needReadable.js @@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => { // When the readable event fires, needReadable is reset. assert.strictEqual(readable._readableState.needReadable, false); readable.read(); -}, 2)); +})); // If a readable listener is attached, then a readable event is needed. assert.strictEqual(readable._readableState.needReadable, true); @@ -74,14 +74,12 @@ const slowProducer = new Readable({ }); slowProducer.on('readable', common.mustCall(() => { - const chunk = slowProducer.read(8); - const state = slowProducer._readableState; - if (chunk === null) { + if (slowProducer.read(8) === null) { // The buffer doesn't have enough data, and the stream is not need, // we need to notify the reader when data arrives. - assert.strictEqual(state.needReadable, true); + assert.strictEqual(slowProducer._readableState.needReadable, true); } else { - assert.strictEqual(state.needReadable, false); + assert.strictEqual(slowProducer._readableState.needReadable, false); } }, 4)); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index e72159d1c9be94..f5643205da0596 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -31,7 +31,7 @@ const Readable = require('stream').Readable; assert.strictEqual(state.reading, false); } - const expectedReadingMore = [true, false, false]; + const expectedReadingMore = [true, false]; readable.on('readable', common.mustCall(() => { // There is only one readingMore scheduled from on('data'), // after which everything is governed by the .read() call @@ -40,12 +40,10 @@ const Readable = require('stream').Readable; // If the stream has ended, we shouldn't be reading assert.strictEqual(state.ended, !state.reading); - // consume all the data - while (readable.read() !== null) {} - - if (expectedReadingMore.length === 0) // reached end of stream + const data = readable.read(); + if (data === null) // reached end of stream process.nextTick(common.mustCall(onStreamEnd, 1)); - }, 3)); + }, 2)); readable.on('end', common.mustCall(onStreamEnd)); readable.push('pushed'); diff --git a/test/parallel/test-stream2-httpclient-response-end.js b/test/parallel/test-stream2-httpclient-response-end.js index 8b2920668cd703..27d31e50a96a7e 100644 --- a/test/parallel/test-stream2-httpclient-response-end.js +++ b/test/parallel/test-stream2-httpclient-response-end.js @@ -11,11 +11,8 @@ const server = http.createServer(function(req, res) { let data = ''; res.on('readable', common.mustCall(function() { console.log('readable event'); - let chunk; - while ((chunk = res.read()) !== null) { - data += chunk; - } - }, 2)); + data += res.read(); + })); res.on('end', common.mustCall(function() { console.log('end event'); assert.strictEqual(msg, data); diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 2590d5192fe103..cd6deaabf1147e 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -321,16 +321,11 @@ const Transform = require('_stream_transform'); pt.end(); - // The next readable is emitted on the next tick. - assert.strictEqual(emits, 0); - - process.on('nextTick', function() { - assert.strictEqual(emits, 1); - assert.strictEqual(pt.read(5).toString(), 'l'); - assert.strictEqual(pt.read(5), null); + assert.strictEqual(emits, 1); + assert.strictEqual(pt.read(5).toString(), 'l'); + assert.strictEqual(pt.read(5), null); - assert.strictEqual(emits, 1); - }); + assert.strictEqual(emits, 1); } {