Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/extension/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@endo/eventual-send": "^1.2.4",
"@endo/exo": "^1.5.2",
"@endo/patterns": "^1.4.2",
"@endo/promise-kit": "^1.1.4",
"@metamask/snaps-utils": "^8.3.0",
"@metamask/utils": "^9.1.0",
"@ocap/kernel": "workspace:^",
Expand All @@ -51,6 +52,7 @@
"@metamask/eslint-config": "^13.0.0",
"@metamask/eslint-config-nodejs": "^13.0.0",
"@metamask/eslint-config-typescript": "^13.0.0",
"@ocap/test-utils": "workspace:^",
"@types/chrome": "^0.0.268",
"@typescript-eslint/eslint-plugin": "^8.1.0",
"@typescript-eslint/parser": "^8.1.0",
Expand Down
82 changes: 82 additions & 0 deletions packages/extension/src/VatWorkerClient.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import '@ocap/shims/endoify';
import type { VatId } from '@ocap/kernel';
import { delay } from '@ocap/test-utils';
import type { Logger } from '@ocap/utils';
import { makeLogger } from '@ocap/utils';
import { describe, it, expect, beforeEach, vi } from 'vitest';

import { VatWorkerServiceMethod } from './vat-worker-service.js';
import type { ExtensionVatWorkerClient } from './VatWorkerClient.js';
import { makeTestClient } from '../test/vat-worker-service.js';

describe('ExtensionVatWorkerClient', () => {
let serverPort: MessagePort;
let clientPort: MessagePort;

let clientLogger: Logger;

let client: ExtensionVatWorkerClient;

beforeEach(() => {
const serviceMessageChannel = new MessageChannel();
serverPort = serviceMessageChannel.port1;
clientPort = serviceMessageChannel.port2;

clientLogger = makeLogger('[test client]');
client = makeTestClient(clientPort, clientLogger);
});

it('calls logger.debug when receiving an unexpected message', async () => {
const debugSpy = vi.spyOn(clientLogger, 'debug');
const unexpectedMessage = 'foobar';
serverPort.postMessage(unexpectedMessage);
await delay(100);
expect(debugSpy).toHaveBeenCalledOnce();
expect(debugSpy).toHaveBeenLastCalledWith(
'Received unexpected message',
unexpectedMessage,
);
});

it.each`
method
${VatWorkerServiceMethod.Init}
${VatWorkerServiceMethod.Delete}
`(
"calls logger.error when receiving a $method reply it wasn't waiting for",
async ({ method }) => {
const errorSpy = vi.spyOn(clientLogger, 'error');
const unexpectedReply = {
method,
id: 9,
vatId: 'v0',
};
serverPort.postMessage(unexpectedReply);
await delay(100);
expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy).toHaveBeenLastCalledWith(
'Received unexpected reply',
unexpectedReply,
);
},
);

it(`calls logger.error when receiving a ${VatWorkerServiceMethod.Init} reply without a port`, async () => {
const errorSpy = vi.spyOn(clientLogger, 'error');
const vatId: VatId = 'v0';
// eslint-disable-next-line @typescript-eslint/no-floating-promises
client.initWorker(vatId);
const reply = {
method: VatWorkerServiceMethod.Init,
id: 1,
vatId: 'v0',
};
serverPort.postMessage(reply);
await delay(100);
expect(errorSpy).toHaveBeenCalledOnce();
expect(errorSpy.mock.lastCall?.[0]).toBe(
'Expected a port with message reply',
);
expect(errorSpy.mock.lastCall?.[1]).toMatchObject({ data: reply });
});
});
135 changes: 135 additions & 0 deletions packages/extension/src/VatWorkerClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { makePromiseKit } from '@endo/promise-kit';
import type { PromiseKit } from '@endo/promise-kit';
import type {
StreamEnvelope,
StreamEnvelopeReply,
VatWorkerService,
VatId,
} from '@ocap/kernel';
import type { DuplexStream } from '@ocap/streams';
import { MessagePortDuplexStream } from '@ocap/streams';
import type { Logger } from '@ocap/utils';
import { makeCounter, makeHandledCallback, makeLogger } from '@ocap/utils';

import type { AddListener } from './vat-worker-service.js';
import {
isVatWorkerServiceMessage,
VatWorkerServiceMethod,
} from './vat-worker-service.js';
// Appears in the docs.
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import type { ExtensionVatWorkerServer } from './VatWorkerServer.js';

type PromiseCallbacks<Resolve = unknown> = Omit<PromiseKit<Resolve>, 'promise'>;

