From ed91aaf9a2ca861fa132b4aa8f89223258762f3f Mon Sep 17 00:00:00 2001 From: neil Date: Sun, 13 Oct 2024 02:05:22 +0800 Subject: [PATCH 01/11] fix(fetch): ensure proper cancellation of child streams when parent stream is cancelled - Modified `resolveResponse` to properly handle the cancellation of child streams by overriding the `cancel` method on the parent response's body. - Updated unit tests to cover scenarios for both successful stream resolution and error cases involving stream cancellation. --- packages/utils/src/instrument/fetch.ts | 105 ++++++++++-------- packages/utils/test/instrument/fetch.test.ts | 110 ++++++++++++++++++- 2 files changed, 171 insertions(+), 44 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index ad28edf81e3f..6dd84b346fb5 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -115,55 +115,75 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat }); } -async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise { - if (res && res.body) { - const body = res.body; - const responseReader = body.getReader(); - - // Define a maximum duration after which we just cancel - const maxFetchDurationTimeout = setTimeout( - () => { - body.cancel().then(null, () => { - // noop - }); - }, - 90 * 1000, // 90s - ); - - let readingActive = true; - while (readingActive) { - let chunkTimeout; - try { - // abort reading if read op takes more than 5s - chunkTimeout = setTimeout(() => { - body.cancel().then(null, () => { - // noop on error - }); - }, 5000); - - // This .read() call will reject/throw when we abort due to timeouts through `body.cancel()` - const { done } = await responseReader.read(); +async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedResolving: () => void) { + let running = true; + while (running) { + try { + // This .read() call will reject/throw when `reader.cancel()` + const { done } = await reader.read(); - clearTimeout(chunkTimeout); + running = !done; - if (done) { - onFinishedResolving(); - readingActive = false; - } - } catch (error) { - readingActive = false; - } finally { - clearTimeout(chunkTimeout); + if (done) { + onFinishedResolving(); } + } catch (_) { + running = false; + } + } +} + +export function resolveResponse(res: Response, parentRes: Response, onFinishedResolving: () => void) { + if (!res.body || !parentRes.body) { + if (res.body) { + res.body.cancel(); } - clearTimeout(maxFetchDurationTimeout); + return; + } + + const body = res.body; + const parentBody = parentRes.body; + const responseReader = body.getReader(); - responseReader.releaseLock(); - body.cancel().then(null, () => { - // noop on error + const originalCancel = parentBody.cancel; + + // Override cancel method on parent response's body + parentBody.cancel = async (reason?: any) => { + responseReader.cancel("Cancelled by parent stream").catch((err) => { + console.error('Error during responseReader cancellation:', err); }); + + await originalCancel.call(parentBody, reason); } + + const originalGetReader = parentRes.body.getReader; + + // Override getReader on parent response's body + parentBody.getReader = ((opts?: any) => { + const reader = originalGetReader.call(parentBody, opts); + + const originalReaderCancel = reader.cancel; + + reader.cancel = async (reason?: any) => { + responseReader.cancel("Cancelled by parent reader").catch((err) => { + console.error('Error during responseReader cancellation:', err); + }); + + await originalReaderCancel.call(reader, reason); + } + + return reader; + }) as any + + resloveReader(responseReader, onFinishedResolving).finally(() => { + try { + responseReader.releaseLock(); + body.cancel().catch(() => { }); + } catch (_) { + // noop on error + } + }); } function streamHandler(response: Response): void { @@ -175,8 +195,7 @@ function streamHandler(response: Response): void { return; } - // eslint-disable-next-line @typescript-eslint/no-floating-promises - resolveResponse(clonedResponseForResolving, () => { + resolveResponse(clonedResponseForResolving, response, () => { triggerHandlers('fetch-body-resolved', { endTimestamp: timestampInSeconds() * 1000, response, diff --git a/packages/utils/test/instrument/fetch.test.ts b/packages/utils/test/instrument/fetch.test.ts index ea29e0c16c3e..4b26bc4ea6c9 100644 --- a/packages/utils/test/instrument/fetch.test.ts +++ b/packages/utils/test/instrument/fetch.test.ts @@ -1,4 +1,12 @@ -import { parseFetchArgs } from '../../src/instrument/fetch'; +import { parseFetchArgs, resolveResponse } from '../../src/instrument/fetch'; + +async function delay(ms: number) { + await new Promise((res) => { + setTimeout(() => { + res(true); + }, ms); + }) +} describe('instrument > parseFetchArgs', () => { it.each([ @@ -27,3 +35,103 @@ describe('instrument > parseFetchArgs', () => { expect(actual).toEqual(expected); }); }); + +describe('instrument > fetch > resolveResponse', () => { + let mockReader: jest.Mocked>; + let mockResponse: jest.Mocked; + let mockParentResponse: jest.Mocked; + let mockParentReader: jest.Mocked>; + let onFinishedResolving: jest.Mock; + + beforeEach(() => { + mockReader = { + read: jest.fn(), + cancel: jest.fn(async (reason?: any) => { + // Set read to reject on next call after cancel + mockReader.read.mockRejectedValueOnce(new Error(reason)); + }), + releaseLock: jest.fn(), + } as unknown as jest.Mocked>; + + mockResponse = { + body: { + getReader: jest.fn(() => mockReader), + cancel: jest.fn(), + } as unknown as ReadableStream + } as jest.Mocked; + + mockParentReader = { + read: jest.fn(), + cancel: jest.fn().mockResolvedValue(undefined), + releaseLock: jest.fn(), + } as unknown as jest.Mocked>; + + mockParentResponse = { + body: { + cancel: jest.fn().mockResolvedValue(undefined), + getReader: jest.fn(() => mockParentReader), + } as unknown as ReadableStream + } as jest.Mocked; + + onFinishedResolving = jest.fn(); + }); + + test('should call onFinishedResolving when the stream is fully read', async () => { + mockReader.read.mockResolvedValueOnce({ done: false, value: 'chunk' }) + .mockResolvedValueOnce({ done: true, value: null }); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + // wait 100ms so all promise can be resolved/rejected + await delay(100); + + expect(mockReader.read).toHaveBeenCalledTimes(2); + expect(onFinishedResolving).toHaveBeenCalled(); + }); + + test('should handle read errors gracefully', async () => { + mockReader.read.mockRejectedValue(new Error('Read error')); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + await delay(100); + + expect(onFinishedResolving).not.toHaveBeenCalled(); + expect(mockReader.releaseLock).toHaveBeenCalled(); + expect(mockResponse.body?.cancel).toHaveBeenCalled(); + }); + + test('should cancel reader and gracefully exit when parent response is cancelled', async () => { + mockReader.read.mockResolvedValueOnce({ done: false, value: 'chunk1' }) + .mockResolvedValueOnce({ done: false, value: 'chunk2' }); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + await Promise.resolve(); + await mockParentResponse.body?.cancel(); + await delay(100); + + expect(onFinishedResolving).not.toHaveBeenCalled(); + expect(mockReader.releaseLock).toHaveBeenCalled(); + expect(mockReader.cancel).toHaveBeenCalled(); + expect(mockResponse.body?.cancel).toHaveBeenCalled(); + }); + + test('should cancel reader and gracefully exit when parent reader is cancelled', async () => { + mockReader.read.mockResolvedValueOnce({ done: false, value: 'chunk1' }) + .mockResolvedValueOnce({ done: false, value: 'chunk2' }); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + const parentReader = mockParentResponse.body!.getReader(); + await Promise.resolve(); + + await parentReader.cancel(); + await delay(100); + + expect(onFinishedResolving).not.toHaveBeenCalled(); + expect(mockReader.releaseLock).toHaveBeenCalled(); + expect(mockReader.cancel).toHaveBeenCalled(); + expect(mockResponse.body?.cancel).toHaveBeenCalled(); + }); +}); From f87869565a6dc286b4b4b8e8d95e0617819d2673 Mon Sep 17 00:00:00 2001 From: neil Date: Sun, 13 Oct 2024 02:31:30 +0800 Subject: [PATCH 02/11] style(fetch): fix formatting --- packages/utils/src/instrument/fetch.ts | 12 ++++++------ packages/utils/test/instrument/fetch.test.ts | 19 +++++++++++-------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index 6dd84b346fb5..4a7ded5d69f8 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -150,12 +150,12 @@ export function resolveResponse(res: Response, parentRes: Response, onFinishedRe // Override cancel method on parent response's body parentBody.cancel = async (reason?: any) => { - responseReader.cancel("Cancelled by parent stream").catch((err) => { + responseReader.cancel('Cancelled by parent stream').catch(err => { console.error('Error during responseReader cancellation:', err); }); await originalCancel.call(parentBody, reason); - } + }; const originalGetReader = parentRes.body.getReader; @@ -166,20 +166,20 @@ export function resolveResponse(res: Response, parentRes: Response, onFinishedRe const originalReaderCancel = reader.cancel; reader.cancel = async (reason?: any) => { - responseReader.cancel("Cancelled by parent reader").catch((err) => { + responseReader.cancel('Cancelled by parent reader').catch(err => { console.error('Error during responseReader cancellation:', err); }); await originalReaderCancel.call(reader, reason); - } + }; return reader; - }) as any + }) as any; resloveReader(responseReader, onFinishedResolving).finally(() => { try { responseReader.releaseLock(); - body.cancel().catch(() => { }); + body.cancel().catch(() => {}); } catch (_) { // noop on error } diff --git a/packages/utils/test/instrument/fetch.test.ts b/packages/utils/test/instrument/fetch.test.ts index 4b26bc4ea6c9..377e629e41f3 100644 --- a/packages/utils/test/instrument/fetch.test.ts +++ b/packages/utils/test/instrument/fetch.test.ts @@ -1,11 +1,11 @@ import { parseFetchArgs, resolveResponse } from '../../src/instrument/fetch'; async function delay(ms: number) { - await new Promise((res) => { + await new Promise(res => { setTimeout(() => { res(true); }, ms); - }) + }); } describe('instrument > parseFetchArgs', () => { @@ -57,7 +57,7 @@ describe('instrument > fetch > resolveResponse', () => { body: { getReader: jest.fn(() => mockReader), cancel: jest.fn(), - } as unknown as ReadableStream + } as unknown as ReadableStream, } as jest.Mocked; mockParentReader = { @@ -70,15 +70,16 @@ describe('instrument > fetch > resolveResponse', () => { body: { cancel: jest.fn().mockResolvedValue(undefined), getReader: jest.fn(() => mockParentReader), - } as unknown as ReadableStream + } as unknown as ReadableStream, } as jest.Mocked; onFinishedResolving = jest.fn(); }); test('should call onFinishedResolving when the stream is fully read', async () => { - mockReader.read.mockResolvedValueOnce({ done: false, value: 'chunk' }) - .mockResolvedValueOnce({ done: true, value: null }); + mockReader.read + .mockResolvedValueOnce({ done: false, value: 'chunk' }) + .mockResolvedValueOnce({ done: true, value: null }); resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); @@ -102,7 +103,8 @@ describe('instrument > fetch > resolveResponse', () => { }); test('should cancel reader and gracefully exit when parent response is cancelled', async () => { - mockReader.read.mockResolvedValueOnce({ done: false, value: 'chunk1' }) + mockReader.read + .mockResolvedValueOnce({ done: false, value: 'chunk1' }) .mockResolvedValueOnce({ done: false, value: 'chunk2' }); resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); @@ -118,7 +120,8 @@ describe('instrument > fetch > resolveResponse', () => { }); test('should cancel reader and gracefully exit when parent reader is cancelled', async () => { - mockReader.read.mockResolvedValueOnce({ done: false, value: 'chunk1' }) + mockReader.read + .mockResolvedValueOnce({ done: false, value: 'chunk1' }) .mockResolvedValueOnce({ done: false, value: 'chunk2' }); resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); From 2c6d34e26a387ed83f39213bf900d8ca2cb666e6 Mon Sep 17 00:00:00 2001 From: neil Date: Tue, 15 Oct 2024 02:40:16 +0800 Subject: [PATCH 03/11] fix: resolve multiple ESLint issues --- packages/utils/src/instrument/fetch.ts | 44 +++++++++++++------- packages/utils/test/instrument/fetch.test.ts | 9 ++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index 4a7ded5d69f8..afdc69dc79e4 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -115,7 +115,7 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat }); } -async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedResolving: () => void) { +async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedResolving: () => void): Promise { let running = true; while (running) { try { @@ -133,10 +133,23 @@ async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedReso } } -export function resolveResponse(res: Response, parentRes: Response, onFinishedResolving: () => void) { +/** + * Resolves the body stream of a `Response` object and links its cancellation to a parent `Response` body. + * + * This function attaches a custom `cancel` behavior to both the parent `Response` body and its `getReader()` method. + * When the parent stream or its reader is canceled, it triggers the cancellation of the child stream as well. + * The function also monitors the resolution of the child's body stream using `resloveReader` and performs cleanup. + * + * @param {Response} res - The `Response` object whose body stream will be resolved. + * @param {Response} parentRes - The parent `Response` object whose body stream is linked to the cancellation of `res`. + * @param {() => void} onFinishedResolving - A callback function to be invoked when the body stream of `res` is fully resolved. + */ +export function resolveResponse(res: Response, parentRes: Response, onFinishedResolving: () => void): void { if (!res.body || !parentRes.body) { if (res.body) { - res.body.cancel(); + res.body.cancel().catch(_ => { + // noop on error + }); } return; @@ -146,31 +159,32 @@ export function resolveResponse(res: Response, parentRes: Response, onFinishedRe const parentBody = parentRes.body; const responseReader = body.getReader(); - const originalCancel = parentBody.cancel; + const originalCancel = parentBody.cancel.bind(parentBody) as (reason?: any) => Promise; // Override cancel method on parent response's body parentBody.cancel = async (reason?: any) => { - responseReader.cancel('Cancelled by parent stream').catch(err => { - console.error('Error during responseReader cancellation:', err); + responseReader.cancel('Cancelled by parent stream').catch(_ => { + // noop on error }); - await originalCancel.call(parentBody, reason); + await originalCancel(reason); }; - const originalGetReader = parentRes.body.getReader; + const originalGetReader = parentRes.body.getReader.bind(parentBody) as + (options: ReadableStreamGetReaderOptions) => ReadableStreamDefaultReader; // Override getReader on parent response's body parentBody.getReader = ((opts?: any) => { - const reader = originalGetReader.call(parentBody, opts); + const reader = originalGetReader(opts) as ReadableStreamDefaultReader; - const originalReaderCancel = reader.cancel; + const originalReaderCancel = reader.cancel.bind(reader) as (reason?: any) => Promise; reader.cancel = async (reason?: any) => { - responseReader.cancel('Cancelled by parent reader').catch(err => { - console.error('Error during responseReader cancellation:', err); + responseReader.cancel('Cancelled by parent reader').catch(_ => { + // noop on error }); - await originalReaderCancel.call(reader, reason); + await originalReaderCancel(reason); }; return reader; @@ -179,7 +193,9 @@ export function resolveResponse(res: Response, parentRes: Response, onFinishedRe resloveReader(responseReader, onFinishedResolving).finally(() => { try { responseReader.releaseLock(); - body.cancel().catch(() => {}); + body.cancel().catch(() => { + // noop on error + }); } catch (_) { // noop on error } diff --git a/packages/utils/test/instrument/fetch.test.ts b/packages/utils/test/instrument/fetch.test.ts index 377e629e41f3..153b180911cf 100644 --- a/packages/utils/test/instrument/fetch.test.ts +++ b/packages/utils/test/instrument/fetch.test.ts @@ -86,6 +86,7 @@ describe('instrument > fetch > resolveResponse', () => { // wait 100ms so all promise can be resolved/rejected await delay(100); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockReader.read).toHaveBeenCalledTimes(2); expect(onFinishedResolving).toHaveBeenCalled(); }); @@ -98,7 +99,9 @@ describe('instrument > fetch > resolveResponse', () => { await delay(100); expect(onFinishedResolving).not.toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockReader.releaseLock).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockResponse.body?.cancel).toHaveBeenCalled(); }); @@ -114,8 +117,11 @@ describe('instrument > fetch > resolveResponse', () => { await delay(100); expect(onFinishedResolving).not.toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockReader.releaseLock).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockReader.cancel).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockResponse.body?.cancel).toHaveBeenCalled(); }); @@ -133,8 +139,11 @@ describe('instrument > fetch > resolveResponse', () => { await delay(100); expect(onFinishedResolving).not.toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockReader.releaseLock).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockReader.cancel).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method expect(mockResponse.body?.cancel).toHaveBeenCalled(); }); }); From f3aaa5d7cdba459e66f9fcb5b62c3b81786be5e1 Mon Sep 17 00:00:00 2001 From: neil Date: Tue, 15 Oct 2024 22:35:53 +0800 Subject: [PATCH 04/11] Immediate stream cancellation after timeout in `_tryGetResponseText` --- .../src/coreHandlers/util/fetchUtils.ts | 51 ++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts b/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts index 6502206b58b6..76285460764f 100644 --- a/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts +++ b/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts @@ -288,23 +288,40 @@ function _tryCloneResponse(response: Response): Response | void { * Fetch can return a streaming body, that may not resolve (or not for a long time). * If that happens, we rather abort after a short time than keep waiting for this. */ -function _tryGetResponseText(response: Response): Promise { - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => reject(new Error('Timeout while trying to read response body')), 500); - - _getResponseText(response) - .then( - txt => resolve(txt), - reason => reject(reason), - ) - .finally(() => clearTimeout(timeout)); - }); +async function _tryGetResponseText(response: Response): Promise { + if (!response.body) { + throw new Error('Response has no body'); + } - return _getResponseText(response); -} + const reader = response.body.getReader(); + + const decoder = new TextDecoder(); + let result = ''; + let running = true; -async function _getResponseText(response: Response): Promise { - // Force this to be a promise, just to be safe - // eslint-disable-next-line no-return-await - return await response.text(); + const timeout = setTimeout(() => { + if (running) { + reader.cancel('Timeout while trying to read response body').catch(_ => { + // This block is only triggered when stream has already been released, + // so it's safe to ignore. + }); + } + }, 500); + + try { + while (running) { + const { value, done } = await reader.read(); + + running = !done; + + if (value) { + result += decoder.decode(value, { stream: !done }); + } + } + } finally { + clearTimeout(timeout); + reader.releaseLock(); + } + + return result; } From 6d6681439bdc114ab2bfc79cc4afbf86268581e7 Mon Sep 17 00:00:00 2001 From: neil Date: Fri, 18 Oct 2024 09:48:24 +0800 Subject: [PATCH 05/11] fix file formatting --- packages/utils/src/instrument/fetch.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index afdc69dc79e4..dd5b02bd52b6 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -170,8 +170,9 @@ export function resolveResponse(res: Response, parentRes: Response, onFinishedRe await originalCancel(reason); }; - const originalGetReader = parentRes.body.getReader.bind(parentBody) as - (options: ReadableStreamGetReaderOptions) => ReadableStreamDefaultReader; + const originalGetReader = parentRes.body.getReader.bind(parentBody) as ( + options: ReadableStreamGetReaderOptions, + ) => ReadableStreamDefaultReader; // Override getReader on parent response's body parentBody.getReader = ((opts?: any) => { From 0d101065209f6f3d1c7c93ed8b94207ed46fe4c6 Mon Sep 17 00:00:00 2001 From: neil Date: Fri, 18 Oct 2024 11:36:11 +0800 Subject: [PATCH 06/11] Update test cases to handle new logic in fetchUtils --- .../handleNetworkBreadcrumbs.test.ts | 39 +++++++++ .../unit/coreHandlers/util/fetchUtils.test.ts | 79 +++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts b/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts index 04955477f679..d92d1617b3fb 100644 --- a/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts +++ b/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts @@ -40,6 +40,8 @@ function getMockResponse(contentLength?: string, body?: string, headers?: Record ...headers, }; + const encoder = new TextEncoder(); + const response = { headers: { has: (prop: string) => { @@ -49,6 +51,24 @@ function getMockResponse(contentLength?: string, body?: string, headers?: Record return internalHeaders[prop.toLowerCase() ?? '']; }, }, + body: { + getReader: () => { + return { + read: () => { + return Promise.resolve({ + done: true, + value: encoder.encode(body), + }); + }, + cancel: async () => { + // noop + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => Promise.resolve(body), } as unknown as Response; @@ -741,6 +761,7 @@ other-header: test`; options.networkCaptureBodies = true; const largeBody = JSON.stringify({ a: LARGE_BODY }); + const encoder = new TextEncoder(); const breadcrumb: Breadcrumb = { category: 'fetch', @@ -756,6 +777,24 @@ other-header: test`; get: () => '', }, clone: () => mockResponse, + body: { + getReader: () => { + return { + read: () => { + return Promise.resolve({ + done: true, + value: encoder.encode(largeBody), + }); + }, + cancel: async () => { + // noop + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, text: () => Promise.resolve(largeBody), } as unknown as Response; diff --git a/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts b/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts index 4da9ecab639e..7689cf3fd6b0 100644 --- a/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts +++ b/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts @@ -43,6 +43,19 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { }); it('works with text body', async () => { + const encoder = new TextEncoder(); + + const mockRead = vi + .fn() + .mockResolvedValueOnce({ + value: encoder.encode('text body'), + done: false, + }) + .mockResolvedValueOnce({ + value: null, + done: true, + }); + const response = { headers: { has: () => { @@ -52,6 +65,19 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { return undefined; }, }, + body: { + getReader: () => { + return { + read: mockRead, + cancel: async (reason?: any) => { + mockRead.mockRejectedValue(new Error(reason)); + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => Promise.resolve('text body'), } as unknown as Response; @@ -74,6 +100,8 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { }); it('works with body that fails', async () => { + const mockRead = vi.fn().mockRejectedValueOnce(new Error('cannot read')); + const response = { headers: { has: () => { @@ -83,6 +111,19 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { return undefined; }, }, + body: { + getReader: () => { + return { + read: mockRead, + cancel: async (_?: any) => { + // noop + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => Promise.reject('cannot read'), } as unknown as Response; @@ -105,6 +146,12 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { }); it('works with body that times out', async () => { + const encoder = new TextEncoder(); + const mockRead = vi.fn(); + + let cancelled = false; + let cancellReason = ''; + const response = { headers: { has: () => { @@ -114,6 +161,38 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { return undefined; }, }, + body: { + getReader: () => { + return { + read: async () => { + if (cancelled) { + mockRead.mockRejectedValue(new Error(cancellReason)); + } else { + mockRead.mockResolvedValueOnce({ + value: encoder.encode('chunk'), + done: false, + }); + } + + await new Promise(res => { + setTimeout(() => { + res(1); + }, 200); + }); + + // eslint-disable-next-line no-return-await + return await mockRead(); + }, + cancel: async (reason?: any) => { + cancelled = true; + cancellReason = reason; + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => new Promise(resolve => setTimeout(() => resolve('text body'), 1000)), } as unknown as Response; From 78ac95615dcdd73838a68b8ea6326405aff20573 Mon Sep 17 00:00:00 2001 From: neil Date: Sat, 19 Oct 2024 08:27:02 +0800 Subject: [PATCH 07/11] feat: define whatwg's stream types --- packages/types/src/index.ts | 22 ++++++------ packages/types/src/instrument.ts | 10 ++---- packages/types/src/webfetchapi.ts | 17 --------- packages/types/src/whatwg/fetch.ts | 38 ++++++++++++++++++++ packages/types/src/whatwg/index.ts | 3 ++ packages/types/src/whatwg/stream.ts | 23 ++++++++++++ packages/utils/src/instrument/fetch.ts | 18 +++++++--- packages/utils/test/instrument/fetch.test.ts | 10 +++--- 8 files changed, 96 insertions(+), 45 deletions(-) delete mode 100644 packages/types/src/webfetchapi.ts create mode 100644 packages/types/src/whatwg/fetch.ts create mode 100644 packages/types/src/whatwg/index.ts create mode 100644 packages/types/src/whatwg/stream.ts diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index b100c1e9c26a..bda959230639 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -123,10 +123,7 @@ export type { StackFrame } from './stackframe'; export type { Stacktrace, StackParser, StackLineParser, StackLineParserFn } from './stacktrace'; export type { PropagationContext, TracePropagationTargets, SerializedTraceData } from './tracing'; export type { StartSpanOptions } from './startSpanOptions'; -export type { - TraceparentData, - TransactionSource, -} from './transaction'; +export type { TraceparentData, TransactionSource } from './transaction'; export type { CustomSamplingContext, SamplingContext } from './samplingcontext'; export type { DurationUnit, @@ -146,7 +143,14 @@ export type { TransportRequestExecutor, } from './transport'; export type { User } from './user'; -export type { WebFetchHeaders, WebFetchRequest } from './webfetchapi'; +export type { + WebFetchHeaders, + WebFetchRequest, + WebFetchResponse, + WebReadableStream, + WebReadableStreamDefaultReader, + WebReadableStreamReadResult, +} from './whatwg'; export type { WrappedFunction } from './wrappedfunction'; export type { HandlerDataFetch, @@ -163,13 +167,7 @@ export type { export type { BrowserClientReplayOptions, BrowserClientProfilingOptions } from './browseroptions'; export type { CheckIn, MonitorConfig, FinishedCheckIn, InProgressCheckIn, SerializedCheckIn } from './checkin'; -export type { - MetricsAggregator, - MetricBucketItem, - MetricInstance, - MetricData, - Metrics, -} from './metrics'; +export type { MetricsAggregator, MetricBucketItem, MetricInstance, MetricData, Metrics } from './metrics'; export type { ParameterizedString } from './parameterize'; export type { ContinuousProfiler, ProfilingIntegration, Profiler } from './profiling'; export type { ViewHierarchyData, ViewHierarchyWindow } from './view-hierarchy'; diff --git a/packages/types/src/instrument.ts b/packages/types/src/instrument.ts index f0b239e86b14..c41a385b3e45 100644 --- a/packages/types/src/instrument.ts +++ b/packages/types/src/instrument.ts @@ -1,7 +1,7 @@ // This should be: null | Blob | BufferSource | FormData | URLSearchParams | string // But since not all of those are available in node, we just export `unknown` here for now -import type { WebFetchHeaders } from './webfetchapi'; +import type { WebFetchResponse } from './whatwg'; // Make sure to cast it where needed! type XHRSendInput = unknown; @@ -48,13 +48,7 @@ export interface HandlerDataFetch { fetchData: SentryFetchData; // This data is among other things dumped directly onto the fetch breadcrumb data startTimestamp: number; endTimestamp?: number; - // This is actually `Response` - Note: this type is not complete. Add to it if necessary. - response?: { - readonly ok: boolean; - readonly status: number; - readonly url: string; - headers: WebFetchHeaders; - }; + response?: WebFetchResponse; error?: unknown; } diff --git a/packages/types/src/webfetchapi.ts b/packages/types/src/webfetchapi.ts deleted file mode 100644 index 78b7d464ea71..000000000000 --- a/packages/types/src/webfetchapi.ts +++ /dev/null @@ -1,17 +0,0 @@ -// These are vendored types for the standard web fetch API types because typescript needs the DOM types to be able to understand the `Request`, `Headers`, ... types and not everybody has those. - -export interface WebFetchHeaders { - append(name: string, value: string): void; - delete(name: string): void; - get(name: string): string | null; - has(name: string): boolean; - set(name: string, value: string): void; - forEach(callbackfn: (value: string, key: string, parent: WebFetchHeaders) => void): void; -} - -export interface WebFetchRequest { - readonly headers: WebFetchHeaders; - readonly method: string; - readonly url: string; - clone(): WebFetchRequest; -} diff --git a/packages/types/src/whatwg/fetch.ts b/packages/types/src/whatwg/fetch.ts new file mode 100644 index 000000000000..437bc8936f29 --- /dev/null +++ b/packages/types/src/whatwg/fetch.ts @@ -0,0 +1,38 @@ +// These are vendored types for the standard web fetch API types because typescript needs the DOM types to be able to understand the `Request`, `Headers`, ... types and not everybody has those. + +import type { WebReadableStream } from './stream'; + +export interface WebFetchHeaders { + append(name: string, value: string): void; + delete(name: string): void; + get(name: string): string | null; + has(name: string): boolean; + set(name: string, value: string): void; + forEach(callbackfn: (value: string, key: string, parent: WebFetchHeaders) => void): void; +} + +export interface WebFetchRequest { + readonly headers: WebFetchHeaders; + readonly method: string; + readonly url: string; + clone(): WebFetchRequest; +} + +export interface WebFetchResponse { + readonly ok: boolean; + readonly status: number; + readonly statusText: string; + readonly headers: WebFetchHeaders; + readonly url: string; + readonly redirected: boolean; + readonly body: WebReadableStream | null; + + clone(): WebFetchResponse; + + // Methods to consume the response body + json(): Promise; // Parses response as JSON + text(): Promise; // Reads response body as text + arrayBuffer(): Promise; // Reads response body as ArrayBuffer + blob(): Promise; // Reads response body as Blob + formData(): Promise; // Reads response body as FormData +} diff --git a/packages/types/src/whatwg/index.ts b/packages/types/src/whatwg/index.ts new file mode 100644 index 000000000000..6b7be9732ccc --- /dev/null +++ b/packages/types/src/whatwg/index.ts @@ -0,0 +1,3 @@ +export type { WebReadableStream, WebReadableStreamDefaultReader, WebReadableStreamReadResult } from './stream'; + +export type { WebFetchHeaders, WebFetchRequest, WebFetchResponse } from './fetch'; diff --git a/packages/types/src/whatwg/stream.ts b/packages/types/src/whatwg/stream.ts new file mode 100644 index 000000000000..d8814f52e0ea --- /dev/null +++ b/packages/types/src/whatwg/stream.ts @@ -0,0 +1,23 @@ +export interface WebReadableStream { + locked: boolean; // Indicates if the stream is currently locked + + cancel(reason?: any): Promise; // Cancels the stream with an optional reason + getReader(): WebReadableStreamDefaultReader; // Returns a reader for the stream +} + +export interface WebReadableStreamDefaultReader { + closed: boolean; + // Closes the stream and resolves the reader's lock + cancel(reason?: any): Promise; + + // Returns a promise with the next chunk in the stream + read(): Promise>; + + // Releases the reader's lock on the stream + releaseLock(): void; +} + +export interface WebReadableStreamReadResult { + done: boolean; // True if the reader is done with the stream + value?: R; // The data chunk read from the stream (if not done) +} diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index dd5b02bd52b6..3b2dba524abd 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import type { HandlerDataFetch } from '@sentry/types'; +import type { HandlerDataFetch, WebFetchResponse, WebReadableStreamDefaultReader } from '@sentry/types'; import { isError } from '../is'; import { addNonEnumerableProperty, fill } from '../object'; @@ -115,7 +115,7 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat }); } -async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedResolving: () => void): Promise { +async function resloveReader(reader: WebReadableStreamDefaultReader, onFinishedResolving: () => void): Promise { let running = true; while (running) { try { @@ -143,8 +143,14 @@ async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedReso * @param {Response} res - The `Response` object whose body stream will be resolved. * @param {Response} parentRes - The parent `Response` object whose body stream is linked to the cancellation of `res`. * @param {() => void} onFinishedResolving - A callback function to be invoked when the body stream of `res` is fully resolved. + * + * Export For Test Only */ -export function resolveResponse(res: Response, parentRes: Response, onFinishedResolving: () => void): void { +export function resolveResponse( + res: WebFetchResponse, + parentRes: WebFetchResponse, + onFinishedResolving: () => void, +): void { if (!res.body || !parentRes.body) { if (res.body) { res.body.cancel().catch(_ => { @@ -157,6 +163,10 @@ export function resolveResponse(res: Response, parentRes: Response, onFinishedRe const body = res.body; const parentBody = parentRes.body; + // According to the WHATWG Streams API specification, when a stream is locked by calling `getReader()`, + // invoking `stream.cancel()` will result in a TypeError. + // To cancel while the stream is locked, must use `reader.cancel()` + // @seealso: https://streams.spec.whatwg.org const responseReader = body.getReader(); const originalCancel = parentBody.cancel.bind(parentBody) as (reason?: any) => Promise; @@ -212,7 +222,7 @@ function streamHandler(response: Response): void { return; } - resolveResponse(clonedResponseForResolving, response, () => { + resolveResponse(clonedResponseForResolving as WebFetchResponse, response as WebFetchResponse, () => { triggerHandlers('fetch-body-resolved', { endTimestamp: timestampInSeconds() * 1000, response, diff --git a/packages/utils/test/instrument/fetch.test.ts b/packages/utils/test/instrument/fetch.test.ts index 153b180911cf..e85a490c540f 100644 --- a/packages/utils/test/instrument/fetch.test.ts +++ b/packages/utils/test/instrument/fetch.test.ts @@ -1,3 +1,5 @@ +import type { WebFetchResponse } from '@sentry/types'; + import { parseFetchArgs, resolveResponse } from '../../src/instrument/fetch'; async function delay(ms: number) { @@ -38,8 +40,8 @@ describe('instrument > parseFetchArgs', () => { describe('instrument > fetch > resolveResponse', () => { let mockReader: jest.Mocked>; - let mockResponse: jest.Mocked; - let mockParentResponse: jest.Mocked; + let mockResponse: jest.Mocked; + let mockParentResponse: jest.Mocked; let mockParentReader: jest.Mocked>; let onFinishedResolving: jest.Mock; @@ -58,7 +60,7 @@ describe('instrument > fetch > resolveResponse', () => { getReader: jest.fn(() => mockReader), cancel: jest.fn(), } as unknown as ReadableStream, - } as jest.Mocked; + } as jest.Mocked; mockParentReader = { read: jest.fn(), @@ -71,7 +73,7 @@ describe('instrument > fetch > resolveResponse', () => { cancel: jest.fn().mockResolvedValue(undefined), getReader: jest.fn(() => mockParentReader), } as unknown as ReadableStream, - } as jest.Mocked; + } as jest.Mocked; onFinishedResolving = jest.fn(); }); From 7fd0ffe8db9b5ea5dd478c0bda23d25ffee48e55 Mon Sep 17 00:00:00 2001 From: neil Date: Sat, 19 Oct 2024 08:52:13 +0800 Subject: [PATCH 08/11] fix type error for tests --- packages/utils/test/instrument/fetch.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/utils/test/instrument/fetch.test.ts b/packages/utils/test/instrument/fetch.test.ts index e85a490c540f..d842dea4f7bb 100644 --- a/packages/utils/test/instrument/fetch.test.ts +++ b/packages/utils/test/instrument/fetch.test.ts @@ -1,4 +1,4 @@ -import type { WebFetchResponse } from '@sentry/types'; +import type { WebFetchResponse, WebReadableStream } from '@sentry/types'; import { parseFetchArgs, resolveResponse } from '../../src/instrument/fetch'; @@ -59,7 +59,7 @@ describe('instrument > fetch > resolveResponse', () => { body: { getReader: jest.fn(() => mockReader), cancel: jest.fn(), - } as unknown as ReadableStream, + } as unknown as WebReadableStream, } as jest.Mocked; mockParentReader = { @@ -72,7 +72,7 @@ describe('instrument > fetch > resolveResponse', () => { body: { cancel: jest.fn().mockResolvedValue(undefined), getReader: jest.fn(() => mockParentReader), - } as unknown as ReadableStream, + } as unknown as WebReadableStream, } as jest.Mocked; onFinishedResolving = jest.fn(); From 5c6ce1d260567e2ceb559a0207888ca74edcd3cd Mon Sep 17 00:00:00 2001 From: Neil Lei Date: Tue, 17 Dec 2024 15:04:22 +0800 Subject: [PATCH 09/11] ref: fix typo --- packages/core/src/utils-hoist/instrument/fetch.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/core/src/utils-hoist/instrument/fetch.ts b/packages/core/src/utils-hoist/instrument/fetch.ts index 2a662f8d8236..8ce445ba59e9 100644 --- a/packages/core/src/utils-hoist/instrument/fetch.ts +++ b/packages/core/src/utils-hoist/instrument/fetch.ts @@ -118,7 +118,7 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat }); } -async function resloveReader(reader: WebReadableStreamDefaultReader, onFinishedResolving: () => void): Promise { +async function resolveReader(reader: WebReadableStreamDefaultReader, onFinishedResolving: () => void): Promise { let running = true; while (running) { try { @@ -141,7 +141,7 @@ async function resloveReader(reader: WebReadableStreamDefaultReader, onFinishedR * * This function attaches a custom `cancel` behavior to both the parent `Response` body and its `getReader()` method. * When the parent stream or its reader is canceled, it triggers the cancellation of the child stream as well. - * The function also monitors the resolution of the child's body stream using `resloveReader` and performs cleanup. + * The function also monitors the resolution of the child's body stream using `resolveReader` and performs cleanup. * * @param {Response} res - The `Response` object whose body stream will be resolved. * @param {Response} parentRes - The parent `Response` object whose body stream is linked to the cancellation of `res`. @@ -204,7 +204,7 @@ export function resolveResponse( return reader; }) as any; - resloveReader(responseReader, onFinishedResolving).finally(() => { + resolveReader(responseReader, onFinishedResolving).finally(() => { try { responseReader.releaseLock(); body.cancel().catch(() => { From 2ed739d98883c2a4fafbfa4334dfb1bdedcd50ef Mon Sep 17 00:00:00 2001 From: Neil Lei Date: Tue, 17 Dec 2024 15:13:58 +0800 Subject: [PATCH 10/11] ref: prettify file format --- .vscode/settings.json | 2 +- packages/core/src/types-hoist/index.ts | 23 ++++++++++------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 0f2399922cfc..7b04615099a9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -27,6 +27,6 @@ "deno.enablePaths": ["packages/deno/test"], "editor.defaultFormatter": "biomejs.biome", "[typescript]": { - "editor.defaultFormatter": "biomejs.biome" + "editor.defaultFormatter": "vscode.typescript-language-features" } } diff --git a/packages/core/src/types-hoist/index.ts b/packages/core/src/types-hoist/index.ts index dff7152bd4f5..5472960d9941 100644 --- a/packages/core/src/types-hoist/index.ts +++ b/packages/core/src/types-hoist/index.ts @@ -134,10 +134,7 @@ export type { StackFrame } from './stackframe'; export type { Stacktrace, StackParser, StackLineParser, StackLineParserFn } from './stacktrace'; export type { PropagationContext, TracePropagationTargets, SerializedTraceData } from './tracing'; export type { StartSpanOptions } from './startSpanOptions'; -export type { - TraceparentData, - TransactionSource, -} from './transaction'; +export type { TraceparentData, TransactionSource } from './transaction'; export type { CustomSamplingContext, SamplingContext } from './samplingcontext'; export type { DurationUnit, @@ -157,8 +154,14 @@ export type { TransportRequestExecutor, } from './transport'; export type { User } from './user'; -export type { WebFetchHeaders, WebFetchRequest, WebFetchResponse, WebReadableStream, - WebReadableStreamDefaultReader, WebReadableStreamReadResult } from './whatwg'; +export type { + WebFetchHeaders, + WebFetchRequest, + WebFetchResponse, + WebReadableStream, + WebReadableStreamDefaultReader, + WebReadableStreamReadResult, +} from './whatwg'; export type { WrappedFunction } from './wrappedfunction'; export type { HandlerDataFetch, @@ -175,13 +178,7 @@ export type { export type { BrowserClientReplayOptions, BrowserClientProfilingOptions } from './browseroptions'; export type { CheckIn, MonitorConfig, FinishedCheckIn, InProgressCheckIn, SerializedCheckIn } from './checkin'; -export type { - MetricsAggregator, - MetricBucketItem, - MetricInstance, - MetricData, - Metrics, -} from './metrics'; +export type { MetricsAggregator, MetricBucketItem, MetricInstance, MetricData, Metrics } from './metrics'; export type { ParameterizedString } from './parameterize'; export type { ContinuousProfiler, ProfilingIntegration, Profiler } from './profiling'; export type { ViewHierarchyData, ViewHierarchyWindow } from './view-hierarchy'; From 02c244831e313cbdcf5a2698a92fc43026c6a8fb Mon Sep 17 00:00:00 2001 From: Neil Lei Date: Tue, 17 Dec 2024 15:23:39 +0800 Subject: [PATCH 11/11] ref: fix file format --- packages/core/src/utils-hoist/instrument/fetch.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/core/src/utils-hoist/instrument/fetch.ts b/packages/core/src/utils-hoist/instrument/fetch.ts index 8ce445ba59e9..85390e6fb8bf 100644 --- a/packages/core/src/utils-hoist/instrument/fetch.ts +++ b/packages/core/src/utils-hoist/instrument/fetch.ts @@ -1,7 +1,6 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { HandlerDataFetch, WebFetchResponse, WebReadableStreamDefaultReader } from '../../types-hoist'; - import { isError } from '../is'; import { addNonEnumerableProperty, fill } from '../object'; import { supportsNativeFetch } from '../supports';