Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions packages/kernel/src/Kernel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,56 @@ describe('Kernel', () => {
});

describe('restartVat()', () => {
// Disabling this test for now, as vat restart is not currently a thing
it.todo('restarts a vat', async () => {
it('preserves vat state across multiple restarts', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat(mockVatConfig);
await kernel.restartVat('v1');
expect(kernel.getVatIds()).toStrictEqual(['v1']);
await kernel.restartVat('v1');
expect(kernel.getVatIds()).toStrictEqual(['v1']);
expect(terminateMock).toHaveBeenCalledTimes(2);
expect(launchWorkerMock).toHaveBeenCalledTimes(3); // initial + 2 restarts
expect(launchWorkerMock).toHaveBeenLastCalledWith('v1', mockVatConfig);
});

it('restarts a vat', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat(mockVatConfig);
expect(kernel.getVatIds()).toStrictEqual(['v1']);
await kernel.restartVat('v1');
expect(terminateMock).toHaveBeenCalledOnce();
expect(terminateWorkerMock).toHaveBeenCalledOnce();
expect(launchWorkerMock).toHaveBeenCalledTimes(2);
expect(launchWorkerMock).toHaveBeenLastCalledWith('v1', mockVatConfig);
expect(kernel.getVatIds()).toStrictEqual(['v1']);
expect(initMock).toHaveBeenCalledTimes(2);
});

it('throws error when restarting non-existent vat', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await expect(kernel.restartVat('v999')).rejects.toThrow(VatNotFoundError);
expect(terminateMock).not.toHaveBeenCalled();
expect(launchWorkerMock).not.toHaveBeenCalled();
});

it('handles restart failure during termination', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat(mockVatConfig);
terminateMock.mockRejectedValueOnce(new Error('Termination failed'));
await expect(kernel.restartVat('v1')).rejects.toThrow(
'Termination failed',
);
expect(launchWorkerMock).toHaveBeenCalledTimes(1);
});

it('handles restart failure during launch', async () => {
const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore);
await kernel.launchVat(mockVatConfig);
launchWorkerMock.mockRejectedValueOnce(new Error('Launch failed'));
await expect(kernel.restartVat('v1')).rejects.toThrow('Launch failed');
expect(terminateMock).toHaveBeenCalledOnce();
expect(kernel.getVatIds()).toStrictEqual([]);
});
});

describe('sendMessage()', () => {
Expand Down
218 changes: 126 additions & 92 deletions packages/kernel/src/Kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import type {
ClusterConfig,
VatConfig,
} from './types.js';
import { VatStateService } from './vat-state-service.js';
import { Vat } from './Vat.js';

export class Kernel {
Expand All @@ -43,6 +44,8 @@ export class Kernel {

readonly #logger: Logger;

readonly #vatStateService: VatStateService;

constructor(
stream: DuplexStream<KernelCommand, KernelCommandReply>,
vatWorkerService: VatWorkerService,
Expand All @@ -54,6 +57,7 @@ export class Kernel {
this.#vatWorkerService = vatWorkerService;
this.#storage = makeKernelStore(rawStorage);
this.#logger = logger ?? makeLogger('[ocap kernel]');
this.#vatStateService = new VatStateService();
}

async init(): Promise<void> {
Expand All @@ -68,80 +72,6 @@ export class Kernel {
});
}

async #receiveMessages(): Promise<void> {
for await (const message of this.#stream) {
if (!isKernelCommand(message)) {
this.#logger.error('Received unexpected message', message);
continue;
}

const { method, params } = message;

let vat: Vat;

switch (method) {
case KernelCommandMethod.ping:
await this.#reply({ method, params: 'pong' });
break;
case KernelCommandMethod.evaluate:
if (!this.#vats.size) {
throw new Error('No vats available to call');
}
vat = this.#vats.values().next().value as Vat;
await this.#reply({
method,
params: await this.evaluate(vat.vatId, params),
});
break;
case KernelCommandMethod.capTpCall:
if (!this.#vats.size) {
throw new Error('No vats available to call');
}
vat = this.#vats.values().next().value as Vat;
await this.#reply({
method,
params: stringify(await vat.callCapTp(params)),
});
break;
case KernelCommandMethod.kvSet:
this.kvSet(params.key, params.value);
await this.#reply({
method,
params: `~~~ set "${params.key}" to "${params.value}" ~~~`,
});
break;
case KernelCommandMethod.kvGet: {
try {
const value = this.kvGet(params);
const result =
typeof value === 'string' ? `"${value}"` : `${value}`;
await this.#reply({
method,
params: `~~~ got ${result} ~~~`,
});
} catch (problem) {
// TODO: marshal
await this.#reply({
method,
params: String(toError(problem)),
});
}
break;
}
default:
console.error(
'kernel worker received unexpected command',
// @ts-expect-error Runtime does not respect "never".
{ method: method.valueOf(), params },
);
}
}
}

async #reply(message: KernelCommandReply): Promise<void> {
await this.#stream.write(message);
}

