Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/com/communication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ export class Communication {
case 'ready':
this.handleReady(message);
break;
case 'connection_disrupted':
this.registerPendingEnvironment(message.origin);
break;
case 'dispose':
if (message.from !== this.rootEnvId) {
this.clearEnvironment(message.origin, message.from);
Expand Down
57 changes: 51 additions & 6 deletions packages/core/src/com/hosts/ws-client-host.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { io, Socket, type SocketOptions } from 'socket.io-client';
import { io, ManagerOptions, Socket, type SocketOptions } from 'socket.io-client';
import type { Message } from '../message-types.js';
import { BaseHost } from './base-host.js';
import { EventEmitter, IDisposable, SafeDisposable } from '@dazl/patterns';
import { deferred } from 'promise-assist';
import { deferred, type PromiseRejectCb, type PromiseResolveCb } from 'promise-assist';

export class WsClientHost extends BaseHost implements IDisposable {
private reinitCount = 0;
private disposables = new SafeDisposable(WsClientHost.name);
dispose = this.disposables.dispose;
isDisposed = this.disposables.isDisposed;
public connected: Promise<void>;
private socketClient: Socket;
public subscribers = new EventEmitter<{ disconnect: string; reconnect: void; connect: void }>();
private socketClient!: Socket;
public subscribers = new EventEmitter<{
disconnect: string;
reconnect: void;
connect: void;
'server-lost-client-state': void;
'server-connection-restored': void;
}>();
private stableClientId = crypto.randomUUID();

constructor(url: string, options?: Partial<SocketOptions>) {
constructor(url: string, options?: Partial<ManagerOptions & SocketOptions>) {
super();
this.disposables.add('close socket', () => this.socketClient.close());
this.disposables.add('clear subscribers', () => this.subscribers.clear());
Expand All @@ -23,27 +30,63 @@ export class WsClientHost extends BaseHost implements IDisposable {
const { promise, resolve, reject } = deferred();
this.connected = promise;

this.initSocketIO(url, path, query, options, reject, resolve);
}

private initSocketIO(
url: string,
path: string | undefined,
query: { [k: string]: string },
options: Partial<ManagerOptions & SocketOptions> | undefined,
reject: PromiseRejectCb,
resolve: PromiseResolveCb<void>,
) {
this.socketClient = io(url, {
transports: ['websocket'],
withCredentials: true, // Pass Cookie to socket io connection
path,
query,
forceNew: true,
auth: {
clientId: this.stableClientId,
},
...options,
});

this.socketClient.once('connect_error', (error) => {
if (error.message === 'timeout' && this.reinitCount < 3) {
this.reinitCount++;
this.socketClient.close();
this.initSocketIO(
url,
path,
query,
{
...options,
timeout: (options?.timeout ?? 20000) * (this.reinitCount + 1),
},
reject,
resolve,
);
return;
}
reject(new Error(`Failed to connect to socket server`, { cause: error }));
});

this.socketClient.on('connect', () => {
this.reinitCount = Infinity;
this.subscribers.emit('connect', undefined);
resolve();
});

this.socketClient.on('message', (data: unknown) => {
if (
typeof data === 'string' &&
(data === 'server-lost-client-state' || data === 'server-connection-restored')
) {
this.subscribers.emit(data, undefined);
return;
}
this.emitMessageHandlers(data as Message);
});

Expand All @@ -61,7 +104,9 @@ export class WsClientHost extends BaseHost implements IDisposable {
public postMessage(data: any) {
this.socketClient.emit('message', data);
}

close() {
this.socketClient.close();
}
disconnectSocket() {
if (this.socketClient.connected) {
this.socketClient.disconnect();
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/com/message-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export interface ReadyMessage extends BaseMessage {
type: 'ready';
}

export interface ConnectionDisruptedMessage extends BaseMessage {
type: 'connection_disrupted';
}

export interface DisposeMessage extends BaseMessage {
type: 'dispose';
}
Expand All @@ -64,6 +68,7 @@ export type Message =
| UnListenMessage
| EventMessage
| ReadyMessage
| ConnectionDisruptedMessage
| DisposeMessage
| StatusMessage;

Expand Down
5 changes: 3 additions & 2 deletions packages/runtime-node/src/node-env-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { IDisposable, SetMultiMap } from '@dazl/patterns';
import { fileURLToPath } from 'node:url';
import { parseArgs } from 'node:util';
import { extname } from 'node:path';
import { WsServerHost } from './ws-node-host.js';
import { WsNodeOptions, WsServerHost } from './ws-node-host.js';
import { ILaunchHttpServerOptions, launchEngineHttpServer } from './launch-http-server.js';
import { workerThreadInitializer2 } from './worker-thread-initializer2.js';
import { bindMetricsListener, type PerformanceMetrics } from './metrics-utils.js';
Expand Down Expand Up @@ -50,6 +50,7 @@ export class NodeEnvManager implements IDisposable {
public async autoLaunch(
runtimeOptions: Map<string, string | boolean | undefined>,
serverOptions: ILaunchHttpServerOptions = {},
hostOptions: WsNodeOptions = {},
) {
process.env.ENGINE_FLOW_V2_DIST_URL = this.importMeta.url;
const disposeMetricsListener = bindMetricsListener(() => this.collectMetricsFromAllOpenEnvironments());
Expand All @@ -59,7 +60,7 @@ export class NodeEnvManager implements IDisposable {
const { port, socketServer, app, close } = await launchEngineHttpServer({ staticDirPath, ...serverOptions });
runtimeOptions.set('enginePort', port.toString());

const clientsHost = new WsServerHost(socketServer);
const clientsHost = new WsServerHost(socketServer, hostOptions);
clientsHost.addEventListener('message', handleRegistrationOnMessage);
const forwardingCom = new Communication(clientsHost, 'clients-host-com');
function handleRegistrationOnMessage({ data }: { data: Message }) {
Expand Down
55 changes: 46 additions & 9 deletions packages/runtime-node/src/ws-node-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ export class WsHost extends BaseHost {
type ClientEnvId = string;
type ClientId = string;

export interface WsNodeOptions {
disposeGraceMs?: number;
}

export class WsServerHost extends BaseHost implements IDisposable {
private clients = new Map<
ClientId,
{
socket: io.Socket;
socket?: io.Socket;
namespacedEnvIds: Set<ClientEnvId>;
disposeTimer?: NodeJS.Timeout;
disposed: boolean;
}
>();
private disposables = new SafeDisposable(WsServerHost.name);
Expand All @@ -33,7 +38,7 @@ export class WsServerHost extends BaseHost implements IDisposable {

constructor(
private server: io.Server | io.Namespace,
config: { disposeGraceMs?: number } = {},
config: WsNodeOptions = {},
) {
super();
this.disposeGraceMs = config.disposeGraceMs ?? 120_000;
Expand Down Expand Up @@ -63,6 +68,18 @@ export class WsServerHost extends BaseHost implements IDisposable {
};
}

private emitConnectionDisruptedMessagesForClient(namespacedEnvIds: Set<ClientEnvId>): void {
for (const envId of namespacedEnvIds) {
this.emitMessageHandlers({
type: 'connection_disrupted',
from: envId,
origin: envId,
to: '*',
forwardingChain: [],
});
}
}

private emitDisposeMessagesForClient(namespacedEnvIds: Set<ClientEnvId>): void {
for (const envId of namespacedEnvIds) {
this.emitMessageHandlers({
Expand All @@ -83,7 +100,7 @@ export class WsServerHost extends BaseHost implements IDisposable {

if (client) {
data.to = parsed.envId;
client.socket.emit('message', data);
client.socket?.emit('message', data);
return;
}
}
Expand All @@ -110,21 +127,35 @@ export class WsServerHost extends BaseHost implements IDisposable {
existingClient.disposeTimer = undefined;
}

// remove old socket listeners
existingClient.socket.removeAllListeners();
// Update socket reference
existingClient.socket = socket;
} else {

if (existingClient.disposed) {
socket.send('server-lost-client-state');
existingClient.disposed = false;
} else {
socket.send('server-connection-restored');
existingClient.namespacedEnvIds.forEach((envId) => {
this.emitMessageHandlers({
type: 'ready',
from: envId,
origin: envId,
to: '*',
forwardingChain: [],
});
});
}
} else if (!existingClient) {
// New connection: create client entry
this.clients.set(clientId, {
socket,
namespacedEnvIds: new Set(),
disposed: false,
});
}

const onMessage = (message: Message): void => {
const client = this.clients.get(clientId);
if (!client) return;
if (!client || client.disposed) return;
// Namespace the env IDs with stableClientId to differentiate between clients
const namespacedFrom = `${clientId}/${message.from}`;
const namespacedOrigin = `${clientId}/${message.origin}`;
Expand All @@ -147,12 +178,18 @@ export class WsServerHost extends BaseHost implements IDisposable {
const client = this.clients.get(clientId);
if (!client) return;

// set client as pending so that messages are queued for it
this.emitConnectionDisruptedMessagesForClient(client.namespacedEnvIds);

// Delay dispose to allow for socket recovery
client.disposeTimer = setTimeout(() => {
const clientToDispose = this.clients.get(clientId);
if (!clientToDispose) return;

this.clients.delete(clientId);
clientToDispose.disposed = true;
clientToDispose.socket?.removeAllListeners();
clientToDispose.socket = undefined;

this.emitDisposeMessagesForClient(clientToDispose.namespacedEnvIds);
}, this.disposeGraceMs);
});
Expand Down
49 changes: 33 additions & 16 deletions packages/runtime-node/test/node-com.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,23 @@ describe('Socket communication', () => {
serverHost.addEventListener('message', spyServer);
clientHost1.addEventListener('message', spyClient1);
await clientHost2.dispose();

// First, connection_disrupted is sent immediately
await waitForServerCall(([arg]) => {
const message = arg.data;
expect(message.type).to.eql('connection_disrupted');
expect(message.from).to.include('/client-host2');
expect(message.origin).to.include('/client-host2');
});

// Then, after grace period, dispose message is sent
await waitForServerCall(([arg]) => {
const message = arg.data as DisposeMessage;
expect(message.type).to.eql('dispose');
expect(message.from).to.include('/client-host2');
expect(message.origin).to.include('/client-host2');
});

await waitForClient1Call(([arg]) => {
const message = arg.data as DisposeMessage;
expect(message.type).to.eql('dispose');
Expand All @@ -295,7 +306,7 @@ describe('Socket communication', () => {

it('should handle client reconnection and cancel delayed dispose', async () => {
const COMMUNICATION_ID = 'reconnect-test';
const { spy: disposeSpy } = createWaitForCall<(ev: { data: Message }) => void>('dispose');
const { spy: messageSpy } = createWaitForCall<(ev: { data: Message }) => void>('message');
const { waitForCall: waitForConnect, spy: connectSpy } = createWaitForCall<() => void>('connect');

const clientCom = new Communication(clientHost, 'client-host', serverTopology);
Expand All @@ -313,8 +324,8 @@ describe('Socket communication', () => {
const methods = clientCom.apiProxy<ICommunicationTestApi>({ id: 'server-host' }, { id: COMMUNICATION_ID });
expect(await methods.sayHello()).to.eql('hello');

// Listen for dispose & reconnect messages
serverHost.addEventListener('message', disposeSpy);
// Listen for messages & reconnect events
serverHost.addEventListener('message', messageSpy);
clientHost.subscribers.on('connect', connectSpy);

// Disconnect and quickly reconnect (before dispose delay expires)
Expand All @@ -328,8 +339,12 @@ describe('Socket communication', () => {
// Wait a bit more than the dispose delay to ensure dispose timer would have fired
await sleep(disposeGraceMs * 2);

// Verify no dispose message was sent (since reconnection cancelled it)
expect(disposeSpy.callCount).to.eql(0);
// Count only dispose messages (connection_disrupted is sent, but dispose should not be)
const disposeMessages = messageSpy.getCalls().filter((call) => {
const message = call.args[0].data as Message;
return message.type === 'dispose';
});
expect(disposeMessages.length).to.eql(0);

// Verify communication still works after reconnection
expect(await methods.sayHello()).to.eql('hello');
Expand All @@ -338,8 +353,8 @@ describe('Socket communication', () => {

it('should emit dispose message if client does not reconnect within dispose delay', async () => {
const COMMUNICATION_ID = 'dispose-test';
const { waitForCall: waitForDispose, spy: disposeSpy } =
createWaitForCall<(ev: { data: Message }) => void>('dispose');
const { waitForCall: waitForMessage, spy: messageSpy } =
createWaitForCall<(ev: { data: Message }) => void>('message');

const clientCom = new Communication(clientHost, 'client-host', serverTopology);
const serverCom = new Communication(serverHost, 'server-host');
Expand All @@ -356,20 +371,22 @@ describe('Socket communication', () => {
const methods = clientCom.apiProxy<ICommunicationTestApi>({ id: 'server-host' }, { id: COMMUNICATION_ID });
expect(await methods.sayHello()).to.eql('hello');

// Register dispose listener after initial communication
serverHost.addEventListener('message', disposeSpy);
// Register message listener after initial communication
serverHost.addEventListener('message', messageSpy);

// Disconnect without reconnecting
clientHost.disconnectSocket();
expect(clientHost.isConnected()).to.eql(false);

// Wait for dispose message to be emitted after delay
await waitForDispose(([arg]) => {
const message = arg.data as DisposeMessage;
expect(message.type).to.eql('dispose');
expect(message.from).to.include('/client-host');
expect(message.origin).to.include('/client-host');
return true;
// Wait for dispose message (filtering out connection_disrupted)
await waitForMessage(([arg]) => {
const message = arg.data;
if (message.type === 'dispose') {
expect(message.from).to.include('/client-host');
expect(message.origin).to.include('/client-host');
return true;
}
return false;
});

// Verify that connection can be established again after dispose
Expand Down