Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/kernel/src/Supervisor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ describe('Supervisor', () => {
await delay(10);
expect(consoleErrorSpy).toHaveBeenCalledWith(
`Unexpected read error from Supervisor "${supervisor.id}"`,
new Error('Received unexpected message from transport:\nnull'),
new Error(
'Message cannot be processed by stream (must be JSON-serializable):\nnull',
),
);
});
});
Expand Down
4 changes: 3 additions & 1 deletion packages/kernel/src/Vat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ describe('Vat', () => {
await delay(10);
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Unexpected read error',
new Error('Received unexpected message from transport:\nnull'),
new Error(
'Message cannot be processed by stream (must be JSON-serializable):\nnull',
),
);
});
});
Expand Down
55 changes: 46 additions & 9 deletions packages/streams/src/BaseStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { makeErrorMatcherFactory } from '@ocap/test-utils';
import { describe, expect, it, vi } from 'vitest';

import { BaseReader, BaseWriter } from './BaseStream.js';
import type { ValidateInput } from './BaseStream.js';
import {
makeDoneResult,
makePendingResult,
Expand Down Expand Up @@ -30,7 +31,7 @@ describe('BaseReader', () => {

it('calls onEnd once when ending', async () => {
const onEnd = vi.fn();
const reader = new TestReader(onEnd);
const reader = new TestReader({ onEnd });
expect(onEnd).not.toHaveBeenCalled();

await reader.return();
Expand All @@ -50,6 +51,19 @@ describe('BaseReader', () => {
expect(await reader.next()).toStrictEqual(makePendingResult(message));
});

it('calls validateInput with received input if specified', async () => {
const validateInput = vi.fn((_value: unknown) => true);
const reader = new TestReader({
validateInput: validateInput as unknown as ValidateInput<number>,
});

const message = 42;
reader.receiveInput(message);

expect(await reader.next()).toStrictEqual(makePendingResult(message));
expect(validateInput).toHaveBeenCalledWith(message);
});

it('emits message received after next()', async () => {
const reader = new TestReader();

Expand Down Expand Up @@ -77,26 +91,26 @@ describe('BaseReader', () => {
}
});

it('throws after receiving unexpected message, before read is enqueued', async () => {
it('throws after receiving non-Dispatchable input, before read is enqueued', async () => {
const reader = new TestReader();

const unexpectedMessage = Symbol('foo');
reader.receiveInput(unexpectedMessage);
const badMessage = Symbol('foo');
reader.receiveInput(badMessage);

await expect(reader.next()).rejects.toThrow(
'Received unexpected message from transport',
'Message cannot be processed by stream (must be JSON-serializable)',
);
});

it('throws after receiving unexpected message, after read is enqueued', async () => {
it('throws after receiving non-Dispatchable input, after read is enqueued', async () => {
const reader = new TestReader();

const nextP = reader.next();
const unexpectedMessage = Symbol('foo');
reader.receiveInput(unexpectedMessage);
const badMessage = Symbol('foo');
reader.receiveInput(badMessage);

await expect(nextP).rejects.toThrow(
'Received unexpected message from transport',
'Message cannot be processed by stream (must be JSON-serializable)',
);
});

Expand All @@ -117,6 +131,29 @@ describe('BaseReader', () => {
await expect(nextP).rejects.toThrow('foo');
});

it('throws after receiving invalid input, before read is enqueued', async () => {
const reader = new TestReader({
validateInput: (value) => typeof value === 'number',
});

reader.receiveInput({});

await expect(reader.next()).rejects.toThrow(
'Message failed type validation',
);
});

it('throws after receiving invalid input, after read is enqueued', async () => {
const reader = new TestReader({
validateInput: (value) => typeof value === 'number',
});

const nextP = reader.next();
reader.receiveInput({});

await expect(nextP).rejects.toThrow('Message failed type validation');
});

it('ends after receiving done signal, before read is enqueued', async () => {
const reader = new TestReader();

Expand Down
50 changes: 37 additions & 13 deletions packages/streams/src/BaseStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,22 @@ harden(makeStreamBuffer);
*/
export type OnEnd = () => void | Promise<void>;

/**
* A function that validates input to a readable stream.
*/
export type ValidateInput<Read extends Json> = (input: Json) => input is Read;

/**
* A function that receives input from a transport mechanism to a readable stream.
* Validates that the input is an {@link IteratorResult}, and throws if it is not.
*/
export type ReceiveInput = (input: unknown) => void;

export type BaseReaderArgs<Read extends Json> = {
validateInput?: ValidateInput<Read> | undefined;
onEnd?: OnEnd | undefined;
};

/**
* The base of a readable async iterator stream.
*
Expand All @@ -117,17 +127,22 @@ export class BaseReader<Read extends Json> implements Reader<Read> {
*/
readonly #buffer = makeStreamBuffer<IteratorResult<Read, undefined>>();

readonly #validateInput?: ValidateInput<Read> | undefined;

#onEnd?: OnEnd | undefined;

#didExposeReceiveInput: boolean = false;

/**
* Constructs a {@link BaseReader}.
*
* @param onEnd - A function that is called when the stream ends. For any cleanup that
* @param args - Options bag.
* @param args.validateInput - A function that validates input from the transport.
* @param args.onEnd - A function that is called when the stream ends. For any cleanup that
* should happen when the stream ends, such as closing a message port.
*/
constructor(onEnd?: () => void) {
constructor({ validateInput, onEnd }: BaseReaderArgs<Read>) {
this.#validateInput = validateInput;
this.#onEnd = onEnd;
harden(this);
}
Expand All @@ -148,22 +163,17 @@ export class BaseReader<Read extends Json> implements Reader<Read> {

readonly #receiveInput: ReceiveInput = async (input) => {
if (!isDispatchable(input)) {
const error = new Error(
`Received unexpected message from transport:\n${stringify(input)}`,
await this.#handleInputError(
new Error(
`Message cannot be processed by stream (must be JSON-serializable):\n${stringify(input)}`,
),
);
if (!this.#buffer.hasPendingReads()) {
this.#buffer.put(error);
}
await this.#end(error);
return;
}

const unmarshaled = unmarshal(input);
if (unmarshaled instanceof Error) {
if (!this.#buffer.hasPendingReads()) {
this.#buffer.put(unmarshaled);
}
await this.#end(unmarshaled);
await this.#handleInputError(unmarshaled);
return;
}

Expand All @@ -172,9 +182,23 @@ export class BaseReader<Read extends Json> implements Reader<Read> {
return;
}

this.#buffer.put(makePendingResult(unmarshaled as Read));
if (this.#validateInput?.(unmarshaled) === false) {
await this.#handleInputError(
new Error(`Message failed type validation:\n${stringify(unmarshaled)}`),
);
return;
}

this.#buffer.put(makePendingResult(unmarshaled));
};

async #handleInputError(error: Error): Promise<void> {
if (!this.#buffer.hasPendingReads()) {
this.#buffer.put(error);
}
await this.#end(error);
}

/**
* Ends the stream. Calls and then unsets the `#onEnd` method.
* Idempotent.
Expand Down
39 changes: 37 additions & 2 deletions packages/streams/src/ChromeRuntimeStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { stringify } from '@ocap/utils';
import { describe, expect, it, vi } from 'vitest';

import { makeAck } from './BaseDuplexStream.js';
import type { ValidateInput } from './BaseStream.js';
import type { ChromeRuntime } from './chrome.js';
import type { MessageEnvelope } from './ChromeRuntimeStream.js';
import {
Expand Down Expand Up @@ -94,6 +95,25 @@ describe('ChromeRuntimeReader', () => {
expect(await reader.next()).toStrictEqual(makePendingResult(message));
});

it('calls validateInput with received input if specified', async () => {
const { runtime, dispatchRuntimeMessage } = makeRuntime();
const validateInput = vi
.fn()
.mockReturnValue(true) as unknown as ValidateInput<number>;
const reader = new ChromeRuntimeReader(
asChromeRuntime(runtime),
ChromeRuntimeStreamTarget.Background,
ChromeRuntimeStreamTarget.Offscreen,
{ validateInput },
);

const message = { foo: 'bar' };
dispatchRuntimeMessage(message);

expect(await reader.next()).toStrictEqual(makePendingResult(message));
expect(validateInput).toHaveBeenCalledWith(message);
});

it('ignores messages from other extensions', async () => {
const { runtime, dispatchRuntimeMessage } = makeRuntime();
const reader = new ChromeRuntimeReader(
Expand Down Expand Up @@ -187,7 +207,7 @@ describe('ChromeRuntimeReader', () => {
asChromeRuntime(runtime),
ChromeRuntimeStreamTarget.Background,
ChromeRuntimeStreamTarget.Offscreen,
onEnd,
{ onEnd },
);

dispatchRuntimeMessage(makeStreamDoneSignal());
Expand Down Expand Up @@ -251,12 +271,13 @@ describe.concurrent('ChromeRuntimeWriter', () => {

describe.concurrent('ChromeRuntimeDuplexStream', () => {
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
const makeDuplexStream = async () => {
const makeDuplexStream = async (validateInput?: ValidateInput<number>) => {
const { runtime, dispatchRuntimeMessage } = makeRuntime();
const duplexStreamP = ChromeRuntimeDuplexStream.make(
asChromeRuntime(runtime),
ChromeRuntimeStreamTarget.Background,
ChromeRuntimeStreamTarget.Offscreen,
validateInput,
);
dispatchRuntimeMessage(makeAck());

Expand All @@ -282,6 +303,20 @@ describe.concurrent('ChromeRuntimeDuplexStream', () => {
expect(duplexStream[Symbol.asyncIterator]()).toBe(duplexStream);
});

it('calls validateInput with received input if specified', async () => {
const validateInput = vi
.fn()
.mockReturnValue(true) as unknown as ValidateInput<number>;
const [duplexStream, { dispatchRuntimeMessage }] =
await makeDuplexStream(validateInput);

const message = { foo: 'bar' };
dispatchRuntimeMessage(message);

expect(await duplexStream.next()).toStrictEqual(makePendingResult(message));
expect(validateInput).toHaveBeenCalledWith(message);
});

it('ends the reader when the writer ends', async () => {
const [duplexStream, { runtime }] = await makeDuplexStream();
runtime.sendMessage.mockImplementationOnce(() => {
Expand Down
28 changes: 21 additions & 7 deletions packages/streams/src/ChromeRuntimeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import type { Json } from '@metamask/utils';
import { stringify } from '@ocap/utils';

import { BaseDuplexStream } from './BaseDuplexStream.js';
import type { OnEnd, ReceiveInput } from './BaseStream.js';
import type {
BaseReaderArgs,
ValidateInput,
OnEnd,
ReceiveInput,
} from './BaseStream.js';
import { BaseReader, BaseWriter } from './BaseStream.js';
import type { ChromeRuntime, ChromeMessageSender } from './chrome.js';
import type { Dispatchable } from './utils.js';
Expand Down Expand Up @@ -67,17 +72,20 @@ export class ChromeRuntimeReader<Read extends Json> extends BaseReader<Read> {
runtime: ChromeRuntime,
target: ChromeRuntimeStreamTarget,
source: ChromeRuntimeStreamTarget,
onEnd?: OnEnd,
{ validateInput, onEnd }: BaseReaderArgs<Read> = {},
) {
// eslint-disable-next-line prefer-const
let messageListener: (
message: unknown,
sender: ChromeMessageSender,
) => void;

super(async () => {
runtime.onMessage.removeListener(messageListener);
await onEnd?.();
super({
validateInput,
onEnd: async () => {
runtime.onMessage.removeListener(messageListener);
await onEnd?.();
},
});

this.#receiveInput = super.getReceiveInput();
Expand Down Expand Up @@ -176,14 +184,18 @@ export class ChromeRuntimeDuplexStream<
runtime: ChromeRuntime,
localTarget: ChromeRuntimeStreamTarget,
remoteTarget: ChromeRuntimeStreamTarget,
validateInput?: ValidateInput<Read>,
) {
let writer: ChromeRuntimeWriter<Write>; // eslint-disable-line prefer-const
const reader = new ChromeRuntimeReader<Read>(
runtime,
localTarget,
remoteTarget,
async () => {
await writer.return();
{
validateInput,
onEnd: async () => {
await writer.return();
},
},
);
writer = new ChromeRuntimeWriter<Write>(
Expand All @@ -202,6 +214,7 @@ export class ChromeRuntimeDuplexStream<
runtime: ChromeRuntime,
localTarget: ChromeRuntimeStreamTarget,
remoteTarget: ChromeRuntimeStreamTarget,
validateInput?: ValidateInput<Read>,
): Promise<ChromeRuntimeDuplexStream<Read, Write>> {
if (localTarget === remoteTarget) {
throw new Error('localTarget and remoteTarget must be different');
Expand All @@ -211,6 +224,7 @@ export class ChromeRuntimeDuplexStream<
runtime,
localTarget,
remoteTarget,
validateInput,
);
await stream.synchronize();
return stream;
Expand Down
Loading