From 100da3fd0f86f19b0f5c488afe23de12b495f213 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 20 Mar 2025 18:34:43 -0700 Subject: [PATCH 1/3] feat: Handle vat resumption on vat or kernel restart Closes #437 --- .../src/kernel-integration/kernel-worker.ts | 26 +- packages/kernel-test/src/liveslots.test.ts | 19 ++ .../kernel-test/src/message-to-promise-vat.js | 89 ++++++ packages/kernel-test/src/resume-vat.js | 117 +++++++ packages/kernel-test/src/resume.test.ts | 298 ++++++++++++++++++ packages/kernel/src/Kernel.test.ts | 12 +- packages/kernel/src/Kernel.ts | 115 ++++--- packages/kernel/src/VatHandle.test.ts | 3 +- packages/kernel/src/VatHandle.ts | 31 +- packages/kernel/src/VatKVStore.ts | 83 ++++- .../kernel/src/store/kernel-store.test.ts | 15 +- packages/kernel/src/store/kernel-store.ts | 181 +++++++++-- packages/kernel/test/storage.ts | 103 +++++- packages/store/src/sqlite/common.test.ts | 1 + packages/store/src/sqlite/common.ts | 4 + packages/store/src/sqlite/nodejs.ts | 11 + packages/store/src/sqlite/wasm.ts | 13 + packages/store/src/types.ts | 1 + vitest.config.ts | 12 +- 19 files changed, 991 insertions(+), 143 deletions(-) create mode 100644 packages/kernel-test/src/message-to-promise-vat.js create mode 100644 packages/kernel-test/src/resume-vat.js create mode 100644 packages/kernel-test/src/resume.test.ts diff --git a/packages/extension/src/kernel-integration/kernel-worker.ts b/packages/extension/src/kernel-integration/kernel-worker.ts index d5451b204..c09d9f407 100644 --- a/packages/extension/src/kernel-integration/kernel-worker.ts +++ b/packages/extension/src/kernel-integration/kernel-worker.ts @@ -21,6 +21,14 @@ import { ExtensionVatWorkerClient } from './VatWorkerClient.ts'; const logger = makeLogger('[kernel worker]'); const DB_FILENAME = 'store.db'; +// XXX Warning: Setting this flag to true causes persistent storage to be +// cleared on extension load. This is a hack to aid development debugging, +// wherein extension reloads are almost exclusively used for retrying from +// scratch after tweaking the code to fix something. Setting the flag will +// prevent the accumulation of long term persistent state, so it should be +// cleared (or simply removed) prior to release. +const ALWAYS_RESET_STORAGE = true; + main().catch(logger.error); /** @@ -44,23 +52,21 @@ async function main(): Promise { const kernelDatabase = await makeSQLKernelDatabase({ dbFilename: DB_FILENAME, }); + const firstTime = !kernelDatabase.kernelKVStore.get('initialized'); const kernel = await Kernel.make( kernelStream, vatWorkerClient, kernelDatabase, { - // XXX Warning: Clearing storage here is a hack to aid development - // debugging, wherein extension reloads are almost exclusively used for - // retrying after tweaking some fix. The following line will prevent - // the accumulation of long term kernel state. - resetStorage: true, + resetStorage: ALWAYS_RESET_STORAGE, }, ); const kernelEngine = new JsonRpcEngine(); kernelEngine.push(loggingMiddleware); kernelEngine.push(createPanelMessageMiddleware(kernel, kernelDatabase)); receiveUiConnections(async (request) => kernelEngine.handle(request), logger); + const launchDefaultSubcluster = firstTime || ALWAYS_RESET_STORAGE; const defaultSubcluster = await fetchValidatedJson( new URL('../vats/default-cluster.json', import.meta.url).href, @@ -74,14 +80,18 @@ async function main(): Promise { // here of `launchSubcluster` turns out to depend on aspects of the IPC // setup completing successfully but those pieces aren't ready in time, then // it could get stuck. Current experience suggests this is not a problem, - // but as yet have only an intuitive sense (i.e., promises, yay) why this + // but as yet we have only an intuitive sense (i.e., promises, yay) why this // might be true rather than a principled explanation that it is necessarily // true. Hence this comment to serve as a marker if some problem crops up // with startup wedging and some poor soul is reading through the code // trying to diagnose it. (async () => { - const result = await kernel.launchSubcluster(defaultSubcluster); - console.log(`Subcluster launched: ${JSON.stringify(result)}`); + if (launchDefaultSubcluster) { + const result = await kernel.launchSubcluster(defaultSubcluster); + console.log(`Subcluster launched: ${JSON.stringify(result)}`); + } else { + console.log(`Resuming kernel execution`); + } })(), ]); } diff --git a/packages/kernel-test/src/liveslots.test.ts b/packages/kernel-test/src/liveslots.test.ts index 8af65311e..1c8774ab0 100644 --- a/packages/kernel-test/src/liveslots.test.ts +++ b/packages/kernel-test/src/liveslots.test.ts @@ -304,4 +304,23 @@ describe('liveslots promise handling', () => { ]; expect(vatLogs).toStrictEqual(reference); }, 30000); + + it('messageToPromise: send to promise before resolution', async () => { + const [bootstrapResult, vatLogs] = await runTestVats( + 'message-to-promise-vat', + 'messageToPromise', + ); + expect(bootstrapResult).toBe('p2succ'); + const reference = [ + `Alice: running test messageToPromise`, + `Alice: invoking loopback`, + `Alice: second result resolved to 'deferred something'`, + `Alice: loopback done`, + `Bob: setup`, + `Bob: doResolve`, + `Bob: thing.doSomething`, + `Bob: loopback`, + ]; + expect(vatLogs).toStrictEqual(reference); + }, 30000); }); diff --git a/packages/kernel-test/src/message-to-promise-vat.js b/packages/kernel-test/src/message-to-promise-vat.js new file mode 100644 index 000000000..17801261d --- /dev/null +++ b/packages/kernel-test/src/message-to-promise-vat.js @@ -0,0 +1,89 @@ +import { E } from '@endo/eventual-send'; +import { Far } from '@endo/marshal'; +import { makePromiseKit } from '@endo/promise-kit'; +/** + * Build function for vats that will run various tests. + * + * @param {*} _vatPowers - Special powers granted to this vat (not used here). + * @param {*} parameters - Initialization parameters from the vat's config object. + * @param {*} _baggage - Root of vat's persistent state (not used here). + * @returns {*} The root object for the new vat. + */ +export function buildRootObject(_vatPowers, parameters, _baggage) { + const name = parameters?.name ?? 'anonymous'; + const test = parameters?.test ?? 'unspecified'; + + /** + * Print a message to the log. + * + * @param {string} message - The message to print. + */ + function log(message) { + console.log(`${name}: ${message}`); + } + + /** + * Print a message to the log, tagged as part of the test output. + * + * @param {string} message - The message to print. + */ + function tlog(message) { + console.log(`::> ${name}: ${message}`); + } + + log(`buildRootObject`); + log(`configuration parameters: ${JSON.stringify(parameters)}`); + + const thing = Far('thing', { + doSomething() { + tlog(`thing.doSomething`); + return `deferred something`; + }, + }); + + let resolveDeferred; + + return Far('root', { + async bootstrap(vats) { + log(`bootstrap start`); + tlog(`running test ${test}`); + const promise1 = E(vats.bob).setup(); + const promise2 = E(promise1).doSomething(); + const doneP = promise2.then( + (res) => { + tlog(`second result resolved to '${res}'`); + return 'p2succ'; + }, + (rej) => { + tlog(`second result rejected with '${rej}'`); + return 'p2fail'; + }, + ); + await E(vats.bob).doResolve(); + tlog(`invoking loopback`); + await E(vats.bob).loopback(); + tlog(`loopback done`); + return doneP; + }, + + // This is a hack that effectively does the job of stdout.flush() even + // though we don't have access to stdout itself here. It makes sure we + // capture all the log output prior to the return value from `bootstrap` + // resolving. + loopback() { + tlog(`loopback`); + return undefined; + }, + + setup() { + tlog(`setup`); + const { promise, resolve } = makePromiseKit(); + resolveDeferred = resolve; + return promise; + }, + doResolve() { + tlog(`doResolve`); + resolveDeferred(thing); + }, + }); +} diff --git a/packages/kernel-test/src/resume-vat.js b/packages/kernel-test/src/resume-vat.js new file mode 100644 index 000000000..3e167b3db --- /dev/null +++ b/packages/kernel-test/src/resume-vat.js @@ -0,0 +1,117 @@ +/* global harden */ +import { E } from '@endo/eventual-send'; +import { Far } from '@endo/marshal'; + +/** + * Build function for generic test vat. + * + * @param {unknown} _vatPowers - Special powers granted to this vat (not used here). + * @param {unknown} parameters - Initialization parameters from the vat's config object. + * @param {unknown} baggage - Root of vat's persistent state (not used here). + * @returns {unknown} The root object for the new vat. + */ +export function buildRootObject(_vatPowers, parameters, baggage) { + const name = parameters?.name ?? 'anonymous'; + + /** + * Print a message to the log. + * + * @param {string} message - The message to print. + */ + function log(message) { + console.log(`${name}: ${message}`); + } + + /** + * Print a message to the log, tagged as part of the test output. + * + * @param {string} message - The message to print. + */ + function tlog(message) { + console.log(`::> ${name}: ${message}`); + } + + log(`buildRootObject`); + + let startCount; + if (baggage.has('name')) { + const savedName = baggage.get('name'); + tlog(`saved name is ${savedName}`); + + startCount = baggage.get('startCount') + 1; + baggage.set('startCount', startCount); + } else { + baggage.init('name', name); + tlog(`saving name`); + + baggage.init('startCount', 1); + startCount = 1; + } + tlog(`start count: ${startCount}`); + + const me = Far('root', { + async bootstrap(vats) { + tlog(`bootstrap()`); + // Explanation for the following bit of gymnastics: we'd like to save + // `vats` itself in the baggage, but we can't because the entry for our + // own root is a local reference and thus not durable, and we can't remove + // this entry from `vats` directly because, being a parameter object, it + // arrived hardened. So instead we have to copy it sans the unwritable element. + const writeVats = {}; + for (const [prop, value] of Object.entries(vats)) { + if (value !== me) { + writeVats[prop] = value; + } + } + baggage.init('vats', harden(writeVats)); + + const pIntroB = E(vats.bob).intro(me); + const pIntroC = E(vats.carol).intro(me); + const pGreetB = E(vats.bob).greet(`hello from ${name}`); + const pGreetC = E(vats.carol).greet(`hello from ${name}`); + const results = await Promise.all([pIntroB, pIntroC, pGreetB, pGreetC]); + const [, , greetB, greetC] = results; + tlog(`Bob answers greeting: '${greetB}'`); + tlog(`Carol answers greeting: '${greetC}'`); + tlog(`end bootstrap`); + await E(vats.bob).loopback(); + return `bootstrap ${name}`; + }, + intro(bootVat) { + tlog(`intro()`); + baggage.init('bootVat', bootVat); + }, + greet(greeting) { + tlog(`greet('${greeting}')`); + return `${name} returns your greeting '${greeting}'`; + }, + async resume() { + tlog(`resume()`); + if (baggage.has('vats')) { + // I am the bootstrap vat + tlog(`resumed vat is bootstrap`); + const vats = baggage.get('vats'); + const pGreetB = E(vats.bob).greet(`hello again from ${name}`); + const pGreetC = E(vats.carol).greet(`hello again from ${name}`); + const [greetB, greetC] = await Promise.all([pGreetB, pGreetC]); + tlog(`Bob answers greeting: '${greetB}'`); + tlog(`Carol answers greeting: '${greetC}'`); + await E(vats.bob).loopback(); + } + if (baggage.has('bootVat')) { + // I am Bob or Carol + tlog(`resumed vat is not bootstrap`); + const bootVat = baggage.get('bootVat'); + const greetBack = await E(bootVat).greet(`hello boot vat from ${name}`); + tlog(`boot vat returns greeting with '${greetBack}'`); + await E(bootVat).loopback(); + } + tlog(`end resume`); + return `resume ${name}`; + }, + loopback() { + return undefined; + }, + }); + return me; +} diff --git a/packages/kernel-test/src/resume.test.ts b/packages/kernel-test/src/resume.test.ts new file mode 100644 index 000000000..e33846144 --- /dev/null +++ b/packages/kernel-test/src/resume.test.ts @@ -0,0 +1,298 @@ +import '@ocap/shims/endoify'; +import { makePromiseKit } from '@endo/promise-kit'; +import type { + KernelCommand, + KernelCommandReply, + ClusterConfig, +} from '@ocap/kernel'; +import { Kernel } from '@ocap/kernel'; +import type { KernelDatabase } from '@ocap/store'; +import { makeSQLKernelDatabase } from '@ocap/store/sqlite/nodejs'; +import { NodeWorkerDuplexStream } from '@ocap/streams'; +import { + MessagePort as NodeMessagePort, + MessageChannel as NodeMessageChannel, +} from 'node:worker_threads'; +import { describe, expect, it } from 'vitest'; + +import { kunser } from '../../kernel/src/kernel-marshal.ts'; +import { NodejsVatWorkerService } from '../../nodejs/src/kernel/VatWorkerService.ts'; + +const origStdoutWrite = process.stdout.write.bind(process.stdout); +let buffered: string = ''; +// @ts-expect-error Some type def used by lint is just wrong (compiler likes it ok, but lint whines) +process.stdout.write = (buffer: string, encoding, callback): void => { + buffered += buffer; + origStdoutWrite(buffer, encoding, callback); +}; + +/** + * Construct a bundle path URL from a bundle name. + * + * @param bundleName - The name of the bundle. + * + * @returns a path string for the named bundle. + */ +function bundleSpec(bundleName: string): string { + return new URL(`${bundleName}.bundle`, import.meta.url).toString(); +} + +const testSubcluster = { + bootstrap: 'alice', + forceReset: true, + bundles: null, // grrrrr + vats: { + alice: { + bundleSpec: bundleSpec('resume-vat'), + parameters: { + name: 'Alice', + }, + }, + bob: { + bundleSpec: bundleSpec('resume-vat'), + parameters: { + name: 'Bob', + }, + }, + carol: { + bundleSpec: bundleSpec('resume-vat'), + parameters: { + name: 'Carol', + }, + }, + }, +}; + +/** + * Handle all the boilerplate to set up a kernel instance. + * + * @param kernelDatabase - The database that will hold the persistent state. + * @param resetStorage - If true, reset the database as part of setting up. + * + * @returns the new kernel instance. + */ +async function makeKernel( + kernelDatabase: KernelDatabase, + resetStorage: boolean, +): Promise { + const kernelPort: NodeMessagePort = new NodeMessageChannel().port1; + const nodeStream = new NodeWorkerDuplexStream< + KernelCommand, + KernelCommandReply + >(kernelPort); + const vatWorkerClient = new NodejsVatWorkerService({}); + const kernel = await Kernel.make( + nodeStream, + vatWorkerClient, + kernelDatabase, + { + resetStorage, + }, + ); + return kernel; +} + +/** + * Take a pass through the JavaScript run loop. + * + * @param delay - Optional delay (in ms) to wait for things to catch up. + */ +async function waitForQuiescence(delay: number = 0): Promise { + const { promise, resolve } = makePromiseKit(); + setTimeout(() => resolve(null), delay); + await promise; +} + +/** + * De-interleave various vats' output to squeeze out interprocess I/O + * non-determinism in CI. + * + * @param logs - An array of log lines. + * + * @returns `logs` sorted by vat. + */ +function sortLogs(logs: string[]): string[] { + logs.sort((a: string, b: string): number => { + const colonA = a.indexOf(':'); + if (colonA < 0) { + return 0; + } + const prefixA = a.substring(0, colonA); + const colonB = b.indexOf(':'); + if (colonB < 0) { + return 0; + } + const prefixB = b.substring(0, colonB); + return prefixA.localeCompare(prefixB); + }); + return logs; +} + +/** + * Convert a raw output buffer into a list of lines suitable for examination. + * + * @param buffer - The raw buffer to convert. + * + * @returns the relevant contents of `buffer`, massaged for use. + */ +function extractVatLogs(buffer: string): string[] { + const result = buffer + .split('\n') + .filter((line: string) => line.startsWith('::> ')) + .map((line: string) => line.slice(4)); + return sortLogs(result); +} + +/** + * Bootstrap the set of test vats. + * + * @param kernel - The kernel to run in. + * @param config - Subcluster configuration telling what vats to run. + * + * @returns the bootstrap result. + */ +async function runBootstrap( + kernel: Kernel, + config: ClusterConfig, +): Promise { + const bootstrapResultRaw = await kernel.launchSubcluster(config); + if (bootstrapResultRaw === undefined) { + throw Error(`this can't happen but eslint is stupid`); + } + return kunser(bootstrapResultRaw); +} + +/** + * Send the `resume message to the root of one of the test vats. + * + * @param kernel - Our kernel. + * @param rootRef - KRef of the object to which the message is sent. + * + * @returns the result returned from `resume`. + */ +async function runResume(kernel: Kernel, rootRef: string): Promise { + const resumeResultRaw = await kernel.queueMessageFromKernel( + rootRef, + 'resume', + [], + ); + return kunser(resumeResultRaw); +} + +const bootstrapReference = [ + `Alice: saving name`, + `Alice: start count: 1`, + `Bob: saving name`, + `Bob: start count: 1`, + `Carol: saving name`, + `Carol: start count: 1`, + `Alice: bootstrap()`, + `Bob: intro()`, + `Carol: intro()`, + `Bob: greet('hello from Alice')`, + `Carol: greet('hello from Alice')`, + `Alice: Bob answers greeting: 'Bob returns your greeting 'hello from Alice''`, + `Alice: Carol answers greeting: 'Carol returns your greeting 'hello from Alice''`, + `Alice: end bootstrap`, +]; +const aliceRestartReference = [ + `Alice: saved name is Alice`, + `Alice: start count: 2`, +]; +const aliceResumeReference = [ + `Alice: resume()`, + `Alice: resumed vat is bootstrap`, + `Bob: greet('hello again from Alice')`, + `Carol: greet('hello again from Alice')`, + `Alice: Bob answers greeting: 'Bob returns your greeting 'hello again from Alice''`, + `Alice: Carol answers greeting: 'Carol returns your greeting 'hello again from Alice''`, + `Alice: end resume`, +]; +// prettier-ignore +const bobRestartReference = [ + `Bob: saved name is Bob`, + `Bob: start count: 2`, +]; +const bobResumeReference = [ + `Bob: resume()`, + `Bob: resumed vat is not bootstrap`, + `Alice: greet('hello boot vat from Bob')`, + `Bob: boot vat returns greeting with 'Alice returns your greeting 'hello boot vat from Bob''`, + `Bob: end resume`, +]; +const carolRestartReference = [ + `Carol: saved name is Carol`, + `Carol: start count: 2`, +]; +const carolResumeReference = [ + `Carol: resume()`, + `Carol: resumed vat is not bootstrap`, + `Alice: greet('hello boot vat from Carol')`, + `Carol: boot vat returns greeting with 'Alice returns your greeting 'hello boot vat from Carol''`, + `Carol: end resume`, +]; +const reference = sortLogs([ + ...bootstrapReference, + + ...aliceRestartReference, + ...bobRestartReference, + ...carolRestartReference, + + ...aliceResumeReference, + ...bobResumeReference, + ...carolResumeReference, +]); + +describe('restarting vats', async () => { + it('exercise restart vats individually', async () => { + const kernelDatabase = await makeSQLKernelDatabase({ + dbFilename: ':memory:', + }); + const kernel = await makeKernel(kernelDatabase, true); + + buffered = ''; + const bootstrapResult = await runBootstrap(kernel, testSubcluster); + expect(bootstrapResult).toBe('bootstrap Alice'); + + await waitForQuiescence(); + await kernel.restartVat('v1'); + await kernel.restartVat('v2'); + await kernel.restartVat('v3'); + + const resumeResultA = await runResume(kernel, 'ko1'); + expect(resumeResultA).toBe('resume Alice'); + const resumeResultB = await runResume(kernel, 'ko2'); + expect(resumeResultB).toBe('resume Bob'); + const resumeResultC = await runResume(kernel, 'ko3'); + expect(resumeResultC).toBe('resume Carol'); + + await waitForQuiescence(1000); + const vatLogs = extractVatLogs(buffered); + expect(vatLogs).toStrictEqual(reference); + }, 30000); + + it('exercise restart kernel', async () => { + const kernelDatabase = await makeSQLKernelDatabase({ + dbFilename: ':memory:', + }); + const kernel1 = await makeKernel(kernelDatabase, true); + + buffered = ''; + const bootstrapResult = await runBootstrap(kernel1, testSubcluster); + expect(bootstrapResult).toBe('bootstrap Alice'); + await waitForQuiescence(); + + const kernel2 = await makeKernel(kernelDatabase, false); + + const resumeResultA = await runResume(kernel2, 'ko1'); + expect(resumeResultA).toBe('resume Alice'); + const resumeResultB = await runResume(kernel2, 'ko2'); + expect(resumeResultB).toBe('resume Bob'); + const resumeResultC = await runResume(kernel2, 'ko3'); + expect(resumeResultC).toBe('resume Carol'); + + await waitForQuiescence(1000); + const vatLogs = extractVatLogs(buffered); + expect(vatLogs).toStrictEqual(reference); + }, 30000); +}); diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index dc6a8c7e9..43c6f9fb2 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -194,8 +194,8 @@ describe('Kernel', () => { describe('terminateAllVats()', () => { it('deletes all vats from the kernel without errors', async () => { - const workerTerminateAllMock = vi - .spyOn(mockWorkerService, 'terminateAll') + const workerTerminateMock = vi + .spyOn(mockWorkerService, 'terminate') .mockResolvedValue(undefined); const kernel = await Kernel.make( mockStream, @@ -209,7 +209,7 @@ describe('Kernel', () => { await kernel.terminateAllVats(); expect(vatHandles[0]?.terminate).toHaveBeenCalledOnce(); expect(vatHandles[1]?.terminate).toHaveBeenCalledOnce(); - expect(workerTerminateAllMock).toHaveBeenCalledOnce(); + expect(workerTerminateMock).toHaveBeenCalledTimes(2); expect(kernel.getVatIds()).toStrictEqual([]); }); }); @@ -367,15 +367,11 @@ describe('Kernel', () => { ); kernel.clusterConfig = makeMockClusterConfig(); await kernel.launchVat(makeMockVatConfig()); - const workerTerminateAllMock = vi - .spyOn(mockWorkerService, 'terminateAll') - .mockResolvedValue(undefined); const launchSubclusterMock = vi .spyOn(kernel, 'launchSubcluster') .mockResolvedValueOnce(undefined); await kernel.reload(); expect(vatHandles[0]?.terminate).toHaveBeenCalledTimes(1); - expect(workerTerminateAllMock).toHaveBeenCalledOnce(); expect(launchSubclusterMock).toHaveBeenCalledOnce(); }); @@ -396,7 +392,7 @@ describe('Kernel', () => { ); kernel.clusterConfig = makeMockClusterConfig(); const error = new Error('Termination failed'); - vi.spyOn(mockWorkerService, 'terminateAll').mockRejectedValueOnce(error); + vi.spyOn(kernel, 'terminateAllVats').mockRejectedValueOnce(error); await expect(kernel.reload()).rejects.toThrow(error); }); }); diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index eed95cd5b..6bc5eaf5d 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -82,9 +82,6 @@ export class Kernel { /** Logger for outputting messages (such as errors) to the console */ readonly #logger: Logger; - /** Count of currently pending entries in the kernel's run queue */ - #runQueueLength: number; - /** Thunk to signal run queue transition from empty to non-empty */ #wakeUpTheRunQueue: (() => void) | null; @@ -120,13 +117,11 @@ export class Kernel { this.#vats = new Map(); this.#vatWorkerService = vatWorkerService; + this.#kernelStore = makeKernelStore(kernelDatabase); if (options.resetStorage) { - kernelDatabase.clear(); + this.#resetKernelState(); } - - this.#kernelStore = makeKernelStore(kernelDatabase); this.#logger = options.logger ?? makeLogger('[ocap kernel]'); - this.#runQueueLength = this.#kernelStore.runQueueLength(); this.#wakeUpTheRunQueue = null; } @@ -169,6 +164,11 @@ export class Kernel { this.#logger.error('Stream read error:', error); throw new StreamReadError({ kernelId: 'kernel' }, error); }); + const starts: Promise[] = []; + for (const { vatID, vatConfig } of this.#kernelStore.getAllVatRecords()) { + starts.push(this.#runVat(vatID, vatConfig)); + } + await Promise.all(starts); this.#run().catch((error) => { this.#logger.error('Run loop error:', error); throw error; @@ -204,7 +204,7 @@ export class Kernel { continue; } - while (this.#runQueueLength > 0) { + while (this.#kernelStore.runQueueLength() > 0) { const item = this.#dequeueRun(); if (item) { yield item; @@ -213,7 +213,7 @@ export class Kernel { } } - if (this.#runQueueLength === 0) { + if (this.#kernelStore.runQueueLength() === 0) { const { promise, resolve } = makePromiseKit(); if (this.#wakeUpTheRunQueue !== null) { Fail`wakeUpTheRunQueue function already set`; @@ -315,15 +315,29 @@ export class Kernel { })); } + /** + * Launches a new vat. + * + * @param vatConfig - Configuration for the new vat. + * + * @returns a promise for the KRef of the new vat's root object. + */ + async launchVat(vatConfig: VatConfig): Promise { + const vatId = this.#kernelStore.getNextVatId(); + await this.#runVat(vatId, vatConfig); + this.#kernelStore.initEndpoint(vatId); + const rootRef = this.exportFromVat(vatId, ROOT_OBJECT_VREF); + this.#kernelStore.setVatConfig(vatId, vatConfig); + return rootRef; + } + /** * Start a new or resurrected vat running. * * @param vatId - The ID of the vat to start. * @param vatConfig - Its configuration. - * - * @returns a promise for the KRef of the vat's root object. */ - async #startVat(vatId: VatId, vatConfig: VatConfig): Promise { + async #runVat(vatId: VatId, vatConfig: VatConfig): Promise { if (this.#vats.has(vatId)) { throw new VatAlreadyExistsError(vatId); } @@ -336,20 +350,6 @@ export class Kernel { kernelStore: this.#kernelStore, }); this.#vats.set(vatId, vat); - this.#kernelStore.initEndpoint(vatId); - const rootRef = this.exportFromVat(vatId, ROOT_OBJECT_VREF); - return rootRef; - } - - /** - * Launches a vat. - * - * @param vatConfig - Configuration for the new vat. - * - * @returns a promise for the KRef of the new vat's root object. - */ - async launchVat(vatConfig: VatConfig): Promise { - return this.#startVat(this.#kernelStore.getNextVatId(), vatConfig); } /** @@ -417,8 +417,7 @@ export class Kernel { */ enqueueRun(item: RunQueueItem): void { this.#kernelStore.enqueueRun(item); - this.#runQueueLength += 1; - if (this.#runQueueLength === 1 && this.#wakeUpTheRunQueue) { + if (this.#kernelStore.runQueueLength() === 1 && this.#wakeUpTheRunQueue) { const wakeUpTheRunQueue = this.#wakeUpTheRunQueue; this.#wakeUpTheRunQueue = null; wakeUpTheRunQueue(); @@ -431,9 +430,7 @@ export class Kernel { * @returns the next item in the run queue, or undefined if the queue is empty. */ #dequeueRun(): RunQueueItem | undefined { - this.#runQueueLength -= 1; - const result = this.#kernelStore.dequeueRun(); - return result; + return this.#kernelStore.dequeueRun(); } /** @@ -558,7 +555,7 @@ export class Kernel { context === 'kernel' && isPromise, `${kpid} is not a kernel promise`, ); - log(`@@@@ deliver ${vatId} notify ${kpid}`); + log(`@@@@ deliver ${vatId} notify ${vatId} ${kpid}`); const promise = this.#kernelStore.getKernelPromise(kpid); const { state, value } = promise; assert(value, `no value for promise ${kpid}`); @@ -591,7 +588,7 @@ export class Kernel { } const vat = this.#getVat(vatId); await vat.deliverNotify(resolutions); - log(`@@@@ done ${vatId} notify ${kpid}`); + log(`@@@@ done ${vatId} notify ${vatId} ${kpid}`); break; } case 'dropExports': { @@ -772,17 +769,13 @@ export class Kernel { rootIds[vatName] = rootRef; roots[vatName] = kslot(rootRef, 'vatRoot'); } - let resultP: Promise | undefined> = - Promise.resolve(undefined); if (config.bootstrap) { const bootstrapRoot = rootIds[config.bootstrap]; if (bootstrapRoot) { - resultP = this.queueMessageFromKernel(bootstrapRoot, 'bootstrap', [ - roots, - ]); + return this.queueMessageFromKernel(bootstrapRoot, 'bootstrap', [roots]); } } - return resultP; + return undefined; } /** @@ -797,42 +790,57 @@ export class Kernel { throw new VatNotFoundError(vatId); } const { config } = vat; - await this.terminateVat(vatId); - await this.#startVat(vatId, config); + await this.#stopVat(vatId, false); + await this.#runVat(vatId, config); return vat; } /** - * Terminate a vat. + * Stop a vat from running. + * + * Note that after this operation, the vat will be in a weird twilight zone + * between existence and nonexistence, so this operation should only be used + * as a component of vat restart (which will push it back into existence) or + * vat termination (which will push it all the way into nonexistence). * * @param vatId - The ID of the vat. + * @param terminating - If true, the vat is being killed, if false, it's being + * restarted. */ - async terminateVat(vatId: VatId): Promise { + async #stopVat(vatId: VatId, terminating: boolean): Promise { const vat = this.#getVat(vatId); if (!vat) { throw new VatNotFoundError(vatId); } - await vat.terminate(); + await vat.terminate(terminating); await this.#vatWorkerService.terminate(vatId).catch(console.error); this.#vats.delete(vatId); } + /** + * Terminate a vat with extreme prejudice. + * + * @param vatId - The ID of the vat. + */ + async terminateVat(vatId: VatId): Promise { + await this.#stopVat(vatId, true); + this.#kernelStore.deleteVatConfig(vatId); + } + /** * Terminate all vats. */ async terminateAllVats(): Promise { await Promise.all( this.getVatIds().map(async (id) => { - const vat = this.#getVat(id); - await vat.terminate(); - this.#vats.delete(id); + await this.terminateVat(id); }), ); - await this.#vatWorkerService.terminateAll(); } /** - * Reload the kernel. + * Terminate all running vats and reload the default subcluster. + * This is for debugging purposes only. */ async reload(): Promise { if (!this.#mostRecentSubcluster) { @@ -867,10 +875,17 @@ export class Kernel { } /** - * Reset the kernel state. + * Stop all running vats and reset the kernel state. */ async reset(): Promise { await this.terminateAllVats(); + this.#resetKernelState(); + } + + /** + * Reset the kernel state. + */ + #resetKernelState(): void { this.#kernelStore.clear(); this.#kernelStore.reset(); } diff --git a/packages/kernel/src/VatHandle.test.ts b/packages/kernel/src/VatHandle.test.ts index 9a62e6847..0b688d825 100644 --- a/packages/kernel/src/VatHandle.test.ts +++ b/packages/kernel/src/VatHandle.test.ts @@ -144,9 +144,10 @@ describe('VatHandle', () => { params: [], }); - await vat.terminate(); + await vat.terminate(true); await expect(messagePromise).rejects.toThrow('Vat was deleted.'); + expect(await stream.next()).toStrictEqual({ done: true, value: undefined, diff --git a/packages/kernel/src/VatHandle.ts b/packages/kernel/src/VatHandle.ts index bbc54597f..51edaafbf 100644 --- a/packages/kernel/src/VatHandle.ts +++ b/packages/kernel/src/VatHandle.ts @@ -11,6 +11,7 @@ import type { DuplexStream } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeLogger, makeCounter } from '@ocap/utils'; +import { kser } from './kernel-marshal.ts'; import type { Kernel } from './Kernel.ts'; import { VatCommandMethod } from './messages/index.ts'; import type { @@ -138,7 +139,10 @@ export class VatHandle { Promise.all([this.#vatStream.drain(this.handleMessage.bind(this))]).catch( async (error) => { this.#logger.error(`Unexpected read error`, error); - await this.terminate(new StreamReadError({ vatId: this.vatId }, error)); + await this.terminate( + true, + new StreamReadError({ vatId: this.vatId }, error), + ); }, ); @@ -574,20 +578,33 @@ export class VatHandle { /** * Terminates the vat. * + * @param terminating - If true, the vat is being killed permanently, so clean + * up its state and reject any promises that would be left dangling. * @param error - The error to terminate the vat with. */ - async terminate(error?: Error): Promise { + async terminate(terminating: boolean, error?: Error): Promise { await this.#vatStream.end(error); - // Handle orphaned messages - for (const [messageId, promiseCallback] of this.#unresolvedMessages) { - promiseCallback?.reject(error ?? new VatDeletedError(this.vatId)); - this.#unresolvedMessages.delete(messageId); + if (terminating) { + // Reject promises exported to other vats for which this vat is the decider + const failure = kser(new VatDeletedError(this.vatId)); + for (const kpid of this.#kernelStore.getPromisesByDecider(this.vatId)) { + this.#kernel.doResolve(this.vatId, [[kpid, true, failure]]); + } + + // Reject promises for results of method invocations from the kernel + for (const [messageId, promiseCallback] of this.#unresolvedMessages) { + promiseCallback?.reject(error ?? new VatDeletedError(this.vatId)); + this.#unresolvedMessages.delete(messageId); + } + + // Expunge this vat's persistent state + this.#kernelStore.deleteVat(this.vatId); } } /** - * Send a command to a vat. + * Send a command into the vat. * * @param payload - The command to send. * @returns A promise that resolves the response to the command. diff --git a/packages/kernel/src/VatKVStore.ts b/packages/kernel/src/VatKVStore.ts index ace35601c..b1a79ba3c 100644 --- a/packages/kernel/src/VatKVStore.ts +++ b/packages/kernel/src/VatKVStore.ts @@ -6,6 +6,8 @@ export type VatKVStore = KVStore & { checkpoint(): VatCheckpoint; }; +/* eslint-disable no-lonely-if, no-else-return */ // stupid rules that encourage unclear code + /** * Create an in-memory VatKVStore for a vat, backed by a Map and tracking * changes so that they can be reported at the end of a crank. @@ -17,6 +19,48 @@ export type VatKVStore = KVStore & { export function makeVatKVStore(state: Map): VatKVStore { let sets: Map = new Map(); let deletes: Set = new Set(); + let keyCache: string[] | null = null; + let lastNextKey: string | null = null; + let lastNextKeyIndex: number = -1; + + /** + * Binary search for key position. + * I totally can't believe I have to write this in 2025. + * + * @param key - The key to search `keyCache` for. + * + * @returns the index into `keyCache` of the first key that is greater than + * `key`, or -1 if no such key exists. + */ + function search(key: string): number { + if (keyCache === null) { + // This shouldn't happen, but just in case... + return -1; + } + let beg = 0; + let end = keyCache.length - 1; + if (key < (keyCache[beg] as string)) { + return beg; + } + if ((keyCache[end] as string) < key) { + return -1; + } + while (beg <= end) { + const mid = Math.floor((beg + end) / 2); + if (keyCache[mid] === key) { + return mid; + } + if (key < (keyCache[mid] as string)) { + end = mid - 1; + } else { + beg = mid + 1; + } + if (beg === end) { + return beg; + } + } + return -1; + } return { get(key: string): string | undefined { @@ -27,28 +71,45 @@ export function makeVatKVStore(state: Map): VatKVStore { if (result) { return result; } - throw Error(`no record matching key '${key}'`); + throw Error(`no value matching key '${key}'`); }, - getNextKey(_previousKey: string): string | undefined { - // WARNING: this is a VERY expensive and complicated operation to - // implement if the backing store is an ordinary Map object fronted by the - // sets & deletes tables as we are doing here. However, the only customer - // of this KVStore is Liveslots, which does not use this operation, so it - // is not actually required. This "implementation" simply returns - // undefined, solely in interest of making the compiler happy -- it does - // not actually work! If you try to use it expecting something useful, it - // will go badly for you. - return undefined; + getNextKey(key: string): string | undefined { + if (keyCache === null) { + keyCache = Array.from(state.keys()).sort(); + } + const index = lastNextKey === key ? lastNextKeyIndex : search(key); + if (index < 0) { + lastNextKey = null; + lastNextKeyIndex = -1; + return undefined; + } + lastNextKey = keyCache[index] as string; + if (key < lastNextKey) { + lastNextKeyIndex = index; + return lastNextKey; + } else { + if (index + 1 >= keyCache.length) { + lastNextKey = null; + lastNextKeyIndex = -1; + return undefined; + } else { + lastNextKey = keyCache[index + 1] as string; + lastNextKeyIndex = index + 1; + return lastNextKey; + } + } }, set(key: string, value: string): void { state.set(key, value); sets.set(key, value); deletes.delete(key); + keyCache = null; }, delete(key: string): void { state.delete(key); sets.delete(key); deletes.add(key); + keyCache = null; }, checkpoint(): VatCheckpoint { const result: VatCheckpoint = [sets, deletes]; diff --git a/packages/kernel/src/store/kernel-store.test.ts b/packages/kernel/src/store/kernel-store.test.ts index 83799c1ed..4726a0eef 100644 --- a/packages/kernel/src/store/kernel-store.test.ts +++ b/packages/kernel/src/store/kernel-store.test.ts @@ -59,18 +59,21 @@ describe('kernel store', () => { 'allocateErefForKref', 'clear', 'clearReachableFlag', - 'createStoredQueue', 'decRefCount', 'decrementRefCount', 'deleteClistEntry', + 'deleteEndpoint', 'deleteKernelObject', 'deleteKernelPromise', + 'deleteVat', + 'deleteVatConfig', 'dequeueRun', 'enqueuePromiseMessage', 'enqueueRun', 'erefToKref', 'forgetEref', 'forgetKref', + 'getAllVatRecords', 'getGCActions', 'getKernelPromise', 'getKernelPromiseMessageQueue', @@ -78,9 +81,12 @@ describe('kernel store', () => { 'getNextVatId', 'getObjectRefCount', 'getOwner', + 'getPromisesByDecider', 'getQueueLength', 'getReachableFlag', 'getRefCount', + 'getRefCount', + 'getVatConfig', 'hasCListEntry', 'incRefCount', 'incrementRefCount', @@ -99,6 +105,7 @@ describe('kernel store', () => { 'setGCActions', 'setObjectRefCount', 'setPromiseDecider', + 'setVatConfig', ]); }); }); @@ -191,12 +198,6 @@ describe('kernel store', () => { expect(() => ks.getKernelPromise('kp1')).toThrow( 'unknown kernel promise kp1', ); - expect(() => ks.enqueuePromiseMessage('kp1', mm('not really'))).toThrow( - 'queue kp1 not initialized', - ); - expect(() => ks.getKernelPromiseMessageQueue('kp1')).toThrow( - 'queue kp1 not initialized', - ); expect(() => ks.getKernelPromise('kp99')).toThrow( 'unknown kernel promise kp99', ); diff --git a/packages/kernel/src/store/kernel-store.ts b/packages/kernel/src/store/kernel-store.ts index 2d059fe3c..5aed217dd 100644 --- a/packages/kernel/src/store/kernel-store.ts +++ b/packages/kernel/src/store/kernel-store.ts @@ -39,14 +39,16 @@ * ${kpid}.value = JSON(CAPDATA) // value settled to, if settled * * C-lists - * cle.${endpointId}.${eref} = ${kref} // ERef->KRef mapping - * clk.${endpointId}.${kref} = ${eref} // KRef->ERef mapping + * cle.${endid}.${eref} = ${kref} // ERef->KRef mapping + * clk.${endid}.${kref} = ${eref} // KRef->ERef mapping * * Vat bookkeeping * e.nextObjectId.${endid} = NN // allocation counter for imported object ERefs * e.nextPromiseId.${endid} = NN // allocation counter for imported promise ERefs + * vatConfig.${vatid} = JSON(CONFIG) // vat's configuration object * * Kernel bookkeeping + * initialized = true // if set, indicates the store has been initialized * nextVatId = NN // allocation counter for vat IDs * nextRemoteId = NN // allocation counter for remote IDs * k.nextObjectId = NN // allocation counter for object KRefs @@ -71,10 +73,12 @@ import type { KRef, ERef, RunQueueItem, + RunQueueItemSend, PromiseState, KernelPromise, RunQueueItemBringOutYourDead, GCAction, + VatConfig, } from '../types.ts'; import { insistGCActionType, insistVatId, RunQueueItemType } from '../types.ts'; @@ -90,6 +94,14 @@ type StoredQueue = { delete(): void; }; +type VatRecord = { + vatID: VatId; + vatConfig: VatConfig; +}; + +const VAT_CONFIG_BASE = 'vatConfig.'; +const VAT_CONFIG_BASE_LEN = VAT_CONFIG_BASE.length; + /** * Test if a KRef designates a promise. * @@ -123,7 +135,9 @@ export function makeKernelStore(kdb: KernelDatabase) { const kv: KVStore = kdb.kernelKVStore; /** The kernel's run queue. */ - let runQueue = createStoredQueue('run', true); + let runQueue = provideStoredQueue('run', true); + /** Cache of the run queue's current length */ + let runQueueLengthCache = -1; /** Counter for allocating VatIDs */ let nextVatId = provideCachedStoredValue('nextVatId', '1'); /** Counter for allocating RemoteIDs */ @@ -219,23 +233,6 @@ export function makeKernelStore(kdb: KernelDatabase) { return current as string; } - /** - * Create a new (empty) persistently stored queue. - * - * @param queueName - The name for the queue (must be unique among queues). - * @param cached - Optional flag: set to true if the queue should cache its - * @returns An object for interacting with the new queue. - */ - function createStoredQueue( - queueName: string, - cached: boolean = false, - ): StoredQueue { - const qk = `queue.${queueName}`; - kv.set(`${qk}.head`, '1'); - kv.set(`${qk}.tail`, '1'); - return provideStoredQueue(queueName, cached); - } - /** * Find out how long some queue is. * @@ -271,8 +268,8 @@ export function makeKernelStore(kdb: KernelDatabase) { const provideValue = cached ? provideCachedStoredValue : provideRawStoredValue; - const head = provideValue(`${qk}.head`); - const tail = provideValue(`${qk}.tail`); + const head = provideValue(`${qk}.head`, '1'); + const tail = provideValue(`${qk}.tail`, '1'); if (head.get() === undefined || tail.get() === undefined) { throw Error(`queue ${queueName} not initialized`); } @@ -319,6 +316,7 @@ export function makeKernelStore(kdb: KernelDatabase) { * @param message - The message to enqueue. */ function enqueueRun(message: RunQueueItem): void { + runQueueLengthCache += 1; runQueue.enqueue(message); } @@ -329,6 +327,7 @@ export function makeKernelStore(kdb: KernelDatabase) { * empty. */ function dequeueRun(): RunQueueItem | undefined { + runQueueLengthCache -= 1; return runQueue.dequeue() as RunQueueItem | undefined; } @@ -338,7 +337,10 @@ export function makeKernelStore(kdb: KernelDatabase) { * @returns the number of items in the run queue. */ function runQueueLength(): number { - return getQueueLength('run'); + if (runQueueLengthCache < 0) { + runQueueLengthCache = getQueueLength('run'); + } + return runQueueLengthCache; } /** @@ -513,7 +515,7 @@ export function makeKernelStore(kdb: KernelDatabase) { subscribers: [], }; const kpid = getNextPromiseId(); - createStoredQueue(kpid, false); + provideStoredQueue(kpid, false); kv.set(`${kpid}.state`, 'unresolved'); kv.set(`${kpid}.subscribers`, '[]'); kv.set(refCountKey(kpid), '1'); @@ -629,12 +631,18 @@ export function makeKernelStore(kdb: KernelDatabase) { ): void { const queue = provideStoredQueue(kpid, false); for (const message of getKernelPromiseMessageQueue(kpid)) { - queue.enqueue(message); + const messageItem: RunQueueItemSend = { + type: 'send', + target: kpid, + message, + }; + enqueueRun(messageItem); } kv.set(`${kpid}.state`, rejected ? 'rejected' : 'fulfilled'); kv.set(`${kpid}.value`, JSON.stringify(value)); kv.delete(`${kpid}.decider`); kv.delete(`${kpid}.subscribers`); + queue.delete(); } /** @@ -746,6 +754,117 @@ export function makeKernelStore(kdb: KernelDatabase) { } } + /** + * Generator that yields all the keys beginning with a given prefix. + * + * @param prefix - The prefix of interest. + * + * @yields the keys that start with `prefix`. + */ + function* getPrefixedKeys(prefix: string): Generator { + let key: string | undefined = prefix; + for (;;) { + key = kv.getNextKey(key); + if (!key) { + break; + } + if (!key.startsWith(prefix)) { + break; + } + yield key; + } + } + + /** + * Generator that yield the promises decided by a given vat. + * + * @param decider - The vat ID of the vat of interest. + * + * @yields the kpids of all the promises decided by `decider`. + */ + function* getPromisesByDecider(decider: VatId): Generator { + const basePrefix = `cle.${decider}.`; + for (const key of getPrefixedKeys(`${basePrefix}p`)) { + const kpid = kv.getRequired(key); + const kp = getKernelPromise(kpid); + if (kp.state === 'unresolved' && kp.decider === decider) { + yield kpid; + } + } + } + + /** + * Delete all persistent state associated with an endpoint. + * + * @param endpointId - The endpoint whose state is to be deleted. + */ + function deleteEndpoint(endpointId: EndpointId): void { + for (const key of getPrefixedKeys(`cle.${endpointId}.`)) { + kv.delete(key); + } + for (const key of getPrefixedKeys(`clk.${endpointId}.`)) { + kv.delete(key); + } + kv.delete(`e.nextObjectId.${endpointId}`); + kv.delete(`e.nextPromiseId.${endpointId}`); + } + + /** + * Delete all persistent state associated with a vat. + * + * @param vatId - The vat whose state is to be deleted. + */ + function deleteVat(vatId: VatId): void { + deleteEndpoint(vatId); + kv.delete(`${VAT_CONFIG_BASE}${vatId}`); + kdb.deleteVatStore(vatId); + } + + /** + * Generator that yields the configurations of running vats. + * + * @yields a series of vat records for all configured vats. + */ + function* getAllVatRecords(): Generator { + for (const vatKey of getPrefixedKeys(VAT_CONFIG_BASE)) { + const vatID = vatKey.slice(VAT_CONFIG_BASE_LEN); + const vatConfig = getVatConfig(vatID); + yield { vatID, vatConfig }; + } + } + + /** + * Fetch the stored configuration for a vat. + * + * @param vatID - The vat whose configuration is sought. + * + * @returns the configuration for the given vat. + */ + function getVatConfig(vatID: VatId): VatConfig { + return JSON.parse( + kv.getRequired(`${VAT_CONFIG_BASE}${vatID}`), + ) as VatConfig; + } + + /** + * Store the configuration for a vat. + * + * @param vatID - The vat whose configuration is to be set. + * @param vatConfig - The configuration to write. + */ + function setVatConfig(vatID: VatId, vatConfig: VatConfig): void { + kv.set(`${VAT_CONFIG_BASE}${vatID}`, JSON.stringify(vatConfig)); + } + + /** + * Delete the stored configuration for a vat. + * + * @param vatID - The vat whose configuration is to be deleted. + */ + function deleteVatConfig(vatID: VatId): void { + kv.delete(`${VAT_CONFIG_BASE}${vatID}`); + } + /** * Delete everything from the database. */ @@ -769,7 +888,7 @@ export function makeKernelStore(kdb: KernelDatabase) { */ function reset(): void { kdb.clear(); - runQueue = createStoredQueue('run', true); + runQueue = provideStoredQueue('run', true); nextVatId = provideCachedStoredValue('nextVatId', '1'); nextRemoteId = provideCachedStoredValue('nextRemoteId', '1'); nextObjectId = provideCachedStoredValue('nextObjectId', '1'); @@ -1048,7 +1167,7 @@ export function makeKernelStore(kdb: KernelDatabase) { maybeFreeKrefs.add(kref); } setObjectRefCount(kref, counts); - + kv.set('initialized', 'true'); return false; } @@ -1079,7 +1198,14 @@ export function makeKernelStore(kdb: KernelDatabase) { addClistEntry, forgetEref, forgetKref, + getAllVatRecords, + getVatConfig, + setVatConfig, + deleteVatConfig, + getPromisesByDecider, clear, + deleteEndpoint, + deleteVat, makeVatStore, reset, kv, @@ -1096,7 +1222,6 @@ export function makeKernelStore(kdb: KernelDatabase) { scheduleReap, incrementRefCount, decrementRefCount, - createStoredQueue, deleteClistEntry, getQueueLength, }); diff --git a/packages/kernel/test/storage.ts b/packages/kernel/test/storage.ts index e507905f1..54bba622b 100644 --- a/packages/kernel/test/storage.ts +++ b/packages/kernel/test/storage.ts @@ -1,5 +1,7 @@ import type { KVStore, KernelDatabase, VatStore } from '@ocap/store'; +/* eslint-disable no-lonely-if, no-else-return */ // stupid rules that encourage unclear code + /** * A mock key/value store realized as a Map. * @@ -18,28 +20,92 @@ export function makeMapKVStore(): KVStore { * @returns The mock {@link KVStore}. */ function makeMapKVStoreInternal(map: Map): KVStore { + let keyCache: string[] | null = null; + let lastNextKey: string | null = null; + let lastNextKeyIndex: number = -1; + /** - * Like `get`, but fail if the key isn't there. + * Binary search for key position. + * + * @param key - The key to search `keyCache` for. * - * @param key - The key to fetch. - * @returns The value at `key`. + * @returns the index into `keyCache` for the first key that is greater than + * `key`, or -1 if no such key exists. */ - function getRequired(key: string): string { - const result = map.get(key); - if (result === undefined) { - throw Error(`No value found for key ${key}.`); + function search(key: string): number { + if (keyCache === null) { + return -1; + } + let beg = 0; + let end = keyCache.length - 1; + if (key < (keyCache[beg] as string)) { + return beg; + } + if ((keyCache[end] as string) < key) { + return -1; + } + while (beg <= end) { + const mid = Math.floor((beg + end) / 2); + if (keyCache[mid] === key) { + return mid; + } + if (key < (keyCache[mid] as string)) { + end = mid - 1; + } else { + beg = mid + 1; + } + if (beg === end) { + return beg; + } } - return result; + return -1; } return { - get: map.get.bind(map), - getNextKey: (_key: string): string | undefined => { - throw Error(`mock store does not (yet) support getNextKey`); + get(key: string): string | undefined { + return map.get(key); + }, + getNextKey(key: string): string | undefined { + if (keyCache === null) { + keyCache = Array.from(map.keys()).sort(); + } + const index = lastNextKey === key ? lastNextKeyIndex : search(key); + if (index < 0) { + lastNextKey = null; + lastNextKeyIndex = -1; + return undefined; + } + lastNextKey = keyCache[index] as string; + if (key < lastNextKey) { + lastNextKeyIndex = index; + return lastNextKey; + } else { + if (index + 1 >= keyCache.length) { + lastNextKey = null; + lastNextKeyIndex = -1; + return undefined; + } else { + lastNextKey = keyCache[index + 1] as string; + lastNextKeyIndex = index + 1; + return lastNextKey; + } + } + }, + getRequired(key: string): string { + const result = map.get(key); + if (result === undefined) { + throw Error(`No value found for key ${key}.`); + } + return result; + }, + set(key: string, value: string): void { + map.set(key, value); + keyCache = null; + }, + delete(key: string): void { + map.delete(key); + keyCache = null; }, - getRequired, - set: map.set.bind(map), - delete: map.delete.bind(map), }; } @@ -77,20 +143,23 @@ function makeMapVatStore(_vatID: string): ClearableVatStore { */ export function makeMapKernelDatabase(): KernelDatabase { const map = new Map(); - const vatStores = new Set(); + const vatStores = new Map(); return { kernelKVStore: makeMapKVStoreInternal(map), clear: () => { map.clear(); - for (const vs of vatStores) { + for (const vs of vatStores.values()) { vs.clear(); } }, executeQuery: () => [], makeVatStore: (vatID: string) => { const store = makeMapVatStore(vatID); - vatStores.add(store); + vatStores.set(vatID, store); return store; }, + deleteVatStore: (vatID: string) => { + vatStores.delete(vatID); + }, }; } diff --git a/packages/store/src/sqlite/common.test.ts b/packages/store/src/sqlite/common.test.ts index fd27e0133..5fa1a4fdb 100644 --- a/packages/store/src/sqlite/common.test.ts +++ b/packages/store/src/sqlite/common.test.ts @@ -48,6 +48,7 @@ describe('SQL_QUERIES', () => { 'CREATE_TABLE_VS', 'DELETE', 'DELETE_VS', + 'DELETE_VS_ALL', 'DROP', 'DROP_VS', 'GET', diff --git a/packages/store/src/sqlite/common.ts b/packages/store/src/sqlite/common.ts index 81b62a091..860c7d042 100644 --- a/packages/store/src/sqlite/common.ts +++ b/packages/store/src/sqlite/common.ts @@ -48,6 +48,10 @@ export const SQL_QUERIES = { DELETE FROM kv_vatstore WHERE vatID = ? AND key = ? `, + DELETE_VS_ALL: ` + DELETE FROM kv_vatstore + WHERE vatID = ? + `, CLEAR: ` DELETE FROM kv `, diff --git a/packages/store/src/sqlite/nodejs.ts b/packages/store/src/sqlite/nodejs.ts index ded6a7562..bfb674f0d 100644 --- a/packages/store/src/sqlite/nodejs.ts +++ b/packages/store/src/sqlite/nodejs.ts @@ -160,6 +160,7 @@ export async function makeSQLKernelDatabase({ const sqlVatstoreGetAll = db.prepare(SQL_QUERIES.GET_ALL_VS); const sqlVatstoreSet = db.prepare(SQL_QUERIES.SET_VS); const sqlVatstoreDelete = db.prepare(SQL_QUERIES.DELETE_VS); + const sqlVatstoreDeleteAll = db.prepare(SQL_QUERIES.DELETE_VS_ALL); /** * Create a new VatStore for a vat. @@ -213,11 +214,21 @@ export async function makeSQLKernelDatabase({ }; } + /** + * Delete an entire VatStore. + * + * @param vatId - The vat whose store is to be deleted. + */ + function deleteVatStore(vatId: string): void { + sqlVatstoreDeleteAll.run(vatId); + } + return { kernelKVStore: kvStore, executeQuery: kvExecuteQuery, clear: db.transaction(kvClear), makeVatStore, + deleteVatStore, }; } diff --git a/packages/store/src/sqlite/wasm.ts b/packages/store/src/sqlite/wasm.ts index a4cb92262..dc5da9ae3 100644 --- a/packages/store/src/sqlite/wasm.ts +++ b/packages/store/src/sqlite/wasm.ts @@ -221,6 +221,7 @@ export async function makeSQLKernelDatabase({ const sqlVatstoreGetAll = db.prepare(SQL_QUERIES.GET_ALL_VS); const sqlVatstoreSet = db.prepare(SQL_QUERIES.SET_VS); const sqlVatstoreDelete = db.prepare(SQL_QUERIES.DELETE_VS); + const sqlVatstoreDeleteAll = db.prepare(SQL_QUERIES.DELETE_VS_ALL); const sqlBeginTransaction = db.prepare(SQL_QUERIES.BEGIN_TRANSACTION); const sqlCommitTransaction = db.prepare(SQL_QUERIES.COMMIT_TRANSACTION); const sqlAbortTransaction = db.prepare(SQL_QUERIES.ABORT_TRANSACTION); @@ -291,10 +292,22 @@ export async function makeSQLKernelDatabase({ }; } + /** + * Delete an entire VatStore. + * + * @param vatId - The vat whose store is to be deleted. + */ + function deleteVatStore(vatId: string): void { + sqlVatstoreDeleteAll.bind([vatId]); + sqlVatstoreDeleteAll.step(); + sqlVatstoreDeleteAll.reset(); + } + return { kernelKVStore: kvStore, clear: kvClear, executeQuery, makeVatStore, + deleteVatStore, }; } diff --git a/packages/store/src/types.ts b/packages/store/src/types.ts index 330e91f0a..204189f35 100644 --- a/packages/store/src/types.ts +++ b/packages/store/src/types.ts @@ -16,4 +16,5 @@ export type KernelDatabase = { executeQuery(sql: string): Record[]; clear(): void; makeVatStore(vatID: string): VatStore; + deleteVatStore(vatID: string): void; }; diff --git a/vitest.config.ts b/vitest.config.ts index ec1c2402b..fde0fbb67 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -82,10 +82,10 @@ export default defineConfig({ lines: 100, }, 'packages/extension/**': { - statements: 81.76, + statements: 81.02, functions: 83.85, - branches: 78.98, - lines: 81.78, + branches: 76.76, + lines: 81.03, }, 'packages/kernel/**': { statements: 82.4, @@ -106,10 +106,10 @@ export default defineConfig({ lines: 0, }, 'packages/store/**': { - statements: 94.01, - functions: 93.93, + statements: 91.9, + functions: 88.57, branches: 84.78, - lines: 93.97, + lines: 91.86, }, 'packages/streams/**': { statements: 100, From 71e14f55ed86dc6f36440509879ac3ec410a32bb Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 27 Mar 2025 12:11:48 -0700 Subject: [PATCH 2/3] chore: mergerize ya! --- packages/kernel-test/src/resume.test.ts | 2 +- packages/kernel/src/VatHandle.ts | 2 +- packages/kernel/src/store/kernel-store.test.ts | 1 - packages/kernel/src/utils/wait-quiescent.test.ts | 2 ++ vitest.config.ts | 8 ++++---- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/kernel-test/src/resume.test.ts b/packages/kernel-test/src/resume.test.ts index e33846144..8a8ff20c0 100644 --- a/packages/kernel-test/src/resume.test.ts +++ b/packages/kernel-test/src/resume.test.ts @@ -15,7 +15,7 @@ import { } from 'node:worker_threads'; import { describe, expect, it } from 'vitest'; -import { kunser } from '../../kernel/src/kernel-marshal.ts'; +import { kunser } from '../../kernel/src/services/kernel-marshal.ts'; import { NodejsVatWorkerService } from '../../nodejs/src/kernel/VatWorkerService.ts'; const origStdoutWrite = process.stdout.write.bind(process.stdout); diff --git a/packages/kernel/src/VatHandle.ts b/packages/kernel/src/VatHandle.ts index 51edaafbf..5f9d956bd 100644 --- a/packages/kernel/src/VatHandle.ts +++ b/packages/kernel/src/VatHandle.ts @@ -11,7 +11,6 @@ import type { DuplexStream } from '@ocap/streams'; import type { Logger } from '@ocap/utils'; import { makeLogger, makeCounter } from '@ocap/utils'; -import { kser } from './kernel-marshal.ts'; import type { Kernel } from './Kernel.ts'; import { VatCommandMethod } from './messages/index.ts'; import type { @@ -19,6 +18,7 @@ import type { VatCommand, VatCommandReturnType, } from './messages/index.ts'; +import { kser } from './services/kernel-marshal.ts'; import type { KernelStore } from './store/kernel-store.ts'; import { parseRef } from './store/utils/parse-ref.ts'; import type { diff --git a/packages/kernel/src/store/kernel-store.test.ts b/packages/kernel/src/store/kernel-store.test.ts index 4726a0eef..277002c92 100644 --- a/packages/kernel/src/store/kernel-store.test.ts +++ b/packages/kernel/src/store/kernel-store.test.ts @@ -85,7 +85,6 @@ describe('kernel store', () => { 'getQueueLength', 'getReachableFlag', 'getRefCount', - 'getRefCount', 'getVatConfig', 'hasCListEntry', 'incRefCount', diff --git a/packages/kernel/src/utils/wait-quiescent.test.ts b/packages/kernel/src/utils/wait-quiescent.test.ts index 1b2a2ebab..7433a22ce 100644 --- a/packages/kernel/src/utils/wait-quiescent.test.ts +++ b/packages/kernel/src/utils/wait-quiescent.test.ts @@ -41,8 +41,10 @@ describe('wait-quiescent', () => { // Create nested promise chains await Promise.resolve().then(async () => { results.push(1); + // eslint-disable-next-line promise/no-nesting await Promise.resolve().then(async () => { results.push(2); + // eslint-disable-next-line promise/no-nesting await Promise.resolve().then(() => { results.push(3); return results; diff --git a/vitest.config.ts b/vitest.config.ts index fde0fbb67..3bb1cc592 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -88,10 +88,10 @@ export default defineConfig({ lines: 81.03, }, 'packages/kernel/**': { - statements: 82.4, - functions: 88.71, - branches: 64.15, - lines: 82.51, + statements: 82.35, + functions: 90.14, + branches: 64.74, + lines: 82.46, }, 'packages/nodejs/**': { statements: 72.91, From fe328f5f526ae7768bb85e74f835313d7b16e2c3 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 27 Mar 2025 13:59:19 -0700 Subject: [PATCH 3/3] chore: get rid of a small bit of extra added additional redundancy --- packages/kernel/src/VatKVStore.ts | 43 ++----------------------- packages/kernel/src/utils/key-search.ts | 42 ++++++++++++++++++++++++ packages/kernel/test/storage.ts | 42 +++--------------------- vitest.config.ts | 6 ++-- 4 files changed, 52 insertions(+), 81 deletions(-) create mode 100644 packages/kernel/src/utils/key-search.ts diff --git a/packages/kernel/src/VatKVStore.ts b/packages/kernel/src/VatKVStore.ts index b1a79ba3c..7ede0198a 100644 --- a/packages/kernel/src/VatKVStore.ts +++ b/packages/kernel/src/VatKVStore.ts @@ -1,6 +1,7 @@ import type { KVStore } from '@ocap/store'; import type { VatCheckpoint } from './types.ts'; +import { keySearch } from './utils/key-search.ts'; export type VatKVStore = KVStore & { checkpoint(): VatCheckpoint; @@ -23,45 +24,6 @@ export function makeVatKVStore(state: Map): VatKVStore { let lastNextKey: string | null = null; let lastNextKeyIndex: number = -1; - /** - * Binary search for key position. - * I totally can't believe I have to write this in 2025. - * - * @param key - The key to search `keyCache` for. - * - * @returns the index into `keyCache` of the first key that is greater than - * `key`, or -1 if no such key exists. - */ - function search(key: string): number { - if (keyCache === null) { - // This shouldn't happen, but just in case... - return -1; - } - let beg = 0; - let end = keyCache.length - 1; - if (key < (keyCache[beg] as string)) { - return beg; - } - if ((keyCache[end] as string) < key) { - return -1; - } - while (beg <= end) { - const mid = Math.floor((beg + end) / 2); - if (keyCache[mid] === key) { - return mid; - } - if (key < (keyCache[mid] as string)) { - end = mid - 1; - } else { - beg = mid + 1; - } - if (beg === end) { - return beg; - } - } - return -1; - } - return { get(key: string): string | undefined { return state.get(key); @@ -77,7 +39,8 @@ export function makeVatKVStore(state: Map): VatKVStore { if (keyCache === null) { keyCache = Array.from(state.keys()).sort(); } - const index = lastNextKey === key ? lastNextKeyIndex : search(key); + const index = + lastNextKey === key ? lastNextKeyIndex : keySearch(keyCache, key); if (index < 0) { lastNextKey = null; lastNextKeyIndex = -1; diff --git a/packages/kernel/src/utils/key-search.ts b/packages/kernel/src/utils/key-search.ts new file mode 100644 index 000000000..9bf03b5d8 --- /dev/null +++ b/packages/kernel/src/utils/key-search.ts @@ -0,0 +1,42 @@ +/** + * Binary search for key position. + * + * I totally can't believe I have to write this in 2025, but the JS Array + * `indexOf` function does exhaustive search (O(n) instead of O(ln(N))) because + * it can't know the array is both sorted and has no undefined elements. + * + * @param arr - A sorted array of strings. + * @param key - The key to search `arr` for. + * + * @returns the index into `arr` of the first key that is greater than + * `key`, or -1 if no such key exists. + */ +export function keySearch(arr: string[], key: string): number { + if (arr === null) { + // This shouldn't happen, but just in case... + return -1; + } + let beg = 0; + let end = arr.length - 1; + if (key < (arr[beg] as string)) { + return beg; + } + if ((arr[end] as string) < key) { + return -1; + } + while (beg <= end) { + const mid = Math.floor((beg + end) / 2); + if (arr[mid] === key) { + return mid; + } + if (key < (arr[mid] as string)) { + end = mid - 1; + } else { + beg = mid + 1; + } + if (beg === end) { + return beg; + } + } + return -1; +} diff --git a/packages/kernel/test/storage.ts b/packages/kernel/test/storage.ts index 54bba622b..dc7b8edef 100644 --- a/packages/kernel/test/storage.ts +++ b/packages/kernel/test/storage.ts @@ -1,5 +1,7 @@ import type { KVStore, KernelDatabase, VatStore } from '@ocap/store'; +import { keySearch } from '../src/utils/key-search.ts'; + /* eslint-disable no-lonely-if, no-else-return */ // stupid rules that encourage unclear code /** @@ -24,43 +26,6 @@ function makeMapKVStoreInternal(map: Map): KVStore { let lastNextKey: string | null = null; let lastNextKeyIndex: number = -1; - /** - * Binary search for key position. - * - * @param key - The key to search `keyCache` for. - * - * @returns the index into `keyCache` for the first key that is greater than - * `key`, or -1 if no such key exists. - */ - function search(key: string): number { - if (keyCache === null) { - return -1; - } - let beg = 0; - let end = keyCache.length - 1; - if (key < (keyCache[beg] as string)) { - return beg; - } - if ((keyCache[end] as string) < key) { - return -1; - } - while (beg <= end) { - const mid = Math.floor((beg + end) / 2); - if (keyCache[mid] === key) { - return mid; - } - if (key < (keyCache[mid] as string)) { - end = mid - 1; - } else { - beg = mid + 1; - } - if (beg === end) { - return beg; - } - } - return -1; - } - return { get(key: string): string | undefined { return map.get(key); @@ -69,7 +34,8 @@ function makeMapKVStoreInternal(map: Map): KVStore { if (keyCache === null) { keyCache = Array.from(map.keys()).sort(); } - const index = lastNextKey === key ? lastNextKeyIndex : search(key); + const index = + lastNextKey === key ? lastNextKeyIndex : keySearch(keyCache, key); if (index < 0) { lastNextKey = null; lastNextKeyIndex = -1; diff --git a/vitest.config.ts b/vitest.config.ts index 3bb1cc592..bb273e399 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -88,10 +88,10 @@ export default defineConfig({ lines: 81.03, }, 'packages/kernel/**': { - statements: 82.35, + statements: 82.52, functions: 90.14, - branches: 64.74, - lines: 82.46, + branches: 65.15, + lines: 82.63, }, 'packages/nodejs/**': { statements: 72.91,