Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/controllers/src/plugins/PluginController.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe('PluginController Controller', () => {
it('can create a worker and plugin controller', async () => {
const workerExecutionEnvironment = new WebWorkerExecutionEnvironmentService(
{
setupWorkerConnection: jest.fn(),
setupPluginProvider: jest.fn(),
workerUrl: new URL(URL.createObjectURL(new Blob([workerCode]))),
},
);
Expand Down Expand Up @@ -45,7 +45,7 @@ describe('PluginController Controller', () => {
it('can add a plugin and use its JSON-RPC api with a WebWorkerExecutionEnvironmentService', async () => {
const webWorkerExecutionEnvironment = new WebWorkerExecutionEnvironmentService(
{
setupWorkerConnection: jest.fn(),
setupPluginProvider: jest.fn(),
workerUrl: new URL(URL.createObjectURL(new Blob([workerCode]))),
},
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe('Worker Controller', () => {
it('can boot', async () => {
const webWorkerExecutionEnvironmentService = new WebWorkerExecutionEnvironmentService(
{
setupWorkerConnection: () => {
setupPluginProvider: () => {
// do nothing
},
workerUrl: new URL('https://foo.bar.baz'),
Expand All @@ -21,7 +21,7 @@ describe('Worker Controller', () => {
it('can create a plugin worker and start the plugin', async () => {
const webWorkerExecutionEnvironmentService = new WebWorkerExecutionEnvironmentService(
{
setupWorkerConnection: () => {
setupPluginProvider: () => {
// do nothing
},
workerUrl: new URL(URL.createObjectURL(new Blob([workerCode]))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ import {
} from 'json-rpc-engine';
import { ExecutionEnvironmentService } from '../services/ExecutionEnvironmentService';

export type SetupWorkerConnection = (stream: Duplex) => void;
export type SetupPluginProvider = (pluginName: string, stream: Duplex) => void;

interface WorkerControllerArgs {
setupWorkerConnection: SetupWorkerConnection;
setupPluginProvider: SetupPluginProvider;
workerUrl: URL;
}

interface WorkerStreams {
command: Duplex;
rpc: Duplex;
rpc: Duplex | null;
_connection: WorkerParentPostMessageStream;
}

Expand All @@ -34,7 +34,7 @@ export type PluginRpcHook = (
) => Promise<unknown>;

interface WorkerWrapper {
workerId: string;
id: string;
streams: WorkerStreams;
rpcEngine: JsonRpcEngine;
worker: Worker;
Expand All @@ -50,33 +50,33 @@ export class WebWorkerExecutionEnvironmentService

private workers: Map<string, WorkerWrapper>;

private setupWorkerConnection: SetupWorkerConnection;
private setupPluginProvider: SetupPluginProvider;

private pluginToWorkerMap: Map<string, string>;

private workerToPluginMap: Map<string, string>;

constructor({ setupWorkerConnection, workerUrl }: WorkerControllerArgs) {
constructor({ setupPluginProvider, workerUrl }: WorkerControllerArgs) {
this.workerUrl = workerUrl;
this.setupWorkerConnection = setupWorkerConnection;
this.setupPluginProvider = setupPluginProvider;
this.store = new ObservableStore({ workers: {} });
this.workers = new Map();
this.pluginToWorkerMap = new Map();
this.workerToPluginMap = new Map();
this._pluginRpcHooks = new Map();
}

_setWorker(workerId: string, workerObj: WorkerWrapper): void {
this.workers.set(workerId, workerObj);
private _setWorker(workerId: string, workerWrapper: WorkerWrapper): void {
this.workers.set(workerId, workerWrapper);

const newWorkerState = {
...(this.store.getState().workers as Record<string, WorkerWrapper>),
[workerId]: workerObj,
[workerId]: workerWrapper,
};
this.store.updateState({ workers: newWorkerState });
}

_deleteWorker(workerId: string): void {
private _deleteWorker(workerId: string): void {
this.workers.delete(workerId);

const newWorkerState = {
Expand All @@ -94,13 +94,13 @@ export class WebWorkerExecutionEnvironmentService
throw new Error('Must send object.');
}

const workerObj = this.workers.get(workerId);
if (!workerObj) {
const workerWrapper = this.workers.get(workerId);
if (!workerWrapper) {
throw new Error(`Worker with id ${workerId} not found.`);
}

console.log('Parent: Sending Command', message);
const response: PendingJsonRpcResponse<unknown> = await workerObj.rpcEngine.handle(
const response: PendingJsonRpcResponse<unknown> = await workerWrapper.rpcEngine.handle(
message,
);
if (response.error) {
Expand All @@ -123,20 +123,20 @@ export class WebWorkerExecutionEnvironmentService
}

terminate(workerId: string): void {
const workerObj = this.workers.get(workerId);
if (!workerObj) {
const workerWrapper = this.workers.get(workerId);
if (!workerWrapper) {
throw new Error(`Worked with id "${workerId}" not found.`);
}

Object.values(workerObj.streams).forEach((stream) => {
Object.values(workerWrapper.streams).forEach((stream) => {
try {
!stream.destroyed && stream.destroy();
stream.removeAllListeners();
} catch (err) {
console.log('Error while destroying stream', err);
}
});
workerObj.worker.terminate();
workerWrapper.worker.terminate();
this._removePluginAndWorkerMapping(workerId);
this._deleteWorker(workerId);
console.log(`worker:${workerId} terminated`);
Expand Down Expand Up @@ -182,16 +182,20 @@ export class WebWorkerExecutionEnvironmentService
);
}

const _workerId = await this._initWorker();
this._mapPluginAndWorker(pluginData.pluginName, _workerId);
const worker = await this._initWorker();
this._mapPluginAndWorker(pluginData.pluginName, worker.id);
this.setupPluginProvider(
pluginData.pluginName,
(worker.streams.rpc as unknown) as Duplex,
);

const result = await this._command(_workerId, {
const result = await this._command(worker.id, {
jsonrpc: '2.0',
method: 'installPlugin',
params: pluginData,
id: nanoid(),
});
this._createPluginHooks(pluginData.pluginName, _workerId);
this._createPluginHooks(pluginData.pluginName, worker.id);
return result;
}

Expand Down Expand Up @@ -224,9 +228,7 @@ export class WebWorkerExecutionEnvironmentService
this.pluginToWorkerMap.delete(pluginName);
}

async _initWorker(): Promise<string> {
console.log('_initWorker');

private async _initWorker(): Promise<WorkerWrapper> {
const workerId = nanoid();
const worker = new Worker(this.workerUrl, {
name: workerId,
Expand All @@ -240,18 +242,21 @@ export class WebWorkerExecutionEnvironmentService

rpcEngine.push(jsonRpcConnection.middleware);

this._setWorker(workerId, {
workerId,
const workerWrapper = {
id: workerId,
streams,
rpcEngine,
worker,
});
};
this._setWorker(workerId, workerWrapper);

await this._command(workerId, {
jsonrpc: '2.0',
method: 'ping',
id: nanoid(),
});
return workerId;

return workerWrapper;
}

_initWorkerStreams(worker: Worker, workerId: string): WorkerStreams {
Expand All @@ -265,19 +270,19 @@ export class WebWorkerExecutionEnvironmentService
const commandStream = mux.createStream(PLUGIN_STREAM_NAMES.COMMAND);

const rpcStream = mux.createStream(PLUGIN_STREAM_NAMES.JSON_RPC);
this.setupWorkerConnection((rpcStream as unknown) as Duplex);

// Typecast justification: stream type mismatch
// Typecast: stream type mismatch
return {
command: (commandStream as unknown) as Duplex,
rpc: (rpcStream as unknown) as Duplex,
rpc: rpcStream,
_connection: workerStream,
};
}
}

/**
* Sets up stream multiplexing for the given stream
* Sets up stream multiplexing for the given stream.
*
* @param {any} connectionStream - the stream to mux
* @param {string} streamName - the name of the stream, for identification in errors
* @return {stream.Stream} the multiplexed stream
Expand All @@ -289,7 +294,7 @@ function setupMultiplex(
const mux = new ObjectMultiplex();
pump(
connectionStream,
// Typecast justification: stream type mismatch
// Typecast: stream type mismatch
(mux as unknown) as Duplex,
connectionStream,
(err) => {
Expand Down