From e5e2af8ebf27502d8cc533f208f279b94206e1fb Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 21 Dec 2025 12:01:17 +0200 Subject: [PATCH 1/2] chore: remove unused methods --- packages/core/src/com/communication.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 437ea10ec..9e69730d6 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -1085,16 +1085,6 @@ export class Communication { return handlerId; } - - private isRemoteValueSubscription(method: string): boolean { - return method.endsWith('.subscribe') || method.endsWith('.stream'); - } - - private getRemoteValuePropertyName(method: string): string { - // Extract property name from 'propertyName.subscribe' or 'propertyName.stream' - const parts = method.split('.'); - return parts.slice(0, -1).join('.'); - } private createCallbackRecord( message: Message, callbackId: string, From 11c61b87ab250738b4b3791ec54500faa5df6b0a Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 21 Dec 2025 14:22:22 +0200 Subject: [PATCH 2/2] feat: add RemoteAggregatedValue class --- packages/core/src/com/communication.ts | 6 +- packages/core/src/com/types.ts | 5 +- packages/core/src/index.ts | 1 + packages/core/src/remote-aggregated-value.ts | 61 +++++++ packages/core/test/node/com.spec.ts | 182 ++++++++++++++++++- 5 files changed, 249 insertions(+), 6 deletions(-) create mode 100644 packages/core/src/remote-aggregated-value.ts diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 9e69730d6..84a3930b9 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -274,12 +274,12 @@ export class Communication { const callback = args[0] as UnknownFunction; let wrapedCallback = fnHandlers.get(callback); if (!wrapedCallback) { - wrapedCallback = (value, version) => { + wrapedCallback = (value, version, ...callbackArgs) => { this.remoteValueTracking.set(handlerId, { currentVersion: version, reconnect, }); - return callback(value, version); + return callback(value, version, ...callbackArgs); }; fnHandlers.set(callback, wrapedCallback); } @@ -929,7 +929,7 @@ export class Communication { return; } for (const handler of this.handlers.get(handlerId)?.callbacks || []) { - handler(res.value, res.version); + handler(res.value, res.version, 'all'); } }); } diff --git a/packages/core/src/com/types.ts b/packages/core/src/com/types.ts index f1a23bfcb..1968c43f0 100644 --- a/packages/core/src/com/types.ts +++ b/packages/core/src/com/types.ts @@ -1,3 +1,4 @@ +import { RemoteAggregatedValue, AsyncRemoteAggregatedValue } from '../remote-aggregated-value.js'; import { AsyncRemoteValue, RemoteValue } from '../remote-value.js'; import { SERVICE_CONFIG } from '../symbols.js'; import { Message } from './message-types.js'; @@ -67,7 +68,9 @@ export type AsyncApi = { ? (...args: Args) => Promise : T[P] extends RemoteValue ? AsyncRemoteValue - : never; + : T[P] extends RemoteAggregatedValue + ? AsyncRemoteAggregatedValue + : never; } & { [K in Extract]: never; }; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 65125e54b..7304c27c7 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -10,6 +10,7 @@ export * from './symbols.js'; export * from './types.js'; export * from './communication.feature.js'; export * from './remote-value.js'; +export * from './remote-aggregated-value.js'; export * from './runtime-main.js'; export * from './runtime-configurations.js'; diff --git a/packages/core/src/remote-aggregated-value.ts b/packages/core/src/remote-aggregated-value.ts new file mode 100644 index 000000000..daf207678 --- /dev/null +++ b/packages/core/src/remote-aggregated-value.ts @@ -0,0 +1,61 @@ +export type RemoteAggregatedValueListener = (data: T | T[], version: number, modifier: 'item' | 'all') => void; + +export type AsyncRemoteAggregatedValue = { + getValue: () => Promise; + subscribe: (handler: RemoteAggregatedValueListener) => void; + unsubscribe: (handler: RemoteAggregatedValueListener) => void; +}; + +export class RemoteAggregatedValue { + private allItems: T[] = []; + private limit: number; + private handlers = new Set>(); + private version: number = 0; + + constructor({ limit = 10 }: { limit?: number } = {}) { + this.limit = limit; + } + + getValue = (): T[] => { + return this.allItems; + }; + + subscribe = (handler: RemoteAggregatedValueListener) => { + handler(this.allItems, this.version, 'all'); + this.handlers.add(handler); + }; + + unsubscribe = (handler: RemoteAggregatedValueListener) => { + this.handlers.delete(handler); + }; + + push(item: T) { + this.allItems.push(item); + if (this.allItems.length > this.limit) { + this.allItems.shift(); + } + this.version++; + for (const handler of this.handlers) { + handler(item, this.version, 'item'); + } + } + + clear() { + this.allItems = []; + this.version++; + for (const handler of this.handlers) { + handler([], this.version, 'all'); + } + } + + /** + * Reconnect method to sync version and retrieve latest value if needed. + * Returns the latest value and version if there's a mismatch, otherwise returns null. + */ + reconnect = (currentVersion: number): { value: T[]; version: number } | null => { + if (currentVersion !== this.version) { + return { value: this.allItems, version: this.version }; + } + return null; + }; +} diff --git a/packages/core/test/node/com.spec.ts b/packages/core/test/node/com.spec.ts index cfbb6d661..91810e73e 100644 --- a/packages/core/test/node/com.spec.ts +++ b/packages/core/test/node/com.spec.ts @@ -6,6 +6,7 @@ import { Environment, Feature, RemoteValue, + RemoteAggregatedValue, RuntimeEngine, SERVICE_CONFIG, Service, @@ -246,14 +247,12 @@ describe('Communication', () => { service.counter.setValueAndNotify(20); await waitOneTickAfter(); expect(events, 'received update before reconnect').to.eql([{ value: 20, version: 1 }]); - comServer.clearEnvironment(comClient.getEnvironmentId()); service.counter.setValueAndNotify(30); service.counter.setValueAndNotify(50); comServer.registerEnv('client', hostClient); - await comClient.reconnectRemoteValues('server'); await waitFor(() => { @@ -266,6 +265,185 @@ describe('Communication', () => { }); }); + describe('RemoteAggregatedValue', () => { + interface ServiceWithRemoteAggregatedValue { + items: RemoteAggregatedValue; + } + + function setupCrossEnvCommunication() { + const hostServer = new BaseHost(); + const hostClient = new BaseHost(); + const comServer = new Communication(hostServer, 'server'); + const comClient = new Communication(hostClient, 'client'); + + comServer.registerEnv('client', hostClient); + comClient.registerEnv('server', hostServer); + + return { comServer, comClient, hostServer, hostClient }; + } + + function createAggregatedEventCollector() { + const events: Array<{ value: T | T[]; version: number; modifier: 'item' | 'all' }> = []; + const handler = (value: T | T[], version: number, modifier: 'item' | 'all') => { + events.push({ value: structuredClone(value), version, modifier }); + }; + return { events, handler }; + } + + const waitOneTickAfter = () => sleep(0); + + it('should start with an empty array', async () => { + const { comServer, comClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue() }; + comClient.registerAPI({ id: 'myService' }, service); + const proxy = comServer.apiProxy({ id: 'client' }, { id: 'myService' }); + + const value = await proxy.items.getValue(); + + expect(value).to.eql([]); + }); + it('should subscribe to current value immediately', async () => { + const { comServer, comClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue() }; + service.items.push('a'); // version 1 + service.items.push('b'); // version 2 + comClient.registerAPI({ id: 'myService' }, service); + const proxy = comServer.apiProxy({ id: 'client' }, { id: 'myService' }); + const { events, handler } = createAggregatedEventCollector(); + + proxy.items.subscribe(handler); + await waitOneTickAfter(); + + expect(events).to.eql([{ value: ['a', 'b'], version: 2, modifier: 'all' }]); + }); + it('should push items and notify subscribers with versions', async () => { + const { comServer, comClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue() }; + comClient.registerAPI({ id: 'myService' }, service); + const proxy = comServer.apiProxy({ id: 'client' }, { id: 'myService' }); + const { events, handler } = createAggregatedEventCollector(); + + proxy.items.subscribe(handler); + await waitOneTickAfter(); + service.items.push('a'); + await waitOneTickAfter(); + service.items.push('b'); + await waitOneTickAfter(); + + expect(events).to.eql([ + { value: [], version: 0, modifier: 'all' }, + { value: 'a', version: 1, modifier: 'item' }, + { value: 'b', version: 2, modifier: 'item' }, + ]); + }); + it('should respect the limit', async () => { + const { comServer, comClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue({ limit: 3 }) }; + comClient.registerAPI({ id: 'myService' }, service); + const proxy = comServer.apiProxy({ id: 'client' }, { id: 'myService' }); + + service.items.push('1'); + service.items.push('2'); + service.items.push('3'); + service.items.push('4'); + await waitOneTickAfter(); + + const value = await proxy.items.getValue(); + expect(value).to.eql(['2', '3', '4']); + }); + it('should unsubscribe correctly', async () => { + const { comServer, comClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue() }; + comClient.registerAPI({ id: 'myService' }, service); + const proxy = comServer.apiProxy({ id: 'client' }, { id: 'myService' }); + const { events, handler } = createAggregatedEventCollector(); + + proxy.items.subscribe(handler); + await waitOneTickAfter(); + + service.items.push('a'); + await waitOneTickAfter(); + + proxy.items.unsubscribe(handler); + await waitOneTickAfter(); + + service.items.push('b'); + await waitOneTickAfter(); + + expect(events).to.eql([ + { value: [], version: 0, modifier: 'all' }, + { value: 'a', version: 1, modifier: 'item' }, + ]); + }); + it('should clear items and notify subscribers', async () => { + const { comServer, comClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue() }; + comClient.registerAPI({ id: 'myService' }, service); + const proxy = comServer.apiProxy({ id: 'client' }, { id: 'myService' }); + const { events, handler } = createAggregatedEventCollector(); + + proxy.items.subscribe(handler); + await waitOneTickAfter(); + + service.items.push('a'); + await waitOneTickAfter(); + + service.items.clear(); + await waitOneTickAfter(); + + expect(events).to.eql([ + { value: [], version: 0, modifier: 'all' }, + { value: 'a', version: 1, modifier: 'item' }, + { value: [], version: 2, modifier: 'all' }, + ]); + + const valueAfterClear = await proxy.items.getValue(); + expect(valueAfterClear, 'getValue after clear').to.eql([]); + }); + + it('should reconnect RemoteAggregatedValue subscriptions after env reconnect', async () => { + const { comServer, comClient, hostClient } = setupCrossEnvCommunication(); + const service = { items: new RemoteAggregatedValue() }; + comServer.registerAPI({ id: 'myService' }, service); + const clientService = comClient.apiProxy( + { id: 'server' }, + { id: 'myService' }, + ); + const { events, handler } = createAggregatedEventCollector(); + + // initial subscription -> should immediately receive current value + clientService.items.subscribe(handler); + await waitOneTickAfter(); + expect(events, 'initial subscribe emits current value').to.eql([ + { value: [], version: 0, modifier: 'all' }, + ]); + + // push one item while connected + service.items.push('a'); // v1 + await waitOneTickAfter(); + // simulate env disconnect + comServer.clearEnvironment(comClient.getEnvironmentId()); + + // mutate while disconnected: subscriber should not see these yet + service.items.push('b'); // v2 + service.items.push('c'); // v3 + await waitOneTickAfter(); + + // reconnect the env and ask to resubscribe remote values + comServer.registerEnv('client', hostClient); + // reconnect is a method on the service side; it expects the origin env id (subscriber) as input + await comClient.reconnectRemoteValues('client'); + + await waitFor(() => { + expect(events.length, 'received reconnection sync').to.be.greaterThan(2); + const last = events[events.length - 1]; + expect(last?.modifier, 'reconnect emits full snapshot').to.equal('all'); + expect(last?.version, 'reconnected to latest version').to.equal(3); + expect(last?.value, 'reconnected to latest items').to.eql(['a', 'b', 'c']); + }); + }); + }); + it('multi communication', async () => { const host = new BaseHost(); const main = new Communication(host, 'main');