Skip to content

Commit cb189a3

Browse files
committed
stream: refactor Writable buffering
1 parent e13a37e commit cb189a3

File tree

2 files changed

+56
-117
lines changed

2 files changed

+56
-117
lines changed

benchmark/streams/writable-manywrites.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ function main({ n, sync }) {
2020
};
2121

2222
bench.start();
23-
for (var k = 0; k < n; ++k) {
24-
s.write(b);
23+
24+
let k = 0;
25+
function run () {
26+
while (k++ < n && s.write(b));
27+
if (k >= n)
28+
bench.end(n);
2529
}
26-
bench.end(n);
30+
s.on('drain', run);
31+
run();
2732
}

lib/_stream_writable.js

Lines changed: 48 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ function WritableState(options, stream, isDuplex) {
148148
// synchronous _write() completion.
149149
this.afterWriteTickInfo = null;
150150

151-
this.bufferedRequest = null;
152-
this.lastBufferedRequest = null;
151+
this.buffered = [];
152+
this.buffered.allBuffers = true;
153153

154154
// Number of pending user-supplied write callbacks
155155
// this must be 0 before 'finish' can be emitted
@@ -172,25 +172,10 @@ function WritableState(options, stream, isDuplex) {
172172
// should return false. This is needed since when autoDestroy
173173
// is disabled we need a way to tell whether the stream has failed.
174174
this.errored = false;
175-
176-
// Count buffered requests
177-
this.bufferedRequestCount = 0;
178-
179-
// Allocate the first CorkedRequest, there is always
180-
// one allocated and free to use, and we maintain at most two
181-
const corkReq = { next: null, entry: null, finish: undefined };
182-
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
183-
this.corkedRequestsFree = corkReq;
184175
}
185176

186177
WritableState.prototype.getBuffer = function getBuffer() {
187-
var current = this.bufferedRequest;
188-
const out = [];
189-
while (current) {
190-
out.push(current);
191-
current = current.next;
192-
}
193-
return out;
178+
return this.buffered;
194179
};
195180

196181
ObjectDefineProperty(WritableState.prototype, 'buffer', {
@@ -200,6 +185,12 @@ ObjectDefineProperty(WritableState.prototype, 'buffer', {
200185
'instead.', 'DEP0003')
201186
});
202187

188+
Object.defineProperty(WritableState.prototype, 'bufferedRequestCount', {
189+
get() {
190+
return this.buffered.length;
191+
}
192+
});
193+
203194
// Test _writableState for inheritance to account for Duplex streams,
204195
// whose prototype chain only points to Readable.
205196
var realHasInstance;
@@ -338,11 +329,9 @@ Writable.prototype.uncork = function() {
338329
if (state.corked) {
339330
state.corked--;
340331

341-
if (!state.writing &&
342-
!state.corked &&
343-
!state.bufferProcessing &&
344-
state.bufferedRequest)
332+
if (!state.writing) {
345333
clearBuffer(this, state);
334+
}
346335
}
347336
};
348337

@@ -427,20 +416,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
427416
state.needDrain = true;
428417

429418
if (state.writing || state.corked || state.errored) {
430-
var last = state.lastBufferedRequest;
431-
state.lastBufferedRequest = {
432-
chunk,
433-
encoding,
434-
isBuf,
435-
callback: cb,
436-
next: null
437-
};
438-
if (last) {
439-
last.next = state.lastBufferedRequest;
440-
} else {
441-
state.bufferedRequest = state.lastBufferedRequest;
442-
}
443-
state.bufferedRequestCount += 1;
419+
const buffered = state.buffered;
420+
buffered.push({ chunk, encoding, callback: cb });
421+
buffered.allBuffers = isBuf && buffered.allBuffers;
444422
} else {
445423
doWrite(stream, state, false, len, chunk, encoding, cb);
446424
}
@@ -496,10 +474,7 @@ function onwrite(stream, er) {
496474
// Check if we're actually ready to finish, but don't emit yet
497475
var finished = needFinish(state) || stream.destroyed;
498476

499-
if (!finished &&
500-
!state.corked &&
501-
!state.bufferProcessing &&
502-
state.bufferedRequest) {
477+
if (!finished) {
503478
clearBuffer(stream, state);
504479
}
505480

@@ -544,67 +519,42 @@ function afterWrite(stream, state, count, cb) {
544519

545520
// If there's something in the buffer waiting, then process it
546521
function clearBuffer(stream, state) {
547-
state.bufferProcessing = true;
548-
var entry = state.bufferedRequest;
549-
550-
if (stream._writev && entry && entry.next) {
551-
// Fast case, write everything using _writev()
552-
var l = state.bufferedRequestCount;
553-
var buffer = new Array(l);
554-
var holder = state.corkedRequestsFree;
555-
holder.entry = entry;
556-
557-
var count = 0;
558-
var allBuffers = true;
559-
while (entry) {
560-
buffer[count] = entry;
561-
if (!entry.isBuf)
562-
allBuffers = false;
563-
entry = entry.next;
564-
count += 1;
565-
}
566-
buffer.allBuffers = allBuffers;
522+
const buffered = state.buffered;
523+
const bufferedLength = buffered.length;
567524

568-
doWrite(stream, state, true, state.length, buffer, '', holder.finish);
525+
if (!bufferedLength || state.corked || state.bufferProcessing) {
526+
return;
527+
}
569528

570-
// doWrite is almost always async, defer these to save a bit of time
571-
// as the hot path ends with doWrite
529+
state.bufferProcessing = true;
530+
if (bufferedLength > 1 && stream._writev) {
572531
state.pendingcb++;
573-
state.lastBufferedRequest = null;
574-
if (holder.next) {
575-
state.corkedRequestsFree = holder.next;
576-
holder.next = null;
577-
} else {
578-
var corkReq = { next: null, entry: null, finish: undefined };
579-
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
580-
state.corkedRequestsFree = corkReq;
532+
// doWrite mutates buffered array. Keep a copy of callbacks.
533+
const callbacks = new Array(buffered.length);
534+
for (let n = 0; n < buffered.length; ++n) {
535+
callbacks[n] = buffered[n].callback;
581536
}
582-
state.bufferedRequestCount = 0;
583-
} else {
584-
// Slow case, write chunks one-by-one
585-
while (entry) {
586-
var chunk = entry.chunk;
587-
var encoding = entry.encoding;
588-
var cb = entry.callback;
589-
var len = state.objectMode ? 1 : chunk.length;
590-
591-
doWrite(stream, state, false, len, chunk, encoding, cb);
592-
entry = entry.next;
593-
state.bufferedRequestCount--;
594-
// If we didn't call the onwrite immediately, then
595-
// it means that we need to wait until it does.
596-
// also, that means that the chunk and cb are currently
597-
// being processed, so move the buffer counter past them.
598-
if (state.writing) {
599-
break;
537+
538+
doWrite(stream, state, true, state.length, buffered, '', (err) => {
539+
for (const callback of callbacks) {
540+
state.pendingcb--;
541+
callback(err);
600542
}
543+
});
544+
state.buffered = [];
545+
state.buffered.allBuffers = true;
546+
} else {
547+
let i;
548+
for (i = 0; i < bufferedLength && !state.writing; i++) {
549+
const chunk = buffered[i].chunk;
550+
const encoding = buffered[i].encoding;
551+
const callback = buffered[i].callback;
552+
553+
const len = state.objectMode ? 1 : chunk.length;
554+
doWrite(stream, state, false, len, chunk, encoding, callback);
601555
}
602-
603-
if (entry === null)
604-
state.lastBufferedRequest = null;
556+
buffered.splice(0, i);
605557
}
606-
607-
state.bufferedRequest = entry;
608558
state.bufferProcessing = false;
609559
}
610560

@@ -665,9 +615,9 @@ ObjectDefineProperty(Writable.prototype, 'writableLength', {
665615

666616
function needFinish(state) {
667617
return (state.ending &&
668-
state.length === 0 &&
618+
!state.length &&
619+
!state.buffered.length &&
669620
!state.errored &&
670-
state.bufferedRequest === null &&
671621
!state.finished &&
672622
!state.writing);
673623
}
@@ -742,20 +692,6 @@ function endWritable(stream, state, cb) {
742692
stream.writable = false;
743693
}
744694

745-
function onCorkedFinish(corkReq, state, err) {
746-
var entry = corkReq.entry;
747-
corkReq.entry = null;
748-
while (entry) {
749-
var cb = entry.callback;
750-
state.pendingcb--;
751-
cb(err);
752-
entry = entry.next;
753-
}
754-
755-
// Reuse the free corkReq.
756-
state.corkedRequestsFree.next = corkReq;
757-
}
758-
759695
function onFinished(stream, state, cb) {
760696
if (state.destroyed && state.errorEmitted) {
761697
// TODO(ronag): Backwards compat. Should be moved to end() without
@@ -828,12 +764,10 @@ const destroy = destroyImpl.destroy;
828764
Writable.prototype.destroy = function(err, cb) {
829765
const state = this._writableState;
830766
if (!state.destroyed) {
831-
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
767+
for (const entry of state.buffered) {
832768
process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write'));
833769
}
834-
state.bufferedRequest = null;
835-
state.lastBufferedRequest = null;
836-
state.bufferedRequestCount = 0;
770+
state.buffered = [];
837771
}
838772
destroy.call(this, err, cb);
839773
return this;

0 commit comments

Comments
 (0)