diff --git a/benchmark/streams/writable-simple.js b/benchmark/streams/writable-simple.js new file mode 100644 index 00000000000000..b6b4642f945492 --- /dev/null +++ b/benchmark/streams/writable-simple.js @@ -0,0 +1,35 @@ +'use strict'; + +const common = require('../common'); +const Writable = require('stream').Writable; + +const bench = common.createBenchmark(main, { + n: [50000], + inputType: ['buffer', 'string'], + inputEncoding: ['utf8', 'ucs2'], + decodeAs: ['', 'utf8', 'ucs2'] +}); + +const inputStr = `袎逜 釂鱞鸄 碄碆碃 蒰裧頖 鋑, 瞂 覮轀 蔝蓶蓨 踥踕踛 鬐鶤 鄜 忁曨曣 +翀胲胵, 鬵鵛嚪 釢髟偛 碞碠粻 漀 涾烰 跬 窱縓 墥墡嬇 禒箈箑, 餤駰 瀁瀎瀊 躆轖轕 蒛, 銙 簎艜薤 +樆樦潏 魡鳱 櫱瀯灂 鷜鷙鷵 禒箈箑 綧 駓駗, 鋡 嗛嗕塨 嶭嶴憝 爂犤繵 罫蓱 摮 灉礭蘠 蠬襱覾 脬舑莕 +躐鑏, 襆贂 漀 刲匊呥 肒芅邥 泏狔狑, 瀗犡礝 浘涀缹 輲輹 綧`; + +function main(conf) { + const n = +conf.n; + const s = new Writable({ + decodeBuffers: !!conf.decodeAs, + defaultEncoding: conf.decodeAs || undefined, + write(chunk, encoding, cb) { cb(); } + }); + + const inputEnc = conf.inputType === 'buffer' ? undefined : conf.inputEncoding; + const input = conf.inputType === 'buffer' ? + Buffer.from(inputStr, conf.inputEncoding) : inputStr; + + bench.start(); + for (var k = 0; k < n; ++k) { + s.write(input, inputEnc); + } + bench.end(n); +} diff --git a/doc/api/stream.md b/doc/api/stream.md index 291cda735aee3d..b6b8d7bcdea13c 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1184,6 +1184,9 @@ constructor and implement the `writable._write()` method. The * `decodeStrings` {Boolean} Whether or not to decode strings into Buffers before passing them to [`stream._write()`][stream-_write]. Defaults to `true` + * `decodeBuffers` {Boolean} Whether or not to decode Buffers into strings + using the default encoding before passing them to + [`stream._write()`][stream-_write]. Defaults to `false`. * `objectMode` {Boolean} Whether or not the [`stream.write(anyObj)`][stream-write] is a valid operation. 
When set, it becomes possible to write JavaScript values other than string or diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index ba56225d974fe9..deb6e0853dd028 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -11,6 +11,7 @@ const util = require('util'); const internalUtil = require('internal/util'); const Stream = require('stream'); const Buffer = require('buffer').Buffer; +var StringDecoder; util.inherits(Writable, Stream); @@ -46,6 +47,8 @@ function WritableState(options, stream) { // drain event flag. this.needDrain = false; // at the start of calling end() + this.flushing = false; + // at the start of calling endWritable() this.ending = false; // when end() has been called, and returned this.ended = false; @@ -55,6 +58,8 @@ function WritableState(options, stream) { // should we decode strings into buffers before passing to _write? // this is here so that some node-core streams can optimize string // handling at a lower level. + // This is confusingly named. For character encodings (like utf8), setting + // decodeStrings to true will *encode* strings as Buffers. var noDecode = options.decodeStrings === false; this.decodeStrings = !noDecode; @@ -63,6 +68,21 @@ function WritableState(options, stream) { // Everything else in the universe uses 'utf8', though. this.defaultEncoding = options.defaultEncoding || 'utf8'; + // A StringDecoder instance that is used for decoding incoming Buffers + // if that is desired by the stream implementer, as indicated by the + // `decodeBuffers` option. + this.stringDecoder = null; + if (options.decodeBuffers) { + // Check whether the caller explicitly requested inconsistent options. 
+ if (options.decodeStrings === true) { + throw new Error('decodeBuffers and decodeStrings cannot both be true'); + } + + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + this.stringDecoder = new StringDecoder(this.defaultEncoding); + } + // not an actual buffer we keep track of, but a measurement // of how much we're waiting to get pushed to some underlying // socket or file. @@ -276,11 +296,27 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { }; function decodeChunk(state, chunk, encoding) { - if (!state.objectMode && - state.decodeStrings !== false && - typeof chunk === 'string') { - chunk = Buffer.from(chunk, encoding); + if (state.objectMode) + return chunk; + + var sd = state.stringDecoder; + if (typeof chunk === 'string') { + if (sd !== null && encoding === sd.encoding && sd.lastNeed === 0) + return chunk; // No re-encoding necessary. + + if (state.decodeStrings !== false || sd !== null) + chunk = Buffer.from(chunk, encoding); } + + if (sd !== null) { + // chunk is always a Buffer now. + if (state.flushing) { + chunk = sd.end(chunk); + } else { + chunk = sd.write(chunk); + } + } + return chunk; } @@ -288,11 +324,15 @@ function decodeChunk(state, chunk, encoding) { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { - if (!isBuf) { + var sd = state.stringDecoder; + if (!isBuf || sd) { chunk = decodeChunk(state, chunk, encoding); if (chunk instanceof Buffer) encoding = 'buffer'; + else if (sd) + encoding = sd.encoding; } + var len = state.objectMode ? 
1 : chunk.length; state.length += len; @@ -459,20 +499,26 @@ Writable.prototype._write = function(chunk, encoding, cb) { Writable.prototype._writev = null; -Writable.prototype.end = function(chunk, encoding, cb) { +const empty = Buffer.alloc(0); + +Writable.prototype.end = function(chunk, enc, cb) { var state = this._writableState; if (typeof chunk === 'function') { cb = chunk; chunk = null; - encoding = null; - } else if (typeof encoding === 'function') { - cb = encoding; - encoding = null; + enc = null; + } else if (typeof enc === 'function') { + cb = enc; + enc = null; } + state.flushing = true; + if (chunk !== null && chunk !== undefined) - this.write(chunk, encoding); + this.write(chunk, enc); + else if (state.stringDecoder && state.stringDecoder.lastNeed > 0) + this.write(empty); // .end() fully uncorks if (state.corked) { diff --git a/test/parallel/test-stream-writable-decode-buffers.js b/test/parallel/test-stream-writable-decode-buffers.js new file mode 100644 index 00000000000000..f785896f4a1741 --- /dev/null +++ b/test/parallel/test-stream-writable-decode-buffers.js @@ -0,0 +1,121 @@ +'use strict'; +require('../common'); +const assert = require('assert'); + +const stream = require('stream'); + +class ChunkStoringWritable extends stream.Writable { + constructor(options) { + super(options); + + this.chunks = []; + } + + _write(data, encoding, callback) { + this.chunks.push({ data, encoding }); + callback(); + } +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('e4bd', 'hex')); + w.write(Buffer.from('a0e5', 'hex')); + w.write(Buffer.from('a5bd', 'hex')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['', '你', '好']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('你', 'utf8')); + w.write(Buffer.from('好', 'utf8')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你', '好']); +} + +{ + const w = new ChunkStoringWritable({ 
decodeBuffers: true }); + w.write(Buffer.from('80', 'hex')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['\ufffd']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('c3', 'hex')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['', '\ufffd']); +} + +{ + const w = new ChunkStoringWritable({ + decodeBuffers: true, + defaultEncoding: 'utf16le' + }); + + w.write(Buffer.from('你好', 'utf16le')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你好']); +} + +{ + const w = new ChunkStoringWritable({ + decodeBuffers: true, + defaultEncoding: 'base64' + }); + + w.write(Buffer.from('你好', 'utf16le')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['YE99', 'WQ==']); +} + +{ + const w = new ChunkStoringWritable({ + decodeBuffers: true, + defaultEncoding: 'utf16le' + }); + + w.write('你好', 'utf16le'); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你好']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + + w.write(Buffer.from([0x44, 0xc3])); // Ends on incomplete UTF8. + + // This write should *not* be passed through directly as there's + // input pending. + w.write('a'); + + w.end(Buffer.from('bc7373656c', 'hex')); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['D', '�a', '�ssel']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + + w.write('a'); + w.setDefaultEncoding('ucs2'); + w.end('换'); + + assert.deepStrictEqual(w.chunks, [ + { data: 'a', encoding: 'utf8' }, + { data: 'bc', encoding: 'utf8' } + ]); +} + +assert.throws(() => new stream.Writable({ + decodeBuffers: true, + decodeStrings: true +}), /decodeBuffers and decodeStrings cannot both be true/);