diff --git a/doc/api/deprecations.md b/doc/api/deprecations.md
index 8461caabfc56ee..ddfe9c4adc7250 100644
--- a/doc/api/deprecations.md
+++ b/doc/api/deprecations.md
@@ -590,6 +590,22 @@ Type: Runtime
`node debug` corresponds to the legacy CLI debugger which has been replaced with
a V8-inspector based CLI debugger available through `node inspect`.
+
+### DEP00XX: Usage of _stream_*
+
+Type: Runtime
+
+Using `require('_stream_readable')`,
+`require('_stream_writable')`, `require('_stream_duplex')`
+`require('_stream_transform')`, `require('_stream_passthrough')`
+is now deprecated. The same content are available through:
+`require('stream').Readable`, `require('stream').Writable`,
+`require('stream').Duplex`, `require('stream').Transform`,
+`require('stream').PassThrough`.
+
+*Note*: The usage of `_stream` prefixed modules from core was never
+documented.
+
[alloc]: buffer.html#buffer_class_method_buffer_alloc_size_fill_encoding
[alloc_unsafe_size]: buffer.html#buffer_class_method_buffer_allocunsafe_size
[`Buffer.allocUnsafeSlow(size)`]: buffer.html#buffer_class_method_buffer_allocunsafeslow_size
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 4422b62aac3250..8487eda2076e1d 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -1,78 +1,6 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-// a duplex stream is just a stream that is both readable and writable.
-// Since JS doesn't have multiple prototypal inheritance, this class
-// prototypally inherits from Readable, and then parasitically from
-// Writable.
-
'use strict';
-module.exports = Duplex;
-
-const util = require('util');
-const Readable = require('_stream_readable');
-const Writable = require('_stream_writable');
-
-util.inherits(Duplex, Readable);
-
-var keys = Object.keys(Writable.prototype);
-for (var v = 0; v < keys.length; v++) {
- var method = keys[v];
- if (!Duplex.prototype[method])
- Duplex.prototype[method] = Writable.prototype[method];
-}
-
-function Duplex(options) {
- if (!(this instanceof Duplex))
- return new Duplex(options);
-
- Readable.call(this, options);
- Writable.call(this, options);
-
- if (options && options.readable === false)
- this.readable = false;
-
- if (options && options.writable === false)
- this.writable = false;
-
- this.allowHalfOpen = true;
- if (options && options.allowHalfOpen === false)
- this.allowHalfOpen = false;
-
- this.once('end', onend);
-}
-
-// the no-half-open enforcer
-function onend() {
- // if we allow half-open state, or if the writable side ended,
- // then we're ok.
- if (this.allowHalfOpen || this._writableState.ended)
- return;
-
- // no more data can be written.
- // But allow more writes to happen in this tick.
- process.nextTick(onEndNT, this);
-}
+process.emitWarning(`The ${__filename} module is deprecated. Please use stream`,
+ 'DeprecationWarning', 'DEP00XX');
-function onEndNT(self) {
- self.end();
-}
+module.exports = require('internal/streams/duplex');
diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js
index 82adaa8d1c7d86..0695ca3da84d99 100644
--- a/lib/_stream_passthrough.js
+++ b/lib/_stream_passthrough.js
@@ -1,43 +1,6 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-// a passthrough stream.
-// basically just the most minimal sort of Transform stream.
-// Every written chunk gets output as-is.
-
'use strict';
-module.exports = PassThrough;
-
-const Transform = require('_stream_transform');
-const util = require('util');
-util.inherits(PassThrough, Transform);
-
-function PassThrough(options) {
- if (!(this instanceof PassThrough))
- return new PassThrough(options);
-
- Transform.call(this, options);
-}
+process.emitWarning(`The ${__filename} module is deprecated. Please use stream`,
+ 'DeprecationWarning', 'DEP00XX');
-PassThrough.prototype._transform = function(chunk, encoding, cb) {
- cb(null, chunk);
-};
+module.exports = require('internal/streams/passthrough');
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 879e5ddb53fe40..19e7a41b26e1af 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -1,994 +1,6 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
'use strict';
-module.exports = Readable;
-Readable.ReadableState = ReadableState;
-
-const EE = require('events');
-const Stream = require('stream');
-const Buffer = require('buffer').Buffer;
-const util = require('util');
-const debug = util.debuglog('stream');
-const BufferList = require('internal/streams/BufferList');
-var StringDecoder;
-
-util.inherits(Readable, Stream);
-
-const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
-
-function prependListener(emitter, event, fn) {
- // Sadly this is not cacheable as some libraries bundle their own
- // event emitter implementation with them.
- if (typeof emitter.prependListener === 'function') {
- return emitter.prependListener(event, fn);
- } else {
- // This is a hack to make sure that our error handler is attached before any
- // userland ones. NEVER DO THIS. This is here only because this code needs
- // to continue to work with older versions of Node.js that do not include
- // the prependListener() method. The goal is to eventually remove this hack.
- if (!emitter._events || !emitter._events[event])
- emitter.on(event, fn);
- else if (Array.isArray(emitter._events[event]))
- emitter._events[event].unshift(fn);
- else
- emitter._events[event] = [fn, emitter._events[event]];
- }
-}
-
-function ReadableState(options, stream) {
- options = options || {};
-
- // object stream flag. Used to make read(n) ignore n and to
- // make all the buffer merging and length checks go away
- this.objectMode = !!options.objectMode;
-
- if (stream instanceof Stream.Duplex)
- this.objectMode = this.objectMode || !!options.readableObjectMode;
-
- // the point at which it stops calling _read() to fill the buffer
- // Note: 0 is a valid value, means "don't call _read preemptively ever"
- var hwm = options.highWaterMark;
- var defaultHwm = this.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
-
- // cast to ints.
- this.highWaterMark = ~~this.highWaterMark;
-
- // A linked list is used to store data chunks instead of an array because the
- // linked list can remove elements from the beginning faster than
- // array.shift()
- this.buffer = new BufferList();
- this.length = 0;
- this.pipes = null;
- this.pipesCount = 0;
- this.flowing = null;
- this.ended = false;
- this.endEmitted = false;
- this.reading = false;
-
- // a flag to be able to tell if the event 'readable'/'data' is emitted
- // immediately, or on a later tick. We set this to true at first, because
- // any actions that shouldn't happen until "later" should generally also
- // not happen before the first read call.
- this.sync = true;
-
- // whenever we return null, then we set a flag to say
- // that we're awaiting a 'readable' event emission.
- this.needReadable = false;
- this.emittedReadable = false;
- this.readableListening = false;
- this.resumeScheduled = false;
-
- // Crypto is kind of old and crusty. Historically, its default string
- // encoding is 'binary' so we have to make this configurable.
- // Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = options.defaultEncoding || 'utf8';
-
- // the number of writers that are awaiting a drain event in .pipe()s
- this.awaitDrain = 0;
-
- // if true, a maybeReadMore has been scheduled
- this.readingMore = false;
-
- this.decoder = null;
- this.encoding = null;
- if (options.encoding) {
- if (!StringDecoder)
- StringDecoder = require('string_decoder').StringDecoder;
- this.decoder = new StringDecoder(options.encoding);
- this.encoding = options.encoding;
- }
-}
-
-function Readable(options) {
- if (!(this instanceof Readable))
- return new Readable(options);
-
- this._readableState = new ReadableState(options, this);
-
- // legacy
- this.readable = true;
-
- if (options && typeof options.read === 'function')
- this._read = options.read;
-
- Stream.call(this);
-}
-
-// Manually shove something into the read() buffer.
-// This returns true if the highWaterMark has not been hit yet,
-// similar to how Writable.write() returns true if you should
-// write() some more.
-Readable.prototype.push = function(chunk, encoding) {
- var state = this._readableState;
-
- if (!state.objectMode && typeof chunk === 'string') {
- encoding = encoding || state.defaultEncoding;
- if (encoding !== state.encoding) {
- chunk = Buffer.from(chunk, encoding);
- encoding = '';
- }
- }
-
- return readableAddChunk(this, state, chunk, encoding, false);
-};
-
-// Unshift should *always* be something directly out of read()
-Readable.prototype.unshift = function(chunk) {
- var state = this._readableState;
- return readableAddChunk(this, state, chunk, '', true);
-};
-
-Readable.prototype.isPaused = function() {
- return this._readableState.flowing === false;
-};
-
-function readableAddChunk(stream, state, chunk, encoding, addToFront) {
- var er = chunkInvalid(state, chunk);
- if (er) {
- stream.emit('error', er);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || chunk && chunk.length > 0) {
- if (state.ended && !addToFront) {
- const e = new Error('stream.push() after EOF');
- stream.emit('error', e);
- } else if (state.endEmitted && addToFront) {
- const e = new Error('stream.unshift() after end event');
- stream.emit('error', e);
- } else {
- var skipAdd;
- if (state.decoder && !addToFront && !encoding) {
- chunk = state.decoder.write(chunk);
- skipAdd = (!state.objectMode && chunk.length === 0);
- }
-
- if (!addToFront)
- state.reading = false;
-
- // Don't add to the buffer if we've decoded to an empty string chunk and
- // we're not in object mode
- if (!skipAdd) {
- // if we want the data now, just emit it.
- if (state.flowing && state.length === 0 && !state.sync) {
- stream.emit('data', chunk);
- stream.read(0);
- } else {
- // update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront)
- state.buffer.unshift(chunk);
- else
- state.buffer.push(chunk);
-
- if (state.needReadable)
- emitReadable(stream);
- }
- }
-
- maybeReadMore(stream, state);
- }
- } else if (!addToFront) {
- state.reading = false;
- }
-
- return needMoreData(state);
-}
-
-
-// if it's past the high water mark, we can push in some more.
-// Also, if we have no data yet, we can stand some
-// more bytes. This is to work around cases where hwm=0,
-// such as the repl. Also, if the push() triggered a
-// readable event, and the user called read(largeNumber) such that
-// needReadable was set, then we ought to push more, so that another
-// 'readable' event will be triggered.
-function needMoreData(state) {
- return !state.ended &&
- (state.needReadable ||
- state.length < state.highWaterMark ||
- state.length === 0);
-}
-
-// backwards compatibility.
-Readable.prototype.setEncoding = function(enc) {
- if (!StringDecoder)
- StringDecoder = require('string_decoder').StringDecoder;
- this._readableState.decoder = new StringDecoder(enc);
- this._readableState.encoding = enc;
- return this;
-};
-
-// Don't raise the hwm > 8MB
-const MAX_HWM = 0x800000;
-function computeNewHighWaterMark(n) {
- if (n >= MAX_HWM) {
- n = MAX_HWM;
- } else {
- // Get the next highest power of 2 to prevent increasing hwm excessively in
- // tiny amounts
- n--;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n++;
- }
- return n;
-}
-
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function howMuchToRead(n, state) {
- if (n <= 0 || (state.length === 0 && state.ended))
- return 0;
- if (state.objectMode)
- return 1;
- if (n !== n) {
- // Only flow one buffer at a time
- if (state.flowing && state.length)
- return state.buffer.head.data.length;
- else
- return state.length;
- }
- // If we're asking for more than the current hwm, then raise the hwm.
- if (n > state.highWaterMark)
- state.highWaterMark = computeNewHighWaterMark(n);
- if (n <= state.length)
- return n;
- // Don't have enough
- if (!state.ended) {
- state.needReadable = true;
- return 0;
- }
- return state.length;
-}
-
-// you can override either this method, or the async _read(n) below.
-Readable.prototype.read = function(n) {
- debug('read', n);
- n = parseInt(n, 10);
- var state = this._readableState;
- var nOrig = n;
-
- if (n !== 0)
- state.emittedReadable = false;
-
- // if we're doing read(0) to trigger a readable event, but we
- // already have a bunch of data in the buffer, then just trigger
- // the 'readable' event and move on.
- if (n === 0 &&
- state.needReadable &&
- (state.length >= state.highWaterMark || state.ended)) {
- debug('read: emitReadable', state.length, state.ended);
- if (state.length === 0 && state.ended)
- endReadable(this);
- else
- emitReadable(this);
- return null;
- }
-
- n = howMuchToRead(n, state);
-
- // if we've ended, and we're now clear, then finish it up.
- if (n === 0 && state.ended) {
- if (state.length === 0)
- endReadable(this);
- return null;
- }
-
- // All the actual chunk generation logic needs to be
- // *below* the call to _read. The reason is that in certain
- // synthetic stream cases, such as passthrough streams, _read
- // may be a completely synchronous operation which may change
- // the state of the read buffer, providing enough data when
- // before there was *not* enough.
- //
- // So, the steps are:
- // 1. Figure out what the state of things will be after we do
- // a read from the buffer.
- //
- // 2. If that resulting state will trigger a _read, then call _read.
- // Note that this may be asynchronous, or synchronous. Yes, it is
- // deeply ugly to write APIs this way, but that still doesn't mean
- // that the Readable class should behave improperly, as streams are
- // designed to be sync/async agnostic.
- // Take note if the _read call is sync or async (ie, if the read call
- // has returned yet), so that we know whether or not it's safe to emit
- // 'readable' etc.
- //
- // 3. Actually pull the requested chunks out of the buffer and return.
-
- // if we need a readable event, then we need to do some reading.
- var doRead = state.needReadable;
- debug('need readable', doRead);
-
- // if we currently have less than the highWaterMark, then also read some
- if (state.length === 0 || state.length - n < state.highWaterMark) {
- doRead = true;
- debug('length less than watermark', doRead);
- }
-
- // however, if we've ended, then there's no point, and if we're already
- // reading, then it's unnecessary.
- if (state.ended || state.reading) {
- doRead = false;
- debug('reading or ended', doRead);
- } else if (doRead) {
- debug('do read');
- state.reading = true;
- state.sync = true;
- // if the length is currently zero, then we *need* a readable event.
- if (state.length === 0)
- state.needReadable = true;
- // call internal read method
- this._read(state.highWaterMark);
- state.sync = false;
- // If _read pushed data synchronously, then `reading` will be false,
- // and we need to re-evaluate how much data we can return to the user.
- if (!state.reading)
- n = howMuchToRead(nOrig, state);
- }
-
- var ret;
- if (n > 0)
- ret = fromList(n, state);
- else
- ret = null;
-
- if (ret === null) {
- state.needReadable = true;
- n = 0;
- } else {
- state.length -= n;
- }
-
- if (state.length === 0) {
- // If we have nothing in the buffer, then we want to know
- // as soon as we *do* get something into the buffer.
- if (!state.ended)
- state.needReadable = true;
-
- // If we tried to read() past the EOF, then emit end on the next tick.
- if (nOrig !== n && state.ended)
- endReadable(this);
- }
-
- if (ret !== null)
- this.emit('data', ret);
-
- return ret;
-};
-
-function chunkInvalid(state, chunk) {
- var er = null;
- if (!(chunk instanceof Buffer) &&
- typeof chunk !== 'string' &&
- chunk !== null &&
- chunk !== undefined &&
- !state.objectMode) {
- er = new TypeError('Invalid non-string/buffer chunk');
- }
- return er;
-}
-
-
-function onEofChunk(stream, state) {
- if (state.ended) return;
- if (state.decoder) {
- var chunk = state.decoder.end();
- if (chunk && chunk.length) {
- state.buffer.push(chunk);
- state.length += state.objectMode ? 1 : chunk.length;
- }
- }
- state.ended = true;
-
- // emit 'readable' now to make sure it gets picked up.
- emitReadable(stream);
-}
-
-// Don't emit readable right away in sync mode, because this can trigger
-// another read() call => stack overflow. This way, it might trigger
-// a nextTick recursion warning, but that's not so bad.
-function emitReadable(stream) {
- var state = stream._readableState;
- state.needReadable = false;
- if (!state.emittedReadable) {
- debug('emitReadable', state.flowing);
- state.emittedReadable = true;
- if (state.sync)
- process.nextTick(emitReadable_, stream);
- else
- emitReadable_(stream);
- }
-}
-
-function emitReadable_(stream) {
- debug('emit readable');
- stream.emit('readable');
- flow(stream);
-}
-
-
-// at this point, the user has presumably seen the 'readable' event,
-// and called read() to consume some data. that may have triggered
-// in turn another _read(n) call, in which case reading = true if
-// it's in progress.
-// However, if we're not ended, or reading, and the length < hwm,
-// then go ahead and try to read some more preemptively.
-function maybeReadMore(stream, state) {
- if (!state.readingMore) {
- state.readingMore = true;
- process.nextTick(maybeReadMore_, stream, state);
- }
-}
-
-function maybeReadMore_(stream, state) {
- var len = state.length;
- while (!state.reading && !state.flowing && !state.ended &&
- state.length < state.highWaterMark) {
- debug('maybeReadMore read 0');
- stream.read(0);
- if (len === state.length)
- // didn't get any data, stop spinning.
- break;
- else
- len = state.length;
- }
- state.readingMore = false;
-}
-
-// abstract method. to be overridden in specific implementation classes.
-// call cb(er, data) where data is <= n in length.
-// for virtual (non-string, non-buffer) streams, "length" is somewhat
-// arbitrary, and perhaps not very meaningful.
-Readable.prototype._read = function(n) {
- this.emit('error', new Error('_read() is not implemented'));
-};
-
-Readable.prototype.pipe = function(dest, pipeOpts) {
- var src = this;
- var state = this._readableState;
-
- switch (state.pipesCount) {
- case 0:
- state.pipes = dest;
- break;
- case 1:
- state.pipes = [state.pipes, dest];
- break;
- default:
- state.pipes.push(dest);
- break;
- }
- state.pipesCount += 1;
- debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
-
- var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
- dest !== process.stdout &&
- dest !== process.stderr;
-
- var endFn = doEnd ? onend : unpipe;
- if (state.endEmitted)
- process.nextTick(endFn);
- else
- src.once('end', endFn);
-
- dest.on('unpipe', onunpipe);
- function onunpipe(readable) {
- debug('onunpipe');
- if (readable === src) {
- cleanup();
- }
- }
-
- function onend() {
- debug('onend');
- dest.end();
- }
-
- // when the dest drains, it reduces the awaitDrain counter
- // on the source. This would be more elegant with a .once()
- // handler in flow(), but adding and removing repeatedly is
- // too slow.
- var ondrain = pipeOnDrain(src);
- dest.on('drain', ondrain);
-
- var cleanedUp = false;
- function cleanup() {
- debug('cleanup');
- // cleanup event handlers once the pipe is broken
- dest.removeListener('close', onclose);
- dest.removeListener('finish', onfinish);
- dest.removeListener('drain', ondrain);
- dest.removeListener('error', onerror);
- dest.removeListener('unpipe', onunpipe);
- src.removeListener('end', onend);
- src.removeListener('end', unpipe);
- src.removeListener('data', ondata);
-
- cleanedUp = true;
-
- // if the reader is waiting for a drain event from this
- // specific writer, then it would cause it to never start
- // flowing again.
- // So, if this is awaiting a drain, then we just call it now.
- // If we don't know, then assume that we are waiting for one.
- if (state.awaitDrain &&
- (!dest._writableState || dest._writableState.needDrain))
- ondrain();
- }
-
- // If the user pushes more data while we're writing to dest then we'll end up
- // in ondata again. However, we only want to increase awaitDrain once because
- // dest will only emit one 'drain' event for the multiple writes.
- // => Introduce a guard on increasing awaitDrain.
- var increasedAwaitDrain = false;
- src.on('data', ondata);
- function ondata(chunk) {
- debug('ondata');
- increasedAwaitDrain = false;
- var ret = dest.write(chunk);
- if (false === ret && !increasedAwaitDrain) {
- // If the user unpiped during `dest.write()`, it is possible
- // to get stuck in a permanently paused state if that write
- // also returned false.
- // => Check whether `dest` is still a piping destination.
- if (((state.pipesCount === 1 && state.pipes === dest) ||
- (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
- !cleanedUp) {
- debug('false write response, pause', src._readableState.awaitDrain);
- src._readableState.awaitDrain++;
- increasedAwaitDrain = true;
- }
- src.pause();
- }
- }
-
- // if the dest has an error, then stop piping into it.
- // however, don't suppress the throwing behavior for this.
- function onerror(er) {
- debug('onerror', er);
- unpipe();
- dest.removeListener('error', onerror);
- if (EE.listenerCount(dest, 'error') === 0)
- dest.emit('error', er);
- }
-
- // Make sure our error handler is attached before userland ones.
- prependListener(dest, 'error', onerror);
-
- // Both close and finish should trigger unpipe, but only once.
- function onclose() {
- dest.removeListener('finish', onfinish);
- unpipe();
- }
- dest.once('close', onclose);
- function onfinish() {
- debug('onfinish');
- dest.removeListener('close', onclose);
- unpipe();
- }
- dest.once('finish', onfinish);
-
- function unpipe() {
- debug('unpipe');
- src.unpipe(dest);
- }
-
- // tell the dest that it's being piped to
- dest.emit('pipe', src);
-
- // start the flow if it hasn't been started already.
- if (!state.flowing) {
- debug('pipe resume');
- src.resume();
- }
-
- return dest;
-};
-
-function pipeOnDrain(src) {
- return function() {
- var state = src._readableState;
- debug('pipeOnDrain', state.awaitDrain);
- if (state.awaitDrain)
- state.awaitDrain--;
- if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
- state.flowing = true;
- flow(src);
- }
- };
-}
-
-
-Readable.prototype.unpipe = function(dest) {
- var state = this._readableState;
-
- // if we're not piping anywhere, then do nothing.
- if (state.pipesCount === 0)
- return this;
-
- // just one destination. most common case.
- if (state.pipesCount === 1) {
- // passed in one, but it's not the right one.
- if (dest && dest !== state.pipes)
- return this;
-
- if (!dest)
- dest = state.pipes;
-
- // got a match.
- state.pipes = null;
- state.pipesCount = 0;
- state.flowing = false;
- if (dest)
- dest.emit('unpipe', this);
- return this;
- }
-
- // slow case. multiple pipe destinations.
-
- if (!dest) {
- // remove all.
- var dests = state.pipes;
- var len = state.pipesCount;
- state.pipes = null;
- state.pipesCount = 0;
- state.flowing = false;
-
- for (var i = 0; i < len; i++)
- dests[i].emit('unpipe', this);
- return this;
- }
-
- // try to find the right one.
- var index = state.pipes.indexOf(dest);
- if (index === -1)
- return this;
-
- state.pipes.splice(index, 1);
- state.pipesCount -= 1;
- if (state.pipesCount === 1)
- state.pipes = state.pipes[0];
-
- dest.emit('unpipe', this);
-
- return this;
-};
-
-// set up data events if they are asked for
-// Ensure readable listeners eventually get something
-Readable.prototype.on = function(ev, fn) {
- const res = Stream.prototype.on.call(this, ev, fn);
-
- if (ev === 'data') {
- // Start flowing on next tick if stream isn't explicitly paused
- if (this._readableState.flowing !== false)
- this.resume();
- } else if (ev === 'readable') {
- const state = this._readableState;
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.emittedReadable = false;
- if (!state.reading) {
- process.nextTick(nReadingNextTick, this);
- } else if (state.length) {
- emitReadable(this, state);
- }
- }
- }
-
- return res;
-};
-Readable.prototype.addListener = Readable.prototype.on;
-
-function nReadingNextTick(self) {
- debug('readable nexttick read 0');
- self.read(0);
-}
-
-// pause() and resume() are remnants of the legacy readable stream API
-// If the user uses them, then switch into old mode.
-Readable.prototype.resume = function() {
- var state = this._readableState;
- if (!state.flowing) {
- debug('resume');
- state.flowing = true;
- resume(this, state);
- }
- return this;
-};
-
-function resume(stream, state) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- process.nextTick(resume_, stream, state);
- }
-}
-
-function resume_(stream, state) {
- if (!state.reading) {
- debug('resume read 0');
- stream.read(0);
- }
-
- state.resumeScheduled = false;
- state.awaitDrain = 0;
- stream.emit('resume');
- flow(stream);
- if (state.flowing && !state.reading)
- stream.read(0);
-}
-
-Readable.prototype.pause = function() {
- debug('call pause flowing=%j', this._readableState.flowing);
- if (false !== this._readableState.flowing) {
- debug('pause');
- this._readableState.flowing = false;
- this.emit('pause');
- }
- return this;
-};
-
-function flow(stream) {
- const state = stream._readableState;
- debug('flow', state.flowing);
- while (state.flowing && stream.read() !== null);
-}
-
-// wrap an old-style stream as the async data source.
-// This is *not* part of the readable stream interface.
-// It is an ugly unfortunate mess of history.
-Readable.prototype.wrap = function(stream) {
- var state = this._readableState;
- var paused = false;
-
- var self = this;
- stream.on('end', function() {
- debug('wrapped end');
- if (state.decoder && !state.ended) {
- var chunk = state.decoder.end();
- if (chunk && chunk.length)
- self.push(chunk);
- }
-
- self.push(null);
- });
-
- stream.on('data', function(chunk) {
- debug('wrapped data');
- if (state.decoder)
- chunk = state.decoder.write(chunk);
-
- // don't skip over falsy values in objectMode
- if (state.objectMode && (chunk === null || chunk === undefined))
- return;
- else if (!state.objectMode && (!chunk || !chunk.length))
- return;
-
- var ret = self.push(chunk);
- if (!ret) {
- paused = true;
- stream.pause();
- }
- });
-
- // proxy all the other methods.
- // important when wrapping filters and duplexes.
- for (var i in stream) {
- if (this[i] === undefined && typeof stream[i] === 'function') {
- this[i] = function(method) {
- return function() {
- return stream[method].apply(stream, arguments);
- };
- }(i);
- }
- }
-
- // proxy certain important events.
- for (var n = 0; n < kProxyEvents.length; n++) {
- stream.on(kProxyEvents[n], self.emit.bind(self, kProxyEvents[n]));
- }
-
- // when we try to consume some more bytes, simply unpause the
- // underlying stream.
- self._read = function(n) {
- debug('wrapped _read', n);
- if (paused) {
- paused = false;
- stream.resume();
- }
- };
-
- return self;
-};
-
-
-// exposed for testing purposes only.
-Readable._fromList = fromList;
-
-// Pluck off n bytes from an array of buffers.
-// Length is the combined lengths of all the buffers in the list.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function fromList(n, state) {
- // nothing buffered
- if (state.length === 0)
- return null;
-
- var ret;
- if (state.objectMode)
- ret = state.buffer.shift();
- else if (!n || n >= state.length) {
- // read it all, truncate the list
- if (state.decoder)
- ret = state.buffer.join('');
- else if (state.buffer.length === 1)
- ret = state.buffer.head.data;
- else
- ret = state.buffer.concat(state.length);
- state.buffer.clear();
- } else {
- // read part of list
- ret = fromListPartial(n, state.buffer, state.decoder);
- }
-
- return ret;
-}
-
-// Extracts only enough buffered data to satisfy the amount requested.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function fromListPartial(n, list, hasStrings) {
- var ret;
- if (n < list.head.data.length) {
- // slice is the same for buffers and strings
- ret = list.head.data.slice(0, n);
- list.head.data = list.head.data.slice(n);
- } else if (n === list.head.data.length) {
- // first chunk is a perfect match
- ret = list.shift();
- } else {
- // result spans more than one buffer
- ret = (hasStrings ?
- copyFromBufferString(n, list) :
- copyFromBuffer(n, list));
- }
- return ret;
-}
-
-// Copies a specified amount of characters from the list of buffered data
-// chunks.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function copyFromBufferString(n, list) {
- var p = list.head;
- var c = 1;
- var ret = p.data;
- n -= ret.length;
- while (p = p.next) {
- const str = p.data;
- const nb = (n > str.length ? str.length : n);
- if (nb === str.length)
- ret += str;
- else
- ret += str.slice(0, n);
- n -= nb;
- if (n === 0) {
- if (nb === str.length) {
- ++c;
- if (p.next)
- list.head = p.next;
- else
- list.head = list.tail = null;
- } else {
- list.head = p;
- p.data = str.slice(nb);
- }
- break;
- }
- ++c;
- }
- list.length -= c;
- return ret;
-}
-
-// Copies a specified amount of bytes from the list of buffered data chunks.
-// This function is designed to be inlinable, so please take care when making
-// changes to the function body.
-function copyFromBuffer(n, list) {
- const ret = Buffer.allocUnsafe(n);
- var p = list.head;
- var c = 1;
- p.data.copy(ret);
- n -= p.data.length;
- while (p = p.next) {
- const buf = p.data;
- const nb = (n > buf.length ? buf.length : n);
- buf.copy(ret, ret.length - n, 0, nb);
- n -= nb;
- if (n === 0) {
- if (nb === buf.length) {
- ++c;
- if (p.next)
- list.head = p.next;
- else
- list.head = list.tail = null;
- } else {
- list.head = p;
- p.data = buf.slice(nb);
- }
- break;
- }
- ++c;
- }
- list.length -= c;
- return ret;
-}
-
-function endReadable(stream) {
- var state = stream._readableState;
-
- // If we get here before consuming all the bytes, then that is a
- // bug in node. Should never happen.
- if (state.length > 0)
- throw new Error('"endReadable()" called on non-empty stream');
-
- if (!state.endEmitted) {
- state.ended = true;
- process.nextTick(endReadableNT, state, stream);
- }
-}
+process.emitWarning(`The ${__filename} module is deprecated. Please use stream`,
+ 'DeprecationWarning', 'DEP00XX');
-function endReadableNT(state, stream) {
- // Check that we didn't get one last unshift.
- if (!state.endEmitted && state.length === 0) {
- state.endEmitted = true;
- stream.readable = false;
- stream.emit('end');
- }
-}
+module.exports = require('internal/streams/readable');
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 8adf3ed12d9384..0bb462ffabcc98 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -1,216 +1,6 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-// a transform stream is a readable/writable stream where you do
-// something with the data. Sometimes it's called a "filter",
-// but that's not a great name for it, since that implies a thing where
-// some bits pass through, and others are simply ignored. (That would
-// be a valid example of a transform, of course.)
-//
-// While the output is causally related to the input, it's not a
-// necessarily symmetric or synchronous transformation. For example,
-// a zlib stream might take multiple plain-text writes(), and then
-// emit a single compressed chunk some time in the future.
-//
-// Here's how this works:
-//
-// The Transform stream has all the aspects of the readable and writable
-// stream classes. When you write(chunk), that calls _write(chunk,cb)
-// internally, and returns false if there's a lot of pending writes
-// buffered up. When you call read(), that calls _read(n) until
-// there's enough pending readable data buffered up.
-//
-// In a transform stream, the written data is placed in a buffer. When
-// _read(n) is called, it transforms the queued up data, calling the
-// buffered _write cb's as it consumes chunks. If consuming a single
-// written chunk would result in multiple output chunks, then the first
-// outputted bit calls the readcb, and subsequent chunks just go into
-// the read buffer, and will cause it to emit 'readable' if necessary.
-//
-// This way, back-pressure is actually determined by the reading side,
-// since _read has to be called to start processing a new chunk. However,
-// a pathological inflate type of transform can cause excessive buffering
-// here. For example, imagine a stream where every byte of input is
-// interpreted as an integer from 0-255, and then results in that many
-// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
-// 1kb of data being output. In this case, you could write a very small
-// amount of input, and end up with a very large amount of output. In
-// such a pathological inflating mechanism, there'd be no way to tell
-// the system to stop doing the transform. A single 4MB write could
-// cause the system to run out of memory.
-//
-// However, even in such a pathological case, only a single written chunk
-// would be consumed, and then the rest would wait (un-transformed) until
-// the results of the previous transformed chunk were consumed.
-
'use strict';
-module.exports = Transform;
-
-const Duplex = require('_stream_duplex');
-const util = require('util');
-util.inherits(Transform, Duplex);
-
-
-function TransformState(stream) {
- this.afterTransform = function(er, data) {
- return afterTransform(stream, er, data);
- };
-
- this.needTransform = false;
- this.transforming = false;
- this.writecb = null;
- this.writechunk = null;
- this.writeencoding = null;
-}
-
-function afterTransform(stream, er, data) {
- var ts = stream._transformState;
- ts.transforming = false;
-
- var cb = ts.writecb;
-
- if (!cb)
- return stream.emit('error', new Error('no writecb in Transform class'));
-
- ts.writechunk = null;
- ts.writecb = null;
-
- if (data !== null && data !== undefined)
- stream.push(data);
-
- cb(er);
-
- var rs = stream._readableState;
- rs.reading = false;
- if (rs.needReadable || rs.length < rs.highWaterMark) {
- stream._read(rs.highWaterMark);
- }
-}
-
-
-function Transform(options) {
- if (!(this instanceof Transform))
- return new Transform(options);
-
- Duplex.call(this, options);
-
- this._transformState = new TransformState(this);
-
- var stream = this;
-
- // start out asking for a readable event once data is transformed.
- this._readableState.needReadable = true;
-
- // we have implemented the _read method, and done the other things
- // that Readable wants before the first _read call, so unset the
- // sync guard flag.
- this._readableState.sync = false;
-
- if (options) {
- if (typeof options.transform === 'function')
- this._transform = options.transform;
-
- if (typeof options.flush === 'function')
- this._flush = options.flush;
- }
-
- // When the writable side finishes, then flush out anything remaining.
- this.once('prefinish', function() {
- if (typeof this._flush === 'function')
- this._flush(function(er, data) {
- done(stream, er, data);
- });
- else
- done(stream);
- });
-}
-
-Transform.prototype.push = function(chunk, encoding) {
- this._transformState.needTransform = false;
- return Duplex.prototype.push.call(this, chunk, encoding);
-};
-
-// This is the part where you do stuff!
-// override this function in implementation classes.
-// 'chunk' is an input chunk.
-//
-// Call `push(newChunk)` to pass along transformed output
-// to the readable side. You may call 'push' zero or more times.
-//
-// Call `cb(err)` when you are done with this chunk. If you pass
-// an error, then that'll put the hurt on the whole operation. If you
-// never call cb(), then you'll never get another chunk.
-Transform.prototype._transform = function(chunk, encoding, cb) {
- throw new Error('_transform() is not implemented');
-};
-
-Transform.prototype._write = function(chunk, encoding, cb) {
- var ts = this._transformState;
- ts.writecb = cb;
- ts.writechunk = chunk;
- ts.writeencoding = encoding;
- if (!ts.transforming) {
- var rs = this._readableState;
- if (ts.needTransform ||
- rs.needReadable ||
- rs.length < rs.highWaterMark)
- this._read(rs.highWaterMark);
- }
-};
-
-// Doesn't matter what the args are here.
-// _transform does all the work.
-// That we got here means that the readable side wants more data.
-Transform.prototype._read = function(n) {
- var ts = this._transformState;
-
- if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
- ts.transforming = true;
- this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
- } else {
- // mark that we need a transform, so that any data that comes in
- // will get processed, now that we've asked for it.
- ts.needTransform = true;
- }
-};
-
-
-function done(stream, er, data) {
- if (er)
- return stream.emit('error', er);
-
- if (data !== null && data !== undefined)
- stream.push(data);
-
- // if there's nothing in the write buffer, then that means
- // that nothing more will ever be provided
- var ws = stream._writableState;
- var ts = stream._transformState;
-
- if (ws.length)
- throw new Error('Calling transform done when ws.length != 0');
-
- if (ts.transforming)
- throw new Error('Calling transform done when still transforming');
+process.emitWarning(`The ${__filename} module is deprecated. Please use stream`,
+ 'DeprecationWarning', 'DEP00XX');
- return stream.push(null);
-}
+module.exports = require('internal/streams/transform');
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index a0a37fbb7a0421..a650351168a388 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -1,565 +1,6 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-// A bit simpler than readable streams.
-// Implement an async ._write(chunk, encoding, cb), and it'll handle all
-// the drain event emission and buffering.
-
'use strict';
-module.exports = Writable;
-Writable.WritableState = WritableState;
-
-const util = require('util');
-const internalUtil = require('internal/util');
-const Stream = require('stream');
-const Buffer = require('buffer').Buffer;
-
-util.inherits(Writable, Stream);
-
-function nop() {}
-
-function WritableState(options, stream) {
- options = options || {};
-
- // object stream flag to indicate whether or not this stream
- // contains buffers or objects.
- this.objectMode = !!options.objectMode;
-
- if (stream instanceof Stream.Duplex)
- this.objectMode = this.objectMode || !!options.writableObjectMode;
-
- // the point at which write() starts returning false
- // Note: 0 is a valid value, means that we always return false if
- // the entire buffer is not flushed immediately on write()
- var hwm = options.highWaterMark;
- var defaultHwm = this.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
-
- // cast to ints.
- this.highWaterMark = ~~this.highWaterMark;
-
- // drain event flag.
- this.needDrain = false;
- // at the start of calling end()
- this.ending = false;
- // when end() has been called, and returned
- this.ended = false;
- // when 'finish' is emitted
- this.finished = false;
-
- // should we decode strings into buffers before passing to _write?
- // this is here so that some node-core streams can optimize string
- // handling at a lower level.
- var noDecode = options.decodeStrings === false;
- this.decodeStrings = !noDecode;
-
- // Crypto is kind of old and crusty. Historically, its default string
- // encoding is 'binary' so we have to make this configurable.
- // Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = options.defaultEncoding || 'utf8';
-
- // not an actual buffer we keep track of, but a measurement
- // of how much we're waiting to get pushed to some underlying
- // socket or file.
- this.length = 0;
-
- // a flag to see when we're in the middle of a write.
- this.writing = false;
-
- // when true all writes will be buffered until .uncork() call
- this.corked = 0;
-
- // a flag to be able to tell if the onwrite cb is called immediately,
- // or on a later tick. We set this to true at first, because any
- // actions that shouldn't happen until "later" should generally also
- // not happen before the first write call.
- this.sync = true;
-
- // a flag to know if we're processing previously buffered items, which
- // may call the _write() callback in the same tick, so that we don't
- // end up in an overlapped onwrite situation.
- this.bufferProcessing = false;
-
- // the callback that's passed to _write(chunk,cb)
- this.onwrite = onwrite.bind(undefined, stream);
-
- // the callback that the user supplies to write(chunk,encoding,cb)
- this.writecb = null;
-
- // the amount that is being written when _write is called.
- this.writelen = 0;
-
- this.bufferedRequest = null;
- this.lastBufferedRequest = null;
-
- // number of pending user-supplied write callbacks
- // this must be 0 before 'finish' can be emitted
- this.pendingcb = 0;
-
- // emit prefinish if the only thing we're waiting for is _write cbs
- // This is relevant for synchronous Transform streams
- this.prefinished = false;
-
- // True if the error was already emitted and should not be thrown again
- this.errorEmitted = false;
-
- // count buffered requests
- this.bufferedRequestCount = 0;
-
- // allocate the first CorkedRequest, there is always
- // one allocated and free to use, and we maintain at most two
- var corkReq = { next: null, entry: null, finish: undefined };
- corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
- this.corkedRequestsFree = corkReq;
-}
-
-WritableState.prototype.getBuffer = function getBuffer() {
- var current = this.bufferedRequest;
- var out = [];
- while (current) {
- out.push(current);
- current = current.next;
- }
- return out;
-};
-
-Object.defineProperty(WritableState.prototype, 'buffer', {
- get: internalUtil.deprecate(function() {
- return this.getBuffer();
- }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' +
- 'instead.', 'DEP0003')
-});
-
-// Test _writableState for inheritance to account for Duplex streams,
-// whose prototype chain only points to Readable.
-var realHasInstance;
-if (typeof Symbol === 'function' && Symbol.hasInstance) {
- realHasInstance = Function.prototype[Symbol.hasInstance];
- Object.defineProperty(Writable, Symbol.hasInstance, {
- value: function(object) {
- if (realHasInstance.call(this, object))
- return true;
-
- return object && object._writableState instanceof WritableState;
- }
- });
-} else {
- realHasInstance = function(object) {
- return object instanceof this;
- };
-}
-
-function Writable(options) {
- // Writable ctor is applied to Duplexes, too.
- // `realHasInstance` is necessary because using plain `instanceof`
- // would return false, as no `_writableState` property is attached.
-
- // Trying to use the custom `instanceof` for Writable here will also break the
- // Node.js LazyTransform implementation, which has a non-trivial getter for
- // `_writableState` that would lead to infinite recursion.
- if (!(realHasInstance.call(Writable, this)) &&
- !(this instanceof Stream.Duplex)) {
- return new Writable(options);
- }
-
- this._writableState = new WritableState(options, this);
-
- // legacy.
- this.writable = true;
-
- if (options) {
- if (typeof options.write === 'function')
- this._write = options.write;
-
- if (typeof options.writev === 'function')
- this._writev = options.writev;
- }
-
- Stream.call(this);
-}
-
-// Otherwise people can pipe Writable streams, which is just wrong.
-Writable.prototype.pipe = function() {
- this.emit('error', new Error('Cannot pipe, not readable'));
-};
-
-
-function writeAfterEnd(stream, cb) {
- var er = new Error('write after end');
- // TODO: defer error events consistently everywhere, not just the cb
- stream.emit('error', er);
- process.nextTick(cb, er);
-}
-
-// Checks that a user-supplied chunk is valid, especially for the particular
-// mode the stream is in. Currently this means that `null` is never accepted
-// and undefined/non-string values are only allowed in object mode.
-function validChunk(stream, state, chunk, cb) {
- var valid = true;
- var er = false;
-
- if (chunk === null) {
- er = new TypeError('May not write null values to stream');
- } else if (typeof chunk !== 'string' &&
- chunk !== undefined &&
- !state.objectMode) {
- er = new TypeError('Invalid non-string/buffer chunk');
- }
- if (er) {
- stream.emit('error', er);
- process.nextTick(cb, er);
- valid = false;
- }
- return valid;
-}
-
-Writable.prototype.write = function(chunk, encoding, cb) {
- var state = this._writableState;
- var ret = false;
- var isBuf = (chunk instanceof Buffer);
-
- if (typeof encoding === 'function') {
- cb = encoding;
- encoding = null;
- }
-
- if (isBuf)
- encoding = 'buffer';
- else if (!encoding)
- encoding = state.defaultEncoding;
-
- if (typeof cb !== 'function')
- cb = nop;
-
- if (state.ended)
- writeAfterEnd(this, cb);
- else if (isBuf || validChunk(this, state, chunk, cb)) {
- state.pendingcb++;
- ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
- }
-
- return ret;
-};
-
-Writable.prototype.cork = function() {
- var state = this._writableState;
-
- state.corked++;
-};
-
-Writable.prototype.uncork = function() {
- var state = this._writableState;
-
- if (state.corked) {
- state.corked--;
-
- if (!state.writing &&
- !state.corked &&
- !state.finished &&
- !state.bufferProcessing &&
- state.bufferedRequest)
- clearBuffer(this, state);
- }
-};
-
-Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === 'string')
- encoding = encoding.toLowerCase();
- if (!Buffer.isEncoding(encoding))
- throw new TypeError('Unknown encoding: ' + encoding);
- this._writableState.defaultEncoding = encoding;
- return this;
-};
-
-function decodeChunk(state, chunk, encoding) {
- if (!state.objectMode &&
- state.decodeStrings !== false &&
- typeof chunk === 'string') {
- chunk = Buffer.from(chunk, encoding);
- }
- return chunk;
-}
-
-// if we're already writing something, then just put this
-// in the queue, and wait our turn. Otherwise, call _write
-// If we return false, then we need a drain event, so set that flag.
-function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
- if (!isBuf) {
- var newChunk = decodeChunk(state, chunk, encoding);
- if (chunk !== newChunk) {
- encoding = 'buffer';
- chunk = newChunk;
- }
- }
- var len = state.objectMode ? 1 : chunk.length;
-
- state.length += len;
-
- var ret = state.length < state.highWaterMark;
- // we must ensure that previous needDrain will not be reset to false.
- if (!ret)
- state.needDrain = true;
-
- if (state.writing || state.corked) {
- var last = state.lastBufferedRequest;
- state.lastBufferedRequest = { chunk, encoding, callback: cb, next: null };
- if (last) {
- last.next = state.lastBufferedRequest;
- } else {
- state.bufferedRequest = state.lastBufferedRequest;
- }
- state.bufferedRequestCount += 1;
- } else {
- doWrite(stream, state, false, len, chunk, encoding, cb);
- }
-
- return ret;
-}
-
-function doWrite(stream, state, writev, len, chunk, encoding, cb) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (writev)
- stream._writev(chunk, state.onwrite);
- else
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
-}
-
-function onwriteError(stream, state, sync, er, cb) {
- --state.pendingcb;
- if (sync)
- process.nextTick(cb, er);
- else
- cb(er);
-
- stream._writableState.errorEmitted = true;
- stream.emit('error', er);
-}
-
-function onwriteStateUpdate(state) {
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-}
-
-function onwrite(stream, er) {
- var state = stream._writableState;
- var sync = state.sync;
- var cb = state.writecb;
-
- onwriteStateUpdate(state);
-
- if (er)
- onwriteError(stream, state, sync, er, cb);
- else {
- // Check if we're actually ready to finish, but don't emit yet
- var finished = needFinish(state);
-
- if (!finished &&
- !state.corked &&
- !state.bufferProcessing &&
- state.bufferedRequest) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- process.nextTick(afterWrite, stream, state, finished, cb);
- } else {
- afterWrite(stream, state, finished, cb);
- }
- }
-}
-
-function afterWrite(stream, state, finished, cb) {
- if (!finished)
- onwriteDrain(stream, state);
- state.pendingcb--;
- cb();
- finishMaybe(stream, state);
-}
-
-// Must force callback to be called on nextTick, so that we don't
-// emit 'drain' before the write() consumer gets the 'false' return
-// value, and has a chance to attach a 'drain' listener.
-function onwriteDrain(stream, state) {
- if (state.length === 0 && state.needDrain) {
- state.needDrain = false;
- stream.emit('drain');
- }
-}
-
-// if there's something in the buffer waiting, then process it
-function clearBuffer(stream, state) {
- state.bufferProcessing = true;
- var entry = state.bufferedRequest;
-
- if (stream._writev && entry && entry.next) {
- // Fast case, write everything using _writev()
- var l = state.bufferedRequestCount;
- var buffer = new Array(l);
- var holder = state.corkedRequestsFree;
- holder.entry = entry;
-
- var count = 0;
- while (entry) {
- buffer[count] = entry;
- entry = entry.next;
- count += 1;
- }
-
- doWrite(stream, state, true, state.length, buffer, '', holder.finish);
-
- // doWrite is almost always async, defer these to save a bit of time
- // as the hot path ends with doWrite
- state.pendingcb++;
- state.lastBufferedRequest = null;
- if (holder.next) {
- state.corkedRequestsFree = holder.next;
- holder.next = null;
- } else {
- var corkReq = { next: null, entry: null, finish: undefined };
- corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
- state.corkedRequestsFree = corkReq;
- }
- } else {
- // Slow case, write chunks one-by-one
- while (entry) {
- var chunk = entry.chunk;
- var encoding = entry.encoding;
- var cb = entry.callback;
- var len = state.objectMode ? 1 : chunk.length;
-
- doWrite(stream, state, false, len, chunk, encoding, cb);
- entry = entry.next;
- // if we didn't call the onwrite immediately, then
- // it means that we need to wait until it does.
- // also, that means that the chunk and cb are currently
- // being processed, so move the buffer counter past them.
- if (state.writing) {
- break;
- }
- }
-
- if (entry === null)
- state.lastBufferedRequest = null;
- }
-
- state.bufferedRequestCount = 0;
- state.bufferedRequest = entry;
- state.bufferProcessing = false;
-}
-
-Writable.prototype._write = function(chunk, encoding, cb) {
- cb(new Error('_write() is not implemented'));
-};
-
-Writable.prototype._writev = null;
-
-Writable.prototype.end = function(chunk, encoding, cb) {
- var state = this._writableState;
-
- if (typeof chunk === 'function') {
- cb = chunk;
- chunk = null;
- encoding = null;
- } else if (typeof encoding === 'function') {
- cb = encoding;
- encoding = null;
- }
-
- if (chunk !== null && chunk !== undefined)
- this.write(chunk, encoding);
-
- // .end() fully uncorks
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- // ignore unnecessary end() calls.
- if (!state.ending && !state.finished)
- endWritable(this, state, cb);
-};
-
-
-function needFinish(state) {
- return (state.ending &&
- state.length === 0 &&
- state.bufferedRequest === null &&
- !state.finished &&
- !state.writing);
-}
-
-function prefinish(stream, state) {
- if (!state.prefinished) {
- state.prefinished = true;
- stream.emit('prefinish');
- }
-}
-
-function finishMaybe(stream, state) {
- var need = needFinish(state);
- if (need) {
- if (state.pendingcb === 0) {
- prefinish(stream, state);
- state.finished = true;
- stream.emit('finish');
- } else {
- prefinish(stream, state);
- }
- }
- return need;
-}
-
-function endWritable(stream, state, cb) {
- state.ending = true;
- finishMaybe(stream, state);
- if (cb) {
- if (state.finished)
- process.nextTick(cb);
- else
- stream.once('finish', cb);
- }
- state.ended = true;
- stream.writable = false;
-}
+process.emitWarning(`The ${__filename} module is deprecated. Please use stream`,
+ 'DeprecationWarning', 'DEP00XX');
-function onCorkedFinish(corkReq, state, err) {
- var entry = corkReq.entry;
- corkReq.entry = null;
- while (entry) {
- var cb = entry.callback;
- state.pendingcb--;
- cb(err);
- entry = entry.next;
- }
- if (state.corkedRequestsFree) {
- state.corkedRequestsFree.next = corkReq;
- } else {
- state.corkedRequestsFree = corkReq;
- }
-}
+module.exports = require('internal/streams/writable');
diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js
new file mode 100644
index 00000000000000..168cb34d8215cf
--- /dev/null
+++ b/lib/internal/streams/duplex.js
@@ -0,0 +1,78 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// a duplex stream is just a stream that is both readable and writable.
+// Since JS doesn't have multiple prototypal inheritance, this class
+// prototypally inherits from Readable, and then parasitically from
+// Writable.
+
+'use strict';
+
+module.exports = Duplex;
+
+const util = require('util');
+const Readable = require('internal/streams/readable');
+const Writable = require('internal/streams/writable');
+
+util.inherits(Duplex, Readable);
+
+var keys = Object.keys(Writable.prototype);
+for (var v = 0; v < keys.length; v++) {
+ var method = keys[v];
+ if (!Duplex.prototype[method])
+ Duplex.prototype[method] = Writable.prototype[method];
+}
+
+function Duplex(options) {
+ if (!(this instanceof Duplex))
+ return new Duplex(options);
+
+ Readable.call(this, options);
+ Writable.call(this, options);
+
+ if (options && options.readable === false)
+ this.readable = false;
+
+ if (options && options.writable === false)
+ this.writable = false;
+
+ this.allowHalfOpen = true;
+ if (options && options.allowHalfOpen === false)
+ this.allowHalfOpen = false;
+
+ this.once('end', onend);
+}
+
+// the no-half-open enforcer
+function onend() {
+ // if we allow half-open state, or if the writable side ended,
+ // then we're ok.
+ if (this.allowHalfOpen || this._writableState.ended)
+ return;
+
+ // no more data can be written.
+ // But allow more writes to happen in this tick.
+ process.nextTick(onEndNT, this);
+}
+
+function onEndNT(self) {
+ self.end();
+}
diff --git a/lib/internal/streams/passthrough.js b/lib/internal/streams/passthrough.js
new file mode 100644
index 00000000000000..9bdf86ef76af3e
--- /dev/null
+++ b/lib/internal/streams/passthrough.js
@@ -0,0 +1,43 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// a passthrough stream.
+// basically just the most minimal sort of Transform stream.
+// Every written chunk gets output as-is.
+
+'use strict';
+
+module.exports = PassThrough;
+
+const Transform = require('internal/streams/transform');
+const util = require('util');
+util.inherits(PassThrough, Transform);
+
+function PassThrough(options) {
+ if (!(this instanceof PassThrough))
+ return new PassThrough(options);
+
+ Transform.call(this, options);
+}
+
+PassThrough.prototype._transform = function(chunk, encoding, cb) {
+ cb(null, chunk);
+};
diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
new file mode 100644
index 00000000000000..879e5ddb53fe40
--- /dev/null
+++ b/lib/internal/streams/readable.js
@@ -0,0 +1,994 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+'use strict';
+
+module.exports = Readable;
+Readable.ReadableState = ReadableState;
+
+const EE = require('events');
+const Stream = require('stream');
+const Buffer = require('buffer').Buffer;
+const util = require('util');
+const debug = util.debuglog('stream');
+const BufferList = require('internal/streams/BufferList');
+var StringDecoder;
+
+util.inherits(Readable, Stream);
+
+const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
+
+function prependListener(emitter, event, fn) {
+ // Sadly this is not cacheable as some libraries bundle their own
+ // event emitter implementation with them.
+ if (typeof emitter.prependListener === 'function') {
+ return emitter.prependListener(event, fn);
+ } else {
+ // This is a hack to make sure that our error handler is attached before any
+ // userland ones. NEVER DO THIS. This is here only because this code needs
+ // to continue to work with older versions of Node.js that do not include
+ // the prependListener() method. The goal is to eventually remove this hack.
+ if (!emitter._events || !emitter._events[event])
+ emitter.on(event, fn);
+ else if (Array.isArray(emitter._events[event]))
+ emitter._events[event].unshift(fn);
+ else
+ emitter._events[event] = [fn, emitter._events[event]];
+ }
+}
+
+function ReadableState(options, stream) {
+ options = options || {};
+
+ // object stream flag. Used to make read(n) ignore n and to
+ // make all the buffer merging and length checks go away
+ this.objectMode = !!options.objectMode;
+
+ if (stream instanceof Stream.Duplex)
+ this.objectMode = this.objectMode || !!options.readableObjectMode;
+
+ // the point at which it stops calling _read() to fill the buffer
+ // Note: 0 is a valid value, means "don't call _read preemptively ever"
+ var hwm = options.highWaterMark;
+ var defaultHwm = this.objectMode ? 16 : 16 * 1024;
+ this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
+
+ // cast to ints.
+ this.highWaterMark = ~~this.highWaterMark;
+
+ // A linked list is used to store data chunks instead of an array because the
+ // linked list can remove elements from the beginning faster than
+ // array.shift()
+ this.buffer = new BufferList();
+ this.length = 0;
+ this.pipes = null;
+ this.pipesCount = 0;
+ this.flowing = null;
+ this.ended = false;
+ this.endEmitted = false;
+ this.reading = false;
+
+ // a flag to be able to tell if the event 'readable'/'data' is emitted
+ // immediately, or on a later tick. We set this to true at first, because
+ // any actions that shouldn't happen until "later" should generally also
+ // not happen before the first read call.
+ this.sync = true;
+
+ // whenever we return null, then we set a flag to say
+ // that we're awaiting a 'readable' event emission.
+ this.needReadable = false;
+ this.emittedReadable = false;
+ this.readableListening = false;
+ this.resumeScheduled = false;
+
+ // Crypto is kind of old and crusty. Historically, its default string
+ // encoding is 'binary' so we have to make this configurable.
+ // Everything else in the universe uses 'utf8', though.
+ this.defaultEncoding = options.defaultEncoding || 'utf8';
+
+ // the number of writers that are awaiting a drain event in .pipe()s
+ this.awaitDrain = 0;
+
+ // if true, a maybeReadMore has been scheduled
+ this.readingMore = false;
+
+ this.decoder = null;
+ this.encoding = null;
+ if (options.encoding) {
+ if (!StringDecoder)
+ StringDecoder = require('string_decoder').StringDecoder;
+ this.decoder = new StringDecoder(options.encoding);
+ this.encoding = options.encoding;
+ }
+}
+
+function Readable(options) {
+ if (!(this instanceof Readable))
+ return new Readable(options);
+
+ this._readableState = new ReadableState(options, this);
+
+ // legacy
+ this.readable = true;
+
+ if (options && typeof options.read === 'function')
+ this._read = options.read;
+
+ Stream.call(this);
+}
+
+// Manually shove something into the read() buffer.
+// This returns true if the highWaterMark has not been hit yet,
+// similar to how Writable.write() returns true if you should
+// write() some more.
+Readable.prototype.push = function(chunk, encoding) {
+ var state = this._readableState;
+
+ if (!state.objectMode && typeof chunk === 'string') {
+ encoding = encoding || state.defaultEncoding;
+ if (encoding !== state.encoding) {
+ chunk = Buffer.from(chunk, encoding);
+ encoding = '';
+ }
+ }
+
+ return readableAddChunk(this, state, chunk, encoding, false);
+};
+
+// Unshift should *always* be something directly out of read()
+Readable.prototype.unshift = function(chunk) {
+ var state = this._readableState;
+ return readableAddChunk(this, state, chunk, '', true);
+};
+
+Readable.prototype.isPaused = function() {
+ return this._readableState.flowing === false;
+};
+
+function readableAddChunk(stream, state, chunk, encoding, addToFront) {
+ var er = chunkInvalid(state, chunk);
+ if (er) {
+ stream.emit('error', er);
+ } else if (chunk === null) {
+ state.reading = false;
+ onEofChunk(stream, state);
+ } else if (state.objectMode || chunk && chunk.length > 0) {
+ if (state.ended && !addToFront) {
+ const e = new Error('stream.push() after EOF');
+ stream.emit('error', e);
+ } else if (state.endEmitted && addToFront) {
+ const e = new Error('stream.unshift() after end event');
+ stream.emit('error', e);
+ } else {
+ var skipAdd;
+ if (state.decoder && !addToFront && !encoding) {
+ chunk = state.decoder.write(chunk);
+ skipAdd = (!state.objectMode && chunk.length === 0);
+ }
+
+ if (!addToFront)
+ state.reading = false;
+
+ // Don't add to the buffer if we've decoded to an empty string chunk and
+ // we're not in object mode
+ if (!skipAdd) {
+ // if we want the data now, just emit it.
+ if (state.flowing && state.length === 0 && !state.sync) {
+ stream.emit('data', chunk);
+ stream.read(0);
+ } else {
+ // update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront)
+ state.buffer.unshift(chunk);
+ else
+ state.buffer.push(chunk);
+
+ if (state.needReadable)
+ emitReadable(stream);
+ }
+ }
+
+ maybeReadMore(stream, state);
+ }
+ } else if (!addToFront) {
+ state.reading = false;
+ }
+
+ return needMoreData(state);
+}
+
+
+// if it's past the high water mark, we can push in some more.
+// Also, if we have no data yet, we can stand some
+// more bytes. This is to work around cases where hwm=0,
+// such as the repl. Also, if the push() triggered a
+// readable event, and the user called read(largeNumber) such that
+// needReadable was set, then we ought to push more, so that another
+// 'readable' event will be triggered.
+function needMoreData(state) {
+ return !state.ended &&
+ (state.needReadable ||
+ state.length < state.highWaterMark ||
+ state.length === 0);
+}
+
+// backwards compatibility.
+Readable.prototype.setEncoding = function(enc) {
+ if (!StringDecoder)
+ StringDecoder = require('string_decoder').StringDecoder;
+ this._readableState.decoder = new StringDecoder(enc);
+ this._readableState.encoding = enc;
+ return this;
+};
+
+// Don't raise the hwm > 8MB
+const MAX_HWM = 0x800000;
+function computeNewHighWaterMark(n) {
+ if (n >= MAX_HWM) {
+ n = MAX_HWM;
+ } else {
+ // Get the next highest power of 2 to prevent increasing hwm excessively in
+ // tiny amounts
+ n--;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ n++;
+ }
+ return n;
+}
+
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function howMuchToRead(n, state) {
+ if (n <= 0 || (state.length === 0 && state.ended))
+ return 0;
+ if (state.objectMode)
+ return 1;
+ if (n !== n) {
+ // Only flow one buffer at a time
+ if (state.flowing && state.length)
+ return state.buffer.head.data.length;
+ else
+ return state.length;
+ }
+ // If we're asking for more than the current hwm, then raise the hwm.
+ if (n > state.highWaterMark)
+ state.highWaterMark = computeNewHighWaterMark(n);
+ if (n <= state.length)
+ return n;
+ // Don't have enough
+ if (!state.ended) {
+ state.needReadable = true;
+ return 0;
+ }
+ return state.length;
+}
+
+// you can override either this method, or the async _read(n) below.
+Readable.prototype.read = function(n) {
+ debug('read', n);
+ n = parseInt(n, 10);
+ var state = this._readableState;
+ var nOrig = n;
+
+ if (n !== 0)
+ state.emittedReadable = false;
+
+ // if we're doing read(0) to trigger a readable event, but we
+ // already have a bunch of data in the buffer, then just trigger
+ // the 'readable' event and move on.
+ if (n === 0 &&
+ state.needReadable &&
+ (state.length >= state.highWaterMark || state.ended)) {
+ debug('read: emitReadable', state.length, state.ended);
+ if (state.length === 0 && state.ended)
+ endReadable(this);
+ else
+ emitReadable(this);
+ return null;
+ }
+
+ n = howMuchToRead(n, state);
+
+ // if we've ended, and we're now clear, then finish it up.
+ if (n === 0 && state.ended) {
+ if (state.length === 0)
+ endReadable(this);
+ return null;
+ }
+
+ // All the actual chunk generation logic needs to be
+ // *below* the call to _read. The reason is that in certain
+ // synthetic stream cases, such as passthrough streams, _read
+ // may be a completely synchronous operation which may change
+ // the state of the read buffer, providing enough data when
+ // before there was *not* enough.
+ //
+ // So, the steps are:
+ // 1. Figure out what the state of things will be after we do
+ // a read from the buffer.
+ //
+ // 2. If that resulting state will trigger a _read, then call _read.
+ // Note that this may be asynchronous, or synchronous. Yes, it is
+ // deeply ugly to write APIs this way, but that still doesn't mean
+ // that the Readable class should behave improperly, as streams are
+ // designed to be sync/async agnostic.
+ // Take note if the _read call is sync or async (ie, if the read call
+ // has returned yet), so that we know whether or not it's safe to emit
+ // 'readable' etc.
+ //
+ // 3. Actually pull the requested chunks out of the buffer and return.
+
+ // if we need a readable event, then we need to do some reading.
+ var doRead = state.needReadable;
+ debug('need readable', doRead);
+
+ // if we currently have less than the highWaterMark, then also read some
+ if (state.length === 0 || state.length - n < state.highWaterMark) {
+ doRead = true;
+ debug('length less than watermark', doRead);
+ }
+
+ // however, if we've ended, then there's no point, and if we're already
+ // reading, then it's unnecessary.
+ if (state.ended || state.reading) {
+ doRead = false;
+ debug('reading or ended', doRead);
+ } else if (doRead) {
+ debug('do read');
+ state.reading = true;
+ state.sync = true;
+ // if the length is currently zero, then we *need* a readable event.
+ if (state.length === 0)
+ state.needReadable = true;
+ // call internal read method
+ this._read(state.highWaterMark);
+ state.sync = false;
+ // If _read pushed data synchronously, then `reading` will be false,
+ // and we need to re-evaluate how much data we can return to the user.
+ if (!state.reading)
+ n = howMuchToRead(nOrig, state);
+ }
+
+ var ret;
+ if (n > 0)
+ ret = fromList(n, state);
+ else
+ ret = null;
+
+ if (ret === null) {
+ state.needReadable = true;
+ n = 0;
+ } else {
+ state.length -= n;
+ }
+
+ if (state.length === 0) {
+ // If we have nothing in the buffer, then we want to know
+ // as soon as we *do* get something into the buffer.
+ if (!state.ended)
+ state.needReadable = true;
+
+ // If we tried to read() past the EOF, then emit end on the next tick.
+ if (nOrig !== n && state.ended)
+ endReadable(this);
+ }
+
+ if (ret !== null)
+ this.emit('data', ret);
+
+ return ret;
+};
+
+function chunkInvalid(state, chunk) {
+ var er = null;
+ if (!(chunk instanceof Buffer) &&
+ typeof chunk !== 'string' &&
+ chunk !== null &&
+ chunk !== undefined &&
+ !state.objectMode) {
+ er = new TypeError('Invalid non-string/buffer chunk');
+ }
+ return er;
+}
+
+
+function onEofChunk(stream, state) {
+ if (state.ended) return;
+ if (state.decoder) {
+ var chunk = state.decoder.end();
+ if (chunk && chunk.length) {
+ state.buffer.push(chunk);
+ state.length += state.objectMode ? 1 : chunk.length;
+ }
+ }
+ state.ended = true;
+
+ // emit 'readable' now to make sure it gets picked up.
+ emitReadable(stream);
+}
+
+// Don't emit readable right away in sync mode, because this can trigger
+// another read() call => stack overflow. This way, it might trigger
+// a nextTick recursion warning, but that's not so bad.
+function emitReadable(stream) {
+ var state = stream._readableState;
+ state.needReadable = false;
+ if (!state.emittedReadable) {
+ debug('emitReadable', state.flowing);
+ state.emittedReadable = true;
+ if (state.sync)
+ process.nextTick(emitReadable_, stream);
+ else
+ emitReadable_(stream);
+ }
+}
+
+function emitReadable_(stream) {
+ debug('emit readable');
+ stream.emit('readable');
+ flow(stream);
+}
+
+
+// at this point, the user has presumably seen the 'readable' event,
+// and called read() to consume some data. that may have triggered
+// in turn another _read(n) call, in which case reading = true if
+// it's in progress.
+// However, if we're not ended, or reading, and the length < hwm,
+// then go ahead and try to read some more preemptively.
+function maybeReadMore(stream, state) {
+ if (!state.readingMore) {
+ state.readingMore = true;
+ process.nextTick(maybeReadMore_, stream, state);
+ }
+}
+
+function maybeReadMore_(stream, state) {
+ var len = state.length;
+ while (!state.reading && !state.flowing && !state.ended &&
+ state.length < state.highWaterMark) {
+ debug('maybeReadMore read 0');
+ stream.read(0);
+ if (len === state.length)
+ // didn't get any data, stop spinning.
+ break;
+ else
+ len = state.length;
+ }
+ state.readingMore = false;
+}
+
+// abstract method. to be overridden in specific implementation classes.
+// call cb(er, data) where data is <= n in length.
+// for virtual (non-string, non-buffer) streams, "length" is somewhat
+// arbitrary, and perhaps not very meaningful.
+Readable.prototype._read = function(n) {
+ this.emit('error', new Error('_read() is not implemented'));
+};
+
+Readable.prototype.pipe = function(dest, pipeOpts) {
+ var src = this;
+ var state = this._readableState;
+
+ switch (state.pipesCount) {
+ case 0:
+ state.pipes = dest;
+ break;
+ case 1:
+ state.pipes = [state.pipes, dest];
+ break;
+ default:
+ state.pipes.push(dest);
+ break;
+ }
+ state.pipesCount += 1;
+ debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
+
+ var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
+ dest !== process.stdout &&
+ dest !== process.stderr;
+
+ var endFn = doEnd ? onend : unpipe;
+ if (state.endEmitted)
+ process.nextTick(endFn);
+ else
+ src.once('end', endFn);
+
+ dest.on('unpipe', onunpipe);
+ function onunpipe(readable) {
+ debug('onunpipe');
+ if (readable === src) {
+ cleanup();
+ }
+ }
+
+ function onend() {
+ debug('onend');
+ dest.end();
+ }
+
+ // when the dest drains, it reduces the awaitDrain counter
+ // on the source. This would be more elegant with a .once()
+ // handler in flow(), but adding and removing repeatedly is
+ // too slow.
+ var ondrain = pipeOnDrain(src);
+ dest.on('drain', ondrain);
+
+ var cleanedUp = false;
+ function cleanup() {
+ debug('cleanup');
+ // cleanup event handlers once the pipe is broken
+ dest.removeListener('close', onclose);
+ dest.removeListener('finish', onfinish);
+ dest.removeListener('drain', ondrain);
+ dest.removeListener('error', onerror);
+ dest.removeListener('unpipe', onunpipe);
+ src.removeListener('end', onend);
+ src.removeListener('end', unpipe);
+ src.removeListener('data', ondata);
+
+ cleanedUp = true;
+
+ // if the reader is waiting for a drain event from this
+ // specific writer, then it would cause it to never start
+ // flowing again.
+ // So, if this is awaiting a drain, then we just call it now.
+ // If we don't know, then assume that we are waiting for one.
+ if (state.awaitDrain &&
+ (!dest._writableState || dest._writableState.needDrain))
+ ondrain();
+ }
+
+ // If the user pushes more data while we're writing to dest then we'll end up
+ // in ondata again. However, we only want to increase awaitDrain once because
+ // dest will only emit one 'drain' event for the multiple writes.
+ // => Introduce a guard on increasing awaitDrain.
+ var increasedAwaitDrain = false;
+ src.on('data', ondata);
+ function ondata(chunk) {
+ debug('ondata');
+ increasedAwaitDrain = false;
+ var ret = dest.write(chunk);
+ if (false === ret && !increasedAwaitDrain) {
+ // If the user unpiped during `dest.write()`, it is possible
+ // to get stuck in a permanently paused state if that write
+ // also returned false.
+ // => Check whether `dest` is still a piping destination.
+ if (((state.pipesCount === 1 && state.pipes === dest) ||
+ (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
+ !cleanedUp) {
+ debug('false write response, pause', src._readableState.awaitDrain);
+ src._readableState.awaitDrain++;
+ increasedAwaitDrain = true;
+ }
+ src.pause();
+ }
+ }
+
+ // if the dest has an error, then stop piping into it.
+ // however, don't suppress the throwing behavior for this.
+ function onerror(er) {
+ debug('onerror', er);
+ unpipe();
+ dest.removeListener('error', onerror);
+ if (EE.listenerCount(dest, 'error') === 0)
+ dest.emit('error', er);
+ }
+
+ // Make sure our error handler is attached before userland ones.
+ prependListener(dest, 'error', onerror);
+
+ // Both close and finish should trigger unpipe, but only once.
+ function onclose() {
+ dest.removeListener('finish', onfinish);
+ unpipe();
+ }
+ dest.once('close', onclose);
+ function onfinish() {
+ debug('onfinish');
+ dest.removeListener('close', onclose);
+ unpipe();
+ }
+ dest.once('finish', onfinish);
+
+ function unpipe() {
+ debug('unpipe');
+ src.unpipe(dest);
+ }
+
+ // tell the dest that it's being piped to
+ dest.emit('pipe', src);
+
+ // start the flow if it hasn't been started already.
+ if (!state.flowing) {
+ debug('pipe resume');
+ src.resume();
+ }
+
+ return dest;
+};
+
+function pipeOnDrain(src) {
+ return function() {
+ var state = src._readableState;
+ debug('pipeOnDrain', state.awaitDrain);
+ if (state.awaitDrain)
+ state.awaitDrain--;
+ if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+ state.flowing = true;
+ flow(src);
+ }
+ };
+}
+
+
+Readable.prototype.unpipe = function(dest) {
+ var state = this._readableState;
+
+ // if we're not piping anywhere, then do nothing.
+ if (state.pipesCount === 0)
+ return this;
+
+ // just one destination. most common case.
+ if (state.pipesCount === 1) {
+ // passed in one, but it's not the right one.
+ if (dest && dest !== state.pipes)
+ return this;
+
+ if (!dest)
+ dest = state.pipes;
+
+ // got a match.
+ state.pipes = null;
+ state.pipesCount = 0;
+ state.flowing = false;
+ if (dest)
+ dest.emit('unpipe', this);
+ return this;
+ }
+
+ // slow case. multiple pipe destinations.
+
+ if (!dest) {
+ // remove all.
+ var dests = state.pipes;
+ var len = state.pipesCount;
+ state.pipes = null;
+ state.pipesCount = 0;
+ state.flowing = false;
+
+ for (var i = 0; i < len; i++)
+ dests[i].emit('unpipe', this);
+ return this;
+ }
+
+ // try to find the right one.
+ var index = state.pipes.indexOf(dest);
+ if (index === -1)
+ return this;
+
+ state.pipes.splice(index, 1);
+ state.pipesCount -= 1;
+ if (state.pipesCount === 1)
+ state.pipes = state.pipes[0];
+
+ dest.emit('unpipe', this);
+
+ return this;
+};
+
+// set up data events if they are asked for
+// Ensure readable listeners eventually get something
+Readable.prototype.on = function(ev, fn) {
+ const res = Stream.prototype.on.call(this, ev, fn);
+
+ if (ev === 'data') {
+ // Start flowing on next tick if stream isn't explicitly paused
+ if (this._readableState.flowing !== false)
+ this.resume();
+ } else if (ev === 'readable') {
+ const state = this._readableState;
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
+ state.emittedReadable = false;
+ if (!state.reading) {
+ process.nextTick(nReadingNextTick, this);
+ } else if (state.length) {
+ emitReadable(this, state);
+ }
+ }
+ }
+
+ return res;
+};
+Readable.prototype.addListener = Readable.prototype.on;
+
+function nReadingNextTick(self) {
+ debug('readable nexttick read 0');
+ self.read(0);
+}
+
+// pause() and resume() are remnants of the legacy readable stream API
+// If the user uses them, then switch into old mode.
+Readable.prototype.resume = function() {
+ var state = this._readableState;
+ if (!state.flowing) {
+ debug('resume');
+ state.flowing = true;
+ resume(this, state);
+ }
+ return this;
+};
+
+function resume(stream, state) {
+ if (!state.resumeScheduled) {
+ state.resumeScheduled = true;
+ process.nextTick(resume_, stream, state);
+ }
+}
+
+function resume_(stream, state) {
+ if (!state.reading) {
+ debug('resume read 0');
+ stream.read(0);
+ }
+
+ state.resumeScheduled = false;
+ state.awaitDrain = 0;
+ stream.emit('resume');
+ flow(stream);
+ if (state.flowing && !state.reading)
+ stream.read(0);
+}
+
+Readable.prototype.pause = function() {
+ debug('call pause flowing=%j', this._readableState.flowing);
+ if (false !== this._readableState.flowing) {
+ debug('pause');
+ this._readableState.flowing = false;
+ this.emit('pause');
+ }
+ return this;
+};
+
+function flow(stream) {
+ const state = stream._readableState;
+ debug('flow', state.flowing);
+ while (state.flowing && stream.read() !== null);
+}
+
+// wrap an old-style stream as the async data source.
+// This is *not* part of the readable stream interface.
+// It is an ugly unfortunate mess of history.
+Readable.prototype.wrap = function(stream) {
+ var state = this._readableState;
+ var paused = false;
+
+ var self = this;
+ stream.on('end', function() {
+ debug('wrapped end');
+ if (state.decoder && !state.ended) {
+ var chunk = state.decoder.end();
+ if (chunk && chunk.length)
+ self.push(chunk);
+ }
+
+ self.push(null);
+ });
+
+ stream.on('data', function(chunk) {
+ debug('wrapped data');
+ if (state.decoder)
+ chunk = state.decoder.write(chunk);
+
+ // don't skip over falsy values in objectMode
+ if (state.objectMode && (chunk === null || chunk === undefined))
+ return;
+ else if (!state.objectMode && (!chunk || !chunk.length))
+ return;
+
+ var ret = self.push(chunk);
+ if (!ret) {
+ paused = true;
+ stream.pause();
+ }
+ });
+
+ // proxy all the other methods.
+ // important when wrapping filters and duplexes.
+ for (var i in stream) {
+ if (this[i] === undefined && typeof stream[i] === 'function') {
+ this[i] = function(method) {
+ return function() {
+ return stream[method].apply(stream, arguments);
+ };
+ }(i);
+ }
+ }
+
+ // proxy certain important events.
+ for (var n = 0; n < kProxyEvents.length; n++) {
+ stream.on(kProxyEvents[n], self.emit.bind(self, kProxyEvents[n]));
+ }
+
+ // when we try to consume some more bytes, simply unpause the
+ // underlying stream.
+ self._read = function(n) {
+ debug('wrapped _read', n);
+ if (paused) {
+ paused = false;
+ stream.resume();
+ }
+ };
+
+ return self;
+};
+
+
+// exposed for testing purposes only.
+Readable._fromList = fromList;
+
+// Pluck off n bytes from an array of buffers.
+// Length is the combined lengths of all the buffers in the list.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function fromList(n, state) {
+ // nothing buffered
+ if (state.length === 0)
+ return null;
+
+ var ret;
+ if (state.objectMode)
+ ret = state.buffer.shift();
+ else if (!n || n >= state.length) {
+ // read it all, truncate the list
+ if (state.decoder)
+ ret = state.buffer.join('');
+ else if (state.buffer.length === 1)
+ ret = state.buffer.head.data;
+ else
+ ret = state.buffer.concat(state.length);
+ state.buffer.clear();
+ } else {
+ // read part of list
+ ret = fromListPartial(n, state.buffer, state.decoder);
+ }
+
+ return ret;
+}
+
+// Extracts only enough buffered data to satisfy the amount requested.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function fromListPartial(n, list, hasStrings) {
+ var ret;
+ if (n < list.head.data.length) {
+ // slice is the same for buffers and strings
+ ret = list.head.data.slice(0, n);
+ list.head.data = list.head.data.slice(n);
+ } else if (n === list.head.data.length) {
+ // first chunk is a perfect match
+ ret = list.shift();
+ } else {
+ // result spans more than one buffer
+ ret = (hasStrings ?
+ copyFromBufferString(n, list) :
+ copyFromBuffer(n, list));
+ }
+ return ret;
+}
+
+// Copies a specified amount of characters from the list of buffered data
+// chunks.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function copyFromBufferString(n, list) {
+ var p = list.head;
+ var c = 1;
+ var ret = p.data;
+ n -= ret.length;
+ while (p = p.next) {
+ const str = p.data;
+ const nb = (n > str.length ? str.length : n);
+ if (nb === str.length)
+ ret += str;
+ else
+ ret += str.slice(0, n);
+ n -= nb;
+ if (n === 0) {
+ if (nb === str.length) {
+ ++c;
+ if (p.next)
+ list.head = p.next;
+ else
+ list.head = list.tail = null;
+ } else {
+ list.head = p;
+ p.data = str.slice(nb);
+ }
+ break;
+ }
+ ++c;
+ }
+ list.length -= c;
+ return ret;
+}
+
+// Copies a specified amount of bytes from the list of buffered data chunks.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function copyFromBuffer(n, list) {
+ const ret = Buffer.allocUnsafe(n);
+ var p = list.head;
+ var c = 1;
+ p.data.copy(ret);
+ n -= p.data.length;
+ while (p = p.next) {
+ const buf = p.data;
+ const nb = (n > buf.length ? buf.length : n);
+ buf.copy(ret, ret.length - n, 0, nb);
+ n -= nb;
+ if (n === 0) {
+ if (nb === buf.length) {
+ ++c;
+ if (p.next)
+ list.head = p.next;
+ else
+ list.head = list.tail = null;
+ } else {
+ list.head = p;
+ p.data = buf.slice(nb);
+ }
+ break;
+ }
+ ++c;
+ }
+ list.length -= c;
+ return ret;
+}
+
+function endReadable(stream) {
+ var state = stream._readableState;
+
+ // If we get here before consuming all the bytes, then that is a
+ // bug in node. Should never happen.
+ if (state.length > 0)
+ throw new Error('"endReadable()" called on non-empty stream');
+
+ if (!state.endEmitted) {
+ state.ended = true;
+ process.nextTick(endReadableNT, state, stream);
+ }
+}
+
+function endReadableNT(state, stream) {
+ // Check that we didn't get one last unshift.
+ if (!state.endEmitted && state.length === 0) {
+ state.endEmitted = true;
+ stream.readable = false;
+ stream.emit('end');
+ }
+}
diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js
new file mode 100644
index 00000000000000..6700a77efe584d
--- /dev/null
+++ b/lib/internal/streams/transform.js
@@ -0,0 +1,216 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// a transform stream is a readable/writable stream where you do
+// something with the data. Sometimes it's called a "filter",
+// but that's not a great name for it, since that implies a thing where
+// some bits pass through, and others are simply ignored. (That would
+// be a valid example of a transform, of course.)
+//
+// While the output is causally related to the input, it's not a
+// necessarily symmetric or synchronous transformation. For example,
+// a zlib stream might take multiple plain-text writes(), and then
+// emit a single compressed chunk some time in the future.
+//
+// Here's how this works:
+//
+// The Transform stream has all the aspects of the readable and writable
+// stream classes. When you write(chunk), that calls _write(chunk,cb)
+// internally, and returns false if there's a lot of pending writes
+// buffered up. When you call read(), that calls _read(n) until
+// there's enough pending readable data buffered up.
+//
+// In a transform stream, the written data is placed in a buffer. When
+// _read(n) is called, it transforms the queued up data, calling the
+// buffered _write cb's as it consumes chunks. If consuming a single
+// written chunk would result in multiple output chunks, then the first
+// outputted bit calls the readcb, and subsequent chunks just go into
+// the read buffer, and will cause it to emit 'readable' if necessary.
+//
+// This way, back-pressure is actually determined by the reading side,
+// since _read has to be called to start processing a new chunk. However,
+// a pathological inflate type of transform can cause excessive buffering
+// here. For example, imagine a stream where every byte of input is
+// interpreted as an integer from 0-255, and then results in that many
+// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
+// 1kb of data being output. In this case, you could write a very small
+// amount of input, and end up with a very large amount of output. In
+// such a pathological inflating mechanism, there'd be no way to tell
+// the system to stop doing the transform. A single 4MB write could
+// cause the system to run out of memory.
+//
+// However, even in such a pathological case, only a single written chunk
+// would be consumed, and then the rest would wait (un-transformed) until
+// the results of the previous transformed chunk were consumed.
+
+'use strict';
+
+module.exports = Transform;
+
+const Duplex = require('internal/streams/duplex');
+const util = require('util');
+util.inherits(Transform, Duplex);
+
+
+function TransformState(stream) {
+ this.afterTransform = function(er, data) {
+ return afterTransform(stream, er, data);
+ };
+
+ this.needTransform = false;
+ this.transforming = false;
+ this.writecb = null;
+ this.writechunk = null;
+ this.writeencoding = null;
+}
+
+function afterTransform(stream, er, data) {
+ var ts = stream._transformState;
+ ts.transforming = false;
+
+ var cb = ts.writecb;
+
+ if (!cb)
+ return stream.emit('error', new Error('no writecb in Transform class'));
+
+ ts.writechunk = null;
+ ts.writecb = null;
+
+ if (data !== null && data !== undefined)
+ stream.push(data);
+
+ cb(er);
+
+ var rs = stream._readableState;
+ rs.reading = false;
+ if (rs.needReadable || rs.length < rs.highWaterMark) {
+ stream._read(rs.highWaterMark);
+ }
+}
+
+
+function Transform(options) {
+ if (!(this instanceof Transform))
+ return new Transform(options);
+
+ Duplex.call(this, options);
+
+ this._transformState = new TransformState(this);
+
+ var stream = this;
+
+ // start out asking for a readable event once data is transformed.
+ this._readableState.needReadable = true;
+
+ // we have implemented the _read method, and done the other things
+ // that Readable wants before the first _read call, so unset the
+ // sync guard flag.
+ this._readableState.sync = false;
+
+ if (options) {
+ if (typeof options.transform === 'function')
+ this._transform = options.transform;
+
+ if (typeof options.flush === 'function')
+ this._flush = options.flush;
+ }
+
+ // When the writable side finishes, then flush out anything remaining.
+ this.once('prefinish', function() {
+ if (typeof this._flush === 'function')
+ this._flush(function(er, data) {
+ done(stream, er, data);
+ });
+ else
+ done(stream);
+ });
+}
+
+Transform.prototype.push = function(chunk, encoding) {
+ this._transformState.needTransform = false;
+ return Duplex.prototype.push.call(this, chunk, encoding);
+};
+
+// This is the part where you do stuff!
+// override this function in implementation classes.
+// 'chunk' is an input chunk.
+//
+// Call `push(newChunk)` to pass along transformed output
+// to the readable side. You may call 'push' zero or more times.
+//
+// Call `cb(err)` when you are done with this chunk. If you pass
+// an error, then that'll put the hurt on the whole operation. If you
+// never call cb(), then you'll never get another chunk.
+Transform.prototype._transform = function(chunk, encoding, cb) {
+ throw new Error('_transform() is not implemented');
+};
+
+Transform.prototype._write = function(chunk, encoding, cb) {
+ var ts = this._transformState;
+ ts.writecb = cb;
+ ts.writechunk = chunk;
+ ts.writeencoding = encoding;
+ if (!ts.transforming) {
+ var rs = this._readableState;
+ if (ts.needTransform ||
+ rs.needReadable ||
+ rs.length < rs.highWaterMark)
+ this._read(rs.highWaterMark);
+ }
+};
+
+// Doesn't matter what the args are here.
+// _transform does all the work.
+// That we got here means that the readable side wants more data.
+Transform.prototype._read = function(n) {
+ var ts = this._transformState;
+
+ if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
+ ts.transforming = true;
+ this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
+ } else {
+ // mark that we need a transform, so that any data that comes in
+ // will get processed, now that we've asked for it.
+ ts.needTransform = true;
+ }
+};
+
+
+function done(stream, er, data) {
+ if (er)
+ return stream.emit('error', er);
+
+ if (data !== null && data !== undefined)
+ stream.push(data);
+
+ // if there's nothing in the write buffer, then that means
+ // that nothing more will ever be provided
+ var ws = stream._writableState;
+ var ts = stream._transformState;
+
+ if (ws.length)
+ throw new Error('Calling transform done when ws.length != 0');
+
+ if (ts.transforming)
+ throw new Error('Calling transform done when still transforming');
+
+ return stream.push(null);
+}
diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js
new file mode 100644
index 00000000000000..a0a37fbb7a0421
--- /dev/null
+++ b/lib/internal/streams/writable.js
@@ -0,0 +1,565 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// A bit simpler than readable streams.
+// Implement an async ._write(chunk, encoding, cb), and it'll handle all
+// the drain event emission and buffering.
+
+'use strict';
+
+module.exports = Writable;
+Writable.WritableState = WritableState;
+
+const util = require('util');
+const internalUtil = require('internal/util');
+const Stream = require('stream');
+const Buffer = require('buffer').Buffer;
+
+util.inherits(Writable, Stream);
+
+function nop() {}
+
+function WritableState(options, stream) {
+ options = options || {};
+
+ // object stream flag to indicate whether or not this stream
+ // contains buffers or objects.
+ this.objectMode = !!options.objectMode;
+
+ if (stream instanceof Stream.Duplex)
+ this.objectMode = this.objectMode || !!options.writableObjectMode;
+
+ // the point at which write() starts returning false
+ // Note: 0 is a valid value, means that we always return false if
+ // the entire buffer is not flushed immediately on write()
+ var hwm = options.highWaterMark;
+ var defaultHwm = this.objectMode ? 16 : 16 * 1024;
+ this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
+
+ // cast to ints.
+ this.highWaterMark = ~~this.highWaterMark;
+
+ // drain event flag.
+ this.needDrain = false;
+ // at the start of calling end()
+ this.ending = false;
+ // when end() has been called, and returned
+ this.ended = false;
+ // when 'finish' is emitted
+ this.finished = false;
+
+ // should we decode strings into buffers before passing to _write?
+ // this is here so that some node-core streams can optimize string
+ // handling at a lower level.
+ var noDecode = options.decodeStrings === false;
+ this.decodeStrings = !noDecode;
+
+ // Crypto is kind of old and crusty. Historically, its default string
+ // encoding is 'binary' so we have to make this configurable.
+ // Everything else in the universe uses 'utf8', though.
+ this.defaultEncoding = options.defaultEncoding || 'utf8';
+
+ // not an actual buffer we keep track of, but a measurement
+ // of how much we're waiting to get pushed to some underlying
+ // socket or file.
+ this.length = 0;
+
+ // a flag to see when we're in the middle of a write.
+ this.writing = false;
+
+ // when true all writes will be buffered until .uncork() call
+ this.corked = 0;
+
+ // a flag to be able to tell if the onwrite cb is called immediately,
+ // or on a later tick. We set this to true at first, because any
+ // actions that shouldn't happen until "later" should generally also
+ // not happen before the first write call.
+ this.sync = true;
+
+ // a flag to know if we're processing previously buffered items, which
+ // may call the _write() callback in the same tick, so that we don't
+ // end up in an overlapped onwrite situation.
+ this.bufferProcessing = false;
+
+ // the callback that's passed to _write(chunk,cb)
+ this.onwrite = onwrite.bind(undefined, stream);
+
+ // the callback that the user supplies to write(chunk,encoding,cb)
+ this.writecb = null;
+
+ // the amount that is being written when _write is called.
+ this.writelen = 0;
+
+ this.bufferedRequest = null;
+ this.lastBufferedRequest = null;
+
+ // number of pending user-supplied write callbacks
+ // this must be 0 before 'finish' can be emitted
+ this.pendingcb = 0;
+
+ // emit prefinish if the only thing we're waiting for is _write cbs
+ // This is relevant for synchronous Transform streams
+ this.prefinished = false;
+
+ // True if the error was already emitted and should not be thrown again
+ this.errorEmitted = false;
+
+ // count buffered requests
+ this.bufferedRequestCount = 0;
+
+ // allocate the first CorkedRequest, there is always
+ // one allocated and free to use, and we maintain at most two
+ var corkReq = { next: null, entry: null, finish: undefined };
+ corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
+ this.corkedRequestsFree = corkReq;
+}
+
+WritableState.prototype.getBuffer = function getBuffer() {
+ var current = this.bufferedRequest;
+ var out = [];
+ while (current) {
+ out.push(current);
+ current = current.next;
+ }
+ return out;
+};
+
+Object.defineProperty(WritableState.prototype, 'buffer', {
+ get: internalUtil.deprecate(function() {
+ return this.getBuffer();
+ }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' +
+ 'instead.', 'DEP0003')
+});
+
+// Test _writableState for inheritance to account for Duplex streams,
+// whose prototype chain only points to Readable.
+var realHasInstance;
+if (typeof Symbol === 'function' && Symbol.hasInstance) {
+ realHasInstance = Function.prototype[Symbol.hasInstance];
+ Object.defineProperty(Writable, Symbol.hasInstance, {
+ value: function(object) {
+ if (realHasInstance.call(this, object))
+ return true;
+
+ return object && object._writableState instanceof WritableState;
+ }
+ });
+} else {
+ realHasInstance = function(object) {
+ return object instanceof this;
+ };
+}
+
+function Writable(options) {
+ // Writable ctor is applied to Duplexes, too.
+ // `realHasInstance` is necessary because using plain `instanceof`
+ // would return false, as no `_writableState` property is attached.
+
+ // Trying to use the custom `instanceof` for Writable here will also break the
+ // Node.js LazyTransform implementation, which has a non-trivial getter for
+ // `_writableState` that would lead to infinite recursion.
+ if (!(realHasInstance.call(Writable, this)) &&
+ !(this instanceof Stream.Duplex)) {
+ return new Writable(options);
+ }
+
+ this._writableState = new WritableState(options, this);
+
+ // legacy.
+ this.writable = true;
+
+ if (options) {
+ if (typeof options.write === 'function')
+ this._write = options.write;
+
+ if (typeof options.writev === 'function')
+ this._writev = options.writev;
+ }
+
+ Stream.call(this);
+}
+
+// Otherwise people can pipe Writable streams, which is just wrong.
+Writable.prototype.pipe = function() {
+ this.emit('error', new Error('Cannot pipe, not readable'));
+};
+
+
+function writeAfterEnd(stream, cb) {
+ var er = new Error('write after end');
+ // TODO: defer error events consistently everywhere, not just the cb
+ stream.emit('error', er);
+ process.nextTick(cb, er);
+}
+
+// Checks that a user-supplied chunk is valid, especially for the particular
+// mode the stream is in. Currently this means that `null` is never accepted
+// and undefined/non-string values are only allowed in object mode.
+function validChunk(stream, state, chunk, cb) {
+ var valid = true;
+ var er = false;
+
+ if (chunk === null) {
+ er = new TypeError('May not write null values to stream');
+ } else if (typeof chunk !== 'string' &&
+ chunk !== undefined &&
+ !state.objectMode) {
+ er = new TypeError('Invalid non-string/buffer chunk');
+ }
+ if (er) {
+ stream.emit('error', er);
+ process.nextTick(cb, er);
+ valid = false;
+ }
+ return valid;
+}
+
+Writable.prototype.write = function(chunk, encoding, cb) {
+ var state = this._writableState;
+ var ret = false;
+ var isBuf = (chunk instanceof Buffer);
+
+ if (typeof encoding === 'function') {
+ cb = encoding;
+ encoding = null;
+ }
+
+ if (isBuf)
+ encoding = 'buffer';
+ else if (!encoding)
+ encoding = state.defaultEncoding;
+
+ if (typeof cb !== 'function')
+ cb = nop;
+
+ if (state.ended)
+ writeAfterEnd(this, cb);
+ else if (isBuf || validChunk(this, state, chunk, cb)) {
+ state.pendingcb++;
+ ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
+ }
+
+ return ret;
+};
+
+Writable.prototype.cork = function() {
+ var state = this._writableState;
+
+ state.corked++;
+};
+
+Writable.prototype.uncork = function() {
+ var state = this._writableState;
+
+ if (state.corked) {
+ state.corked--;
+
+ if (!state.writing &&
+ !state.corked &&
+ !state.finished &&
+ !state.bufferProcessing &&
+ state.bufferedRequest)
+ clearBuffer(this, state);
+ }
+};
+
+Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
+ // node::ParseEncoding() requires lower case.
+ if (typeof encoding === 'string')
+ encoding = encoding.toLowerCase();
+ if (!Buffer.isEncoding(encoding))
+ throw new TypeError('Unknown encoding: ' + encoding);
+ this._writableState.defaultEncoding = encoding;
+ return this;
+};
+
+function decodeChunk(state, chunk, encoding) {
+ if (!state.objectMode &&
+ state.decodeStrings !== false &&
+ typeof chunk === 'string') {
+ chunk = Buffer.from(chunk, encoding);
+ }
+ return chunk;
+}
+
+// if we're already writing something, then just put this
+// in the queue, and wait our turn. Otherwise, call _write
+// If we return false, then we need a drain event, so set that flag.
+function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
+ if (!isBuf) {
+ var newChunk = decodeChunk(state, chunk, encoding);
+ if (chunk !== newChunk) {
+ encoding = 'buffer';
+ chunk = newChunk;
+ }
+ }
+ var len = state.objectMode ? 1 : chunk.length;
+
+ state.length += len;
+
+ var ret = state.length < state.highWaterMark;
+ // we must ensure that previous needDrain will not be reset to false.
+ if (!ret)
+ state.needDrain = true;
+
+ if (state.writing || state.corked) {
+ var last = state.lastBufferedRequest;
+ state.lastBufferedRequest = { chunk, encoding, callback: cb, next: null };
+ if (last) {
+ last.next = state.lastBufferedRequest;
+ } else {
+ state.bufferedRequest = state.lastBufferedRequest;
+ }
+ state.bufferedRequestCount += 1;
+ } else {
+ doWrite(stream, state, false, len, chunk, encoding, cb);
+ }
+
+ return ret;
+}
+
+function doWrite(stream, state, writev, len, chunk, encoding, cb) {
+ state.writelen = len;
+ state.writecb = cb;
+ state.writing = true;
+ state.sync = true;
+ if (writev)
+ stream._writev(chunk, state.onwrite);
+ else
+ stream._write(chunk, encoding, state.onwrite);
+ state.sync = false;
+}
+
+function onwriteError(stream, state, sync, er, cb) {
+ --state.pendingcb;
+ if (sync)
+ process.nextTick(cb, er);
+ else
+ cb(er);
+
+ stream._writableState.errorEmitted = true;
+ stream.emit('error', er);
+}
+
+function onwriteStateUpdate(state) {
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+}
+
+function onwrite(stream, er) {
+ var state = stream._writableState;
+ var sync = state.sync;
+ var cb = state.writecb;
+
+ onwriteStateUpdate(state);
+
+ if (er)
+ onwriteError(stream, state, sync, er, cb);
+ else {
+ // Check if we're actually ready to finish, but don't emit yet
+ var finished = needFinish(state);
+
+ if (!finished &&
+ !state.corked &&
+ !state.bufferProcessing &&
+ state.bufferedRequest) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ process.nextTick(afterWrite, stream, state, finished, cb);
+ } else {
+ afterWrite(stream, state, finished, cb);
+ }
+ }
+}
+
+function afterWrite(stream, state, finished, cb) {
+ if (!finished)
+ onwriteDrain(stream, state);
+ state.pendingcb--;
+ cb();
+ finishMaybe(stream, state);
+}
+
+// Must force callback to be called on nextTick, so that we don't
+// emit 'drain' before the write() consumer gets the 'false' return
+// value, and has a chance to attach a 'drain' listener.
+function onwriteDrain(stream, state) {
+ if (state.length === 0 && state.needDrain) {
+ state.needDrain = false;
+ stream.emit('drain');
+ }
+}
+
+// if there's something in the buffer waiting, then process it
+function clearBuffer(stream, state) {
+ state.bufferProcessing = true;
+ var entry = state.bufferedRequest;
+
+ if (stream._writev && entry && entry.next) {
+ // Fast case, write everything using _writev()
+ var l = state.bufferedRequestCount;
+ var buffer = new Array(l);
+ var holder = state.corkedRequestsFree;
+ holder.entry = entry;
+
+ var count = 0;
+ while (entry) {
+ buffer[count] = entry;
+ entry = entry.next;
+ count += 1;
+ }
+
+ doWrite(stream, state, true, state.length, buffer, '', holder.finish);
+
+ // doWrite is almost always async, defer these to save a bit of time
+ // as the hot path ends with doWrite
+ state.pendingcb++;
+ state.lastBufferedRequest = null;
+ if (holder.next) {
+ state.corkedRequestsFree = holder.next;
+ holder.next = null;
+ } else {
+ var corkReq = { next: null, entry: null, finish: undefined };
+ corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
+ state.corkedRequestsFree = corkReq;
+ }
+ } else {
+ // Slow case, write chunks one-by-one
+ while (entry) {
+ var chunk = entry.chunk;
+ var encoding = entry.encoding;
+ var cb = entry.callback;
+ var len = state.objectMode ? 1 : chunk.length;
+
+ doWrite(stream, state, false, len, chunk, encoding, cb);
+ entry = entry.next;
+ // if we didn't call the onwrite immediately, then
+ // it means that we need to wait until it does.
+ // also, that means that the chunk and cb are currently
+ // being processed, so move the buffer counter past them.
+ if (state.writing) {
+ break;
+ }
+ }
+
+ if (entry === null)
+ state.lastBufferedRequest = null;
+ }
+
+ state.bufferedRequestCount = 0;
+ state.bufferedRequest = entry;
+ state.bufferProcessing = false;
+}
+
+Writable.prototype._write = function(chunk, encoding, cb) {
+ cb(new Error('_write() is not implemented'));
+};
+
+Writable.prototype._writev = null;
+
+Writable.prototype.end = function(chunk, encoding, cb) {
+ var state = this._writableState;
+
+ if (typeof chunk === 'function') {
+ cb = chunk;
+ chunk = null;
+ encoding = null;
+ } else if (typeof encoding === 'function') {
+ cb = encoding;
+ encoding = null;
+ }
+
+ if (chunk !== null && chunk !== undefined)
+ this.write(chunk, encoding);
+
+ // .end() fully uncorks
+ if (state.corked) {
+ state.corked = 1;
+ this.uncork();
+ }
+
+ // ignore unnecessary end() calls.
+ if (!state.ending && !state.finished)
+ endWritable(this, state, cb);
+};
+
+
+function needFinish(state) {
+ return (state.ending &&
+ state.length === 0 &&
+ state.bufferedRequest === null &&
+ !state.finished &&
+ !state.writing);
+}
+
+function prefinish(stream, state) {
+ if (!state.prefinished) {
+ state.prefinished = true;
+ stream.emit('prefinish');
+ }
+}
+
+function finishMaybe(stream, state) {
+ var need = needFinish(state);
+ if (need) {
+ if (state.pendingcb === 0) {
+ prefinish(stream, state);
+ state.finished = true;
+ stream.emit('finish');
+ } else {
+ prefinish(stream, state);
+ }
+ }
+ return need;
+}
+
+function endWritable(stream, state, cb) {
+ state.ending = true;
+ finishMaybe(stream, state);
+ if (cb) {
+ if (state.finished)
+ process.nextTick(cb);
+ else
+ stream.once('finish', cb);
+ }
+ state.ended = true;
+ stream.writable = false;
+}
+
+function onCorkedFinish(corkReq, state, err) {
+ var entry = corkReq.entry;
+ corkReq.entry = null;
+ while (entry) {
+ var cb = entry.callback;
+ state.pendingcb--;
+ cb(err);
+ entry = entry.next;
+ }
+ if (state.corkedRequestsFree) {
+ state.corkedRequestsFree.next = corkReq;
+ } else {
+ state.corkedRequestsFree = corkReq;
+ }
+}
diff --git a/lib/stream.js b/lib/stream.js
index dca4f50fc09eac..6674a0e15c7c2a 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -25,11 +25,11 @@
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
-Stream.Readable = require('_stream_readable');
-Stream.Writable = require('_stream_writable');
-Stream.Duplex = require('_stream_duplex');
-Stream.Transform = require('_stream_transform');
-Stream.PassThrough = require('_stream_passthrough');
+Stream.Readable = require('internal/streams/readable');
+Stream.Writable = require('internal/streams/writable');
+Stream.Duplex = require('internal/streams/duplex');
+Stream.Transform = require('internal/streams/transform');
+Stream.PassThrough = require('internal/streams/passthrough');
// Backwards-compat with node 0.4.x
Stream.Stream = Stream;
diff --git a/lib/zlib.js b/lib/zlib.js
index 00be56dffc44fb..b6162366e99ced 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -23,7 +23,7 @@
const Buffer = require('buffer').Buffer;
const internalUtil = require('internal/util');
-const Transform = require('_stream_transform');
+const Transform = require('internal/streams/transform');
const binding = process.binding('zlib');
const assert = require('assert').ok;
const kMaxLength = require('buffer').kMaxLength;
diff --git a/node.gyp b/node.gyp
index c20b07a66c6ea4..28a7011d862f58 100644
--- a/node.gyp
+++ b/node.gyp
@@ -101,6 +101,11 @@
'lib/internal/util.js',
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
+ 'lib/internal/streams/readable.js',
+ 'lib/internal/streams/writable.js',
+ 'lib/internal/streams/duplex.js',
+ 'lib/internal/streams/transform.js',
+ 'lib/internal/streams/passthrough.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
diff --git a/test/message/stdin_messages.out b/test/message/stdin_messages.out
index ad1688f15d09b8..f2c811c883e823 100644
--- a/test/message/stdin_messages.out
+++ b/test/message/stdin_messages.out
@@ -11,7 +11,7 @@ SyntaxError: Strict mode code may not include a with statement
at Socket. (bootstrap_node.js:*:*)
at emitNone (events.js:*:*)
at Socket.emit (events.js:*:*)
- at endReadableNT (_stream_readable.js:*:*)
+ at endReadableNT (internal/streams/readable.js:*:*)
at _combinedTickCallback (internal/process/next_tick.js:*:*)
42
42
@@ -29,7 +29,7 @@ Error: hello
at Socket. (bootstrap_node.js:*:*)
at emitNone (events.js:*:*)
at Socket.emit (events.js:*:*)
- at endReadableNT (_stream_readable.js:*:*)
+ at endReadableNT (internal/streams/readable.js:*:*)
[stdin]:1
throw new Error("hello")
^
@@ -44,7 +44,7 @@ Error: hello
at Socket. (bootstrap_node.js:*:*)
at emitNone (events.js:*:*)
at Socket.emit (events.js:*:*)
- at endReadableNT (_stream_readable.js:*:*)
+ at endReadableNT (internal/streams/readable.js:*:*)
100
[stdin]:1
var x = 100; y = x;
@@ -60,7 +60,7 @@ ReferenceError: y is not defined
at Socket. (bootstrap_node.js:*:*)
at emitNone (events.js:*:*)
at Socket.emit (events.js:*:*)
- at endReadableNT (_stream_readable.js:*:*)
+ at endReadableNT (internal/streams/readable.js:*:*)
[stdin]:1
var ______________________________________________; throw 10
diff --git a/test/parallel/test-stream-internals-usage-deprecation.js b/test/parallel/test-stream-internals-usage-deprecation.js
new file mode 100644
index 00000000000000..bac0b38750c781
--- /dev/null
+++ b/test/parallel/test-stream-internals-usage-deprecation.js
@@ -0,0 +1,35 @@
+// Flags: --no-warnings
+// The flag suppresses stderr output but the warning event will still emit
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+
+const mods = [
+ '_stream_readable',
+ '_stream_writable',
+ '_stream_duplex',
+ '_stream_transform',
+ '_stream_passthrough'
+];
+
+// we cannot use a forEach, the warning is emitted asynchronously
+function test() {
+ const mod = mods.shift();
+
+ if (!mod) {
+ return;
+ }
+
+ process.once('warning', common.mustCall((warning) => {
+ assert.ok(warning instanceof Error);
+ assert.strictEqual(warning.message,
+ `The ${mod}.js module is deprecated. Please use stream`);
+ assert.strictEqual(warning.code, 'DEP00XX');
+ test();
+ }));
+
+ require(mod);
+}
+
+test();
diff --git a/test/parallel/test-stream-pipe-after-end.js b/test/parallel/test-stream-pipe-after-end.js
index 02792b44554348..bd4179c0da59fb 100644
--- a/test/parallel/test-stream-pipe-after-end.js
+++ b/test/parallel/test-stream-pipe-after-end.js
@@ -22,8 +22,8 @@
'use strict';
const common = require('../common');
const assert = require('assert');
-const Readable = require('_stream_readable');
-const Writable = require('_stream_writable');
+const Readable = require('stream').Readable;
+const Writable = require('stream').Writable;
const util = require('util');
util.inherits(TestReadable, Readable);
diff --git a/test/parallel/test-stream2-base64-single-char-read-end.js b/test/parallel/test-stream2-base64-single-char-read-end.js
index 2e400c2f97ec9c..e210bad44f0f91 100644
--- a/test/parallel/test-stream2-base64-single-char-read-end.js
+++ b/test/parallel/test-stream2-base64-single-char-read-end.js
@@ -20,9 +20,10 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
+
require('../common');
-const R = require('_stream_readable');
-const W = require('_stream_writable');
+const R = require('stream').Readable;
+const W = require('stream').Writable;
const assert = require('assert');
const src = new R({encoding: 'base64'});
diff --git a/test/parallel/test-stream2-compatibility.js b/test/parallel/test-stream2-compatibility.js
index 45834ee99e5961..c14774d1eccade 100644
--- a/test/parallel/test-stream2-compatibility.js
+++ b/test/parallel/test-stream2-compatibility.js
@@ -21,8 +21,8 @@
'use strict';
require('../common');
-const R = require('_stream_readable');
-const W = require('_stream_writable');
+const R = require('stream').Readable;
+const W = require('stream').Writable;
const assert = require('assert');
const util = require('util');
diff --git a/test/parallel/test-stream2-decode-partial.js b/test/parallel/test-stream2-decode-partial.js
index b43101dbc84c3a..08b1eb3833207b 100644
--- a/test/parallel/test-stream2-decode-partial.js
+++ b/test/parallel/test-stream2-decode-partial.js
@@ -1,6 +1,6 @@
'use strict';
require('../common');
-const Readable = require('_stream_readable');
+const Readable = require('stream').Readable;
const assert = require('assert');
let buf = '';
diff --git a/test/parallel/test-stream2-objects.js b/test/parallel/test-stream2-objects.js
index 159286a4322257..9989c8fada119c 100644
--- a/test/parallel/test-stream2-objects.js
+++ b/test/parallel/test-stream2-objects.js
@@ -22,8 +22,8 @@
'use strict';
const common = require('../common');
-const Readable = require('_stream_readable');
-const Writable = require('_stream_writable');
+const Readable = require('stream').Readable;
+const Writable = require('stream').Writable;
const assert = require('assert');
// tiny node-tap lookalike.
diff --git a/test/parallel/test-stream2-readable-from-list.js b/test/parallel/test-stream2-readable-from-list.js
index 6f677ce8a4155c..0598f6d85a4660 100644
--- a/test/parallel/test-stream2-readable-from-list.js
+++ b/test/parallel/test-stream2-readable-from-list.js
@@ -23,7 +23,7 @@
'use strict';
require('../common');
const assert = require('assert');
-const fromList = require('_stream_readable')._fromList;
+const fromList = require('stream').Readable._fromList;
const BufferList = require('internal/streams/BufferList');
// tiny node-tap lookalike.
diff --git a/test/parallel/test-stream2-readable-non-empty-end.js b/test/parallel/test-stream2-readable-non-empty-end.js
index 4299f31ea12986..781b28ff63d437 100644
--- a/test/parallel/test-stream2-readable-non-empty-end.js
+++ b/test/parallel/test-stream2-readable-non-empty-end.js
@@ -22,7 +22,7 @@
'use strict';
const common = require('../common');
const assert = require('assert');
-const Readable = require('_stream_readable');
+const Readable = require('stream').Readable;
let len = 0;
const chunks = new Array(10);
diff --git a/test/parallel/test-stream2-readable-wrap-empty.js b/test/parallel/test-stream2-readable-wrap-empty.js
index 39bc509a394022..66711fc1e44026 100644
--- a/test/parallel/test-stream2-readable-wrap-empty.js
+++ b/test/parallel/test-stream2-readable-wrap-empty.js
@@ -22,7 +22,7 @@
'use strict';
const common = require('../common');
-const Readable = require('_stream_readable');
+const Readable = require('stream').Readable;
const EE = require('events').EventEmitter;
const oldStream = new EE();
diff --git a/test/parallel/test-stream2-readable-wrap.js b/test/parallel/test-stream2-readable-wrap.js
index fe9e8ce30dcf93..f429b36efa66ca 100644
--- a/test/parallel/test-stream2-readable-wrap.js
+++ b/test/parallel/test-stream2-readable-wrap.js
@@ -22,8 +22,8 @@
'use strict';
const common = require('../common');
const assert = require('assert');
-const Readable = require('_stream_readable');
-const Writable = require('_stream_writable');
+const Readable = require('stream').Readable;
+const Writable = require('stream').Writable;
const EE = require('events').EventEmitter;
function runTest(highWaterMark, objectMode, produce) {
diff --git a/test/parallel/test-stream2-set-encoding.js b/test/parallel/test-stream2-set-encoding.js
index 08d9298c6d2138..a53ac6b8688b1e 100644
--- a/test/parallel/test-stream2-set-encoding.js
+++ b/test/parallel/test-stream2-set-encoding.js
@@ -22,7 +22,7 @@
'use strict';
require('../common');
const assert = require('assert');
-const R = require('_stream_readable');
+const R = require('stream').Readable;
const util = require('util');
// tiny node-tap lookalike.
diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js
index 6e59d2d1787bf2..ec97083ac8a127 100644
--- a/test/parallel/test-stream2-transform.js
+++ b/test/parallel/test-stream2-transform.js
@@ -22,8 +22,8 @@
'use strict';
require('../common');
const assert = require('assert');
-const PassThrough = require('_stream_passthrough');
-const Transform = require('_stream_transform');
+const PassThrough = require('stream').PassThrough;
+const Transform = require('stream').Transform;
// tiny node-tap lookalike.
const tests = [];
diff --git a/test/parallel/test-stream2-writable.js b/test/parallel/test-stream2-writable.js
index c983a26c400b91..f872fb397be14b 100644
--- a/test/parallel/test-stream2-writable.js
+++ b/test/parallel/test-stream2-writable.js
@@ -22,8 +22,8 @@
'use strict';
const common = require('../common');
-const W = require('_stream_writable');
-const D = require('_stream_duplex');
+const W = require('stream').Writable;
+const D = require('stream').Duplex;
const assert = require('assert');
const util = require('util');