From 92bb697daa0d8c66a826e618903f3c070409b943 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 29 Sep 2019 17:55:54 +0200 Subject: [PATCH 1/4] stream: make sure readable is called on empty chunk followed by eof An empty chunk should be treated like any other chunk and trigger the read logic. Fixes: https://github.com/nodejs/node/issues/29758 --- lib/_stream_readable.js | 50 +++++++++---------- .../test-stream-duplex-empty-chunk.js | 18 +++++++ 2 files changed, 43 insertions(+), 25 deletions(-) create mode 100644 test/parallel/test-stream-duplex-empty-chunk.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index eb7d5011d49de2..86bf26ee127a9c 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -271,12 +271,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { er = chunkInvalid(state, chunk); if (er) { errorOrDestroy(stream, er); - } else if (state.objectMode || chunk && chunk.length > 0) { + } else { if (typeof chunk !== 'string' && !state.objectMode && // Do not use Object.getPrototypeOf as it is slower since V8 7.3. !(chunk instanceof Buffer)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + chunk = chunk ? Stream._uint8ArrayToBuffer(chunk) : Buffer.alloc(0); } if (addToFront) { @@ -292,17 +292,9 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { state.reading = false; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); - if (state.objectMode || chunk.length !== 0) - addChunk(stream, state, chunk, false); - else - maybeReadMore(stream, state); - } else { - addChunk(stream, state, chunk, false); } + addChunk(stream, state, chunk, false); } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); } } @@ -314,22 +306,27 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } function addChunk(stream, state, chunk, addToFront) { + const len = state.objectMode ? 1 : chunk.length; if (state.flowing && state.length === 0 && !state.sync) { - // Use the guard to avoid creating `Set()` repeatedly - // when we have multiple pipes. - if (state.multiAwaitDrain) { - state.awaitDrainWriters.clear(); - } else { - state.awaitDrainWriters = null; + if (len) { + // Use the guard to avoid creating `Set()` repeatedly + // when we have multiple pipes. + if (state.multiAwaitDrain) { + state.awaitDrainWriters.clear(); + } else { + state.awaitDrainWriters = null; + } + stream.emit('data', chunk); } - stream.emit('data', chunk); } else { - // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) - state.buffer.unshift(chunk); - else - state.buffer.push(chunk); + if (len) { + // Update the buffer info. + state.length += len; + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); + } if (state.needReadable) emitReadable(stream); @@ -593,9 +590,12 @@ function emitReadable_(stream) { debug('emitReadable_', state.destroyed, state.length, state.ended); if (!state.destroyed && (state.length || state.ended)) { stream.emit('readable'); - state.emittedReadable = false; } + // Reset emittedReadable once it is safe to schedule another + // emitReadable. + state.emittedReadable = false; + // The stream needs another readable event if // 1. It is not flowing, as the flow mechanism will take // care of it. diff --git a/test/parallel/test-stream-duplex-empty-chunk.js b/test/parallel/test-stream-duplex-empty-chunk.js new file mode 100644 index 00000000000000..13ca0efe1a91b9 --- /dev/null +++ b/test/parallel/test-stream-duplex-empty-chunk.js @@ -0,0 +1,18 @@ +'use strict'; + +const common = require('../common'); +const makeDuplexPair = require('../common/duplexpair'); + +const { clientSide, serverSide } = makeDuplexPair(); + +serverSide.resume(); +serverSide.on('end', function(buf) { + serverSide.write('out'); + serverSide.write(''); + serverSide.end(); +}); + +clientSide.end(); + +clientSide.on('data', common.mustCall()); +clientSide.on('end', common.mustCall()); From 2c720a1701ec82937116b165d6d5b40d48f88929 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 29 Sep 2019 23:30:00 +0200 Subject: [PATCH 2/4] fixup: less significant changes --- lib/_stream_readable.js | 47 +++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 86bf26ee127a9c..afb25e6e84cd18 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -271,12 +271,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { er = chunkInvalid(state, chunk); if (er) { errorOrDestroy(stream, er); - } else { + } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !state.objectMode && // Do not use Object.getPrototypeOf as it is slower since V8 7.3. !(chunk instanceof Buffer)) { - chunk = chunk ? Stream._uint8ArrayToBuffer(chunk) : Buffer.alloc(0); + chunk = Stream._uint8ArrayToBuffer(chunk); } if (addToFront) { @@ -292,9 +292,19 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { state.reading = false; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); + if (state.objectMode || chunk.length !== 0) + addChunk(stream, state, chunk, false); + else + maybeReadMore(stream, state); + } else { + addChunk(stream, state, chunk, false); } - addChunk(stream, state, chunk, false); } + } else if (!addToFront) { + state.reading = false; + if (state.needReadable) + emitReadable(stream); + maybeReadMore(stream, state); } } @@ -306,27 +316,22 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } function addChunk(stream, state, chunk, addToFront) { - const len = state.objectMode ? 1 : chunk.length; if (state.flowing && state.length === 0 && !state.sync) { - if (len) { - // Use the guard to avoid creating `Set()` repeatedly - // when we have multiple pipes. - if (state.multiAwaitDrain) { - state.awaitDrainWriters.clear(); - } else { - state.awaitDrainWriters = null; - } - stream.emit('data', chunk); + // Use the guard to avoid creating `Set()` repeatedly + // when we have multiple pipes. + if (state.multiAwaitDrain) { + state.awaitDrainWriters.clear(); + } else { + state.awaitDrainWriters = null; } + stream.emit('data', chunk); } else { - if (len) { - // Update the buffer info. - state.length += len; - if (addToFront) - state.buffer.unshift(chunk); - else - state.buffer.push(chunk); - } + // Update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); if (state.needReadable) emitReadable(stream); From 2f13cdd58f9bc667dfc8a5a7ab288c096073ca34 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 30 Sep 2019 00:47:26 +0200 Subject: [PATCH 3/4] fixup: test --- test/parallel/test-stream2-compatibility.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream2-compatibility.js b/test/parallel/test-stream2-compatibility.js index bd0314ec1a9918..43cabbc7675d15 100644 --- a/test/parallel/test-stream2-compatibility.js +++ b/test/parallel/test-stream2-compatibility.js @@ -44,7 +44,7 @@ class TestReader extends R { } const reader = new TestReader(); -setImmediate(function() { +process.nextTick(function() { assert.strictEqual(ondataCalled, 1); console.log('ok'); reader.push(null); From 9253f5ee73ff59e43538f1775c0e592621911d20 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 30 Sep 2019 00:54:48 +0200 Subject: [PATCH 4/4] fixup: test --- test/parallel/test-stream-duplex-empty-chunk.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/parallel/test-stream-duplex-empty-chunk.js b/test/parallel/test-stream-duplex-empty-chunk.js index 13ca0efe1a91b9..2977b5bff8b0cc 100644 --- a/test/parallel/test-stream-duplex-empty-chunk.js +++ b/test/parallel/test-stream-duplex-empty-chunk.js @@ -9,6 +9,7 @@ serverSide.resume(); serverSide.on('end', function(buf) { serverSide.write('out'); serverSide.write(''); + serverSide.write(''); serverSide.end(); });