diff --git a/packages/extension/package.json b/packages/extension/package.json index 6d8d2dfc6..1ad39316c 100644 --- a/packages/extension/package.json +++ b/packages/extension/package.json @@ -36,6 +36,7 @@ "@endo/eventual-send": "^1.2.4", "@endo/exo": "^1.5.2", "@endo/patterns": "^1.4.2", + "@endo/promise-kit": "^1.1.4", "@metamask/snaps-utils": "^8.3.0", "@metamask/utils": "^9.1.0", "@ocap/kernel": "workspace:^", @@ -51,6 +52,7 @@ "@metamask/eslint-config": "^13.0.0", "@metamask/eslint-config-nodejs": "^13.0.0", "@metamask/eslint-config-typescript": "^13.0.0", + "@ocap/test-utils": "workspace:^", "@types/chrome": "^0.0.268", "@typescript-eslint/eslint-plugin": "^8.1.0", "@typescript-eslint/parser": "^8.1.0", diff --git a/packages/extension/src/VatWorkerClient.test.ts b/packages/extension/src/VatWorkerClient.test.ts new file mode 100644 index 000000000..1751be368 --- /dev/null +++ b/packages/extension/src/VatWorkerClient.test.ts @@ -0,0 +1,82 @@ +import '@ocap/shims/endoify'; +import type { VatId } from '@ocap/kernel'; +import { delay } from '@ocap/test-utils'; +import type { Logger } from '@ocap/utils'; +import { makeLogger } from '@ocap/utils'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +import { VatWorkerServiceMethod } from './vat-worker-service.js'; +import type { ExtensionVatWorkerClient } from './VatWorkerClient.js'; +import { makeTestClient } from '../test/vat-worker-service.js'; + +describe('ExtensionVatWorkerClient', () => { + let serverPort: MessagePort; + let clientPort: MessagePort; + + let clientLogger: Logger; + + let client: ExtensionVatWorkerClient; + + beforeEach(() => { + const serviceMessageChannel = new MessageChannel(); + serverPort = serviceMessageChannel.port1; + clientPort = serviceMessageChannel.port2; + + clientLogger = makeLogger('[test client]'); + client = makeTestClient(clientPort, clientLogger); + }); + + it('calls logger.debug when receiving an unexpected message', async () => { + const debugSpy = vi.spyOn(clientLogger, 'debug'); + const unexpectedMessage = 'foobar'; + serverPort.postMessage(unexpectedMessage); + await delay(100); + expect(debugSpy).toHaveBeenCalledOnce(); + expect(debugSpy).toHaveBeenLastCalledWith( + 'Received unexpected message', + unexpectedMessage, + ); + }); + + it.each` + method + ${VatWorkerServiceMethod.Init} + ${VatWorkerServiceMethod.Delete} + `( + "calls logger.error when receiving a $method reply it wasn't waiting for", + async ({ method }) => { + const errorSpy = vi.spyOn(clientLogger, 'error'); + const unexpectedReply = { + method, + id: 9, + vatId: 'v0', + }; + serverPort.postMessage(unexpectedReply); + await delay(100); + expect(errorSpy).toHaveBeenCalledOnce(); + expect(errorSpy).toHaveBeenLastCalledWith( + 'Received unexpected reply', + unexpectedReply, + ); + }, + ); + + it(`calls logger.error when receiving a ${VatWorkerServiceMethod.Init} reply without a port`, async () => { + const errorSpy = vi.spyOn(clientLogger, 'error'); + const vatId: VatId = 'v0'; + // eslint-disable-next-line @typescript-eslint/no-floating-promises + client.initWorker(vatId); + const reply = { + method: VatWorkerServiceMethod.Init, + id: 1, + vatId: 'v0', + }; + serverPort.postMessage(reply); + await delay(100); + expect(errorSpy).toHaveBeenCalledOnce(); + expect(errorSpy.mock.lastCall?.[0]).toBe( + 'Expected a port with message reply', + ); + expect(errorSpy.mock.lastCall?.[1]).toMatchObject({ data: reply }); + }); +}); diff --git a/packages/extension/src/VatWorkerClient.ts b/packages/extension/src/VatWorkerClient.ts new file mode 100644 index 000000000..a37dcc689 --- /dev/null +++ b/packages/extension/src/VatWorkerClient.ts @@ -0,0 +1,135 @@ +import { makePromiseKit } from '@endo/promise-kit'; +import type { PromiseKit } from '@endo/promise-kit'; +import type { + StreamEnvelope, + StreamEnvelopeReply, + VatWorkerService, + VatId, +} from '@ocap/kernel'; +import type { DuplexStream } from '@ocap/streams'; +import { MessagePortDuplexStream } from '@ocap/streams'; +import type { Logger } from '@ocap/utils'; +import { makeCounter, makeHandledCallback, makeLogger } from '@ocap/utils'; + +import type { AddListener } from './vat-worker-service.js'; +import { + isVatWorkerServiceMessage, + VatWorkerServiceMethod, +} from './vat-worker-service.js'; +// Appears in the docs. +// eslint-disable-next-line @typescript-eslint/no-unused-vars +import type { ExtensionVatWorkerServer } from './VatWorkerServer.js'; + +type PromiseCallbacks = Omit, 'promise'>; + +export class ExtensionVatWorkerClient implements VatWorkerService { + readonly #logger: Logger; + + readonly #unresolvedMessages: Map = new Map(); + + readonly #messageCounter = makeCounter(); + + readonly #postMessage: (message: unknown) => void; + + /** + * The client end of the vat worker service, intended to be constructed in + * the kernel worker. Sends initWorker and deleteWorker requests to the + * server and wraps the initWorker response in a DuplexStream for consumption + * by the kernel. + * + * @see {@link ExtensionVatWorkerServer} for the other end of the service. + * + * @param postMessage - A method for posting a message to the server. + * @param addListener - A method for registering a listener for messages from the server. + * @param logger - An optional {@link Logger}. Defaults to a new logger labeled '[vat worker client]'. + */ + constructor( + postMessage: (message: unknown) => void, + addListener: AddListener, + logger?: Logger, + ) { + this.#postMessage = postMessage; + this.#logger = logger ?? makeLogger('[vat worker client]'); + addListener(makeHandledCallback(this.#handleMessage.bind(this))); + } + + async #sendMessage( + method: + | typeof VatWorkerServiceMethod.Init + | typeof VatWorkerServiceMethod.Delete, + vatId: VatId, + ): Promise { + const message = { + id: this.#messageCounter(), + method, + vatId, + }; + const { promise, resolve, reject } = makePromiseKit(); + this.#unresolvedMessages.set(message.id, { + resolve: resolve as (value: unknown) => void, + reject, + }); + this.#postMessage(message); + return promise; + } + + async initWorker( + vatId: VatId, + ): Promise> { + return this.#sendMessage(VatWorkerServiceMethod.Init, vatId); + } + + async deleteWorker(vatId: VatId): Promise { + return this.#sendMessage(VatWorkerServiceMethod.Delete, vatId); + } + + async #handleMessage(event: MessageEvent): Promise { + if (!isVatWorkerServiceMessage(event.data)) { + // This happens when other messages pass through the same channel. + this.#logger.debug('Received unexpected message', event.data); + return; + } + + const { id, method, error } = event.data; + const port = event.ports.at(0); + + const promise = this.#unresolvedMessages.get(id); + + if (!promise) { + this.#logger.error('Received unexpected reply', event.data); + return; + } + + if (error) { + promise.reject(error); + return; + } + + switch (method) { + case VatWorkerServiceMethod.Init: + if (!port) { + this.#logger.error('Expected a port with message reply', event); + return; + } + promise.resolve( + new MessagePortDuplexStream( + port, + ), + ); + break; + case VatWorkerServiceMethod.Delete: + // If we were caching streams on the client this would be a good place + // to remove them. + promise.resolve(undefined); + break; + /* v8 ignore next 6: Not known to be possible. */ + default: + this.#logger.error( + 'Received message with unexpected method', + // @ts-expect-error Runtime does not respect "never". + method.valueOf(), + ); + } + } +} +harden(ExtensionVatWorkerClient); diff --git a/packages/extension/src/VatWorkerServer.test.ts b/packages/extension/src/VatWorkerServer.test.ts new file mode 100644 index 000000000..24fd14748 --- /dev/null +++ b/packages/extension/src/VatWorkerServer.test.ts @@ -0,0 +1,57 @@ +import '@ocap/shims/endoify'; +import { delay } from '@ocap/test-utils'; +import type { Logger } from '@ocap/utils'; +import { makeLogger } from '@ocap/utils'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +import type { ExtensionVatWorkerServer } from './VatWorkerServer.js'; +import { makeTestServer } from '../test/vat-worker-service.js'; + +describe('VatWorker', () => { + let serverPort: MessagePort; + let clientPort: MessagePort; + + let logger: Logger; + + let server: ExtensionVatWorkerServer; + + // let vatPort: MessagePort; + let kernelPort: MessagePort; + + beforeEach(() => { + const serviceMessageChannel = new MessageChannel(); + serverPort = serviceMessageChannel.port1; + clientPort = serviceMessageChannel.port2; + + logger = makeLogger('[test server]'); + + const deliveredMessageChannel = new MessageChannel(); + // vatPort = deliveredMessageChannel.port1; + kernelPort = deliveredMessageChannel.port2; + + server = makeTestServer({ serverPort, logger, kernelPort }); + }); + + it('starts', () => { + server.start(); + expect(serverPort.onmessage).toBeDefined(); + }); + + it('throws if started twice', () => { + server.start(); + expect(() => server.start()).toThrow(/already running/u); + }); + + it('calls logger.debug when receiving an unexpected message', async () => { + const debugSpy = vi.spyOn(logger, 'debug'); + const unexpectedMessage = 'foobar'; + server.start(); + clientPort.postMessage(unexpectedMessage); + await delay(100); + expect(debugSpy).toHaveBeenCalledOnce(); + expect(debugSpy).toHaveBeenLastCalledWith( + 'Received unexpected message', + unexpectedMessage, + ); + }); +}); diff --git a/packages/extension/src/VatWorkerServer.ts b/packages/extension/src/VatWorkerServer.ts new file mode 100644 index 000000000..850196fc0 --- /dev/null +++ b/packages/extension/src/VatWorkerServer.ts @@ -0,0 +1,119 @@ +import type { VatId } from '@ocap/kernel'; +import type { Logger } from '@ocap/utils'; +import { makeHandledCallback, makeLogger } from '@ocap/utils'; + +import { + isVatWorkerServiceMessage, + VatWorkerServiceMethod, + type AddListener, + type PostMessage, + type VatWorker, +} from './vat-worker-service.js'; +// Appears in the docs. +// eslint-disable-next-line @typescript-eslint/no-unused-vars +import type { ExtensionVatWorkerClient } from './VatWorkerClient.js'; + +export class ExtensionVatWorkerServer { + readonly #logger; + + readonly #vatWorkers: Map = new Map(); + + readonly #postMessage: PostMessage; + + readonly #addListener: AddListener; + + readonly #makeWorker: (vatId: VatId) => VatWorker; + + #running = false; + + /** + * The server end of the vat worker service, intended to be constructed in + * the offscreen document. Listens for initWorker and deleteWorker requests + * from the client and uses the {@link VatWorker} methods to effect those + * requests. + * + * @see {@link ExtensionVatWorkerClient} for the other end of the service. + * + * @param postMessage - A method for posting a message to the client. + * @param addListener - A method for registering a listener for messages from the client. + * @param makeWorker - A method for making a {@link VatWorker}. + * @param logger - An optional {@link Logger}. Defaults to a new logger labeled '[vat worker server]'. + */ + constructor( + postMessage: PostMessage, + addListener: (listener: (event: MessageEvent) => void) => void, + makeWorker: (vatId: VatId) => VatWorker, + logger?: Logger, + ) { + this.#postMessage = postMessage; + this.#addListener = addListener; + this.#makeWorker = makeWorker; + this.#logger = logger ?? makeLogger('[vat worker server]'); + } + + start(): void { + if (this.#running) { + throw new Error('VatWorkerServer already running.'); + } + this.#addListener(makeHandledCallback(this.#handleMessage.bind(this))); + this.#running = true; + } + + async #handleMessage(event: MessageEvent): Promise { + if (!isVatWorkerServiceMessage(event.data)) { + // This happens when other messages pass through the same channel. + this.#logger.debug('Received unexpected message', event.data); + return; + } + + const { method, id, vatId } = event.data; + + const handleProblem = async (problem: Error): Promise => { + this.#logger.error( + `Error handling ${method} for vatId ${vatId}`, + problem, + ); + this.#postMessage({ method, id, vatId, error: problem }); + }; + + switch (method) { + case VatWorkerServiceMethod.Init: + await this.#initVatWorker(vatId) + .then((port) => this.#postMessage({ method, id, vatId }, [port])) + .catch(handleProblem); + break; + case VatWorkerServiceMethod.Delete: + await this.#deleteVatWorker(vatId) + .then(() => this.#postMessage({ method, id, vatId })) + .catch(handleProblem); + break; + /* v8 ignore next 6: Not known to be possible. */ + default: + this.#logger.error( + 'Received message with unexpected method', + // @ts-expect-error Runtime does not respect "never". + method.valueOf(), + ); + } + } + + async #initVatWorker(vatId: VatId): Promise { + if (this.#vatWorkers.has(vatId)) { + throw new Error(`Worker for vat ${vatId} already exists.`); + } + const vatWorker = this.#makeWorker(vatId); + const [port] = await vatWorker.init(); + this.#vatWorkers.set(vatId, vatWorker); + return port; + } + + async #deleteVatWorker(vatId: VatId): Promise { + const vatWorker = this.#vatWorkers.get(vatId); + if (!vatWorker) { + throw new Error(`Worker for vat ${vatId} does not exist.`); + } + await vatWorker.delete(); + this.#vatWorkers.delete(vatId); + } +} +harden(ExtensionVatWorkerServer); diff --git a/packages/extension/src/iframe-vat-worker.ts b/packages/extension/src/iframe-vat-worker.ts index 44c88f362..9d52be12e 100644 --- a/packages/extension/src/iframe-vat-worker.ts +++ b/packages/extension/src/iframe-vat-worker.ts @@ -1,12 +1,8 @@ import { createWindow } from '@metamask/snaps-utils'; -import type { - VatId, - VatWorker, - StreamEnvelopeReply, - StreamEnvelope, -} from '@ocap/kernel'; +import type { VatId } from '@ocap/kernel'; import type { initializeMessageChannel } from '@ocap/streams'; -import { MessagePortDuplexStream } from '@ocap/streams'; + +import type { VatWorker } from './vat-worker-service.js'; const IFRAME_URI = 'iframe.html'; @@ -23,12 +19,8 @@ export const makeIframeVatWorker = ( testId: 'ocap-iframe', }); const port = await getPort(newWindow); - const stream = new MessagePortDuplexStream< - StreamEnvelopeReply, - StreamEnvelope - >(port); - return [stream, newWindow]; + return [port, newWindow]; }, delete: async (): Promise => { const iframe = document.getElementById(vatHtmlId); diff --git a/packages/extension/src/offscreen.ts b/packages/extension/src/offscreen.ts index 3b1e79acc..990cb7cd4 100644 --- a/packages/extension/src/offscreen.ts +++ b/packages/extension/src/offscreen.ts @@ -14,6 +14,8 @@ import { import { stringify } from '@ocap/utils'; import { makeIframeVatWorker } from './iframe-vat-worker.js'; +import { ExtensionVatWorkerClient } from './VatWorkerClient.js'; +import { ExtensionVatWorkerServer } from './VatWorkerServer.js'; main().catch(console.error); @@ -27,11 +29,38 @@ async function main(): Promise { ChromeRuntimeTarget.Background, ); - const kernel = new Kernel(); - const iframeReadyP = kernel.launchVat({ - id: 'v0', - worker: makeIframeVatWorker('v0', initializeMessageChannel), - }); + const kernelWorker = makeKernelWorker(); + + // Setup mock VatWorker service. + + const { port1: serverPort, port2: clientPort } = new MessageChannel(); + + const vatWorkerServer = new ExtensionVatWorkerServer( + (message: unknown, transfer?: Transferable[]) => + transfer + ? serverPort.postMessage(message, transfer) + : serverPort.postMessage(message), + (listener) => { + serverPort.onmessage = listener; + }, + (vatId: VatId) => makeIframeVatWorker(vatId, initializeMessageChannel), + ); + + vatWorkerServer.start(); + + const vatWorkerClient = new ExtensionVatWorkerClient( + (message: unknown) => clientPort.postMessage(message), + (listener) => { + clientPort.onmessage = listener; + }, + ); + + // Create kernel. + + const kernel = new Kernel(vatWorkerClient); + const iframeReadyP = kernel.launchVat({ id: 'v0' }); + + // Setup glue. /** * Reply to a command from the background script. @@ -44,8 +73,6 @@ async function main(): Promise { await backgroundStream.write(commandReply); }; - const kernelWorker = makeKernelWorker(); - // Handle messages from the background service worker and the kernel SQLite worker. await Promise.all([ (async () => { diff --git a/packages/extension/src/vat-worker-service.test.ts b/packages/extension/src/vat-worker-service.test.ts new file mode 100644 index 000000000..96ae4fe1f --- /dev/null +++ b/packages/extension/src/vat-worker-service.test.ts @@ -0,0 +1,81 @@ +import '@ocap/shims/endoify'; +import type { VatId } from '@ocap/kernel'; +import { MessagePortDuplexStream } from '@ocap/streams'; +import type { MockInstance } from 'vitest'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +import type { VatWorker } from './vat-worker-service.js'; +import type { ExtensionVatWorkerClient } from './VatWorkerClient.js'; +import type { ExtensionVatWorkerServer } from './VatWorkerServer.js'; +import { + getMockMakeWorker, + makeTestClient, + makeTestServer, +} from '../test/vat-worker-service.js'; + +describe('VatWorker', () => { + let serverPort: MessagePort; + let clientPort: MessagePort; + + let server: ExtensionVatWorkerServer; + let client: ExtensionVatWorkerClient; + + // let vatPort: MessagePort; + let kernelPort: MessagePort; + + let mockWorker: VatWorker; + + let mockMakeWorker: (vatId: VatId) => VatWorker; + let mockInitWorker: MockInstance; + let mockDeleteWorker: MockInstance; + + beforeEach(() => { + const serviceMessageChannel = new MessageChannel(); + serverPort = serviceMessageChannel.port1; + clientPort = serviceMessageChannel.port2; + + const deliveredMessageChannel = new MessageChannel(); + // vatPort = deliveredMessageChannel.port1; + kernelPort = deliveredMessageChannel.port2; + + [mockWorker, mockMakeWorker] = getMockMakeWorker(kernelPort); + + mockInitWorker = vi.spyOn(mockWorker, 'init'); + mockDeleteWorker = vi.spyOn(mockWorker, 'delete'); + }); + + // low key integration test + describe('Service', () => { + beforeEach(() => { + client = makeTestClient(clientPort); + server = makeTestServer({ serverPort, makeWorker: mockMakeWorker }); + server.start(); + }); + + it('initializes and deletes a worker', async () => { + const vatId: VatId = 'v0'; + const stream = await client.initWorker(vatId); + expect(stream).toBeInstanceOf(MessagePortDuplexStream); + expect(mockInitWorker).toHaveBeenCalledOnce(); + expect(mockDeleteWorker).not.toHaveBeenCalled(); + + await client.deleteWorker(vatId); + expect(mockInitWorker).toHaveBeenCalledOnce(); + expect(mockDeleteWorker).toHaveBeenCalledOnce(); + }); + + it('throws when deleting a nonexistent worker', async () => { + await expect(async () => await client.deleteWorker('v0')).rejects.toThrow( + /vat v0 does not exist/u, + ); + }); + + it('throws when initializing the same worker twice', async () => { + const vatId: VatId = 'v0'; + await client.initWorker(vatId); + await expect(async () => await client.initWorker(vatId)).rejects.toThrow( + /vat v0 already exists/u, + ); + }); + }); +}); diff --git a/packages/extension/src/vat-worker-service.ts b/packages/extension/src/vat-worker-service.ts new file mode 100644 index 000000000..6e5a6c329 --- /dev/null +++ b/packages/extension/src/vat-worker-service.ts @@ -0,0 +1,38 @@ +import { isObject } from '@metamask/utils'; +import type { VatId } from '@ocap/kernel'; + +export enum VatWorkerServiceMethod { + Init = 'iframe-vat-worker-init', + Delete = 'iframe-vat-worker-delete', +} + +type MessageId = number; + +export type VatWorker = { + init: () => Promise<[MessagePort, unknown]>; + delete: () => Promise; +}; + +export type VatWorkerServiceMessage = { + method: + | typeof VatWorkerServiceMethod.Init + | typeof VatWorkerServiceMethod.Delete; + id: MessageId; + vatId: VatId; + error?: Error; +}; + +export const isVatWorkerServiceMessage = ( + value: unknown, +): value is VatWorkerServiceMessage => + isObject(value) && + typeof value.id === 'number' && + Object.values(VatWorkerServiceMethod).includes( + value.method as VatWorkerServiceMethod, + ) && + typeof value.vatId === 'string'; + +export type PostMessage = (message: unknown, transfer?: Transferable[]) => void; +export type AddListener = ( + listener: (event: MessageEvent) => void, +) => void; diff --git a/packages/extension/test/vat-worker-service.ts b/packages/extension/test/vat-worker-service.ts new file mode 100644 index 000000000..434244957 --- /dev/null +++ b/packages/extension/test/vat-worker-service.ts @@ -0,0 +1,61 @@ +import type { VatId } from '@ocap/kernel'; +import type { Logger } from '@ocap/utils'; +import { vi } from 'vitest'; + +import type { VatWorker } from '../src/vat-worker-service.js'; +import { ExtensionVatWorkerClient } from '../src/VatWorkerClient.js'; +import { ExtensionVatWorkerServer } from '../src/VatWorkerServer.js'; + +type MakeVatWorker = (vatId: VatId) => VatWorker; + +export const getMockMakeWorker = ( + kernelPort: MessagePort, +): [VatWorker, MakeVatWorker] => { + const mockWorker = { + init: vi.fn().mockResolvedValue([kernelPort, {}]), + delete: vi.fn().mockResolvedValue(undefined), + }; + + return [mockWorker, vi.fn().mockReturnValue(mockWorker)]; +}; + +export const makeTestClient = ( + port: MessagePort, + logger?: Logger, +): ExtensionVatWorkerClient => + new ExtensionVatWorkerClient( + (message: unknown) => port.postMessage(message), + (listener) => { + port.onmessage = listener; + }, + logger, + ); + +type MakeTestServerArgs = { + serverPort: MessagePort; + logger?: Logger; +} & ( + | { + makeWorker: MakeVatWorker; + kernelPort?: never; + } + | { + makeWorker?: never; + kernelPort: MessagePort; + } +); + +export const makeTestServer = ( + args: MakeTestServerArgs, +): ExtensionVatWorkerServer => + new ExtensionVatWorkerServer( + (message: unknown, transfer?: Transferable[]) => + transfer + ? args.serverPort.postMessage(message, transfer) + : args.serverPort.postMessage(message), + (listener) => { + args.serverPort.onmessage = listener; + }, + args.makeWorker ?? getMockMakeWorker(args.kernelPort)[1], + args.logger, + ); diff --git a/packages/extension/tsconfig.json b/packages/extension/tsconfig.json index 903959299..f1b8d8945 100644 --- a/packages/extension/tsconfig.json +++ b/packages/extension/tsconfig.json @@ -18,6 +18,7 @@ "include": [ "./src/**/*.ts", "./src/**/*-trusted-prelude.js", - "./src/dev-console.js" + "./src/dev-console.js", + "./test/**/*.ts" ] } diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index dd94cf5ca..22f08d7bd 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -1,20 +1,35 @@ +import type { DuplexStream } from '@ocap/streams'; +import type { MockInstance } from 'vitest'; import { describe, it, expect, vi, beforeEach } from 'vitest'; import { Kernel } from './Kernel.js'; import type { VatCommand } from './messages.js'; -import type { VatId, VatWorker } from './types.js'; +import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; +import type { VatId } from './types.js'; import { Vat } from './Vat.js'; +import type { VatWorkerClient } from './VatWorkerClient.js'; describe('Kernel', () => { - let mockWorker: VatWorker; - let initMock: unknown; - let terminateMock: unknown; + let mockWorkerService: VatWorkerClient; + let mockGetWorkerStreams: MockInstance; + let mockDeleteWorker: MockInstance; + let initMock: MockInstance; + let terminateMock: MockInstance; beforeEach(() => { - mockWorker = { - init: vi.fn().mockResolvedValue([{}]), - delete: vi.fn(), - }; + mockWorkerService = { + initWorker: async () => ({}), + deleteWorker: async () => undefined, + } as unknown as VatWorkerClient; + + mockGetWorkerStreams = vi + .spyOn(mockWorkerService, 'initWorker') + .mockResolvedValue( + {} as DuplexStream, + ); + mockDeleteWorker = vi + .spyOn(mockWorkerService, 'deleteWorker') + .mockResolvedValue(undefined); initMock = vi.spyOn(Vat.prototype, 'init').mockImplementation(vi.fn()); terminateMock = vi @@ -24,41 +39,40 @@ describe('Kernel', () => { describe('getVatIds()', () => { it('returns an empty array when no vats are added', () => { - const kernel = new Kernel(); + const kernel = new Kernel(mockWorkerService); expect(kernel.getVatIds()).toStrictEqual([]); }); it('returns the vat IDs after adding a vat', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); expect(kernel.getVatIds()).toStrictEqual(['v0']); }); it('returns multiple vat IDs after adding multiple vats', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); - await kernel.launchVat({ id: 'v1', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); + await kernel.launchVat({ id: 'v1' }); expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); }); }); describe('launchVat()', () => { it('adds a vat to the kernel without errors when no vat with the same ID exists', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); expect(initMock).toHaveBeenCalledOnce(); - expect(mockWorker.init).toHaveBeenCalled(); + expect(mockGetWorkerStreams).toHaveBeenCalled(); expect(kernel.getVatIds()).toStrictEqual(['v0']); }); it('throws an error when launching a vat that already exists in the kernel', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); expect(kernel.getVatIds()).toStrictEqual(['v0']); await expect( kernel.launchVat({ id: 'v0', - worker: mockWorker, }), ).rejects.toThrow('Vat with ID v0 already exists.'); expect(kernel.getVatIds()).toStrictEqual(['v0']); @@ -67,17 +81,17 @@ describe('Kernel', () => { describe('deleteVat()', () => { it('deletes a vat from the kernel without errors when the vat exists', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); expect(kernel.getVatIds()).toStrictEqual(['v0']); await kernel.deleteVat('v0'); expect(terminateMock).toHaveBeenCalledOnce(); - expect(mockWorker.delete).toHaveBeenCalledOnce(); + expect(mockDeleteWorker).toHaveBeenCalledOnce(); expect(kernel.getVatIds()).toStrictEqual([]); }); it('throws an error when deleting a vat that does not exist in the kernel', async () => { - const kernel = new Kernel(); + const kernel = new Kernel(mockWorkerService); const nonExistentVatId: VatId = 'v9'; await expect(async () => kernel.deleteVat(nonExistentVatId), @@ -86,8 +100,8 @@ describe('Kernel', () => { }); it('throws an error when a vat terminate method throws', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); vi.spyOn(Vat.prototype, 'terminate').mockRejectedValueOnce('Test error'); await expect(async () => kernel.deleteVat('v0')).rejects.toThrow( 'Test error', @@ -97,8 +111,8 @@ describe('Kernel', () => { describe('sendMessage()', () => { it('sends a message to the vat without errors when the vat exists', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); vi.spyOn(Vat.prototype, 'sendMessage').mockResolvedValueOnce('test'); expect( await kernel.sendMessage( @@ -109,7 +123,7 @@ describe('Kernel', () => { }); it('throws an error when sending a message to the vat that does not exist in the kernel', async () => { - const kernel = new Kernel(); + const kernel = new Kernel(mockWorkerService); const nonExistentVatId: VatId = 'v9'; await expect(async () => kernel.sendMessage(nonExistentVatId, {} as VatCommand['payload']), @@ -117,8 +131,8 @@ describe('Kernel', () => { }); it('throws an error when sending a message to the vat throws', async () => { - const kernel = new Kernel(); - await kernel.launchVat({ id: 'v0', worker: mockWorker }); + const kernel = new Kernel(mockWorkerService); + await kernel.launchVat({ id: 'v0' }); vi.spyOn(Vat.prototype, 'sendMessage').mockRejectedValueOnce('error'); await expect(async () => kernel.sendMessage('v0', {} as VatCommand['payload']), @@ -128,7 +142,7 @@ describe('Kernel', () => { describe('constructor()', () => { it('initializes the kernel without errors', () => { - expect(async () => new Kernel()).not.toThrow(); + expect(async () => new Kernel(mockWorkerService)).not.toThrow(); }); }); }); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 70a7982be..7fe14486c 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -1,13 +1,16 @@ import '@ocap/shims/endoify'; import type { VatCommand } from './messages.js'; -import type { VatId, VatWorker } from './types.js'; +import type { VatId, VatWorkerService } from './types.js'; import { Vat } from './Vat.js'; export class Kernel { - readonly #vats: Map; + readonly #vats: Map; - constructor() { + readonly #vatWorkerService: VatWorkerService; + + constructor(vatWorkerService: VatWorkerService) { this.#vats = new Map(); + this.#vatWorkerService = vatWorkerService; } /** @@ -24,22 +27,15 @@ export class Kernel { * * @param options - The options for launching the vat. * @param options.id - The ID of the vat. - * @param options.worker - The worker to use for the vat. * @returns A promise that resolves the vat. */ - async launchVat({ - id, - worker, - }: { - id: VatId; - worker: VatWorker; - }): Promise { + async launchVat({ id }: { id: VatId }): Promise { if (this.#vats.has(id)) { throw new Error(`Vat with ID ${id} already exists.`); } - const [stream] = await worker.init(); + const stream = await this.#vatWorkerService.initWorker(id); const vat = new Vat({ id, stream }); - this.#vats.set(vat.id, { vat, worker }); + this.#vats.set(vat.id, vat); await vat.init(); return vat; } @@ -50,10 +46,9 @@ export class Kernel { * @param id - The ID of the vat. */ async deleteVat(id: VatId): Promise { - const vatRecord = this.#getVatRecord(id); - const { vat, worker } = vatRecord; + const vat = this.#getVat(id); await vat.terminate(); - await worker.delete(); + await this.#vatWorkerService.deleteWorker(id).catch(console.error); this.#vats.delete(id); } @@ -68,21 +63,21 @@ export class Kernel { id: VatId, command: VatCommand['payload'], ): Promise { - const { vat } = this.#getVatRecord(id); + const vat = this.#getVat(id); return vat.sendMessage(command); } /** - * Gets a vat record from the kernel. + * Gets a vat from the kernel. * * @param id - The ID of the vat. - * @returns The vat record (vat and worker). + * @returns The vat. */ - #getVatRecord(id: VatId): { vat: Vat; worker: VatWorker } { - const vatRecord = this.#vats.get(id); - if (vatRecord === undefined) { + #getVat(id: VatId): Vat { + const vat = this.#vats.get(id); + if (vat === undefined) { throw new Error(`Vat with ID ${id} does not exist.`); } - return vatRecord; + return vat; } } diff --git a/packages/kernel/src/index.ts b/packages/kernel/src/index.ts index 41bb133af..25b8a1c61 100644 --- a/packages/kernel/src/index.ts +++ b/packages/kernel/src/index.ts @@ -4,4 +4,4 @@ export { Vat } from './Vat.js'; export { Supervisor } from './Supervisor.js'; export type * from './messages.js'; export type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; -export type { VatId, VatWorker } from './types.js'; +export type { VatId, VatWorkerService } from './types.js'; diff --git a/packages/kernel/src/types.ts b/packages/kernel/src/types.ts index 8a8930c84..e25a8e894 100644 --- a/packages/kernel/src/types.ts +++ b/packages/kernel/src/types.ts @@ -1,7 +1,7 @@ import type { PromiseKit } from '@endo/promise-kit'; import type { DuplexStream } from '@ocap/streams'; -import type { StreamEnvelopeReply, StreamEnvelope } from './stream-envelope.js'; +import type { StreamEnvelope, StreamEnvelopeReply } from './stream-envelope.js'; export type VatId = `v${number}`; @@ -10,11 +10,14 @@ export const isVatId = (value: unknown): value is VatId => value.at(0) === 'v' && value.slice(1) === String(Number(value.slice(1))); -export type VatWorker = { - init: () => Promise< - [DuplexStream, unknown] - >; - delete: () => Promise; -}; +export type PromiseCallbacks = Omit< + PromiseKit, + 'promise' +>; -export type PromiseCallbacks = Omit, 'promise'>; +export type VatWorkerService = { + initWorker: ( + vatId: VatId, + ) => Promise>; + deleteWorker: (vatId: VatId) => Promise; +}; diff --git a/packages/utils/package.json b/packages/utils/package.json index 3ece22cca..de167709d 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -51,6 +51,7 @@ "@metamask/eslint-config": "^13.0.0", "@metamask/eslint-config-nodejs": "^13.0.0", "@metamask/eslint-config-typescript": "^13.0.0", + "@ocap/test-utils": "workspace:^", "@ts-bridge/cli": "^0.5.1", "@ts-bridge/shims": "^0.1.1", "@typescript-eslint/eslint-plugin": "^8.1.0", diff --git a/packages/utils/src/handled-callback.test.ts b/packages/utils/src/handled-callback.test.ts new file mode 100644 index 000000000..7cc3c4e33 --- /dev/null +++ b/packages/utils/src/handled-callback.test.ts @@ -0,0 +1,37 @@ +import { delay } from '@ocap/test-utils'; +import { vi, describe, it, expect } from 'vitest'; + +import { makeHandledCallback } from './handled-callback.js'; + +describe('makeHandledCallback', () => { + it('returns a function', () => { + const callback = makeHandledCallback(async () => Promise.resolve()); + expect(callback).toBeInstanceOf(Function); + }); + + it('calls the original callback', () => { + const originalCallback = vi.fn().mockResolvedValueOnce(undefined); + const callback = makeHandledCallback(originalCallback); + + // eslint-disable-next-line n/callback-return + callback(); + + expect(originalCallback).toHaveBeenCalledOnce(); + }); + + it('throws an error if the original callback throws an error', async () => { + const consoleErrorSpy = vi.spyOn(console, 'error'); + const error = new Error('test error'); + const originalCallback = vi.fn().mockRejectedValueOnce(error); + const callback = makeHandledCallback(originalCallback); + + // eslint-disable-next-line n/callback-return + callback(); + await delay(); + + expect(consoleErrorSpy).toHaveBeenCalledOnce(); + expect(consoleErrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ message: error.message }), + ); + }); +}); diff --git a/packages/utils/src/handled-callback.ts b/packages/utils/src/handled-callback.ts new file mode 100644 index 000000000..a4da266d7 --- /dev/null +++ b/packages/utils/src/handled-callback.ts @@ -0,0 +1,14 @@ +/** + * Wrap an async callback to ensure any errors are at least logged. + * + * @param callback - The async callback to wrap. + * @returns The wrapped callback. + */ +export const makeHandledCallback = ( + callback: (...args: Args) => Promise, +) => { + return (...args: Args): void => { + // eslint-disable-next-line n/no-callback-literal, n/callback-return + callback(...args).catch(console.error); + }; +}; diff --git a/packages/utils/src/index.test.ts b/packages/utils/src/index.test.ts index ca15974fb..ff8ab9aa9 100644 --- a/packages/utils/src/index.test.ts +++ b/packages/utils/src/index.test.ts @@ -8,6 +8,7 @@ describe('index', () => { expect.arrayContaining([ 'makeCounter', 'makeLogger', + 'makeHandledCallback', 'stringify', 'isPrimitive', 'isTypedArray', diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index 32c8eaedb..82b3aaada 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -1,3 +1,4 @@ +export { makeHandledCallback } from './handled-callback.js'; export type { Logger } from './logger.js'; export { makeLogger } from './logger.js'; export { makeCounter } from './counter.js'; diff --git a/packages/utils/tsconfig.json b/packages/utils/tsconfig.json index 292f1d164..85b667185 100644 --- a/packages/utils/tsconfig.json +++ b/packages/utils/tsconfig.json @@ -5,6 +5,6 @@ "lib": ["DOM", "ES2022"], "types": ["ses", "vitest", "vitest/jsdom"] }, - "references": [], + "references": [{ "path": "../test-utils" }], "include": ["./src"] } diff --git a/yarn.lock b/yarn.lock index 9fe7e74ff..f2bae6e35 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1388,6 +1388,7 @@ __metadata: "@endo/eventual-send": "npm:^1.2.4" "@endo/exo": "npm:^1.5.2" "@endo/patterns": "npm:^1.4.2" + "@endo/promise-kit": "npm:^1.1.4" "@metamask/auto-changelog": "npm:^3.4.4" "@metamask/eslint-config": "npm:^13.0.0" "@metamask/eslint-config-nodejs": "npm:^13.0.0" @@ -1397,6 +1398,7 @@ __metadata: "@ocap/kernel": "workspace:^" "@ocap/shims": "workspace:^" "@ocap/streams": "workspace:^" + "@ocap/test-utils": "workspace:^" "@ocap/utils": "workspace:^" "@sqlite.org/sqlite-wasm": "npm:3.46.1-build3" "@types/chrome": "npm:^0.0.268" @@ -1615,6 +1617,7 @@ __metadata: "@metamask/eslint-config-nodejs": "npm:^13.0.0" "@metamask/eslint-config-typescript": "npm:^13.0.0" "@metamask/utils": "npm:^9.1.0" + "@ocap/test-utils": "workspace:^" "@ts-bridge/cli": "npm:^0.5.1" "@ts-bridge/shims": "npm:^0.1.1" "@typescript-eslint/eslint-plugin": "npm:^8.1.0"