From 24ef7d6ec8243962e983326de22175cad0060f96 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 29 May 2023 00:05:12 +0530 Subject: [PATCH 1/3] lib: fix blob.stream() causing hanging promises Refs: https://github.com/nodejs/node/issues/47993#issuecomment-1546901936 --- lib/internal/blob.js | 60 ++++++++++++++++++++------------------ test/parallel/test-blob.js | 31 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/lib/internal/blob.js b/lib/internal/blob.js index ee8e1c75819ab9..8b8963710fa7fa 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -329,34 +329,38 @@ class Blob { pull(c) { const { promise, resolve, reject } = createDeferredPromise(); this.pendingPulls.push({ resolve, reject }); - reader.pull((status, buffer) => { - // If pendingPulls is empty here, the stream had to have - // been canceled, and we don't really care about the result. - // we can simply exit. - if (this.pendingPulls.length === 0) { - return; - } - const pending = this.pendingPulls.shift(); - if (status === 0) { - // EOS - c.close(); - pending.resolve(); - return; - } else if (status < 0) { - // The read could fail for many different reasons when reading - // from a non-memory resident blob part (e.g. file-backed blob). - // The error details the system error code. - const error = lazyDOMException('The blob could not be read', 'NotReadableError'); - - c.error(error); - pending.reject(error); - return; - } - if (buffer !== undefined) { - c.enqueue(new Uint8Array(buffer)); - } - pending.resolve(); - }); + const readNext = () => { + reader.pull((status, buffer) => { + // If pendingPulls is empty here, the stream had to have + // been canceled, and we don't really care about the result. + // We can simply exit. + if (this.pendingPulls.length === 0) { + return; + } + if (status === 0) { + // EOS + c.close(); + const pending = this.pendingPulls.shift(); + pending.resolve(); + return; + } else if (status < 0) { + // The read could fail for many different reasons when reading + // from a non-memory resident blob part (e.g. file-backed blob). + // The error details the system error code. + const error = lazyDOMException('The blob could not be read', 'NotReadableError'); + const pending = this.pendingPulls.shift(); + c.error(error); + pending.reject(error); + return; + } + if (buffer !== undefined) { + c.enqueue(new Uint8Array(buffer)); + } + // We keep reading until we either reach EOS or some error + queueMicrotask(() => readNext()); + }); + }; + readNext(); return promise; }, cancel(reason) { diff --git a/test/parallel/test-blob.js b/test/parallel/test-blob.js index 6b6ce70687660e..494e5cebdb3f94 100644 --- a/test/parallel/test-blob.js +++ b/test/parallel/test-blob.js @@ -237,6 +237,37 @@ assert.throws(() => new Blob({}), { assert(res.done); })().then(common.mustCall()); +(async () => { + const b = new Blob(Array(10).fill('hello')); + const reader = b.stream().getReader(); + const chunks = []; + while (true) { + const res = await reader.read(); + if (res.done) break; + assert.strictEqual(res.value.byteLength, 5); + chunks.push(res.value); + } + assert.strictEqual(chunks.length, 10); +})().then(common.mustCall()); + +(async () => { + const b = new Blob(Array(10).fill('hello')); + const reader = b.stream().getReader(); + const chunks = []; + while (true) { + const res = await reader.read(); + if (chunks.length === 5) { + reader.cancel('boom'); + break; + } + if (res.done) break; + assert.strictEqual(res.value.byteLength, 5); + chunks.push(res.value); + } + assert.strictEqual(chunks.length, 5); + reader.closed.then(common.mustCall()); +})().then(common.mustCall()); + { const b = new Blob(['hello\n'], { endings: 'native' }); assert.strictEqual(b.size, EOL.length + 5); From 3f321a194cc00d273a913eb990e4cc9f3939856b Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 30 May 2023 18:55:53 +0530 Subject: [PATCH 2/3] fixup! add back pressure --- lib/internal/blob.js | 8 +++++++- test/parallel/test-blob.js | 16 +++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/internal/blob.js b/lib/internal/blob.js index 8b8963710fa7fa..9f3f50d8348b24 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -357,7 +357,13 @@ class Blob { c.enqueue(new Uint8Array(buffer)); } // We keep reading until we either reach EOS or some error - queueMicrotask(() => readNext()); + queueMicrotask(() => { + if (c.desiredSize <= 0) { + // A manual backpressure check. + return; + } + readNext(); + }); }); }; readNext(); diff --git a/test/parallel/test-blob.js b/test/parallel/test-blob.js index 494e5cebdb3f94..27dee5690d7e06 100644 --- a/test/parallel/test-blob.js +++ b/test/parallel/test-blob.js @@ -1,4 +1,4 @@ -// Flags: --no-warnings +// Flags: --no-warnings --expose-internals 'use strict'; const common = require('../common'); @@ -6,6 +6,7 @@ const assert = require('assert'); const { Blob } = require('buffer'); const { inspect } = require('util'); const { EOL } = require('os'); +const { kState } = require('internal/webstreams/util'); { const b = new Blob(); @@ -268,6 +269,19 @@ assert.throws(() => new Blob({}), { reader.closed.then(common.mustCall()); })().then(common.mustCall()); +(async () => { + const b = new Blob(Array(10).fill('hello')); + const stream = b.stream(); + const reader = stream.getReader(); + assert.strictEqual(stream[kState].controller.desiredSize, 1); + const { value, done } = await reader.read(); + assert.strictEqual(value.byteLength, 5); + assert(!done); + setTimeout(() => { + assert.strictEqual(stream[kState].controller.desiredSize, 0); + }, 0); +})().then(common.mustCall()); + { const b = new Blob(['hello\n'], { endings: 'native' }); assert.strictEqual(b.size, EOL.length + 5); From fd56c64ce21500e88b7b6d728b47042ab56dfc81 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Wed, 7 Jun 2023 14:31:39 +0530 Subject: [PATCH 3/3] Update lib/internal/blob.js Co-authored-by: Matteo Collina --- lib/internal/blob.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/internal/blob.js b/lib/internal/blob.js index 9f3f50d8348b24..0d0e9906dbdb31 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -356,7 +356,8 @@ class Blob { if (buffer !== undefined) { c.enqueue(new Uint8Array(buffer)); } - // We keep reading until we either reach EOS or some error + // We keep reading until we either reach EOS, some error, or we + // hit the flow rate of the stream (c.desiredSize). queueMicrotask(() => { if (c.desiredSize <= 0) { // A manual backpressure check.