diff --git a/packages/kernel/src/Supervisor.test.ts b/packages/kernel/src/Supervisor.test.ts index 3823831c1..c311ec827 100644 --- a/packages/kernel/src/Supervisor.test.ts +++ b/packages/kernel/src/Supervisor.test.ts @@ -34,7 +34,9 @@ describe('Supervisor', () => { await delay(10); expect(consoleErrorSpy).toHaveBeenCalledWith( `Unexpected read error from Supervisor "${supervisor.id}"`, - new Error('Received unexpected message from transport:\nnull'), + new Error( + 'Message cannot be processed by stream (must be JSON-serializable):\nnull', + ), ); }); }); diff --git a/packages/kernel/src/Vat.test.ts b/packages/kernel/src/Vat.test.ts index 998a2f882..d2d73768d 100644 --- a/packages/kernel/src/Vat.test.ts +++ b/packages/kernel/src/Vat.test.ts @@ -68,7 +68,9 @@ describe('Vat', () => { await delay(10); expect(consoleErrorSpy).toHaveBeenCalledWith( 'Unexpected read error', - new Error('Received unexpected message from transport:\nnull'), + new Error( + 'Message cannot be processed by stream (must be JSON-serializable):\nnull', + ), ); }); }); diff --git a/packages/streams/src/BaseStream.test.ts b/packages/streams/src/BaseStream.test.ts index 756f06d9c..dbf1e16b5 100644 --- a/packages/streams/src/BaseStream.test.ts +++ b/packages/streams/src/BaseStream.test.ts @@ -2,6 +2,7 @@ import { makeErrorMatcherFactory } from '@ocap/test-utils'; import { describe, expect, it, vi } from 'vitest'; import { BaseReader, BaseWriter } from './BaseStream.js'; +import type { ValidateInput } from './BaseStream.js'; import { makeDoneResult, makePendingResult, @@ -30,7 +31,7 @@ describe('BaseReader', () => { it('calls onEnd once when ending', async () => { const onEnd = vi.fn(); - const reader = new TestReader(onEnd); + const reader = new TestReader({ onEnd }); expect(onEnd).not.toHaveBeenCalled(); await reader.return(); @@ -50,6 +51,19 @@ describe('BaseReader', () => { expect(await reader.next()).toStrictEqual(makePendingResult(message)); }); + it('calls validateInput with received input if specified', async () => { + const validateInput = vi.fn((_value: unknown) => true); + const reader = new TestReader({ + validateInput: validateInput as unknown as ValidateInput, + }); + + const message = 42; + reader.receiveInput(message); + + expect(await reader.next()).toStrictEqual(makePendingResult(message)); + expect(validateInput).toHaveBeenCalledWith(message); + }); + it('emits message received after next()', async () => { const reader = new TestReader(); @@ -77,26 +91,26 @@ describe('BaseReader', () => { } }); - it('throws after receiving unexpected message, before read is enqueued', async () => { + it('throws after receiving non-Dispatchable input, before read is enqueued', async () => { const reader = new TestReader(); - const unexpectedMessage = Symbol('foo'); - reader.receiveInput(unexpectedMessage); + const badMessage = Symbol('foo'); + reader.receiveInput(badMessage); await expect(reader.next()).rejects.toThrow( - 'Received unexpected message from transport', + 'Message cannot be processed by stream (must be JSON-serializable)', ); }); - it('throws after receiving unexpected message, after read is enqueued', async () => { + it('throws after receiving non-Dispatchable input, after read is enqueued', async () => { const reader = new TestReader(); const nextP = reader.next(); - const unexpectedMessage = Symbol('foo'); - reader.receiveInput(unexpectedMessage); + const badMessage = Symbol('foo'); + reader.receiveInput(badMessage); await expect(nextP).rejects.toThrow( - 'Received unexpected message from transport', + 'Message cannot be processed by stream (must be JSON-serializable)', ); }); @@ -117,6 +131,29 @@ describe('BaseReader', () => { await expect(nextP).rejects.toThrow('foo'); }); + it('throws after receiving invalid input, before read is enqueued', async () => { + const reader = new TestReader({ + validateInput: (value) => typeof value === 'number', + }); + + reader.receiveInput({}); + + await expect(reader.next()).rejects.toThrow( + 'Message failed type validation', + ); + }); + + it('throws after receiving invalid input, after read is enqueued', async () => { + const reader = new TestReader({ + validateInput: (value) => typeof value === 'number', + }); + + const nextP = reader.next(); + reader.receiveInput({}); + + await expect(nextP).rejects.toThrow('Message failed type validation'); + }); + it('ends after receiving done signal, before read is enqueued', async () => { const reader = new TestReader(); diff --git a/packages/streams/src/BaseStream.ts b/packages/streams/src/BaseStream.ts index 6cc2f76f0..9de32e278 100644 --- a/packages/streams/src/BaseStream.ts +++ b/packages/streams/src/BaseStream.ts @@ -94,12 +94,22 @@ harden(makeStreamBuffer); */ export type OnEnd = () => void | Promise; +/** + * A function that validates input to a readable stream. + */ +export type ValidateInput = (input: Json) => input is Read; + /** * A function that receives input from a transport mechanism to a readable stream. * Validates that the input is an {@link IteratorResult}, and throws if it is not. */ export type ReceiveInput = (input: unknown) => void; +export type BaseReaderArgs = { + validateInput?: ValidateInput | undefined; + onEnd?: OnEnd | undefined; +}; + /** * The base of a readable async iterator stream. * @@ -117,6 +127,8 @@ export class BaseReader implements Reader { */ readonly #buffer = makeStreamBuffer>(); + readonly #validateInput?: ValidateInput | undefined; + #onEnd?: OnEnd | undefined; #didExposeReceiveInput: boolean = false; @@ -124,10 +136,13 @@ export class BaseReader implements Reader { /** * Constructs a {@link BaseReader}. * - * @param onEnd - A function that is called when the stream ends. For any cleanup that + * @param args - Options bag. + * @param args.validateInput - A function that validates input from the transport. + * @param args.onEnd - A function that is called when the stream ends. For any cleanup that * should happen when the stream ends, such as closing a message port. */ - constructor(onEnd?: () => void) { + constructor({ validateInput, onEnd }: BaseReaderArgs) { + this.#validateInput = validateInput; this.#onEnd = onEnd; harden(this); } @@ -148,22 +163,17 @@ export class BaseReader implements Reader { readonly #receiveInput: ReceiveInput = async (input) => { if (!isDispatchable(input)) { - const error = new Error( - `Received unexpected message from transport:\n${stringify(input)}`, + await this.#handleInputError( + new Error( + `Message cannot be processed by stream (must be JSON-serializable):\n${stringify(input)}`, + ), ); - if (!this.#buffer.hasPendingReads()) { - this.#buffer.put(error); - } - await this.#end(error); return; } const unmarshaled = unmarshal(input); if (unmarshaled instanceof Error) { - if (!this.#buffer.hasPendingReads()) { - this.#buffer.put(unmarshaled); - } - await this.#end(unmarshaled); + await this.#handleInputError(unmarshaled); return; } @@ -172,9 +182,23 @@ export class BaseReader implements Reader { return; } - this.#buffer.put(makePendingResult(unmarshaled as Read)); + if (this.#validateInput?.(unmarshaled) === false) { + await this.#handleInputError( + new Error(`Message failed type validation:\n${stringify(unmarshaled)}`), + ); + return; + } + + this.#buffer.put(makePendingResult(unmarshaled)); }; + async #handleInputError(error: Error): Promise { + if (!this.#buffer.hasPendingReads()) { + this.#buffer.put(error); + } + await this.#end(error); + } + /** * Ends the stream. Calls and then unsets the `#onEnd` method. * Idempotent. diff --git a/packages/streams/src/ChromeRuntimeStream.test.ts b/packages/streams/src/ChromeRuntimeStream.test.ts index eaa949691..d208f77c8 100644 --- a/packages/streams/src/ChromeRuntimeStream.test.ts +++ b/packages/streams/src/ChromeRuntimeStream.test.ts @@ -3,6 +3,7 @@ import { stringify } from '@ocap/utils'; import { describe, expect, it, vi } from 'vitest'; import { makeAck } from './BaseDuplexStream.js'; +import type { ValidateInput } from './BaseStream.js'; import type { ChromeRuntime } from './chrome.js'; import type { MessageEnvelope } from './ChromeRuntimeStream.js'; import { @@ -94,6 +95,25 @@ describe('ChromeRuntimeReader', () => { expect(await reader.next()).toStrictEqual(makePendingResult(message)); }); + it('calls validateInput with received input if specified', async () => { + const { runtime, dispatchRuntimeMessage } = makeRuntime(); + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const reader = new ChromeRuntimeReader( + asChromeRuntime(runtime), + ChromeRuntimeStreamTarget.Background, + ChromeRuntimeStreamTarget.Offscreen, + { validateInput }, + ); + + const message = { foo: 'bar' }; + dispatchRuntimeMessage(message); + + expect(await reader.next()).toStrictEqual(makePendingResult(message)); + expect(validateInput).toHaveBeenCalledWith(message); + }); + it('ignores messages from other extensions', async () => { const { runtime, dispatchRuntimeMessage } = makeRuntime(); const reader = new ChromeRuntimeReader( @@ -187,7 +207,7 @@ describe('ChromeRuntimeReader', () => { asChromeRuntime(runtime), ChromeRuntimeStreamTarget.Background, ChromeRuntimeStreamTarget.Offscreen, - onEnd, + { onEnd }, ); dispatchRuntimeMessage(makeStreamDoneSignal()); @@ -251,12 +271,13 @@ describe.concurrent('ChromeRuntimeWriter', () => { describe.concurrent('ChromeRuntimeDuplexStream', () => { // eslint-disable-next-line @typescript-eslint/explicit-function-return-type - const makeDuplexStream = async () => { + const makeDuplexStream = async (validateInput?: ValidateInput) => { const { runtime, dispatchRuntimeMessage } = makeRuntime(); const duplexStreamP = ChromeRuntimeDuplexStream.make( asChromeRuntime(runtime), ChromeRuntimeStreamTarget.Background, ChromeRuntimeStreamTarget.Offscreen, + validateInput, ); dispatchRuntimeMessage(makeAck()); @@ -282,6 +303,20 @@ describe.concurrent('ChromeRuntimeDuplexStream', () => { expect(duplexStream[Symbol.asyncIterator]()).toBe(duplexStream); }); + it('calls validateInput with received input if specified', async () => { + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const [duplexStream, { dispatchRuntimeMessage }] = + await makeDuplexStream(validateInput); + + const message = { foo: 'bar' }; + dispatchRuntimeMessage(message); + + expect(await duplexStream.next()).toStrictEqual(makePendingResult(message)); + expect(validateInput).toHaveBeenCalledWith(message); + }); + it('ends the reader when the writer ends', async () => { const [duplexStream, { runtime }] = await makeDuplexStream(); runtime.sendMessage.mockImplementationOnce(() => { diff --git a/packages/streams/src/ChromeRuntimeStream.ts b/packages/streams/src/ChromeRuntimeStream.ts index 9ad5e8f65..ee1a7483d 100644 --- a/packages/streams/src/ChromeRuntimeStream.ts +++ b/packages/streams/src/ChromeRuntimeStream.ts @@ -18,7 +18,12 @@ import type { Json } from '@metamask/utils'; import { stringify } from '@ocap/utils'; import { BaseDuplexStream } from './BaseDuplexStream.js'; -import type { OnEnd, ReceiveInput } from './BaseStream.js'; +import type { + BaseReaderArgs, + ValidateInput, + OnEnd, + ReceiveInput, +} from './BaseStream.js'; import { BaseReader, BaseWriter } from './BaseStream.js'; import type { ChromeRuntime, ChromeMessageSender } from './chrome.js'; import type { Dispatchable } from './utils.js'; @@ -67,7 +72,7 @@ export class ChromeRuntimeReader extends BaseReader { runtime: ChromeRuntime, target: ChromeRuntimeStreamTarget, source: ChromeRuntimeStreamTarget, - onEnd?: OnEnd, + { validateInput, onEnd }: BaseReaderArgs = {}, ) { // eslint-disable-next-line prefer-const let messageListener: ( @@ -75,9 +80,12 @@ export class ChromeRuntimeReader extends BaseReader { sender: ChromeMessageSender, ) => void; - super(async () => { - runtime.onMessage.removeListener(messageListener); - await onEnd?.(); + super({ + validateInput, + onEnd: async () => { + runtime.onMessage.removeListener(messageListener); + await onEnd?.(); + }, }); this.#receiveInput = super.getReceiveInput(); @@ -176,14 +184,18 @@ export class ChromeRuntimeDuplexStream< runtime: ChromeRuntime, localTarget: ChromeRuntimeStreamTarget, remoteTarget: ChromeRuntimeStreamTarget, + validateInput?: ValidateInput, ) { let writer: ChromeRuntimeWriter; // eslint-disable-line prefer-const const reader = new ChromeRuntimeReader( runtime, localTarget, remoteTarget, - async () => { - await writer.return(); + { + validateInput, + onEnd: async () => { + await writer.return(); + }, }, ); writer = new ChromeRuntimeWriter( @@ -202,6 +214,7 @@ export class ChromeRuntimeDuplexStream< runtime: ChromeRuntime, localTarget: ChromeRuntimeStreamTarget, remoteTarget: ChromeRuntimeStreamTarget, + validateInput?: ValidateInput, ): Promise> { if (localTarget === remoteTarget) { throw new Error('localTarget and remoteTarget must be different'); @@ -211,6 +224,7 @@ export class ChromeRuntimeDuplexStream< runtime, localTarget, remoteTarget, + validateInput, ); await stream.synchronize(); return stream; diff --git a/packages/streams/src/MessagePortStream.test.ts b/packages/streams/src/MessagePortStream.test.ts index 580c40a9f..ff6191cf8 100644 --- a/packages/streams/src/MessagePortStream.test.ts +++ b/packages/streams/src/MessagePortStream.test.ts @@ -2,6 +2,7 @@ import { delay } from '@ocap/test-utils'; import { describe, expect, it, vi } from 'vitest'; import { makeAck } from './BaseDuplexStream.js'; +import type { ValidateInput } from './BaseStream.js'; import { MessagePortDuplexStream, MessagePortReader, @@ -35,6 +36,21 @@ describe('MessagePortReader', () => { expect(await reader.next()).toStrictEqual(makePendingResult(message)); }); + it('calls validateInput with received input if specified', async () => { + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const { port1, port2 } = new MessageChannel(); + const reader = new MessagePortReader(port1, { validateInput }); + + const message = { foo: 'bar' }; + port2.postMessage(message); + await delay(10); + + expect(await reader.next()).toStrictEqual(makePendingResult(message)); + expect(validateInput).toHaveBeenCalledWith(message); + }); + it('closes the port when done', async () => { const { port1, port2 } = new MessageChannel(); const closeSpy = vi.spyOn(port1, 'close'); @@ -68,7 +84,7 @@ describe('MessagePortReader', () => { it('calls onEnd once when ending', async () => { const { port1, port2 } = new MessageChannel(); const onEnd = vi.fn(); - const reader = new MessagePortReader(port1, onEnd); + const reader = new MessagePortReader(port1, { onEnd }); port2.postMessage(makeStreamDoneSignal()); await delay(10); @@ -127,8 +143,12 @@ describe('MessagePortWriter', () => { describe('MessagePortDuplexStream', () => { const makeDuplexStream = async ( channel: MessageChannel = new MessageChannel(), + validateInput?: ValidateInput, ): Promise> => { - const duplexStreamP = MessagePortDuplexStream.make(channel.port1); + const duplexStreamP = MessagePortDuplexStream.make( + channel.port1, + validateInput, + ); channel.port2.postMessage(makeAck()); await delay(10); @@ -142,6 +162,19 @@ describe('MessagePortDuplexStream', () => { expect(duplexStream[Symbol.asyncIterator]()).toBe(duplexStream); }); + it('calls validateInput with received input if specified', async () => { + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const channel = new MessageChannel(); + const duplexStream = await makeDuplexStream(channel, validateInput); + + channel.port2.postMessage(42); + + expect(await duplexStream.next()).toStrictEqual(makePendingResult(42)); + expect(validateInput).toHaveBeenCalledWith(42); + }); + it('ends the reader when the writer ends', async () => { const { port1, port2 } = new MessageChannel(); vi.spyOn(port1, 'postMessage') diff --git a/packages/streams/src/MessagePortStream.ts b/packages/streams/src/MessagePortStream.ts index c4268bbff..e27238628 100644 --- a/packages/streams/src/MessagePortStream.ts +++ b/packages/streams/src/MessagePortStream.ts @@ -22,7 +22,7 @@ import type { Json } from '@metamask/utils'; import { BaseDuplexStream } from './BaseDuplexStream.js'; -import type { OnEnd } from './BaseStream.js'; +import type { BaseReaderArgs, ValidateInput, OnEnd } from './BaseStream.js'; import { BaseReader, BaseWriter } from './BaseStream.js'; import type { Dispatchable, OnMessage } from './utils.js'; @@ -38,14 +38,20 @@ import type { Dispatchable, OnMessage } from './utils.js'; * - The module-level documentation for more details. */ export class MessagePortReader extends BaseReader { - constructor(port: MessagePort, onEnd?: OnEnd) { + constructor( + port: MessagePort, + { validateInput, onEnd }: BaseReaderArgs = {}, + ) { // eslint-disable-next-line prefer-const let onMessage: OnMessage; - super(async () => { - port.removeEventListener('message', onMessage); - port.close(); - await onEnd?.(); + super({ + validateInput, + onEnd: async () => { + port.removeEventListener('message', onMessage); + port.close(); + await onEnd?.(); + }, }); const receiveInput = super.getReceiveInput(); @@ -99,10 +105,13 @@ export class MessagePortDuplexStream< > { // Unavoidable exception to our preference for #-private names. // eslint-disable-next-line no-restricted-syntax - private constructor(port: MessagePort) { + private constructor(port: MessagePort, validateInput?: ValidateInput) { let writer: MessagePortWriter; // eslint-disable-line prefer-const - const reader = new MessagePortReader(port, async () => { - await writer.return(); + const reader = new MessagePortReader(port, { + validateInput, + onEnd: async () => { + await writer.return(); + }, }); writer = new MessagePortWriter(port, async () => { await reader.return(); @@ -112,8 +121,12 @@ export class MessagePortDuplexStream< static async make( port: MessagePort, + validateInput?: ValidateInput, ): Promise> { - const stream = new MessagePortDuplexStream(port); + const stream = new MessagePortDuplexStream( + port, + validateInput, + ); await stream.synchronize(); return stream; } diff --git a/packages/streams/src/PostMessageStream.test.ts b/packages/streams/src/PostMessageStream.test.ts index c85d2d088..54e9e2b30 100644 --- a/packages/streams/src/PostMessageStream.test.ts +++ b/packages/streams/src/PostMessageStream.test.ts @@ -2,6 +2,7 @@ import { delay } from '@ocap/test-utils'; import { describe, it, expect, vi } from 'vitest'; import { makeAck } from './BaseDuplexStream.js'; +import type { ValidateInput } from './BaseStream.js'; import { PostMessageDuplexStream, PostMessageReader, @@ -50,6 +51,22 @@ describe('PostMessageReader', () => { expect(await reader.next()).toStrictEqual(makePendingResult(message)); }); + it('calls validateInput with received input if specified', async () => { + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const { postMessageFn, setListener, removeListener } = + makePostMessageMock(); + const reader = new PostMessageReader(setListener, removeListener, { + validateInput, + }); + + const message = { foo: 'bar' }; + postMessageFn(message); + expect(await reader.next()).toStrictEqual(makePendingResult(message)); + expect(validateInput).toHaveBeenCalledWith(message); + }); + it('removes its listener when it ends', async () => { const { postMessageFn, setListener, removeListener, listeners } = makePostMessageMock(); @@ -83,7 +100,9 @@ describe('PostMessageReader', () => { const { postMessageFn, setListener, removeListener } = makePostMessageMock(); const onEnd = vi.fn(); - const reader = new PostMessageReader(setListener, removeListener, onEnd); + const reader = new PostMessageReader(setListener, removeListener, { + onEnd, + }); postMessageFn(makeStreamDoneSignal()); @@ -121,15 +140,21 @@ describe('PostMessageWriter', () => { }); describe('PostMessageDuplexStream', () => { - // eslint-disable-next-line @typescript-eslint/explicit-function-return-type - const makeDuplexStream = async (sendMessage: PostMessage) => { - const { postMessageFn, setListener, removeListener } = - makePostMessageMock(); + const makeDuplexStream = async ( + sendMessage: PostMessage, + postMessageMock: ReturnType< + typeof makePostMessageMock + > = makePostMessageMock(), + validateInput?: ValidateInput, + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type + ) => { + const { postMessageFn, setListener, removeListener } = postMessageMock; const duplexStreamP = PostMessageDuplexStream.make( sendMessage, setListener, removeListener, + validateInput, ); postMessageFn(makeAck()); await delay(10); @@ -144,6 +169,22 @@ describe('PostMessageDuplexStream', () => { expect(duplexStream[Symbol.asyncIterator]()).toBe(duplexStream); }); + it('calls validateInput with received input if specified', async () => { + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const postMessageMock = makePostMessageMock(); + const [duplexStream] = await makeDuplexStream( + () => undefined, + postMessageMock, + validateInput, + ); + + postMessageMock.postMessageFn(42); + expect(await duplexStream.next()).toStrictEqual(makePendingResult(42)); + expect(validateInput).toHaveBeenCalledWith(42); + }); + it('ends the reader when the writer ends', async () => { const [duplexStream] = await makeDuplexStream( vi diff --git a/packages/streams/src/PostMessageStream.ts b/packages/streams/src/PostMessageStream.ts index 6db292179..e70772049 100644 --- a/packages/streams/src/PostMessageStream.ts +++ b/packages/streams/src/PostMessageStream.ts @@ -9,7 +9,7 @@ import type { Json } from '@metamask/utils'; import { BaseDuplexStream } from './BaseDuplexStream.js'; -import type { OnEnd } from './BaseStream.js'; +import type { BaseReaderArgs, ValidateInput, OnEnd } from './BaseStream.js'; import { BaseReader, BaseWriter } from './BaseStream.js'; // Used in docstring. // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -30,14 +30,17 @@ export class PostMessageReader extends BaseReader { constructor( setListener: SetListener, removeListener: RemoveListener, - onEnd?: OnEnd, + { validateInput, onEnd }: BaseReaderArgs = {}, ) { // eslint-disable-next-line prefer-const let onMessage: OnMessage; - super(async () => { - removeListener(onMessage); - await onEnd?.(); + super({ + validateInput, + onEnd: async () => { + removeListener(onMessage); + await onEnd?.(); + }, }); const receiveInput = super.getReceiveInput(); @@ -89,15 +92,15 @@ export class PostMessageDuplexStream< postMessageFn: PostMessage, setListener: SetListener, removeListener: RemoveListener, + validateInput?: ValidateInput, ) { let writer: PostMessageWriter; // eslint-disable-line prefer-const - const reader = new PostMessageReader( - setListener, - removeListener, - async () => { + const reader = new PostMessageReader(setListener, removeListener, { + validateInput, + onEnd: async () => { await writer.return(); }, - ); + }); writer = new PostMessageWriter(postMessageFn, async () => { await reader.return(); }); @@ -108,11 +111,13 @@ export class PostMessageDuplexStream< postMessageFn: PostMessage, setListener: SetListener, removeListener: RemoveListener, + validateInput?: ValidateInput, ): Promise> { const stream = new PostMessageDuplexStream( postMessageFn, setListener, removeListener, + validateInput, ); await stream.synchronize(); return stream; diff --git a/packages/streams/test/stream-mocks.ts b/packages/streams/test/stream-mocks.ts index 4d0a6371b..e917d7318 100644 --- a/packages/streams/test/stream-mocks.ts +++ b/packages/streams/test/stream-mocks.ts @@ -1,7 +1,12 @@ import type { Json } from '@metamask/utils'; import { BaseDuplexStream, makeAck } from '../src/BaseDuplexStream.js'; -import type { Dispatch, ReceiveInput } from '../src/BaseStream.js'; +import type { + Dispatch, + ReceiveInput, + BaseReaderArgs, + ValidateInput, +} from '../src/BaseStream.js'; import { BaseReader, BaseWriter } from '../src/BaseStream.js'; export class TestReader extends BaseReader { @@ -11,8 +16,8 @@ export class TestReader extends BaseReader { return this.#receiveInput; } - constructor(onEnd?: () => void) { - super(onEnd); + constructor(args: BaseReaderArgs = {}) { + super(args); this.#receiveInput = super.getReceiveInput(); } @@ -34,7 +39,8 @@ export class TestWriter extends BaseWriter { } } -type TestDuplexStreamOptions = { +type TestDuplexStreamOptions = { + validateInput?: ValidateInput | undefined; readerOnEnd?: () => void; writerOnEnd?: () => void; }; @@ -57,9 +63,13 @@ export class TestDuplexStream< constructor( onDispatch: Dispatch, - { readerOnEnd, writerOnEnd }: TestDuplexStreamOptions = {}, + { + validateInput, + readerOnEnd, + writerOnEnd, + }: TestDuplexStreamOptions = {}, ) { - const reader = new TestReader(readerOnEnd); + const reader = new TestReader({ validateInput, onEnd: readerOnEnd }); super(reader, new TestWriter(onDispatch, writerOnEnd)); this.#onDispatch = onDispatch; this.#receiveInput = reader.receiveInput; @@ -89,7 +99,7 @@ export class TestDuplexStream< */ static async make( onDispatch: Dispatch, - opts: TestDuplexStreamOptions = {}, + opts: TestDuplexStreamOptions = {}, ): Promise> { const stream = new TestDuplexStream(onDispatch, opts); await stream.completeSynchronization();