Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions packages/core/src/com/communication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ export class Communication {
const callback = args[0] as UnknownFunction;
let wrapedCallback = fnHandlers.get(callback);
if (!wrapedCallback) {
wrapedCallback = (value, version) => {
wrapedCallback = (value, version, ...callbackArgs) => {
this.remoteValueTracking.set(handlerId, {
currentVersion: version,
reconnect,
});
return callback(value, version);
return callback(value, version, ...callbackArgs);
};
fnHandlers.set(callback, wrapedCallback);
}
Expand Down Expand Up @@ -929,7 +929,7 @@ export class Communication {
return;
}
for (const handler of this.handlers.get(handlerId)?.callbacks || []) {
handler(res.value, res.version);
handler(res.value, res.version, 'all');
}
});
}
Expand Down Expand Up @@ -1085,16 +1085,6 @@ export class Communication {

return handlerId;
}

private isRemoteValueSubscription(method: string): boolean {
return method.endsWith('.subscribe') || method.endsWith('.stream');
}

private getRemoteValuePropertyName(method: string): string {
// Extract property name from 'propertyName.subscribe' or 'propertyName.stream'
const parts = method.split('.');
return parts.slice(0, -1).join('.');
}
private createCallbackRecord(
message: Message,
callbackId: string,
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/com/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { RemoteAggregatedValue, AsyncRemoteAggregatedValue } from '../remote-aggregated-value.js';
import { AsyncRemoteValue, RemoteValue } from '../remote-value.js';
import { SERVICE_CONFIG } from '../symbols.js';
import { Message } from './message-types.js';
Expand Down Expand Up @@ -67,7 +68,9 @@ export type AsyncApi<T extends object> = {
? (...args: Args) => Promise<R>
: T[P] extends RemoteValue<infer X>
? AsyncRemoteValue<X>
: never;
: T[P] extends RemoteAggregatedValue<infer X>
? AsyncRemoteAggregatedValue<X>
: never;
} & {
[K in Extract<keyof T, keyof object>]: never;
};
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export * from './symbols.js';
export * from './types.js';
export * from './communication.feature.js';
export * from './remote-value.js';
export * from './remote-aggregated-value.js';

export * from './runtime-main.js';
export * from './runtime-configurations.js';
Expand Down
61 changes: 61 additions & 0 deletions packages/core/src/remote-aggregated-value.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
export type RemoteAggregatedValueListener<T> = (data: T | T[], version: number, modifier: 'item' | 'all') => void;

export type AsyncRemoteAggregatedValue<T> = {
getValue: () => Promise<T[]>;
subscribe: (handler: RemoteAggregatedValueListener<T>) => void;
unsubscribe: (handler: RemoteAggregatedValueListener<T>) => void;
};

export class RemoteAggregatedValue<T> {
private allItems: T[] = [];
private limit: number;
private handlers = new Set<RemoteAggregatedValueListener<T>>();
private version: number = 0;

constructor({ limit = 10 }: { limit?: number } = {}) {
this.limit = limit;
}

getValue = (): T[] => {
return this.allItems;
};

subscribe = (handler: RemoteAggregatedValueListener<T>) => {
handler(this.allItems, this.version, 'all');
this.handlers.add(handler);
};

unsubscribe = (handler: RemoteAggregatedValueListener<T>) => {
this.handlers.delete(handler);
};

push(item: T) {
this.allItems.push(item);
if (this.allItems.length > this.limit) {
this.allItems.shift();
}
this.version++;
for (const handler of this.handlers) {
handler(item, this.version, 'item');
}
}

clear() {
this.allItems = [];
this.version++;
for (const handler of this.handlers) {
handler([], this.version, 'all');
}
}

/**
* Reconnect method to sync version and retrieve latest value if needed.
* Returns the latest value and version if there's a mismatch, otherwise returns null.
*/
reconnect = (currentVersion: number): { value: T[]; version: number } | null => {
if (currentVersion !== this.version) {
return { value: this.allItems, version: this.version };
}
return null;
};
}
182 changes: 180 additions & 2 deletions packages/core/test/node/com.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Environment,
Feature,
RemoteValue,
RemoteAggregatedValue,
RuntimeEngine,
SERVICE_CONFIG,
Service,
Expand Down Expand Up @@ -246,14 +247,12 @@ describe('Communication', () => {
service.counter.setValueAndNotify(20);
await waitOneTickAfter();
expect(events, 'received update before reconnect').to.eql([{ value: 20, version: 1 }]);

comServer.clearEnvironment(comClient.getEnvironmentId());

service.counter.setValueAndNotify(30);
service.counter.setValueAndNotify(50);

comServer.registerEnv('client', hostClient);

await comClient.reconnectRemoteValues('server');

await waitFor(() => {
Expand All @@ -266,6 +265,185 @@ describe('Communication', () => {
});
});

describe('RemoteAggregatedValue', () => {
interface ServiceWithRemoteAggregatedValue {
items: RemoteAggregatedValue<string>;
}

function setupCrossEnvCommunication() {
const hostServer = new BaseHost();
const hostClient = new BaseHost();
const comServer = new Communication(hostServer, 'server');
const comClient = new Communication(hostClient, 'client');

comServer.registerEnv('client', hostClient);
comClient.registerEnv('server', hostServer);

return { comServer, comClient, hostServer, hostClient };
}

function createAggregatedEventCollector<T>() {
const events: Array<{ value: T | T[]; version: number; modifier: 'item' | 'all' }> = [];
const handler = (value: T | T[], version: number, modifier: 'item' | 'all') => {
events.push({ value: structuredClone(value), version, modifier });
};
return { events, handler };
}

const waitOneTickAfter = () => sleep(0);

it('should start with an empty array', async () => {
const { comServer, comClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>() };
comClient.registerAPI({ id: 'myService' }, service);
const proxy = comServer.apiProxy<ServiceWithRemoteAggregatedValue>({ id: 'client' }, { id: 'myService' });

const value = await proxy.items.getValue();

expect(value).to.eql([]);
});
it('should subscribe to current value immediately', async () => {
const { comServer, comClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>() };
service.items.push('a'); // version 1
service.items.push('b'); // version 2
comClient.registerAPI({ id: 'myService' }, service);
const proxy = comServer.apiProxy<ServiceWithRemoteAggregatedValue>({ id: 'client' }, { id: 'myService' });
const { events, handler } = createAggregatedEventCollector<string>();

proxy.items.subscribe(handler);
await waitOneTickAfter();

expect(events).to.eql([{ value: ['a', 'b'], version: 2, modifier: 'all' }]);
});
it('should push items and notify subscribers with versions', async () => {
const { comServer, comClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>() };
comClient.registerAPI({ id: 'myService' }, service);
const proxy = comServer.apiProxy<ServiceWithRemoteAggregatedValue>({ id: 'client' }, { id: 'myService' });
const { events, handler } = createAggregatedEventCollector<string>();

proxy.items.subscribe(handler);
await waitOneTickAfter();
service.items.push('a');
await waitOneTickAfter();
service.items.push('b');
await waitOneTickAfter();

expect(events).to.eql([
{ value: [], version: 0, modifier: 'all' },
{ value: 'a', version: 1, modifier: 'item' },
{ value: 'b', version: 2, modifier: 'item' },
]);
});
it('should respect the limit', async () => {
const { comServer, comClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>({ limit: 3 }) };
comClient.registerAPI({ id: 'myService' }, service);
const proxy = comServer.apiProxy<ServiceWithRemoteAggregatedValue>({ id: 'client' }, { id: 'myService' });

service.items.push('1');
service.items.push('2');
service.items.push('3');
service.items.push('4');
await waitOneTickAfter();

const value = await proxy.items.getValue();
expect(value).to.eql(['2', '3', '4']);
});
it('should unsubscribe correctly', async () => {
const { comServer, comClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>() };
comClient.registerAPI({ id: 'myService' }, service);
const proxy = comServer.apiProxy<ServiceWithRemoteAggregatedValue>({ id: 'client' }, { id: 'myService' });
const { events, handler } = createAggregatedEventCollector<string>();

proxy.items.subscribe(handler);
await waitOneTickAfter();

service.items.push('a');
await waitOneTickAfter();

proxy.items.unsubscribe(handler);
await waitOneTickAfter();

service.items.push('b');
await waitOneTickAfter();

expect(events).to.eql([
{ value: [], version: 0, modifier: 'all' },
{ value: 'a', version: 1, modifier: 'item' },
]);
});
it('should clear items and notify subscribers', async () => {
const { comServer, comClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>() };
comClient.registerAPI({ id: 'myService' }, service);
const proxy = comServer.apiProxy<ServiceWithRemoteAggregatedValue>({ id: 'client' }, { id: 'myService' });
const { events, handler } = createAggregatedEventCollector<string>();

proxy.items.subscribe(handler);
await waitOneTickAfter();

service.items.push('a');
await waitOneTickAfter();

service.items.clear();
await waitOneTickAfter();

expect(events).to.eql([
{ value: [], version: 0, modifier: 'all' },
{ value: 'a', version: 1, modifier: 'item' },
{ value: [], version: 2, modifier: 'all' },
]);

const valueAfterClear = await proxy.items.getValue();
expect(valueAfterClear, 'getValue after clear').to.eql([]);
});

it('should reconnect RemoteAggregatedValue subscriptions after env reconnect', async () => {
const { comServer, comClient, hostClient } = setupCrossEnvCommunication();
const service = { items: new RemoteAggregatedValue<string>() };
comServer.registerAPI({ id: 'myService' }, service);
const clientService = comClient.apiProxy<ServiceWithRemoteAggregatedValue>(
{ id: 'server' },
{ id: 'myService' },
);
const { events, handler } = createAggregatedEventCollector<string>();

// initial subscription -> should immediately receive current value
clientService.items.subscribe(handler);
await waitOneTickAfter();
expect(events, 'initial subscribe emits current value').to.eql([
{ value: [], version: 0, modifier: 'all' },
]);

// push one item while connected
service.items.push('a'); // v1
await waitOneTickAfter();
// simulate env disconnect
comServer.clearEnvironment(comClient.getEnvironmentId());

// mutate while disconnected: subscriber should not see these yet
service.items.push('b'); // v2
service.items.push('c'); // v3
await waitOneTickAfter();

// reconnect the env and ask to resubscribe remote values
comServer.registerEnv('client', hostClient);
// reconnect is a method on the service side; it expects the origin env id (subscriber) as input
await comClient.reconnectRemoteValues('client');

await waitFor(() => {
expect(events.length, 'received reconnection sync').to.be.greaterThan(2);
const last = events[events.length - 1];
expect(last?.modifier, 'reconnect emits full snapshot').to.equal('all');
expect(last?.version, 'reconnected to latest version').to.equal(3);
expect(last?.value, 'reconnected to latest items').to.eql(['a', 'b', 'c']);
});
});
});

it('multi communication', async () => {
const host = new BaseHost();
const main = new Communication(host, 'main');
Expand Down