From 203e607201fa9567d3e38278b036645d8bd0f811 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 7 Feb 2018 03:30:43 +0100 Subject: [PATCH 1/2] Revert "stream: defer readable and flow when sync" This reverts commit 563fff2938110065589a3dc1b34692670206e4e4 as it was causing failures in CITGM with `dicer`. Refs: https://github.com/nodejs/node/pull/18515 --- lib/_stream_readable.js | 17 ++--- test/parallel/test-stream-pipe-flow.js | 67 ------------------- .../test-stream-readable-pause-and-resume.js | 40 ----------- 3 files changed, 5 insertions(+), 119 deletions(-) delete mode 100644 test/parallel/test-stream-pipe-flow.js delete mode 100644 test/parallel/test-stream-readable-pause-and-resume.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c7b356ed4421b0..638a753cbcdf43 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -488,18 +488,11 @@ function onEofChunk(stream, state) { } state.ended = true; - if (state.sync && state.length) { - // if we are sync and have data in the buffer, wait until next tick - // to emit the data. otherwise we risk emitting data in the flow() - // the readable code triggers during a read() call - emitReadable(stream); - } else { - // emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - emitReadable_(stream); - } + // emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + emitReadable_(stream); } } diff --git a/test/parallel/test-stream-pipe-flow.js b/test/parallel/test-stream-pipe-flow.js deleted file mode 100644 index 1f8564182a3107..00000000000000 --- a/test/parallel/test-stream-pipe-flow.js +++ /dev/null @@ -1,67 +0,0 @@ -'use strict'; -const common = require('../common'); -const { Readable, Writable, PassThrough } = require('stream'); - -{ - let ticks = 17; - - const rs = new Readable({ - objectMode: true, - read: () => { - if (ticks-- > 0) - return process.nextTick(() => rs.push({})); - rs.push({}); - rs.push(null); - } - }); - - const ws = new Writable({ - highWaterMark: 0, - objectMode: true, - write: (data, end, cb) => setImmediate(cb) - }); - - rs.on('end', common.mustCall()); - ws.on('finish', common.mustCall()); - rs.pipe(ws); -} - -{ - let missing = 8; - - const rs = new Readable({ - objectMode: true, - read: () => { - if (missing--) rs.push({}); - else rs.push(null); - } - }); - - const pt = rs - .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })) - .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })); - - pt.on('end', function() { - wrapper.push(null); - }); - - const wrapper = new Readable({ - objectMode: true, - read: () => { - process.nextTick(function() { - let data = pt.read(); - if (data === null) { - pt.once('readable', function() { - data = pt.read(); - if (data !== null) wrapper.push(data); - }); - } else { - wrapper.push(data); - } - }); - } - }); - - wrapper.resume(); - wrapper.on('end', common.mustCall()); -} diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js deleted file mode 100644 index 505327e247da38..00000000000000 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict'; - -const { Readable } = require('stream'); -const common = require('../common'); - -let ticks = 18; -let expectedData = 19; - -const rs = new Readable({ - objectMode: true, - read: () => { - if (ticks-- > 0) - return process.nextTick(() => rs.push({})); - rs.push({}); - rs.push(null); - } -}); - -rs.on('end', common.mustCall()); -readAndPause(); - -function readAndPause() { - // Does a on(data) -> pause -> wait -> resume -> on(data) ... loop. - // Expects on(data) to never fire if the stream is paused. - const ondata = common.mustCall((data) => { - rs.pause(); - - expectedData--; - if (expectedData <= 0) - return; - - setImmediate(function() { - rs.removeListener('data', ondata); - readAndPause(); - rs.resume(); - }); - }, 1); // only call ondata once - - rs.on('data', ondata); -} From 1779c7d68d167ee5eaecca48059d7852d06c9ff0 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Sat, 3 Feb 2018 10:29:43 +0100 Subject: [PATCH 2/2] test: keep test/parallel/test-stream-pipe-flow Refs: https://github.com/nodejs/node/pull/18515 Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- test/parallel/test-stream-pipe-flow.js | 67 ++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 test/parallel/test-stream-pipe-flow.js diff --git a/test/parallel/test-stream-pipe-flow.js b/test/parallel/test-stream-pipe-flow.js new file mode 100644 index 00000000000000..1f8564182a3107 --- /dev/null +++ b/test/parallel/test-stream-pipe-flow.js @@ -0,0 +1,67 @@ +'use strict'; +const common = require('../common'); +const { Readable, Writable, PassThrough } = require('stream'); + +{ + let ticks = 17; + + const rs = new Readable({ + objectMode: true, + read: () => { + if (ticks-- > 0) + return process.nextTick(() => rs.push({})); + rs.push({}); + rs.push(null); + } + }); + + const ws = new Writable({ + highWaterMark: 0, + objectMode: true, + write: (data, end, cb) => setImmediate(cb) + }); + + rs.on('end', common.mustCall()); + ws.on('finish', common.mustCall()); + rs.pipe(ws); +} + +{ + let missing = 8; + + const rs = new Readable({ + objectMode: true, + read: () => { + if (missing--) rs.push({}); + else rs.push(null); + } + }); + + const pt = rs + .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })) + .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })); + + pt.on('end', function() { + wrapper.push(null); + }); + + const wrapper = new Readable({ + objectMode: true, + read: () => { + process.nextTick(function() { + let data = pt.read(); + if (data === null) { + pt.once('readable', function() { + data = pt.read(); + if (data !== null) wrapper.push(data); + }); + } else { + wrapper.push(data); + } + }); + } + }); + + wrapper.resume(); + wrapper.on('end', common.mustCall()); +}