Skip to content
16 changes: 14 additions & 2 deletions packages/errors/src/errors/StreamReadError.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ describe('StreamReadError', () => {
expect(error.cause).toBe(mockOriginalError);
});

it('creates a StreamReadError for Kernel with the correct properties', () => {
const error = new StreamReadError(
{ kernelId: 'kernel' },
{ cause: mockOriginalError },
);
expect(error).toBeInstanceOf(StreamReadError);
expect(error.code).toBe(ErrorCode.StreamReadError);
expect(error.message).toBe('Unexpected stream read error.');
expect(error.data).toStrictEqual({ kernelId: 'kernel' });
expect(error.cause).toBe(mockOriginalError);
});

it('unmarshals a valid marshaled StreamReadError for Vat', () => {
const data = { vatId: mockVatId };
const marshaledError: MarshaledOcapError = {
Expand Down Expand Up @@ -106,7 +118,7 @@ describe('StreamReadError', () => {
expect(() =>
StreamReadError.unmarshal(marshaledError, unmarshalErrorOptions),
).toThrow(
'At path: data -- Expected the value to satisfy a union of `object | object`, but received: "invalid data"',
'At path: data -- Expected the value to satisfy a union of `object | object | object`, but received: "invalid data"',
);
});

Expand All @@ -127,7 +139,7 @@ describe('StreamReadError', () => {
expect(() =>
StreamReadError.unmarshal(marshaledError, unmarshalErrorOptions),
).toThrow(
'At path: data -- Expected the value to satisfy a union of `object | object`, but received: [object Object]',
'At path: data -- Expected the value to satisfy a union of `object | object | object`, but received: [object Object]',
);
});
});
22 changes: 19 additions & 3 deletions packages/errors/src/errors/StreamReadError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import {
} from '../constants.js';
import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.js';

type StreamReadErrorData = { vatId: string } | { supervisorId: string };
type StreamReadErrorData =
| { vatId: string }
| { supervisorId: string }
| { kernelId: string };
type StreamReadErrorOptions = Required<ErrorOptions> &
Pick<ErrorOptionsWithStack, 'stack'>;

Expand All @@ -36,8 +39,21 @@ export class StreamReadError extends BaseError {
...marshaledErrorSchema,
code: literal(ErrorCode.StreamReadError),
data: union([
object({ vatId: string(), supervisorId: optional(never()) }),
object({ supervisorId: string(), vatId: optional(never()) }),
object({
vatId: string(),
supervisorId: optional(never()),
kernelId: optional(never()),
}),
object({
supervisorId: string(),
vatId: optional(never()),
kernelId: optional(never()),
}),
object({
kernelId: string(),
vatId: optional(never()),
supervisorId: optional(never()),
}),
]),
cause: MarshaledErrorStruct,
});
Expand Down
59 changes: 51 additions & 8 deletions packages/extension/src/kernel-worker.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import './kernel-worker-trusted-prelude.js';
import type { NonEmptyArray } from '@metamask/utils';
import type { KernelCommand, KernelCommandReply, VatId } from '@ocap/kernel';
import { Kernel } from '@ocap/kernel';
import { Kernel, VatCommandMethod } from '@ocap/kernel';
import { MessagePortDuplexStream, receiveMessagePort } from '@ocap/streams';

import { makeSQLKVStore } from './sqlite-kv-store.js';
import { ExtensionVatWorkerClient } from './VatWorkerClient.js';

main('v0').catch(console.error);
main().catch(console.error);

