From 2a9b1e078142c3e249df1c2a77d165e9691ad8d2 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Fri, 21 Nov 2025 11:30:44 +0200 Subject: [PATCH 1/6] feat: removeValue for sharing values across envs --- packages/core/src/com/communication.ts | 117 +++++++++++++++++--- packages/core/src/com/types.ts | 7 +- packages/core/src/index.ts | 1 + packages/core/src/remote-value.ts | 49 +++++++++ packages/core/test/node/com.spec.ts | 142 +++++++++++++++++++++++++ 5 files changed, 299 insertions(+), 17 deletions(-) create mode 100644 packages/core/src/remote-value.ts diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 2eda2815e..6af17029a 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -55,6 +55,7 @@ import { UnConfiguredMethodError, UnknownCallbackIdError, } from './communication-errors.js'; +import { type RemoteValueAsyncMethods, type RemoteValue, remoteValueAsyncMethods } from '../remote-value.js'; export interface ConfigEnvironmentRecord extends EnvironmentRecord { registerMessageHandler?: boolean; @@ -196,9 +197,9 @@ export class Communication { serviceComConfig: ServiceComConfig = {}, ): AsyncApi { return new Proxy(Object.create(null), { - get: (obj, method) => { + get: (runtimeCache, key) => { // let js runtime know that this is not thenable object - if (method === 'then') { + if (key === 'then') { return undefined; } @@ -207,28 +208,100 @@ export class Communication { * they used by the debugger and cause messages to be sent everywhere * this behavior made debugging very hard and can cause errors and infinite loops */ - if (Object.hasOwn(Object.prototype, method)) { - return Reflect.get(Object.prototype, method); + if (Object.hasOwn(Object.prototype, key)) { + return Reflect.get(Object.prototype, key); } - if (typeof method === 'string') { - let runtimeMethod = obj[method]; - if (!runtimeMethod) { - runtimeMethod = async (...args: unknown[]) => + if (typeof key === 'string') { + let runtimeValue = runtimeCache[key]; + if (!runtimeValue) { + runtimeValue = async (...args: unknown[]) => this.callMethod( (await instanceToken).id, api, - method, + key, args, this.rootEnvId, serviceComConfig as Record, ); - obj[method] = runtimeMethod; + runtimeCache[key] = Object.assign( + runtimeValue, + this.createAsyncRemoteValue(key, serviceComConfig, instanceToken, api), + ); } - return runtimeMethod; + return runtimeValue; } }, }); } + private createAsyncRemoteValue( + key: string, + serviceComConfig: ServiceComConfig, + instanceToken: EnvironmentInstanceToken | Promise, + api: string, + ) { + const subSignalId = key + '.' + 'subscribe'; + const unsubSignalId = key + '.' + 'unsubscribe'; + const streamSignalId = key + '.' + 'stream'; + const getValueId = key + '.getValue'; + + (serviceComConfig as Record)[subSignalId] = { + emitOnly: true, + listener: true, + }; + (serviceComConfig as Record)[unsubSignalId] = { + emitOnly: true, + removeListener: subSignalId, + }; + (serviceComConfig as Record)[streamSignalId] = { + emitOnly: true, + listener: true, + }; + + const asyncRemoteValue = { + subscribe: async (...args: unknown[]) => { + return this.callMethod( + (await instanceToken).id, + api, + subSignalId, + args, + this.rootEnvId, + serviceComConfig as Record, + ); + }, + unsubscribe: async (fn: UnknownFunction) => { + return this.callMethod( + (await instanceToken).id, + api, + unsubSignalId, + [fn], + this.rootEnvId, + serviceComConfig as Record, + ); + }, + stream: async (fn: UnknownFunction) => { + return this.callMethod( + (await instanceToken).id, + api, + streamSignalId, + [fn], + this.rootEnvId, + serviceComConfig as Record, + ); + }, + getValue: async () => { + return this.callMethod( + (await instanceToken).id, + api, + getValueId, + [], + this.rootEnvId, + serviceComConfig as Record, + ); + }, + }; + + return asyncRemoteValue; + } /** * Add local handle event listener to Target. @@ -634,11 +707,25 @@ export class Communication { this.post(env.host, message); } - private apiCall(origin: string, api: string, method: string, args: unknown[]): unknown { - if (this.apisOverrides[api]?.[method]) { - return this.apisOverrides[api][method](...[origin, ...args]); + private apiCall(origin: string, api: string, callPath: string, args: unknown[]): unknown { + const method = this.apisOverrides[api]?.[callPath]; + if (typeof method === 'function') { + return method(...[origin, ...args]); + } + // support for RemoteValue + const [apiName, ...subActions] = callPath.split('.'); + if ( + apiName && + subActions.length === 1 && + remoteValueAsyncMethods.has(subActions[0] as RemoteValueAsyncMethods) + ) { + const remoteValue = this.apis[api]![apiName] as RemoteValue; + const methodName = subActions[0] as RemoteValueAsyncMethods; + const fnArgs = args as [UnknownFunction]; + return remoteValue[methodName](...fnArgs); } - return this.apis[api]![method]!(...args); + // + return (this.apis[api]![callPath] as UnknownFunction)(...args); } private unhandledMessage(message: Message): void { diff --git a/packages/core/src/com/types.ts b/packages/core/src/com/types.ts index 6104dfc05..f1a23bfcb 100644 --- a/packages/core/src/com/types.ts +++ b/packages/core/src/com/types.ts @@ -1,3 +1,4 @@ +import { AsyncRemoteValue, RemoteValue } from '../remote-value.js'; import { SERVICE_CONFIG } from '../symbols.js'; import { Message } from './message-types.js'; @@ -64,7 +65,9 @@ export type AsyncApi = { ? T[P] : T[P] extends (...args: infer Args) => infer R ? (...args: Args) => Promise - : never; + : T[P] extends RemoteValue + ? AsyncRemoteValue + : never; } & { [K in Extract]: never; }; @@ -118,5 +121,5 @@ export interface APIService { } export interface RemoteAPIServicesMapping { - [remoteServiceId: string]: Record; + [remoteServiceId: string]: Record>; } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index bd8f382f8..65125e54b 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,6 +9,7 @@ export * from './runtime-feature.js'; export * from './symbols.js'; export * from './types.js'; export * from './communication.feature.js'; +export * from './remote-value.js'; export * from './runtime-main.js'; export * from './runtime-configurations.js'; diff --git a/packages/core/src/remote-value.ts b/packages/core/src/remote-value.ts new file mode 100644 index 000000000..762687501 --- /dev/null +++ b/packages/core/src/remote-value.ts @@ -0,0 +1,49 @@ +export type Listener = (data: T) => void; +export type AsyncRemoteValue = { + getValue: () => Promise; + stream: (handler: Listener) => void; + subscribe: (handler: Listener) => void; + unsubscribe: (handler: Listener) => void; +}; + +export const remoteValueAsyncMethods = new Set(['getValue', 'stream', 'subscribe', 'unsubscribe'] as const); +export type RemoteValueAsyncMethods = typeof remoteValueAsyncMethods extends Set ? U : never; + +export class RemoteValue { + private handlers = new Set>(); + private value: T; + + constructor(initialValue: T) { + this.value = initialValue; + } + + getValue = (): T => { + return this.value; + }; + + subscribe = (handler: Listener) => { + this.handlers.add(handler); + }; + stream = (handler: Listener) => { + this.subscribe(handler); + handler(this.value); + }; + + unsubscribe = (handler: Listener) => { + this.handlers.delete(handler); + }; + + /** + * Set the value and notify all subscribers with the new data. + * Only notifies if the value has changed. + */ + setValueAndNotify = (data: T) => { + if (this.value === data) { + return; + } + this.value = data; + for (const handler of this.handlers) { + handler(data); + } + }; +} diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index b5087bf07..d79f22be5 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -5,6 +5,7 @@ import { Communication, Environment, Feature, + RemoteValue, RuntimeEngine, SERVICE_CONFIG, Service, @@ -54,6 +55,147 @@ describe('Communication', () => { expect(res).to.be.equal('Yoo!'); }); + describe('RemoteValue', () => { + interface ServiceWithRemoteValue { + counter: RemoteValue; + } + + function setupCrossEnvCommunication() { + const hostMain = new BaseHost(); + const hostChild = hostMain.open(); + const comMain = new Communication(hostMain, 'main'); + const comChild = new Communication(hostChild, 'child'); + + comMain.registerEnv('child', hostChild); + comChild.registerEnv('main', hostMain); + + return { comMain, comChild, hostMain, hostChild }; + } + + function createEventCollector() { + const events: Array<{ value: T }> = []; + const handler = (value: T) => { + events.push({ value }); + }; + return { events, handler }; + } + + const waitOneTickAfter = () => sleep(0); + + it('should subscribe and unsubscribe to value changes', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(1); + await waitOneTickAfter(); + + expect(events, 'event with change').to.eql([{ value: 1 }]); + + proxy.counter.unsubscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(2); + await waitOneTickAfter(); + + expect(events, 'no new events after unsubscribe').to.eql([{ value: 1 }]); + }); + it('should stream current value immediately (subscribe+fetch)', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(5) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.stream(handler); + await waitOneTickAfter(); + + expect(events, 'immediate with initial value').to.eql([{ value: 5 }]); + + service.counter.setValueAndNotify(10); + await waitOneTickAfter(); + + expect(events, 'initial + notify').to.eql([{ value: 5 }, { value: 10 }]); + }); + it('should get current value via getValue', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(42) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + + const value = await proxy.counter.getValue(); + + expect(value).to.equal(42); + }); + it('should support multiple subscribers receiving same updates', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const collector1 = createEventCollector(); + const collector2 = createEventCollector(); + + proxy.counter.subscribe(collector1.handler); + proxy.counter.subscribe(collector2.handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(1); + await waitOneTickAfter(); + service.counter.setValueAndNotify(2); + await waitOneTickAfter(); + + expect(collector1.events, 'collector1 events').to.eql([{ value: 1 }, { value: 2 }]); + expect(collector2.events, 'collector2 events').to.eql([{ value: 1 }, { value: 2 }]); + }); + it('should not notify when value does not change', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(5) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(5); // same value + await waitOneTickAfter(); + + expect(events, 'no notifications').to.eql([]); + }); + it('should handle specific unsubscribe with multiple subscribers', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const collector1 = createEventCollector(); + const collector2 = createEventCollector(); + + proxy.counter.subscribe(collector1.handler); + proxy.counter.subscribe(collector2.handler); + await waitOneTickAfter(); + + proxy.counter.unsubscribe(collector1.handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(1); + await waitOneTickAfter(); + + expect(collector1.events, 'unsubscribed, no events').to.eql([]); + expect(collector2.events, 'still subscribed').to.eql([{ value: 1 }]); + }); + }); + it('multi communication', async () => { const host = new BaseHost(); const main = new Communication(host, 'main'); From 450f6907f9554fa1e4ff2add67ce6251959ba0f7 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Fri, 21 Nov 2025 16:35:41 +0200 Subject: [PATCH 2/6] feat: add reconnect method to RemoteValue --- packages/core/src/com/communication.ts | 16 ++++-- packages/core/src/remote-value.ts | 42 +++++++++++---- packages/core/test/node/com.spec.ts | 74 ++++++++++++++++++++++---- 3 files changed, 108 insertions(+), 24 deletions(-) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 6af17029a..02d2b0ecb 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -243,6 +243,7 @@ export class Communication { const unsubSignalId = key + '.' + 'unsubscribe'; const streamSignalId = key + '.' + 'stream'; const getValueId = key + '.getValue'; + const reconnectId = key + '.reconnect'; (serviceComConfig as Record)[subSignalId] = { emitOnly: true, @@ -298,6 +299,16 @@ export class Communication { serviceComConfig as Record, ); }, + reconnect: async (currentVersion: number) => { + return this.callMethod( + (await instanceToken).id, + api, + reconnectId, + [currentVersion], + this.rootEnvId, + serviceComConfig as Record, + ); + }, }; return asyncRemoteValue; @@ -719,10 +730,9 @@ export class Communication { subActions.length === 1 && remoteValueAsyncMethods.has(subActions[0] as RemoteValueAsyncMethods) ) { - const remoteValue = this.apis[api]![apiName] as RemoteValue; + const remoteValue = this.apis[api]![apiName] as unknown as RemoteValue; const methodName = subActions[0] as RemoteValueAsyncMethods; - const fnArgs = args as [UnknownFunction]; - return remoteValue[methodName](...fnArgs); + return (remoteValue[methodName] as UnknownFunction)(...args); } // return (this.apis[api]![callPath] as UnknownFunction)(...args); diff --git a/packages/core/src/remote-value.ts b/packages/core/src/remote-value.ts index 762687501..d8d74d7be 100644 --- a/packages/core/src/remote-value.ts +++ b/packages/core/src/remote-value.ts @@ -1,17 +1,25 @@ -export type Listener = (data: T) => void; +export type RemoteValueListener = (data: T, version: number) => void; export type AsyncRemoteValue = { getValue: () => Promise; - stream: (handler: Listener) => void; - subscribe: (handler: Listener) => void; - unsubscribe: (handler: Listener) => void; + stream: (handler: RemoteValueListener) => void; + subscribe: (handler: RemoteValueListener) => void; + unsubscribe: (handler: RemoteValueListener) => void; + reconnect: (currentVersion: number) => Promise<{ value: T; version: number } | null>; }; -export const remoteValueAsyncMethods = new Set(['getValue', 'stream', 'subscribe', 'unsubscribe'] as const); +export const remoteValueAsyncMethods = new Set([ + 'getValue', + 'stream', + 'subscribe', + 'unsubscribe', + 'reconnect', +] as const); export type RemoteValueAsyncMethods = typeof remoteValueAsyncMethods extends Set ? U : never; export class RemoteValue { - private handlers = new Set>(); + private handlers = new Set>(); private value: T; + private version: number = 0; constructor(initialValue: T) { this.value = initialValue; @@ -21,15 +29,15 @@ export class RemoteValue { return this.value; }; - subscribe = (handler: Listener) => { + subscribe = (handler: RemoteValueListener) => { this.handlers.add(handler); }; - stream = (handler: Listener) => { + stream = (handler: RemoteValueListener) => { this.subscribe(handler); - handler(this.value); + handler(this.value, this.version); }; - unsubscribe = (handler: Listener) => { + unsubscribe = (handler: RemoteValueListener) => { this.handlers.delete(handler); }; @@ -41,9 +49,21 @@ export class RemoteValue { if (this.value === data) { return; } + this.version++; this.value = data; for (const handler of this.handlers) { - handler(data); + handler(data, this.version); } }; + + /** + * Reconnect method to sync version and retrieve latest value if needed. + * Returns the latest value and version if there's a mismatch, otherwise returns null. + */ + reconnect = (currentVersion: number): { value: T; version: number } | null => { + if (currentVersion !== this.version) { + return { value: this.value, version: this.version }; + } + return null; + }; } diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index d79f22be5..fcb579f97 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -73,9 +73,9 @@ describe('Communication', () => { } function createEventCollector() { - const events: Array<{ value: T }> = []; - const handler = (value: T) => { - events.push({ value }); + const events: Array<{ value: T; version: number }> = []; + const handler = (value: T, version: number) => { + events.push({ value, version }); }; return { events, handler }; } @@ -96,7 +96,7 @@ describe('Communication', () => { service.counter.setValueAndNotify(1); await waitOneTickAfter(); - expect(events, 'event with change').to.eql([{ value: 1 }]); + expect(events, 'event with change').to.eql([{ value: 1, version: 1 }]); proxy.counter.unsubscribe(handler); await waitOneTickAfter(); @@ -104,7 +104,7 @@ describe('Communication', () => { service.counter.setValueAndNotify(2); await waitOneTickAfter(); - expect(events, 'no new events after unsubscribe').to.eql([{ value: 1 }]); + expect(events, 'no new events after unsubscribe').to.eql([{ value: 1, version: 1 }]); }); it('should stream current value immediately (subscribe+fetch)', async () => { const { comMain, comChild } = setupCrossEnvCommunication(); @@ -117,12 +117,15 @@ describe('Communication', () => { proxy.counter.stream(handler); await waitOneTickAfter(); - expect(events, 'immediate with initial value').to.eql([{ value: 5 }]); + expect(events, 'immediate with initial value').to.eql([{ value: 5, version: 0 }]); service.counter.setValueAndNotify(10); await waitOneTickAfter(); - expect(events, 'initial + notify').to.eql([{ value: 5 }, { value: 10 }]); + expect(events, 'initial + notify').to.eql([ + { value: 5, version: 0 }, + { value: 10, version: 1 }, + ]); }); it('should get current value via getValue', async () => { const { comMain, comChild } = setupCrossEnvCommunication(); @@ -153,8 +156,34 @@ describe('Communication', () => { service.counter.setValueAndNotify(2); await waitOneTickAfter(); - expect(collector1.events, 'collector1 events').to.eql([{ value: 1 }, { value: 2 }]); - expect(collector2.events, 'collector2 events').to.eql([{ value: 1 }, { value: 2 }]); + expect(collector1.events, 'collector1 events').to.eql([ + { value: 1, version: 1 }, + { value: 2, version: 2 }, + ]); + expect(collector2.events, 'collector2 events').to.eql([ + { value: 1, version: 1 }, + { value: 2, version: 2 }, + ]); + }); + it('should track version increments across value changes', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(0) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + service.counter.setValueAndNotify(10); + await waitOneTickAfter(); + service.counter.setValueAndNotify(20); + await waitOneTickAfter(); + service.counter.setValueAndNotify(30); + await waitOneTickAfter(); + + expect(events.map((e) => e.version)).to.eql([1, 2, 3]); }); it('should not notify when value does not change', async () => { const { comMain, comChild } = setupCrossEnvCommunication(); @@ -192,7 +221,32 @@ describe('Communication', () => { await waitOneTickAfter(); expect(collector1.events, 'unsubscribed, no events').to.eql([]); - expect(collector2.events, 'still subscribed').to.eql([{ value: 1 }]); + expect(collector2.events, 'still subscribed').to.eql([{ value: 1, version: 1 }]); + }); + it('should reconnect and sync version correctly', async () => { + const { comMain, comChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(10) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + + // Initial version is 0, reconnect with matching version should return null + let result = await proxy.counter.reconnect(0); + expect(result, 'initial sync, versions match').to.eql(null); + + // Update value multiple times + service.counter.setValueAndNotify(20); + await waitOneTickAfter(); + service.counter.setValueAndNotify(30); + await waitOneTickAfter(); + + // Reconnect with old version should return latest value and version + result = await proxy.counter.reconnect(0); + expect(result, 'versions mismatch, return latest').to.eql({ value: 30, version: 2 }); + + // Reconnect with current version should return null + result = await proxy.counter.reconnect(2); + expect(result, 'versions match, no update needed').to.eql(null); }); }); From aaadbed24881b3187560c6a7936d8755d70cd270 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sat, 22 Nov 2025 11:37:04 +0200 Subject: [PATCH 3/6] feat: auto sync on reconnect --- packages/core/src/com/communication.ts | 73 ++++++++++++++++++++++++++ packages/core/test/node/com.spec.ts | 41 +++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 02d2b0ecb..a4de5f59d 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -95,6 +95,11 @@ export class Communication { private messageIdPrefix: string; // manual DEBUG_MODE private DEBUG = false; + // Track RemoteValue subscriptions with their current versions + private remoteValueTracking = new Map< + string, + { currentVersion: number; envId: string; api: string; method: string } + >(); constructor( private host: Target, id: string, @@ -880,6 +885,12 @@ export class Communication { if (!handlers) { return; } + // Track version for RemoteValue subscriptions + const tracking = this.remoteValueTracking.get(message.handlerId); + if (tracking && message.data.length === 2 && typeof message.data[1] === 'number') { + // RemoteValue listeners receive (value, version) + tracking.currentVersion = message.data[1]; + } for (const handler of handlers.callbacks) { handler(...message.data); } @@ -917,11 +928,51 @@ export class Communication { this.sendTo(message.to, { ...message, callbackId: undefined }); } } + // Reconnect RemoteValue subscriptions + this.reconnectRemoteValues(from); for (const reConnectHandler of this.reConnectListeners) { reConnectHandler(from); } } } + + /** + * Reconnect all RemoteValue subscriptions for a given environment. + * Called automatically when an environment reconnects. + */ + private reconnectRemoteValues(envId: string): void { + for (const [handlerId, tracking] of this.remoteValueTracking.entries()) { + if (tracking.envId === envId) { + // Call reconnect method for this RemoteValue + const reconnectMethod = `${tracking.method}.reconnect`; + this.callMethod(envId, tracking.api, reconnectMethod, [tracking.currentVersion], this.rootEnvId, {}) + .then((result) => { + if (result && typeof result === 'object' && 'value' in result && 'version' in result) { + // Version mismatch detected, update tracking and notify handlers + const syncData = result as { value: unknown; version: number }; + tracking.currentVersion = syncData.version; + + // Notify all handlers with the updated value + const handlers = this.handlers.get(handlerId); + if (handlers) { + for (const handler of handlers.callbacks) { + handler(syncData.value, syncData.version); + } + } + } + }) + .catch((error) => { + // Log error but don't fail the reconnection process + if (this.DEBUG) { + console.error( + `Failed to reconnect RemoteValue ${tracking.api}.${tracking.method} for ${envId}:`, + error, + ); + } + }); + } + } + } private async handleUnListen(message: UnListenMessage) { const namespacedHandlerId = message.handlerId + message.origin; const dispatcher = this.eventDispatchers.get(namespacedHandlerId)?.dispatcher; @@ -1063,14 +1114,36 @@ export class Communication { this.subscribeToEnvironmentDispose((disposedEnvId) => { if (envId === disposedEnvId) { this.handlers.delete(handlerId); + this.remoteValueTracking.delete(handlerId); } }); } handlersBucket ? handlersBucket.callbacks.add(fn) : this.handlers.set(handlerId, { message, callbacks: new Set([fn]) }); + + // Track RemoteValue subscriptions + if (this.isRemoteValueSubscription(method)) { + this.remoteValueTracking.set(handlerId, { + currentVersion: 0, + envId, + api, + method: this.getRemoteValuePropertyName(method), + }); + } + return handlerId; } + + private isRemoteValueSubscription(method: string): boolean { + return method.endsWith('.subscribe') || method.endsWith('.stream'); + } + + private getRemoteValuePropertyName(method: string): string { + // Extract property name from 'propertyName.subscribe' or 'propertyName.stream' + const parts = method.split('.'); + return parts.slice(0, -1).join('.'); + } private createCallbackRecord( message: Message, callbackId: string, diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index fcb579f97..ef53ed0f3 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -248,6 +248,47 @@ describe('Communication', () => { result = await proxy.counter.reconnect(2); expect(result, 'versions match, no update needed').to.eql(null); }); + it('should auto-reconnect RemoteValue subscriptions on environment reconnection', async () => { + const { comMain, comChild, hostChild } = setupCrossEnvCommunication(); + const service = { counter: new RemoteValue(10) }; + comChild.registerAPI({ id: 'myService' }, service); + + const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const { events, handler } = createEventCollector(); + + // Subscribe to updates + proxy.counter.subscribe(handler); + await waitOneTickAfter(); + + // Update value before reconnection + service.counter.setValueAndNotify(20); + await waitOneTickAfter(); + expect(events, 'received update before reconnect').to.eql([{ value: 20, version: 1 }]); + + // Simulate environment disconnection by removing message handler + comChild.removeMessageHandler(hostChild); + + // Reconnect the environment with new service instance that has progressed + const newService = { counter: new RemoteValue(10) }; + // Simulate the service progressing while "disconnected" + newService.counter.setValueAndNotify(30); // version 1 + newService.counter.setValueAndNotify(40); // version 2 + + // Create new Communication with APIs passed in constructor (like other reconnection tests) + const reconnectedComChild = new Communication(hostChild, 'child', undefined, undefined, undefined, { + apis: { myService: newService } as any, + }); + reconnectedComChild.registerEnv('main', comMain.getEnvironmentHost('main')!); + + // Wait for auto-reconnection to happen and sync to complete + await waitFor(() => { + // Verify we got the latest synced value + expect(events.length, 'received reconnection sync').to.be.greaterThan(1); + const lastEvent = events[events.length - 1]; + expect(lastEvent?.value, 'synced to latest value').to.equal(40); + expect(lastEvent?.version, 'synced to latest version').to.equal(2); + }); + }); }); it('multi communication', async () => { From 3be3eb1aab696b7cb4d3eddc646755a6fcffbfc2 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sat, 22 Nov 2025 12:00:32 +0200 Subject: [PATCH 4/6] refactor: simplify createAsyncRemoteValue implementation --- packages/core/src/com/communication.ts | 92 ++++++-------------------- 1 file changed, 22 insertions(+), 70 deletions(-) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index a4de5f59d..8b4b15734 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -244,79 +244,31 @@ export class Communication { instanceToken: EnvironmentInstanceToken | Promise, api: string, ) { - const subSignalId = key + '.' + 'subscribe'; - const unsubSignalId = key + '.' + 'unsubscribe'; - const streamSignalId = key + '.' + 'stream'; - const getValueId = key + '.getValue'; - const reconnectId = key + '.reconnect'; - - (serviceComConfig as Record)[subSignalId] = { - emitOnly: true, - listener: true, - }; - (serviceComConfig as Record)[unsubSignalId] = { - emitOnly: true, - removeListener: subSignalId, - }; - (serviceComConfig as Record)[streamSignalId] = { - emitOnly: true, - listener: true, + const config = serviceComConfig as Record; + const methods = { + subscribe: `${key}.subscribe`, + unsubscribe: `${key}.unsubscribe`, + stream: `${key}.stream`, + getValue: `${key}.getValue`, + reconnect: `${key}.reconnect`, }; - const asyncRemoteValue = { - subscribe: async (...args: unknown[]) => { - return this.callMethod( - (await instanceToken).id, - api, - subSignalId, - args, - this.rootEnvId, - serviceComConfig as Record, - ); - }, - unsubscribe: async (fn: UnknownFunction) => { - return this.callMethod( - (await instanceToken).id, - api, - unsubSignalId, - [fn], - this.rootEnvId, - serviceComConfig as Record, - ); - }, - stream: async (fn: UnknownFunction) => { - return this.callMethod( - (await instanceToken).id, - api, - streamSignalId, - [fn], - this.rootEnvId, - serviceComConfig as Record, - ); - }, - getValue: async () => { - return this.callMethod( - (await instanceToken).id, - api, - getValueId, - [], - this.rootEnvId, - serviceComConfig as Record, - ); - }, - reconnect: async (currentVersion: number) => { - return this.callMethod( - (await instanceToken).id, - api, - reconnectId, - [currentVersion], - this.rootEnvId, - serviceComConfig as Record, - ); - }, - }; + config[methods.subscribe] = { emitOnly: true, listener: true }; + config[methods.stream] = { emitOnly: true, listener: true }; + config[methods.unsubscribe] = { emitOnly: true, removeListener: methods.subscribe }; - return asyncRemoteValue; + const createMethod = + (methodName: string) => + async (...args: unknown[]) => { + return this.callMethod((await instanceToken).id, api, methodName, args, this.rootEnvId, config); + }; + return { + subscribe: createMethod(methods.subscribe), + unsubscribe: createMethod(methods.unsubscribe), + stream: createMethod(methods.stream), + getValue: createMethod(methods.getValue), + reconnect: createMethod(methods.reconnect), + }; } /** From acd705fdc51db02ff3900215dab67e6a9893b598 Mon Sep 17 00:00:00 2001 From: Barak Igal Date: Sun, 23 Nov 2025 16:29:39 +0200 Subject: [PATCH 5/6] refacgtor: improve reconnect flow --- packages/core/src/com/communication.ts | 116 ++++++++++++------------- packages/core/src/remote-value.ts | 6 +- packages/core/test/node/com.spec.ts | 61 ++++--------- 3 files changed, 81 insertions(+), 102 deletions(-) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 8b4b15734..e32820300 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -55,7 +55,13 @@ import { UnConfiguredMethodError, UnknownCallbackIdError, } from './communication-errors.js'; -import { type RemoteValueAsyncMethods, type RemoteValue, remoteValueAsyncMethods } from '../remote-value.js'; +import { + type RemoteValueAsyncMethods, + type RemoteValue, + remoteValueAsyncMethods, + ReconnectFunction, + RemoteValueListener, +} from '../remote-value.js'; export interface ConfigEnvironmentRecord extends EnvironmentRecord { registerMessageHandler?: boolean; @@ -96,10 +102,7 @@ export class Communication { // manual DEBUG_MODE private DEBUG = false; // Track RemoteValue subscriptions with their current versions - private remoteValueTracking = new Map< - string, - { currentVersion: number; envId: string; api: string; method: string } - >(); + private remoteValueTracking = new Map }>(); constructor( private host: Target, id: string, @@ -250,7 +253,6 @@ export class Communication { unsubscribe: `${key}.unsubscribe`, stream: `${key}.stream`, getValue: `${key}.getValue`, - reconnect: `${key}.reconnect`, }; config[methods.subscribe] = { emitOnly: true, listener: true }; @@ -262,12 +264,45 @@ export class Communication { async (...args: unknown[]) => { return this.callMethod((await instanceToken).id, api, methodName, args, this.rootEnvId, config); }; + + const fnHandlers = new WeakMap, RemoteValueListener>(); + const reconnect = createMethod(`${key}.reconnect`) as ReconnectFunction; + const createHandlerMethod = (methodName: string) => { + return async (...args: unknown[]) => { + const envId = (await instanceToken).id; + const handlerId = this.getHandlerId(envId, api, methodName); + const callback = args[0] as UnknownFunction; + let wrapedCallback = fnHandlers.get(callback); + if (!wrapedCallback) { + wrapedCallback = (value, version) => { + this.remoteValueTracking.set(handlerId, { + currentVersion: version, + reconnect, + }); + return callback(value, version); + }; + fnHandlers.set(callback, wrapedCallback); + } + args[0] = wrapedCallback; + return this.callMethod(envId, api, methodName, args, this.rootEnvId, config); + }; + }; + return { - subscribe: createMethod(methods.subscribe), - unsubscribe: createMethod(methods.unsubscribe), - stream: createMethod(methods.stream), + subscribe: createHandlerMethod(methods.subscribe), + stream: createHandlerMethod(methods.stream), + unsubscribe: async (fn: RemoteValueListener) => { + const wrapedCallback = fnHandlers.get(fn); + return this.callMethod( + (await instanceToken).id, + api, + methods.unsubscribe, + [wrapedCallback], + this.rootEnvId, + config, + ); + }, getValue: createMethod(methods.getValue), - reconnect: createMethod(methods.reconnect), }; } @@ -713,7 +748,6 @@ export class Communication { rej: (reason: unknown) => void, ) { const removeListenerRef = methodConfig?.removeAllListeners || methodConfig?.removeListener; - if (removeListenerRef && !methodConfig?.listener) { const listenerHandlerId = this.getHandlerId(envId, api, removeListenerRef); const listenerHandlersBucket = this.handlers.get(listenerHandlerId); @@ -837,12 +871,6 @@ export class Communication { if (!handlers) { return; } - // Track version for RemoteValue subscriptions - const tracking = this.remoteValueTracking.get(message.handlerId); - if (tracking && message.data.length === 2 && typeof message.data[1] === 'number') { - // RemoteValue listeners receive (value, version) - tracking.currentVersion = message.data[1]; - } for (const handler of handlers.callbacks) { handler(...message.data); } @@ -880,8 +908,6 @@ export class Communication { this.sendTo(message.to, { ...message, callbackId: undefined }); } } - // Reconnect RemoteValue subscriptions - this.reconnectRemoteValues(from); for (const reConnectHandler of this.reConnectListeners) { reConnectHandler(from); } @@ -892,37 +918,21 @@ export class Communication { * Reconnect all RemoteValue subscriptions for a given environment. * Called automatically when an environment reconnects. */ - private reconnectRemoteValues(envId: string): void { + async reconnectRemoteValues(envId: string) { for (const [handlerId, tracking] of this.remoteValueTracking.entries()) { - if (tracking.envId === envId) { - // Call reconnect method for this RemoteValue - const reconnectMethod = `${tracking.method}.reconnect`; - this.callMethod(envId, tracking.api, reconnectMethod, [tracking.currentVersion], this.rootEnvId, {}) - .then((result) => { - if (result && typeof result === 'object' && 'value' in result && 'version' in result) { - // Version mismatch detected, update tracking and notify handlers - const syncData = result as { value: unknown; version: number }; - tracking.currentVersion = syncData.version; - - // Notify all handlers with the updated value - const handlers = this.handlers.get(handlerId); - if (handlers) { - for (const handler of handlers.callbacks) { - handler(syncData.value, syncData.version); - } - } - } - }) - .catch((error) => { - // Log error but don't fail the reconnection process - if (this.DEBUG) { - console.error( - `Failed to reconnect RemoteValue ${tracking.api}.${tracking.method} for ${envId}:`, - error, - ); - } - }); + if (this.DEBUG) { + console.debug( + `Reconnected RemoteValue for handler ${handlerId} in from ${envId} to version ${tracking.currentVersion}`, + ); } + await tracking.reconnect(tracking.currentVersion).then((res) => { + if (!res) { + return; + } + for (const handler of this.handlers.get(handlerId)?.callbacks || []) { + handler(res.value, res.version); + } + }); } } private async handleUnListen(message: UnListenMessage) { @@ -1074,16 +1084,6 @@ export class Communication { ? handlersBucket.callbacks.add(fn) : this.handlers.set(handlerId, { message, callbacks: new Set([fn]) }); - // Track RemoteValue subscriptions - if (this.isRemoteValueSubscription(method)) { - this.remoteValueTracking.set(handlerId, { - currentVersion: 0, - envId, - api, - method: this.getRemoteValuePropertyName(method), - }); - } - return handlerId; } diff --git a/packages/core/src/remote-value.ts b/packages/core/src/remote-value.ts index d8d74d7be..355050695 100644 --- a/packages/core/src/remote-value.ts +++ b/packages/core/src/remote-value.ts @@ -1,10 +1,14 @@ export type RemoteValueListener = (data: T, version: number) => void; +export type ReconnectFunction = (currentVersion: number) => Promise<{ + value: T; + version: number; +} | null>; + export type AsyncRemoteValue = { getValue: () => Promise; stream: (handler: RemoteValueListener) => void; subscribe: (handler: RemoteValueListener) => void; unsubscribe: (handler: RemoteValueListener) => void; - reconnect: (currentVersion: number) => Promise<{ value: T; version: number } | null>; }; export const remoteValueAsyncMethods = new Set([ diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index ef53ed0f3..cfbb6d661 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -223,41 +223,23 @@ describe('Communication', () => { expect(collector1.events, 'unsubscribed, no events').to.eql([]); expect(collector2.events, 'still subscribed').to.eql([{ value: 1, version: 1 }]); }); - it('should reconnect and sync version correctly', async () => { - const { comMain, comChild } = setupCrossEnvCommunication(); - const service = { counter: new RemoteValue(10) }; - comChild.registerAPI({ id: 'myService' }, service); - - const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); - - // Initial version is 0, reconnect with matching version should return null - let result = await proxy.counter.reconnect(0); - expect(result, 'initial sync, versions match').to.eql(null); - - // Update value multiple times - service.counter.setValueAndNotify(20); - await waitOneTickAfter(); - service.counter.setValueAndNotify(30); - await waitOneTickAfter(); + it('should allow reconnect RemoteValue subscriptions on environment reconnection', async () => { + const hostServer = new BaseHost(); + const hostClient = new BaseHost(); + const comClient = new Communication(hostClient, 'client'); + const comServer = new Communication(hostServer, 'server'); - // Reconnect with old version should return latest value and version - result = await proxy.counter.reconnect(0); - expect(result, 'versions mismatch, return latest').to.eql({ value: 30, version: 2 }); + comClient.registerEnv('server', hostServer); + comServer.registerEnv('client', hostClient); - // Reconnect with current version should return null - result = await proxy.counter.reconnect(2); - expect(result, 'versions match, no update needed').to.eql(null); - }); - it('should auto-reconnect RemoteValue subscriptions on environment reconnection', async () => { - const { comMain, comChild, hostChild } = setupCrossEnvCommunication(); const service = { counter: new RemoteValue(10) }; - comChild.registerAPI({ id: 'myService' }, service); + comServer.registerAPI({ id: 'myService' }, service); - const proxy = comMain.apiProxy({ id: 'child' }, { id: 'myService' }); + const clientService = comClient.apiProxy({ id: 'server' }, { id: 'myService' }); const { events, handler } = createEventCollector(); // Subscribe to updates - proxy.counter.subscribe(handler); + clientService.counter.subscribe(handler); await waitOneTickAfter(); // Update value before reconnection @@ -265,28 +247,21 @@ describe('Communication', () => { await waitOneTickAfter(); expect(events, 'received update before reconnect').to.eql([{ value: 20, version: 1 }]); - // Simulate environment disconnection by removing message handler - comChild.removeMessageHandler(hostChild); + comServer.clearEnvironment(comClient.getEnvironmentId()); + + service.counter.setValueAndNotify(30); + service.counter.setValueAndNotify(50); - // Reconnect the environment with new service instance that has progressed - const newService = { counter: new RemoteValue(10) }; - // Simulate the service progressing while "disconnected" - newService.counter.setValueAndNotify(30); // version 1 - newService.counter.setValueAndNotify(40); // version 2 + comServer.registerEnv('client', hostClient); - // Create new Communication with APIs passed in constructor (like other reconnection tests) - const reconnectedComChild = new Communication(hostChild, 'child', undefined, undefined, undefined, { - apis: { myService: newService } as any, - }); - reconnectedComChild.registerEnv('main', comMain.getEnvironmentHost('main')!); + await comClient.reconnectRemoteValues('server'); - // Wait for auto-reconnection to happen and sync to complete await waitFor(() => { // Verify we got the latest synced value expect(events.length, 'received reconnection sync').to.be.greaterThan(1); const lastEvent = events[events.length - 1]; - expect(lastEvent?.value, 'synced to latest value').to.equal(40); - expect(lastEvent?.version, 'synced to latest version').to.equal(2); + expect(lastEvent?.value, 'synced to latest value').to.equal(50); + expect(lastEvent?.version, 'synced to latest version').to.equal(3); }); }); }); From 5fe7e6b9f61f7c7a29dfc3ee08af8c1aa44c5f09 Mon Sep 17 00:00:00 2001 From: Barak Igal Date: Sun, 23 Nov 2025 16:36:29 +0200 Subject: [PATCH 6/6] refactor: remove unnecessary type assertion in Communication class --- packages/core/src/com/communication.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index e32820300..437ea10ec 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -722,11 +722,10 @@ export class Communication { subActions.length === 1 && remoteValueAsyncMethods.has(subActions[0] as RemoteValueAsyncMethods) ) { - const remoteValue = this.apis[api]![apiName] as unknown as RemoteValue; + const remoteValue = this.apis[api]![apiName] as RemoteValue; const methodName = subActions[0] as RemoteValueAsyncMethods; return (remoteValue[methodName] as UnknownFunction)(...args); } - // return (this.apis[api]![callPath] as UnknownFunction)(...args); }