diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 1753996a4f85de..32c2a983a4c082 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -13,6 +13,15 @@ function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } +function isStreamEmittingClose(stream) { + if (stream._writableState && !stream._writableState.emitClose) + return false; + if (stream._readableState && !stream._readableState.emitClose) + return false; + + return true; +} + function eos(stream, opts, callback) { if (arguments.length === 2) { callback = opts; @@ -71,6 +80,18 @@ function eos(stream, opts, callback) { stream.req.on('finish', onfinish); }; + const isEmittingClose = isStreamEmittingClose(stream); + + if (!isEmittingClose) { + const _destroy = stream._destroy; + stream._destroy = function(err, cb) { + _destroy.call(stream, err, (_err) => { + if (!_err) process.nextTick(() => callback.call(stream, _err)); + cb(_err); + }); + }; + } + if (isRequest(stream)) { stream.on('complete', onfinish); stream.on('abort', onclose); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index fe87f246327a3a..8672b7b3892cee 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -5,6 +5,7 @@ const { Writable, Readable, Transform, finished } = require('stream'); const assert = require('assert'); const fs = require('fs'); const { promisify } = require('util'); +const Countdown = require('../common/countdown'); { const rs = new Readable({ @@ -175,3 +176,52 @@ const { promisify } = require('util'); rs.push(null); rs.resume(); } + +{ + const rs = new Readable({ + emitClose: false, + read() {} + }); + + finished(rs, common.mustCall((err) => { + assert(!err, 'no error'); + })); + + setImmediate(() => rs.destroy()); +} + +{ + const ws = new Writable({ + emitClose: false, + write(data, enc, cb) { + cb(); + } + }); + + finished(ws, common.mustCall((err) => { + assert(!err, 'no error'); + })); + + setImmediate(() => ws.destroy()); +} + +{ + const countdown = new Countdown(1, common.mustCall()); + const rs = new Readable({ + emitClose: false, + read() {}, + destroy(err, cb) { + setImmediate(() => { + countdown.dec(); + cb(err); + }); + } + }); + + finished(rs, common.mustCall((err) => { + assert(!err, 'no error'); + assert(!countdown.remaining, 'destroy should be called first'); + })); + + setImmediate(() => rs.destroy()); +} diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ef5a39fddd881f..d078e868832004 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -5,6 +5,7 @@ const { Stream, Writable, Readable, Transform, pipeline } = require('stream'); const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const Countdown = require('../common/countdown'); { let finished = false; @@ -477,3 +478,49 @@ const { promisify } = require('util'); { code: 'ERR_INVALID_CALLBACK' } ); } + +{ + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + emitClose: false, + write(data, enc, cb) { + cb(); + } + }); + + setImmediate(() => read.destroy()); + + pipeline(read, write, common.mustCall((err) => { + assert.ok(err, 'should have an error'); + })); +} + +{ + const countdown = new Countdown(1, common.mustCall()); + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + emitClose: false, + write(data, enc, cb) { + cb(); + }, + destroy(err, cb) { + setImmediate(() => { + countdown.dec(); + cb(err); + }); + } + }); + + setImmediate(() => read.destroy()); + + pipeline(read, write, common.mustCall((err) => { + assert.ok(err, 'should have an error'); + assert(!countdown.remaining, 'destroy should be called first'); + })); +}