/**
* The main function for the kernel worker.
*
* @param defaultVatId - The id to give the default vat.
*/
async function main(defaultVatId: VatId): Promise<void> {
async function main(): Promise<void> {
const kernelStream = await receiveMessagePort(
(listener) => globalThis.addEventListener('message', listener),
(listener) => globalThis.removeEventListener('message', listener),
Expand All @@ -27,11 +26,55 @@ async function main(defaultVatId: VatId): Promise<void> {
);

// Initialize kernel store.

const kvStore = await makeSQLKVStore();

// Create and start kernel.

const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore);
await kernel.init({ defaultVatId });
await kernel.init();

// Handle the lifecycle of multiple vats.
await runVatLifecycle(kernel, ['v1', 'v2', 'v3']);

// Add default vat.
await kernel.launchVat({ id: 'v0' });
}

/**
* Runs the full lifecycle of an array of vats, including their creation,
* restart, message passing, and termination.
*
* @param kernel The kernel instance.
* @param vats An array of VatIds to be managed.
*/
async function runVatLifecycle(
kernel: Kernel,
vats: NonEmptyArray<VatId>,
): Promise<void> {
console.time(`Created vats: ${vats.join(', ')}`);
await Promise.all(vats.map(async (id) => kernel.launchVat({ id })));
console.timeEnd(`Created vats: ${vats.join(', ')}`);

console.log('Kernel vats:', kernel.getVatIds().join(', '));

// Restart a randomly selected vat from the array.
const vatToRestart = vats[Math.floor(Math.random() * vats.length)] as VatId;
console.time(`Vat "${vatToRestart}" restart`);
await kernel.restartVat(vatToRestart);
console.timeEnd(`Vat "${vatToRestart}" restart`);

// Send a "Ping" message to a randomly selected vat.
const vatToPing = vats[Math.floor(Math.random() * vats.length)] as VatId;
console.time(`Ping Vat "${vatToPing}"`);
await kernel.sendMessage(vatToPing, {
method: VatCommandMethod.Ping,
params: null,
});
console.timeEnd(`Ping Vat "${vatToPing}"`);

const vatIds = kernel.getVatIds().join(', ');
console.time(`Terminated vats: ${vatIds}`);
await kernel.terminateAllVats();
console.timeEnd(`Terminated vats: ${vatIds}`);

console.log(`Kernel has ${kernel.getVatIds().length} vats`);
}
49 changes: 45 additions & 4 deletions packages/kernel/src/Kernel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ describe('Kernel', () => {
mockWorkerService = {
launch: async () => ({}),
terminate: async () => undefined,
terminateAll: async () => undefined,
} as unknown as VatWorkerService;

launchWorkerMock = vi
Expand Down Expand Up @@ -88,6 +89,15 @@ describe('Kernel', () => {
expect(kernel.getVatIds()).toStrictEqual(['v0']);
});

it('adds multiple vats to the kernel without errors when no vat with the same ID exists', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat({ id: 'v0' });
await kernel.launchVat({ id: 'v1' });
expect(initMock).toHaveBeenCalledTimes(2);
expect(launchWorkerMock).toHaveBeenCalledTimes(2);
expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']);
});

it('throws an error when launching a vat that already exists in the kernel', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat({ id: 'v0' });
Expand All @@ -101,12 +111,12 @@ describe('Kernel', () => {
});
});

describe('deleteVat()', () => {
describe('terminateVat()', () => {
it('deletes a vat from the kernel without errors when the vat exists', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat({ id: 'v0' });
expect(kernel.getVatIds()).toStrictEqual(['v0']);
await kernel.deleteVat('v0');
await kernel.terminateVat('v0');
expect(terminateMock).toHaveBeenCalledOnce();
expect(terminateWorkerMock).toHaveBeenCalledOnce();
expect(kernel.getVatIds()).toStrictEqual([]);
Expand All @@ -116,7 +126,7 @@ describe('Kernel', () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
const nonExistentVatId: VatId = 'v9';
await expect(async () =>
kernel.deleteVat(nonExistentVatId),
kernel.terminateVat(nonExistentVatId),
).rejects.toThrow(VatNotFoundError);
expect(terminateMock).not.toHaveBeenCalled();
});
Expand All @@ -125,12 +135,41 @@ describe('Kernel', () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat({ id: 'v0' });
vi.spyOn(Vat.prototype, 'terminate').mockRejectedValueOnce('Test error');
await expect(async () => kernel.deleteVat('v0')).rejects.toThrow(
await expect(async () => kernel.terminateVat('v0')).rejects.toThrow(
'Test error',
);
});
});

describe('terminateAllVats()', () => {
it('deletes all vats from the kernel without errors', async () => {
const workerTerminateAllMock = vi
.spyOn(mockWorkerService, 'terminateAll')
.mockResolvedValue(undefined);
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat({ id: 'v0' });
await kernel.launchVat({ id: 'v1' });
expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']);
await kernel.terminateAllVats();
expect(terminateMock).toHaveBeenCalledTimes(2);
expect(workerTerminateAllMock).toHaveBeenCalledOnce();
expect(kernel.getVatIds()).toStrictEqual([]);
});
});

describe('restartVat()', () => {
it('restarts a vat', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat({ id: 'v0' });
expect(kernel.getVatIds()).toStrictEqual(['v0']);
await kernel.restartVat('v0');
expect(terminateMock).toHaveBeenCalledOnce();
expect(terminateWorkerMock).toHaveBeenCalledOnce();
expect(kernel.getVatIds()).toStrictEqual(['v0']);
expect(initMock).toHaveBeenCalledTimes(2);
});
});

describe('sendMessage()', () => {
it('sends a message to the vat without errors when the vat exists', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
Expand Down Expand Up @@ -174,5 +213,7 @@ describe('Kernel', () => {
it.todo('initializes the kernel store');

it.todo('starts receiving messages');

it.todo('throws if the stream throws');
});
});
73 changes: 53 additions & 20 deletions packages/kernel/src/Kernel.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import '@ocap/shims/endoify';
import type { PromiseKit } from '@endo/promise-kit';
import { makePromiseKit } from '@endo/promise-kit';
import { VatAlreadyExistsError, VatNotFoundError, toError } from '@ocap/errors';
import {
StreamReadError,
VatAlreadyExistsError,
VatNotFoundError,
toError,
} from '@ocap/errors';
import type { DuplexStream } from '@ocap/streams';
import type { Logger } from '@ocap/utils';
import { makeLogger, stringify } from '@ocap/utils';
Expand Down Expand Up @@ -29,9 +32,6 @@ export class Kernel {

readonly #storage: KVStore;

// Hopefully removed when we get to n+1 vats.
readonly #defaultVatKit: PromiseKit<Vat>;

readonly #logger: Logger;

constructor(
Expand All @@ -44,16 +44,19 @@ export class Kernel {
this.#vats = new Map();
this.#vatWorkerService = vatWorkerService;
this.#storage = storage;
this.#defaultVatKit = makePromiseKit<Vat>();
this.#logger = logger ?? makeLogger('[ocap kernel]');
}

async init({ defaultVatId }: { defaultVatId: VatId }): Promise<void> {
await this.launchVat({ id: defaultVatId })
.then(this.#defaultVatKit.resolve)
.catch(this.#defaultVatKit.reject);

return this.#receiveMessages();
async init(): Promise<void> {
this.#receiveMessages().catch((error) => {
this.#logger.error('Stream read error occurred:', error);
// Errors thrown here will not be surfaced in the usual synchronous manner
// because #receiveMessages() is awaited within the constructor.
// Any error thrown inside the async loop is 'caught' within this constructor
// call stack but will be displayed as 'Uncaught (in promise)'
// since they occur after the constructor has returned.
throw new StreamReadError({ kernelId: 'kernel' }, error);
});
}

async #receiveMessages(): Promise<void> {
Expand All @@ -72,14 +75,20 @@ export class Kernel {
await this.#reply({ method, params: 'pong' });
break;
case KernelCommandMethod.Evaluate:
vat = await this.#defaultVatKit.promise;
if (!this.#vats.size) {
throw new Error('No vats available to call');
}
vat = this.#vats.values().next().value as Vat;
await this.#reply({
method,
params: await this.evaluate(vat.id, params),
});
break;
case KernelCommandMethod.CapTpCall:
vat = await this.#defaultVatKit.promise;
if (!this.#vats.size) {
throw new Error('No vats available to call');
}
vat = this.#vats.values().next().value as Vat;
await this.#reply({
method,
params: stringify(await vat.callCapTp(params)),
Expand Down Expand Up @@ -155,7 +164,7 @@ export class Kernel {
}

/**
* Gets the vat IDs in the kernel.
* Gets the vat IDs.
*
* @returns An array of vat IDs.
*/
Expand All @@ -164,7 +173,7 @@ export class Kernel {
}

/**
* Launches a vat in the kernel.
* Launches a vat.
*
* @param options - The options for launching the vat.
* @param options.id - The ID of the vat.
Expand All @@ -182,17 +191,41 @@ export class Kernel {
}

/**
* Deletes a vat from the kernel.
* Restarts a vat.
*
* @param id - The ID of the vat.
*/
async deleteVat(id: VatId): Promise<void> {
async restartVat(id: VatId): Promise<void> {
await this.terminateVat(id);
await this.launchVat({ id });
}

/**
* Terminate a vat.
*
* @param id - The ID of the vat.
*/
async terminateVat(id: VatId): Promise<void> {
const vat = this.#getVat(id);
await vat.terminate();
await this.#vatWorkerService.terminate(id).catch(console.error);
this.#vats.delete(id);
}

/**
* Terminate all vats.
*/
async terminateAllVats(): Promise<void> {
await Promise.all(
this.getVatIds().map(async (id) => {
const vat = this.#getVat(id);
await vat.terminate();
this.#vats.delete(id);
}),
);
await this.#vatWorkerService.terminateAll();
}

/**
* Send a message to a vat.
*
Expand All @@ -209,7 +242,7 @@ export class Kernel {
}

/**
* Gets a vat from the kernel.
* Gets a vat.
*
* @param id - The ID of the vat.
* @returns The vat.
Expand Down