diff --git a/packages/kernel/src/Supervisor.test.ts b/packages/kernel/src/Supervisor.test.ts index c311ec827..cfa34482d 100644 --- a/packages/kernel/src/Supervisor.test.ts +++ b/packages/kernel/src/Supervisor.test.ts @@ -35,7 +35,7 @@ describe('Supervisor', () => { expect(consoleErrorSpy).toHaveBeenCalledWith( `Unexpected read error from Supervisor "${supervisor.id}"`, new Error( - 'Message cannot be processed by stream (must be JSON-serializable):\nnull', + 'TestDuplexStream: Message cannot be processed (must be JSON-serializable):\nnull', ), ); }); diff --git a/packages/kernel/src/Vat.test.ts b/packages/kernel/src/Vat.test.ts index d2d73768d..d7d31cab1 100644 --- a/packages/kernel/src/Vat.test.ts +++ b/packages/kernel/src/Vat.test.ts @@ -69,7 +69,7 @@ describe('Vat', () => { expect(consoleErrorSpy).toHaveBeenCalledWith( 'Unexpected read error', new Error( - 'Message cannot be processed by stream (must be JSON-serializable):\nnull', + 'TestDuplexStream: Message cannot be processed (must be JSON-serializable):\nnull', ), ); }); diff --git a/packages/streams/src/BaseStream.test.ts b/packages/streams/src/BaseStream.test.ts index dbf1e16b5..6a7705a10 100644 --- a/packages/streams/src/BaseStream.test.ts +++ b/packages/streams/src/BaseStream.test.ts @@ -25,7 +25,7 @@ describe('BaseReader', () => { it('throws if getReceiveInput is called more than once', () => { const reader = new TestReader(); expect(() => reader.getReceiveInput()).toThrow( - 'receiveInput has already been accessed', + 'TestReader received multiple calls to getReceiveInput()', ); }); @@ -98,7 +98,7 @@ describe('BaseReader', () => { reader.receiveInput(badMessage); await expect(reader.next()).rejects.toThrow( - 'Message cannot be processed by stream (must be JSON-serializable)', + 'TestReader: Message cannot be processed (must be JSON-serializable)', ); }); @@ -110,7 +110,7 @@ describe('BaseReader', () => { reader.receiveInput(badMessage); await expect(nextP).rejects.toThrow( - 'Message cannot be processed by stream (must be JSON-serializable)', + 'TestReader: Message cannot be processed (must be JSON-serializable)', ); }); @@ -139,7 +139,7 @@ describe('BaseReader', () => { reader.receiveInput({}); await expect(reader.next()).rejects.toThrow( - 'Message failed type validation', + 'TestReader: Message failed type validation', ); }); @@ -151,7 +151,9 @@ describe('BaseReader', () => { const nextP = reader.next(); reader.receiveInput({}); - await expect(nextP).rejects.toThrow('Message failed type validation'); + await expect(nextP).rejects.toThrow( + 'TestReader: Message failed type validation', + ); }); it('ends after receiving done signal, before read is enqueued', async () => { @@ -277,14 +279,17 @@ describe('BaseReader', () => { describe('BaseWriter', () => { describe('initialization', () => { it('constructs a BaseWriter', () => { - const writer = new TestWriter(() => undefined); + const writer = new TestWriter({ onDispatch: () => undefined }); expect(writer).toBeInstanceOf(BaseWriter); expect(writer[Symbol.asyncIterator]()).toBe(writer); }); it('calls onEnd once when ending', async () => { const onEnd = vi.fn(); - const writer = new TestWriter(() => undefined, onEnd); + const writer = new TestWriter({ + onDispatch: () => undefined, + onEnd, + }); expect(onEnd).not.toHaveBeenCalled(); await writer.return(); @@ -297,7 +302,7 @@ describe('BaseWriter', () => { describe('next and sending messages', () => { it('dispatches messages', async () => { const dispatchSpy = vi.fn(); - const writer = new TestWriter(dispatchSpy); + const writer = new TestWriter({ onDispatch: dispatchSpy }); const message = 42; const nextP = writer.next(message); @@ -310,7 +315,7 @@ describe('BaseWriter', () => { const dispatchSpy = vi.fn().mockImplementationOnce(() => { throw new Error('foo'); }); - const writer = new TestWriter(dispatchSpy); + const writer = new TestWriter({ onDispatch: dispatchSpy }); await expect(writer.next(42)).rejects.toThrow( makeErrorMatcher( @@ -336,7 +341,7 @@ describe('BaseWriter', () => { .mockImplementationOnce(() => { throw new Error('foo'); }); - const writer = new TestWriter(dispatchSpy); + const writer = new TestWriter({ onDispatch: dispatchSpy }); await expect(writer.next(42)).rejects.toThrow( 'TestWriter experienced repeated dispatch failures.', @@ -358,14 +363,14 @@ describe('BaseWriter', () => { describe('return', () => { it('ends the stream', async () => { - const writer = new TestWriter(() => undefined); + const writer = new TestWriter({ onDispatch: () => undefined }); expect(await writer.return()).toStrictEqual(makeDoneResult()); expect(await writer.next(42)).toStrictEqual(makeDoneResult()); }); it('is idempotent', async () => { - const writer = new TestWriter(() => undefined); + const writer = new TestWriter({ onDispatch: () => undefined }); expect(await writer.return()).toStrictEqual(makeDoneResult()); expect(await writer.return()).toStrictEqual(makeDoneResult()); @@ -374,28 +379,28 @@ describe('BaseWriter', () => { describe('throw', () => { it('ends the stream', async () => { - const writer = new TestWriter(() => undefined); + const writer = new TestWriter({ onDispatch: () => undefined }); expect(await writer.throw(new Error())).toStrictEqual(makeDoneResult()); expect(await writer.next(42)).toStrictEqual(makeDoneResult()); }); it('is idempotent', async () => { - const writer = new TestWriter(() => undefined); + const writer = new TestWriter({ onDispatch: () => undefined }); expect(await writer.throw(new Error())).toStrictEqual(makeDoneResult()); expect(await writer.throw(new Error())).toStrictEqual(makeDoneResult()); }); it('breaks out of failed onDispatch with failed onEnd', async () => { - const writer = new TestWriter( - () => { + const writer = new TestWriter({ + onDispatch: () => { throw new Error('onDispatchError'); }, - () => { + onEnd: () => { throw new Error('onEndError'); }, - ); + }); await expect( async () => await writer.throw(new Error('thrownError')), diff --git a/packages/streams/src/BaseStream.ts b/packages/streams/src/BaseStream.ts index 9de32e278..3b2b4156a 100644 --- a/packages/streams/src/BaseStream.ts +++ b/packages/streams/src/BaseStream.ts @@ -106,8 +106,9 @@ export type ValidateInput = (input: Json) => input is Read; export type ReceiveInput = (input: unknown) => void; export type BaseReaderArgs = { - validateInput?: ValidateInput | undefined; + name?: string | undefined; onEnd?: OnEnd | undefined; + validateInput?: ValidateInput | undefined; }; /** @@ -127,6 +128,8 @@ export class BaseReader implements Reader { */ readonly #buffer = makeStreamBuffer>(); + readonly #name: string; + readonly #validateInput?: ValidateInput | undefined; #onEnd?: OnEnd | undefined; @@ -137,13 +140,15 @@ export class BaseReader implements Reader { * Constructs a {@link BaseReader}. * * @param args - Options bag. - * @param args.validateInput - A function that validates input from the transport. - * @param args.onEnd - A function that is called when the stream ends. For any cleanup that + * @param args.name - The name of the stream, for logging purposes. Defaults to the class name. * should happen when the stream ends, such as closing a message port. + * @param args.onEnd - A function that is called when the stream ends. For any cleanup that + * @param args.validateInput - A function that validates input from the transport. */ - constructor({ validateInput, onEnd }: BaseReaderArgs) { - this.#validateInput = validateInput; + constructor({ name, onEnd, validateInput }: BaseReaderArgs) { + this.#name = name ?? this.constructor.name; this.#onEnd = onEnd; + this.#validateInput = validateInput; harden(this); } @@ -155,7 +160,9 @@ export class BaseReader implements Reader { */ protected getReceiveInput(): ReceiveInput { if (this.#didExposeReceiveInput) { - throw new Error('receiveInput has already been accessed'); + throw new Error( + `${this.#name} received multiple calls to getReceiveInput()`, + ); } this.#didExposeReceiveInput = true; return this.#receiveInput.bind(this); @@ -165,7 +172,7 @@ export class BaseReader implements Reader { if (!isDispatchable(input)) { await this.#handleInputError( new Error( - `Message cannot be processed by stream (must be JSON-serializable):\n${stringify(input)}`, + `${this.#name}: Message cannot be processed (must be JSON-serializable):\n${stringify(input)}`, ), ); return; @@ -184,7 +191,9 @@ export class BaseReader implements Reader { if (this.#validateInput?.(unmarshaled) === false) { await this.#handleInputError( - new Error(`Message failed type validation:\n${stringify(unmarshaled)}`), + new Error( + `${this.#name}: Message failed type validation:\n${stringify(unmarshaled)}`, + ), ); return; } @@ -252,13 +261,19 @@ export type Dispatch = ( value: Dispatchable, ) => void | Promise; +export type BaseWriterArgs = { + onDispatch: Dispatch; + name?: string | undefined; + onEnd?: OnEnd | undefined; +}; + /** * The base of a writable async iterator stream. */ export class BaseWriter implements Writer { #isDone: boolean = false; - readonly #logName: string = 'BaseWriter'; + readonly #name: string = 'BaseWriter'; readonly #onDispatch: Dispatch; @@ -267,17 +282,14 @@ export class BaseWriter implements Writer { /** * Constructs a {@link BaseWriter}. * - * @param logName - The name of the stream, for logging purposes. - * @param onDispatch - A function that dispatches messages over the underlying transport mechanism. - * @param onEnd - A function that is called when the stream ends. For any cleanup that + * @param args - Options bag. + * @param args.onDispatch - A function that dispatches messages over the underlying transport mechanism. + * @param args.onEnd - A function that is called when the stream ends. For any cleanup that + * @param args.name - The name of the stream, for logging purposes. Defaults to the class name. * should happen when the stream ends, such as closing a message port. */ - constructor( - logName: string, - onDispatch: Dispatch, - onEnd?: () => void, - ) { - this.#logName = logName; + constructor({ name, onDispatch, onEnd }: BaseWriterArgs) { + this.#name = name ?? this.constructor.name; this.#onDispatch = onDispatch; this.#onEnd = onEnd; harden(this); @@ -310,7 +322,7 @@ export class BaseWriter implements Writer { // Break out of repeated failure to dispatch an error. It is unclear how this would occur // in practice, but it's the kind of failure mode where it's better to be sure. const repeatedFailureError = new Error( - `${this.#logName} experienced repeated dispatch failures.`, + `${this.#name} experienced repeated dispatch failures.`, { cause: error }, ); await this.#onDispatch(makeStreamErrorSignal(repeatedFailureError)); @@ -321,7 +333,7 @@ export class BaseWriter implements Writer { error instanceof Error ? error : new Error(String(error)), true, ); - throw new Error(`${this.#logName} experienced a dispatch failure`, { + throw new Error(`${this.#name} experienced a dispatch failure`, { cause: error, }); } diff --git a/packages/streams/src/ChromeRuntimeStream.test.ts b/packages/streams/src/ChromeRuntimeStream.test.ts index d208f77c8..d4245a97f 100644 --- a/packages/streams/src/ChromeRuntimeStream.test.ts +++ b/packages/streams/src/ChromeRuntimeStream.test.ts @@ -259,7 +259,7 @@ describe.concurrent('ChromeRuntimeWriter', () => { asChromeRuntime(runtime), ChromeRuntimeStreamTarget.Background, ChromeRuntimeStreamTarget.Offscreen, - onEnd, + { onEnd }, ); expect(await writer.return()).toStrictEqual(makeDoneResult()); @@ -324,7 +324,7 @@ describe.concurrent('ChromeRuntimeDuplexStream', () => { }); await expect(duplexStream.write(42)).rejects.toThrow( - 'ChromeRuntimeWriter experienced a dispatch failure', + 'ChromeRuntimeDuplexStream experienced a dispatch failure', ); expect(await duplexStream.next()).toStrictEqual(makeDoneResult()); }); diff --git a/packages/streams/src/ChromeRuntimeStream.ts b/packages/streams/src/ChromeRuntimeStream.ts index ee1a7483d..f325b3a8d 100644 --- a/packages/streams/src/ChromeRuntimeStream.ts +++ b/packages/streams/src/ChromeRuntimeStream.ts @@ -21,7 +21,7 @@ import { BaseDuplexStream } from './BaseDuplexStream.js'; import type { BaseReaderArgs, ValidateInput, - OnEnd, + BaseWriterArgs, ReceiveInput, } from './BaseStream.js'; import { BaseReader, BaseWriter } from './BaseStream.js'; @@ -142,11 +142,11 @@ export class ChromeRuntimeWriter extends BaseWriter { runtime: ChromeRuntime, target: ChromeRuntimeStreamTarget, source: ChromeRuntimeStreamTarget, - onEnd?: OnEnd, + { name, onEnd }: Omit, 'onDispatch'> = {}, ) { - super( - 'ChromeRuntimeWriter', - async (value: Dispatchable) => { + super({ + name, + onDispatch: async (value: Dispatchable) => { await runtime.sendMessage({ target, source, @@ -154,7 +154,7 @@ export class ChromeRuntimeWriter extends BaseWriter { }); }, onEnd, - ); + }); harden(this); } } @@ -192,6 +192,7 @@ export class ChromeRuntimeDuplexStream< localTarget, remoteTarget, { + name: 'ChromeRuntimeDuplexStream', validateInput, onEnd: async () => { await writer.return(); @@ -202,8 +203,11 @@ export class ChromeRuntimeDuplexStream< runtime, remoteTarget, localTarget, - async () => { - await reader.return(); + { + name: 'ChromeRuntimeDuplexStream', + onEnd: async () => { + await reader.return(); + }, }, ); super(reader, writer); diff --git a/packages/streams/src/MessagePortStream.test.ts b/packages/streams/src/MessagePortStream.test.ts index ff6191cf8..23275d311 100644 --- a/packages/streams/src/MessagePortStream.test.ts +++ b/packages/streams/src/MessagePortStream.test.ts @@ -131,7 +131,7 @@ describe('MessagePortWriter', () => { it('calls onEnd once when ending', async () => { const { port1 } = new MessageChannel(); const onEnd = vi.fn(); - const writer = new MessagePortWriter(port1, onEnd); + const writer = new MessagePortWriter(port1, { onEnd }); expect(await writer.return()).toStrictEqual(makeDoneResult()); expect(onEnd).toHaveBeenCalledTimes(1); @@ -185,7 +185,7 @@ describe('MessagePortDuplexStream', () => { const duplexStream = await makeDuplexStream({ port1, port2 }); await expect(duplexStream.write(42)).rejects.toThrow( - 'MessagePortWriter experienced a dispatch failure', + 'MessagePortDuplexStream experienced a dispatch failure', ); expect(await duplexStream.next()).toStrictEqual(makeDoneResult()); }); diff --git a/packages/streams/src/MessagePortStream.ts b/packages/streams/src/MessagePortStream.ts index e27238628..27a398f3e 100644 --- a/packages/streams/src/MessagePortStream.ts +++ b/packages/streams/src/MessagePortStream.ts @@ -22,7 +22,11 @@ import type { Json } from '@metamask/utils'; import { BaseDuplexStream } from './BaseDuplexStream.js'; -import type { BaseReaderArgs, ValidateInput, OnEnd } from './BaseStream.js'; +import type { + BaseReaderArgs, + BaseWriterArgs, + ValidateInput, +} from './BaseStream.js'; import { BaseReader, BaseWriter } from './BaseStream.js'; import type { Dispatchable, OnMessage } from './utils.js'; @@ -79,15 +83,18 @@ harden(MessagePortReader); * - The module-level documentation for more details. */ export class MessagePortWriter extends BaseWriter { - constructor(port: MessagePort, onEnd?: OnEnd) { - super( - 'MessagePortWriter', - (value: Dispatchable) => port.postMessage(value), - async () => { + constructor( + port: MessagePort, + { name, onEnd }: Omit, 'onDispatch'> = {}, + ) { + super({ + name, + onDispatch: (value: Dispatchable) => port.postMessage(value), + onEnd: async () => { port.close(); await onEnd?.(); }, - ); + }); port.start(); harden(this); } @@ -108,13 +115,17 @@ export class MessagePortDuplexStream< private constructor(port: MessagePort, validateInput?: ValidateInput) { let writer: MessagePortWriter; // eslint-disable-line prefer-const const reader = new MessagePortReader(port, { + name: 'MessagePortDuplexStream', validateInput, onEnd: async () => { await writer.return(); }, }); - writer = new MessagePortWriter(port, async () => { - await reader.return(); + writer = new MessagePortWriter(port, { + name: 'MessagePortDuplexStream', + onEnd: async () => { + await reader.return(); + }, }); super(reader, writer); } diff --git a/packages/streams/src/PostMessageStream.test.ts b/packages/streams/src/PostMessageStream.test.ts index 54e9e2b30..7282bee75 100644 --- a/packages/streams/src/PostMessageStream.test.ts +++ b/packages/streams/src/PostMessageStream.test.ts @@ -130,7 +130,7 @@ describe('PostMessageWriter', () => { it('calls onEnd once when ending', async () => { const { postMessageFn } = makePostMessageMock(); const onEnd = vi.fn(); - const writer = new PostMessageWriter(postMessageFn, onEnd); + const writer = new PostMessageWriter(postMessageFn, { onEnd }); expect(await writer.return()).toStrictEqual(makeDoneResult()); expect(onEnd).toHaveBeenCalledTimes(1); @@ -196,7 +196,7 @@ describe('PostMessageDuplexStream', () => { ); await expect(duplexStream.write(42)).rejects.toThrow( - 'PostMessageWriter experienced a dispatch failure', + 'PostMessageDuplexStream experienced a dispatch failure', ); expect(await duplexStream.next()).toStrictEqual(makeDoneResult()); }); diff --git a/packages/streams/src/PostMessageStream.ts b/packages/streams/src/PostMessageStream.ts index e70772049..d5e554071 100644 --- a/packages/streams/src/PostMessageStream.ts +++ b/packages/streams/src/PostMessageStream.ts @@ -9,10 +9,12 @@ import type { Json } from '@metamask/utils'; import { BaseDuplexStream } from './BaseDuplexStream.js'; -import type { BaseReaderArgs, ValidateInput, OnEnd } from './BaseStream.js'; +import type { + BaseReaderArgs, + BaseWriterArgs, + ValidateInput, +} from './BaseStream.js'; import { BaseReader, BaseWriter } from './BaseStream.js'; -// Used in docstring. -// eslint-disable-next-line @typescript-eslint/no-unused-vars import type { Dispatchable, OnMessage, PostMessage } from './utils.js'; type SetListener = (onMessage: OnMessage) => void; @@ -64,8 +66,17 @@ harden(PostMessageReader); * @see {@link PostMessageReader} for the corresponding readable stream. */ export class PostMessageWriter extends BaseWriter { - constructor(postMessageFn: PostMessage, onEnd?: OnEnd) { - super('PostMessageWriter', postMessageFn, onEnd); + constructor( + postMessageFn: PostMessage, + { name, onEnd }: Omit, 'onDispatch'> = {}, + ) { + super({ + name, + onDispatch: (value: Dispatchable) => postMessageFn(value), + onEnd: async () => { + await onEnd?.(); + }, + }); harden(this); } } @@ -96,13 +107,17 @@ export class PostMessageDuplexStream< ) { let writer: PostMessageWriter; // eslint-disable-line prefer-const const reader = new PostMessageReader(setListener, removeListener, { + name: 'PostMessageDuplexStream', validateInput, onEnd: async () => { await writer.return(); }, }); - writer = new PostMessageWriter(postMessageFn, async () => { - await reader.return(); + writer = new PostMessageWriter(postMessageFn, { + name: 'PostMessageDuplexStream', + onEnd: async () => { + await reader.return(); + }, }); super(reader, writer); } diff --git a/packages/streams/test/stream-mocks.ts b/packages/streams/test/stream-mocks.ts index e917d7318..76cef0917 100644 --- a/packages/streams/test/stream-mocks.ts +++ b/packages/streams/test/stream-mocks.ts @@ -6,6 +6,7 @@ import type { ReceiveInput, BaseReaderArgs, ValidateInput, + BaseWriterArgs, } from '../src/BaseStream.js'; import { BaseReader, BaseWriter } from '../src/BaseStream.js'; @@ -33,9 +34,9 @@ export class TestWriter extends BaseWriter { return this.#onDispatch; } - constructor(onDispatch: Dispatch, onEnd?: () => void) { - super('TestWriter', onDispatch, onEnd); - this.#onDispatch = onDispatch; + constructor(args: BaseWriterArgs) { + super(args); + this.#onDispatch = args.onDispatch; } } @@ -69,8 +70,19 @@ export class TestDuplexStream< writerOnEnd, }: TestDuplexStreamOptions = {}, ) { - const reader = new TestReader({ validateInput, onEnd: readerOnEnd }); - super(reader, new TestWriter(onDispatch, writerOnEnd)); + const reader = new TestReader({ + name: 'TestDuplexStream', + onEnd: readerOnEnd, + validateInput, + }); + super( + reader, + new TestWriter({ + name: 'TestDuplexStream', + onDispatch, + onEnd: writerOnEnd, + }), + ); this.#onDispatch = onDispatch; this.#receiveInput = reader.receiveInput; }