diff --git a/benchmark/streams/pipe-object-mode.js b/benchmark/streams/pipe-object-mode.js new file mode 100644 index 00000000000000..d52b2238780ccb --- /dev/null +++ b/benchmark/streams/pipe-object-mode.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [5e6] +}); + +function main({ n }) { + const b = {}; + const r = new Readable({ objectMode: true }); + const w = new Writable({ objectMode: true }); + + var i = 0; + + r._read = () => r.push(i++ === n ? null : b); + w._write = (data, enc, cb) => cb(); + + bench.start(); + + r.pipe(w); + w.on('finish', () => bench.end(n)); +} diff --git a/benchmark/streams/pipe.js b/benchmark/streams/pipe.js new file mode 100644 index 00000000000000..a7d67b7d6908c8 --- /dev/null +++ b/benchmark/streams/pipe.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [5e6] +}); + +function main({ n }) { + const b = new Buffer(1024); + const r = new Readable(); + const w = new Writable(); + + var i = 0; + + r._read = () => r.push(i++ === n ? null : b); + w._write = (data, enc, cb) => cb(); + + bench.start(); + + r.pipe(w); + w.on('finish', () => bench.end(n)); +} diff --git a/lib/internal/process/next_tick.js b/lib/internal/process/next_tick.js index e7fad84397c427..84a7402117c5c2 100644 --- a/lib/internal/process/next_tick.js +++ b/lib/internal/process/next_tick.js @@ -32,32 +32,55 @@ function setupNextTick() { const kHasScheduled = 0; const kHasPromiseRejections = 1; - const nextTickQueue = { - head: null, - tail: null, + // Queue size for each tick array. Must be a factor of two. + const kQueueSize = 2048; + const kQueueMask = kQueueSize - 1; + + class FixedQueue { + constructor() { + this.bottom = 0; + this.top = 0; + this.list = new Array(kQueueSize); + this.next = null; + } + push(data) { - const entry = { data, next: null }; - if (this.tail !== null) { - this.tail.next = entry; - } else { - this.head = entry; - tickInfo[kHasScheduled] = 1; - } - this.tail = entry; - }, + this.list[this.top] = data; + this.top = (this.top + 1) & kQueueMask; + } + shift() { - if (this.head === null) - return; - const ret = this.head.data; - if (this.head === this.tail) { - this.head = this.tail = null; + const next = this.list[this.bottom]; + if (next === undefined) return null; + this.list[this.bottom] = undefined; + this.bottom = (this.bottom + 1) & kQueueMask; + return next; + } + } + + var head = new FixedQueue(); + var tail = head; + + function push(data) { + if (head.bottom === head.top) { + if (head.list[head.top] !== undefined) + head = head.next = new FixedQueue(); + else + tickInfo[kHasScheduled] = 1; + } + head.push(data); + } + + function shift() { + const next = tail.shift(); + if (tail.top === tail.bottom) { + if (tail.next) + tail = tail.next; + else tickInfo[kHasScheduled] = 0; - } else { - this.head = this.head.next; - } - return ret; } - }; + return next; + } process.nextTick = nextTick; // Needs to be accessible from beyond this scope. @@ -69,7 +92,7 @@ function setupNextTick() { function _tickCallback() { let tock; do { - while (tock = nextTickQueue.shift()) { + while (tock = shift()) { const asyncId = tock[async_id_symbol]; emitBefore(asyncId, tock[trigger_async_id_symbol]); // emitDestroy() places the async_id_symbol into an asynchronous queue @@ -93,7 +116,7 @@ function setupNextTick() { emitAfter(asyncId); } runMicrotasks(); - } while (nextTickQueue.head !== null || emitPromiseRejectionWarnings()); + } while (head.top !== head.bottom || emitPromiseRejectionWarnings()); tickInfo[kHasPromiseRejections] = 0; } @@ -139,8 +162,7 @@ function setupNextTick() { args[i - 1] = arguments[i]; } - nextTickQueue.push(new TickObject(callback, args, - getDefaultTriggerAsyncId())); + push(new TickObject(callback, args, getDefaultTriggerAsyncId())); } // `internalNextTick()` will not enqueue any callback when the process is @@ -168,6 +190,6 @@ function setupNextTick() { if (triggerAsyncId === null) triggerAsyncId = getDefaultTriggerAsyncId(); - nextTickQueue.push(new TickObject(callback, args, triggerAsyncId)); + push(new TickObject(callback, args, triggerAsyncId)); } }