From ee5a19a8243dde89702fb21c5b0d5f56b8ec1330 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 28 Sep 2019 10:24:56 +0200 Subject: [PATCH 1/9] stream: always invoke end callback Ensure that the callback passed into end() is always invoke in order to avoid bug such as deadlock the user. --- lib/_stream_writable.js | 15 +++++- .../test-stream-writable-end-cb-error.js | 48 +++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-stream-writable-end-cb-error.js diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9b75b672cbd843..6a1f194051c6db 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -606,6 +606,11 @@ Writable.prototype.end = function(chunk, encoding, cb) { else if (typeof cb === 'function') { if (!state.finished) { this.once('finish', cb); + this.once('error', (err) => { + if (!state.finished) { + cb(err); + } + }); } else { cb(new ERR_STREAM_ALREADY_FINISHED('end')); } @@ -682,10 +687,16 @@ function endWritable(stream, state, cb) { state.ending = true; finishMaybe(stream, state); if (cb) { - if (state.finished) + if (state.finished) { process.nextTick(cb); - else + } else { stream.once('finish', cb); + stream.once('error', (err) => { + if (!state.finished) { + cb(err); + } + }); + } } state.ended = true; stream.writable = false; diff --git a/test/parallel/test-stream-writable-end-cb-error.js b/test/parallel/test-stream-writable-end-cb-error.js new file mode 100644 index 00000000000000..24989a6d06a111 --- /dev/null +++ b/test/parallel/test-stream-writable-end-cb-error.js @@ -0,0 +1,48 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +{ + // Invoke end callback on failure. + const writable = new stream.Writable(); + + writable._write = (chunk, encoding, cb) => { + process.nextTick(cb, new Error('kaboom')); + }; + + writable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + writable.write('asd'); + writable.end(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + writable.end(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); +} + +{ + // Don't invoke end callback twice + const writable = new stream.Writable(); + + writable._write = (chunk, encoding, cb) => { + process.nextTick(cb); + }; + + let called = false; + writable.end('asd', common.mustCall((err) => { + called = true; + assert.strictEqual(err, undefined); + })); + + writable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + writable.on('finish', common.mustCall(() => { + assert.strictEqual(called, true); + writable.emit('error', new Error('kaboom')); + })); +} From 40d2140c0562dc9d1b3a84e2e6349cf942e024e3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 2 Oct 2019 14:22:53 +0200 Subject: [PATCH 2/9] fixup: propagate to uncaught --- lib/_stream_writable.js | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 6a1f194051c6db..5853a69d400e22 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -30,6 +30,7 @@ const { Object } = primordials; module.exports = Writable; Writable.WritableState = WritableState; +const EE = require('events'); const internalUtil = require('internal/util'); const Stream = require('stream'); const { Buffer } = require('buffer'); @@ -605,12 +606,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { endWritable(this, state, cb); else if (typeof cb === 'function') { if (!state.finished) { - this.once('finish', cb); - this.once('error', (err) => { - if (!state.finished) { - cb(err); - } - }); + onFinished(this, state, cb); } else { cb(new ERR_STREAM_ALREADY_FINISHED('end')); } @@ -688,14 +684,10 @@ function endWritable(stream, state, cb) { finishMaybe(stream, state); if (cb) { if (state.finished) { + // TODO(ronag): finished before end should be an error. process.nextTick(cb); } else { - stream.once('finish', cb); - stream.once('error', (err) => { - if (!state.finished) { - cb(err); - } - }); + onFinished(stream, state, cb); } } state.ended = true; @@ -716,6 +708,24 @@ function onCorkedFinish(corkReq, state, err) { state.corkedRequestsFree.next = corkReq; } +function onFinished(stream, state, cb) { + function onerror (err) { + stream.removeListener('finish', onfinish); + stream.removeListener('error', onerror); + cb(err); + if (EE.listenerCount(stream, 'error') === 0) { + stream.emit('error', err); + } + } + function onfinish () { + stream.removeListener('finish', onfinish); + stream.removeListener('error', onerror); + cb(); + } + stream.once('finish', onfinish); + stream.once('error', onerror); +} + Object.defineProperty(Writable.prototype, 'destroyed', { // Making it explicit this property is not enumerable // because otherwise some prototype manipulation in From ffbff6caf60ac53c3ca3adb40ca4fe5338000805 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 2 Oct 2019 14:38:41 +0200 Subject: [PATCH 3/9] fixup: add test for uncaugth --- .../test-stream-writable-end-cb-uncaugth.js | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 test/parallel/test-stream-writable-end-cb-uncaugth.js diff --git a/test/parallel/test-stream-writable-end-cb-uncaugth.js b/test/parallel/test-stream-writable-end-cb-uncaugth.js new file mode 100644 index 00000000000000..d530de6e8bfde2 --- /dev/null +++ b/test/parallel/test-stream-writable-end-cb-uncaugth.js @@ -0,0 +1,23 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); +})); + +const writable = new stream.Writable(); + +writable._write = (chunk, encoding, cb) => { + cb(); +}; +writable._final = (cb) => { + cb(new Error('kaboom')) +} + +writable.write('asd'); +writable.end(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); +})); From 0162b1c75c706c2f4491c3db9d7d91af186f61ff Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 2 Oct 2019 14:44:27 +0200 Subject: [PATCH 4/9] fixup: use on --- lib/_stream_writable.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 5853a69d400e22..0145ab4b55da57 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -722,8 +722,8 @@ function onFinished(stream, state, cb) { stream.removeListener('error', onerror); cb(); } - stream.once('finish', onfinish); - stream.once('error', onerror); + stream.on('finish', onfinish); + stream.on('error', onerror); } Object.defineProperty(Writable.prototype, 'destroyed', { From c00303fc7e619811ca3f826967d1c22f39df9b9b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 2 Oct 2019 19:39:20 +0200 Subject: [PATCH 5/9] fixup: linting --- lib/_stream_writable.js | 4 ++-- test/parallel/test-stream-writable-end-cb-uncaugth.js | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 0145ab4b55da57..e7f5da7804915b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -709,7 +709,7 @@ function onCorkedFinish(corkReq, state, err) { } function onFinished(stream, state, cb) { - function onerror (err) { + function onerror(err) { stream.removeListener('finish', onfinish); stream.removeListener('error', onerror); cb(err); @@ -717,7 +717,7 @@ function onFinished(stream, state, cb) { stream.emit('error', err); } } - function onfinish () { + function onfinish() { stream.removeListener('finish', onfinish); stream.removeListener('error', onerror); cb(); diff --git a/test/parallel/test-stream-writable-end-cb-uncaugth.js b/test/parallel/test-stream-writable-end-cb-uncaugth.js index d530de6e8bfde2..ab25cac81b0bee 100644 --- a/test/parallel/test-stream-writable-end-cb-uncaugth.js +++ b/test/parallel/test-stream-writable-end-cb-uncaugth.js @@ -14,8 +14,8 @@ writable._write = (chunk, encoding, cb) => { cb(); }; writable._final = (cb) => { - cb(new Error('kaboom')) -} + cb(new Error('kaboom')); +}; writable.write('asd'); writable.end(common.mustCall((err) => { From e041ae3b483e306f0a1cbfea342ecadfa1562af5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 6 Oct 2019 18:30:26 +0200 Subject: [PATCH 6/9] fixup: another deadlock case --- lib/_stream_writable.js | 18 ++++++---- test/parallel/test-stream-writable-destroy.js | 36 +++++++++++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e7f5da7804915b..7d736eddc56c6a 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -602,9 +602,9 @@ Writable.prototype.end = function(chunk, encoding, cb) { } // Ignore unnecessary end() calls. - if (!state.ending) + if (!state.ending) { endWritable(this, state, cb); - else if (typeof cb === 'function') { + } else if (typeof cb === 'function') { if (!state.finished) { onFinished(this, state, cb); } else { @@ -683,12 +683,10 @@ function endWritable(stream, state, cb) { state.ending = true; finishMaybe(stream, state); if (cb) { - if (state.finished) { - // TODO(ronag): finished before end should be an error. + if (state.finished) process.nextTick(cb); - } else { + else onFinished(stream, state, cb); - } } state.ended = true; stream.writable = false; @@ -709,6 +707,14 @@ function onCorkedFinish(corkReq, state, err) { } function onFinished(stream, state, cb) { + if (state.destroyed && state.errorEmitted) { + // TODO(ronag): Backwards compat. Should be moved to end() without + // errorEmitted check and with errorOrDestroy. + const err = new ERR_STREAM_DESTROYED('end'); + process.nextTick(cb, err); + return; + } + function onerror(err) { stream.removeListener('finish', onfinish); stream.removeListener('error', onerror); diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index c4a96788ab24dd..60e702d4b12a96 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -292,3 +292,39 @@ const assert = require('assert'); })); write.uncork(); } + +{ + // Call end(cb) after error & destroy + + const write = new Writable({ + write(chunk, enc, cb) { cb(new Error('asd')); } + }); + write.on('error', common.mustCall(() => { + write.destroy(); + let ticked = false; + write.end(common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + ticked = true; + })); + write.write('asd'); +} + +{ + // Call end(cb) after finish & destroy + + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + write.on('finish', common.mustCall(() => { + write.destroy(); + let ticked = false; + write.end(common.mustCall((err) => { + assert.strictEqual(ticked, false); + assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); + })); + ticked = true; + })); + write.end(); +} From f71dcd909611b1799a077e9731c077995c449076 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 7 Oct 2019 10:50:36 +0200 Subject: [PATCH 7/9] fixup: don't use EE.listenerCount --- lib/_stream_writable.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 7d736eddc56c6a..fdb17ed33d52f5 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -30,7 +30,6 @@ const { Object } = primordials; module.exports = Writable; Writable.WritableState = WritableState; -const EE = require('events'); const internalUtil = require('internal/util'); const Stream = require('stream'); const { Buffer } = require('buffer'); @@ -719,7 +718,7 @@ function onFinished(stream, state, cb) { stream.removeListener('finish', onfinish); stream.removeListener('error', onerror); cb(err); - if (EE.listenerCount(stream, 'error') === 0) { + if (stream.listenerCount('error') === 0) { stream.emit('error', err); } } From aee2077b961c84e9f01653bbe640b0b277c0508a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Oct 2019 16:31:09 +0200 Subject: [PATCH 8/9] fixup: prepend error listener + test --- lib/_stream_writable.js | 4 ++-- test/parallel/test-stream-writable-destroy.js | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index fdb17ed33d52f5..1ec91cbcff0c02 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -727,8 +727,8 @@ function onFinished(stream, state, cb) { stream.removeListener('error', onerror); cb(); } - stream.on('finish', onfinish); - stream.on('error', onerror); + stream.prependListener('finish', onfinish); + stream.prependListener('error', onerror); } Object.defineProperty(Writable.prototype, 'destroyed', { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 60e702d4b12a96..30e4503c05773a 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -328,3 +328,19 @@ const assert = require('assert'); })); write.end(); } + +{ + // Call end(cb) after error & destroy and don't trigger + // unhandled exception. + + const write = new Writable({ + write(chunk, enc, cb) { process.nextTick(cb); } + }); + write.once('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.end('asd', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.destroy(new Error('asd')); +} From 0a482b7e4bfee655173b22cbceaeee31ce322fac Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 11 Oct 2019 21:37:48 +0200 Subject: [PATCH 9/9] fixup: fix finish order --- lib/_stream_writable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 1ec91cbcff0c02..14b538dea784dd 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -727,7 +727,7 @@ function onFinished(stream, state, cb) { stream.removeListener('error', onerror); cb(); } - stream.prependListener('finish', onfinish); + stream.on('finish', onfinish); stream.prependListener('error', onerror); }