Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions packages/extension/src/kernel-integration/VatWorkerClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import { isJsonRpcResponse } from '@metamask/utils';
import type {
JsonRpcId,
JsonRpcRequest,
JsonRpcResponse,
} from '@metamask/utils';
import type { JsonRpcId, JsonRpcResponse } from '@metamask/utils';
import type { VatWorkerManager, VatId, VatConfig } from '@ocap/kernel';
import { vatWorkerServiceMethodSpecs } from '@ocap/kernel/rpc';
import { RpcClient } from '@ocap/rpc-methods';
Expand All @@ -16,7 +12,7 @@ import type {
PostMessageEnvelope,
PostMessageTarget,
} from '@ocap/streams/browser';
import type { JsonRpcMessage, Logger } from '@ocap/utils';
import type { JsonRpcCall, JsonRpcMessage, Logger } from '@ocap/utils';
import { isJsonRpcMessage, makeLogger, stringify } from '@ocap/utils';

// Appears in the docs.
Expand All @@ -25,7 +21,7 @@ import type { ExtensionVatWorkerService } from './VatWorkerServer.ts';

export type VatWorkerClientStream = PostMessageDuplexStream<
MessageEvent<JsonRpcResponse>,
PostMessageEnvelope<JsonRpcRequest>
PostMessageEnvelope<JsonRpcCall>
>;

