Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/kernel/src/Supervisor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('Supervisor', () => {
expect(consoleErrorSpy).toHaveBeenCalledWith(
`Unexpected read error from Supervisor "${supervisor.id}"`,
new Error(
'Message cannot be processed by stream (must be JSON-serializable):\nnull',
'TestDuplexStream: Message cannot be processed (must be JSON-serializable):\nnull',
),
);
});
Expand Down
2 changes: 1 addition & 1 deletion packages/kernel/src/Vat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('Vat', () => {
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Unexpected read error',
new Error(
'Message cannot be processed by stream (must be JSON-serializable):\nnull',
'TestDuplexStream: Message cannot be processed (must be JSON-serializable):\nnull',
),
);
});
Expand Down
41 changes: 23 additions & 18 deletions packages/streams/src/BaseStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('BaseReader', () => {
it('throws if getReceiveInput is called more than once', () => {
const reader = new TestReader();
expect(() => reader.getReceiveInput()).toThrow(
'receiveInput has already been accessed',
'TestReader received multiple calls to getReceiveInput()',
);
});

Expand Down Expand Up @@ -98,7 +98,7 @@ describe('BaseReader', () => {
reader.receiveInput(badMessage);

await expect(reader.next()).rejects.toThrow(
'Message cannot be processed by stream (must be JSON-serializable)',
'TestReader: Message cannot be processed (must be JSON-serializable)',
);
});

Expand All @@ -110,7 +110,7 @@ describe('BaseReader', () => {
reader.receiveInput(badMessage);

await expect(nextP).rejects.toThrow(
'Message cannot be processed by stream (must be JSON-serializable)',
'TestReader: Message cannot be processed (must be JSON-serializable)',
);
});

Expand Down Expand Up @@ -139,7 +139,7 @@ describe('BaseReader', () => {
reader.receiveInput({});

await expect(reader.next()).rejects.toThrow(
'Message failed type validation',
'TestReader: Message failed type validation',
);
});

Expand All @@ -151,7 +151,9 @@ describe('BaseReader', () => {
const nextP = reader.next();
reader.receiveInput({});

await expect(nextP).rejects.toThrow('Message failed type validation');
await expect(nextP).rejects.toThrow(
'TestReader: Message failed type validation',
);
});

it('ends after receiving done signal, before read is enqueued', async () => {
Expand Down Expand Up @@ -277,14 +279,17 @@ describe('BaseReader', () => {
describe('BaseWriter', () => {
describe('initialization', () => {
it('constructs a BaseWriter', () => {
const writer = new TestWriter(() => undefined);
const writer = new TestWriter({ onDispatch: () => undefined });
expect(writer).toBeInstanceOf(BaseWriter);
expect(writer[Symbol.asyncIterator]()).toBe(writer);
});

it('calls onEnd once when ending', async () => {
const onEnd = vi.fn();
const writer = new TestWriter(() => undefined, onEnd);
const writer = new TestWriter({
onDispatch: () => undefined,
onEnd,
});
expect(onEnd).not.toHaveBeenCalled();

await writer.return();
Expand All @@ -297,7 +302,7 @@ describe('BaseWriter', () => {
describe('next and sending messages', () => {
it('dispatches messages', async () => {
const dispatchSpy = vi.fn();
const writer = new TestWriter(dispatchSpy);
const writer = new TestWriter({ onDispatch: dispatchSpy });

const message = 42;
const nextP = writer.next(message);
Expand All @@ -310,7 +315,7 @@ describe('BaseWriter', () => {
const dispatchSpy = vi.fn().mockImplementationOnce(() => {
throw new Error('foo');
});
const writer = new TestWriter(dispatchSpy);
const writer = new TestWriter({ onDispatch: dispatchSpy });

await expect(writer.next(42)).rejects.toThrow(
makeErrorMatcher(
Expand All @@ -336,7 +341,7 @@ describe('BaseWriter', () => {
.mockImplementationOnce(() => {
throw new Error('foo');
});
const writer = new TestWriter(dispatchSpy);
const writer = new TestWriter({ onDispatch: dispatchSpy });

await expect(writer.next(42)).rejects.toThrow(
'TestWriter experienced repeated dispatch failures.',
Expand All @@ -358,14 +363,14 @@ describe('BaseWriter', () => {

describe('return', () => {
it('ends the stream', async () => {
const writer = new TestWriter(() => undefined);
const writer = new TestWriter({ onDispatch: () => undefined });

expect(await writer.return()).toStrictEqual(makeDoneResult());
expect(await writer.next(42)).toStrictEqual(makeDoneResult());
});

it('is idempotent', async () => {
const writer = new TestWriter(() => undefined);
const writer = new TestWriter({ onDispatch: () => undefined });

expect(await writer.return()).toStrictEqual(makeDoneResult());
expect(await writer.return()).toStrictEqual(makeDoneResult());
Expand All @@ -374,28 +379,28 @@ describe('BaseWriter', () => {

describe('throw', () => {
it('ends the stream', async () => {
const writer = new TestWriter(() => undefined);
const writer = new TestWriter({ onDispatch: () => undefined });

expect(await writer.throw(new Error())).toStrictEqual(makeDoneResult());
expect(await writer.next(42)).toStrictEqual(makeDoneResult());
});

it('is idempotent', async () => {
const writer = new TestWriter(() => undefined);
const writer = new TestWriter({ onDispatch: () => undefined });

expect(await writer.throw(new Error())).toStrictEqual(makeDoneResult());
expect(await writer.throw(new Error())).toStrictEqual(makeDoneResult());
});

it('breaks out of failed onDispatch with failed onEnd', async () => {
const writer = new TestWriter(
() => {
const writer = new TestWriter({
onDispatch: () => {
throw new Error('onDispatchError');
},
() => {
onEnd: () => {
throw new Error('onEndError');
},
);
});

await expect(
async () => await writer.throw(new Error('thrownError')),
Expand Down
52 changes: 32 additions & 20 deletions packages/streams/src/BaseStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ export type ValidateInput<Read extends Json> = (input: Json) => input is Read;
export type ReceiveInput = (input: unknown) => void;

export type BaseReaderArgs<Read extends Json> = {
validateInput?: ValidateInput<Read> | undefined;
name?: string | undefined;
onEnd?: OnEnd | undefined;
validateInput?: ValidateInput<Read> | undefined;
};

/**
Expand All @@ -127,6 +128,8 @@ export class BaseReader<Read extends Json> implements Reader<Read> {
*/
readonly #buffer = makeStreamBuffer<IteratorResult<Read, undefined>>();

readonly #name: string;

readonly #validateInput?: ValidateInput<Read> | undefined;

#onEnd?: OnEnd | undefined;
Expand All @@ -137,13 +140,15 @@ export class BaseReader<Read extends Json> implements Reader<Read> {
* Constructs a {@link BaseReader}.
*
* @param args - Options bag.
* @param args.validateInput - A function that validates input from the transport.
* @param args.onEnd - A function that is called when the stream ends. For any cleanup that
* @param args.name - The name of the stream, for logging purposes. Defaults to the class name.
* should happen when the stream ends, such as closing a message port.
* @param args.onEnd - A function that is called when the stream ends. For any cleanup that
* @param args.validateInput - A function that validates input from the transport.
*/
constructor({ validateInput, onEnd }: BaseReaderArgs<Read>) {
this.#validateInput = validateInput;
constructor({ name, onEnd, validateInput }: BaseReaderArgs<Read>) {
this.#name = name ?? this.constructor.name;
this.#onEnd = onEnd;
this.#validateInput = validateInput;
harden(this);
}

Expand All @@ -155,7 +160,9 @@ export class BaseReader<Read extends Json> implements Reader<Read> {
*/
protected getReceiveInput(): ReceiveInput {
if (this.#didExposeReceiveInput) {
throw new Error('receiveInput has already been accessed');
throw new Error(
`${this.#name} received multiple calls to getReceiveInput()`,
);
}
this.#didExposeReceiveInput = true;
return this.#receiveInput.bind(this);
Expand All @@ -165,7 +172,7 @@ export class BaseReader<Read extends Json> implements Reader<Read> {
if (!isDispatchable(input)) {
await this.#handleInputError(
new Error(
`Message cannot be processed by stream (must be JSON-serializable):\n${stringify(input)}`,
`${this.#name}: Message cannot be processed (must be JSON-serializable):\n${stringify(input)}`,
),
);
return;
Expand All @@ -184,7 +191,9 @@ export class BaseReader<Read extends Json> implements Reader<Read> {

if (this.#validateInput?.(unmarshaled) === false) {
await this.#handleInputError(
new Error(`Message failed type validation:\n${stringify(unmarshaled)}`),
new Error(
`${this.#name}: Message failed type validation:\n${stringify(unmarshaled)}`,
),
);
return;
}
Expand Down Expand Up @@ -252,13 +261,19 @@ export type Dispatch<Yield extends Json> = (
value: Dispatchable<Yield>,
) => void | Promise<void>;

export type BaseWriterArgs<Write extends Json> = {
onDispatch: Dispatch<Write>;
name?: string | undefined;
onEnd?: OnEnd | undefined;
};

/**
* The base of a writable async iterator stream.
*/
export class BaseWriter<Write extends Json> implements Writer<Write> {
#isDone: boolean = false;

readonly #logName: string = 'BaseWriter';
readonly #name: string = 'BaseWriter';

readonly #onDispatch: Dispatch<Write>;

Expand All @@ -267,17 +282,14 @@ export class BaseWriter<Write extends Json> implements Writer<Write> {
/**
* Constructs a {@link BaseWriter}.
*
* @param logName - The name of the stream, for logging purposes.
* @param onDispatch - A function that dispatches messages over the underlying transport mechanism.
* @param onEnd - A function that is called when the stream ends. For any cleanup that
* @param args - Options bag.
* @param args.onDispatch - A function that dispatches messages over the underlying transport mechanism.
* @param args.onEnd - A function that is called when the stream ends. For any cleanup that
* @param args.name - The name of the stream, for logging purposes. Defaults to the class name.
* should happen when the stream ends, such as closing a message port.
*/
constructor(
logName: string,
onDispatch: Dispatch<Write>,
onEnd?: () => void,
) {
this.#logName = logName;
constructor({ name, onDispatch, onEnd }: BaseWriterArgs<Write>) {
this.#name = name ?? this.constructor.name;
this.#onDispatch = onDispatch;
this.#onEnd = onEnd;
harden(this);
Expand Down Expand Up @@ -310,7 +322,7 @@ export class BaseWriter<Write extends Json> implements Writer<Write> {
// Break out of repeated failure to dispatch an error. It is unclear how this would occur
// in practice, but it's the kind of failure mode where it's better to be sure.
const repeatedFailureError = new Error(
`${this.#logName} experienced repeated dispatch failures.`,
`${this.#name} experienced repeated dispatch failures.`,
{ cause: error },
);
await this.#onDispatch(makeStreamErrorSignal(repeatedFailureError));
Expand All @@ -321,7 +333,7 @@ export class BaseWriter<Write extends Json> implements Writer<Write> {
error instanceof Error ? error : new Error(String(error)),
true,
);
throw new Error(`${this.#logName} experienced a dispatch failure`, {
throw new Error(`${this.#name} experienced a dispatch failure`, {
cause: error,
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/streams/src/ChromeRuntimeStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ describe.concurrent('ChromeRuntimeWriter', () => {
asChromeRuntime(runtime),
ChromeRuntimeStreamTarget.Background,
ChromeRuntimeStreamTarget.Offscreen,
onEnd,
{ onEnd },
);

expect(await writer.return()).toStrictEqual(makeDoneResult());
Expand Down Expand Up @@ -324,7 +324,7 @@ describe.concurrent('ChromeRuntimeDuplexStream', () => {
});

await expect(duplexStream.write(42)).rejects.toThrow(
'ChromeRuntimeWriter experienced a dispatch failure',
'ChromeRuntimeDuplexStream experienced a dispatch failure',
);
expect(await duplexStream.next()).toStrictEqual(makeDoneResult());
});
Expand Down
20 changes: 12 additions & 8 deletions packages/streams/src/ChromeRuntimeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { BaseDuplexStream } from './BaseDuplexStream.js';
import type {
BaseReaderArgs,
ValidateInput,
OnEnd,
BaseWriterArgs,
ReceiveInput,
} from './BaseStream.js';
import { BaseReader, BaseWriter } from './BaseStream.js';
Expand Down Expand Up @@ -142,19 +142,19 @@ export class ChromeRuntimeWriter<Write extends Json> extends BaseWriter<Write> {
runtime: ChromeRuntime,
target: ChromeRuntimeStreamTarget,
source: ChromeRuntimeStreamTarget,
onEnd?: OnEnd,
{ name, onEnd }: Omit<BaseWriterArgs<Write>, 'onDispatch'> = {},
) {
super(
'ChromeRuntimeWriter',
async (value: Dispatchable<Write>) => {
super({
name,
onDispatch: async (value: Dispatchable<Write>) => {
await runtime.sendMessage({
target,
source,
payload: value,
});
},
onEnd,
);
});
harden(this);
}
}
Expand Down Expand Up @@ -192,6 +192,7 @@ export class ChromeRuntimeDuplexStream<
localTarget,
remoteTarget,
{
name: 'ChromeRuntimeDuplexStream',
validateInput,
onEnd: async () => {
await writer.return();
Expand All @@ -202,8 +203,11 @@ export class ChromeRuntimeDuplexStream<
runtime,
remoteTarget,
localTarget,
async () => {
await reader.return();
{
name: 'ChromeRuntimeDuplexStream',
onEnd: async () => {
await reader.return();
},
},
);
super(reader, writer);
Expand Down
4 changes: 2 additions & 2 deletions packages/streams/src/MessagePortStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ describe('MessagePortWriter', () => {
it('calls onEnd once when ending', async () => {
const { port1 } = new MessageChannel();
const onEnd = vi.fn();
const writer = new MessagePortWriter(port1, onEnd);
const writer = new MessagePortWriter(port1, { onEnd });

expect(await writer.return()).toStrictEqual(makeDoneResult());
expect(onEnd).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -185,7 +185,7 @@ describe('MessagePortDuplexStream', () => {
const duplexStream = await makeDuplexStream({ port1, port2 });

await expect(duplexStream.write(42)).rejects.toThrow(
'MessagePortWriter experienced a dispatch failure',
'MessagePortDuplexStream experienced a dispatch failure',
);
expect(await duplexStream.next()).toStrictEqual(makeDoneResult());
});
Expand Down
Loading