Skip to content

Piping to multiple writable streams doesn't throttle and may blow up memory (v4) #6491

@guymguym

  • Version: v4.4.3
  • Platform: Darwin 15.4.0 Darwin Kernel Version 15.4.0
  • Subsystem: stream

Hi,
See below for the code that reproduces the issue and its output.
The issue does not happen with node v0.10 or with v5.11 and later.
I think this is the commit that fixed it: #6023
Should it be backported to v4 LTS?

The test case pipes a single readable stream into multiple writable streams.
One writable stream is realistic: it takes some time to process each chunk.
The other writable stream is used for progress reporting, so it calls its callback immediately.
With Node.js v4, adding the fast writable as a second pipe destination causes the entire readable to be read into memory, which can blow up the process.
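For comparison, the throttling expected from a multi-destination pipe can be sketched by hand: after each chunk, pause the source if any destination's write() returned false, and resume it only once every such destination has emitted 'drain'. This is a minimal sketch under stated assumptions, not Node's own pipe() implementation: `fanOut` is a hypothetical helper written for illustration, error handling is omitted, and the usage part mirrors the repro's 16 KiB chunks, 20-chunk input, slow writer and highWaterMark-0 progress writer.

```javascript
'use strict';

const { Readable, Writable } = require('stream');

// Hypothetical helper (not a Node.js API). Forwards each chunk from `source`
// to every destination; after a chunk, pauses `source` if any destination's
// write() returned false, and resumes only once each of those has emitted
// 'drain'. Error handling is omitted for brevity.
function fanOut(source, destinations) {
  let pendingDrains = 0; // destinations we are still waiting on

  function onDrain() {
    pendingDrains -= 1;
    if (pendingDrains === 0) source.resume();
  }

  source.on('data', (chunk) => {
    for (const dest of destinations) {
      if (!dest.write(chunk)) {
        pendingDrains += 1;
        dest.once('drain', onDrain);
      }
    }
    if (pendingDrains > 0) source.pause();
  });

  source.on('end', () => {
    for (const dest of destinations) dest.end();
  });
}

// Usage modeled on the repro: a slow writer plus a fast progress writer.
const CHUNK_SIZE = 16 * 1024;
let nr = 0; // chunks produced by the reader
let nw = 0; // chunks consumed by the slow writer
let maxReadahead = 0;
let progressChunks = 0;

const input = new Readable({
  highWaterMark: CHUNK_SIZE,
  read() {
    if (nr >= 20) {
      this.push(null); // end of stream
      return;
    }
    nr += 1;
    this.push(Buffer.alloc(CHUNK_SIZE, nr % 256));
  },
});

const output = new Writable({
  write(chunk, encoding, callback) {
    maxReadahead = Math.max(maxReadahead, nr - nw);
    nw += 1;
    setTimeout(callback, 10); // simulate a slow sink
  },
});

const progress = new Writable({
  highWaterMark: 0,
  write(chunk, encoding, callback) {
    progressChunks += 1;
    callback(); // progress reporting completes immediately
  },
});

const demo = new Promise((resolve) => {
  output.on('finish', () => resolve({ nw, progressChunks, maxReadahead }));
});

fanOut(input, [output, progress]);
demo.then((stats) => console.log(stats));
```

Because both destinations are waited on before resuming, the fast progress writer cannot pull the reader ahead of the slow one; the reader's own highWaterMark is what bounds the readahead.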

Thanks!
Guy

The code to reproduce:

'use strict';

var stream = require('stream');

if (require.main === module) {
    main();
}

function main() {

    console.log('');
    console.log('===========');
    console.log('NO PROGRESS');
    console.log('===========');
    test('none', function() {

        console.log('');
        console.log('=======================');
        console.log('PROGRESS WITH TRANSFORM');
        console.log('=======================');
        test('transform', function() {

            console.log('');
            console.log('======================');
            console.log('PROGRESS WITH WRITABLE');
            console.log('======================');
            test('writable', function() {});
        });
    });
}

function test(progress_mode, callback) {
    var nr = 0;
    var nw = 0;
    var HIDDEN_KEY = 'tralala##';
    var CHUNK_SIZE = 16 * 1024;

    var input = new stream.Readable({
        highWaterMark: CHUNK_SIZE
    });
    input._read = function() {
        if (nr >= 20) {
            console.log('Read: done');
            this.push(null);
            return;
        }
        // set a special property on the buffer so the writer can verify it was not copied
        var buf = new Buffer(CHUNK_SIZE);
        buf[HIDDEN_KEY + nr] = buf;
        buf.fill(nr % 256);
        this.push(buf);
        nr += 1;
    };

    var output = new stream.Writable();
    output._write = function(data, encoding, callback) {
        // check that the buffer has our special property set by the reader
        if (data[HIDDEN_KEY + nw] !== data) {
            console.error('DATA GOT COPIED... AAAAAAAAAAAAHHHHH !!!', nw);
        }
        // check how much readahead occurred
        var readahead = nr - nw;
        if (readahead > 3) {
            console.error('TOO MUCH READAHEAD', readahead);
        } else {
            console.log('Readahead', readahead);
        }
        nw += 1;
        // slow down the writes
        setTimeout(callback, 10);
    };

    if (progress_mode === 'transform') {
        var progress_transform = new stream.Transform({
            highWaterMark: 0
        });
        progress_transform._transform = function(data, encoding, callback) {
            callback(null, data);
        };
        input.pipe(progress_transform).pipe(output);
    } else if (progress_mode === 'writable') {
        var progress_writable = new stream.Writable({
            highWaterMark: 0
        });
        progress_writable._write = function(data, encoding, callback) {
            callback();
        };
        input.pipe(progress_writable);
        input.pipe(output);
    } else {
        input.pipe(output);
    }

    output.on('finish', callback);
}

Here is the output:

$ node progress_stream.js 

===========
NO PROGRESS
===========
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Readahead 2
Read: done
Readahead 1

=======================
PROGRESS WITH TRANSFORM
=======================
Readahead 2
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Readahead 3
Read: done
Readahead 2
Readahead 1

======================
PROGRESS WITH WRITABLE
======================
Readahead 2
Read: done
TOO MUCH READAHEAD 19
TOO MUCH READAHEAD 18
TOO MUCH READAHEAD 17
TOO MUCH READAHEAD 16
TOO MUCH READAHEAD 15
TOO MUCH READAHEAD 14
TOO MUCH READAHEAD 13
TOO MUCH READAHEAD 12
TOO MUCH READAHEAD 11
TOO MUCH READAHEAD 10
TOO MUCH READAHEAD 9
TOO MUCH READAHEAD 8
TOO MUCH READAHEAD 7
TOO MUCH READAHEAD 6
TOO MUCH READAHEAD 5
TOO MUCH READAHEAD 4
Readahead 3
Readahead 2
Readahead 1
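In the output above, the transform variant stays at a readahead of 3, while the extra writable lets the reader run 19 chunks ahead. So on v4, one possible workaround is to keep a single pipe chain and report progress from an inline Transform instead of a second pipe destination. A minimal sketch, assuming progress means a running byte count; `createProgressCounter` is a hypothetical helper, and the stream sizes mirror the repro.

```javascript
'use strict';

const { Readable, Transform, Writable } = require('stream');

// Hypothetical helper (not a Node.js API): passes chunks through unchanged
// and reports the running byte count via `onProgress`.
function createProgressCounter(onProgress) {
  let bytes = 0;
  return new Transform({
    transform(chunk, encoding, callback) {
      bytes += chunk.length;
      onProgress(bytes);
      callback(null, chunk); // forward the chunk unchanged
    },
  });
}

// Usage: a single chain input -> counter -> output, so backpressure from the
// slow output still propagates back to the input.
const CHUNK_SIZE = 16 * 1024;
let produced = 0;
let received = 0;
const progressReports = [];

const input = new Readable({
  highWaterMark: CHUNK_SIZE,
  read() {
    if (produced >= 20) {
      this.push(null); // end of stream
      return;
    }
    produced += 1;
    this.push(Buffer.alloc(CHUNK_SIZE, produced));
  },
});

const output = new Writable({
  write(chunk, encoding, callback) {
    received += chunk.length;
    setTimeout(callback, 10); // simulate a slow sink
  },
});

const done = new Promise((resolve) => output.on('finish', resolve));

input
  .pipe(createProgressCounter((bytes) => progressReports.push(bytes)))
  .pipe(output);

done.then(() => console.log('received', received, 'bytes'));
```

The trade-off is that progress is reported when a chunk enters the counter rather than when the slow writer finishes it, which is usually close enough for a progress display.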


Labels: question, stream
