From f38a5df3b25de0769c8960ccba60f485cfbd6aea Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Tue, 22 Oct 2024 19:54:09 +0200 Subject: [PATCH 01/12] Load reload and restart many vats --- packages/extension/src/kernel-worker.ts | 39 ++++++++++++++++++++----- packages/kernel/src/Kernel.ts | 10 +++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index 96a921f97..bf3022654 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -1,19 +1,17 @@ import './kernel-worker-trusted-prelude.js'; -import type { KernelCommand, KernelCommandReply, VatId } from '@ocap/kernel'; -import { Kernel } from '@ocap/kernel'; +import type { KernelCommand, KernelCommandReply } from '@ocap/kernel'; +import { Kernel, VatCommandMethod } from '@ocap/kernel'; import { MessagePortDuplexStream, receiveMessagePort } from '@ocap/streams'; import { makeSQLKVStore } from './sqlite-kv-store.js'; import { ExtensionVatWorkerClient } from './VatWorkerClient.js'; -main('v0').catch(console.error); +main().catch(console.error); /** * The main function for the kernel worker. - * - * @param defaultVatId - The id to give the default vat. */ -async function main(defaultVatId: VatId): Promise { +async function main(): Promise { const kernelStream = await receiveMessagePort( (listener) => globalThis.addEventListener('message', listener), (listener) => globalThis.removeEventListener('message', listener), @@ -33,5 +31,32 @@ async function main(defaultVatId: VatId): Promise { // Create and start kernel. const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); - await kernel.init({ defaultVatId }); + await kernel.init({ defaultVatId: 'v0' }); + + console.log('Kernel started'); + + await kernel.launchVat({ id: 'v1' }); + await kernel.launchVat({ id: 'v2' }); + await kernel.launchVat({ id: 'v3' }); + console.log('Kernel vats:', kernel.getVatIds()); + + await kernel.restartVat('v2'); + console.log('Vat v2 restarted'); + + console.log('Kernel vats:', kernel.getVatIds()); + + await kernel.sendMessage('v1', { + method: VatCommandMethod.Ping, + params: null, + }); + + await kernel.deleteVat('v1'); + console.log('Vat v1 deleted'); + + console.log('Kernel vats:', kernel.getVatIds()); + + await kernel.sendMessage('v2', { + method: VatCommandMethod.Ping, + params: null, + }); } diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 784bd53ba..9c8f6ec8e 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -193,6 +193,16 @@ export class Kernel { this.#vats.delete(id); } + /** + * Restarts a vat in the kernel. + * + * @param id - The ID of the vat. + */ + async restartVat(id: VatId): Promise { + await this.deleteVat(id); + await this.launchVat({ id }); + } + /** * Send a message to a vat. * From bac8a363a04cd6e103ee6d5f432313aa95f35328 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 23 Oct 2024 15:38:20 +0200 Subject: [PATCH 02/12] Load, reload and terminate multiple vats --- .../errors/src/errors/StreamReadError.test.ts | 12 +++++++ packages/errors/src/errors/StreamReadError.ts | 22 ++++++++++-- packages/extension/src/kernel-worker.ts | 34 +++++++++---------- packages/kernel/src/Kernel.test.ts | 32 ++++++++++++++--- packages/kernel/src/Kernel.ts | 31 ++++++++++------- packages/kernel/src/Vat.ts | 5 ++- 6 files changed, 99 insertions(+), 37 deletions(-) diff --git a/packages/errors/src/errors/StreamReadError.test.ts b/packages/errors/src/errors/StreamReadError.test.ts index fd013fc5f..954c8aab6 100644 --- a/packages/errors/src/errors/StreamReadError.test.ts +++ b/packages/errors/src/errors/StreamReadError.test.ts @@ -34,6 +34,18 @@ describe('StreamReadError', () => { expect(error.cause).toBe(mockOriginalError); }); + it('creates a StreamReadError for Kernel with the correct properties', () => { + const error = new StreamReadError( + { kernelId: 'kernel' }, + { cause: mockOriginalError }, + ); + expect(error).toBeInstanceOf(StreamReadError); + expect(error.code).toBe(ErrorCode.StreamReadError); + expect(error.message).toBe('Unexpected stream read error.'); + expect(error.data).toStrictEqual({ kernelId: 'kernel' }); + expect(error.cause).toBe(mockOriginalError); + }); + it('unmarshals a valid marshaled StreamReadError for Vat', () => { const data = { vatId: mockVatId }; const marshaledError: MarshaledOcapError = { diff --git a/packages/errors/src/errors/StreamReadError.ts b/packages/errors/src/errors/StreamReadError.ts index ec81a52b0..b43c3202a 100644 --- a/packages/errors/src/errors/StreamReadError.ts +++ b/packages/errors/src/errors/StreamReadError.ts @@ -16,7 +16,10 @@ import { } from '../constants.js'; import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.js'; -type StreamReadErrorData = { vatId: string } | { supervisorId: string }; +type StreamReadErrorData = + | { vatId: string } + | { supervisorId: string } + | { kernelId: string }; type StreamReadErrorOptions = Required & Pick; @@ -36,8 +39,21 @@ export class StreamReadError extends BaseError { ...marshaledErrorSchema, code: literal(ErrorCode.StreamReadError), data: union([ - object({ vatId: string(), supervisorId: optional(never()) }), - object({ supervisorId: string(), vatId: optional(never()) }), + object({ + vatId: string(), + supervisorId: optional(never()), + kernelId: optional(never()), + }), + object({ + supervisorId: string(), + vatId: optional(never()), + kernelId: optional(never()), + }), + object({ + kernelId: string(), + vatId: optional(never()), + supervisorId: optional(never()), + }), ]), cause: MarshaledErrorStruct, }); diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index bf3022654..f51a42186 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -1,5 +1,5 @@ import './kernel-worker-trusted-prelude.js'; -import type { KernelCommand, KernelCommandReply } from '@ocap/kernel'; +import type { KernelCommand, KernelCommandReply, VatId } from '@ocap/kernel'; import { Kernel, VatCommandMethod } from '@ocap/kernel'; import { MessagePortDuplexStream, receiveMessagePort } from '@ocap/streams'; @@ -33,30 +33,30 @@ async function main(): Promise { const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); await kernel.init({ defaultVatId: 'v0' }); - console.log('Kernel started'); + const vats: VatId[] = ['v1', 'v2', 'v3']; - await kernel.launchVat({ id: 'v1' }); - await kernel.launchVat({ id: 'v2' }); - await kernel.launchVat({ id: 'v3' }); - console.log('Kernel vats:', kernel.getVatIds()); + console.time(`Created vats: ${vats.join(', ')}`); + await Promise.all(vats.map(async (id) => kernel.launchVat({ id }))); + console.timeEnd(`Created vats: ${vats.join(', ')}`); - await kernel.restartVat('v2'); - console.log('Vat v2 restarted'); + console.log('Kernel vats:', kernel.getVatIds().join(', ')); - console.log('Kernel vats:', kernel.getVatIds()); + console.time('Vat "v2" restart'); + await kernel.restartVat('v2'); + console.timeEnd('Vat "v2" restart'); + console.time('Ping Vat "v1"'); await kernel.sendMessage('v1', { method: VatCommandMethod.Ping, params: null, }); + console.timeEnd('Ping Vat "v1"'); - await kernel.deleteVat('v1'); - console.log('Vat v1 deleted'); - - console.log('Kernel vats:', kernel.getVatIds()); + console.time(`Terminated vats: ${vats.join(', ')}`); + for (const vatId of vats) { + await kernel.terminateVat(vatId); + } + console.timeEnd(`Terminated vats: ${vats.join(', ')}`); - await kernel.sendMessage('v2', { - method: VatCommandMethod.Ping, - params: null, - }); + console.log('Kernel vats:', kernel.getVatIds().join(', ')); } diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 0ae90b0bc..68ce13a59 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -88,6 +88,15 @@ describe('Kernel', () => { expect(kernel.getVatIds()).toStrictEqual(['v0']); }); + it('adds multiple vats to the kernel without errors when no vat with the same ID exists', async () => { + const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); + await kernel.launchVat({ id: 'v0' }); + await kernel.launchVat({ id: 'v1' }); + expect(initMock).toHaveBeenCalledTimes(2); + expect(mockGetWorkerStreams).toHaveBeenCalledTimes(2); + expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); + }); + it('throws an error when launching a vat that already exists in the kernel', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); @@ -101,12 +110,12 @@ describe('Kernel', () => { }); }); - describe('deleteVat()', () => { + describe('terminateVat()', () => { it('deletes a vat from the kernel without errors when the vat exists', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); expect(kernel.getVatIds()).toStrictEqual(['v0']); - await kernel.deleteVat('v0'); + await kernel.terminateVat('v0'); expect(terminateMock).toHaveBeenCalledOnce(); expect(terminateWorkerMock).toHaveBeenCalledOnce(); expect(kernel.getVatIds()).toStrictEqual([]); @@ -116,7 +125,7 @@ describe('Kernel', () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); const nonExistentVatId: VatId = 'v9'; await expect(async () => - kernel.deleteVat(nonExistentVatId), + kernel.terminateVat(nonExistentVatId), ).rejects.toThrow(VatNotFoundError); expect(terminateMock).not.toHaveBeenCalled(); }); @@ -125,12 +134,25 @@ describe('Kernel', () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); vi.spyOn(Vat.prototype, 'terminate').mockRejectedValueOnce('Test error'); - await expect(async () => kernel.deleteVat('v0')).rejects.toThrow( + await expect(async () => kernel.terminateVat('v0')).rejects.toThrow( 'Test error', ); }); }); + describe('restartVat()', () => { + it('restarts a vat', async () => { + const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); + await kernel.launchVat({ id: 'v0' }); + expect(kernel.getVatIds()).toStrictEqual(['v0']); + await kernel.restartVat('v0'); + expect(terminateMock).toHaveBeenCalledOnce(); + expect(mockDeleteWorker).toHaveBeenCalledOnce(); + expect(kernel.getVatIds()).toStrictEqual(['v0']); + expect(initMock).toHaveBeenCalledTimes(2); + }); + }); + describe('sendMessage()', () => { it('sends a message to the vat without errors when the vat exists', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); @@ -174,5 +196,7 @@ describe('Kernel', () => { it.todo('initializes the kernel store'); it.todo('starts receiving messages'); + + it.todo('throws an error if the stream is invalid'); }); }); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 9c8f6ec8e..823d06154 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -1,7 +1,12 @@ import '@ocap/shims/endoify'; import type { PromiseKit } from '@endo/promise-kit'; import { makePromiseKit } from '@endo/promise-kit'; -import { VatAlreadyExistsError, VatNotFoundError, toError } from '@ocap/errors'; +import { + StreamReadError, + VatAlreadyExistsError, + VatNotFoundError, + toError, +} from '@ocap/errors'; import type { DuplexStream } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeLogger, stringify } from '@ocap/utils'; @@ -53,7 +58,9 @@ export class Kernel { .then(this.#defaultVatKit.resolve) .catch(this.#defaultVatKit.reject); - return this.#receiveMessages(); + this.#receiveMessages().catch((error) => { + throw new StreamReadError({ kernelId: 'kernel' }, error); + }); } async #receiveMessages(): Promise { @@ -182,25 +189,25 @@ export class Kernel { } /** - * Deletes a vat from the kernel. + * Restarts a vat in the kernel. * * @param id - The ID of the vat. */ - async deleteVat(id: VatId): Promise { - const vat = this.#getVat(id); - await vat.terminate(); - await this.#vatWorkerService.terminate(id).catch(console.error); - this.#vats.delete(id); + async restartVat(id: VatId): Promise { + await this.terminateVat(id); + await this.launchVat({ id }); } /** - * Restarts a vat in the kernel. + * Terminate a vat from the kernel. * * @param id - The ID of the vat. */ - async restartVat(id: VatId): Promise { - await this.deleteVat(id); - await this.launchVat({ id }); + async terminateVat(id: VatId): Promise { + const vat = this.#getVat(id); + await vat.terminate(); + await this.#vatWorkerService.terminate(id).catch(console.error); + this.#vats.delete(id); } /** diff --git a/packages/kernel/src/Vat.ts b/packages/kernel/src/Vat.ts index 49b88fdf5..ae336011a 100644 --- a/packages/kernel/src/Vat.ts +++ b/packages/kernel/src/Vat.ts @@ -104,7 +104,10 @@ export class Vat { */ async #receiveMessages(reader: Reader): Promise { for await (const rawMessage of reader) { - this.logger.debug('Vat received message', rawMessage); + this.logger.log( + 'Received message', + stringify(rawMessage?.content?.payload), + ); await this.streamEnvelopeReplyHandler.handle(rawMessage); } } From bf1f52c21fbc339c954e005cc9bec2e3a38d735d Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Wed, 23 Oct 2024 15:46:12 +0200 Subject: [PATCH 03/12] fix test --- packages/errors/src/errors/StreamReadError.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/errors/src/errors/StreamReadError.test.ts b/packages/errors/src/errors/StreamReadError.test.ts index 954c8aab6..a733f5550 100644 --- a/packages/errors/src/errors/StreamReadError.test.ts +++ b/packages/errors/src/errors/StreamReadError.test.ts @@ -118,7 +118,7 @@ describe('StreamReadError', () => { expect(() => StreamReadError.unmarshal(marshaledError, unmarshalErrorOptions), ).toThrow( - 'At path: data -- Expected the value to satisfy a union of `object | object`, but received: "invalid data"', + 'At path: data -- Expected the value to satisfy a union of `object | object | object`, but received: "invalid data"', ); }); @@ -139,7 +139,7 @@ describe('StreamReadError', () => { expect(() => StreamReadError.unmarshal(marshaledError, unmarshalErrorOptions), ).toThrow( - 'At path: data -- Expected the value to satisfy a union of `object | object`, but received: [object Object]', + 'At path: data -- Expected the value to satisfy a union of `object | object | object`, but received: [object Object]', ); }); }); From 6543d56efaa389edee9cbabeb26edd4bc928655a Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 24 Oct 2024 17:07:46 +0200 Subject: [PATCH 04/12] Add terminateAllVats method --- packages/kernel/src/Kernel.test.ts | 13 +++++++++++++ packages/kernel/src/Kernel.ts | 8 ++++++++ 2 files changed, 21 insertions(+) diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 68ce13a59..0a5573448 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -140,6 +140,19 @@ describe('Kernel', () => { }); }); + describe('terminateAllVats()', () => { + it('deletes all vats from the kernel without errors', async () => { + const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); + await kernel.launchVat({ id: 'v0' }); + await kernel.launchVat({ id: 'v1' }); + expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); + await kernel.terminateAllVats(); + expect(terminateMock).toHaveBeenCalledTimes(2); + expect(mockDeleteWorker).toHaveBeenCalledTimes(2); + expect(kernel.getVatIds()).toStrictEqual([]); + }); + }); + describe('restartVat()', () => { it('restarts a vat', async () => { const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 823d06154..931202121 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -210,6 +210,14 @@ export class Kernel { this.#vats.delete(id); } + /** + * Terminate a vat from the kernel. + */ + async terminateAllVats(): Promise { + const vatIds = this.getVatIds(); + await Promise.all(vatIds.map(async (id) => this.terminateVat(id))); + } + /** * Send a message to a vat. * From 88bf76510d5ce9d3afdc4d7f7569a760f93c8e33 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 24 Oct 2024 17:08:22 +0200 Subject: [PATCH 05/12] use terminateAllVats --- packages/extension/src/kernel-worker.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index f51a42186..893e3b39e 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -25,16 +25,13 @@ async function main(): Promise { ); // Initialize kernel store. - const kvStore = await makeSQLKVStore(); // Create and start kernel. - const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); await kernel.init({ defaultVatId: 'v0' }); const vats: VatId[] = ['v1', 'v2', 'v3']; - console.time(`Created vats: ${vats.join(', ')}`); await Promise.all(vats.map(async (id) => kernel.launchVat({ id }))); console.timeEnd(`Created vats: ${vats.join(', ')}`); @@ -52,11 +49,10 @@ async function main(): Promise { }); console.timeEnd('Ping Vat "v1"'); - console.time(`Terminated vats: ${vats.join(', ')}`); - for (const vatId of vats) { - await kernel.terminateVat(vatId); - } - console.timeEnd(`Terminated vats: ${vats.join(', ')}`); + const vatIds = kernel.getVatIds().join(', '); + console.time(`Terminated vats: ${vatIds}`); + await kernel.terminateAllVats(); + console.timeEnd(`Terminated vats: ${vatIds}`); - console.log('Kernel vats:', kernel.getVatIds().join(', ')); + console.log(`Kernel has ${kernel.getVatIds().length} vats`); } From b6edd0326d88f0cc2f118463a5dfdfd75c4fe695 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 25 Oct 2024 11:39:51 +0200 Subject: [PATCH 06/12] use terminateAll --- packages/kernel/src/Kernel.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 931202121..4d359110e 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -214,8 +214,14 @@ export class Kernel { * Terminate a vat from the kernel. */ async terminateAllVats(): Promise { - const vatIds = this.getVatIds(); - await Promise.all(vatIds.map(async (id) => this.terminateVat(id))); + await Promise.all( + this.getVatIds().map(async (id) => { + const vat = this.#getVat(id); + await vat.terminate(); + this.#vats.delete(id); + }), + ); + await this.#vatWorkerService.terminateAll(); } /** From 5b75bdd35c334508b62aace3d8cb2c937736f476 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 25 Oct 2024 11:43:41 +0200 Subject: [PATCH 07/12] fix tests --- packages/kernel/src/Kernel.test.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 0a5573448..e00c89557 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -40,6 +40,7 @@ describe('Kernel', () => { mockWorkerService = { launch: async () => ({}), terminate: async () => undefined, + terminateAll: async () => undefined, } as unknown as VatWorkerService; launchWorkerMock = vi @@ -89,11 +90,11 @@ describe('Kernel', () => { }); it('adds multiple vats to the kernel without errors when no vat with the same ID exists', async () => { - const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); + const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); await kernel.launchVat({ id: 'v1' }); expect(initMock).toHaveBeenCalledTimes(2); - expect(mockGetWorkerStreams).toHaveBeenCalledTimes(2); + expect(launchWorkerMock).toHaveBeenCalledTimes(2); expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); }); @@ -142,25 +143,28 @@ describe('Kernel', () => { describe('terminateAllVats()', () => { it('deletes all vats from the kernel without errors', async () => { - const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); + const workerTerminateAllMock = vi + .spyOn(mockWorkerService, 'terminateAll') + .mockResolvedValue(undefined); + const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); await kernel.launchVat({ id: 'v1' }); expect(kernel.getVatIds()).toStrictEqual(['v0', 'v1']); await kernel.terminateAllVats(); expect(terminateMock).toHaveBeenCalledTimes(2); - expect(mockDeleteWorker).toHaveBeenCalledTimes(2); + expect(workerTerminateAllMock).toHaveBeenCalledOnce(); expect(kernel.getVatIds()).toStrictEqual([]); }); }); describe('restartVat()', () => { it('restarts a vat', async () => { - const kernel = new Kernel(mockStream, mockWorkerService, mockKernelStore); + const kernel = new Kernel(mockStream, mockWorkerService, mockKVStore); await kernel.launchVat({ id: 'v0' }); expect(kernel.getVatIds()).toStrictEqual(['v0']); await kernel.restartVat('v0'); expect(terminateMock).toHaveBeenCalledOnce(); - expect(mockDeleteWorker).toHaveBeenCalledOnce(); + expect(terminateWorkerMock).toHaveBeenCalledOnce(); expect(kernel.getVatIds()).toStrictEqual(['v0']); expect(initMock).toHaveBeenCalledTimes(2); }); From af052315646d4dccea461c37ffd274b50b1eb6a8 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 28 Oct 2024 14:27:34 +0100 Subject: [PATCH 08/12] extract handle many vats to function and fix copy --- packages/extension/src/kernel-worker.ts | 33 +++++++++++++++++++------ packages/kernel/src/Kernel.test.ts | 2 +- packages/kernel/src/Kernel.ts | 12 ++++----- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index 893e3b39e..cb6c5a4a1 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -1,4 +1,5 @@ import './kernel-worker-trusted-prelude.js'; +import type { NonEmptyArray } from '@metamask/utils'; import type { KernelCommand, KernelCommandReply, VatId } from '@ocap/kernel'; import { Kernel, VatCommandMethod } from '@ocap/kernel'; import { MessagePortDuplexStream, receiveMessagePort } from '@ocap/streams'; @@ -31,23 +32,41 @@ async function main(): Promise { const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); await kernel.init({ defaultVatId: 'v0' }); - const vats: VatId[] = ['v1', 'v2', 'v3']; + // Handle the lifecycle of multiple vats. + await handleVatLifecycle(kernel, ['v1', 'v2', 'v3']); +} + +/** + * Manages the full lifecycle of an array of vats, including their creation, + * restart, message passing, and termination. + * + * @param kernel The kernel instance. + * @param vats An array of VatIds to be managed. + */ +async function handleVatLifecycle( + kernel: Kernel, + vats: NonEmptyArray, +): Promise { console.time(`Created vats: ${vats.join(', ')}`); await Promise.all(vats.map(async (id) => kernel.launchVat({ id }))); console.timeEnd(`Created vats: ${vats.join(', ')}`); console.log('Kernel vats:', kernel.getVatIds().join(', ')); - console.time('Vat "v2" restart'); - await kernel.restartVat('v2'); - console.timeEnd('Vat "v2" restart'); + // Restart a randomly selected vat from the array. + const vatToRestart = vats[Math.floor(Math.random() * vats.length)] as VatId; + console.time(`Vat "${vatToRestart}" restart`); + await kernel.restartVat(vatToRestart); + console.timeEnd(`Vat "${vatToRestart}" restart`); - console.time('Ping Vat "v1"'); - await kernel.sendMessage('v1', { + // Send a "Ping" message to a randomly selected vat. + const vatToPing = vats[Math.floor(Math.random() * vats.length)] as VatId; + console.time(`Ping Vat "${vatToPing}"`); + await kernel.sendMessage(vatToPing, { method: VatCommandMethod.Ping, params: null, }); - console.timeEnd('Ping Vat "v1"'); + console.timeEnd(`Ping Vat "${vatToPing}"`); const vatIds = kernel.getVatIds().join(', '); console.time(`Terminated vats: ${vatIds}`); diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index e00c89557..7ade18203 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -214,6 +214,6 @@ describe('Kernel', () => { it.todo('starts receiving messages'); - it.todo('throws an error if the stream is invalid'); + it.todo('throws if the stream throws'); }); }); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 4d359110e..22904955d 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -162,7 +162,7 @@ export class Kernel { } /** - * Gets the vat IDs in the kernel. + * Gets the vat IDs. * * @returns An array of vat IDs. */ @@ -171,7 +171,7 @@ export class Kernel { } /** - * Launches a vat in the kernel. + * Launches a vat. * * @param options - The options for launching the vat. * @param options.id - The ID of the vat. @@ -189,7 +189,7 @@ export class Kernel { } /** - * Restarts a vat in the kernel. + * Restarts a vat. * * @param id - The ID of the vat. */ @@ -199,7 +199,7 @@ export class Kernel { } /** - * Terminate a vat from the kernel. + * Terminate a vat. * * @param id - The ID of the vat. */ @@ -211,7 +211,7 @@ export class Kernel { } /** - * Terminate a vat from the kernel. + * Terminate all vats. */ async terminateAllVats(): Promise { await Promise.all( @@ -240,7 +240,7 @@ export class Kernel { } /** - * Gets a vat from the kernel. + * Gets a vat. * * @param id - The ID of the vat. * @returns The vat. From 9a6c38948c8b6db754ab1399bb37c5f4aa52bece Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 28 Oct 2024 15:40:26 +0100 Subject: [PATCH 09/12] Remove defaultVat and add one after vat lifecycle test --- packages/extension/src/kernel-worker.ts | 10 +++++---- packages/kernel/src/Kernel.test.ts | 2 -- packages/kernel/src/Kernel.ts | 28 ++++++++++++------------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index cb6c5a4a1..3b6a2384e 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -30,20 +30,22 @@ async function main(): Promise { // Create and start kernel. const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); - await kernel.init({ defaultVatId: 'v0' }); // Handle the lifecycle of multiple vats. - await handleVatLifecycle(kernel, ['v1', 'v2', 'v3']); + await runVatLifecycle(kernel, ['v1', 'v2', 'v3']); + + // Add default vat. + await kernel.launchVat({ id: 'v0' }); } /** - * Manages the full lifecycle of an array of vats, including their creation, + * Runs the full lifecycle of an array of vats, including their creation, * restart, message passing, and termination. * * @param kernel The kernel instance. * @param vats An array of VatIds to be managed. */ -async function handleVatLifecycle( +async function runVatLifecycle( kernel: Kernel, vats: NonEmptyArray, ): Promise { diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 7ade18203..e6bfe994c 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -207,9 +207,7 @@ describe('Kernel', () => { async () => new Kernel(mockStream, mockWorkerService, mockKVStore), ).not.toThrow(); }); - }); - describe('init()', () => { it.todo('initializes the kernel store'); it.todo('starts receiving messages'); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 22904955d..90ea70046 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -1,6 +1,4 @@ import '@ocap/shims/endoify'; -import type { PromiseKit } from '@endo/promise-kit'; -import { makePromiseKit } from '@endo/promise-kit'; import { StreamReadError, VatAlreadyExistsError, @@ -34,9 +32,6 @@ export class Kernel { readonly #storage: KVStore; - // Hopefully removed when we get to n+1 vats. - readonly #defaultVatKit: PromiseKit; - readonly #logger: Logger; constructor( @@ -49,16 +44,15 @@ export class Kernel { this.#vats = new Map(); this.#vatWorkerService = vatWorkerService; this.#storage = storage; - this.#defaultVatKit = makePromiseKit(); this.#logger = logger ?? makeLogger('[ocap kernel]'); - } - - async init({ defaultVatId }: { defaultVatId: VatId }): Promise { - await this.launchVat({ id: defaultVatId }) - .then(this.#defaultVatKit.resolve) - .catch(this.#defaultVatKit.reject); this.#receiveMessages().catch((error) => { + this.#logger.error('Stream read error occurred:', error); + // Errors thrown here will not be surfaced in the usual synchronous manner + // because #receiveMessages() is awaited within the constructor. + // Any error thrown inside the async loop is 'caught' within this constructor + // call stack but will be displayed as 'Uncaught (in promise)' + // since they occur after the constructor has returned. throw new StreamReadError({ kernelId: 'kernel' }, error); }); } @@ -79,14 +73,20 @@ export class Kernel { await this.#reply({ method, params: 'pong' }); break; case KernelCommandMethod.Evaluate: - vat = await this.#defaultVatKit.promise; + if (!this.#vats.size) { + throw new Error('No vats available to call'); + } + vat = this.#vats.values().next().value as Vat; await this.#reply({ method, params: await this.evaluate(vat.id, params), }); break; case KernelCommandMethod.CapTpCall: - vat = await this.#defaultVatKit.promise; + if (!this.#vats.size) { + throw new Error('No vats available to call'); + } + vat = this.#vats.values().next().value as Vat; await this.#reply({ method, params: stringify(await vat.callCapTp(params)), From e2ed81073e42097baf6235d4a5828ef9359f15fd Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 28 Oct 2024 15:43:15 +0100 Subject: [PATCH 10/12] Restore kernel init method --- packages/extension/src/kernel-worker.ts | 1 + packages/kernel/src/Kernel.test.ts | 2 ++ packages/kernel/src/Kernel.ts | 2 ++ 3 files changed, 5 insertions(+) diff --git a/packages/extension/src/kernel-worker.ts b/packages/extension/src/kernel-worker.ts index 3b6a2384e..ade802455 100644 --- a/packages/extension/src/kernel-worker.ts +++ b/packages/extension/src/kernel-worker.ts @@ -30,6 +30,7 @@ async function main(): Promise { // Create and start kernel. const kernel = new Kernel(kernelStream, vatWorkerClient, kvStore); + await kernel.init(); // Handle the lifecycle of multiple vats. await runVatLifecycle(kernel, ['v1', 'v2', 'v3']); diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index e6bfe994c..7ade18203 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -207,7 +207,9 @@ describe('Kernel', () => { async () => new Kernel(mockStream, mockWorkerService, mockKVStore), ).not.toThrow(); }); + }); + describe('init()', () => { it.todo('initializes the kernel store'); it.todo('starts receiving messages'); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 90ea70046..8e6d7f31e 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -45,7 +45,9 @@ export class Kernel { this.#vatWorkerService = vatWorkerService; this.#storage = storage; this.#logger = logger ?? makeLogger('[ocap kernel]'); + } + async init(): Promise { this.#receiveMessages().catch((error) => { this.#logger.error('Stream read error occurred:', error); // Errors thrown here will not be surfaced in the usual synchronous manner From a8a11a9af4db7a1f7c687db4743aabbb100c0434 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 28 Oct 2024 15:45:17 +0100 Subject: [PATCH 11/12] revert change --- packages/kernel/src/Vat.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/kernel/src/Vat.ts b/packages/kernel/src/Vat.ts index ae336011a..043e5fbe7 100644 --- a/packages/kernel/src/Vat.ts +++ b/packages/kernel/src/Vat.ts @@ -104,10 +104,7 @@ export class Vat { */ async #receiveMessages(reader: Reader): Promise { for await (const rawMessage of reader) { - this.logger.log( - 'Received message', - stringify(rawMessage?.content?.payload), - ); + this.logger.log('Received message', rawMessage); await this.streamEnvelopeReplyHandler.handle(rawMessage); } } From 813adc8910e7f783384aaee9e6b9d8cbb383c397 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Mon, 28 Oct 2024 15:45:55 +0100 Subject: [PATCH 12/12] Revert message --- packages/kernel/src/Vat.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kernel/src/Vat.ts b/packages/kernel/src/Vat.ts index 043e5fbe7..49b88fdf5 100644 --- a/packages/kernel/src/Vat.ts +++ b/packages/kernel/src/Vat.ts @@ -104,7 +104,7 @@ export class Vat { */ async #receiveMessages(reader: Reader): Promise { for await (const rawMessage of reader) { - this.logger.log('Received message', rawMessage); + this.logger.debug('Vat received message', rawMessage); await this.streamEnvelopeReplyHandler.handle(rawMessage); } }