From f0b38f7bc6ce9fa269fb9ce40b236be4dac53a87 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 6 Aug 2019 21:45:59 +0200 Subject: [PATCH 1/4] stream: refactor writable buffering --- lib/_stream_writable.js | 163 ++++++++++++---------------------------- 1 file changed, 48 insertions(+), 115 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e212881c4ac555..24b5d61bf4a182 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -135,8 +135,8 @@ function WritableState(options, stream, isDuplex) { // The amount that is being written when _write is called. this.writelen = 0; - this.bufferedRequest = null; - this.lastBufferedRequest = null; + this.buffered = []; + this.buffered.allBuffers = true; // Number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -154,25 +154,10 @@ function WritableState(options, stream, isDuplex) { // Should .destroy() be called after 'finish' (and potentially 'end') this.autoDestroy = !!options.autoDestroy; - - // Count buffered requests - this.bufferedRequestCount = 0; - - // Allocate the first CorkedRequest, there is always - // one allocated and free to use, and we maintain at most two - const corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this); - this.corkedRequestsFree = corkReq; } WritableState.prototype.getBuffer = function getBuffer() { - var current = this.bufferedRequest; - const out = []; - while (current) { - out.push(current); - current = current.next; - } - return out; + return this.buffered; }; Object.defineProperty(WritableState.prototype, 'buffer', { @@ -182,6 +167,12 @@ Object.defineProperty(WritableState.prototype, 'buffer', { 'instead.', 'DEP0003') }); +Object.defineProperty(WritableState.prototype, 'bufferedRequestCount', { + get() { + return this.buffered.length; + } +}); + // Test _writableState for inheritance to account for Duplex streams, // whose prototype chain only points to Readable. var realHasInstance; @@ -315,12 +306,7 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - - if (!state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) - clearBuffer(this, state); + clearBuffer(this, state); } }; @@ -376,7 +362,7 @@ Object.defineProperty(Writable.prototype, 'writableHighWaterMark', { // If we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. -function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { +function writeOrBuffer(stream, state, isBuf, chunk, encoding, callback) { if (!isBuf) { var newChunk = decodeChunk(state, chunk, encoding); if (chunk !== newChunk) { @@ -395,22 +381,11 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { state.needDrain = true; if (state.writing || state.corked) { - var last = state.lastBufferedRequest; - state.lastBufferedRequest = { - chunk, - encoding, - isBuf, - callback: cb, - next: null - }; - if (last) { - last.next = state.lastBufferedRequest; - } else { - state.bufferedRequest = state.lastBufferedRequest; - } - state.bufferedRequestCount += 1; + const buffered = state.buffered; + buffered.push({ chunk, encoding, callback }); + buffered.allBuffers = isBuf && buffered.allBuffers; } else { - doWrite(stream, state, false, len, chunk, encoding, cb); + doWrite(stream, state, false, len, chunk, encoding, callback); } return ret; @@ -471,10 +446,7 @@ function onwrite(stream, er) { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(state) || stream.destroyed; - if (!finished && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) { + if (!finished) { clearBuffer(stream, state); } @@ -500,67 +472,42 @@ function afterWrite(stream, state, cb) { // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { - state.bufferProcessing = true; - var entry = state.bufferedRequest; - - if (stream._writev && entry && entry.next) { - // Fast case, write everything using _writev() - var l = state.bufferedRequestCount; - var buffer = new Array(l); - var holder = state.corkedRequestsFree; - holder.entry = entry; - - var count = 0; - var allBuffers = true; - while (entry) { - buffer[count] = entry; - if (!entry.isBuf) - allBuffers = false; - entry = entry.next; - count += 1; - } - buffer.allBuffers = allBuffers; + const buffered = state.buffered; + const bufferedLength = buffered.length; - doWrite(stream, state, true, state.length, buffer, '', holder.finish); + if (!bufferedLength || state.corked || state.bufferProcessing || + state.writing) { + return; + } - // doWrite is almost always async, defer these to save a bit of time - // as the hot path ends with doWrite + state.bufferProcessing = true; + if (bufferedLength === 1) { + const { chunk, encoding, callback } = buffered[0]; + const len = state.objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + buffered.length = 0; + buffered.allBuffers = true; + } else if (stream._writev) { state.pendingcb++; - state.lastBufferedRequest = null; - if (holder.next) { - state.corkedRequestsFree = holder.next; - holder.next = null; - } else { - var corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state); - state.corkedRequestsFree = corkReq; - } - state.bufferedRequestCount = 0; - } else { - // Slow case, write chunks one-by-one - while (entry) { - var chunk = entry.chunk; - var encoding = entry.encoding; - var cb = entry.callback; - var len = state.objectMode ? 1 : chunk.length; - - doWrite(stream, state, false, len, chunk, encoding, cb); - entry = entry.next; - state.bufferedRequestCount--; - // If we didn't call the onwrite immediately, then - // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. - if (state.writing) { - break; + // doWrite mutates buffered array. Keep a copy of callbacks. + const callbacks = buffered.map(({ callback }) => callback); + doWrite(stream, state, true, state.length, buffered, '', (err) => { + for (const callback of callbacks) { + state.pendingcb--; + callback(err); } + }); + state.buffered = []; + state.buffered.allBuffers = true; + } else { + let i = 0; + while (i < bufferedLength && !state.writing) { + const { chunk, encoding, callback } = buffered[i++]; + const len = state.objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); } - - if (entry === null) - state.lastBufferedRequest = null; + buffered.splice(0, i); } - - state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -617,8 +564,8 @@ Object.defineProperty(Writable.prototype, 'writableLength', { function needFinish(state) { return (state.ending && - state.length === 0 && - state.bufferedRequest === null && + !state.length && + !state.buffered.length && !state.finished && !state.writing); } @@ -681,20 +628,6 @@ function endWritable(stream, state, cb) { stream.writable = false; } -function onCorkedFinish(corkReq, state, err) { - var entry = corkReq.entry; - corkReq.entry = null; - while (entry) { - var cb = entry.callback; - state.pendingcb--; - cb(err); - entry = entry.next; - } - - // Reuse the free corkReq. - state.corkedRequestsFree.next = corkReq; -} - Object.defineProperty(Writable.prototype, 'destroyed', { // Making it explicit this property is not enumerable // because otherwise some prototype manipulation in From c09f94cc75472ef16e0fb991a5e5bf9c63caf5c0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Aug 2019 22:09:52 +0200 Subject: [PATCH 2/4] fixup --- lib/_stream_writable.js | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 24b5d61bf4a182..8ca97d3d6729f4 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -482,7 +482,10 @@ function clearBuffer(stream, state) { state.bufferProcessing = true; if (bufferedLength === 1) { - const { chunk, encoding, callback } = buffered[0]; + const chunk = buffered[0].chunk; + const encoding = buffered[0].encoding; + const callback = buffered[0].callback; + const len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, callback); buffered.length = 0; @@ -490,7 +493,10 @@ function clearBuffer(stream, state) { } else if (stream._writev) { state.pendingcb++; // doWrite mutates buffered array. Keep a copy of callbacks. - const callbacks = buffered.map(({ callback }) => callback); + const callbacks = []; + for (let n = 0; n < buffered.length; ++n) { + callbacks.push(buffered[n].callback); + } doWrite(stream, state, true, state.length, buffered, '', (err) => { for (const callback of callbacks) { state.pendingcb--; @@ -501,8 +507,11 @@ function clearBuffer(stream, state) { state.buffered.allBuffers = true; } else { let i = 0; - while (i < bufferedLength && !state.writing) { - const { chunk, encoding, callback } = buffered[i++]; + for (; i < bufferedLength && !state.writing; i++) { + const chunk = buffered[i].chunk; + const encoding = buffered[i].encoding; + const callback = buffered[i].callback; + const len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, callback); } From d6da4e00fda88ae1d22e04ddf9ea2fbecbf95bf6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Aug 2019 22:11:51 +0200 Subject: [PATCH 3/4] fixup --- lib/_stream_writable.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 8ca97d3d6729f4..7391826f8f1537 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -506,8 +506,8 @@ function clearBuffer(stream, state) { state.buffered = []; state.buffered.allBuffers = true; } else { - let i = 0; - for (; i < bufferedLength && !state.writing; i++) { + let i; + for (i = 0; i < bufferedLength && !state.writing; i++) { const chunk = buffered[i].chunk; const encoding = buffered[i].encoding; const callback = buffered[i].callback; From dcc9bf2d0e69eb6709d93a8299bff2077f20dc12 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 26 Aug 2019 06:05:03 +0200 Subject: [PATCH 4/4] Apply suggestions from code review Co-Authored-By: mscdex --- lib/_stream_writable.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 7391826f8f1537..268992174f8478 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -493,9 +493,9 @@ function clearBuffer(stream, state) { } else if (stream._writev) { state.pendingcb++; // doWrite mutates buffered array. Keep a copy of callbacks. - const callbacks = []; + const callbacks = new Array(buffered.length); for (let n = 0; n < buffered.length; ++n) { - callbacks.push(buffered[n].callback); + callbacks[n] = buffered[n].callback; } doWrite(stream, state, true, state.length, buffered, '', (err) => { for (const callback of callbacks) {