From 128c0b711e2610b5c2ba03b283ea86a44270ac28 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 14 May 2023 19:52:31 +0300 Subject: [PATCH 01/10] stream: unref wip --- lib/internal/streams/readable.js | 38 ++++++++++++ test/parallel/test-stream-unref.js | 97 ++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 test/parallel/test-stream-unref.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 7a2aad41ee2baa..a3c53f54f63f4c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -33,6 +33,7 @@ const { SafeSet, SymbolAsyncIterator, Symbol, + ReflectApply, } = primordials; module.exports = Readable; @@ -1154,6 +1155,39 @@ async function* createAsyncIterator(stream, options) { } } +function staticUnref(stream) { + // TODO - We can probably have a better way to do this + function unref() { + return async function* unref() { + const iterator = this.iterator({ destroyOnReturn: false }); + + for await (const val of iterator) { + yield val; + } + }.call(this); + } + + // Instead of regular operator registration + const newStream = Readable.from(ReflectApply(unref, stream, [])); + + // TODO - we should have better way instead of listening to those events, no? + // stream.once('close', () => { + // destroyImpl.destroyer(newStream, null); + // }); + stream.once('error', (e) => { + destroyImpl.destroyer(newStream, e); + }); + + newStream.on('error', (e) => { + if (e.name !== 'AbortError') { + destroyImpl.destroyer(stream, e); + } + }); + + return newStream; +} + + // Making it explicit these properties are not enumerable // because otherwise some prototype manipulation in // userland will fail. @@ -1398,6 +1432,10 @@ Readable.from = function(iterable, opts) { return from(Readable, iterable, opts); }; +Readable.unref = function(stream) { + return staticUnref(stream); +}; + let webStreamsAdapters; // Lazy to avoid circular references diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js new file mode 100644 index 00000000000000..69db92878ff4f3 --- /dev/null +++ b/test/parallel/test-stream-unref.js @@ -0,0 +1,97 @@ +'use strict'; +require('../common'); + +const { + Readable, +} = require('stream'); +const { it } = require('node:test'); +const { strictEqual } = require('assert'); + +const { from, unref } = Readable; + +const nextTick = () => new Promise((resolve) => process.nextTick(resolve)); + +it('unref stream error should propagate to original one', async () => { + const originalStream = from([1, 2, 3, 4, 5]); + + let emittedError; + originalStream.on('error', (e) => { + emittedError = e; + }); + const unrefStream = unref(originalStream); + + const thrownError = new Error('test'); + + unrefStream.destroy(thrownError); + + await nextTick(); + strictEqual(unrefStream.destroyed, true); + strictEqual(originalStream.destroyed, true); + + // Need another tick to propagate the error + await nextTick(); + strictEqual(emittedError, thrownError); +}); + +it('Original stream error should propagate to unref one', async () => { + const originalStream = from([1, 2, 3, 4, 5]); + const unrefStream = unref(originalStream); + + let emittedError; + unrefStream.on('error', (e) => { + emittedError = e; + }); + + const thrownError = new Error('test'); + + originalStream.destroy(thrownError); + + await nextTick(); + strictEqual(unrefStream.destroyed, true); + strictEqual(originalStream.destroyed, true); + + await nextTick(); + strictEqual(emittedError, thrownError); +}); + +it('Should not close original stream when unref one finished but not consumed all data', async () => { + const originalStream = from([1, 2, 3, 4, 5]); + + const streamShouldClose = unref(originalStream); + + // eslint-disable-next-line no-unused-vars + for await (const _ of streamShouldClose) { + break; + } + + await nextTick(); + strictEqual(streamShouldClose.destroyed, true); + strictEqual(originalStream.destroyed, false); +}); + +it('Should close original stream when unref one consume all data', async () => { + const originalStream = from([1, 2, 3, 4, 5]); + originalStream.name = 'originalStream'; + + const unrefStream = unref(originalStream); + unrefStream.name = 'unrefStream'; + + // This throw an abort error + await unrefStream.toArray(); + + await nextTick(); + + strictEqual(unrefStream.destroyed, true); + strictEqual(originalStream.destroyed, true); +}); + +// This fail +it('original stream close should close unref one', async () => { + const originalStream = from([1, 2, 3, 4, 5]); + const streamShouldClose = unref(originalStream); + + await originalStream.toArray(); + + strictEqual(originalStream.destroyed, true); + strictEqual(streamShouldClose.destroyed, true); +}); From e2f8efdc58428758f0d8cc62bd7005b40c6a9c9f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 14 May 2023 20:42:34 +0300 Subject: [PATCH 02/10] stream: unref wip --- test/parallel/test-stream-unref.js | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js index 69db92878ff4f3..b6c8d4778de435 100644 --- a/test/parallel/test-stream-unref.js +++ b/test/parallel/test-stream-unref.js @@ -57,24 +57,22 @@ it('Original stream error should propagate to unref one', async () => { it('Should not close original stream when unref one finished but not consumed all data', async () => { const originalStream = from([1, 2, 3, 4, 5]); - const streamShouldClose = unref(originalStream); + const unrefStream = unref(originalStream); // eslint-disable-next-line no-unused-vars - for await (const _ of streamShouldClose) { + for await (const _ of unrefStream) { break; } await nextTick(); - strictEqual(streamShouldClose.destroyed, true); + strictEqual(unrefStream.destroyed, true); strictEqual(originalStream.destroyed, false); }); it('Should close original stream when unref one consume all data', async () => { const originalStream = from([1, 2, 3, 4, 5]); - originalStream.name = 'originalStream'; const unrefStream = unref(originalStream); - unrefStream.name = 'unrefStream'; // This throw an abort error await unrefStream.toArray(); @@ -88,10 +86,10 @@ it('Should close original stream when unref one consume all data', async () => { // This fail it('original stream close should close unref one', async () => { const originalStream = from([1, 2, 3, 4, 5]); - const streamShouldClose = unref(originalStream); + const unrefStream = unref(originalStream); await originalStream.toArray(); strictEqual(originalStream.destroyed, true); - strictEqual(streamShouldClose.destroyed, true); + strictEqual(unrefStream.destroyed, true); }); From 4e37e9d91956c1fb02168d41b001c802b9715d03 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 14 May 2023 21:24:20 +0300 Subject: [PATCH 03/10] stream: unref wip --- test/parallel/test-stream-unref.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js index b6c8d4778de435..4da096e986e98f 100644 --- a/test/parallel/test-stream-unref.js +++ b/test/parallel/test-stream-unref.js @@ -5,7 +5,7 @@ const { Readable, } = require('stream'); const { it } = require('node:test'); -const { strictEqual } = require('assert'); +const { strictEqual, deepStrictEqual } = require('assert'); const { from, unref } = Readable; @@ -74,8 +74,8 @@ it('Should close original stream when unref one consume all data', async () => { const unrefStream = unref(originalStream); - // This throw an abort error - await unrefStream.toArray(); + const data = await unrefStream.toArray(); + deepStrictEqual(data, [1, 2, 3, 4, 5]); await nextTick(); From 2e83f52fcd0333eaa192efa4a096d44c653f4216 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 14 May 2023 21:38:44 +0300 Subject: [PATCH 04/10] stream: fix failed test and use wrap instead --- lib/internal/streams/readable.js | 35 ++++++++++-------------------- test/parallel/test-stream-unref.js | 1 - 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a3c53f54f63f4c..8624331ad9fe22 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -33,7 +33,6 @@ const { SafeSet, SymbolAsyncIterator, Symbol, - ReflectApply, } = primordials; module.exports = Readable; @@ -1156,35 +1155,25 @@ async function* createAsyncIterator(stream, options) { } function staticUnref(stream) { - // TODO - We can probably have a better way to do this - function unref() { - return async function* unref() { - const iterator = this.iterator({ destroyOnReturn: false }); - - for await (const val of iterator) { - yield val; - } - }.call(this); - } - - // Instead of regular operator registration - const newStream = Readable.from(ReflectApply(unref, stream, [])); + const unrefStream = new Readable({ + objectMode: stream.readableObjectMode ?? stream.objectMode ?? true, + // highWaterMark 1 so no cache would exist... TODO - need to revisit this + highWaterMark: 1, + // TODO - what about other options? + destroy(err, callback) { + // Not destroying the stream here as we unref it. + callback(err); + }, + }).wrap(stream); - // TODO - we should have better way instead of listening to those events, no? - // stream.once('close', () => { - // destroyImpl.destroyer(newStream, null); - // }); - stream.once('error', (e) => { - destroyImpl.destroyer(newStream, e); - }); - newStream.on('error', (e) => { + unrefStream.on('error', (e) => { if (e.name !== 'AbortError') { destroyImpl.destroyer(stream, e); } }); - return newStream; + return unrefStream; } diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js index 4da096e986e98f..e7415bf3652bc1 100644 --- a/test/parallel/test-stream-unref.js +++ b/test/parallel/test-stream-unref.js @@ -83,7 +83,6 @@ it('Should close original stream when unref one consume all data', async () => { strictEqual(originalStream.destroyed, true); }); -// This fail it('original stream close should close unref one', async () => { const originalStream = from([1, 2, 3, 4, 5]); const unrefStream = unref(originalStream); From 16083e06790caca22bf6609936a0094f7b6e3805 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 14 May 2023 21:48:19 +0300 Subject: [PATCH 05/10] stream: unref fix missed 1 item in cache --- lib/internal/streams/readable.js | 5 +++-- test/parallel/test-stream-unref.js | 10 ++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 8624331ad9fe22..382a9f3b13f51c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1157,8 +1157,9 @@ async function* createAsyncIterator(stream, options) { function staticUnref(stream) { const unrefStream = new Readable({ objectMode: stream.readableObjectMode ?? stream.objectMode ?? true, - // highWaterMark 1 so no cache would exist... TODO - need to revisit this - highWaterMark: 1, + // highWaterMark 0 as unref is basically a proxy, so don't consume more data + // as we would lose it when continue consuming from the original stream + highWaterMark: 0, // TODO - what about other options? destroy(err, callback) { // Not destroying the stream here as we unref it. diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js index e7415bf3652bc1..859fa05b55bfb6 100644 --- a/test/parallel/test-stream-unref.js +++ b/test/parallel/test-stream-unref.js @@ -69,6 +69,16 @@ it('Should not close original stream when unref one finished but not consumed al strictEqual(originalStream.destroyed, false); }); +it('Should continue consuming the original stream data from where the unref stopped', async () => { + const originalStream = from([1, 2, 3, 4, 5]); + + const firstItem = await unref(originalStream).take(1).toArray(); + deepStrictEqual(firstItem, [1]); + + const restOfData = await originalStream.toArray(); + deepStrictEqual(restOfData, [2, 3, 4, 5]); +}); + it('Should close original stream when unref one consume all data', async () => { const originalStream = from([1, 2, 3, 4, 5]); From 5bc4b919d243323f6de5ea5376a407384399a415 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun, 14 May 2023 22:22:53 +0300 Subject: [PATCH 06/10] stream: use once when listening to error event --- lib/internal/streams/readable.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 382a9f3b13f51c..b87ebd3f058e7c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1167,8 +1167,7 @@ function staticUnref(stream) { }, }).wrap(stream); - - unrefStream.on('error', (e) => { + unrefStream.once('error', (e) => { if (e.name !== 'AbortError') { destroyImpl.destroyer(stream, e); } @@ -1422,9 +1421,7 @@ Readable.from = function(iterable, opts) { return from(Readable, iterable, opts); }; -Readable.unref = function(stream) { - return staticUnref(stream); -}; +Readable.unref = staticUnref; let webStreamsAdapters; From b59b8dce9c30735b547261ec8c002039c6069f7c Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Fri, 9 Jun 2023 16:08:53 +0300 Subject: [PATCH 07/10] stream: add test for detecting memory leak --- test/parallel/test-stream-unref.js | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js index 859fa05b55bfb6..2bfb6a51861371 100644 --- a/test/parallel/test-stream-unref.js +++ b/test/parallel/test-stream-unref.js @@ -3,6 +3,8 @@ require('../common'); const { Readable, + pipeline, + PassThrough } = require('stream'); const { it } = require('node:test'); const { strictEqual, deepStrictEqual } = require('assert'); @@ -102,3 +104,39 @@ it('original stream close should close unref one', async () => { strictEqual(originalStream.destroyed, true); strictEqual(unrefStream.destroyed, true); }); + +it('make sure not leaking memory', async () => { + function getMemoryAllocatedInMB() { + return Math.round(process.memoryUsage().rss / 1024 / 1024 * 100) / 100; + } + + const bigData = () => from(async function* () { + const obj = Array.from({length: 100000}, () => (Array.from({length: 15}, (_, i) => i))) + while (true) { + yield obj.map((item) => item.slice(0)); + await new Promise((resolve) => setTimeout(resolve, 1)); + } + }()); + + const originalStream = pipeline(bigData(), new PassThrough({objectMode: true}), () => { + }); + const unrefStream = unref(originalStream); + originalStream.iterator({destroyOnReturn: true}) + + // making sure some data passed so we won't catch something that is related to the infra + const iterator = originalStream.iterator({destroyOnReturn: true}); + for (let j = 0; j < 10; j++) { + await iterator.next() + } + + const currentMemory = getMemoryAllocatedInMB(); + + for (let j = 0; j < 10; j++) { + await iterator.next() + } + + const newMemory = getMemoryAllocatedInMB(); + + originalStream.destroy(null); + strictEqual(newMemory - currentMemory < 100, true, `After consuming 10 items the memory increased by ${Math.floor(newMemory - currentMemory)}MB`); +}); From 489cebf8eac6ab3fa942b0c6eb356edbf1a12061 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Fri, 9 Jun 2023 16:09:36 +0300 Subject: [PATCH 08/10] stream: try to fix, this can have major consonances, really WIP just want the CI to run --- lib/internal/streams/readable.js | 35 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index b87ebd3f058e7c..8dd1cbe70e3c47 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -108,7 +108,6 @@ function ReadableState(options, stream, isDuplex) { // linked list can remove elements from the beginning faster than // array.shift(). this.buffer = new BufferList(); - this.length = 0; this.pipes = []; this.flowing = null; this.ended = false; @@ -191,6 +190,16 @@ function ReadableState(options, stream, isDuplex) { } +ObjectDefineProperties(ReadableState.prototype, { + length: { + __proto__: null, + get() { + return this.buffer.length; + }, + }, +}); + + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -1108,7 +1117,7 @@ async function* createAsyncIterator(stream, options) { let callback = nop; function next(resolve) { - if (this === stream) { + if (this === stream || !resolve) { callback(); callback = nop; } else { @@ -1155,23 +1164,15 @@ async function* createAsyncIterator(stream, options) { } function staticUnref(stream) { - const unrefStream = new Readable({ - objectMode: stream.readableObjectMode ?? stream.objectMode ?? true, - // highWaterMark 0 as unref is basically a proxy, so don't consume more data - // as we would lose it when continue consuming from the original stream - highWaterMark: 0, - // TODO - what about other options? - destroy(err, callback) { - // Not destroying the stream here as we unref it. - callback(err); - }, - }).wrap(stream); + const unrefStream = Object.create(stream, { + }); - unrefStream.once('error', (e) => { - if (e.name !== 'AbortError') { - destroyImpl.destroyer(stream, e); + unrefStream._destroy = (err) => { + if (err?.name !== 'AbortError') { + destroyImpl.destroyer(stream, err); } - }); + }; + unrefStream._readableState = Object.create(stream._readableState); return unrefStream; } From 930b34cccc170c3d48e043fa470b558dfe0fccb7 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Fri, 9 Jun 2023 16:17:20 +0300 Subject: [PATCH 09/10] stream: format --- lib/internal/streams/readable.js | 5 +++-- test/parallel/test-stream-unref.js | 16 ++++++++-------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 8dd1cbe70e3c47..410e239d34412e 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -33,6 +33,7 @@ const { SafeSet, SymbolAsyncIterator, Symbol, + ObjectCreate, } = primordials; module.exports = Readable; @@ -1164,7 +1165,7 @@ async function* createAsyncIterator(stream, options) { } function staticUnref(stream) { - const unrefStream = Object.create(stream, { + const unrefStream = ObjectCreate(stream, { }); unrefStream._destroy = (err) => { @@ -1172,7 +1173,7 @@ function staticUnref(stream) { destroyImpl.destroyer(stream, err); } }; - unrefStream._readableState = Object.create(stream._readableState); + unrefStream._readableState = { __proto__: stream._readableState }; return unrefStream; } diff --git a/test/parallel/test-stream-unref.js b/test/parallel/test-stream-unref.js index 2bfb6a51861371..60935374fa27f8 100644 --- a/test/parallel/test-stream-unref.js +++ b/test/parallel/test-stream-unref.js @@ -111,28 +111,28 @@ it('make sure not leaking memory', async () => { } const bigData = () => from(async function* () { - const obj = Array.from({length: 100000}, () => (Array.from({length: 15}, (_, i) => i))) + const obj = Array.from({ length: 100000 }, () => (Array.from({ length: 15 }, (_, i) => i))); while (true) { yield obj.map((item) => item.slice(0)); await new Promise((resolve) => setTimeout(resolve, 1)); } }()); - const originalStream = pipeline(bigData(), new PassThrough({objectMode: true}), () => { + const originalStream = pipeline(bigData(), new PassThrough({ objectMode: true }), () => { }); - const unrefStream = unref(originalStream); - originalStream.iterator({destroyOnReturn: true}) + unref(originalStream); + originalStream.iterator({ destroyOnReturn: true }); - // making sure some data passed so we won't catch something that is related to the infra - const iterator = originalStream.iterator({destroyOnReturn: true}); + // Making sure some data passed so we won't catch something that is related to the infra + const iterator = originalStream.iterator({ destroyOnReturn: true }); for (let j = 0; j < 10; j++) { - await iterator.next() + await iterator.next(); } const currentMemory = getMemoryAllocatedInMB(); for (let j = 0; j < 10; j++) { - await iterator.next() + await iterator.next(); } const newMemory = getMemoryAllocatedInMB(); From e699f2a2e508d37b892af963b15c9cf510cdbc1c Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Fri, 9 Jun 2023 17:29:02 +0300 Subject: [PATCH 10/10] Revert "stream: try to fix, this can have major consonances, really WIP" This reverts commit 489cebf8 --- lib/internal/streams/readable.js | 36 +++++++++++++++----------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 410e239d34412e..b87ebd3f058e7c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -33,7 +33,6 @@ const { SafeSet, SymbolAsyncIterator, Symbol, - ObjectCreate, } = primordials; module.exports = Readable; @@ -109,6 +108,7 @@ function ReadableState(options, stream, isDuplex) { // linked list can remove elements from the beginning faster than // array.shift(). this.buffer = new BufferList(); + this.length = 0; this.pipes = []; this.flowing = null; this.ended = false; @@ -191,16 +191,6 @@ function ReadableState(options, stream, isDuplex) { } -ObjectDefineProperties(ReadableState.prototype, { - length: { - __proto__: null, - get() { - return this.buffer.length; - }, - }, -}); - - function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -1118,7 +1108,7 @@ async function* createAsyncIterator(stream, options) { let callback = nop; function next(resolve) { - if (this === stream || !resolve) { + if (this === stream) { callback(); callback = nop; } else { @@ -1165,15 +1155,23 @@ async function* createAsyncIterator(stream, options) { } function staticUnref(stream) { - const unrefStream = ObjectCreate(stream, { - }); + const unrefStream = new Readable({ + objectMode: stream.readableObjectMode ?? stream.objectMode ?? true, + // highWaterMark 0 as unref is basically a proxy, so don't consume more data + // as we would lose it when continue consuming from the original stream + highWaterMark: 0, + // TODO - what about other options? + destroy(err, callback) { + // Not destroying the stream here as we unref it. + callback(err); + }, + }).wrap(stream); - unrefStream._destroy = (err) => { - if (err?.name !== 'AbortError') { - destroyImpl.destroyer(stream, err); + unrefStream.once('error', (e) => { + if (e.name !== 'AbortError') { + destroyImpl.destroyer(stream, e); } - }; - unrefStream._readableState = { __proto__: stream._readableState }; + }); return unrefStream; }