diff --git a/packages/errors/src/index.test.ts b/packages/errors/src/index.test.ts index e7d1e62e0..052a8f289 100644 --- a/packages/errors/src/index.test.ts +++ b/packages/errors/src/index.test.ts @@ -7,6 +7,8 @@ describe('index', () => { expect(Object.keys(indexModule).sort()).toStrictEqual([ 'ErrorCode', 'ErrorSentinel', + 'MarshaledErrorStruct', + 'MarshaledOcapErrorStruct', 'StreamReadError', 'VatAlreadyExistsError', 'VatCapTpConnectionExistsError', diff --git a/packages/errors/src/index.ts b/packages/errors/src/index.ts index c2aaf7186..a56fc15c4 100644 --- a/packages/errors/src/index.ts +++ b/packages/errors/src/index.ts @@ -5,7 +5,12 @@ export { VatAlreadyExistsError } from './errors/VatAlreadyExistsError.js'; export { VatDeletedError } from './errors/VatDeletedError.js'; export { VatNotFoundError } from './errors/VatNotFoundError.js'; export { StreamReadError } from './errors/StreamReadError.js'; -export { ErrorCode, ErrorSentinel } from './constants.js'; +export { + ErrorCode, + ErrorSentinel, + MarshaledErrorStruct, + MarshaledOcapErrorStruct, +} from './constants.js'; export { toError } from './utils/toError.js'; export { isOcapError } from './utils/isOcapError.js'; export { marshalError } from './marshal/marshalError.js'; diff --git a/packages/extension/src/background.ts b/packages/extension/src/background.ts index ac6b64133..b2e4ef1a9 100644 --- a/packages/extension/src/background.ts +++ b/packages/extension/src/background.ts @@ -1,6 +1,6 @@ import type { Json } from '@metamask/utils'; -import { ClusterCommandMethod, isClusterCommandReply } from '@ocap/kernel'; -import type { ClusterCommand } from '@ocap/kernel'; +import { KernelCommandMethod, isKernelCommandReply } from '@ocap/kernel'; +import type { KernelCommand } from '@ocap/kernel'; import { ChromeRuntimeTarget, ChromeRuntimeDuplexStream } from '@ocap/streams'; const OFFSCREEN_DOCUMENT_PATH = '/offscreen.html'; @@ -31,7 +31,7 @@ async function main(): Promise { * * @param command - The command to send. */ - const sendClusterCommand = async (command: ClusterCommand): Promise => { + const sendClusterCommand = async (command: KernelCommand): Promise => { await offscreenStream.write(command); }; @@ -40,21 +40,21 @@ async function main(): Promise { capTpCall: { value: async (method: string, params: Json[]) => sendClusterCommand({ - method: ClusterCommandMethod.CapTpCall, + method: KernelCommandMethod.capTpCall, params: { method, params }, }), }, evaluate: { value: async (source: string) => sendClusterCommand({ - method: ClusterCommandMethod.Evaluate, + method: KernelCommandMethod.evaluate, params: source, }), }, ping: { value: async () => sendClusterCommand({ - method: ClusterCommandMethod.Ping, + method: KernelCommandMethod.ping, params: null, }), }, @@ -64,14 +64,14 @@ async function main(): Promise { kvGet: { value: async (key: string) => sendClusterCommand({ - method: ClusterCommandMethod.KVGet, + method: KernelCommandMethod.kvGet, params: key, }), }, kvSet: { value: async (key: string, value: string) => sendClusterCommand({ - method: ClusterCommandMethod.KVSet, + method: KernelCommandMethod.kvSet, params: { key, value }, }), }, @@ -81,24 +81,24 @@ async function main(): Promise { // With this we can click the extension action button to wake up the service worker. chrome.action.onClicked.addListener(() => { sendClusterCommand({ - method: ClusterCommandMethod.Ping, + method: KernelCommandMethod.ping, params: null, }).catch(console.error); }); // Handle replies from the offscreen document for await (const message of offscreenStream) { - if (!isClusterCommandReply(message)) { + if (!isKernelCommandReply(message)) { console.error('Background received unexpected message', message); continue; } switch (message.method) { - case ClusterCommandMethod.Evaluate: - case ClusterCommandMethod.CapTpCall: - case ClusterCommandMethod.Ping: - case ClusterCommandMethod.KVGet: - case ClusterCommandMethod.KVSet: + case KernelCommandMethod.evaluate: + case KernelCommandMethod.capTpCall: + case KernelCommandMethod.ping: + case KernelCommandMethod.kvGet: + case KernelCommandMethod.kvSet: console.log(message.params); break; default: diff --git a/packages/extension/src/iframe.ts b/packages/extension/src/iframe.ts index e8db30dd0..df7a00aaf 100644 --- a/packages/extension/src/iframe.ts +++ b/packages/extension/src/iframe.ts @@ -1,8 +1,7 @@ import { makeExo } from '@endo/exo'; import { M } from '@endo/patterns'; import { Supervisor } from '@ocap/kernel'; -import type { StreamEnvelope, StreamEnvelopeReply } from '@ocap/kernel'; -import { MessagePortDuplexStream, receiveMessagePort } from '@ocap/streams'; +import { MessagePortMultiplexer, receiveMessagePort } from '@ocap/streams'; main().catch(console.error); @@ -10,12 +9,10 @@ main().catch(console.error); * The main function for the iframe. */ async function main(): Promise { - const stream = await receiveMessagePort( + const multiplexer = await receiveMessagePort( (listener) => addEventListener('message', listener), (listener) => removeEventListener('message', listener), - ).then(async (port) => - MessagePortDuplexStream.make(port), - ); + ).then(async (port) => new MessagePortMultiplexer(port)); const bootstrap = makeExo( 'TheGreatFrangooly', @@ -23,7 +20,7 @@ async function main(): Promise { { whatIsTheGreatFrangooly: () => 'Crowned with Chaos' }, ); - const supervisor = new Supervisor({ id: 'iframe', stream, bootstrap }); + const supervisor = new Supervisor({ id: 'iframe', multiplexer, bootstrap }); console.log(supervisor.evaluate('["Hello", "world!"].join(" ");')); } diff --git a/packages/extension/src/kernel/VatWorkerClient.test.ts b/packages/extension/src/kernel/VatWorkerClient.test.ts index 3b23a2db5..5ff1dd1c5 100644 --- a/packages/extension/src/kernel/VatWorkerClient.test.ts +++ b/packages/extension/src/kernel/VatWorkerClient.test.ts @@ -40,8 +40,8 @@ describe('ExtensionVatWorkerClient', () => { it.each` method - ${VatWorkerServiceCommandMethod.Launch} - ${VatWorkerServiceCommandMethod.Terminate} + ${VatWorkerServiceCommandMethod.launch} + ${VatWorkerServiceCommandMethod.terminate} `( "calls logger.error when receiving a $method reply it wasn't waiting for", async ({ method }) => { @@ -63,7 +63,7 @@ describe('ExtensionVatWorkerClient', () => { }, ); - it(`calls logger.error when receiving a ${VatWorkerServiceCommandMethod.Launch} reply without a port`, async () => { + it(`calls logger.error when receiving a ${VatWorkerServiceCommandMethod.launch} reply without a port`, async () => { const errorSpy = vi.spyOn(clientLogger, 'error'); const vatId: VatId = 'v0'; // eslint-disable-next-line @typescript-eslint/no-floating-promises @@ -71,7 +71,7 @@ describe('ExtensionVatWorkerClient', () => { const reply = { id: 'm1', payload: { - method: VatWorkerServiceCommandMethod.Launch, + method: VatWorkerServiceCommandMethod.launch, params: { vatId: 'v0' }, }, }; diff --git a/packages/extension/src/kernel/VatWorkerClient.ts b/packages/extension/src/kernel/VatWorkerClient.ts index 5ad43edde..84ae3c6f2 100644 --- a/packages/extension/src/kernel/VatWorkerClient.ts +++ b/packages/extension/src/kernel/VatWorkerClient.ts @@ -7,14 +7,12 @@ import { isVatWorkerServiceCommandReply, } from '@ocap/kernel'; import type { - StreamEnvelope, - StreamEnvelopeReply, VatWorkerService, VatId, VatWorkerServiceCommand, } from '@ocap/kernel'; -import type { DuplexStream } from '@ocap/streams'; -import { MessagePortDuplexStream } from '@ocap/streams'; +import type { DuplexStream, MultiplexEnvelope } from '@ocap/streams'; +import { isMultiplexEnvelope, MessagePortDuplexStream } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeCounter, makeHandledCallback, makeLogger } from '@ocap/utils'; @@ -77,23 +75,23 @@ export class ExtensionVatWorkerClient implements VatWorkerService { async launch( vatId: VatId, - ): Promise> { + ): Promise> { return this.#sendMessage({ - method: VatWorkerServiceCommandMethod.Launch, + method: VatWorkerServiceCommandMethod.launch, params: { vatId }, }); } async terminate(vatId: VatId): Promise { return this.#sendMessage({ - method: VatWorkerServiceCommandMethod.Terminate, + method: VatWorkerServiceCommandMethod.terminate, params: { vatId }, }); } async terminateAll(): Promise { return this.#sendMessage({ - method: VatWorkerServiceCommandMethod.TerminateAll, + method: VatWorkerServiceCommandMethod.terminateAll, params: null, }); } @@ -122,19 +120,20 @@ export class ExtensionVatWorkerClient implements VatWorkerService { } switch (method) { - case VatWorkerServiceCommandMethod.Launch: + case VatWorkerServiceCommandMethod.launch: if (!port) { this.#logger.error('Expected a port with message reply', event); return; } promise.resolve( - MessagePortDuplexStream.make( + new MessagePortDuplexStream( port, + isMultiplexEnvelope, ), ); break; - case VatWorkerServiceCommandMethod.Terminate: - case VatWorkerServiceCommandMethod.TerminateAll: + case VatWorkerServiceCommandMethod.terminate: + case VatWorkerServiceCommandMethod.terminateAll: // If we were caching streams on the client this would be a good place // to remove them. promise.resolve(undefined); diff --git a/packages/extension/src/kernel/VatWorkerServer.test.ts b/packages/extension/src/kernel/VatWorkerServer.test.ts index cb6924a07..365e356a7 100644 --- a/packages/extension/src/kernel/VatWorkerServer.test.ts +++ b/packages/extension/src/kernel/VatWorkerServer.test.ts @@ -76,14 +76,14 @@ describe('ExtensionVatWorkerServer', () => { clientPort.postMessage({ id: 'm0', payload: { - method: VatWorkerServiceCommandMethod.Launch, + method: VatWorkerServiceCommandMethod.launch, params: { vatId }, }, }); clientPort.postMessage({ id: 'm1', payload: { - method: VatWorkerServiceCommandMethod.TerminateAll, + method: VatWorkerServiceCommandMethod.terminateAll, params: null, }, }); @@ -92,7 +92,7 @@ describe('ExtensionVatWorkerServer', () => { expect(errorSpy).toHaveBeenCalledOnce(); expect(errorSpy.mock.lastCall?.[0]).toBe( - `Error handling ${VatWorkerServiceCommandMethod.TerminateAll} for vatId ${vatId}`, + `Error handling ${VatWorkerServiceCommandMethod.terminateAll} for vatId ${vatId}`, ); expect(errorSpy.mock.lastCall?.[1]).toBe(vatNotFoundError); }); diff --git a/packages/extension/src/kernel/VatWorkerServer.ts b/packages/extension/src/kernel/VatWorkerServer.ts index 4abaa31dd..96c13c05b 100644 --- a/packages/extension/src/kernel/VatWorkerServer.ts +++ b/packages/extension/src/kernel/VatWorkerServer.ts @@ -86,17 +86,17 @@ export class ExtensionVatWorkerServer { }; switch (method) { - case VatWorkerServiceCommandMethod.Launch: + case VatWorkerServiceCommandMethod.launch: await this.#launch(params.vatId) .then((port) => this.#postMessage({ id, payload }, [port])) .catch(async (error) => handleError(error, params.vatId)); break; - case VatWorkerServiceCommandMethod.Terminate: + case VatWorkerServiceCommandMethod.terminate: await this.#terminate(params.vatId) .then(() => this.#postMessage({ id, payload })) .catch(async (error) => handleError(error, params.vatId)); break; - case VatWorkerServiceCommandMethod.TerminateAll: + case VatWorkerServiceCommandMethod.terminateAll: await Promise.all( Array.from(this.#vatWorkers.keys()).map(async (vatId) => this.#terminate(vatId).catch((error) => handleError(error, vatId)), diff --git a/packages/extension/src/kernel/kernel-worker.ts b/packages/extension/src/kernel/kernel-worker.ts index 1ee5fefce..f5c99ebc9 100644 --- a/packages/extension/src/kernel/kernel-worker.ts +++ b/packages/extension/src/kernel/kernel-worker.ts @@ -65,7 +65,7 @@ async function runVatLifecycle( const vatToPing = vats[Math.floor(Math.random() * vats.length)] as VatId; console.time(`Ping Vat "${vatToPing}"`); await kernel.sendMessage(vatToPing, { - method: VatCommandMethod.Ping, + method: VatCommandMethod.ping, params: null, }); console.timeEnd(`Ping Vat "${vatToPing}"`); diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 7ade18203..73f219415 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -1,7 +1,11 @@ import '@ocap/shims/endoify'; import { VatAlreadyExistsError, VatNotFoundError } from '@ocap/errors'; -import type { MessagePortDuplexStream, DuplexStream } from '@ocap/streams'; +import type { + MessagePortDuplexStream, + DuplexStream, + MultiplexEnvelope, +} from '@ocap/streams'; import type { MockInstance } from 'vitest'; import { describe, it, expect, vi, beforeEach } from 'vitest'; @@ -12,7 +16,6 @@ import type { KernelCommandReply, VatCommand, } from './messages/index.js'; -import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; import type { VatId, VatWorkerService } from './types.js'; import { Vat } from './Vat.js'; import { makeMapKVStore } from '../test/storage.js'; @@ -45,9 +48,7 @@ describe('Kernel', () => { launchWorkerMock = vi .spyOn(mockWorkerService, 'launch') - .mockResolvedValue( - {} as DuplexStream, - ); + .mockResolvedValue({} as DuplexStream); terminateWorkerMock = vi .spyOn(mockWorkerService, 'terminate') .mockResolvedValue(undefined); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 8e6d7f31e..a1e34b14e 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -5,6 +5,7 @@ import { VatNotFoundError, toError, } from '@ocap/errors'; +import { StreamMultiplexer } from '@ocap/streams'; import type { DuplexStream } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeLogger, stringify } from '@ocap/utils'; @@ -71,10 +72,10 @@ export class Kernel { let vat: Vat; switch (method) { - case KernelCommandMethod.Ping: + case KernelCommandMethod.ping: await this.#reply({ method, params: 'pong' }); break; - case KernelCommandMethod.Evaluate: + case KernelCommandMethod.evaluate: if (!this.#vats.size) { throw new Error('No vats available to call'); } @@ -84,7 +85,7 @@ export class Kernel { params: await this.evaluate(vat.id, params), }); break; - case KernelCommandMethod.CapTpCall: + case KernelCommandMethod.capTpCall: if (!this.#vats.size) { throw new Error('No vats available to call'); } @@ -94,14 +95,14 @@ export class Kernel { params: stringify(await vat.callCapTp(params)), }); break; - case KernelCommandMethod.KVSet: + case KernelCommandMethod.kvSet: this.kvSet(params.key, params.value); await this.#reply({ method, params: `~~~ set "${params.key}" to "${params.value}" ~~~`, }); break; - case KernelCommandMethod.KVGet: { + case KernelCommandMethod.kvGet: { try { const value = this.kvGet(params); const result = @@ -143,7 +144,7 @@ export class Kernel { async evaluate(vatId: VatId, source: string): Promise { try { const result = await this.sendMessage(vatId, { - method: VatCommandMethod.Evaluate, + method: VatCommandMethod.evaluate, params: source, }); return String(result); @@ -184,7 +185,7 @@ export class Kernel { throw new VatAlreadyExistsError(id); } const stream = await this.#vatWorkerService.launch(id); - const vat = new Vat({ id, stream }); + const vat = new Vat({ id, multiplexer: new StreamMultiplexer(stream) }); this.#vats.set(vat.id, vat); await vat.init(); return vat; diff --git a/packages/kernel/src/Supervisor.test.ts b/packages/kernel/src/Supervisor.test.ts index cfa34482d..2de320b53 100644 --- a/packages/kernel/src/Supervisor.test.ts +++ b/packages/kernel/src/Supervisor.test.ts @@ -1,22 +1,30 @@ import '@ocap/shims/endoify'; +import type { MultiplexEnvelope } from '@ocap/streams'; import { delay } from '@ocap/test-utils'; -import { TestDuplexStream } from '@ocap/test-utils/streams'; +import { TestDuplexStream, TestMultiplexer } from '@ocap/test-utils/streams'; +import { stringify } from '@ocap/utils'; import { describe, it, expect, vi } from 'vitest'; import { VatCommandMethod } from './messages/index.js'; -import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; -import * as streamEnvelope from './stream-envelope.js'; import { Supervisor } from './Supervisor.js'; -const makeSupervisor = async (): Promise<{ +const makeSupervisor = async ( + handleWrite: (input: unknown) => void | Promise = () => undefined, +): Promise<{ supervisor: Supervisor; - stream: TestDuplexStream; + stream: TestDuplexStream; }> => { const stream = await TestDuplexStream.make< - StreamEnvelope, - StreamEnvelopeReply - >(() => undefined); - return { supervisor: new Supervisor({ id: 'test-id', stream }), stream }; + MultiplexEnvelope, + MultiplexEnvelope + >(handleWrite); + return { + supervisor: new Supervisor({ + id: 'test-id', + multiplexer: new TestMultiplexer(stream), + }), + stream, + }; }; describe('Supervisor', () => { @@ -30,28 +38,35 @@ describe('Supervisor', () => { it('throws if the stream throws', async () => { const { supervisor, stream } = await makeSupervisor(); const consoleErrorSpy = vi.spyOn(console, 'error'); - stream.receiveInput(NaN); + await stream.receiveInput(NaN); await delay(10); expect(consoleErrorSpy).toHaveBeenCalledWith( `Unexpected read error from Supervisor "${supervisor.id}"`, - new Error( - 'TestDuplexStream: Message cannot be processed (must be JSON-serializable):\nnull', - ), + expect.any(Error), ); }); }); describe('handleMessage', () => { - it('throws if the stream envelope handler throws', async () => { - const { stream } = await makeSupervisor(); + it('throws if receiving an unexpected message', async () => { + const { supervisor, stream } = await makeSupervisor(); const consoleErrorSpy = vi.spyOn(console, 'error'); - stream.receiveInput({ type: 'command', payload: { method: 'test' } }); + await stream.receiveInput({ + channel: 'command', + payload: { method: 'test' }, + }); await delay(10); expect(consoleErrorSpy).toHaveBeenCalledOnce(); expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Supervisor stream error:', - 'Stream envelope handler received unexpected value', + `Unexpected read error from Supervisor "${supervisor.id}"`, + new Error( + `TestMultiplexer#command: Message failed type validation:\n${stringify( + { + method: 'test', + }, + )}`, + ), ); }); @@ -61,11 +76,11 @@ describe('Supervisor', () => { await supervisor.handleMessage({ id: 'v0:0', - payload: { method: VatCommandMethod.Ping, params: null }, + payload: { method: VatCommandMethod.ping, params: null }, }); expect(replySpy).toHaveBeenCalledWith('v0:0', { - method: VatCommandMethod.Ping, + method: VatCommandMethod.ping, params: 'pong', }); }); @@ -76,22 +91,22 @@ describe('Supervisor', () => { await supervisor.handleMessage({ id: 'v0:0', - payload: { method: VatCommandMethod.CapTpInit, params: null }, + payload: { method: VatCommandMethod.capTpInit, params: null }, }); expect(replySpy).toHaveBeenCalledWith('v0:0', { - method: VatCommandMethod.CapTpInit, + method: VatCommandMethod.capTpInit, params: '~~~ CapTP Initialized ~~~', }); }); it('handles CapTP messages', async () => { - const { supervisor } = await makeSupervisor(); - const wrapCapTpSpy = vi.spyOn(streamEnvelope, 'wrapCapTp'); + const handleWrite = vi.fn(); + const { supervisor } = await makeSupervisor(handleWrite); await supervisor.handleMessage({ id: 'v0:0', - payload: { method: VatCommandMethod.CapTpInit, params: null }, + payload: { method: VatCommandMethod.capTpInit, params: null }, }); const capTpQuestion = { @@ -103,7 +118,7 @@ describe('Supervisor', () => { await delay(10); - const capTpAnswer = { + const capTpPayload = { type: 'CTP_RETURN', epoch: 0, answerID: 'q-1', @@ -112,7 +127,10 @@ describe('Supervisor', () => { slots: [], }, }; - expect(wrapCapTpSpy).toHaveBeenCalledWith(capTpAnswer); + expect(handleWrite).toHaveBeenCalledWith({ + channel: 'capTp', + payload: capTpPayload, + }); }); it('handles Evaluate messages', async () => { @@ -121,11 +139,11 @@ describe('Supervisor', () => { await supervisor.handleMessage({ id: 'v0:0', - payload: { method: VatCommandMethod.Evaluate, params: '2 + 2' }, + payload: { method: VatCommandMethod.evaluate, params: '2 + 2' }, }); expect(replySpy).toHaveBeenCalledWith('v0:0', { - method: VatCommandMethod.Evaluate, + method: VatCommandMethod.evaluate, params: '4', }); }); @@ -138,7 +156,7 @@ describe('Supervisor', () => { await supervisor.handleMessage({ id: 'v0:0', // @ts-expect-error - invalid params type. - payload: { method: VatCommandMethod.Evaluate, params: null }, + payload: { method: VatCommandMethod.evaluate, params: null }, }); expect(replySpy).not.toHaveBeenCalled(); diff --git a/packages/kernel/src/Supervisor.ts b/packages/kernel/src/Supervisor.ts index 4ee979ffa..74a4fd765 100644 --- a/packages/kernel/src/Supervisor.ts +++ b/packages/kernel/src/Supervisor.ts @@ -1,31 +1,26 @@ import { makeCapTP } from '@endo/captp'; +import type { Json } from '@metamask/utils'; import { StreamReadError } from '@ocap/errors'; -import type { DuplexStream } from '@ocap/streams'; +import type { HandledDuplexStream, StreamMultiplexer } from '@ocap/streams'; import { stringify } from '@ocap/utils'; -import type { - CapTpMessage, - VatCommand, - VatCommandReply, -} from './messages/index.js'; -import { VatCommandMethod } from './messages/index.js'; -import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; -import { - makeStreamEnvelopeHandler, - wrapCapTp, - wrapStreamCommandReply, -} from './stream-envelope.js'; +import type { VatCommand, VatCommandReply } from './messages/index.js'; +import { isVatCommand, VatCommandMethod } from './messages/index.js'; type SupervisorConstructorProps = { id: string; - stream: DuplexStream; + multiplexer: StreamMultiplexer; bootstrap?: unknown; }; export class Supervisor { readonly id: string; - readonly #stream: DuplexStream; + readonly #multiplexer: StreamMultiplexer; + + readonly #commandStream: HandledDuplexStream; + + readonly #capTpStream: HandledDuplexStream; readonly #defaultCompartment = new Compartment({ URL }); @@ -33,37 +28,37 @@ export class Supervisor { capTp?: ReturnType; - constructor({ id, stream, bootstrap }: SupervisorConstructorProps) { + constructor({ id, multiplexer, bootstrap }: SupervisorConstructorProps) { this.id = id; this.#bootstrap = bootstrap; - this.#stream = stream; - - const streamEnvelopeHandler = makeStreamEnvelopeHandler( - { - command: this.handleMessage.bind(this), - capTp: async (content) => this.capTp?.dispatch(content), - }, - (error) => console.error('Supervisor stream error:', error), + this.#multiplexer = multiplexer; + this.#commandStream = multiplexer.addChannel( + 'command', + isVatCommand, + this.handleMessage.bind(this), + ); + this.#capTpStream = multiplexer.addChannel( + 'capTp', + // The streams already enforce that the values are JSON. + (_value): _value is Json => true, + // eslint-disable-next-line no-void + async (content): Promise => void this.capTp?.dispatch(content), ); - this.#stream - .drain(async (value) => { - await streamEnvelopeHandler.handle(value); - }) - .catch((error) => { - console.error( - `Unexpected read error from Supervisor "${this.id}"`, - error, - ); - throw new StreamReadError({ supervisorId: this.id }, error); - }); + multiplexer.drainAll().catch((error) => { + console.error( + `Unexpected read error from Supervisor "${this.id}"`, + error, + ); + throw new StreamReadError({ supervisorId: this.id }, error); + }); } /** * Terminates the Supervisor. */ async terminate(): Promise { - await this.#stream.return(); + await this.#multiplexer.return(); } /** @@ -75,7 +70,7 @@ export class Supervisor { */ async handleMessage({ id, payload }: VatCommand): Promise { switch (payload.method) { - case VatCommandMethod.Evaluate: { + case VatCommandMethod.evaluate: { if (typeof payload.params !== 'string') { console.error( 'Supervisor received command with unexpected params', @@ -86,27 +81,26 @@ export class Supervisor { } const result = this.evaluate(payload.params); await this.replyToMessage(id, { - method: VatCommandMethod.Evaluate, + method: VatCommandMethod.evaluate, params: stringify(result), }); break; } - case VatCommandMethod.CapTpInit: { + case VatCommandMethod.capTpInit: { this.capTp = makeCapTP( 'iframe', - async (content: unknown) => - this.#stream.write(wrapCapTp(content as CapTpMessage)), + async (content: Json) => this.#capTpStream.write(content), this.#bootstrap, ); await this.replyToMessage(id, { - method: VatCommandMethod.CapTpInit, + method: VatCommandMethod.capTpInit, params: '~~~ CapTP Initialized ~~~', }); break; } - case VatCommandMethod.Ping: + case VatCommandMethod.ping: await this.replyToMessage(id, { - method: VatCommandMethod.Ping, + method: VatCommandMethod.ping, params: 'pong', }); break; @@ -126,10 +120,11 @@ export class Supervisor { * @param payload - The payload to reply with. */ async replyToMessage( - id: VatCommand['id'], + id: VatCommandReply['id'], payload: VatCommandReply['payload'], ): Promise { - await this.#stream.write(wrapStreamCommandReply({ id, payload })); + console.log('replyToMessage', id, payload); + await this.#commandStream.write({ id, payload }); } /** diff --git a/packages/kernel/src/Vat.test.ts b/packages/kernel/src/Vat.test.ts index d7d31cab1..4400c2767 100644 --- a/packages/kernel/src/Vat.test.ts +++ b/packages/kernel/src/Vat.test.ts @@ -3,20 +3,15 @@ import { VatCapTpConnectionExistsError, VatCapTpConnectionNotFoundError, } from '@ocap/errors'; +import type { MultiplexEnvelope } from '@ocap/streams'; import { delay, makePromiseKitMock } from '@ocap/test-utils'; -import { TestDuplexStream } from '@ocap/test-utils/streams'; -import { stringify } from '@ocap/utils'; +import { TestDuplexStream, TestMultiplexer } from '@ocap/test-utils/streams'; +import { makeLogger, stringify } from '@ocap/utils'; +import type { Logger } from '@ocap/utils'; import { describe, it, expect, vi } from 'vitest'; import { VatCommandMethod } from './messages/index.js'; -import type { - CapTpMessage, - VatCommand, - VatCommandReply, -} from './messages/index.js'; -import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; -import * as streamEnvelope from './stream-envelope.js'; -import { makeStreamEnvelopeReplyHandler } from './stream-envelope.js'; +import type { VatCommand, VatCommandReply } from './messages/index.js'; import { Vat } from './Vat.js'; vi.mock('@endo/eventual-send', () => ({ @@ -27,15 +22,24 @@ vi.mock('@endo/eventual-send', () => ({ }), })); -const makeVat = async (): Promise<{ +const makeVat = async ( + logger?: Logger, +): Promise<{ vat: Vat; - stream: TestDuplexStream; + stream: TestDuplexStream; }> => { const stream = await TestDuplexStream.make< - StreamEnvelopeReply, - StreamEnvelope + MultiplexEnvelope, + MultiplexEnvelope >(() => undefined); - return { vat: new Vat({ id: 'v0', stream }), stream }; + return { + vat: new Vat({ + id: 'v0', + multiplexer: new TestMultiplexer(stream), + logger, + }), + stream, + }; }; describe('Vat', () => { @@ -52,7 +56,7 @@ describe('Vat', () => { await vat.init(); expect(sendMessageMock).toHaveBeenCalledWith({ - method: VatCommandMethod.Ping, + method: VatCommandMethod.ping, params: null, }); expect(capTpMock).toHaveBeenCalled(); @@ -63,14 +67,12 @@ describe('Vat', () => { vi.spyOn(vat, 'sendMessage').mockResolvedValueOnce(undefined); vi.spyOn(vat, 'makeCapTp').mockResolvedValueOnce(undefined); await vat.init(); - const consoleErrorSpy = vi.spyOn(vat.logger, 'error'); - stream.receiveInput(NaN); + const logErrorSpy = vi.spyOn(vat.logger, 'error'); + await stream.receiveInput(NaN); await delay(10); - expect(consoleErrorSpy).toHaveBeenCalledWith( + expect(logErrorSpy).toHaveBeenCalledWith( 'Unexpected read error', - new Error( - 'TestDuplexStream: Message cannot be processed (must be JSON-serializable):\nnull', - ), + expect.any(Error), ); }); }); @@ -79,7 +81,7 @@ describe('Vat', () => { it('sends a message and resolves the promise', async () => { const { vat } = await makeVat(); const mockMessage = { - method: VatCommandMethod.Ping, + method: VatCommandMethod.ping, params: null, } as VatCommand['payload']; const sendMessagePromise = vat.sendMessage(mockMessage); @@ -89,26 +91,12 @@ describe('Vat', () => { }); }); - describe('#receiveMessages', () => { - it('receives messages correctly', async () => { - const { vat, stream } = await makeVat(); - vi.spyOn(vat, 'sendMessage').mockResolvedValueOnce(undefined); - vi.spyOn(vat, 'makeCapTp').mockResolvedValueOnce(undefined); - const handleSpy = vi.spyOn(vat.streamEnvelopeReplyHandler, 'handle'); - await vat.init(); - const rawMessage = { type: 'command', payload: { method: 'test' } }; - stream.receiveInput(rawMessage); - await delay(10); - expect(handleSpy).toHaveBeenCalledWith(rawMessage); - }); - }); - describe('handleMessage', () => { it('resolves the payload when the message id exists in unresolvedMessages', async () => { const { vat } = await makeVat(); const mockMessageId = 'v0:1'; const mockPayload: VatCommandReply['payload'] = { - method: VatCommandMethod.Evaluate, + method: VatCommandMethod.evaluate, params: 'test-response', }; const mockPromiseKit = { resolve: vi.fn(), reject: vi.fn() }; @@ -120,11 +108,11 @@ describe('Vat', () => { it('logs an error when the message id does not exist in unresolvedMessages', async () => { const { vat } = await makeVat(); - const consoleErrorSpy = vi.spyOn(console, 'error'); + const logErrorSpy = vi.spyOn(vat.logger, 'error'); const nonExistentMessageId = 'v0:9'; const mockPayload: VatCommandReply['payload'] = { - method: VatCommandMethod.Ping, + method: VatCommandMethod.ping, params: 'pong', }; @@ -133,10 +121,10 @@ describe('Vat', () => { payload: mockPayload, }); - expect(consoleErrorSpy).toHaveBeenCalledWith( + expect(logErrorSpy).toHaveBeenCalledWith( `No unresolved message with id "${nonExistentMessageId}".`, ); - consoleErrorSpy.mockRestore(); + logErrorSpy.mockRestore(); }); }); @@ -170,44 +158,37 @@ describe('Vat', () => { it('creates a CapTP connection and sends CapTpInit message', async () => { const { vat } = await makeVat(); - // @ts-expect-error - streamEnvelopeReplyHandler is readonly - vat.streamEnvelopeReplyHandler = makeStreamEnvelopeReplyHandler( - {}, - console.warn, - ); const sendMessageMock = vi .spyOn(vat, 'sendMessage') .mockResolvedValueOnce(undefined); await vat.makeCapTp(); - expect( - vat.streamEnvelopeReplyHandler.contentHandlers.capTp, - ).toBeDefined(); expect(sendMessageMock).toHaveBeenCalledWith({ - method: VatCommandMethod.CapTpInit, + method: VatCommandMethod.capTpInit, params: null, }); }); it('handles CapTP messages', async () => { - const { vat } = await makeVat(); - vi.spyOn(vat, 'sendMessage').mockResolvedValueOnce(undefined); - const wrapCapTpSpy = vi.spyOn(streamEnvelope, 'wrapCapTp'); - const consoleLogSpy = vi.spyOn(vat.logger, 'log'); + const logger = makeLogger('[test]'); + const logSpy = vi.spyOn(logger, 'log'); + const { vat, stream } = await makeVat(logger); + vi.spyOn(vat, 'sendMessage') + .mockResolvedValueOnce(undefined) + .mockResolvedValueOnce(undefined); - await vat.makeCapTp(); + await vat.init(); - const capTpQuestion = { + const capTpPayload = { type: 'CTP_BOOTSTRAP', epoch: 0, questionID: 'q-1', }; - await vat.streamEnvelopeReplyHandler.contentHandlers.capTp?.( - capTpQuestion as CapTpMessage, - ); + await stream.receiveInput({ channel: 'capTp', payload: capTpPayload }); + await delay(10); - expect(consoleLogSpy).toHaveBeenCalledWith( + expect(logSpy).toHaveBeenCalledWith( 'CapTP from vat', - stringify(capTpQuestion), + stringify(capTpPayload), ); const capTpAnswer = { @@ -219,7 +200,11 @@ describe('Vat', () => { slots: [], }, }; - expect(wrapCapTpSpy).toHaveBeenCalledWith(capTpAnswer); + + expect(logSpy).toHaveBeenLastCalledWith( + 'CapTP to vat', + stringify(capTpAnswer), + ); }); }); diff --git a/packages/kernel/src/Vat.ts b/packages/kernel/src/Vat.ts index 49b88fdf5..d00237f67 100644 --- a/packages/kernel/src/Vat.ts +++ b/packages/kernel/src/Vat.ts @@ -1,44 +1,39 @@ import { makeCapTP } from '@endo/captp'; import { E } from '@endo/eventual-send'; import { makePromiseKit } from '@endo/promise-kit'; +import type { Json } from '@metamask/utils'; import { VatCapTpConnectionExistsError, VatCapTpConnectionNotFoundError, VatDeletedError, StreamReadError, } from '@ocap/errors'; -import type { DuplexStream, Reader } from '@ocap/streams'; +import type { HandledDuplexStream, StreamMultiplexer } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeLogger, makeCounter, stringify } from '@ocap/utils'; -import { VatCommandMethod } from './messages/index.js'; +import { isVatCommandReply, VatCommandMethod } from './messages/index.js'; import type { - CapTpMessage, CapTpPayload, VatCommandReply, VatCommand, } from './messages/index.js'; -import type { - StreamEnvelope, - StreamEnvelopeReply, - StreamEnvelopeReplyHandler, -} from './stream-envelope.js'; -import { - makeStreamEnvelopeReplyHandler, - wrapCapTp, - wrapStreamCommand, -} from './stream-envelope.js'; import type { PromiseCallbacks, VatId } from './types.js'; type VatConstructorProps = { id: VatId; - stream: DuplexStream; + multiplexer: StreamMultiplexer; + logger?: Logger | undefined; }; export class Vat { readonly id: VatConstructorProps['id']; - readonly #stream: VatConstructorProps['stream']; + readonly #multiplexer: StreamMultiplexer; + + readonly #commandStream: HandledDuplexStream; + + readonly #capTpStream: HandledDuplexStream; readonly logger: Logger; @@ -47,18 +42,26 @@ export class Vat { readonly unresolvedMessages: Map = new Map(); - readonly streamEnvelopeReplyHandler: StreamEnvelopeReplyHandler; - capTp?: ReturnType; - constructor({ id, stream }: VatConstructorProps) { + constructor({ id, multiplexer, logger }: VatConstructorProps) { this.id = id; - this.#stream = stream; - this.logger = makeLogger(`[vat ${id}]`); + this.logger = logger ?? makeLogger(`[vat ${id}]`); this.#messageCounter = makeCounter(); - this.streamEnvelopeReplyHandler = makeStreamEnvelopeReplyHandler( - { command: this.handleMessage.bind(this) }, - (error) => console.error('Vat stream error:', error), + this.#multiplexer = multiplexer; + this.#commandStream = multiplexer.addChannel( + 'command', + isVatCommandReply, + this.handleMessage.bind(this), + ); + this.#capTpStream = multiplexer.addChannel( + 'capTp', + // The streams already enforce that the values are JSON. + (_value): _value is Json => true, + async (content): Promise => { + this.logger.log('CapTP from vat', stringify(content)); + this.capTp?.dispatch(content); + }, ); } @@ -72,7 +75,7 @@ export class Vat { async handleMessage({ id, payload }: VatCommandReply): Promise { const promiseCallbacks = this.unresolvedMessages.get(id); if (promiseCallbacks === undefined) { - console.error(`No unresolved message with id "${id}".`); + this.logger.error(`No unresolved message with id "${id}".`); } else { this.unresolvedMessages.delete(id); promiseCallbacks.resolve(payload.params); @@ -86,29 +89,17 @@ export class Vat { */ async init(): Promise { /* v8 ignore next 4: Not known to be possible. */ - this.#receiveMessages(this.#stream).catch((error) => { + this.#multiplexer.drainAll().catch((error) => { this.logger.error(`Unexpected read error`, error); throw new StreamReadError({ vatId: this.id }, error); }); - await this.sendMessage({ method: VatCommandMethod.Ping, params: null }); + await this.sendMessage({ method: VatCommandMethod.ping, params: null }); this.logger.debug('Created'); return await this.makeCapTp(); } - /** - * Receives messages from a vat. - * - * @param reader - The reader for the messages. - */ - async #receiveMessages(reader: Reader): Promise { - for await (const rawMessage of reader) { - this.logger.debug('Vat received message', rawMessage); - await this.streamEnvelopeReplyHandler.handle(rawMessage); - } - } - /** * Make a CapTP connection. * @@ -119,22 +110,15 @@ export class Vat { throw new VatCapTpConnectionExistsError(this.id); } - // Handle writes here. #receiveMessages() handles reads. - const ctp = makeCapTP(this.id, async (content: unknown) => { + const ctp = makeCapTP(this.id, async (content: Json) => { this.logger.log('CapTP to vat', stringify(content)); - await this.#stream.write(wrapCapTp(content as CapTpMessage)); + await this.#capTpStream.write(content); }); this.capTp = ctp; - this.streamEnvelopeReplyHandler.contentHandlers.capTp = async ( - content: CapTpMessage, - ) => { - this.logger.log('CapTP from vat', stringify(content)); - ctp.dispatch(content); - }; return this.sendMessage({ - method: VatCommandMethod.CapTpInit, + method: VatCommandMethod.capTpInit, params: null, }); } @@ -156,7 +140,7 @@ export class Vat { * Terminates the vat. */ async terminate(): Promise { - await this.#stream.return(); + await this.#multiplexer.return(); // Handle orphaned messages for (const [messageId, promiseCallback] of this.unresolvedMessages) { @@ -176,7 +160,7 @@ export class Vat { const { promise, reject, resolve } = makePromiseKit(); const messageId = this.#nextMessageId(); this.unresolvedMessages.set(messageId, { reject, resolve }); - await this.#stream.write(wrapStreamCommand({ id: messageId, payload })); + await this.#commandStream.write({ id: messageId, payload }); return promise; } diff --git a/packages/kernel/src/index.test.ts b/packages/kernel/src/index.test.ts index c11c0bd83..543c6417f 100644 --- a/packages/kernel/src/index.test.ts +++ b/packages/kernel/src/index.test.ts @@ -5,16 +5,19 @@ import * as indexModule from './index.js'; describe('index', () => { it('has the expected exports', () => { - expect(Object.keys(indexModule)).toStrictEqual( - expect.arrayContaining( - ['Kernel', 'Vat'].concat( - ['Cluster', 'Kernel', 'Vat', 'VatWorkerService'].flatMap((value) => [ - `${value}CommandMethod`, - `is${value}Command`, - `is${value}CommandReply`, - ]), - ), - ), - ); + expect(Object.keys(indexModule).sort()).toStrictEqual([ + 'Kernel', + 'KernelCommandMethod', + 'Supervisor', + 'Vat', + 'VatCommandMethod', + 'VatWorkerServiceCommandMethod', + 'isKernelCommand', + 'isKernelCommandReply', + 'isVatCommand', + 'isVatCommandReply', + 'isVatWorkerServiceCommand', + 'isVatWorkerServiceCommandReply', + ]); }); }); diff --git a/packages/kernel/src/index.ts b/packages/kernel/src/index.ts index e6fc58cb9..6d4566618 100644 --- a/packages/kernel/src/index.ts +++ b/packages/kernel/src/index.ts @@ -3,5 +3,4 @@ export { Kernel } from './Kernel.js'; export type { KVStore } from './kernel-store.js'; export { Vat } from './Vat.js'; export { Supervisor } from './Supervisor.js'; -export type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; export type { VatId, VatWorkerService } from './types.js'; diff --git a/packages/kernel/src/messages/captp.test.ts b/packages/kernel/src/messages/captp.test.ts deleted file mode 100644 index 3a49222ea..000000000 --- a/packages/kernel/src/messages/captp.test.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { describe, expect, it } from 'vitest'; - -import { isCapTpMessage, isCapTpPayload } from './captp.js'; - -describe('isCapTpMessage', () => { - it.each` - value | expectedResult | description - ${{ type: 'CTP_CALL', epoch: 0 }} | ${true} | ${'valid type with numerical epoch'} - ${{ type: 'CTP_CALL' }} | ${false} | ${'missing epoch'} - `('returns $expectedResult for $description', ({ value, expectedResult }) => { - expect(isCapTpMessage(value)).toBe(expectedResult); - }); -}); - -describe('isCapTpPayload', () => { - it.each` - value | expectedResult | description - ${{ method: 'foo', params: [0, 'bar', false] }} | ${true} | ${'valid command with string data'} - ${{ method: 'foo' }} | ${false} | ${'no params'} - ${{ method: 'foo', params: 'bar' }} | ${false} | ${'nonarray params'} - `('returns $expectedResult for $description', ({ value, expectedResult }) => { - expect(isCapTpPayload(value)).toBe(expectedResult); - }); -}); diff --git a/packages/kernel/src/messages/captp.ts b/packages/kernel/src/messages/captp.ts deleted file mode 100644 index 9f0d7d9fe..000000000 --- a/packages/kernel/src/messages/captp.ts +++ /dev/null @@ -1,24 +0,0 @@ -import type { Json } from '@metamask/utils'; -import { isObject } from '@metamask/utils'; - -export type CapTpPayload = { - method: string; - params: Json[]; -}; - -export const isCapTpPayload = (value: unknown): value is CapTpPayload => - isObject(value) && - typeof value.method === 'string' && - Array.isArray(value.params); - -export type CapTpMessage = { - type: Type; - epoch: number; - [key: string]: Json; -}; - -export const isCapTpMessage = (value: unknown): value is CapTpMessage => - isObject(value) && - typeof value.type === 'string' && - value.type.startsWith('CTP_') && - typeof value.epoch === 'number'; diff --git a/packages/kernel/src/messages/cluster.ts b/packages/kernel/src/messages/cluster.ts deleted file mode 100644 index c98524af0..000000000 --- a/packages/kernel/src/messages/cluster.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { TypeGuard } from '@ocap/utils'; - -import { kernelCommand } from './kernel.js'; -import { makeMessageKit } from './message-kit.js'; - -const clusterCommand = { - ...kernelCommand, -}; - -const clusterCommandKit = makeMessageKit(clusterCommand); - -export const ClusterCommandMethod = clusterCommandKit.methods; - -export type ClusterCommand = typeof clusterCommandKit.send; -export const isClusterCommand: TypeGuard = - clusterCommandKit.sendGuard; - -export type ClusterCommandReply = typeof clusterCommandKit.reply; -export const isClusterCommandReply: TypeGuard = - clusterCommandKit.replyGuard; diff --git a/packages/kernel/src/messages/index.ts b/packages/kernel/src/messages/index.ts index 8d2e9d511..7cb1fb05b 100644 --- a/packages/kernel/src/messages/index.ts +++ b/packages/kernel/src/messages/index.ts @@ -1,17 +1,3 @@ -// CapTP. - -export { isCapTpPayload, isCapTpMessage } from './captp.js'; -export type { CapTpPayload, CapTpMessage } from './captp.js'; - -// Cluster commands. - -export { - ClusterCommandMethod, - isClusterCommand, - isClusterCommandReply, -} from './cluster.js'; -export type { ClusterCommand, ClusterCommandReply } from './cluster.js'; - // Kernel commands. export { @@ -19,7 +5,11 @@ export { isKernelCommand, isKernelCommandReply, } from './kernel.js'; -export type { KernelCommand, KernelCommandReply } from './kernel.js'; +export type { + CapTpPayload, + KernelCommand, + KernelCommandReply, +} from './kernel.js'; // Vat commands. diff --git a/packages/kernel/src/messages/kernel.test.ts b/packages/kernel/src/messages/kernel.test.ts index 6e8f2a3d6..ab95c0440 100644 --- a/packages/kernel/src/messages/kernel.test.ts +++ b/packages/kernel/src/messages/kernel.test.ts @@ -9,12 +9,12 @@ import { describe('isKernelCommand', () => { it.each` value | expectedResult | description - ${{ method: KernelCommandMethod.KVGet, params: 'data' }} | ${true} | ${'valid command with string data'} - ${{ method: KernelCommandMethod.KVSet, params: { key: 'foo', value: 'bar' } }} | ${true} | ${'valid command with object data'} + ${{ method: KernelCommandMethod.kvGet, params: 'data' }} | ${true} | ${'valid command with string data'} + ${{ method: KernelCommandMethod.kvSet, params: { key: 'foo', value: 'bar' } }} | ${true} | ${'valid command with object data'} ${123} | ${false} | ${'invalid command: primitive number'} ${{ method: true, params: 'data' }} | ${false} | ${'invalid command: invalid type'} - ${{ method: KernelCommandMethod.KVSet }} | ${false} | ${'invalid command: missing data'} - ${{ method: KernelCommandMethod.KVSet, params: 123 }} | ${false} | ${'invalid command: data is a primitive number'} + ${{ method: KernelCommandMethod.kvSet }} | ${false} | ${'invalid command: missing data'} + ${{ method: KernelCommandMethod.kvSet, params: 123 }} | ${false} | ${'invalid command: data is a primitive number'} ${{ method: 123, params: null }} | ${false} | ${'invalid command: invalid type and valid data'} ${{ method: 'some-type', params: true }} | ${false} | ${'invalid command: valid type and invalid data'} `('returns $expectedResult for $description', ({ value, expectedResult }) => { @@ -25,12 +25,12 @@ describe('isKernelCommand', () => { describe('isKernelCommandReply', () => { it.each` value | expectedResult | description - ${{ method: KernelCommandMethod.KVGet, params: 'foo' }} | ${true} | ${'valid command reply with string data'} - ${{ method: KernelCommandMethod.KVGet, params: null }} | ${false} | ${'invalid command reply: with null data'} + ${{ method: KernelCommandMethod.kvGet, params: 'foo' }} | ${true} | ${'valid command reply with string data'} + ${{ method: KernelCommandMethod.kvGet, params: null }} | ${false} | ${'invalid command reply: with null data'} ${123} | ${false} | ${'invalid command reply: primitive number'} ${{ method: true, params: 'data' }} | ${false} | ${'invalid command reply: invalid type'} - ${{ method: KernelCommandMethod.KVSet }} | ${false} | ${'invalid command reply: missing data'} - ${{ method: KernelCommandMethod.KVSet, params: 123 }} | ${false} | ${'invalid command reply: data is a primitive number'} + ${{ method: KernelCommandMethod.kvSet }} | ${false} | ${'invalid command reply: missing data'} + ${{ method: KernelCommandMethod.kvSet, params: 123 }} | ${false} | ${'invalid command reply: data is a primitive number'} ${{ method: 123, params: null }} | ${false} | ${'invalid command reply: invalid type and valid data'} ${{ method: 'some-type', params: true }} | ${false} | ${'invalid command reply: valid type and invalid data'} `('returns $expectedResult for $description', ({ value, expectedResult }) => { diff --git a/packages/kernel/src/messages/kernel.ts b/packages/kernel/src/messages/kernel.ts index 6ec8eb690..6edba84b5 100644 --- a/packages/kernel/src/messages/kernel.ts +++ b/packages/kernel/src/messages/kernel.ts @@ -1,41 +1,77 @@ -import { isObject } from '@metamask/utils'; +import { + object, + union, + literal, + string, + is, + array, +} from '@metamask/superstruct'; +import type { Infer } from '@metamask/superstruct'; +import { UnsafeJsonStruct } from '@metamask/utils'; import type { TypeGuard } from '@ocap/utils'; -import type { CapTpPayload } from './captp.js'; -import { isCapTpPayload } from './captp.js'; -import { makeMessageKit, messageType } from './message-kit.js'; -import { vatTestCommand } from './vat-test.js'; +import { + VatTestCommandMethod, + VatTestMethodStructs, + VatTestReplyStructs, +} from './vat.js'; -export const kernelCommand = { - CapTpCall: messageType( - (send) => isCapTpPayload(send), - (reply) => typeof reply === 'string', - ), +export const KernelCommandMethod = { + evaluate: VatTestCommandMethod.evaluate, + capTpCall: 'capTpCall', + kvSet: 'kvSet', + kvGet: 'kvGet', + ping: VatTestCommandMethod.ping, +} as const; - KVSet: messageType<{ key: string; value: string }, string>( - (send) => - isObject(send) && - typeof send.key === 'string' && - typeof send.value === 'string', - (reply) => typeof reply === 'string', - ), +const CapTpPayloadStruct = object({ + method: string(), + params: array(UnsafeJsonStruct), +}); - KVGet: messageType( - (send) => typeof send === 'string', - (reply) => typeof reply === 'string', - ), +export type CapTpPayload = Infer; - ...vatTestCommand, -}; +const KernelCommandStruct = union([ + object({ + method: literal(KernelCommandMethod.capTpCall), + params: CapTpPayloadStruct, + }), + object({ + method: literal(KernelCommandMethod.kvSet), + params: object({ key: string(), value: string() }), + }), + object({ + method: literal(KernelCommandMethod.kvGet), + params: string(), + }), + VatTestMethodStructs.evaluate, + VatTestMethodStructs.ping, +]); -const kernelCommandKit = makeMessageKit(kernelCommand); +const KernelCommandReplyStruct = union([ + object({ + method: literal(KernelCommandMethod.capTpCall), + params: string(), + }), + object({ + method: literal(KernelCommandMethod.kvSet), + params: string(), + }), + object({ + method: literal(KernelCommandMethod.kvGet), + params: string(), + }), + VatTestReplyStructs.evaluate, + VatTestReplyStructs.ping, +]); -export const KernelCommandMethod = kernelCommandKit.methods; +export type KernelCommand = Infer; +export type KernelCommandReply = Infer; -export type KernelCommand = typeof kernelCommandKit.send; -export const isKernelCommand: TypeGuard = - kernelCommandKit.sendGuard; +export const isKernelCommand: TypeGuard = ( + value: unknown, +): value is KernelCommand => is(value, KernelCommandStruct); -export type KernelCommandReply = typeof kernelCommandKit.reply; -export const isKernelCommandReply: TypeGuard = - kernelCommandKit.replyGuard; +export const isKernelCommandReply: TypeGuard = ( + value: unknown, +): value is KernelCommandReply => is(value, KernelCommandReplyStruct); diff --git a/packages/kernel/src/messages/message-kit.ts b/packages/kernel/src/messages/message-kit.ts deleted file mode 100644 index 841b843f3..000000000 --- a/packages/kernel/src/messages/message-kit.ts +++ /dev/null @@ -1,140 +0,0 @@ -import '@ocap/shims/endoify'; - -import { isObject } from '@metamask/utils'; -import type { Json } from '@metamask/utils'; -import type { ExtractGuardType, TypeGuard } from '@ocap/utils'; - -import { isMessageLike } from './message.js'; -import { uncapitalize } from './utils.js'; - -// Message kit. - -export type BoolExpr = (value: unknown) => boolean; - -export type SourceLike = Record; - -type MessageUnion = { - [Key in keyof Source]: Key extends string - ? { - method: Uncapitalize; - params: ExtractGuardType; - } - : never; -}[keyof Source]; - -export type Send = MessageUnion; - -export type Reply = MessageUnion; - -/** - * A typescript utility used to reduce boilerplate in message type declarations. - * - * @param sendGuard - A boolean expression that returns true for SendType values. - * @param replyGuard - A boolean expression that returns true for ReplyType values. - * @returns A pair of type guards. - */ -export const messageType = ( - sendGuard: BoolExpr, - replyGuard: BoolExpr, -): [TypeGuard, TypeGuard] => [ - (val): val is SendType => sendGuard(val), - (val): val is ReplyType => replyGuard(val), -]; - -type Methods = { - [Key in keyof Source]: Key extends string ? Uncapitalize : never; -}; - -const makeMethods = ( - source: Source, -): Methods => { - return Object.fromEntries( - Object.keys(source).map((key) => [key, uncapitalize(key)]), - ) as Methods; -}; - -const makeGuard = ( - source: Source, - methods: Methods, - index: Index, -): TypeGuard> => { - const guards = Object.fromEntries( - Object.entries(source).map(([key, value]) => [ - uncapitalize(key), - value[index], - ]), - ) as Record>; - - return (value: unknown): value is MessageUnion => - isMessageLike(value) && - Object.values(methods).includes(value.method) && - guards[value.method as keyof typeof guards](value.params); -}; - -/** - * An object type encapsulating all of the schematics that define a functional - * group of messages. - */ -export type MessageKit = { - source: Source; - methods: Methods; - send: Send; - sendGuard: TypeGuard>; - reply: Reply; - replyGuard: TypeGuard>; -}; - -export const makeMessageKit = ( - source: Source, -): MessageKit => { - const methods = makeMethods(source); - - return { - source, - methods, - sendGuard: makeGuard(source, methods, 0), - replyGuard: makeGuard(source, methods, 1), - } as MessageKit; -}; - -/** - * An object type encapsulating all of the schematics that define a functional - * group of messages as a payload wrapped with a message id. - */ -type IdentifiedMessageKit< - Source extends SourceLike, - MessageId extends string, -> = { - source: Source; - methods: Methods; - send: { id: MessageId; payload: Send }; - sendGuard: TypeGuard<{ id: MessageId; payload: Send }>; - reply: { id: MessageId; payload: Reply }; - replyGuard: TypeGuard<{ id: MessageId; payload: Reply }>; -}; - -export const makeIdentifiedMessageKit = < - Source extends SourceLike, - MessageId extends string, ->({ - source, - isMessageId, -}: { - source: Source; - isMessageId: TypeGuard; -}): IdentifiedMessageKit => { - const messageKit = makeMessageKit(source); - - return { - source: messageKit.source, - methods: messageKit.methods, - sendGuard: (value: unknown) => - isObject(value) && - isMessageId(value.id) && - messageKit.sendGuard(value.payload), - replyGuard: (value: unknown) => - isObject(value) && - isMessageId(value.id) && - messageKit.replyGuard(value.payload), - } as IdentifiedMessageKit; -}; diff --git a/packages/kernel/src/messages/message.ts b/packages/kernel/src/messages/message.ts deleted file mode 100644 index cd0c2ca10..000000000 --- a/packages/kernel/src/messages/message.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { is } from '@metamask/superstruct'; -import type { Json } from '@metamask/utils'; -import { isObject, UnsafeJsonStruct } from '@metamask/utils'; - -import { uncapitalize } from './utils.js'; - -const isJsonUnsafe = (value: unknown): value is Json => - is(value, UnsafeJsonStruct); - -export type MessageLike = { method: Uncapitalize; params: Json }; - -const isMethodLike = (value: unknown): value is Uncapitalize => - typeof value === 'string' && uncapitalize(value) === value; - -export const isMessageLike = (value: unknown): value is MessageLike => - isObject(value) && isMethodLike(value.method) && isJsonUnsafe(value.params); diff --git a/packages/kernel/src/messages/utils.ts b/packages/kernel/src/messages/utils.ts deleted file mode 100644 index ca9d7b510..000000000 --- a/packages/kernel/src/messages/utils.ts +++ /dev/null @@ -1,4 +0,0 @@ -// Uncapitalize. - -export const uncapitalize = (value: string): Uncapitalize => - (value.at(0)?.toLowerCase() + value.slice(1)) as Uncapitalize; diff --git a/packages/kernel/src/messages/vat-test.ts b/packages/kernel/src/messages/vat-test.ts deleted file mode 100644 index 74f257090..000000000 --- a/packages/kernel/src/messages/vat-test.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { messageType } from './message-kit.js'; - -export const vatTestCommand = { - Evaluate: messageType( - (send) => typeof send === 'string', - (reply) => typeof reply === 'string', - ), - - Ping: messageType( - (send) => send === null, - (reply) => reply === 'pong', - ), -}; diff --git a/packages/kernel/src/messages/vat-worker-service.test.ts b/packages/kernel/src/messages/vat-worker-service.test.ts index 0989dd5b2..7a2dc5403 100644 --- a/packages/kernel/src/messages/vat-worker-service.test.ts +++ b/packages/kernel/src/messages/vat-worker-service.test.ts @@ -15,15 +15,15 @@ import { import type { VatId } from '../types.js'; const launchPayload: VatWorkerServiceCommandReply['payload'] = harden({ - method: VatWorkerServiceCommandMethod.Launch, + method: VatWorkerServiceCommandMethod.launch, params: { vatId: 'v0' }, }); const terminatePayload: VatWorkerServiceCommandReply['payload'] = harden({ - method: VatWorkerServiceCommandMethod.Terminate, + method: VatWorkerServiceCommandMethod.terminate, params: { vatId: 'v0' }, }); const terminateAllPayload: VatWorkerServiceCommandReply['payload'] = harden({ - method: VatWorkerServiceCommandMethod.TerminateAll, + method: VatWorkerServiceCommandMethod.terminateAll, params: null, }); diff --git a/packages/kernel/src/messages/vat-worker-service.ts b/packages/kernel/src/messages/vat-worker-service.ts index 1d9f4355f..dabf75cd5 100644 --- a/packages/kernel/src/messages/vat-worker-service.ts +++ b/packages/kernel/src/messages/vat-worker-service.ts @@ -1,65 +1,72 @@ -import { hasProperty, isObject } from '@metamask/utils'; -import { isMarshaledError } from '@ocap/errors'; -import type { MarshaledError } from '@ocap/errors'; +import { object, union, optional, is, literal } from '@metamask/superstruct'; +import type { Infer } from '@metamask/superstruct'; +import { MarshaledErrorStruct } from '@ocap/errors'; import type { TypeGuard } from '@ocap/utils'; -import { makeIdentifiedMessageKit, messageType } from './message-kit.js'; -import type { VatId } from '../types.js'; -import { isVatId } from '../types.js'; +import { VatIdStruct, VatMessageIdStruct } from '../types.js'; -const hasOptionalMarshaledError = (value: object): boolean => - !hasProperty(value, 'error') || isMarshaledError(value.error); +export const VatWorkerServiceCommandMethod = { + launch: 'launch', + terminate: 'terminate', + terminateAll: 'terminateAll', +} as const; -export const vatWorkerServiceCommand = { - Launch: messageType< - { vatId: VatId }, - { vatId: VatId; error?: MarshaledError } - >( - (send) => isObject(send) && isVatId(send.vatId), - (reply) => - isObject(reply) && - isVatId(reply.vatId) && - hasOptionalMarshaledError(reply), - ), - - Terminate: messageType< - { vatId: VatId }, - { vatId: VatId; error?: MarshaledError } - >( - (send) => isObject(send) && isVatId(send.vatId), - (reply) => - isObject(reply) && - isVatId(reply.vatId) && - hasOptionalMarshaledError(reply), - ), - - TerminateAll: messageType< - null, - null | { vatId?: VatId; error: MarshaledError } - >( - (send) => send === null, - (reply) => - reply === null || - (isObject(reply) && - isMarshaledError(reply.error) && - (!hasProperty(reply, 'vatId') || isVatId(reply.vatId))), - ), -}; +const VatWorkerServiceCommandStruct = object({ + id: VatMessageIdStruct, + payload: union([ + object({ + method: literal(VatWorkerServiceCommandMethod.launch), + params: object({ vatId: VatIdStruct }), + }), + object({ + method: literal(VatWorkerServiceCommandMethod.terminate), + params: object({ vatId: VatIdStruct }), + }), + object({ + method: literal(VatWorkerServiceCommandMethod.terminateAll), + params: literal(null), + }), + ]), +}); -const messageKit = makeIdentifiedMessageKit({ - source: vatWorkerServiceCommand, - isMessageId: (value: unknown): value is `m${number}` => - typeof value === 'string' && - value.at(0) === 'm' && - value.slice(1) === String(Number(value.slice(1))), +const VatWorkerServiceCommandReplyStruct = object({ + id: VatMessageIdStruct, + payload: union([ + object({ + method: union([ + literal(VatWorkerServiceCommandMethod.launch), + literal(VatWorkerServiceCommandMethod.terminate), + ]), + params: object({ + vatId: VatIdStruct, + error: optional(MarshaledErrorStruct), + }), + }), + object({ + method: literal(VatWorkerServiceCommandMethod.terminateAll), + params: union([ + literal(null), + object({ + vatId: optional(VatIdStruct), + error: MarshaledErrorStruct, + }), + ]), + }), + ]), }); -export const VatWorkerServiceCommandMethod = messageKit.methods; +export type VatWorkerServiceCommand = Infer< + typeof VatWorkerServiceCommandStruct +>; +export type VatWorkerServiceCommandReply = Infer< + typeof VatWorkerServiceCommandReplyStruct +>; -export type VatWorkerServiceCommand = typeof messageKit.send; -export const isVatWorkerServiceCommand: TypeGuard = - messageKit.sendGuard; +export const isVatWorkerServiceCommand: TypeGuard = ( + value: unknown, +): value is VatWorkerServiceCommand => is(value, VatWorkerServiceCommandStruct); -export type VatWorkerServiceCommandReply = typeof messageKit.reply; -export const isVatWorkerServiceCommandReply: TypeGuard = - messageKit.replyGuard; +export const isVatWorkerServiceCommandReply: TypeGuard< + VatWorkerServiceCommandReply +> = (value: unknown): value is VatWorkerServiceCommandReply => + is(value, VatWorkerServiceCommandReplyStruct); diff --git a/packages/kernel/src/messages/vat.test.ts b/packages/kernel/src/messages/vat.test.ts index 8b28c63df..be829cecc 100644 --- a/packages/kernel/src/messages/vat.test.ts +++ b/packages/kernel/src/messages/vat.test.ts @@ -3,7 +3,7 @@ import { describe, expect, it } from 'vitest'; import { isVatCommand, VatCommandMethod } from './vat.js'; describe('isVatCommand', () => { - const payload = { method: VatCommandMethod.Evaluate, params: '3 + 3' }; + const payload = { method: VatCommandMethod.evaluate, params: '3 + 3' }; it.each` value | expectedResult | description diff --git a/packages/kernel/src/messages/vat.ts b/packages/kernel/src/messages/vat.ts index 2ba698102..d6f704866 100644 --- a/packages/kernel/src/messages/vat.ts +++ b/packages/kernel/src/messages/vat.ts @@ -1,29 +1,97 @@ -import { makeIdentifiedMessageKit, messageType } from './message-kit.js'; -import { vatTestCommand } from './vat-test.js'; +import { + object, + union, + literal, + refine, + string, + is, +} from '@metamask/superstruct'; +import type { Infer } from '@metamask/superstruct'; + import { isVatId } from '../types.js'; import type { VatId } from '../types.js'; -export const vatCommand = { - CapTpInit: messageType( - (send) => send === null, - (reply) => typeof reply === 'string', - ), - - ...vatTestCommand, -}; - -const vatMessageKit = makeIdentifiedMessageKit({ - source: vatCommand, - isMessageId: (value: unknown): value is `${VatId}:${number}` => - typeof value === 'string' && - /^\w+:\d+$/u.test(value) && - isVatId(value.split(':')[0]), +type VatMessageId = `${VatId}:${number}`; + +const isVatMessageId = (value: unknown): value is VatMessageId => + typeof value === 'string' && + /^\w+:\d+$/u.test(value) && + isVatId(value.split(':')[0]); + +export const VatTestCommandMethod = { + evaluate: 'evaluate', + ping: 'ping', +} as const; + +export const VatCommandMethod = { + ...VatTestCommandMethod, + capTpInit: 'capTpInit', +} as const; + +const VatMessageIdStruct = refine(string(), 'VatMessageId', isVatMessageId); + +export const VatTestMethodStructs = { + [VatCommandMethod.evaluate]: object({ + method: literal(VatCommandMethod.evaluate), + params: string(), + }), + [VatCommandMethod.ping]: object({ + method: literal(VatCommandMethod.ping), + params: literal(null), + }), +} as const; + +const VatMethodStructs = { + ...VatTestMethodStructs, + [VatCommandMethod.capTpInit]: object({ + method: literal(VatCommandMethod.capTpInit), + params: literal(null), + }), +} as const; + +const VatCommandStruct = object({ + id: VatMessageIdStruct, + payload: union([ + VatMethodStructs.evaluate, + VatMethodStructs.ping, + VatMethodStructs.capTpInit, + ]), +}); + +export type VatCommand = Infer; + +export const VatTestReplyStructs = { + [VatCommandMethod.evaluate]: object({ + method: literal(VatCommandMethod.evaluate), + params: string(), + }), + [VatCommandMethod.ping]: object({ + method: literal(VatCommandMethod.ping), + params: string(), + }), +} as const; + +const VatReplyStructs = { + ...VatTestReplyStructs, + [VatCommandMethod.capTpInit]: object({ + method: literal(VatCommandMethod.capTpInit), + params: string(), + }), +} as const; + +const VatCommandReplyStruct = object({ + id: VatMessageIdStruct, + payload: union([ + VatReplyStructs.evaluate, + VatReplyStructs.ping, + VatReplyStructs.capTpInit, + ]), }); -export const VatCommandMethod = vatMessageKit.methods; +export type VatCommandReply = Infer; -export type VatCommand = typeof vatMessageKit.send; -export const isVatCommand = vatMessageKit.sendGuard; +export const isVatCommand = (value: unknown): value is VatCommand => + is(value, VatCommandStruct); -export type VatCommandReply = typeof vatMessageKit.reply; -export const isVatCommandReply = vatMessageKit.replyGuard; +export const isVatCommandReply = (value: unknown): value is VatCommandReply => + is(value, VatCommandReplyStruct); diff --git a/packages/kernel/src/stream-envelope.test.ts b/packages/kernel/src/stream-envelope.test.ts deleted file mode 100644 index 626036a0a..000000000 --- a/packages/kernel/src/stream-envelope.test.ts +++ /dev/null @@ -1,160 +0,0 @@ -import '@ocap/shims/endoify'; - -import { stringify } from '@ocap/utils'; -import { describe, it, expect } from 'vitest'; - -import type { - CapTpMessage, - VatCommand, - VatCommandReply, -} from './messages/index.js'; -import { VatCommandMethod } from './messages/index.js'; -import { - wrapCapTp, - wrapStreamCommand, - makeStreamEnvelopeHandler, - wrapStreamCommandReply, - makeStreamEnvelopeReplyHandler, -} from './stream-envelope.js'; - -describe('StreamEnvelopeHandler', () => { - const commandContent: VatCommand = { - id: 'v0:0', - payload: { method: VatCommandMethod.Evaluate, params: '1 + 1' }, - }; - const capTpContent: CapTpMessage = { - type: 'CTP_CALL', - epoch: 0, - // Our assumptions about the form of a CapTpMessage are weak. - foo: 'bar', - }; - - const commandLabel = wrapStreamCommand(commandContent).label; - const capTpLabel = wrapCapTp(capTpContent).label; - - const testEnvelopeHandlers = { - command: async () => commandLabel, - capTp: async () => capTpLabel, - }; - - const testErrorHandler = (problem: unknown, value: unknown): never => { - throw new Error(`TEST ${String(problem)} ${stringify(value)}`); - }; - - it.each` - wrapper | content | label - ${wrapStreamCommand} | ${commandContent} | ${commandLabel} - ${wrapCapTp} | ${capTpContent} | ${capTpLabel} - `( - 'handles valid StreamEnvelope $content', - async ({ wrapper, content, label }) => { - const handler = makeStreamEnvelopeHandler( - testEnvelopeHandlers, - testErrorHandler, - ); - expect(await handler.handle(wrapper(content))).toStrictEqual(label); - }, - ); - - it('routes invalid envelopes to default error handler', async () => { - const handler = makeStreamEnvelopeHandler(testEnvelopeHandlers); - await expect( - // @ts-expect-error label is intentionally unknown - handler.handle({ label: 'unknown', content: [] }), - ).rejects.toThrow(/^Stream envelope handler received unexpected value/u); - }); - - it('routes invalid envelopes to supplied error handler', async () => { - const handler = makeStreamEnvelopeHandler( - testEnvelopeHandlers, - testErrorHandler, - ); - await expect( - // @ts-expect-error label is intentionally unknown - handler.handle({ label: 'unknown', content: [] }), - ).rejects.toThrow( - /^TEST Stream envelope handler received unexpected value/u, - ); - }); - - it('routes valid stream envelopes with an unhandled label to the error handler', async () => { - const handler = makeStreamEnvelopeHandler( - { command: testEnvelopeHandlers.command }, - testErrorHandler, - ); - await expect(handler.handle(wrapCapTp(capTpContent))).rejects.toThrow( - /^TEST Stream envelope handler received an envelope with known but unexpected label/u, - ); - }); -}); - -describe('StreamEnvelopeReplyHandler', () => { - const commandContent: VatCommandReply = { - id: 'v0:0', - payload: { method: VatCommandMethod.Evaluate, params: '2' }, - }; - const capTpContent: CapTpMessage = { - type: 'CTP_CALL', - epoch: 0, - // Our assumptions about the form of a CapTpMessage are weak. - foo: 'bar', - }; - - const commandLabel = wrapStreamCommandReply(commandContent).label; - const capTpLabel = wrapCapTp(capTpContent).label; - - const testEnvelopeHandlers = { - command: async () => commandLabel, - capTp: async () => capTpLabel, - }; - - const testErrorHandler = (problem: unknown, value: unknown): never => { - throw new Error(`TEST ${String(problem)} ${stringify(value)}`); - }; - - it.each` - wrapper | content | label - ${wrapStreamCommandReply} | ${commandContent} | ${commandLabel} - ${wrapCapTp} | ${capTpContent} | ${capTpLabel} - `( - 'handles valid StreamEnvelopeReply $content', - async ({ wrapper, content, label }) => { - const handler = makeStreamEnvelopeReplyHandler( - testEnvelopeHandlers, - testErrorHandler, - ); - expect(await handler.handle(wrapper(content))).toStrictEqual(label); - }, - ); - - it('routes invalid envelopes to default error handler', async () => { - const handler = makeStreamEnvelopeReplyHandler(testEnvelopeHandlers); - await expect( - // @ts-expect-error label is intentionally unknown - handler.handle({ label: 'unknown', content: [] }), - ).rejects.toThrow(/^Stream envelope handler received unexpected value/u); - }); - - it('routes invalid envelopes to supplied error handler', async () => { - const handler = makeStreamEnvelopeReplyHandler( - testEnvelopeHandlers, - testErrorHandler, - ); - await expect( - // @ts-expect-error label is intentionally unknown - handler.handle({ label: 'unknown', content: [] }), - ).rejects.toThrow( - /^TEST Stream envelope handler received unexpected value/u, - ); - }); - - it('routes valid stream envelopes with an unhandled label to the error handler', async () => { - const handler = makeStreamEnvelopeReplyHandler( - { command: testEnvelopeHandlers.command }, - testErrorHandler, - ); - await expect(handler.handle(wrapCapTp(capTpContent))).rejects.toThrow( - /^TEST Stream envelope handler received an envelope with known but unexpected label/u, - ); - }); -}); diff --git a/packages/kernel/src/stream-envelope.ts b/packages/kernel/src/stream-envelope.ts deleted file mode 100644 index fceb6fb5e..000000000 --- a/packages/kernel/src/stream-envelope.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { makeStreamEnvelopeKit } from '@ocap/streams'; -import type { ExtractGuardType } from '@ocap/utils'; - -import { - isCapTpMessage, - isVatCommand, - isVatCommandReply, -} from './messages/index.js'; -import type { - CapTpMessage, - VatCommand, - VatCommandReply, -} from './messages/index.js'; - -// Declare and destructure the envelope kit. - -enum EnvelopeLabel { - Command = 'command', - CapTp = 'capTp', -} - -// makeStreamEnvelopeKit requires an enum of labels but typescript -// doesn't support enums as bounds on template parameters. -// -// See https://github.com/microsoft/TypeScript/issues/30611 -// -// This workaround makes something equivalently type inferenceable. -// eslint-disable-next-line @typescript-eslint/no-unused-vars -const envelopeLabels = Object.values(EnvelopeLabel); - -// For now, this envelope kit is for intial sends only - -const envelopeKit = makeStreamEnvelopeKit< - typeof envelopeLabels, - { - command: VatCommand; - capTp: CapTpMessage; - } ->({ - command: isVatCommand, - capTp: isCapTpMessage, -}); - -export type StreamEnvelope = ExtractGuardType< - typeof envelopeKit.isStreamEnvelope ->; -export type StreamEnvelopeHandler = ReturnType< - typeof envelopeKit.makeStreamEnvelopeHandler ->; - -export const wrapStreamCommand = envelopeKit.streamEnveloper.command.wrap; -export const wrapCapTp = envelopeKit.streamEnveloper.capTp.wrap; -export const { makeStreamEnvelopeHandler } = envelopeKit; - -// For now, a separate envelope kit for replies only - -const streamEnvelopeReplyKit = makeStreamEnvelopeKit< - typeof envelopeLabels, - { - command: VatCommandReply; - capTp: CapTpMessage; - } ->({ - command: isVatCommandReply, - capTp: isCapTpMessage, -}); - -export type StreamEnvelopeReply = ExtractGuardType< - typeof streamEnvelopeReplyKit.isStreamEnvelope ->; -export type StreamEnvelopeReplyHandler = ReturnType< - typeof streamEnvelopeReplyKit.makeStreamEnvelopeHandler ->; - -export const wrapStreamCommandReply = - streamEnvelopeReplyKit.streamEnveloper.command.wrap; -// Note: We don't differentiate between wrapCapTp and wrapCapTpReply -export const { makeStreamEnvelopeHandler: makeStreamEnvelopeReplyHandler } = - streamEnvelopeReplyKit; diff --git a/packages/kernel/src/types.ts b/packages/kernel/src/types.ts index bdf120041..e2d88eb25 100644 --- a/packages/kernel/src/types.ts +++ b/packages/kernel/src/types.ts @@ -1,7 +1,6 @@ import type { PromiseKit } from '@endo/promise-kit'; -import type { DuplexStream } from '@ocap/streams'; - -import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; +import { define } from '@metamask/superstruct'; +import type { DuplexStream, MultiplexEnvelope } from '@ocap/streams'; export type VatId = `v${number}`; @@ -10,6 +9,20 @@ export const isVatId = (value: unknown): value is VatId => value.at(0) === 'v' && value.slice(1) === String(Number(value.slice(1))); +export const VatIdStruct = define('VatId', isVatId); + +export type VatMessageId = `m${number}`; + +export const isVatMessageId = (value: unknown): value is VatMessageId => + typeof value === 'string' && + value.at(0) === 'm' && + value.slice(1) === String(Number(value.slice(1))); + +export const VatMessageIdStruct = define( + 'VatMessageId', + isVatMessageId, +); + export type PromiseCallbacks = Omit< PromiseKit, 'promise' @@ -25,7 +38,7 @@ export type VatWorkerService = { */ launch: ( vatId: VatId, - ) => Promise>; + ) => Promise>; /** * Terminate a worker identified by its vat id. * diff --git a/packages/streams/documents/make-stream-envelope-kit.md b/packages/streams/documents/make-stream-envelope-kit.md deleted file mode 100644 index 2fd579d73..000000000 --- a/packages/streams/documents/make-stream-envelope-kit.md +++ /dev/null @@ -1,124 +0,0 @@ ---- -title: Making a StreamEnvelopeKit -group: Documents -category: Guides ---- - -# makeStreamEnvelopeKit - -### Template parameters must be explicitly declared - -To ensure proper typescript inference behavior, it is necessary to explicitly declare the template parameters when calling `makeStreamEnvelopeKit`. See the [example](#example) below for the recommended declaration pattern. - -### Passing an enum as a template parameter - -Due to a [typescript limitation](https://github.com/microsoft/TypeScript/issues/30611) it is not possible to specify an enum as the expected type of a template parameter. Therefore `makeStreamEnvelopeKit` will accept template parameters which are not within its intended bounds; improperly specified template parameters will result in improper typescript inference behavior. See the [example](#example) below for the recommended declaration pattern. - -### Example declaration - -```ts -import { makeStreamEnvelopeKit } from '@ocap/streams'; - -// Declare the content types. -type FooContent = { - a: number; - b: string; -}; - -type BarContent = { - c: boolean; -}; - -// Specify envelope labels in an enum. -enum EnvelopeLabel { - Foo = 'foo', - Bar = 'bar', -} - -// Create a string[] from the EnvelopeLabel enum. -const labels = Object.values(EnvelopeLabel); - -// Make the StreamEnvelopeKit. -export const myStreamEnvelopeKit = makeStreamEnvelopeKit< - // Pass the EnvelopeLabel enum as `typeof labels`. - typeof labels, - // Specify the content type for each content label. - { - // foo matches the value 'foo' of EnvelopeLabel.Foo - foo: FooContent; - bar: BarContent; - } ->({ - // Specify the type guards for each envelope label. - foo: (value: unknown): value is FooContent => - isObject(value) && - typeof value.a === 'number' && - typeof value.b === 'string', - - // bar matches the value 'bar' of EnvelopeLabel.Bar - bar: (value: unknown): value is BarContent => - isObject(value) && typeof value.c === 'boolean', -}); -``` - -### Enveloper use - -The low level enveloping functionality is available via the included `streamEnveloper` and `isStreamEnvelope`. - -```ts -// Destructure your new envelope kit. -const { streamEnveloper, isStreamEnvelope } = myStreamEnvelopeKit; - -// Wrap some FooContent. -const envelope = streamEnveloper.foo.wrap({ - a: 1, - b: 'one', -}); - -// Protect your assumptions with the supplied type guard. -if (isStreamEnvelope(envelope)) { - // ~~~ Unwrap your envelope right away! ~~~ - const content = streamEnveloper[envelope.label].unwrap(envelope); -} -``` - -### Handler use - -If you know in advance how you plan to handle with your envelopes, you can let a `StreamEnvelopeHandler` do the checking and unwrapping for you. - -```ts -// Destructure the maker from the kit. -const { makeStreamEnvelopeHandler } = myStreamEnvelopeKit; - -// Declare how you want your envelope labels handled. -const streamEnvelopeHandler = makeStreamEnvelopeHandler( - { - // The content type is automatically inferred in the declaration. - foo: async (content) => { - await delay(content.a); - return content.b; - }, - bar: async (content) => (content.c ? 'yes' : 'no'), - }, - // The optional errorHandler can throw or return. - // If unspecified, the default behavior is to throw. - (reason, value) => { - if (reason.match(/unexpected value/u)) { - throw new Error(`[myStreamError] ${reason}`); - } - return ['[myStreamWarning]', reason, value]; - }, -); - -// Read messages from an @ocap/streams Reader. -for await (const newMessage of myStreamReader) { - // And handle the message. - await streamEnvelopeHandler - .handle(newMessage) - // If the errorHandler throws, you can catch it here. - .catch(console.error) - // Otherwise, the promise resolves to the value returned by - // its appropriate content handler, or by the errorHandler. - .then(console.log); -} -``` diff --git a/packages/streams/src/BaseDuplexStream.test.ts b/packages/streams/src/BaseDuplexStream.test.ts index 91eaf0d08..c9c5ccfbb 100644 --- a/packages/streams/src/BaseDuplexStream.test.ts +++ b/packages/streams/src/BaseDuplexStream.test.ts @@ -3,7 +3,7 @@ import { stringify } from '@ocap/utils'; import { describe, expect, it, vi } from 'vitest'; import { BaseDuplexStream, makeAck, makeSyn } from './BaseDuplexStream.js'; -import { makeDoneResult, makePendingResult } from './utils.js'; +import { makePendingResult, makeStreamDoneSignal } from './utils.js'; import { TestDuplexStream } from '../test/stream-mocks.js'; describe('BaseDuplexStream', () => { @@ -26,7 +26,7 @@ describe('BaseDuplexStream', () => { it('resolves the synchronization promise when receiving an ACK', async () => { const duplexStream = new TestDuplexStream(() => undefined); - duplexStream.receiveInput(makeAck()); + await duplexStream.receiveInput(makeAck()); expect(await duplexStream.completeSynchronization()).toBeUndefined(); }); @@ -37,7 +37,7 @@ describe('BaseDuplexStream', () => { throw error; }); - duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); await delay(10); expect(onDispatch).toHaveBeenCalledTimes(2); expect(onDispatch).toHaveBeenNthCalledWith(2, makeAck()); @@ -47,8 +47,8 @@ describe('BaseDuplexStream', () => { const duplexStream = new TestDuplexStream(() => undefined); duplexStream.synchronize().catch(() => undefined); - duplexStream.receiveInput(makeSyn()); - duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); await expect(duplexStream.next()).rejects.toThrow( 'Received duplicate SYN message during synchronization', ); @@ -58,7 +58,7 @@ describe('BaseDuplexStream', () => { const duplexStream = new TestDuplexStream(() => undefined); duplexStream.synchronize().catch(() => undefined); - duplexStream.receiveInput(42); + await duplexStream.receiveInput(42); await expect(duplexStream.next()).rejects.toThrow( `Received unexpected message during synchronization: ${stringify({ done: false, @@ -71,8 +71,8 @@ describe('BaseDuplexStream', () => { const duplexStream = new TestDuplexStream(() => undefined); duplexStream.synchronize().catch(() => undefined); - duplexStream.receiveInput(makeSyn()); - duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); await expect(duplexStream.next()).rejects.toThrow( 'Received duplicate SYN message during synchronization', ); @@ -85,8 +85,8 @@ describe('BaseDuplexStream', () => { const duplexStream = new TestDuplexStream(() => undefined); duplexStream.synchronize().catch(() => undefined); - duplexStream.receiveInput(makeSyn()); - duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); + await duplexStream.receiveInput(makeSyn()); await expect(duplexStream.write(42)).rejects.toThrow( 'Received duplicate SYN message during synchronization', ); @@ -149,7 +149,7 @@ describe('BaseDuplexStream', () => { const duplexStream = await TestDuplexStream.make(() => undefined); const message = 42; - duplexStream.receiveInput(message); + await duplexStream.receiveInput(message); expect(await duplexStream.next()).toStrictEqual(makePendingResult(message)); }); @@ -158,19 +158,29 @@ describe('BaseDuplexStream', () => { const duplexStream = new TestDuplexStream(() => undefined); const nextP = duplexStream.next(); - const message = 42; await duplexStream.completeSynchronization(); - duplexStream.receiveInput(message); + await duplexStream.receiveInput(42); + expect(await nextP).toStrictEqual(makePendingResult(42)); + }); + + it('reads from the reader, with input validation', async () => { + const duplexStream = await TestDuplexStream.make(() => undefined, { + validateInput: (value: unknown): value is number => + typeof value === 'number', + }); - expect(await nextP).toStrictEqual(makePendingResult(message)); + await duplexStream.receiveInput(42); + expect(await duplexStream.next()).toStrictEqual(makePendingResult(42)); }); it('drains the reader in order', async () => { const duplexStream = await TestDuplexStream.make(() => undefined); const messages = [1, 2, 3]; - messages.forEach((message) => duplexStream.receiveInput(message)); + await Promise.all( + messages.map(async (message) => duplexStream.receiveInput(message)), + ); await duplexStream.return(); let index = 0; @@ -241,7 +251,7 @@ describe('BaseDuplexStream', () => { readerOnEnd, }); - duplexStream.receiveInput(makeDoneResult()); + await duplexStream.receiveInput(makeStreamDoneSignal()); expect(readerOnEnd).toHaveBeenCalledOnce(); }); diff --git a/packages/streams/src/BaseDuplexStream.ts b/packages/streams/src/BaseDuplexStream.ts index 486ba3c4c..89d749469 100644 --- a/packages/streams/src/BaseDuplexStream.ts +++ b/packages/streams/src/BaseDuplexStream.ts @@ -2,10 +2,9 @@ import type { PromiseKit } from '@endo/promise-kit'; import { makePromiseKit } from '@endo/promise-kit'; import type { Reader } from '@endo/stream'; import { isObject } from '@metamask/utils'; -import type { Json } from '@metamask/utils'; import { stringify } from '@ocap/utils'; -import type { BaseReader, BaseWriter } from './BaseStream.js'; +import type { BaseReader, BaseWriter, ValidateInput } from './BaseStream.js'; import { makeDoneResult } from './utils.js'; export enum DuplexStreamSentinel { @@ -35,6 +34,29 @@ export const makeAck = (): DuplexStreamAck => ({ const isAck = (value: unknown): value is DuplexStreamAck => isObject(value) && value[DuplexStreamSentinel.Ack] === true; +type StreamSignal = DuplexStreamSyn | DuplexStreamAck; + +const isDuplexStreamSignal = (value: unknown): value is StreamSignal => + isSyn(value) || isAck(value); + +/** + * Make a validator for input to a duplex stream. Constructor helper for concrete + * duplex stream implementations. + * + * Validators passed in by consumers must be augmented such that errors aren't + * thrown for {@link StreamSignal} values. + * + * @param validateInput - The validator for the stream's input type. + * @returns A validator for the stream's input type, or `undefined` if no + * validation is desired. + */ +export const makeDuplexStreamInputValidator = ( + validateInput?: ValidateInput, +): ((value: unknown) => value is Read) | undefined => + validateInput && + ((value: unknown): value is Read => + isDuplexStreamSignal(value) || validateInput(value)); + enum SynchronizationStatus { Idle = 0, Pending = 1, @@ -46,9 +68,9 @@ enum SynchronizationStatus { * Backed up by separate {@link BaseReader} and {@link BaseWriter} instances under the hood. */ export abstract class BaseDuplexStream< - Read extends Json, + Read, ReadStream extends BaseReader, - Write extends Json = Read, + Write = Read, WriteStream extends BaseWriter = BaseWriter, > implements Reader { @@ -117,7 +139,7 @@ export abstract class BaseDuplexStream< * * @returns A promise that resolves when the stream is synchronized. */ - protected async synchronize(): Promise { + async synchronize(): Promise { if (this.#synchronizationStatus !== SynchronizationStatus.Idle) { return this.#syncKit.promise; } @@ -219,7 +241,7 @@ harden(BaseDuplexStream); /** * A duplex stream. Essentially a {@link Reader} with a `write()` method. */ -export type DuplexStream = Pick< +export type DuplexStream = Pick< BaseDuplexStream, Write, BaseWriter>, 'next' | 'write' | 'drain' | 'return' | 'throw' > & { diff --git a/packages/streams/src/BaseStream.test.ts b/packages/streams/src/BaseStream.test.ts index 6a7705a10..cfb3079b3 100644 --- a/packages/streams/src/BaseStream.test.ts +++ b/packages/streams/src/BaseStream.test.ts @@ -46,7 +46,7 @@ describe('BaseReader', () => { const reader = new TestReader(); const message = 42; - reader.receiveInput(message); + await reader.receiveInput(message); expect(await reader.next()).toStrictEqual(makePendingResult(message)); }); @@ -58,7 +58,7 @@ describe('BaseReader', () => { }); const message = 42; - reader.receiveInput(message); + await reader.receiveInput(message); expect(await reader.next()).toStrictEqual(makePendingResult(message)); expect(validateInput).toHaveBeenCalledWith(message); @@ -69,7 +69,7 @@ describe('BaseReader', () => { const nextP = reader.next(); const message = 42; - reader.receiveInput(message); + await reader.receiveInput(message); expect(await nextP).toStrictEqual(makePendingResult(message)); }); @@ -78,7 +78,9 @@ describe('BaseReader', () => { const reader = new TestReader(); const messages = [1, 2, 3]; - messages.forEach((message) => reader.receiveInput(message)); + await Promise.all( + messages.map(async (message) => reader.receiveInput(message)), + ); let index = 0; for await (const message of reader) { @@ -91,33 +93,10 @@ describe('BaseReader', () => { } }); - it('throws after receiving non-Dispatchable input, before read is enqueued', async () => { - const reader = new TestReader(); - - const badMessage = Symbol('foo'); - reader.receiveInput(badMessage); - - await expect(reader.next()).rejects.toThrow( - 'TestReader: Message cannot be processed (must be JSON-serializable)', - ); - }); - - it('throws after receiving non-Dispatchable input, after read is enqueued', async () => { - const reader = new TestReader(); - - const nextP = reader.next(); - const badMessage = Symbol('foo'); - reader.receiveInput(badMessage); - - await expect(nextP).rejects.toThrow( - 'TestReader: Message cannot be processed (must be JSON-serializable)', - ); - }); - it('throws after receiving marshaled error, before read is enqueued', async () => { const reader = new TestReader(); - reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); + await reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); await expect(reader.next()).rejects.toThrow('foo'); }); @@ -126,7 +105,7 @@ describe('BaseReader', () => { const reader = new TestReader(); const nextP = reader.next(); - reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); + await reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); await expect(nextP).rejects.toThrow('foo'); }); @@ -136,7 +115,7 @@ describe('BaseReader', () => { validateInput: (value) => typeof value === 'number', }); - reader.receiveInput({}); + await reader.receiveInput({}); await expect(reader.next()).rejects.toThrow( 'TestReader: Message failed type validation', @@ -149,7 +128,7 @@ describe('BaseReader', () => { }); const nextP = reader.next(); - reader.receiveInput({}); + await reader.receiveInput({}); await expect(nextP).rejects.toThrow( 'TestReader: Message failed type validation', @@ -159,7 +138,7 @@ describe('BaseReader', () => { it('ends after receiving done signal, before read is enqueued', async () => { const reader = new TestReader(); - reader.receiveInput(makeStreamDoneSignal()); + await reader.receiveInput(makeStreamDoneSignal()); expect(await reader.next()).toStrictEqual(makeDoneResult()); expect(await reader.next()).toStrictEqual(makeDoneResult()); @@ -169,7 +148,7 @@ describe('BaseReader', () => { const reader = new TestReader(); const nextP = reader.next(); - reader.receiveInput(makeStreamDoneSignal()); + await reader.receiveInput(makeStreamDoneSignal()); expect(await nextP).toStrictEqual(makeDoneResult()); expect(await reader.next()).toStrictEqual(makeDoneResult()); @@ -178,7 +157,7 @@ describe('BaseReader', () => { it('enqueues input before returning', async () => { const reader = new TestReader(); - reader.receiveInput(1); + await reader.receiveInput(1); await reader.return(); expect(await reader.next()).toStrictEqual(makePendingResult(1)); @@ -189,7 +168,7 @@ describe('BaseReader', () => { const reader = new TestReader(); await reader.return(); - reader.receiveInput(1); + await reader.receiveInput(1); expect(await reader.next()).toStrictEqual(makeDoneResult()); expect(await reader.next()).toStrictEqual(makeDoneResult()); @@ -198,8 +177,8 @@ describe('BaseReader', () => { it('enqueues input before throwing', async () => { const reader = new TestReader(); - reader.receiveInput(1); - reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); + await reader.receiveInput(1); + await reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); expect(await reader.next()).toStrictEqual(makePendingResult(1)); await expect(reader.next()).rejects.toThrow('foo'); @@ -209,8 +188,8 @@ describe('BaseReader', () => { it('ignores input after throwing', async () => { const reader = new TestReader(); - reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); - reader.receiveInput(1); + await reader.receiveInput(makeStreamErrorSignal(new Error('foo'))); + await reader.receiveInput(1); await expect(reader.next()).rejects.toThrow('foo'); expect(await reader.next()).toStrictEqual(makeDoneResult()); diff --git a/packages/streams/src/BaseStream.ts b/packages/streams/src/BaseStream.ts index 3b2b4156a..3dffed5db 100644 --- a/packages/streams/src/BaseStream.ts +++ b/packages/streams/src/BaseStream.ts @@ -1,12 +1,9 @@ import { makePromiseKit } from '@endo/promise-kit'; import type { Reader, Writer } from '@endo/stream'; -import type { Json } from '@metamask/utils'; import { stringify } from '@ocap/utils'; import type { Dispatchable, PromiseCallbacks, Writable } from './utils.js'; import { - assertIsWritable, - isDispatchable, makeDoneResult, makePendingResult, makeStreamDoneSignal, @@ -16,8 +13,10 @@ import { unmarshal, } from './utils.js'; -// eslint-disable-next-line @typescript-eslint/explicit-function-return-type -const makeStreamBuffer = >() => { +const makeStreamBuffer = < + Value extends IteratorResult, + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type +>() => { const inputBuffer: (Value | Error)[] = []; const outputBuffer: PromiseCallbacks[] = []; let done = false; @@ -34,15 +33,15 @@ const makeStreamBuffer = >() => { if (done) { return; } - done = true; + for (const { resolve, reject } of outputBuffer) { error ? reject(error) : resolve(makeDoneResult() as Value); } outputBuffer.length = 0; }, - hasPendingReads() { + hasPendingReads(): boolean { return outputBuffer.length > 0; }, @@ -52,7 +51,7 @@ const makeStreamBuffer = >() => { * @see `end()` for behavior when the stream ends. * @param value - The value or error to put. */ - put(value: Value | Error) { + put(value: Value | Error): void { if (done) { return; } @@ -65,7 +64,7 @@ const makeStreamBuffer = >() => { inputBuffer.push(value); }, - async get() { + async get(): Promise { if (inputBuffer.length > 0) { const value = inputBuffer.shift() as Value; return value instanceof Error @@ -97,15 +96,15 @@ export type OnEnd = () => void | Promise; /** * A function that validates input to a readable stream. */ -export type ValidateInput = (input: Json) => input is Read; +export type ValidateInput = (input: unknown) => input is Read; /** * A function that receives input from a transport mechanism to a readable stream. * Validates that the input is an {@link IteratorResult}, and throws if it is not. */ -export type ReceiveInput = (input: unknown) => void; +export type ReceiveInput = (input: unknown) => Promise; -export type BaseReaderArgs = { +export type BaseReaderArgs = { name?: string | undefined; onEnd?: OnEnd | undefined; validateInput?: ValidateInput | undefined; @@ -121,7 +120,7 @@ export type BaseReaderArgs = { * The result of any value received before the stream ends is guaranteed to be observable * by the consumer. */ -export class BaseReader implements Reader { +export class BaseReader implements Reader { /** * A buffer for managing backpressure (writes > reads) and "suction" (reads > writes) for a stream. * Modeled on `AsyncQueue` from `@endo/stream`, but with arrays under the hood instead of a promise chain. @@ -168,15 +167,9 @@ export class BaseReader implements Reader { return this.#receiveInput.bind(this); } - readonly #receiveInput: ReceiveInput = async (input) => { - if (!isDispatchable(input)) { - await this.#handleInputError( - new Error( - `${this.#name}: Message cannot be processed (must be JSON-serializable):\n${stringify(input)}`, - ), - ); - return; - } + readonly #receiveInput = async (input: unknown): Promise => { + // eslint-disable-next-line @typescript-eslint/await-thenable + await null; const unmarshaled = unmarshal(input); if (unmarshaled instanceof Error) { @@ -216,8 +209,9 @@ export class BaseReader implements Reader { */ async #end(error?: Error): Promise { this.#buffer.end(error); - await this.#onEnd?.(); + const onEndP = this.#onEnd?.(); this.#onEnd = undefined; + await onEndP; } [Symbol.asyncIterator](): typeof this { @@ -257,11 +251,11 @@ export class BaseReader implements Reader { } harden(BaseReader); -export type Dispatch = ( +export type Dispatch = ( value: Dispatchable, ) => void | Promise; -export type BaseWriterArgs = { +export type BaseWriterArgs = { onDispatch: Dispatch; name?: string | undefined; onEnd?: OnEnd | undefined; @@ -270,14 +264,14 @@ export type BaseWriterArgs = { /** * The base of a writable async iterator stream. */ -export class BaseWriter implements Writer { +export class BaseWriter implements Writer { #isDone: boolean = false; readonly #name: string = 'BaseWriter'; readonly #onDispatch: Dispatch; - #onEnd: OnEnd | undefined; + #onEnd?: OnEnd | undefined; /** * Constructs a {@link BaseWriter}. @@ -311,7 +305,6 @@ export class BaseWriter implements Writer { value: Writable, hasFailed = false, ): Promise> { - assertIsWritable(value); try { await this.#onDispatch(marshal(value)); return value === StreamDoneSymbol || value instanceof Error @@ -342,8 +335,9 @@ export class BaseWriter implements Writer { async #end(): Promise { this.#isDone = true; - await this.#onEnd?.(); + const onEndP = this.#onEnd?.(); this.#onEnd = undefined; + await onEndP; } [Symbol.asyncIterator](): typeof this { diff --git a/packages/streams/src/StreamMultiplexer.test.ts b/packages/streams/src/StreamMultiplexer.test.ts new file mode 100644 index 000000000..455d14a94 --- /dev/null +++ b/packages/streams/src/StreamMultiplexer.test.ts @@ -0,0 +1,297 @@ +import { delay, makePromiseKitMock } from '@ocap/test-utils'; +import { describe, expect, it, vi } from 'vitest'; + +import type { ValidateInput } from './BaseStream.js'; +import { StreamMultiplexer } from './StreamMultiplexer.js'; +import type { MultiplexEnvelope } from './StreamMultiplexer.js'; +import { makeDoneResult } from './utils.js'; +import { TestDuplexStream, TestMultiplexer } from '../test/stream-mocks.js'; + +vi.mock('@endo/promise-kit', () => makePromiseKitMock()); + +const isString: ValidateInput = (value) => typeof value === 'string'; + +const isNumber: ValidateInput = (value) => typeof value === 'number'; + +const makeEnvelope = ( + channel: string, + payload: unknown, +): MultiplexEnvelope => ({ + channel, + payload, +}); + +const noop = (_value: unknown): void => undefined; + +describe('StreamMultiplexer', () => { + it('constructs a StreamMultiplexer', () => { + const duplex = new TestDuplexStream( + () => undefined, + ); + const multiplex = new TestMultiplexer(duplex); + expect(multiplex).toBeInstanceOf(StreamMultiplexer); + }); + + describe('addChannel', () => { + it('makes and adds channels', async () => { + const [multiplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + expect(ch1[Symbol.asyncIterator]()).toBe(ch1); + expect(ch2[Symbol.asyncIterator]()).toBe(ch2); + }); + + it('throws if adding a channel with the same name multiple times', async () => { + const [multiplex] = await TestMultiplexer.make(); + multiplex.addChannel('1', isString, noop); + expect(() => multiplex.addChannel('1', isString, noop)).toThrow( + 'Channel "1" already exists', + ); + }); + + it('throws if adding channels after starting', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + multiplex.addChannel('1', isString, noop); + const startP = multiplex.start(); + + expect(() => multiplex.addChannel('2', isNumber, noop)).toThrow( + 'Channels must be added before starting the multiplexer', + ); + + await Promise.all([startP, duplex.return()]); + }); + + it('throws if adding channels after ending', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + // Add one channel so we can start the multiplexer. + multiplex.addChannel('1', isString, noop); + + await Promise.all([multiplex.drainAll(), duplex.return()]); + + expect(() => multiplex.addChannel('2', isNumber, noop)).toThrow( + 'Channels must be added before starting', + ); + }); + }); + + describe('start', () => { + it('is idempotent', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + // Add one channel so we can start the multiplexer. + multiplex.addChannel('1', isString, noop); + const startP = Promise.all([multiplex.start(), multiplex.start()]).then( + () => undefined, + ); + await duplex.return(); + expect(await startP).toBeUndefined(); + }); + + it('enables draining channels separately', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1Handler = vi.fn(); + const ch2Handler = vi.fn(); + const ch1 = multiplex.addChannel('1', isString, ch1Handler); + const ch2 = multiplex.addChannel('2', isNumber, ch2Handler); + + const startAndDrainP = Promise.all([ + multiplex.start(), + ch1.drain(), + ch2.drain(), + ]); + + await Promise.all([ + duplex.receiveInput(makeEnvelope('1', 'foo')), + duplex.receiveInput(makeEnvelope('2', 42)), + ]); + + await delay(10); + + await duplex.return(); + await startAndDrainP; + + expect(ch1Handler).toHaveBeenCalledWith('foo'); + expect(ch2Handler).toHaveBeenCalledWith(42); + }); + }); + + describe('drainAll', () => { + it('throws if draining when there are no channels', async () => { + const [multiplex] = await TestMultiplexer.make(); + await expect(multiplex.drainAll()).rejects.toThrow( + 'TestMultiplexer has no channels', + ); + }); + + it('forwards input to the correct channel', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1Handler = vi.fn(); + const ch2Handler = vi.fn(); + multiplex.addChannel('1', isString, ch1Handler); + multiplex.addChannel('2', isNumber, ch2Handler); + const drainP = multiplex.drainAll(); + + await Promise.all([ + duplex.receiveInput(makeEnvelope('1', 'foo')), + duplex.receiveInput(makeEnvelope('2', 42)), + ]); + + await delay(10); + + expect(ch1Handler).toHaveBeenCalledWith('foo'); + expect(ch2Handler).toHaveBeenCalledWith(42); + + await duplex.return(); + await drainP; + }); + + it('ends all streams when the duplex stream returns', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + const drainP = multiplex.drainAll(); + + await duplex.return(); + + expect(await duplex.next()).toStrictEqual(makeDoneResult()); + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + expect(await drainP).toBeUndefined(); + }); + + it('ends all streams when any channel returns', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + const drainP = multiplex.drainAll(); + + await ch1.return(); + + expect(await duplex.next()).toStrictEqual(makeDoneResult()); + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + expect(await drainP).toBeUndefined(); + }); + + it('ends all streams when the duplex stream throws', async () => { + const onDispatch = vi.fn(); + const [multiplex] = await TestMultiplexer.make( + await TestDuplexStream.make(onDispatch), + ); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + onDispatch.mockImplementationOnce(() => { + throw new Error('foo'); + }); + + const drainP = multiplex.drainAll(); + + await expect(ch1.write('foo')).rejects.toThrow( + 'TestDuplexStream experienced a dispatch failure', + ); + + await expect(drainP).rejects.toThrow( + 'TestDuplexStream experienced a dispatch failure', + ); + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + }); + + it('ends all streams when a channel throws', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + + const drainP = multiplex.drainAll(); + + await duplex.receiveInput(makeEnvelope('1', 42)); + + await expect(drainP).rejects.toThrow( + 'TestMultiplexer#1: Message failed type validation', + ); + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + }); + + it('ends all streams when receiving a message for a non-existent channel', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + + const drainP = multiplex.drainAll(); + + // There is no channel 3 + await duplex.receiveInput(makeEnvelope('3', 42)); + + await expect(drainP).rejects.toThrow( + 'TestMultiplexer received message for unknown channel: 3', + ); + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + }); + }); + + describe('writing', () => { + it('writes channel messages correctly', async () => { + const dispatch = vi.fn(); + const duplex = await TestDuplexStream.make< + MultiplexEnvelope, + MultiplexEnvelope + >(dispatch); + const [multiplex] = await TestMultiplexer.make(duplex); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + + await ch1.write('foo'); + await ch2.write(42); + + expect(dispatch).toHaveBeenCalledWith({ + channel: '1', + payload: 'foo', + }); + expect(dispatch).toHaveBeenLastCalledWith({ + channel: '2', + payload: 42, + }); + }); + + it('returns done results from channel writes after ending', async () => { + const [multiplex, duplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + + await Promise.all([multiplex.drainAll(), duplex.return()]); + + expect(await ch1.write('foo')).toStrictEqual(makeDoneResult()); + expect(await ch2.write(42)).toStrictEqual(makeDoneResult()); + }); + }); + + describe('return', () => { + it('ends the multiplexer and its channels', async () => { + const [multiplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + + await multiplex.return(); + + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + }); + + it('is idempotent', async () => { + const [multiplex] = await TestMultiplexer.make(); + const ch1 = multiplex.addChannel('1', isString, noop); + const ch2 = multiplex.addChannel('2', isNumber, noop); + + await multiplex.return(); + + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + + await multiplex.return(); + + expect(await ch1.next()).toStrictEqual(makeDoneResult()); + expect(await ch2.next()).toStrictEqual(makeDoneResult()); + }); + }); +}); diff --git a/packages/streams/src/StreamMultiplexer.ts b/packages/streams/src/StreamMultiplexer.ts new file mode 100644 index 000000000..bf2ee5109 --- /dev/null +++ b/packages/streams/src/StreamMultiplexer.ts @@ -0,0 +1,330 @@ +/** + * This module provides a class and utilities for multiplexing duplex streams. A + * multiplexer is not a stream itself, but rather a wrapper around a duplex stream. + * The multiplexer provides methods for creating "channels" over the underlying stream, + * which are themselves duplex streams and may have a different message type and + * validation logic. + * + * The multiplexer is constructed in an idle state, and must be explicitly "started" + * via the `start()` or `drainAll()` methods. All channels must be added before the + * multiplexer is started. + * + * Starting the multiplexer will synchronize the underlying duplex stream, if it is + * synchronizable. Therefore, in order to prevent message loss, callers **should not** + * synchronize the underlying duplex stream before passing it to the multiplexer. For + * the same reason, the multiplexer will throw if any channels are added after it has + * started. + * + * @module StreamMultiplexer + */ + +import { isObject } from '@metamask/utils'; + +import type { DuplexStream } from './BaseDuplexStream.js'; +import type { + BaseReaderArgs, + ValidateInput, + ReceiveInput, +} from './BaseStream.js'; +import { BaseReader } from './BaseStream.js'; +import { makeDoneResult } from './utils.js'; + +/** + * A duplex stream that can maybe be synchronized. + */ +type SynchronizableDuplexStream = DuplexStream< + Read, + Write +> & { + synchronize?: () => Promise; +}; + +/** + * The read stream implementation for {@link StreamMultiplexer} channels. + */ +class ChannelReader extends BaseReader { + // eslint-disable-next-line no-restricted-syntax + private constructor(args: BaseReaderArgs) { + super(args); + } + + static make( + args: BaseReaderArgs, + ): [ChannelReader, ReceiveInput] { + const channel = new ChannelReader(args); + return [channel, channel.getReceiveInput()] as const; + } +} + +type ChannelName = string; + +/** + * A multiplex envelope. The wrapper for all values passing through the underlying + * duplex stream of a {@link StreamMultiplexer}. + */ +export type MultiplexEnvelope = { + channel: ChannelName; + payload: Payload; +}; + +/** + * Type guard for {@link MultiplexEnvelope}. Only verifies that the `payload` property + * is not `undefined`, assuming that multiplexer channels will be responsible for + * performing further validation. + * + * @param value - The value to check. + * @returns Whether the value is a {@link MultiplexEnvelope}. + */ +export const isMultiplexEnvelope = ( + value: unknown, +): value is MultiplexEnvelope => + isObject(value) && + typeof value.channel === 'string' && + typeof value.payload !== 'undefined'; + +type HandleRead = (value: Read) => void | Promise; + +/** + * A duplex stream whose `drain` method does not accept a callback. We say it is + * "handled" because in practice the callback is bound to the `drain` method. + */ +export type HandledDuplexStream = Omit< + DuplexStream, + 'drain' +> & { + drain: () => Promise; +}; + +enum MultiplexerStatus { + Idle = 0, + Running = 1, + Done = 2, +} + +type ChannelRecord = { + channelName: ChannelName; + stream: HandledDuplexStream; + receiveInput: ReceiveInput; +}; + +export class StreamMultiplexer { + #status: MultiplexerStatus; + + readonly #name: string; + + readonly #channels: Map>; + + readonly #stream: SynchronizableDuplexStream< + MultiplexEnvelope, + MultiplexEnvelope + >; + + /** + * Creates a new multiplexer over the specified duplex stream. If the duplex stream + * is synchronizable, it will be synchronized by the multiplexer and **should not** + * be synchronized by the caller. + * + * @param stream - The underlying duplex stream. + * @param name - The multiplexer name. + */ + constructor( + stream: SynchronizableDuplexStream< + MultiplexEnvelope, + MultiplexEnvelope + >, + name?: string, + ) { + this.#status = MultiplexerStatus.Idle; + this.#channels = new Map(); + this.#name = name ?? this.constructor.name; + this.#stream = stream; + } + + /** + * Starts the multiplexer and drains all of its channels. Use either this method or + * {@link start} to drain the multiplexer. + * + * @returns A promise resolves when the multiplexer and its channels have ended. + */ + async drainAll(): Promise { + if (this.#channels.size === 0) { + throw new Error(`${this.#name} has no channels`); + } + + const promise = Promise.all([ + this.start(), + ...Array.from(this.#channels.values()).map(async ({ stream }) => + stream.drain(), + ), + ]).then(async () => this.#end()); + + // Set up cleanup and prevent unhandled rejections. The caller is still expected to + // handle rejections. + promise.catch(async (error) => this.#end(error)); + + return promise; + } + + /** + * Idempotently starts the multiplexer by draining the underlying duplex stream and + * forwarding messages to the appropriate channels. Ends the multiplexer if the duplex + * stream ends. Use either this method or {@link drainAll} to drain the multiplexer. + * + * If the duplex stream is synchronizable, it will be synchronized by the multiplexer + * and **should not** be synchronized by the caller. + */ + async start(): Promise { + if (this.#status !== MultiplexerStatus.Idle) { + return; + } + this.#status = MultiplexerStatus.Running; + + await this.#stream.synchronize?.(); + + for await (const envelope of this.#stream) { + const channel = this.#channels.get(envelope.channel); + if (channel === undefined) { + await this.#end( + new Error( + `${this.#name} received message for unknown channel: ${envelope.channel}`, + ), + ); + return; + } + await channel.receiveInput(envelope.payload); + } + await this.#end(); + } + + /** + * Adds a channel to the multiplexer. Must be called before starting the + * multiplexer. + * + * @param channelName - The channel name. Must be unique. + * @param validateInput - The channel's input validator. + * @param handleRead - The channel's drain handler. + * @returns The channel stream. + */ + addChannel( + channelName: ChannelName, + validateInput: ValidateInput, + handleRead: HandleRead, + ): HandledDuplexStream { + if (this.#status !== MultiplexerStatus.Idle) { + throw new Error('Channels must be added before starting the multiplexer'); + } + if (this.#channels.has(channelName)) { + throw new Error(`Channel "${channelName}" already exists.`); + } + + const { stream, receiveInput } = this.#makeChannel( + channelName, + validateInput, + handleRead, + ); + + // We downcast some properties in order to store all records in one place. + this.#channels.set(channelName, { + channelName, + stream: stream as HandledDuplexStream, + receiveInput, + }); + + return stream; + } + + /** + * Constructs a channel. Channels are objects that implement the {@link HandledDuplexStream} + * interface. Internally, they are backed up by a {@link ChannelReader} and a wrapped + * write method that forwards messages to the underlying duplex stream. + * + * @param channelName - The channel name. Must be unique. + * @param validateInput - The channel's input validator. + * @param handleRead - The channel's drain handler. + * @returns The channel stream and its `receiveInput` method. + */ + #makeChannel( + channelName: ChannelName, + validateInput: ValidateInput, + handleRead: HandleRead, + ): { + stream: HandledDuplexStream; + receiveInput: ReceiveInput; + } { + let isDone = false; + + const [reader, receiveInput] = ChannelReader.make({ + validateInput, + name: `${this.#name}#${channelName}`, + onEnd: async () => { + isDone = true; + await this.#end(); + }, + }); + + const write = async ( + payload: Write, + ): Promise> => { + if (isDone) { + return makeDoneResult(); + } + + const writeP = this.#stream.write({ + channel: channelName, + payload, + }); + writeP.catch(async (error) => { + isDone = true; + await reader.throw(error); + }); + return writeP; + }; + + const drain = async (): Promise => { + for await (const value of reader) { + await handleRead(value); + } + }; + + // Create and return the DuplexStream interface + const stream: HandledDuplexStream = { + next: reader.next.bind(reader), + return: reader.return.bind(reader), + throw: reader.throw.bind(reader), + write, + drain, + [Symbol.asyncIterator]() { + return stream; + }, + }; + + return { stream, receiveInput }; + } + + /** + * Ends the multiplexer and its channels. + */ + async return(): Promise { + await this.#end(); + } + + async #end(error?: Error): Promise { + if (this.#status === MultiplexerStatus.Done) { + return; + } + this.#status = MultiplexerStatus.Done; + + const end = async ( + stream: DuplexStream, + ): Promise => + error === undefined ? stream.return() : stream.throw(error); + + // eslint-disable-next-line promise/no-promise-in-callback + await Promise.all([ + end(this.#stream), + ...Array.from(this.#channels.values()).map(async (channel) => + end(channel.stream), + ), + ]); + } +} diff --git a/packages/streams/src/ChromeRuntimeStream.test.ts b/packages/streams/src/browser/ChromeRuntimeStream.test.ts similarity index 80% rename from packages/streams/src/ChromeRuntimeStream.test.ts rename to packages/streams/src/browser/ChromeRuntimeStream.test.ts index d4245a97f..338a20bc4 100644 --- a/packages/streams/src/ChromeRuntimeStream.test.ts +++ b/packages/streams/src/browser/ChromeRuntimeStream.test.ts @@ -2,21 +2,23 @@ import { delay } from '@ocap/test-utils'; import { stringify } from '@ocap/utils'; import { describe, expect, it, vi } from 'vitest'; -import { makeAck } from './BaseDuplexStream.js'; -import type { ValidateInput } from './BaseStream.js'; -import type { ChromeRuntime } from './chrome.js'; import type { MessageEnvelope } from './ChromeRuntimeStream.js'; import { ChromeRuntimeReader, ChromeRuntimeWriter, ChromeRuntimeStreamTarget, ChromeRuntimeDuplexStream, + ChromeRuntimeMultiplexer, } from './ChromeRuntimeStream.js'; +import { makeAck } from '../BaseDuplexStream.js'; +import type { ValidateInput } from '../BaseStream.js'; +import type { ChromeRuntime } from '../chrome.js'; +import { StreamMultiplexer } from '../StreamMultiplexer.js'; import { makeDoneResult, makePendingResult, makeStreamDoneSignal, -} from './utils.js'; +} from '../utils.js'; const makeEnvelope = ( value: unknown, @@ -114,6 +116,24 @@ describe('ChromeRuntimeReader', () => { expect(validateInput).toHaveBeenCalledWith(message); }); + it('throws if validateInput throws', async () => { + const { runtime, dispatchRuntimeMessage } = makeRuntime(); + const validateInput = (() => { + throw new Error('foo'); + }) as unknown as ValidateInput; + const reader = new ChromeRuntimeReader( + asChromeRuntime(runtime), + ChromeRuntimeStreamTarget.Background, + ChromeRuntimeStreamTarget.Offscreen, + { validateInput }, + ); + + const message = { foo: 'bar' }; + dispatchRuntimeMessage(message); + await expect(reader.next()).rejects.toThrow('foo'); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + }); + it('ignores messages from other extensions', async () => { const { runtime, dispatchRuntimeMessage } = makeRuntime(); const reader = new ChromeRuntimeReader( @@ -216,6 +236,25 @@ describe('ChromeRuntimeReader', () => { expect(await reader.next()).toStrictEqual(makeDoneResult()); expect(onEnd).toHaveBeenCalledTimes(1); }); + + it('handles errors from onEnd function', async () => { + const { runtime, dispatchRuntimeMessage } = makeRuntime(); + const onEnd = vi.fn(() => { + throw new Error('foo'); + }); + const reader = new ChromeRuntimeReader( + asChromeRuntime(runtime), + ChromeRuntimeStreamTarget.Background, + ChromeRuntimeStreamTarget.Offscreen, + { onEnd }, + ); + + dispatchRuntimeMessage(makeStreamDoneSignal()); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + expect(onEnd).toHaveBeenCalledTimes(1); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + expect(onEnd).toHaveBeenCalledTimes(1); + }); }); describe.concurrent('ChromeRuntimeWriter', () => { @@ -339,3 +378,38 @@ describe.concurrent('ChromeRuntimeDuplexStream', () => { expect(await readP).toStrictEqual(makeDoneResult()); }); }); + +describe('ChromeRuntimeMultiplexer', () => { + it('constructs a ChromeRuntimeMultiplexer', () => { + const multiplexer = new ChromeRuntimeMultiplexer( + asChromeRuntime(makeRuntime().runtime), + ChromeRuntimeStreamTarget.Background, + ChromeRuntimeStreamTarget.Offscreen, + ); + + expect(multiplexer).toBeInstanceOf(StreamMultiplexer); + }); + + it('can create and drain channels', async () => { + const { runtime, dispatchRuntimeMessage } = makeRuntime(); + const multiplexer = new ChromeRuntimeMultiplexer( + asChromeRuntime(runtime), + ChromeRuntimeStreamTarget.Background, + ChromeRuntimeStreamTarget.Offscreen, + ); + const handleRead = vi.fn(); + multiplexer.addChannel( + '1', + (value: unknown): value is number => typeof value === 'number', + handleRead, + ); + + const drainP = multiplexer.drainAll(); + dispatchRuntimeMessage(makeAck()); + dispatchRuntimeMessage({ channel: '1', payload: 42 }); + dispatchRuntimeMessage(makeStreamDoneSignal()); + + await drainP; + expect(handleRead).toHaveBeenCalledWith(42); + }); +}); diff --git a/packages/streams/src/ChromeRuntimeStream.ts b/packages/streams/src/browser/ChromeRuntimeStream.ts similarity index 82% rename from packages/streams/src/ChromeRuntimeStream.ts rename to packages/streams/src/browser/ChromeRuntimeStream.ts index f325b3a8d..7cacae37b 100644 --- a/packages/streams/src/ChromeRuntimeStream.ts +++ b/packages/streams/src/browser/ChromeRuntimeStream.ts @@ -17,16 +17,25 @@ import type { Json } from '@metamask/utils'; import { stringify } from '@ocap/utils'; -import { BaseDuplexStream } from './BaseDuplexStream.js'; +import { + BaseDuplexStream, + makeDuplexStreamInputValidator, +} from '../BaseDuplexStream.js'; import type { BaseReaderArgs, ValidateInput, BaseWriterArgs, ReceiveInput, -} from './BaseStream.js'; -import { BaseReader, BaseWriter } from './BaseStream.js'; -import type { ChromeRuntime, ChromeMessageSender } from './chrome.js'; -import type { Dispatchable } from './utils.js'; +} from '../BaseStream.js'; +import { BaseReader, BaseWriter } from '../BaseStream.js'; +import type { ChromeRuntime, ChromeMessageSender } from '../chrome.js'; +import { + isMultiplexEnvelope, + StreamMultiplexer, +} from '../StreamMultiplexer.js'; +import type { MultiplexEnvelope } from '../StreamMultiplexer.js'; +import { isJsonUnsafe } from '../utils.js'; +import type { Dispatchable } from '../utils.js'; export enum ChromeRuntimeStreamTarget { Background = 'background', @@ -123,7 +132,9 @@ export class ChromeRuntimeReader extends BaseReader { return; } - this.#receiveInput(message.payload); + this.#receiveInput(message.payload).catch(async (error) => + this.throw(error), + ); } } harden(ChromeRuntimeReader); @@ -178,9 +189,7 @@ export class ChromeRuntimeDuplexStream< Write, ChromeRuntimeWriter > { - // Unavoidable exception to our preference for #-private names. - // eslint-disable-next-line no-restricted-syntax - private constructor( + constructor( runtime: ChromeRuntime, localTarget: ChromeRuntimeStreamTarget, remoteTarget: ChromeRuntimeStreamTarget, @@ -193,7 +202,7 @@ export class ChromeRuntimeDuplexStream< remoteTarget, { name: 'ChromeRuntimeDuplexStream', - validateInput, + validateInput: makeDuplexStreamInputValidator(validateInput), onEnd: async () => { await writer.return(); }, @@ -235,3 +244,27 @@ export class ChromeRuntimeDuplexStream< } } harden(ChromeRuntimeDuplexStream); + +const isJsonMultiplexEnvelope = ( + value: unknown, +): value is MultiplexEnvelope => + isMultiplexEnvelope(value) && isJsonUnsafe(value.payload); + +export class ChromeRuntimeMultiplexer extends StreamMultiplexer { + constructor( + runtime: ChromeRuntime, + localTarget: ChromeRuntimeStreamTarget, + remoteTarget: ChromeRuntimeStreamTarget, + name?: string, + ) { + super( + new ChromeRuntimeDuplexStream< + MultiplexEnvelope, + MultiplexEnvelope + >(runtime, localTarget, remoteTarget, isJsonMultiplexEnvelope), + name, + ); + harden(this); + } +} +harden(ChromeRuntimeMultiplexer); diff --git a/packages/streams/src/MessagePortStream.test.ts b/packages/streams/src/browser/MessagePortStream.test.ts similarity index 82% rename from packages/streams/src/MessagePortStream.test.ts rename to packages/streams/src/browser/MessagePortStream.test.ts index 23275d311..a3f8bb7e5 100644 --- a/packages/streams/src/MessagePortStream.test.ts +++ b/packages/streams/src/browser/MessagePortStream.test.ts @@ -1,18 +1,19 @@ import { delay } from '@ocap/test-utils'; import { describe, expect, it, vi } from 'vitest'; -import { makeAck } from './BaseDuplexStream.js'; -import type { ValidateInput } from './BaseStream.js'; import { MessagePortDuplexStream, + MessagePortMultiplexer, MessagePortReader, MessagePortWriter, } from './MessagePortStream.js'; +import { makeAck } from '../BaseDuplexStream.js'; +import type { ValidateInput } from '../BaseStream.js'; import { makeDoneResult, makePendingResult, makeStreamDoneSignal, -} from './utils.js'; +} from '../utils.js'; describe('MessagePortReader', () => { it('constructs a MessagePortReader', () => { @@ -51,6 +52,18 @@ describe('MessagePortReader', () => { expect(validateInput).toHaveBeenCalledWith(message); }); + it('throws if validateInput throws', async () => { + const validateInput = (() => { + throw new Error('foo'); + }) as unknown as ValidateInput; + const { port1, port2 } = new MessageChannel(); + const reader = new MessagePortReader(port1, { validateInput }); + + port2.postMessage(42); + await expect(reader.next()).rejects.toThrow('foo'); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + }); + it('closes the port when done', async () => { const { port1, port2 } = new MessageChannel(); const closeSpy = vi.spyOn(port1, 'close'); @@ -201,3 +214,31 @@ describe('MessagePortDuplexStream', () => { expect(await readP).toStrictEqual(makeDoneResult()); }); }); + +describe('MessagePortMultiplexer', () => { + it('constructs a MessagePortMultiplexer', () => { + const { port1 } = new MessageChannel(); + const multiplexer = new MessagePortMultiplexer(port1); + + expect(multiplexer).toBeInstanceOf(MessagePortMultiplexer); + }); + + it('can create and drain channels', async () => { + const { port1, port2 } = new MessageChannel(); + const multiplexer = new MessagePortMultiplexer(port1); + const handleRead = vi.fn(); + multiplexer.addChannel( + '1', + (value: unknown): value is number => typeof value === 'number', + handleRead, + ); + + const drainP = multiplexer.drainAll(); + port2.postMessage(makeAck()); + port2.postMessage({ channel: '1', payload: 42 }); + port2.postMessage(makeStreamDoneSignal()); + + await drainP; + expect(handleRead).toHaveBeenCalledWith(42); + }); +}); diff --git a/packages/streams/src/MessagePortStream.ts b/packages/streams/src/browser/MessagePortStream.ts similarity index 75% rename from packages/streams/src/MessagePortStream.ts rename to packages/streams/src/browser/MessagePortStream.ts index 27a398f3e..97ccf6423 100644 --- a/packages/streams/src/MessagePortStream.ts +++ b/packages/streams/src/browser/MessagePortStream.ts @@ -19,16 +19,22 @@ * @module MessagePort streams */ -import type { Json } from '@metamask/utils'; - -import { BaseDuplexStream } from './BaseDuplexStream.js'; +import type { OnMessage } from './utils.js'; +import { + BaseDuplexStream, + makeDuplexStreamInputValidator, +} from '../BaseDuplexStream.js'; import type { BaseReaderArgs, BaseWriterArgs, ValidateInput, -} from './BaseStream.js'; -import { BaseReader, BaseWriter } from './BaseStream.js'; -import type { Dispatchable, OnMessage } from './utils.js'; +} from '../BaseStream.js'; +import { BaseReader, BaseWriter } from '../BaseStream.js'; +import { + isMultiplexEnvelope, + StreamMultiplexer, +} from '../StreamMultiplexer.js'; +import type { Dispatchable } from '../utils.js'; /** * A readable stream over a {@link MessagePort}. @@ -41,17 +47,15 @@ import type { Dispatchable, OnMessage } from './utils.js'; * - {@link MessagePortWriter} for the corresponding writable stream. * - The module-level documentation for more details. */ -export class MessagePortReader extends BaseReader { +export class MessagePortReader extends BaseReader { constructor( port: MessagePort, { validateInput, onEnd }: BaseReaderArgs = {}, ) { - // eslint-disable-next-line prefer-const - let onMessage: OnMessage; - super({ validateInput, onEnd: async () => { + // eslint-disable-next-line @typescript-eslint/no-use-before-define port.removeEventListener('message', onMessage); port.close(); await onEnd?.(); @@ -60,12 +64,12 @@ export class MessagePortReader extends BaseReader { const receiveInput = super.getReceiveInput(); - onMessage = (messageEvent) => { + const onMessage: OnMessage = (messageEvent) => { if (messageEvent.ports.length > 0) { return; } - receiveInput(messageEvent.data); + receiveInput(messageEvent.data).catch(async (error) => this.throw(error)); }; port.addEventListener('message', onMessage); port.start(); @@ -82,7 +86,7 @@ harden(MessagePortReader); * - {@link MessagePortReader} for the corresponding readable stream. * - The module-level documentation for more details. */ -export class MessagePortWriter extends BaseWriter { +export class MessagePortWriter extends BaseWriter { constructor( port: MessagePort, { name, onEnd }: Omit, 'onDispatch'> = {}, @@ -102,21 +106,19 @@ export class MessagePortWriter extends BaseWriter { harden(MessagePortWriter); export class MessagePortDuplexStream< - Read extends Json, - Write extends Json = Read, + Read, + Write = Read, > extends BaseDuplexStream< Read, MessagePortReader, Write, MessagePortWriter > { - // Unavoidable exception to our preference for #-private names. - // eslint-disable-next-line no-restricted-syntax - private constructor(port: MessagePort, validateInput?: ValidateInput) { + constructor(port: MessagePort, validateInput?: ValidateInput) { let writer: MessagePortWriter; // eslint-disable-line prefer-const const reader = new MessagePortReader(port, { name: 'MessagePortDuplexStream', - validateInput, + validateInput: makeDuplexStreamInputValidator(validateInput), onEnd: async () => { await writer.return(); }, @@ -130,7 +132,7 @@ export class MessagePortDuplexStream< super(reader, writer); } - static async make( + static async make( port: MessagePort, validateInput?: ValidateInput, ): Promise> { @@ -143,3 +145,11 @@ export class MessagePortDuplexStream< } } harden(MessagePortDuplexStream); + +export class MessagePortMultiplexer extends StreamMultiplexer { + constructor(port: MessagePort, name?: string) { + super(new MessagePortDuplexStream(port, isMultiplexEnvelope), name); + harden(this); + } +} +harden(MessagePortMultiplexer); diff --git a/packages/streams/src/PostMessageStream.test.ts b/packages/streams/src/browser/PostMessageStream.test.ts similarity index 79% rename from packages/streams/src/PostMessageStream.test.ts rename to packages/streams/src/browser/PostMessageStream.test.ts index 7282bee75..b291d1054 100644 --- a/packages/streams/src/PostMessageStream.test.ts +++ b/packages/streams/src/browser/PostMessageStream.test.ts @@ -1,19 +1,21 @@ import { delay } from '@ocap/test-utils'; import { describe, it, expect, vi } from 'vitest'; -import { makeAck } from './BaseDuplexStream.js'; -import type { ValidateInput } from './BaseStream.js'; import { PostMessageDuplexStream, + PostMessageMultiplexer, PostMessageReader, PostMessageWriter, } from './PostMessageStream.js'; import type { PostMessage } from './utils.js'; +import { makeAck } from '../BaseDuplexStream.js'; +import type { ValidateInput } from '../BaseStream.js'; +import { StreamMultiplexer } from '../StreamMultiplexer.js'; import { makeDoneResult, makePendingResult, makeStreamDoneSignal, -} from './utils.js'; +} from '../utils.js'; // This function declares its own return type. // eslint-disable-next-line @typescript-eslint/explicit-function-return-type @@ -67,6 +69,21 @@ describe('PostMessageReader', () => { expect(validateInput).toHaveBeenCalledWith(message); }); + it('throws if validateInput throws', async () => { + const { postMessageFn, setListener, removeListener } = + makePostMessageMock(); + const validateInput = (() => { + throw new Error('foo'); + }) as unknown as ValidateInput; + const reader = new PostMessageReader(setListener, removeListener, { + validateInput, + }); + + postMessageFn(42); + await expect(reader.next()).rejects.toThrow('foo'); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + }); + it('removes its listener when it ends', async () => { const { postMessageFn, setListener, removeListener, listeners } = makePostMessageMock(); @@ -213,3 +230,41 @@ describe('PostMessageDuplexStream', () => { expect(await readP).toStrictEqual(makeDoneResult()); }); }); + +describe('PostMessageMultiplexer', () => { + it('constructs a PostMessageMultiplexer', () => { + const { postMessageFn, setListener, removeListener } = + makePostMessageMock(); + const multiplexer = new PostMessageMultiplexer( + postMessageFn, + setListener, + removeListener, + ); + + expect(multiplexer).toBeInstanceOf(StreamMultiplexer); + }); + + it('can create and drain channels', async () => { + const { postMessageFn, setListener, removeListener } = + makePostMessageMock(); + const multiplexer = new PostMessageMultiplexer( + postMessageFn, + setListener, + removeListener, + ); + const handleRead = vi.fn(); + multiplexer.addChannel( + '1', + (value: unknown): value is number => typeof value === 'number', + handleRead, + ); + + const drainP = multiplexer.drainAll(); + postMessageFn(makeAck()); + postMessageFn({ channel: '1', payload: 42 }); + postMessageFn(makeStreamDoneSignal()); + + await drainP; + expect(handleRead).toHaveBeenCalledWith(42); + }); +}); diff --git a/packages/streams/src/PostMessageStream.ts b/packages/streams/src/browser/PostMessageStream.ts similarity index 73% rename from packages/streams/src/PostMessageStream.ts rename to packages/streams/src/browser/PostMessageStream.ts index d5e554071..7517f30ad 100644 --- a/packages/streams/src/PostMessageStream.ts +++ b/packages/streams/src/browser/PostMessageStream.ts @@ -6,16 +6,22 @@ * @module PostMessage streams */ -import type { Json } from '@metamask/utils'; - -import { BaseDuplexStream } from './BaseDuplexStream.js'; +import type { OnMessage, PostMessage } from './utils.js'; +import { + BaseDuplexStream, + makeDuplexStreamInputValidator, +} from '../BaseDuplexStream.js'; import type { BaseReaderArgs, BaseWriterArgs, ValidateInput, -} from './BaseStream.js'; -import { BaseReader, BaseWriter } from './BaseStream.js'; -import type { Dispatchable, OnMessage, PostMessage } from './utils.js'; +} from '../BaseStream.js'; +import { BaseReader, BaseWriter } from '../BaseStream.js'; +import { + isMultiplexEnvelope, + StreamMultiplexer, +} from '../StreamMultiplexer.js'; +import type { Dispatchable } from '../utils.js'; type SetListener = (onMessage: OnMessage) => void; type RemoveListener = (onMessage: OnMessage) => void; @@ -28,7 +34,7 @@ type RemoveListener = (onMessage: OnMessage) => void; * * @see {@link PostMessageWriter} for the corresponding writable stream. */ -export class PostMessageReader extends BaseReader { +export class PostMessageReader extends BaseReader { constructor( setListener: SetListener, removeListener: RemoveListener, @@ -51,7 +57,7 @@ export class PostMessageReader extends BaseReader { return; } - receiveInput(messageEvent.data); + receiveInput(messageEvent.data).catch(async (error) => this.throw(error)); }; setListener(onMessage); @@ -65,7 +71,7 @@ harden(PostMessageReader); * * @see {@link PostMessageReader} for the corresponding readable stream. */ -export class PostMessageWriter extends BaseWriter { +export class PostMessageWriter extends BaseWriter { constructor( postMessageFn: PostMessage, { name, onEnd }: Omit, 'onDispatch'> = {}, @@ -89,17 +95,15 @@ harden(PostMessageWriter); * @see {@link PostMessageWriter} for the corresponding writable stream. */ export class PostMessageDuplexStream< - Read extends Json, - Write extends Json = Read, + Read, + Write = Read, > extends BaseDuplexStream< Read, PostMessageReader, Write, PostMessageWriter > { - // Unavoidable exception to our preference for #-private names. - // eslint-disable-next-line no-restricted-syntax - private constructor( + constructor( postMessageFn: PostMessage, setListener: SetListener, removeListener: RemoveListener, @@ -108,7 +112,7 @@ export class PostMessageDuplexStream< let writer: PostMessageWriter; // eslint-disable-line prefer-const const reader = new PostMessageReader(setListener, removeListener, { name: 'PostMessageDuplexStream', - validateInput, + validateInput: makeDuplexStreamInputValidator(validateInput), onEnd: async () => { await writer.return(); }, @@ -122,7 +126,7 @@ export class PostMessageDuplexStream< super(reader, writer); } - static async make( + static async make( postMessageFn: PostMessage, setListener: SetListener, removeListener: RemoveListener, @@ -139,3 +143,24 @@ export class PostMessageDuplexStream< } } harden(PostMessageDuplexStream); + +export class PostMessageMultiplexer extends StreamMultiplexer { + constructor( + postMessageFn: PostMessage, + setListener: SetListener, + removeListener: RemoveListener, + name?: string, + ) { + super( + new PostMessageDuplexStream( + postMessageFn, + setListener, + removeListener, + isMultiplexEnvelope, + ), + name, + ); + harden(this); + } +} +harden(PostMessageMultiplexer); diff --git a/packages/streams/src/browser/utils.ts b/packages/streams/src/browser/utils.ts new file mode 100644 index 000000000..5f6a4d2b2 --- /dev/null +++ b/packages/streams/src/browser/utils.ts @@ -0,0 +1,2 @@ +export type PostMessage = (message: unknown) => void; +export type OnMessage = (event: MessageEvent) => void; diff --git a/packages/streams/src/envelope-handler.test.ts b/packages/streams/src/envelope-handler.test.ts deleted file mode 100644 index 2269153b4..000000000 --- a/packages/streams/src/envelope-handler.test.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { describe, expect, it } from 'vitest'; - -import { makeStreamEnvelopeHandler } from './envelope-handler.js'; -import { - barContent, - fooContent, - isStreamEnvelope, - Label, - streamEnveloper, -} from '../test/envelope-kit-fixtures.js'; - -describe('StreamEnvelopeHandler', () => { - const testEnvelopeHandlers = { - foo: async () => Label.Foo, - bar: async () => Label.Bar, - }; - - const testErrorHandler = (problem: unknown): never => { - throw new Error(`TEST ${String(problem)}`); - }; - - it.each` - wrapper | content | label - ${streamEnveloper.foo.wrap} | ${fooContent} | ${Label.Foo} - ${streamEnveloper.bar.wrap} | ${barContent} | ${Label.Bar} - `('handles valid StreamEnvelopes', async ({ wrapper, content, label }) => { - const handler = makeStreamEnvelopeHandler( - streamEnveloper, - isStreamEnvelope, - testEnvelopeHandlers, - testErrorHandler, - ); - console.debug(wrapper(content)); - expect(await handler.handle(wrapper(content))).toStrictEqual(label); - }); - - it('routes invalid envelopes to default error handler', async () => { - const handler = makeStreamEnvelopeHandler( - streamEnveloper, - isStreamEnvelope, - testEnvelopeHandlers, - ); - await expect( - // @ts-expect-error label is intentionally unknown - handler.handle({ label: 'unknown', content: [] }), - ).rejects.toThrow(/^Stream envelope handler received unexpected value/u); - }); - - it('routes invalid envelopes to supplied error handler', async () => { - const handler = makeStreamEnvelopeHandler( - streamEnveloper, - isStreamEnvelope, - testEnvelopeHandlers, - testErrorHandler, - ); - await expect( - // @ts-expect-error label is intentionally unknown - handler.handle({ label: 'unknown', content: [] }), - ).rejects.toThrow( - /^TEST Stream envelope handler received unexpected value/u, - ); - }); - - it('routes valid stream envelopes with an unhandled label to the error handler', async () => { - const handler = makeStreamEnvelopeHandler( - streamEnveloper, - isStreamEnvelope, - { foo: testEnvelopeHandlers.foo }, - testErrorHandler, - ); - await expect( - handler.handle(streamEnveloper.bar.wrap(barContent)), - ).rejects.toThrow( - /^TEST Stream envelope handler received an envelope with known but unexpected label/u, - ); - }); -}); diff --git a/packages/streams/src/envelope-handler.ts b/packages/streams/src/envelope-handler.ts deleted file mode 100644 index 9bbdf2da5..000000000 --- a/packages/streams/src/envelope-handler.ts +++ /dev/null @@ -1,142 +0,0 @@ -import { stringify } from '@ocap/utils'; - -import type { StreamEnvelope } from './envelope.js'; -import type { StreamEnveloper } from './enveloper.js'; -import type { TypeMap } from './utils/generics.js'; - -/** - * A handler for automatically unwrapping stream envelopes and handling their content. - */ -export type StreamEnvelopeHandler< - Labels extends readonly string[], - ContentMap extends TypeMap, -> = { - /** - * Checks an unknown value for envelope labels, applying the label's handler - * if known, and applying the error handler if the label is not handled or if - * the content did not pass the envelope's type guard. - * - * @template Envelope - The type of the envelope. - * @param envelope - The envelope to handle. - * @returns The result of the handler. - */ - handle: >( - envelope: Envelope, - ) => Promise; - /** - * The bag of async content handlers labeled with the {@link EnvelopeLabel} they handle. - */ - contentHandlers: StreamEnvelopeContentHandlerBag; - /** - * The error handler for the stream envelope handler. - */ - errorHandler: StreamEnvelopeErrorHandler; -}; - -/** - * A handler for a specific stream envelope label. - */ -type StreamEnvelopeContentHandler< - EnvelopeLabel extends string, - ContentMap extends TypeMap, - Label extends EnvelopeLabel, -> = (content: ContentMap[Label]) => Promise; - -/** - * An object with {@link EnvelopeLabel} keys mapping to an appropriate {@link StreamEnvelopeContentHandler}. - * If the stream envelope handler encounters a well-formed stream envelope without a defined handler, - * the envelope will be passed to the {@link ErrorHandler}. - */ -export type StreamEnvelopeContentHandlerBag< - Labels extends readonly string[], - ContentMap extends TypeMap, -> = { - [Label in Labels[number]]?: (content: ContentMap[Label]) => Promise; -}; - -/** - * A handler for stream envelope parsing errors. - * If the {@link StreamEnvelopeHandler} encounters an error while parsing the supplied value, - * it will pass the reason and value to the error handler. - */ -export type StreamEnvelopeErrorHandler = ( - reason: string, - value: unknown, -) => unknown; - -/** - * The default handler for stream envelope parsing errors. - * - * @param reason - The reason for the error. - * @param value - The value that caused the error. - */ -const defaultStreamEnvelopeErrorHandler: StreamEnvelopeErrorHandler = ( - reason, - value, -) => { - throw new Error(`${reason} ${stringify(value)}`); -}; - -/** - * Makes a {@link StreamEnvelopeHandler} which handles an unknown value. - * - * If the supplied value is a valid envelope with a defined {@link StreamEnvelopeHandler}, - * the stream envelope handler will return whatever the defined handler returns. - * - * If the stream envelope handler is passed a well-formed stream envelope without a defined handler, - * an explanation and the envelope will be passed to the supplied {@link StreamEnvelopeErrorHandler}. - * - * If the stream envelope handler encounters an error while parsing the supplied value, - * it will pass the reason and value to the supplied {@link StreamEnvelopeErrorHandler}. - * - * If no error handler is supplied, the default error handling behavior is to throw. - * - * @param streamEnveloper - A {@link StreamEnveloper} made with the same Labels. - * @param isStreamEnvelope - A type guard which identifies stream envelopes. - * @param contentHandlers - A bag of async content handlers labeled with the {@link EnvelopeLabel} they handle. - * @param errorHandler - An optional synchronous error handler. - * @returns The stream envelope handler. - */ -export const makeStreamEnvelopeHandler = < - Labels extends readonly string[], - ContentMap extends TypeMap, ->( - streamEnveloper: StreamEnveloper, - isStreamEnvelope: ( - value: unknown, - ) => value is StreamEnvelope, - contentHandlers: StreamEnvelopeContentHandlerBag, - errorHandler: StreamEnvelopeErrorHandler = defaultStreamEnvelopeErrorHandler, -): StreamEnvelopeHandler => { - return { - handle: async (value: unknown) => { - if (!isStreamEnvelope(value)) { - return errorHandler( - 'Stream envelope handler received unexpected value', - value, - ); - } - const envelope = value; - const handler = contentHandlers[envelope.label] as - | StreamEnvelopeContentHandler< - Labels[number], - ContentMap, - typeof envelope.label - > - | undefined; - const enveloper = streamEnveloper[envelope.label]; - if (!handler || !enveloper) { - console.debug(`handler: ${stringify(handler)}`); - console.debug(`enveloper: ${stringify(enveloper)}`); - return errorHandler( - 'Stream envelope handler received an envelope with known but unexpected label', - envelope, - ); - } - const content = enveloper.unwrap(envelope); - return await handler(content); - }, - contentHandlers, - errorHandler, - }; -}; diff --git a/packages/streams/src/envelope-kit.test.ts b/packages/streams/src/envelope-kit.test.ts deleted file mode 100644 index f4eee1019..000000000 --- a/packages/streams/src/envelope-kit.test.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { describe, expect, it } from 'vitest'; - -import { makeStreamEnvelopeKit } from './envelope-kit.js'; -import type { - Bar, - ContentMap, - Foo, - labels, -} from '../test/envelope-kit-fixtures.js'; - -describe('makeStreamEnvelopeKit', () => { - it.each` - property - ${'streamEnveloper'} - ${'isStreamEnvelope'} - ${'makeStreamEnvelopeHandler'} - `('has the expected property: $property', ({ property }) => { - const streamEnvelopeKit = makeStreamEnvelopeKit({ - foo: (value: unknown): value is Foo => true, - bar: (value: unknown): value is Bar => true, - }); - expect(streamEnvelopeKit).toHaveProperty(property); - }); -}); diff --git a/packages/streams/src/envelope-kit.ts b/packages/streams/src/envelope-kit.ts deleted file mode 100644 index ad6dc0d5a..000000000 --- a/packages/streams/src/envelope-kit.ts +++ /dev/null @@ -1,95 +0,0 @@ -import type { - StreamEnvelopeContentHandlerBag, - StreamEnvelopeErrorHandler, - StreamEnvelopeHandler, -} from './envelope-handler.js'; -import { makeStreamEnvelopeHandler as makeHandler } from './envelope-handler.js'; -import type { StreamEnvelope } from './envelope.js'; -import { isLabeled } from './envelope.js'; -import type { - Enveloper, - StreamEnveloper, - StreamEnveloperGuards, -} from './enveloper.js'; -import { makeStreamEnveloper } from './enveloper.js'; -import type { TypeMap } from './utils/generics.js'; - -export type MakeStreamEnvelopeHandler< - Labels extends readonly string[], - ContentMap extends TypeMap, -> = ( - contentHandlers: StreamEnvelopeContentHandlerBag, - errorHandler?: StreamEnvelopeErrorHandler, -) => StreamEnvelopeHandler; - -export type StreamEnvelopeKit< - Labels extends readonly string[], - ContentMap extends TypeMap, -> = { - streamEnveloper: StreamEnveloper; - isStreamEnvelope: ( - value: unknown, - ) => value is StreamEnvelope; - makeStreamEnvelopeHandler: MakeStreamEnvelopeHandler; -}; - -/** - * Make a {@link StreamEnvelopeKit}. - * The template parameters must be explicitly declared. See tutorial for suggested declaration pattern. - * - * @tutorial documents/make-stream-envelope-kit.md - An example showing how to specify the template parameters, including how to pass an enum type as a template parameter. - * @template Labels - An enum of envelope labels. WARNING: if specified improperly, typescript inference fails. See referenced tutorial. - * @template Content - An object type mapping the specified labels to the type of content they label. - * @param guards - An object mapping the specified envelope labels to a type guard of their contents. - * @returns The {@link StreamEnvelopeKit}. - */ -export const makeStreamEnvelopeKit = < - Labels extends string[], - ContentMap extends TypeMap, ->( - guards: StreamEnveloperGuards, -): StreamEnvelopeKit => { - const streamEnveloper = makeStreamEnveloper(guards); - const isStreamEnvelope = ( - value: unknown, - ): value is StreamEnvelope => - isLabeled(value) && - ( - Object.values(streamEnveloper) as Enveloper[] - ).some((enveloper) => enveloper.check(value)); - - /** - * Makes a {@link StreamEnvelopeHandler} which handles an unknown value. - * - * If the supplied value is a valid envelope with a defined {@link StreamEnvelopeHandler}, - * the stream envelope handler will return whatever the defined handler returns. - * - * If the stream envelope handler is passed a well-formed stream envelope without a defined handler, - * an explanation and the envelope will be passed to the supplied {@link StreamEnvelopeErrorHandler}. - * - * If the stream envelope handler encounters an error while parsing the supplied value, - * it will pass the reason and value to the supplied {@link StreamEnvelopeErrorHandler}. - * - * If no error handler is supplied, the default error handling behavior is to throw. - * - * @param contentHandlers - A bag of async content handlers labeled with the {@link EnvelopeLabel} they handle. - * @param errorHandler - An optional synchronous error handler. - * @returns The stream envelope handler. - */ - const makeStreamEnvelopeHandler = ( - contentHandlers: StreamEnvelopeContentHandlerBag, - errorHandler?: StreamEnvelopeErrorHandler, - ): StreamEnvelopeHandler => - makeHandler( - streamEnveloper, - isStreamEnvelope, - contentHandlers, - errorHandler, - ); - - return { - streamEnveloper, - isStreamEnvelope, - makeStreamEnvelopeHandler, - }; -}; diff --git a/packages/streams/src/envelope-kit.types.test.ts b/packages/streams/src/envelope-kit.types.test.ts deleted file mode 100644 index 76ed93812..000000000 --- a/packages/streams/src/envelope-kit.types.test.ts +++ /dev/null @@ -1,214 +0,0 @@ -import { describe, expect, it } from 'vitest'; - -import { makeStreamEnvelopeKit } from './envelope-kit.js'; -import type { - Bar, - ContentMap, - Foo, - labels, -} from '../test/envelope-kit-fixtures.js'; -import { - makeStreamEnvelopeHandler as kitMakeStreamEnvelopeHandler, - isStreamEnvelope, - Label, - streamEnveloper, - fooContent, - barContent, -} from '../test/envelope-kit-fixtures.js'; - -const inferNumber = (value: number): number => value; -const inferString = (value: string): string => value; -const inferBoolean = (value: boolean): boolean => value; - -describe('makeStreamEnvelopeKit', () => { - it('causes a typescript error when supplying typeguard keys not matching the label type', () => { - // @ts-expect-error the bar key is missing - makeStreamEnvelopeKit({ - foo: (value: unknown): value is Foo => true, - }); - makeStreamEnvelopeKit({ - foo: (value: unknown): value is Foo => true, - bar: (value: unknown): value is Bar => true, - // @ts-expect-error the qux key is not included in labels - qux: (value: unknown): value is 'qux' => false, - }); - }); - - describe('kitted makeStreamEnvelopeHandler', () => { - it('provides proper typescript inferences', () => { - // all label arguments are optional - kitMakeStreamEnvelopeHandler({}); - // bar is optional - kitMakeStreamEnvelopeHandler({ - foo: async (content) => { - inferNumber(content.a); - // @ts-expect-error a is not a string - inferString(content.a); - // @ts-expect-error b is not a number - inferNumber(content.b); - inferString(content.b); - // @ts-expect-error c is undefined - value.content.c; - }, - }); - // keys not included in labels are forbidden - kitMakeStreamEnvelopeHandler({ - // @ts-expect-error the qux key is not included in labels - qux: async (content: any) => content, - }); - }); - }); -}); - -describe('isStreamEnvelope', () => { - it('provides proper typescript inferences', () => { - const value: any = null; - if (isStreamEnvelope(value)) { - switch (value.label) { - case Label.Foo: - inferNumber(value.content.a); - // @ts-expect-error a is not a string - inferString(value.content.a); - // @ts-expect-error b is not a number - inferNumber(value.content.b); - inferString(value.content.b); - // @ts-expect-error c is undefined - value.content.c; - break; - case Label.Bar: - // @ts-expect-error a is undefined - value.content.a; - // @ts-expect-error a is undefined - value.content.b; - inferBoolean(value.content.c); - break; - default: // unreachable - // @ts-expect-error label options are exhausted - value.label; - } - } - }); -}); - -describe('StreamEnveloper', () => { - describe('check', () => { - it('provides proper typescript inferences', () => { - const envelope: any = null; - if (streamEnveloper.foo.check(envelope)) { - inferNumber(envelope.content.a); - // @ts-expect-error a is not a string - inferString(envelope.content.a); - // @ts-expect-error b is not a number - inferNumber(envelope.content.b); - inferString(envelope.content.b); - // @ts-expect-error c is not defined - envelope.content.c; - switch (envelope.label) { - case Label.Foo: - // eslint-disable-next-line vitest/no-conditional-expect - expect(envelope.label).toMatch(Label.Foo); - break; - // @ts-expect-error label is Label.Foo - case Label.Bar: // unreachable - // @ts-expect-error label is inferred to be never - envelope.label.length; - break; - default: // unreachable - // @ts-expect-error label is inferred to be never - envelope.label.length; - } - } - - if (streamEnveloper.bar.check(envelope)) { - // @ts-expect-error a is not defined - envelope.content.a; - // @ts-expect-error b is not defined - envelope.content.b; - inferBoolean(envelope.content.c); - switch (envelope.label) { - // @ts-expect-error label is Label.Bar - case Label.Foo: // unreachable - // @ts-expect-error label is inferred to be never - envelope.label.length; - break; - case Label.Bar: - // eslint-disable-next-line vitest/no-conditional-expect - expect(envelope.label).toMatch(Label.Bar); - break; - default: // unreachable - // @ts-expect-error label is inferred to be never - envelope.label.length; - } - } - }); - }); - - describe('wrap', () => { - it('provides proper typescript inferences', () => { - streamEnveloper.foo.wrap(fooContent); - // @ts-expect-error foo rejects barContent - streamEnveloper.foo.wrap(barContent); - // @ts-expect-error bar rejects fooContent - streamEnveloper.bar.wrap(fooContent); - streamEnveloper.bar.wrap(barContent); - }); - }); - - describe('unwrap', () => { - it('provides proper typescript inferences', () => { - const envelope: any = null; - try { - const content = streamEnveloper.foo.unwrap(envelope); - - inferNumber(content.a); - // @ts-expect-error a is not a string - inferString(content.a); - // @ts-expect-error b is not a number - inferNumber(content.b); - inferString(content.b); - // @ts-expect-error c is undefined - content.c; - } catch { - undefined; - } - - try { - // @ts-expect-error envelope was already inferred to be Envelope - content = streamEnveloper.bar.unwrap(envelope); - } catch { - undefined; - } - }); - }); - - describe('label', () => { - it('provides proper typescript inferences', () => { - const fooEnveloper: any = streamEnveloper.foo; - const inferFooEnveloper = ( - enveloper: typeof streamEnveloper.foo, - ): unknown => enveloper; - const inferBarEnveloper = ( - enveloper: typeof streamEnveloper.bar, - ): unknown => enveloper; - - type Enveloper = (typeof streamEnveloper)[keyof typeof streamEnveloper]; - const ambiguousEnveloper = fooEnveloper as Enveloper; - - switch (ambiguousEnveloper.label) { - case Label.Foo: - inferFooEnveloper(ambiguousEnveloper); - // @ts-expect-error label = Label.Foo implies ambiguousEnveloper is a FooEnveloper - inferBarEnveloper(ambiguousEnveloper); - break; - case Label.Bar: - // @ts-expect-error label = Label.Bar implies ambiguousEnveloper is a BarEnveloper - inferFooEnveloper(ambiguousEnveloper); - inferBarEnveloper(ambiguousEnveloper); - break; - default: // unreachable - // @ts-expect-error label options are exhausted - ambiguousEnveloper.label; - } - }); - }); -}); diff --git a/packages/streams/src/envelope.test.ts b/packages/streams/src/envelope.test.ts deleted file mode 100644 index a7ef53db7..000000000 --- a/packages/streams/src/envelope.test.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { describe, expect, it } from 'vitest'; - -import type { Foo } from '../test/envelope-kit-fixtures.js'; -import { - barContent, - fooContent, - isStreamEnvelope, - streamEnveloper, -} from '../test/envelope-kit-fixtures.js'; - -describe('isStreamEnvelope', () => { - it.each` - value - ${streamEnveloper.foo.wrap(fooContent)} - ${streamEnveloper.bar.wrap(barContent)} - `('returns true for valid envelopes: $value', ({ value }) => { - expect(isStreamEnvelope(value)).toBe(true); - }); - - it.each` - value - ${null} - ${true} - ${[]} - ${{}} - ${fooContent} - ${{ id: '0x5012C312312' }} - ${streamEnveloper.foo.wrap(barContent as unknown as Foo)} - `('returns false for invalid values: $value', ({ value }) => { - expect(isStreamEnvelope(value)).toBe(false); - }); -}); diff --git a/packages/streams/src/envelope.ts b/packages/streams/src/envelope.ts deleted file mode 100644 index 5a53f4f2f..000000000 --- a/packages/streams/src/envelope.ts +++ /dev/null @@ -1,35 +0,0 @@ -// Envelope types and type guards. - -import { isObject } from '@metamask/utils'; - -import type { TypeMap } from './utils/generics.js'; - -export type Envelope