Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 137 additions & 16 deletions packages/core/src/com/communication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ import {
UnConfiguredMethodError,
UnknownCallbackIdError,
} from './communication-errors.js';
import {
type RemoteValueAsyncMethods,
type RemoteValue,
remoteValueAsyncMethods,
ReconnectFunction,
RemoteValueListener,
} from '../remote-value.js';

export interface ConfigEnvironmentRecord extends EnvironmentRecord {
registerMessageHandler?: boolean;
Expand Down Expand Up @@ -94,6 +101,8 @@ export class Communication {
private messageIdPrefix: string;
// manual DEBUG_MODE
private DEBUG = false;
// Track RemoteValue subscriptions with their current versions
private remoteValueTracking = new Map<string, { currentVersion: number; reconnect: ReconnectFunction<unknown> }>();
constructor(
private host: Target,
id: string,
Expand Down Expand Up @@ -196,9 +205,9 @@ export class Communication {
serviceComConfig: ServiceComConfig<T> = {},
): AsyncApi<T> {
return new Proxy(Object.create(null), {
get: (obj, method) => {
get: (runtimeCache, key) => {
// let js runtime know that this is not thenable object
if (method === 'then') {
if (key === 'then') {
return undefined;
}

Expand All @@ -207,28 +216,95 @@ export class Communication {
* they used by the debugger and cause messages to be sent everywhere
* this behavior made debugging very hard and can cause errors and infinite loops
*/
if (Object.hasOwn(Object.prototype, method)) {
return Reflect.get(Object.prototype, method);
if (Object.hasOwn(Object.prototype, key)) {
return Reflect.get(Object.prototype, key);
}
if (typeof method === 'string') {
let runtimeMethod = obj[method];
if (!runtimeMethod) {
runtimeMethod = async (...args: unknown[]) =>
if (typeof key === 'string') {
let runtimeValue = runtimeCache[key];
if (!runtimeValue) {
runtimeValue = async (...args: unknown[]) =>
this.callMethod(
(await instanceToken).id,
api,
method,
key,
args,
this.rootEnvId,
serviceComConfig as Record<string, AnyServiceMethodOptions>,
);
obj[method] = runtimeMethod;
runtimeCache[key] = Object.assign(
runtimeValue,
this.createAsyncRemoteValue<T>(key, serviceComConfig, instanceToken, api),
);
}
return runtimeMethod;
return runtimeValue;
}
},
});
}
private createAsyncRemoteValue<T extends object>(
key: string,
serviceComConfig: ServiceComConfig<T>,
instanceToken: EnvironmentInstanceToken | Promise<EnvironmentInstanceToken>,
api: string,
) {
const config = serviceComConfig as Record<string, AnyServiceMethodOptions>;
const methods = {
subscribe: `${key}.subscribe`,
unsubscribe: `${key}.unsubscribe`,
stream: `${key}.stream`,
getValue: `${key}.getValue`,
};

config[methods.subscribe] = { emitOnly: true, listener: true };
config[methods.stream] = { emitOnly: true, listener: true };
config[methods.unsubscribe] = { emitOnly: true, removeListener: methods.subscribe };

const createMethod =
(methodName: string) =>
async (...args: unknown[]) => {
return this.callMethod((await instanceToken).id, api, methodName, args, this.rootEnvId, config);
};

const fnHandlers = new WeakMap<RemoteValueListener<unknown>, RemoteValueListener<unknown>>();
const reconnect = createMethod(`${key}.reconnect`) as ReconnectFunction<unknown>;
const createHandlerMethod = (methodName: string) => {
return async (...args: unknown[]) => {
const envId = (await instanceToken).id;
const handlerId = this.getHandlerId(envId, api, methodName);
const callback = args[0] as UnknownFunction;
let wrapedCallback = fnHandlers.get(callback);
if (!wrapedCallback) {
wrapedCallback = (value, version) => {
this.remoteValueTracking.set(handlerId, {
currentVersion: version,
reconnect,
});
return callback(value, version);
};
fnHandlers.set(callback, wrapedCallback);
}
args[0] = wrapedCallback;
return this.callMethod(envId, api, methodName, args, this.rootEnvId, config);
};
};

return {
subscribe: createHandlerMethod(methods.subscribe),
stream: createHandlerMethod(methods.stream),
unsubscribe: async (fn: RemoteValueListener<unknown>) => {
const wrapedCallback = fnHandlers.get(fn);
return this.callMethod(
(await instanceToken).id,
api,
methods.unsubscribe,
[wrapedCallback],
this.rootEnvId,
config,
);
},
getValue: createMethod(methods.getValue),
};
}

/**
* Add local handle event listener to Target.
Expand Down Expand Up @@ -634,11 +710,23 @@ export class Communication {
this.post(env.host, message);
}

private apiCall(origin: string, api: string, method: string, args: unknown[]): unknown {
if (this.apisOverrides[api]?.[method]) {
return this.apisOverrides[api][method](...[origin, ...args]);
private apiCall(origin: string, api: string, callPath: string, args: unknown[]): unknown {
const method = this.apisOverrides[api]?.[callPath];
if (typeof method === 'function') {
return method(...[origin, ...args]);
}
// support for RemoteValue
const [apiName, ...subActions] = callPath.split('.');
if (
apiName &&
subActions.length === 1 &&
remoteValueAsyncMethods.has(subActions[0] as RemoteValueAsyncMethods)
) {
const remoteValue = this.apis[api]![apiName] as RemoteValue<unknown>;
const methodName = subActions[0] as RemoteValueAsyncMethods;
return (remoteValue[methodName] as UnknownFunction)(...args);
}
return this.apis[api]![method]!(...args);
return (this.apis[api]![callPath] as UnknownFunction)(...args);
}

private unhandledMessage(message: Message): void {
Expand All @@ -659,7 +747,6 @@ export class Communication {
rej: (reason: unknown) => void,
) {
const removeListenerRef = methodConfig?.removeAllListeners || methodConfig?.removeListener;

if (removeListenerRef && !methodConfig?.listener) {
const listenerHandlerId = this.getHandlerId(envId, api, removeListenerRef);
const listenerHandlersBucket = this.handlers.get(listenerHandlerId);
Expand Down Expand Up @@ -825,6 +912,28 @@ export class Communication {
}
}
}

/**
* Reconnect all RemoteValue subscriptions for a given environment.
* Called automatically when an environment reconnects.
*/
async reconnectRemoteValues(envId: string) {
for (const [handlerId, tracking] of this.remoteValueTracking.entries()) {
if (this.DEBUG) {
console.debug(
`Reconnected RemoteValue for handler ${handlerId} in from ${envId} to version ${tracking.currentVersion}`,
);
}
await tracking.reconnect(tracking.currentVersion).then((res) => {
if (!res) {
return;
}
for (const handler of this.handlers.get(handlerId)?.callbacks || []) {
handler(res.value, res.version);
}
});
}
}
private async handleUnListen(message: UnListenMessage) {
const namespacedHandlerId = message.handlerId + message.origin;
const dispatcher = this.eventDispatchers.get(namespacedHandlerId)?.dispatcher;
Expand Down Expand Up @@ -966,14 +1075,26 @@ export class Communication {
this.subscribeToEnvironmentDispose((disposedEnvId) => {
if (envId === disposedEnvId) {
this.handlers.delete(handlerId);
this.remoteValueTracking.delete(handlerId);
}
});
}
handlersBucket
? handlersBucket.callbacks.add(fn)
: this.handlers.set(handlerId, { message, callbacks: new Set([fn]) });

return handlerId;
}

private isRemoteValueSubscription(method: string): boolean {
return method.endsWith('.subscribe') || method.endsWith('.stream');
}

private getRemoteValuePropertyName(method: string): string {
// Extract property name from 'propertyName.subscribe' or 'propertyName.stream'
const parts = method.split('.');
return parts.slice(0, -1).join('.');
}
private createCallbackRecord(
message: Message,
callbackId: string,
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/com/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { AsyncRemoteValue, RemoteValue } from '../remote-value.js';
import { SERVICE_CONFIG } from '../symbols.js';
import { Message } from './message-types.js';

Expand Down Expand Up @@ -64,7 +65,9 @@ export type AsyncApi<T extends object> = {
? T[P]
: T[P] extends (...args: infer Args) => infer R
? (...args: Args) => Promise<R>
: never;
: T[P] extends RemoteValue<infer X>
? AsyncRemoteValue<X>
: never;
} & {
[K in Extract<keyof T, keyof object>]: never;
};
Expand Down Expand Up @@ -118,5 +121,5 @@ export interface APIService {
}

export interface RemoteAPIServicesMapping {
[remoteServiceId: string]: Record<string, AnyFunction>;
[remoteServiceId: string]: Record<string, AnyFunction | AsyncRemoteValue<any>>;
}
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from './runtime-feature.js';
export * from './symbols.js';
export * from './types.js';
export * from './communication.feature.js';
export * from './remote-value.js';

export * from './runtime-main.js';
export * from './runtime-configurations.js';
Expand Down
73 changes: 73 additions & 0 deletions packages/core/src/remote-value.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
export type RemoteValueListener<T> = (data: T, version: number) => void;
export type ReconnectFunction<T> = (currentVersion: number) => Promise<{
value: T;
version: number;
} | null>;

export type AsyncRemoteValue<T> = {
getValue: () => Promise<T>;
stream: (handler: RemoteValueListener<T>) => void;
subscribe: (handler: RemoteValueListener<T>) => void;
unsubscribe: (handler: RemoteValueListener<T>) => void;
};

export const remoteValueAsyncMethods = new Set([
'getValue',
'stream',
'subscribe',
'unsubscribe',
'reconnect',
] as const);
export type RemoteValueAsyncMethods = typeof remoteValueAsyncMethods extends Set<infer U> ? U : never;

export class RemoteValue<T> {
private handlers = new Set<RemoteValueListener<T>>();
private value: T;
private version: number = 0;

constructor(initialValue: T) {
this.value = initialValue;
}

getValue = (): T => {
return this.value;
};

subscribe = (handler: RemoteValueListener<T>) => {
this.handlers.add(handler);
};
stream = (handler: RemoteValueListener<T>) => {
this.subscribe(handler);
handler(this.value, this.version);
};

unsubscribe = (handler: RemoteValueListener<T>) => {
this.handlers.delete(handler);
};

/**
* Set the value and notify all subscribers with the new data.
* Only notifies if the value has changed.
*/
setValueAndNotify = (data: T) => {
if (this.value === data) {
return;
}
this.version++;
this.value = data;
for (const handler of this.handlers) {
handler(data, this.version);
}
};

/**
* Reconnect method to sync version and retrieve latest value if needed.
* Returns the latest value and version if there's a mismatch, otherwise returns null.
*/
reconnect = (currentVersion: number): { value: T; version: number } | null => {
if (currentVersion !== this.version) {
return { value: this.value, version: this.version };
}
return null;
};
}
Loading