export class ExtensionVatWorkerClient implements VatWorkerService {
readonly #logger: Logger;

readonly #unresolvedMessages: Map<number, PromiseCallbacks> = new Map();

readonly #messageCounter = makeCounter();

readonly #postMessage: (message: unknown) => void;

/**
* The client end of the vat worker service, intended to be constructed in
* the kernel worker. Sends initWorker and deleteWorker requests to the
* server and wraps the initWorker response in a DuplexStream for consumption
* by the kernel.
*
* @see {@link ExtensionVatWorkerServer} for the other end of the service.
*
* @param postMessage - A method for posting a message to the server.
* @param addListener - A method for registering a listener for messages from the server.
* @param logger - An optional {@link Logger}. Defaults to a new logger labeled '[vat worker client]'.
*/
constructor(
postMessage: (message: unknown) => void,
addListener: AddListener,
logger?: Logger,
) {
this.#postMessage = postMessage;
this.#logger = logger ?? makeLogger('[vat worker client]');
addListener(makeHandledCallback(this.#handleMessage.bind(this)));
}

async #sendMessage<Return>(
method:
| typeof VatWorkerServiceMethod.Init
| typeof VatWorkerServiceMethod.Delete,
vatId: VatId,
): Promise<Return> {
const message = {
id: this.#messageCounter(),
method,
vatId,
};
const { promise, resolve, reject } = makePromiseKit<Return>();
this.#unresolvedMessages.set(message.id, {
resolve: resolve as (value: unknown) => void,
reject,
});
this.#postMessage(message);
return promise;
}

async initWorker(
vatId: VatId,
): Promise<DuplexStream<StreamEnvelopeReply, StreamEnvelope>> {
return this.#sendMessage(VatWorkerServiceMethod.Init, vatId);
}

async deleteWorker(vatId: VatId): Promise<undefined> {
return this.#sendMessage(VatWorkerServiceMethod.Delete, vatId);
}

async #handleMessage(event: MessageEvent<unknown>): Promise<void> {
if (!isVatWorkerServiceMessage(event.data)) {
// This happens when other messages pass through the same channel.
this.#logger.debug('Received unexpected message', event.data);
return;
}

const { id, method, error } = event.data;
const port = event.ports.at(0);

const promise = this.#unresolvedMessages.get(id);

if (!promise) {
this.#logger.error('Received unexpected reply', event.data);
return;
}

if (error) {
promise.reject(error);
return;
}

switch (method) {
case VatWorkerServiceMethod.Init:
if (!port) {
this.#logger.error('Expected a port with message reply', event);
return;
}
promise.resolve(
new MessagePortDuplexStream<StreamEnvelope, StreamEnvelopeReply>(
port,
),
);
break;
case VatWorkerServiceMethod.Delete:
// If we were caching streams on the client this would be a good place
// to remove them.
promise.resolve(undefined);
break;
/* v8 ignore next 6: Not known to be possible. */
default:
this.#logger.error(
'Received message with unexpected method',
// @ts-expect-error Runtime does not respect "never".
method.valueOf(),
);
}
}
}
harden(ExtensionVatWorkerClient);
57 changes: 57 additions & 0 deletions packages/extension/src/VatWorkerServer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import '@ocap/shims/endoify';
import { delay } from '@ocap/test-utils';
import type { Logger } from '@ocap/utils';
import { makeLogger } from '@ocap/utils';
import { describe, it, expect, beforeEach, vi } from 'vitest';

import type { ExtensionVatWorkerServer } from './VatWorkerServer.js';
import { makeTestServer } from '../test/vat-worker-service.js';

describe('VatWorker', () => {
let serverPort: MessagePort;
let clientPort: MessagePort;

let logger: Logger;

let server: ExtensionVatWorkerServer;

// let vatPort: MessagePort;
let kernelPort: MessagePort;

beforeEach(() => {
const serviceMessageChannel = new MessageChannel();
serverPort = serviceMessageChannel.port1;
clientPort = serviceMessageChannel.port2;

logger = makeLogger('[test server]');

const deliveredMessageChannel = new MessageChannel();
// vatPort = deliveredMessageChannel.port1;
kernelPort = deliveredMessageChannel.port2;

server = makeTestServer({ serverPort, logger, kernelPort });
});

it('starts', () => {
server.start();
expect(serverPort.onmessage).toBeDefined();
});

it('throws if started twice', () => {
server.start();
expect(() => server.start()).toThrow(/already running/u);
});

it('calls logger.debug when receiving an unexpected message', async () => {
const debugSpy = vi.spyOn(logger, 'debug');
const unexpectedMessage = 'foobar';
server.start();
clientPort.postMessage(unexpectedMessage);
await delay(100);
expect(debugSpy).toHaveBeenCalledOnce();
expect(debugSpy).toHaveBeenLastCalledWith(
'Received unexpected message',
unexpectedMessage,
);
});
});
Loading