From cb189a3010f5eeae8a23b31326a0b84671e9cfac Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 15 Dec 2019 12:19:16 +0100 Subject: [PATCH] stream: refactor Writable buffering --- benchmark/streams/writable-manywrites.js | 11 +- lib/_stream_writable.js | 162 +++++++---------------- 2 files changed, 56 insertions(+), 117 deletions(-) diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index 0ed38d0357a438..aee7623e01a029 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -20,8 +20,13 @@ function main({ n, sync }) { }; bench.start(); - for (var k = 0; k < n; ++k) { - s.write(b); + + let k = 0; + function run () { + while (k++ < n && s.write(b)); + if (k >= n) + bench.end(n); } - bench.end(n); + s.on('drain', run); + run(); } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index d853441b3cc844..c36adb9b74fd0e 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -148,8 +148,8 @@ function WritableState(options, stream, isDuplex) { // synchronous _write() completion. this.afterWriteTickInfo = null; - this.bufferedRequest = null; - this.lastBufferedRequest = null; + this.buffered = []; + this.buffered.allBuffers = true; // Number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -172,25 +172,10 @@ function WritableState(options, stream, isDuplex) { // should return false. This is needed since when autoDestroy // is disabled we need a way to tell whether the stream has failed. this.errored = false; - - // Count buffered requests - this.bufferedRequestCount = 0; - - // Allocate the first CorkedRequest, there is always - // one allocated and free to use, and we maintain at most two - const corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this); - this.corkedRequestsFree = corkReq; } WritableState.prototype.getBuffer = function getBuffer() { - var current = this.bufferedRequest; - const out = []; - while (current) { - out.push(current); - current = current.next; - } - return out; + return this.buffered; }; ObjectDefineProperty(WritableState.prototype, 'buffer', { @@ -200,6 +185,12 @@ ObjectDefineProperty(WritableState.prototype, 'buffer', { 'instead.', 'DEP0003') }); +Object.defineProperty(WritableState.prototype, 'bufferedRequestCount', { + get() { + return this.buffered.length; + } +}); + // Test _writableState for inheritance to account for Duplex streams, // whose prototype chain only points to Readable. var realHasInstance; @@ -338,11 +329,9 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - if (!state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) + if (!state.writing) { clearBuffer(this, state); + } } }; @@ -427,20 +416,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { state.needDrain = true; if (state.writing || state.corked || state.errored) { - var last = state.lastBufferedRequest; - state.lastBufferedRequest = { - chunk, - encoding, - isBuf, - callback: cb, - next: null - }; - if (last) { - last.next = state.lastBufferedRequest; - } else { - state.bufferedRequest = state.lastBufferedRequest; - } - state.bufferedRequestCount += 1; + const buffered = state.buffered; + buffered.push({ chunk, encoding, callback: cb }); + buffered.allBuffers = isBuf && buffered.allBuffers; } else { doWrite(stream, state, false, len, chunk, encoding, cb); } @@ -496,10 +474,7 @@ function onwrite(stream, er) { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(state) || stream.destroyed; - if (!finished && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) { + if (!finished) { clearBuffer(stream, state); } @@ -544,67 +519,42 @@ function afterWrite(stream, state, count, cb) { // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { - state.bufferProcessing = true; - var entry = state.bufferedRequest; - - if (stream._writev && entry && entry.next) { - // Fast case, write everything using _writev() - var l = state.bufferedRequestCount; - var buffer = new Array(l); - var holder = state.corkedRequestsFree; - holder.entry = entry; - - var count = 0; - var allBuffers = true; - while (entry) { - buffer[count] = entry; - if (!entry.isBuf) - allBuffers = false; - entry = entry.next; - count += 1; - } - buffer.allBuffers = allBuffers; + const buffered = state.buffered; + const bufferedLength = buffered.length; - doWrite(stream, state, true, state.length, buffer, '', holder.finish); + if (!bufferedLength || state.corked || state.bufferProcessing) { + return; + } - // doWrite is almost always async, defer these to save a bit of time - // as the hot path ends with doWrite + state.bufferProcessing = true; + if (bufferedLength > 1 && stream._writev) { state.pendingcb++; - state.lastBufferedRequest = null; - if (holder.next) { - state.corkedRequestsFree = holder.next; - holder.next = null; - } else { - var corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state); - state.corkedRequestsFree = corkReq; + // doWrite mutates buffered array. Keep a copy of callbacks. + const callbacks = new Array(buffered.length); + for (let n = 0; n < buffered.length; ++n) { + callbacks[n] = buffered[n].callback; } - state.bufferedRequestCount = 0; - } else { - // Slow case, write chunks one-by-one - while (entry) { - var chunk = entry.chunk; - var encoding = entry.encoding; - var cb = entry.callback; - var len = state.objectMode ? 1 : chunk.length; - - doWrite(stream, state, false, len, chunk, encoding, cb); - entry = entry.next; - state.bufferedRequestCount--; - // If we didn't call the onwrite immediately, then - // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. - if (state.writing) { - break; + + doWrite(stream, state, true, state.length, buffered, '', (err) => { + for (const callback of callbacks) { + state.pendingcb--; + callback(err); } + }); + state.buffered = []; + state.buffered.allBuffers = true; + } else { + let i; + for (i = 0; i < bufferedLength && !state.writing; i++) { + const chunk = buffered[i].chunk; + const encoding = buffered[i].encoding; + const callback = buffered[i].callback; + + const len = state.objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); } - - if (entry === null) - state.lastBufferedRequest = null; + buffered.splice(0, i); } - - state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -665,9 +615,9 @@ ObjectDefineProperty(Writable.prototype, 'writableLength', { function needFinish(state) { return (state.ending && - state.length === 0 && + !state.length && + !state.buffered.length && !state.errored && - state.bufferedRequest === null && !state.finished && !state.writing); } @@ -742,20 +692,6 @@ function endWritable(stream, state, cb) { stream.writable = false; } -function onCorkedFinish(corkReq, state, err) { - var entry = corkReq.entry; - corkReq.entry = null; - while (entry) { - var cb = entry.callback; - state.pendingcb--; - cb(err); - entry = entry.next; - } - - // Reuse the free corkReq. - state.corkedRequestsFree.next = corkReq; -} - function onFinished(stream, state, cb) { if (state.destroyed && state.errorEmitted) { // TODO(ronag): Backwards compat. Should be moved to end() without @@ -828,12 +764,10 @@ const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { - for (let entry = state.bufferedRequest; entry; entry = entry.next) { + for (const entry of state.buffered) { process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write')); } - state.bufferedRequest = null; - state.lastBufferedRequest = null; - state.bufferedRequestCount = 0; + state.buffered = []; } destroy.call(this, err, cb); return this;