Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions benchmark/streams/writable-manywrites.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ function main({ n, sync }) {
};

bench.start();
for (var k = 0; k < n; ++k) {
s.write(b);

let k = 0;
function run () {
while (k++ < n && s.write(b));
if (k >= n)
bench.end(n);
}
bench.end(n);
s.on('drain', run);
run();
}
162 changes: 48 additions & 114 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ function WritableState(options, stream, isDuplex) {
// synchronous _write() completion.
this.afterWriteTickInfo = null;

this.bufferedRequest = null;
this.lastBufferedRequest = null;
this.buffered = [];
this.buffered.allBuffers = true;

// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
Expand All @@ -172,25 +172,10 @@ function WritableState(options, stream, isDuplex) {
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

// Count buffered requests
this.bufferedRequestCount = 0;

// Allocate the first CorkedRequest, there is always
// one allocated and free to use, and we maintain at most two
const corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
this.corkedRequestsFree = corkReq;
}

WritableState.prototype.getBuffer = function getBuffer() {
var current = this.bufferedRequest;
const out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
return this.buffered;
};

ObjectDefineProperty(WritableState.prototype, 'buffer', {
Expand All @@ -200,6 +185,12 @@ ObjectDefineProperty(WritableState.prototype, 'buffer', {
'instead.', 'DEP0003')
});

Object.defineProperty(WritableState.prototype, 'bufferedRequestCount', {
get() {
return this.buffered.length;
}
});

// Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
var realHasInstance;
Expand Down Expand Up @@ -338,11 +329,9 @@ Writable.prototype.uncork = function() {
if (state.corked) {
state.corked--;

if (!state.writing &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest)
if (!state.writing) {
clearBuffer(this, state);
}
}
};

Expand Down Expand Up @@ -427,20 +416,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
state.needDrain = true;

if (state.writing || state.corked || state.errored) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
encoding,
isBuf,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
const buffered = state.buffered;
buffered.push({ chunk, encoding, callback: cb });
buffered.allBuffers = isBuf && buffered.allBuffers;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
Expand Down Expand Up @@ -496,10 +474,7 @@ function onwrite(stream, er) {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
if (!finished) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -544,67 +519,42 @@ function afterWrite(stream, state, count, cb) {

// If there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;

if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var l = state.bufferedRequestCount;
var buffer = new Array(l);
var holder = state.corkedRequestsFree;
holder.entry = entry;

var count = 0;
var allBuffers = true;
while (entry) {
buffer[count] = entry;
if (!entry.isBuf)
allBuffers = false;
entry = entry.next;
count += 1;
}
buffer.allBuffers = allBuffers;
const buffered = state.buffered;
const bufferedLength = buffered.length;

doWrite(stream, state, true, state.length, buffer, '', holder.finish);
if (!bufferedLength || state.corked || state.bufferProcessing) {
return;
}

// doWrite is almost always async, defer these to save a bit of time
// as the hot path ends with doWrite
state.bufferProcessing = true;
if (bufferedLength > 1 && stream._writev) {
state.pendingcb++;
state.lastBufferedRequest = null;
if (holder.next) {
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {
var corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
state.corkedRequestsFree = corkReq;
// doWrite mutates buffered array. Keep a copy of callbacks.
const callbacks = new Array(buffered.length);
for (let n = 0; n < buffered.length; ++n) {
callbacks[n] = buffered[n].callback;
}
state.bufferedRequestCount = 0;
} else {
// Slow case, write chunks one-by-one
while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;

doWrite(stream, state, false, len, chunk, encoding, cb);
entry = entry.next;
state.bufferedRequestCount--;
// If we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
break;

doWrite(stream, state, true, state.length, buffered, '', (err) => {
for (const callback of callbacks) {
state.pendingcb--;
callback(err);
}
});
state.buffered = [];
state.buffered.allBuffers = true;
} else {
let i;
for (i = 0; i < bufferedLength && !state.writing; i++) {
const chunk = buffered[i].chunk;
const encoding = buffered[i].encoding;
const callback = buffered[i].callback;

const len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, callback);
}

if (entry === null)
state.lastBufferedRequest = null;
buffered.splice(0, i);
}

state.bufferedRequest = entry;
state.bufferProcessing = false;
}

Expand Down Expand Up @@ -665,9 +615,9 @@ ObjectDefineProperty(Writable.prototype, 'writableLength', {

function needFinish(state) {
return (state.ending &&
state.length === 0 &&
!state.length &&
!state.buffered.length &&
!state.errored &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
}
Expand Down Expand Up @@ -742,20 +692,6 @@ function endWritable(stream, state, cb) {
stream.writable = false;
}

function onCorkedFinish(corkReq, state, err) {
var entry = corkReq.entry;
corkReq.entry = null;
while (entry) {
var cb = entry.callback;
state.pendingcb--;
cb(err);
entry = entry.next;
}

// Reuse the free corkReq.
state.corkedRequestsFree.next = corkReq;
}

function onFinished(stream, state, cb) {
if (state.destroyed && state.errorEmitted) {
// TODO(ronag): Backwards compat. Should be moved to end() without
Expand Down Expand Up @@ -828,12 +764,10 @@ const destroy = destroyImpl.destroy;
Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;
if (!state.destroyed) {
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
for (const entry of state.buffered) {
process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write'));
}
state.bufferedRequest = null;
state.lastBufferedRequest = null;
state.bufferedRequestCount = 0;
state.buffered = [];
}
destroy.call(this, err, cb);
return this;
Expand Down