Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 97 additions & 13 deletions packages/core/src/com/communication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import {
UnConfiguredMethodError,
UnknownCallbackIdError,
} from './communication-errors.js';
import { RemoteValue } from '../value-signal.js';

export interface ConfigEnvironmentRecord extends EnvironmentRecord {
registerMessageHandler?: boolean;
Expand Down Expand Up @@ -197,9 +198,9 @@ export class Communication {
serviceComConfig: ServiceComConfig<T> = {},
): AsyncApi<T> {
return new Proxy(Object.create(null), {
get: (obj, method) => {
get: (runtimeCache, key) => {
// let js runtime know that this is not thenable object
if (method === 'then') {
if (key === 'then') {
return undefined;
}

Expand All @@ -208,29 +209,103 @@ export class Communication {
* they used by the debugger and cause messages to be sent everywhere
* this behavior made debugging very hard and can cause errors and infinite loops
*/
if (Object.hasOwn(Object.prototype, method)) {
return Reflect.get(Object.prototype, method);
if (Object.hasOwn(Object.prototype, key)) {
return Reflect.get(Object.prototype, key);
}
if (typeof method === 'string') {
let runtimeMethod = obj[method];
if (!runtimeMethod) {
runtimeMethod = async (...args: unknown[]) =>
if (typeof key === 'string') {
let runtimeValue = runtimeCache[key];
if (!runtimeValue) {
runtimeValue = async (...args: unknown[]) =>
this.callMethod(
(await instanceToken).id,
api,
method,
key,
args,
this.rootEnvId,
serviceComConfig as Record<string, AnyServiceMethodOptions>,
);
obj[method] = runtimeMethod;
//////////// Signal handling ////////////
runtimeCache[key] = Object.assign(
runtimeValue,
this.createAsyncRemoteValue<T>(key, serviceComConfig, instanceToken, api),
);
}
return runtimeMethod;
return runtimeValue;
}
},
});
}

private createAsyncRemoteValue<T extends object>(
key: string,
serviceComConfig: ServiceComConfig<T>,
instanceToken: EnvironmentInstanceToken | Promise<EnvironmentInstanceToken>,
api: string,
) {
const subSignalId = key + '.' + 'subscribe';
const unsubSignalId = key + '.' + 'unsubscribe';
const streamSignalId = key + '.' + 'stream';
const getValueId = key + '.getValue';

(serviceComConfig as Record<string, AnyServiceMethodOptions>)[subSignalId] = {
emitOnly: true,
listener: true,
};
(serviceComConfig as Record<string, AnyServiceMethodOptions>)[unsubSignalId] = {
emitOnly: true,
removeListener: subSignalId,
};
(serviceComConfig as Record<string, AnyServiceMethodOptions>)[streamSignalId] = {
emitOnly: true,
listener: true,
};

const asyncRemoteValue = {
subscribe: async (...args: unknown[]) => {
return this.callMethod(
(await instanceToken).id,
api,
subSignalId,
args,
this.rootEnvId,
serviceComConfig as Record<string, AnyServiceMethodOptions>,
);
},
unsubscribe: async (fn: UnknownFunction) => {
return this.callMethod(
(await instanceToken).id,
api,
unsubSignalId,
[fn],
this.rootEnvId,
serviceComConfig as Record<string, AnyServiceMethodOptions>,
);
},
stream: async (fn: UnknownFunction) => {
return this.callMethod(
(await instanceToken).id,
api,
streamSignalId,
[fn],
this.rootEnvId,
serviceComConfig as Record<string, AnyServiceMethodOptions>,
);
},
getValue: async () => {
return this.callMethod(
(await instanceToken).id,
api,
getValueId,
[],
this.rootEnvId,
serviceComConfig as Record<string, AnyServiceMethodOptions>,
);
},
};

return asyncRemoteValue;
}

/**
* Add local handle event listener to Target.
*/
Expand Down Expand Up @@ -640,9 +715,18 @@ export class Communication {

private apiCall(origin: string, api: string, method: string, args: unknown[]): unknown {
if (this.apisOverrides[api]?.[method]) {
return this.apisOverrides[api][method](...[origin, ...args]);
return (this.apisOverrides[api][method] as UnknownFunction)(...[origin, ...args]);
}
if (method.includes('.')) {
const [apiName, methodName] = method.split('.') as [
string,
'subscribe' | 'unsubscribe' | 'getValue' | 'stream',
];
const signal = this.apis[api]![apiName] as RemoteValue<unknown>;
const fnAsrgs = args as [UnknownFunction];
return signal[methodName](...fnAsrgs);
}
return this.apis[api]![method]!(...args);
return (this.apis[api]![method] as UnknownFunction)(...args);
}

private unhandledMessage(message: Message): void {
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/com/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { SERVICE_CONFIG } from '../symbols.js';
import { AsyncRemoteValue, RemoteValue } from '../value-signal.js';
import { Message } from './message-types.js';

export type SerializableArguments = unknown[];
Expand Down Expand Up @@ -64,7 +65,9 @@ export type AsyncApi<T extends object> = {
? T[P]
: T[P] extends (...args: infer Args) => infer R
? (...args: Args) => Promise<R>
: never;
: T[P] extends RemoteValue<infer X>
? AsyncRemoteValue<X>
: never;
} & {
[K in Extract<keyof T, keyof object>]: never;
};
Expand Down Expand Up @@ -118,5 +121,5 @@ export interface APIService {
}

export interface RemoteAPIServicesMapping {
[remoteServiceId: string]: Record<string, AnyFunction>;
[remoteServiceId: string]: Record<string, AnyFunction | AsyncRemoteValue<any>>;
}
7 changes: 3 additions & 4 deletions packages/core/src/entities/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ export class Service<
providedFrom.has(Universal.env) || hasIntersection(providedFrom, runtimeEngine.runningEnvNames);
if (shouldIncludeService) {
if (!providedValue) {
throw new Error(
`Service is not provided at runtime.
throw new Error(`
Service is not provided at runtime.
Make sure the environment setup file exists and named correctly: [featureName].[envName].env.[ext]
Service name: ${entityKey}
Feature id: ${featureID}
Environment: ${runtimeEngine.entryEnvironment.env}
`,
);
`);
}
communication.registerAPI({ id: serviceKey }, providedValue);
return providedValue;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from './runtime-feature.js';
export * from './symbols.js';
export * from './types.js';
export * from './communication.feature.js';
export * from './value-signal.js';

export * from './runtime-main.js';
export * from './runtime-configurations.js';
Expand Down
50 changes: 50 additions & 0 deletions packages/core/src/value-signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
export type Listener<T> = (data: T, version: number) => void;
export type AsyncRemoteValue<T> = {
getValue: () => Promise<T>;
stream: (handler: Listener<T>) => void;
subscribe: (handler: Listener<T>) => void;
unsubscribe: (handler: Listener<T>) => void;
};

export type RemotelyAccessibleSignalMethods = 'subscribe' | 'unsubscribe' | 'getValue';

export class RemoteValue<T> {
private handlers = new Set<Listener<T>>();
private value: T;
private version: number = 0;

constructor(initialValue: T) {
this.value = initialValue;
}

getValue = (): T => {
return this.value;
};

subscribe = (handler: Listener<T>) => {
this.handlers.add(handler);
};
stream = (handler: Listener<T>) => {
this.subscribe(handler);
handler(this.value, this.version);
};

unsubscribe = (handler: Listener<T>) => {
this.handlers.delete(handler);
};

/**
* Set the value and notify all subscribers with the new data.
* Only notifies if the value has changed.
*/
setValueAndNotify = (data: T) => {
if (this.value === data) {
return;
}
this.version++;
this.value = data;
for (const handler of this.handlers) {
handler(data, this.version);
}
};
}
40 changes: 40 additions & 0 deletions packages/core/test/node/com.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Slot,
declareComEmitter,
multiTenantMethod,
RemoteValue,
} from '@dazl/engine-core';
import * as chai from 'chai';
import { expect } from 'chai';
Expand Down Expand Up @@ -54,6 +55,45 @@ describe('Communication', () => {
expect(res).to.be.equal('Yoo!');
});

it('signal support', async () => {
class EchoServiceWithSignal {
onChange = new RemoteValue<string>('');
echo(s: string) {
return s;
}
}
const host = new BaseHost();
const main = new Communication(host, 'main');
const echoService = new EchoServiceWithSignal();
main.registerAPI({ id: 'echoService' }, echoService);

const proxy = main.apiProxy<EchoServiceWithSignal>(Promise.resolve({ id: 'main' }), { id: 'echoService' });
const out: string[] = [];
const versions: number[] = [];

const handler = (e: string, version: number) => {
out.push(e);
versions.push(version);
};
proxy.onChange.subscribe(handler);

// this code simulate cross environment communication it cannot be synchronous
await sleep(0);
echoService.onChange.setValueAndNotify('test');
//////////////////////////////////////////////////////////////////////////////

proxy.onChange.unsubscribe(handler);

// this code simulate cross environment communication it cannot be synchronous
await sleep(0);
echoService.onChange.setValueAndNotify('test 2');
await sleep(0);
//////////////////////////////////////////////////////////////////////////////

expect(versions).to.eql([1]);
expect(out).to.eql(['test']);
});

it('multi communication', async () => {
const host = new BaseHost();
const main = new Communication(host, 'main');
Expand Down