diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 2eda2815e..437ea10ec 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -55,6 +55,13 @@ import { UnConfiguredMethodError, UnknownCallbackIdError, } from './communication-errors.js'; +import { + type RemoteValueAsyncMethods, + type RemoteValue, + remoteValueAsyncMethods, + ReconnectFunction, + RemoteValueListener, +} from '../remote-value.js'; export interface ConfigEnvironmentRecord extends EnvironmentRecord { registerMessageHandler?: boolean; @@ -94,6 +101,8 @@ export class Communication { private messageIdPrefix: string; // manual DEBUG_MODE private DEBUG = false; + // Track RemoteValue subscriptions with their current versions + private remoteValueTracking = new Map }>(); constructor( private host: Target, id: string, @@ -196,9 +205,9 @@ export class Communication { serviceComConfig: ServiceComConfig = {}, ): AsyncApi { return new Proxy(Object.create(null), { - get: (obj, method) => { + get: (runtimeCache, key) => { // let js runtime know that this is not thenable object - if (method === 'then') { + if (key === 'then') { return undefined; } @@ -207,28 +216,95 @@ export class Communication { * they used by the debugger and cause messages to be sent everywhere * this behavior made debugging very hard and can cause errors and infinite loops */ - if (Object.hasOwn(Object.prototype, method)) { - return Reflect.get(Object.prototype, method); + if (Object.hasOwn(Object.prototype, key)) { + return Reflect.get(Object.prototype, key); } - if (typeof method === 'string') { - let runtimeMethod = obj[method]; - if (!runtimeMethod) { - runtimeMethod = async (...args: unknown[]) => + if (typeof key === 'string') { + let runtimeValue = runtimeCache[key]; + if (!runtimeValue) { + runtimeValue = async (...args: unknown[]) => this.callMethod( (await instanceToken).id, api, - method, + key, args, this.rootEnvId, serviceComConfig as Record, ); - obj[method] = runtimeMethod; + runtimeCache[key] = Object.assign( + runtimeValue, + this.createAsyncRemoteValue(key, serviceComConfig, instanceToken, api), + ); } - return runtimeMethod; + return runtimeValue; } }, }); } + private createAsyncRemoteValue( + key: string, + serviceComConfig: ServiceComConfig, + instanceToken: EnvironmentInstanceToken | Promise, + api: string, + ) { + const config = serviceComConfig as Record; + const methods = { + subscribe: `${key}.subscribe`, + unsubscribe: `${key}.unsubscribe`, + stream: `${key}.stream`, + getValue: `${key}.getValue`, + }; + + config[methods.subscribe] = { emitOnly: true, listener: true }; + config[methods.stream] = { emitOnly: true, listener: true }; + config[methods.unsubscribe] = { emitOnly: true, removeListener: methods.subscribe }; + + const createMethod = + (methodName: string) => + async (...args: unknown[]) => { + return this.callMethod((await instanceToken).id, api, methodName, args, this.rootEnvId, config); + }; + + const fnHandlers = new WeakMap, RemoteValueListener>(); + const reconnect = createMethod(`${key}.reconnect`) as ReconnectFunction; + const createHandlerMethod = (methodName: string) => { + return async (...args: unknown[]) => { + const envId = (await instanceToken).id; + const handlerId = this.getHandlerId(envId, api, methodName); + const callback = args[0] as UnknownFunction; + let wrapedCallback = fnHandlers.get(callback); + if (!wrapedCallback) { + wrapedCallback = (value, version) => { + this.remoteValueTracking.set(handlerId, { + currentVersion: version, + reconnect, + }); + return callback(value, version); + }; + fnHandlers.set(callback, wrapedCallback); + } + args[0] = wrapedCallback; + return this.callMethod(envId, api, methodName, args, this.rootEnvId, config); + }; + }; + + return { + subscribe: createHandlerMethod(methods.subscribe), + stream: createHandlerMethod(methods.stream), + unsubscribe: async (fn: RemoteValueListener) => { + const wrapedCallback = fnHandlers.get(fn); + return this.callMethod( + (await instanceToken).id, + api, + methods.unsubscribe, + [wrapedCallback], + this.rootEnvId, + config, + ); + }, + getValue: createMethod(methods.getValue), + }; + } /** * Add local handle event listener to Target. @@ -634,11 +710,23 @@ export class Communication { this.post(env.host, message); } - private apiCall(origin: string, api: string, method: string, args: unknown[]): unknown { - if (this.apisOverrides[api]?.[method]) { - return this.apisOverrides[api][method](...[origin, ...args]); + private apiCall(origin: string, api: string, callPath: string, args: unknown[]): unknown { + const method = this.apisOverrides[api]?.[callPath]; + if (typeof method === 'function') { + return method(...[origin, ...args]); + } + // support for RemoteValue + const [apiName, ...subActions] = callPath.split('.'); + if ( + apiName && + subActions.length === 1 && + remoteValueAsyncMethods.has(subActions[0] as RemoteValueAsyncMethods) + ) { + const remoteValue = this.apis[api]![apiName] as RemoteValue; + const methodName = subActions[0] as RemoteValueAsyncMethods; + return (remoteValue[methodName] as UnknownFunction)(...args); } - return this.apis[api]![method]!(...args); + return (this.apis[api]![callPath] as UnknownFunction)(...args); } private unhandledMessage(message: Message): void { @@ -659,7 +747,6 @@ export class Communication { rej: (reason: unknown) => void, ) { const removeListenerRef = methodConfig?.removeAllListeners || methodConfig?.removeListener; - if (removeListenerRef && !methodConfig?.listener) { const listenerHandlerId = this.getHandlerId(envId, api, removeListenerRef); const listenerHandlersBucket = this.handlers.get(listenerHandlerId); @@ -825,6 +912,28 @@ export class Communication { } } } + + /** + * Reconnect all RemoteValue subscriptions for a given environment. + * Called automatically when an environment reconnects. + */ + async reconnectRemoteValues(envId: string) { + for (const [handlerId, tracking] of this.remoteValueTracking.entries()) { + if (this.DEBUG) { + console.debug( + `Reconnected RemoteValue for handler ${handlerId} in from ${envId} to version ${tracking.currentVersion}`, + ); + } + await tracking.reconnect(tracking.currentVersion).then((res) => { + if (!res) { + return; + } + for (const handler of this.handlers.get(handlerId)?.callbacks || []) { + handler(res.value, res.version); + } + }); + } + } private async handleUnListen(message: UnListenMessage) { const namespacedHandlerId = message.handlerId + message.origin; const dispatcher = this.eventDispatchers.get(namespacedHandlerId)?.dispatcher; @@ -966,14 +1075,26 @@ export class Communication { this.subscribeToEnvironmentDispose((disposedEnvId) => { if (envId === disposedEnvId) { this.handlers.delete(handlerId); + this.remoteValueTracking.delete(handlerId); } }); } handlersBucket ? handlersBucket.callbacks.add(fn) : this.handlers.set(handlerId, { message, callbacks: new Set([fn]) }); + return handlerId; } + + private isRemoteValueSubscription(method: string): boolean { + return method.endsWith('.subscribe') || method.endsWith('.stream'); + } + + private getRemoteValuePropertyName(method: string): string { + // Extract property name from 'propertyName.subscribe' or 'propertyName.stream' + const parts = method.split('.'); + return parts.slice(0, -1).join('.'); + } private createCallbackRecord( message: Message, callbackId: string, diff --git a/packages/core/src/com/types.ts b/packages/core/src/com/types.ts index 6104dfc05..f1a23bfcb 100644 --- a/packages/core/src/com/types.ts +++ b/packages/core/src/com/types.ts @@ -1,3 +1,4 @@ +import { AsyncRemoteValue, RemoteValue } from '../remote-value.js'; import { SERVICE_CONFIG } from '../symbols.js'; import { Message } from './message-types.js'; @@ -64,7 +65,9 @@ export type AsyncApi = { ? T[P] : T[P] extends (...args: infer Args) => infer R ? (...args: Args) => Promise - : never; + : T[P] extends RemoteValue + ? AsyncRemoteValue + : never; } & { [K in Extract]: never; }; @@ -118,5 +121,5 @@ export interface APIService { } export interface RemoteAPIServicesMapping { - [remoteServiceId: string]: Record; + [remoteServiceId: string]: Record>; } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index bd8f382f8..65125e54b 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,6 +9,7 @@ export * from './runtime-feature.js'; export * from './symbols.js'; export * from './types.js'; export * from './communication.feature.js'; +export * from './remote-value.js'; export * from './runtime-main.js'; export * from './runtime-configurations.js'; diff --git a/packages/core/src/remote-value.ts b/packages/core/src/remote-value.ts new file mode 100644 index 000000000..355050695 --- /dev/null +++ b/packages/core/src/remote-value.ts @@ -0,0 +1,73 @@ +export type RemoteValueListener = (data: T, version: number) => void; +export type ReconnectFunction = (currentVersion: number) => Promise<{ + value: T; + version: number; +} | null>; + +export type AsyncRemoteValue = { + getValue: () => Promise; + stream: (handler: RemoteValueListener) => void; + subscribe: (handler: RemoteValueListener) => void; + unsubscribe: (handler: RemoteValueListener) => void; +}; + +export const remoteValueAsyncMethods = new Set([ + 'getValue', + 'stream', + 'subscribe', + 'unsubscribe', + 'reconnect', +] as const); +export type RemoteValueAsyncMethods = typeof remoteValueAsyncMethods extends Set ? U : never; + +export class RemoteValue { + private handlers = new Set>(); + private value: T; + private version: number = 0; + + constructor(initialValue: T) { + this.value = initialValue; + } + + getValue = (): T => { + return this.value; + }; + + subscribe = (handler: RemoteValueListener) => { + this.handlers.add(handler); + }; + stream = (handler: RemoteValueListener) => { + this.subscribe(handler); + handler(this.value, this.version); + }; + + unsubscribe = (handler: RemoteValueListener) => { + this.handlers.delete(handler); + }; + + /** + * Set the value and notify all subscribers with the new data. + * Only notifies if the value has changed. + */ + setValueAndNotify = (data: T) => { + if (this.value === data) { + return; + } + this.version++; + this.value = data; + for (const handler of this.handlers) { + handler(data, this.version); + } + }; + + /** + * Reconnect method to sync version and retrieve latest value if needed. + * Returns the latest value and version if there's a mismatch, otherwise returns null. + */ + reconnect = (currentVersion: number): { value: T; version: number } | null => { + if (currentVersion !== this.version) { + return { value: this.value, version: this.version }; + } + return null; + }; +} diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index b5087bf07..cfbb6d661 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -5,6 +5,7 @@ import { Communication, Environment, Feature, + RemoteValue, RuntimeEngine, SERVICE_CONFIG, Service, @@ -54,6 +55,217 @@ describe('Communication', () => { expect(res).to.be.equal('Yoo!'); }); + describe('RemoteValue', () => { + interface ServiceWithRemoteValue { + counter: RemoteValue; + } + + function setupCrossEnvCommunication() { + const hostMain = new BaseHost(); + const hostChild = hostMain.open(); + const comMain = new Communication(hostMain, 'main'); + const comChild = new Communication(hostChild, 'child'); + + comMain.registerEnv('child', hostChild); + comChild.registerEnv('main', hostMain); + + return { comMain, comChild, hostMain, hostChild }; + } + + function createEventCollector() { + const events: Array<{ value: T; version: number }> = []; + const handler = (value: T, version: number) => { + events.push({ value, version }); + }; + return { events, handler }; + } + + const waitOneTickAfter = () => sleep(0); + + it('should subscribe and unsubscribe to value changes', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(1); + await waitOneTickAfter(); + + expect(events, 'event with change').to.eql([{ value: 1, version: 1 }]); + + proxy.counter.unsubscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(2); + await waitOneTickAfter(); + + expect(events, 'no new events after unsubscribe').to.eql([{ value: 1, version: 1 }]); + }); + it('should stream current value immediately (subscribe+fetch)', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(5) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.stream(handler); + await waitOneTickAfter(); + + expect(events, 'immediate with initial value').to.eql([{ value: 5, version: 0 }]); + + service.counter.setValueAndNotify(10); + await waitOneTickAfter(); + + expect(events, 'initial + notify').to.eql([ + { value: 5, version: 0 }, + { value: 10, version: 1 }, + ]); + }); + it('should get current value via getValue', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(42) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + + const value = await proxy.counter.getValue(); + + expect(value).to.equal(42); + }); + it('should support multiple subscribers receiving same updates', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const collector1 = createEventCollector(); + const collector2 = createEventCollector(); + + proxy.counter.subscribe(collector1.handler); + proxy.counter.subscribe(collector2.handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(1); + await waitOneTickAfter(); + service.counter.setValueAndNotify(2); + await waitOneTickAfter(); + + expect(collector1.events, 'collector1 events').to.eql([ + { value: 1, version: 1 }, + { value: 2, version: 2 }, + ]); + expect(collector2.events, 'collector2 events').to.eql([ + { value: 1, version: 1 }, + { value: 2, version: 2 }, + ]); + }); + it('should track version increments across value changes', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(10); + await waitOneTickAfter(); + service.counter.setValueAndNotify(20); + await waitOneTickAfter(); + service.counter.setValueAndNotify(30); + await waitOneTickAfter(); + + expect(events.map((e) => e.version)).to.eql([1, 2, 3]); + }); + it('should not notify when value does not change', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(5) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(5); // same value + await waitOneTickAfter(); + + expect(events, 'no notifications').to.eql([]); + }); + it('should handle specific unsubscribe with multiple subscribers', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const collector1 = createEventCollector(); + const collector2 = createEventCollector(); + + proxy.counter.subscribe(collector1.handler); + proxy.counter.subscribe(collector2.handler); + await waitOneTickAfter(); + + proxy.counter.unsubscribe(collector1.handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(1); + await waitOneTickAfter(); + + expect(collector1.events, 'unsubscribed, no events').to.eql([]); + expect(collector2.events, 'still subscribed').to.eql([{ value: 1, version: 1 }]); + }); + it('should allow reconnect RemoteValue subscriptions on environment reconnection', async () => { + const hostServer = new BaseHost(); + const hostClient = new BaseHost(); + const comClient = new Communication(hostClient, 'client'); + const comServer = new Communication(hostServer, 'server'); + + comClient.registerEnv('server', hostServer); + comServer.registerEnv('client', hostClient); + + const service = { counter: new RemoteValue(10) }; + comServer.registerAPI({ id: 'myService' }, service); + + const clientService = comClient.apiProxy({ id: 'server' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + // Subscribe to updates + clientService.counter.subscribe(handler); + await waitOneTickAfter(); + + // Update value before reconnection + service.counter.setValueAndNotify(20); + await waitOneTickAfter(); + expect(events, 'received update before reconnect').to.eql([{ value: 20, version: 1 }]); + + comServer.clearEnvironment(comClient.getEnvironmentId()); + + service.counter.setValueAndNotify(30); + service.counter.setValueAndNotify(50); + + comServer.registerEnv('client', hostClient); + + await comClient.reconnectRemoteValues('server'); + + await waitFor(() => { + // Verify we got the latest synced value + expect(events.length, 'received reconnection sync').to.be.greaterThan(1); + const lastEvent = events[events.length - 1]; + expect(lastEvent?.value, 'synced to latest value').to.equal(50); + expect(lastEvent?.version, 'synced to latest version').to.equal(3); + }); + }); + }); + it('multi communication', async () => { const host = new BaseHost(); const main = new Communication(host, 'main');