diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index ae991cca5..c7aaca0b6 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -56,6 +56,7 @@ import { UnConfiguredMethodError, UnknownCallbackIdError, } from './communication-errors.js'; +import { RemoteValue } from '../value-signal.js'; export interface ConfigEnvironmentRecord extends EnvironmentRecord { registerMessageHandler?: boolean; @@ -197,9 +198,9 @@ export class Communication { serviceComConfig: ServiceComConfig = {}, ): AsyncApi { return new Proxy(Object.create(null), { - get: (obj, method) => { + get: (runtimeCache, key) => { // let js runtime know that this is not thenable object - if (method === 'then') { + if (key === 'then') { return undefined; } @@ -208,29 +209,103 @@ export class Communication { * they used by the debugger and cause messages to be sent everywhere * this behavior made debugging very hard and can cause errors and infinite loops */ - if (Object.hasOwn(Object.prototype, method)) { - return Reflect.get(Object.prototype, method); + if (Object.hasOwn(Object.prototype, key)) { + return Reflect.get(Object.prototype, key); } - if (typeof method === 'string') { - let runtimeMethod = obj[method]; - if (!runtimeMethod) { - runtimeMethod = async (...args: unknown[]) => + if (typeof key === 'string') { + let runtimeValue = runtimeCache[key]; + if (!runtimeValue) { + runtimeValue = async (...args: unknown[]) => this.callMethod( (await instanceToken).id, api, - method, + key, args, this.rootEnvId, serviceComConfig as Record, ); - obj[method] = runtimeMethod; + //////////// Signal handling //////////// + runtimeCache[key] = Object.assign( + runtimeValue, + this.createAsyncRemoteValue(key, serviceComConfig, instanceToken, api), + ); } - return runtimeMethod; + return runtimeValue; } }, }); } + private createAsyncRemoteValue( + key: string, + serviceComConfig: ServiceComConfig, + instanceToken: EnvironmentInstanceToken | Promise, + api: string, + ) { + const subSignalId = key + '.' + 'subscribe'; + const unsubSignalId = key + '.' + 'unsubscribe'; + const streamSignalId = key + '.' + 'stream'; + const getValueId = key + '.getValue'; + + (serviceComConfig as Record)[subSignalId] = { + emitOnly: true, + listener: true, + }; + (serviceComConfig as Record)[unsubSignalId] = { + emitOnly: true, + removeListener: subSignalId, + }; + (serviceComConfig as Record)[streamSignalId] = { + emitOnly: true, + listener: true, + }; + + const asyncRemoteValue = { + subscribe: async (...args: unknown[]) => { + return this.callMethod( + (await instanceToken).id, + api, + subSignalId, + args, + this.rootEnvId, + serviceComConfig as Record, + ); + }, + unsubscribe: async (fn: UnknownFunction) => { + return this.callMethod( + (await instanceToken).id, + api, + unsubSignalId, + [fn], + this.rootEnvId, + serviceComConfig as Record, + ); + }, + stream: async (fn: UnknownFunction) => { + return this.callMethod( + (await instanceToken).id, + api, + streamSignalId, + [fn], + this.rootEnvId, + serviceComConfig as Record, + ); + }, + getValue: async () => { + return this.callMethod( + (await instanceToken).id, + api, + getValueId, + [], + this.rootEnvId, + serviceComConfig as Record, + ); + }, + }; + + return asyncRemoteValue; + } + /** * Add local handle event listener to Target. */ @@ -640,9 +715,18 @@ export class Communication { private apiCall(origin: string, api: string, method: string, args: unknown[]): unknown { if (this.apisOverrides[api]?.[method]) { - return this.apisOverrides[api][method](...[origin, ...args]); + return (this.apisOverrides[api][method] as UnknownFunction)(...[origin, ...args]); + } + if (method.includes('.')) { + const [apiName, methodName] = method.split('.') as [ + string, + 'subscribe' | 'unsubscribe' | 'getValue' | 'stream', + ]; + const signal = this.apis[api]![apiName] as RemoteValue; + const fnAsrgs = args as [UnknownFunction]; + return signal[methodName](...fnAsrgs); } - return this.apis[api]![method]!(...args); + return (this.apis[api]![method] as UnknownFunction)(...args); } private unhandledMessage(message: Message): void { diff --git a/packages/core/src/com/types.ts b/packages/core/src/com/types.ts index 6104dfc05..e7146b73d 100644 --- a/packages/core/src/com/types.ts +++ b/packages/core/src/com/types.ts @@ -1,4 +1,5 @@ import { SERVICE_CONFIG } from '../symbols.js'; +import { AsyncRemoteValue, RemoteValue } from '../value-signal.js'; import { Message } from './message-types.js'; export type SerializableArguments = unknown[]; @@ -64,7 +65,9 @@ export type AsyncApi = { ? T[P] : T[P] extends (...args: infer Args) => infer R ? (...args: Args) => Promise - : never; + : T[P] extends RemoteValue + ? AsyncRemoteValue + : never; } & { [K in Extract]: never; }; @@ -118,5 +121,5 @@ export interface APIService { } export interface RemoteAPIServicesMapping { - [remoteServiceId: string]: Record; + [remoteServiceId: string]: Record>; } diff --git a/packages/core/src/entities/service.ts b/packages/core/src/entities/service.ts index 946d6f46f..05d208d58 100644 --- a/packages/core/src/entities/service.ts +++ b/packages/core/src/entities/service.ts @@ -66,14 +66,13 @@ export class Service< providedFrom.has(Universal.env) || hasIntersection(providedFrom, runtimeEngine.runningEnvNames); if (shouldIncludeService) { if (!providedValue) { - throw new Error( - `Service is not provided at runtime. + throw new Error(` +Service is not provided at runtime. Make sure the environment setup file exists and named correctly: [featureName].[envName].env.[ext] Service name: ${entityKey} Feature id: ${featureID} Environment: ${runtimeEngine.entryEnvironment.env} - `, - ); +`); } communication.registerAPI({ id: serviceKey }, providedValue); return providedValue; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index bd8f382f8..e7145f2bc 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,6 +9,7 @@ export * from './runtime-feature.js'; export * from './symbols.js'; export * from './types.js'; export * from './communication.feature.js'; +export * from './value-signal.js'; export * from './runtime-main.js'; export * from './runtime-configurations.js'; diff --git a/packages/core/src/value-signal.ts b/packages/core/src/value-signal.ts new file mode 100644 index 000000000..341d19079 --- /dev/null +++ b/packages/core/src/value-signal.ts @@ -0,0 +1,50 @@ +export type Listener = (data: T, version: number) => void; +export type AsyncRemoteValue = { + getValue: () => Promise; + stream: (handler: Listener) => void; + subscribe: (handler: Listener) => void; + unsubscribe: (handler: Listener) => void; +}; + +export type RemotelyAccessibleSignalMethods = 'subscribe' | 'unsubscribe' | 'getValue'; + +export class RemoteValue { + private handlers = new Set>(); + private value: T; + private version: number = 0; + + constructor(initialValue: T) { + this.value = initialValue; + } + + getValue = (): T => { + return this.value; + }; + + subscribe = (handler: Listener) => { + this.handlers.add(handler); + }; + stream = (handler: Listener) => { + this.subscribe(handler); + handler(this.value, this.version); + }; + + unsubscribe = (handler: Listener) => { + this.handlers.delete(handler); + }; + + /** + * Set the value and notify all subscribers with the new data. + * Only notifies if the value has changed. + */ + setValueAndNotify = (data: T) => { + if (this.value === data) { + return; + } + this.version++; + this.value = data; + for (const handler of this.handlers) { + handler(data, this.version); + } + }; +} diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index b5087bf07..1b4b99cfe 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -11,6 +11,7 @@ import { Slot, declareComEmitter, multiTenantMethod, + RemoteValue, } from '@dazl/engine-core'; import * as chai from 'chai'; import { expect } from 'chai'; @@ -54,6 +55,45 @@ describe('Communication', () => { expect(res).to.be.equal('Yoo!'); }); + it('signal support', async () => { + class EchoServiceWithSignal { + onChange = new RemoteValue(''); + echo(s: string) { + return s; + } + } + const host = new BaseHost(); + const main = new Communication(host, 'main'); + const echoService = new EchoServiceWithSignal(); + main.registerAPI({ id: 'echoService' }, echoService); + + const proxy = main.apiProxy(Promise.resolve({ id: 'main' }), { id: 'echoService' }); + const out: string[] = []; + const versions: number[] = []; + + const handler = (e: string, version: number) => { + out.push(e); + versions.push(version); + }; + proxy.onChange.subscribe(handler); + + // this code simulate cross environment communication it cannot be synchronous + await sleep(0); + echoService.onChange.setValueAndNotify('test'); + ////////////////////////////////////////////////////////////////////////////// + + proxy.onChange.unsubscribe(handler); + + // this code simulate cross environment communication it cannot be synchronous + await sleep(0); + echoService.onChange.setValueAndNotify('test 2'); + await sleep(0); + ////////////////////////////////////////////////////////////////////////////// + + expect(versions).to.eql([1]); + expect(out).to.eql(['test']); + }); + it('multi communication', async () => { const host = new BaseHost(); const main = new Communication(host, 'main');