export class ExtensionVatWorkerClient implements VatWorkerManager {
Expand Down Expand Up @@ -61,8 +57,10 @@ export class ExtensionVatWorkerClient implements VatWorkerManager {
this.#rpcClient = new RpcClient(
vatWorkerServiceMethodSpecs,
async (request) => {
if (request.method === 'launch') {
this.#portMap.set(request.id, undefined);
if ('id' in request) {
if (request.method === 'launch') {
this.#portMap.set(request.id, undefined);
}
}
await this.#stream.write({ payload: request, transfer: [] });
},
Expand Down
14 changes: 9 additions & 5 deletions packages/extension/src/kernel-integration/kernel-worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { JsonRpcEngine } from '@metamask/json-rpc-engine';
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
import { isJsonRpcRequest } from '@metamask/utils';
import type { ClusterConfig } from '@ocap/kernel';
import { ClusterConfigStruct, Kernel } from '@ocap/kernel';
import { makeSQLKernelDatabase } from '@ocap/store/sqlite/wasm';
Expand All @@ -9,7 +8,8 @@ import {
MessagePortDuplexStream,
receiveMessagePort,
} from '@ocap/streams/browser';
import { fetchValidatedJson, Logger } from '@ocap/utils';
import { fetchValidatedJson, isJsonRpcCall, Logger } from '@ocap/utils';
import type { JsonRpcCall } from '@ocap/utils';

import { makeLoggingMiddleware } from './middleware/logging.ts';
import { createPanelMessageMiddleware } from './middleware/panel-message.ts';
Expand Down Expand Up @@ -39,9 +39,9 @@ async function main(): Promise<void> {
);

const kernelStream = await MessagePortDuplexStream.make<
JsonRpcRequest,
JsonRpcCall,
JsonRpcResponse
>(port, isJsonRpcRequest);
>(port, isJsonRpcCall);

// Initialize kernel dependencies
const vatWorkerClient = ExtensionVatWorkerClient.make(
Expand All @@ -63,7 +63,11 @@ async function main(): Promise<void> {
const kernelEngine = new JsonRpcEngine();
kernelEngine.push(makeLoggingMiddleware(logger.subLogger('kernel-command')));
kernelEngine.push(createPanelMessageMiddleware(kernel, kernelDatabase));
receiveUiConnections(async (request) => kernelEngine.handle(request), logger);
// JsonRpcEngine type error: does not handle JSON-RPC notifications
receiveUiConnections(
async (request) => kernelEngine.handle(request as JsonRpcRequest),
logger,
);
const launchDefaultSubcluster = firstTime || ALWAYS_RESET_STORAGE;

const defaultSubcluster = await fetchValidatedJson<ClusterConfig>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
import type { JsonRpcResponse } from '@metamask/utils';
import type { PostMessageTarget } from '@ocap/streams/browser';
import { delay } from '@ocap/test-utils';
import { TestDuplexStream } from '@ocap/test-utils/streams';
import type { Logger } from '@ocap/utils';
import type { JsonRpcCall, Logger } from '@ocap/utils';
import { describe, it, expect, vi, beforeEach } from 'vitest';

import {
Expand Down Expand Up @@ -161,7 +161,7 @@ describe('ui-connections', () => {
const logger = makeMockLogger();

const mockHandleMessage = vi.fn(
async (_request: JsonRpcRequest): Promise<JsonRpcResponse> => ({
async (_request: JsonRpcCall): Promise<JsonRpcResponse> => ({
id: 'foo',
jsonrpc: '2.0' as const,
result: { vats: [], clusterConfig },
Expand Down
14 changes: 6 additions & 8 deletions packages/extension/src/kernel-integration/ui-connections.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils';
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
import type { JsonRpcResponse } from '@metamask/utils';
import { PostMessageDuplexStream } from '@ocap/streams/browser';
import { stringify } from '@ocap/utils';
import type { Logger } from '@ocap/utils';
import type { JsonRpcCall, Logger } from '@ocap/utils';
import { nanoid } from 'nanoid';

import { isUiControlCommand } from './ui-control-command.ts';
Expand All @@ -11,18 +11,16 @@ import type { UiControlCommand } from './ui-control-command.ts';
export const UI_CONTROL_CHANNEL_NAME = 'ui-control';

export type KernelControlStream = PostMessageDuplexStream<
JsonRpcRequest,
JsonRpcCall,
JsonRpcResponse
>;

export type KernelControlReplyStream = PostMessageDuplexStream<
JsonRpcResponse,
JsonRpcRequest
JsonRpcCall
>;

type HandleInstanceMessage = (
request: JsonRpcRequest,
) => Promise<JsonRpcResponse>;
type HandleInstanceMessage = (request: JsonRpcCall) => Promise<JsonRpcResponse>;

/**
* Establishes a connection between a UI instance and the kernel. Should be called
Expand All @@ -45,7 +43,7 @@ export const establishKernelConnection = async (

const kernelStream = await PostMessageDuplexStream.make<
JsonRpcResponse,
JsonRpcRequest
JsonRpcCall
>({
validateInput: isJsonRpcResponse,
messageTarget: instanceChannel,
Expand Down
15 changes: 8 additions & 7 deletions packages/extension/src/offscreen.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils';
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
import { isJsonRpcResponse } from '@metamask/utils';
import type { JsonRpcResponse } from '@metamask/utils';
import type { DuplexStream } from '@ocap/streams';
import {
initializeMessageChannel,
ChromeRuntimeDuplexStream,
MessagePortDuplexStream,
} from '@ocap/streams/browser';
import type { PostMessageTarget } from '@ocap/streams/browser';
import { delay, Logger } from '@ocap/utils';
import { delay, isJsonRpcCall, Logger } from '@ocap/utils';
import type { JsonRpcCall } from '@ocap/utils';

import { makeIframeVatWorker } from './kernel-integration/iframe-vat-worker.ts';
import { ExtensionVatWorkerService } from './kernel-integration/VatWorkerServer.ts';
Expand All @@ -25,9 +26,9 @@ async function main(): Promise<void> {

// Create stream for messages from the background script
const backgroundStream = await ChromeRuntimeDuplexStream.make<
JsonRpcRequest,
JsonRpcCall,
JsonRpcResponse
>(chrome.runtime, 'offscreen', 'background', isJsonRpcRequest);
>(chrome.runtime, 'offscreen', 'background', isJsonRpcCall);

const { kernelStream, vatWorkerService } = await makeKernelWorker();

Expand All @@ -45,7 +46,7 @@ async function main(): Promise<void> {
* @returns The message port stream for worker communication
*/
async function makeKernelWorker(): Promise<{
kernelStream: DuplexStream<JsonRpcResponse, JsonRpcRequest>;
kernelStream: DuplexStream<JsonRpcResponse, JsonRpcCall>;
vatWorkerService: ExtensionVatWorkerService;
}> {
const worker = new Worker('kernel-worker.js', { type: 'module' });
Expand All @@ -56,7 +57,7 @@ async function makeKernelWorker(): Promise<{

const kernelStream = await MessagePortDuplexStream.make<
JsonRpcResponse,
JsonRpcRequest
JsonRpcCall
>(port, isJsonRpcResponse);

const vatWorkerService = ExtensionVatWorkerService.make(
Expand Down
36 changes: 21 additions & 15 deletions packages/kernel/src/Kernel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { CapData } from '@endo/marshal';
import { serializeError } from '@metamask/rpc-errors';
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
import { hasProperty } from '@metamask/utils';
import type { JsonRpcResponse } from '@metamask/utils';
import {
StreamReadError,
VatAlreadyExistsError,
Expand All @@ -11,6 +12,7 @@ import type { ExtractParams, ExtractResult } from '@ocap/rpc-methods';
import type { KernelDatabase } from '@ocap/store';
import type { DuplexStream } from '@ocap/streams';
import { Logger } from '@ocap/utils';
import type { JsonRpcCall } from '@ocap/utils';

import { KernelQueue } from './KernelQueue.ts';
import { KernelRouter } from './KernelRouter.ts';
Expand All @@ -33,7 +35,7 @@ import { VatHandle } from './VatHandle.ts';

export class Kernel {
/** Command channel from the controlling console/browser extension/test driver */
readonly #commandStream: DuplexStream<JsonRpcRequest, JsonRpcResponse>;
readonly #commandStream: DuplexStream<JsonRpcCall, JsonRpcResponse>;

readonly #rpcService: RpcService<typeof kernelHandlers>;

Expand Down Expand Up @@ -70,7 +72,7 @@ export class Kernel {
*/
// eslint-disable-next-line no-restricted-syntax
private constructor(
commandStream: DuplexStream<JsonRpcRequest, JsonRpcResponse>,
commandStream: DuplexStream<JsonRpcCall, JsonRpcResponse>,
vatWorkerService: VatWorkerManager,
kernelDatabase: KernelDatabase,
options: {
Expand Down Expand Up @@ -108,7 +110,7 @@ export class Kernel {
* @returns A promise for the new kernel instance.
*/
static async make(
commandStream: DuplexStream<JsonRpcRequest, JsonRpcResponse>,
commandStream: DuplexStream<JsonRpcCall, JsonRpcResponse>,
vatWorkerService: VatWorkerManager,
kernelDatabase: KernelDatabase,
options: {
Expand Down Expand Up @@ -155,25 +157,29 @@ export class Kernel {
*
* @param message - The message to handle.
*/
async #handleCommandMessage(message: JsonRpcRequest): Promise<void> {
async #handleCommandMessage(message: JsonRpcCall): Promise<void> {
try {
this.#rpcService.assertHasMethod(message.method);
const result = await this.#rpcService.execute(
message.method,
message.params,
);
await this.#commandStream.write({
id: message.id,
jsonrpc: '2.0',
result,
});
if (hasProperty(message, 'id') && typeof message.id === 'string') {
await this.#commandStream.write({
id: message.id,
jsonrpc: '2.0',
result,
});
}
} catch (error) {
this.#logger.error('Error executing command', error);
await this.#commandStream.write({
id: message.id,
jsonrpc: '2.0',
error: serializeError(error),
});
if (hasProperty(message, 'id') && typeof message.id === 'string') {
await this.#commandStream.write({
id: message.id,
jsonrpc: '2.0',
error: serializeError(error),
});
}
}
}

Expand Down
10 changes: 7 additions & 3 deletions packages/kernel/src/rpc/kernel/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import type { MethodRequest } from '@ocap/rpc-methods';
import type {
HandlerRecord,
MethodRequest,
MethodSpecRecord,
} from '@ocap/rpc-methods';

import { pingHandler, pingSpec } from '../vat/ping.ts';

export const kernelHandlers = {
ping: pingHandler,
} as const;
} as HandlerRecord<typeof pingHandler>;

export const kernelMethodSpecs = {
ping: pingSpec,
} as const;
} as MethodSpecRecord<typeof pingSpec>;

type Handlers = (typeof kernelHandlers)[keyof typeof kernelHandlers];

Expand Down
6 changes: 4 additions & 2 deletions packages/kernel/src/rpc/vat-syscall/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import type { MethodSpecRecord, HandlerRecord } from '@ocap/rpc-methods';

import { vatSyscallSpec, vatSyscallHandler } from './vat-syscall.ts';

export const vatSyscallHandlers = {
syscall: vatSyscallHandler,
} as const;
} as HandlerRecord<typeof vatSyscallHandler>;

export const vatSyscallMethodSpecs = {
syscall: vatSyscallSpec,
} as const;
} as MethodSpecRecord<typeof vatSyscallSpec>;

type Handlers = (typeof vatSyscallHandlers)[keyof typeof vatSyscallHandlers];

Expand Down
27 changes: 27 additions & 0 deletions packages/rpc-methods/src/RpcClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ describe('RpcClient', () => {
});
});

describe('notify', () => {
it('should call a notification method', async () => {
const sendMessage = vi.fn(async () => Promise.resolve());
const client = new RpcClient(getMethods(), sendMessage, 'test');
await client.notify('method3', ['test']);
expect(sendMessage).toHaveBeenCalledWith({
jsonrpc: jsonrpc2,
method: 'method3',
params: ['test'],
});
});

it('should log an error if the message fails to send', async () => {
const logger = makeLogger('[test]');
const sendMessage = vi.fn(async () =>
Promise.reject(new Error('test error')),
);
const client = new RpcClient(getMethods(), sendMessage, 'test', logger);
const logError = vi.spyOn(logger, 'error');
await client.notify('method3', ['test']);
expect(logError).toHaveBeenCalledWith(
'Failed to send notification',
new Error('test error'),
);
});
});

describe('callAndGetId', () => {
it('should call a method and return the id', async () => {
const client = new RpcClient(getMethods(), vi.fn(), 'test');
Expand Down
Loading
Loading