diff --git a/packages/controllers/src/plugins/PluginController.test.ts b/packages/controllers/src/plugins/PluginController.test.ts index 73fedaa458..c38ed708a2 100644 --- a/packages/controllers/src/plugins/PluginController.test.ts +++ b/packages/controllers/src/plugins/PluginController.test.ts @@ -13,7 +13,7 @@ describe('PluginController Controller', () => { it('can create a worker and plugin controller', async () => { const workerExecutionEnvironment = new WebWorkerExecutionEnvironmentService( { - setupWorkerConnection: jest.fn(), + setupPluginProvider: jest.fn(), workerUrl: new URL(URL.createObjectURL(new Blob([workerCode]))), }, ); @@ -45,7 +45,7 @@ describe('PluginController Controller', () => { it('can add a plugin and use its JSON-RPC api with a WebWorkerExecutionEnvironmentService', async () => { const webWorkerExecutionEnvironment = new WebWorkerExecutionEnvironmentService( { - setupWorkerConnection: jest.fn(), + setupPluginProvider: jest.fn(), workerUrl: new URL(URL.createObjectURL(new Blob([workerCode]))), }, ); diff --git a/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.test.ts b/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.test.ts index b9d46143fa..2e3cb3103d 100644 --- a/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.test.ts +++ b/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.test.ts @@ -10,7 +10,7 @@ describe('Worker Controller', () => { it('can boot', async () => { const webWorkerExecutionEnvironmentService = new WebWorkerExecutionEnvironmentService( { - setupWorkerConnection: () => { + setupPluginProvider: () => { // do nothing }, workerUrl: new URL('https://foo.bar.baz'), @@ -21,7 +21,7 @@ describe('Worker Controller', () => { it('can create a plugin worker and start the plugin', async () => { const webWorkerExecutionEnvironmentService = new WebWorkerExecutionEnvironmentService( { - setupWorkerConnection: () => { + setupPluginProvider: () => { // do nothing }, workerUrl: new URL(URL.createObjectURL(new Blob([workerCode]))), diff --git a/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.ts b/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.ts index 6a7a40c304..20eeeb3f0c 100644 --- a/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.ts +++ b/packages/controllers/src/services/WebWorkerExecutionEnvironmentService.ts @@ -14,16 +14,16 @@ import { } from 'json-rpc-engine'; import { ExecutionEnvironmentService } from '../services/ExecutionEnvironmentService'; -export type SetupWorkerConnection = (stream: Duplex) => void; +export type SetupPluginProvider = (pluginName: string, stream: Duplex) => void; interface WorkerControllerArgs { - setupWorkerConnection: SetupWorkerConnection; + setupPluginProvider: SetupPluginProvider; workerUrl: URL; } interface WorkerStreams { command: Duplex; - rpc: Duplex; + rpc: Duplex | null; _connection: WorkerParentPostMessageStream; } @@ -34,7 +34,7 @@ export type PluginRpcHook = ( ) => Promise; interface WorkerWrapper { - workerId: string; + id: string; streams: WorkerStreams; rpcEngine: JsonRpcEngine; worker: Worker; @@ -50,15 +50,15 @@ export class WebWorkerExecutionEnvironmentService private workers: Map; - private setupWorkerConnection: SetupWorkerConnection; + private setupPluginProvider: SetupPluginProvider; private pluginToWorkerMap: Map; private workerToPluginMap: Map; - constructor({ setupWorkerConnection, workerUrl }: WorkerControllerArgs) { + constructor({ setupPluginProvider, workerUrl }: WorkerControllerArgs) { this.workerUrl = workerUrl; - this.setupWorkerConnection = setupWorkerConnection; + this.setupPluginProvider = setupPluginProvider; this.store = new ObservableStore({ workers: {} }); this.workers = new Map(); this.pluginToWorkerMap = new Map(); @@ -66,17 +66,17 @@ export class WebWorkerExecutionEnvironmentService this._pluginRpcHooks = new Map(); } - _setWorker(workerId: string, workerObj: WorkerWrapper): void { - this.workers.set(workerId, workerObj); + private _setWorker(workerId: string, workerWrapper: WorkerWrapper): void { + this.workers.set(workerId, workerWrapper); const newWorkerState = { ...(this.store.getState().workers as Record), - [workerId]: workerObj, + [workerId]: workerWrapper, }; this.store.updateState({ workers: newWorkerState }); } - _deleteWorker(workerId: string): void { + private _deleteWorker(workerId: string): void { this.workers.delete(workerId); const newWorkerState = { @@ -94,13 +94,13 @@ export class WebWorkerExecutionEnvironmentService throw new Error('Must send object.'); } - const workerObj = this.workers.get(workerId); - if (!workerObj) { + const workerWrapper = this.workers.get(workerId); + if (!workerWrapper) { throw new Error(`Worker with id ${workerId} not found.`); } console.log('Parent: Sending Command', message); - const response: PendingJsonRpcResponse = await workerObj.rpcEngine.handle( + const response: PendingJsonRpcResponse = await workerWrapper.rpcEngine.handle( message, ); if (response.error) { @@ -123,12 +123,12 @@ export class WebWorkerExecutionEnvironmentService } terminate(workerId: string): void { - const workerObj = this.workers.get(workerId); - if (!workerObj) { + const workerWrapper = this.workers.get(workerId); + if (!workerWrapper) { throw new Error(`Worked with id "${workerId}" not found.`); } - Object.values(workerObj.streams).forEach((stream) => { + Object.values(workerWrapper.streams).forEach((stream) => { try { !stream.destroyed && stream.destroy(); stream.removeAllListeners(); @@ -136,7 +136,7 @@ export class WebWorkerExecutionEnvironmentService console.log('Error while destroying stream', err); } }); - workerObj.worker.terminate(); + workerWrapper.worker.terminate(); this._removePluginAndWorkerMapping(workerId); this._deleteWorker(workerId); console.log(`worker:${workerId} terminated`); @@ -182,16 +182,20 @@ export class WebWorkerExecutionEnvironmentService ); } - const _workerId = await this._initWorker(); - this._mapPluginAndWorker(pluginData.pluginName, _workerId); + const worker = await this._initWorker(); + this._mapPluginAndWorker(pluginData.pluginName, worker.id); + this.setupPluginProvider( + pluginData.pluginName, + (worker.streams.rpc as unknown) as Duplex, + ); - const result = await this._command(_workerId, { + const result = await this._command(worker.id, { jsonrpc: '2.0', method: 'installPlugin', params: pluginData, id: nanoid(), }); - this._createPluginHooks(pluginData.pluginName, _workerId); + this._createPluginHooks(pluginData.pluginName, worker.id); return result; } @@ -224,9 +228,7 @@ export class WebWorkerExecutionEnvironmentService this.pluginToWorkerMap.delete(pluginName); } - async _initWorker(): Promise { - console.log('_initWorker'); - + private async _initWorker(): Promise { const workerId = nanoid(); const worker = new Worker(this.workerUrl, { name: workerId, @@ -240,18 +242,21 @@ export class WebWorkerExecutionEnvironmentService rpcEngine.push(jsonRpcConnection.middleware); - this._setWorker(workerId, { - workerId, + const workerWrapper = { + id: workerId, streams, rpcEngine, worker, - }); + }; + this._setWorker(workerId, workerWrapper); + await this._command(workerId, { jsonrpc: '2.0', method: 'ping', id: nanoid(), }); - return workerId; + + return workerWrapper; } _initWorkerStreams(worker: Worker, workerId: string): WorkerStreams { @@ -265,19 +270,19 @@ export class WebWorkerExecutionEnvironmentService const commandStream = mux.createStream(PLUGIN_STREAM_NAMES.COMMAND); const rpcStream = mux.createStream(PLUGIN_STREAM_NAMES.JSON_RPC); - this.setupWorkerConnection((rpcStream as unknown) as Duplex); - // Typecast justification: stream type mismatch + // Typecast: stream type mismatch return { command: (commandStream as unknown) as Duplex, - rpc: (rpcStream as unknown) as Duplex, + rpc: rpcStream, _connection: workerStream, }; } } /** - * Sets up stream multiplexing for the given stream + * Sets up stream multiplexing for the given stream. + * * @param {any} connectionStream - the stream to mux * @param {string} streamName - the name of the stream, for identification in errors * @return {stream.Stream} the multiplexed stream @@ -289,7 +294,7 @@ function setupMultiplex( const mux = new ObjectMultiplex(); pump( connectionStream, - // Typecast justification: stream type mismatch + // Typecast: stream type mismatch (mux as unknown) as Duplex, connectionStream, (err) => {