/**
* Evaluate a string in the default iframe.
*
Expand Down Expand Up @@ -192,17 +122,7 @@ export class Kernel {
if (this.#vats.has(vatId)) {
throw new VatAlreadyExistsError(vatId);
}
const multiplexer = await this.#vatWorkerService.launch(vatId, vatConfig);
multiplexer.start().catch((error) => this.#logger.error(error));
const commandStream = multiplexer.createChannel<
VatCommandReply,
VatCommand
>('command', isVatCommandReply);
const capTpStream = multiplexer.createChannel<Json, Json>('capTp');
const vat = new Vat({ vatId, vatConfig, commandStream, capTpStream });
this.#vats.set(vat.vatId, vat);
await vat.init();
return vat;
return this.#initVat(vatId, vatConfig);
}

/**
Expand All @@ -223,14 +143,18 @@ export class Kernel {
/**
* Restarts a vat.
*
* @param id - The ID of the vat.
* @param vatId - The ID of the vat.
* @returns A promise that resolves the restarted vat.
*/
async restartVat(id: VatId): Promise<void> {
await this.terminateVat(id);
// XXX TODO the following line has been hacked up to enable a successful
// build, but is entirely wrong. Restart expressed this way loses the original vat
// ID and configuration.
await this.launchVat({ sourceSpec: 'not-really-there.js' });
async restartVat(vatId: VatId): Promise<Vat> {
const state = this.#vatStateService.get(vatId);
if (!state) {
throw new VatNotFoundError(vatId);
}

await this.terminateVat(vatId);
const vat = await this.#initVat(vatId, state.config);
return vat;
}

/**
Expand Down Expand Up @@ -274,6 +198,116 @@ export class Kernel {
return vat.sendMessage(command);
}

// --------------------------------------------------------------------------
// Private methods
// --------------------------------------------------------------------------

/**
* Receives messages from the stream.
*/
async #receiveMessages(): Promise<void> {
for await (const message of this.#stream) {
if (!isKernelCommand(message)) {
this.#logger.error('Received unexpected message', message);
continue;
}

const { method, params } = message;

let vat: Vat;

switch (method) {
case KernelCommandMethod.ping:
await this.#reply({ method, params: 'pong' });
break;
case KernelCommandMethod.evaluate:
if (!this.#vats.size) {
throw new Error('No vats available to call');
}
vat = this.#vats.values().next().value as Vat;
await this.#reply({
method,
params: await this.evaluate(vat.vatId, params),
});
break;
case KernelCommandMethod.capTpCall:
if (!this.#vats.size) {
throw new Error('No vats available to call');
}
vat = this.#vats.values().next().value as Vat;
await this.#reply({
method,
params: stringify(await vat.callCapTp(params)),
});
break;
case KernelCommandMethod.kvSet:
this.kvSet(params.key, params.value);
await this.#reply({
method,
params: `~~~ set "${params.key}" to "${params.value}" ~~~`,
});
break;
case KernelCommandMethod.kvGet: {
try {
const value = this.kvGet(params);
const result =
typeof value === 'string' ? `"${value}"` : `${value}`;
await this.#reply({
method,
params: `~~~ got ${result} ~~~`,
});
} catch (problem) {
// TODO: marshal
await this.#reply({
method,
params: String(toError(problem)),
});
}
break;
}
default:
console.error(
'kernel worker received unexpected command',
// @ts-expect-error Runtime does not respect "never".
{ method: method.valueOf(), params },
);
}
}
}

/**
* Replies to a message.
*
* @param message - The message to reply to.
*/
async #reply(message: KernelCommandReply): Promise<void> {
await this.#stream.write(message);
}

/**
* Initializes a vat.
*
* @param vatId - The ID of the vat.
* @param vatConfig - The configuration of the vat.
* @returns A promise that resolves the vat.
*/
async #initVat(vatId: VatId, vatConfig: VatConfig): Promise<Vat> {
const multiplexer = await this.#vatWorkerService.launch(vatId, vatConfig);
multiplexer.start().catch((error) => this.#logger.error(error));
const commandStream = multiplexer.createChannel<
VatCommandReply,
VatCommand
>('command', isVatCommandReply);
const capTpStream = multiplexer.createChannel<Json, Json>('capTp');
const vat = new Vat({ vatId, vatConfig, commandStream, capTpStream });
this.#vats.set(vat.vatId, vat);
this.#vatStateService.set(vatId, {
config: vatConfig,
});
await vat.init();
return vat;
}

/**
* Gets a vat.
*
Expand Down
11 changes: 6 additions & 5 deletions packages/kernel/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type EndpointState<IdType> = {
kRefToERef: Map<KRef, ERef>;
};

type VatState = {
type KernelVatState = {
messagePort: typeof MessagePort;
state: EndpointState<VatId>;
source: string;
Expand All @@ -72,7 +72,7 @@ export type KernelPromise = {
};

export type KernelState = {
vats: Map<VatId, VatState>;
vats: Map<VatId, KernelVatState>;
remotes: Map<RemoteId, RemoteState>;
kernelPromises: Map<KRef, KernelPromise>;
};
Expand Down Expand Up @@ -176,9 +176,10 @@ export const VatConfigStruct = define<VatConfig>('VatConfig', (value) => {
return false;
}

const { sourceSpec, bundleSpec, bundleName, creationOptions, parameters } =
value as Record<string, unknown>;
const specOnly = { sourceSpec, bundleSpec, bundleName };
const { creationOptions, parameters, ...specOnly } = value as Record<
string,
unknown
>;

return (
is(specOnly, UserCodeSpecStruct) &&
Expand Down
Loading