From 32f9fddd7332c0d16eb356e1d51b4be6a38cf7e5 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 16 Nov 2025 08:18:59 +0200 Subject: [PATCH 1/6] feat: place client in new "connection_disrupted" mode after disconnect and during the server re-connect grace time - WsServerHost sends a "server-lost-client-state" or "server-connection-restored" to a reconnecting client to inform it of its server state - set shorter default reconnect times for the ws client - add a close method to the WsClientHost --- packages/core/src/com/communication.ts | 3 ++ packages/core/src/com/hosts/ws-client-host.ts | 24 +++++++++++- packages/core/src/com/message-types.ts | 5 +++ packages/runtime-node/src/node-env-manager.ts | 3 +- packages/runtime-node/src/ws-node-host.ts | 39 +++++++++++++++++-- 5 files changed, 68 insertions(+), 6 deletions(-) diff --git a/packages/core/src/com/communication.ts b/packages/core/src/com/communication.ts index 43e4a0590..ae991cca5 100644 --- a/packages/core/src/com/communication.ts +++ b/packages/core/src/com/communication.ts @@ -349,6 +349,9 @@ export class Communication { case 'ready': this.handleReady(message); break; + case 'connection_disrupted': + this.registerPendingEnvironment(message.origin); + break; case 'dispose': if (message.from !== this.rootEnvId) { this.clearEnvironment(message.origin, message.from); diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index 707986d34..f08304a24 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -10,7 +10,13 @@ export class WsClientHost extends BaseHost implements IDisposable { isDisposed = this.disposables.isDisposed; public connected: Promise; private socketClient: Socket; - public subscribers = new EventEmitter<{ disconnect: string; reconnect: void; connect: void }>(); + public subscribers = new EventEmitter<{ + disconnect: string; + reconnect: void; + connect: void; + 'server-lost-client-state': 
void; + 'server-connection-restored': void; + }>(); private stableClientId = crypto.randomUUID(); constructor(url: string, options?: Partial) { @@ -28,9 +34,14 @@ export class WsClientHost extends BaseHost implements IDisposable { withCredentials: true, // Pass Cookie to socket io connection path, query, + forceNew: true, auth: { clientId: this.stableClientId, }, + randomizationFactor: 0.1, + reconnectionDelay: 100, + reconnectionDelayMax: 1000, + timeout: 3000, // Connection attempt timeout ...options, }); @@ -44,6 +55,13 @@ export class WsClientHost extends BaseHost implements IDisposable { }); this.socketClient.on('message', (data: unknown) => { + if ( + typeof data === 'string' && + (data === 'server-lost-client-state' || data === 'server-connection-restored') + ) { + this.subscribers.emit(data, undefined); + return; + } this.emitMessageHandlers(data as Message); }); @@ -61,7 +79,9 @@ export class WsClientHost extends BaseHost implements IDisposable { public postMessage(data: any) { this.socketClient.emit('message', data); } - + close() { + this.socketClient.close(); + } disconnectSocket() { if (this.socketClient.connected) { this.socketClient.disconnect(); diff --git a/packages/core/src/com/message-types.ts b/packages/core/src/com/message-types.ts index 03e4c7c20..cc1d6a2a7 100644 --- a/packages/core/src/com/message-types.ts +++ b/packages/core/src/com/message-types.ts @@ -49,6 +49,10 @@ export interface ReadyMessage extends BaseMessage { type: 'ready'; } +export interface ConnectionDisruptedMessage extends BaseMessage { + type: 'connection_disrupted'; +} + export interface DisposeMessage extends BaseMessage { type: 'dispose'; } @@ -64,6 +68,7 @@ export type Message = | UnListenMessage | EventMessage | ReadyMessage + | ConnectionDisruptedMessage | DisposeMessage | StatusMessage; diff --git a/packages/runtime-node/src/node-env-manager.ts b/packages/runtime-node/src/node-env-manager.ts index c17ec6a63..87a3042b8 100644 --- 
a/packages/runtime-node/src/node-env-manager.ts +++ b/packages/runtime-node/src/node-env-manager.ts @@ -50,6 +50,7 @@ export class NodeEnvManager implements IDisposable { public async autoLaunch( runtimeOptions: Map, serverOptions: ILaunchHttpServerOptions = {}, + hostOptions: { disposeGraceMs?: number } = {}, ) { process.env.ENGINE_FLOW_V2_DIST_URL = this.importMeta.url; const disposeMetricsListener = bindMetricsListener(() => this.collectMetricsFromAllOpenEnvironments()); @@ -59,7 +60,7 @@ export class NodeEnvManager implements IDisposable { const { port, socketServer, app, close } = await launchEngineHttpServer({ staticDirPath, ...serverOptions }); runtimeOptions.set('enginePort', port.toString()); - const clientsHost = new WsServerHost(socketServer); + const clientsHost = new WsServerHost(socketServer, hostOptions); clientsHost.addEventListener('message', handleRegistrationOnMessage); const forwardingCom = new Communication(clientsHost, 'clients-host-com'); function handleRegistrationOnMessage({ data }: { data: Message }) { diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index be4a31a20..d3d19a000 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -24,6 +24,7 @@ export class WsServerHost extends BaseHost implements IDisposable { socket: io.Socket; namespacedEnvIds: Set; disposeTimer?: NodeJS.Timeout; + disposed: boolean; } >(); private disposables = new SafeDisposable(WsServerHost.name); @@ -63,6 +64,18 @@ export class WsServerHost extends BaseHost implements IDisposable { }; } + private emitConnectionDisruptedMessagesForClient(namespacedEnvIds: Set): void { + for (const envId of namespacedEnvIds) { + this.emitMessageHandlers({ + type: 'connection_disrupted', + from: envId, + origin: envId, + to: '*', + forwardingChain: [], + }); + } + } + private emitDisposeMessagesForClient(namespacedEnvIds: Set): void { for (const envId of namespacedEnvIds) { 
this.emitMessageHandlers({ @@ -114,17 +127,34 @@ export class WsServerHost extends BaseHost implements IDisposable { existingClient.socket.removeAllListeners(); // Update socket reference existingClient.socket = socket; - } else { + + if (existingClient.disposed) { + socket.send('server-lost-client-state'); + existingClient.disposed = false; + } else { + socket.send('server-connection-restored'); + existingClient.namespacedEnvIds.forEach((envId) => { + this.emitMessageHandlers({ + type: 'ready', + from: envId, + origin: envId, + to: '*', + forwardingChain: [], + }); + }); + } + } else if (!existingClient) { // New connection: create client entry this.clients.set(clientId, { socket, namespacedEnvIds: new Set(), + disposed: false, }); } const onMessage = (message: Message): void => { const client = this.clients.get(clientId); - if (!client) return; + if (!client || client.disposed) return; // Namespace the env IDs with stableClientId to differentiate between clients const namespacedFrom = `${clientId}/${message.from}`; const namespacedOrigin = `${clientId}/${message.origin}`; @@ -147,12 +177,15 @@ export class WsServerHost extends BaseHost implements IDisposable { const client = this.clients.get(clientId); if (!client) return; + // set client as pending so that messages are queued for it + this.emitConnectionDisruptedMessagesForClient(client.namespacedEnvIds); + // Delay dispose to allow for socket recovery client.disposeTimer = setTimeout(() => { const clientToDispose = this.clients.get(clientId); if (!clientToDispose) return; - this.clients.delete(clientId); + clientToDispose.disposed = true; this.emitDisposeMessagesForClient(clientToDispose.namespacedEnvIds); }, this.disposeGraceMs); }); From aeb5bc870758c202c6d103def0064fa1428d915a Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 16 Nov 2025 08:51:30 +0200 Subject: [PATCH 2/6] test: fix node-com tests for graceful reconnection handling --- packages/runtime-node/test/node-com.unit.ts | 49 
++++++++++++++------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index 4a1a927e4..b51639bb4 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -279,12 +279,23 @@ describe('Socket communication', () => { serverHost.addEventListener('message', spyServer); clientHost1.addEventListener('message', spyClient1); await clientHost2.dispose(); + + // First, connection_disrupted is sent immediately + await waitForServerCall(([arg]) => { + const message = arg.data; + expect(message.type).to.eql('connection_disrupted'); + expect(message.from).to.include('/client-host2'); + expect(message.origin).to.include('/client-host2'); + }); + + // Then, after grace period, dispose message is sent await waitForServerCall(([arg]) => { const message = arg.data as DisposeMessage; expect(message.type).to.eql('dispose'); expect(message.from).to.include('/client-host2'); expect(message.origin).to.include('/client-host2'); }); + await waitForClient1Call(([arg]) => { const message = arg.data as DisposeMessage; expect(message.type).to.eql('dispose'); @@ -295,7 +306,7 @@ describe('Socket communication', () => { it('should handle client reconnection and cancel delayed dispose', async () => { const COMMUNICATION_ID = 'reconnect-test'; - const { spy: disposeSpy } = createWaitForCall<(ev: { data: Message }) => void>('dispose'); + const { spy: messageSpy } = createWaitForCall<(ev: { data: Message }) => void>('message'); const { waitForCall: waitForConnect, spy: connectSpy } = createWaitForCall<() => void>('connect'); const clientCom = new Communication(clientHost, 'client-host', serverTopology); @@ -313,8 +324,8 @@ describe('Socket communication', () => { const methods = clientCom.apiProxy({ id: 'server-host' }, { id: COMMUNICATION_ID }); expect(await methods.sayHello()).to.eql('hello'); - // Listen for dispose & reconnect messages - 
serverHost.addEventListener('message', disposeSpy); + // Listen for messages & reconnect events + serverHost.addEventListener('message', messageSpy); clientHost.subscribers.on('connect', connectSpy); // Disconnect and quickly reconnect (before dispose delay expires) @@ -328,8 +339,12 @@ describe('Socket communication', () => { // Wait a bit more than the dispose delay to ensure dispose timer would have fired await sleep(disposeGraceMs * 2); - // Verify no dispose message was sent (since reconnection cancelled it) - expect(disposeSpy.callCount).to.eql(0); + // Count only dispose messages (connection_disrupted is sent, but dispose should not be) + const disposeMessages = messageSpy.getCalls().filter((call) => { + const message = call.args[0].data as Message; + return message.type === 'dispose'; + }); + expect(disposeMessages.length).to.eql(0); // Verify communication still works after reconnection expect(await methods.sayHello()).to.eql('hello'); @@ -338,8 +353,8 @@ describe('Socket communication', () => { it('should emit dispose message if client does not reconnect within dispose delay', async () => { const COMMUNICATION_ID = 'dispose-test'; - const { waitForCall: waitForDispose, spy: disposeSpy } = - createWaitForCall<(ev: { data: Message }) => void>('dispose'); + const { waitForCall: waitForMessage, spy: messageSpy } = + createWaitForCall<(ev: { data: Message }) => void>('message'); const clientCom = new Communication(clientHost, 'client-host', serverTopology); const serverCom = new Communication(serverHost, 'server-host'); @@ -356,20 +371,22 @@ describe('Socket communication', () => { const methods = clientCom.apiProxy({ id: 'server-host' }, { id: COMMUNICATION_ID }); expect(await methods.sayHello()).to.eql('hello'); - // Register dispose listener after initial communication - serverHost.addEventListener('message', disposeSpy); + // Register message listener after initial communication + serverHost.addEventListener('message', messageSpy); // Disconnect without 
reconnecting clientHost.disconnectSocket(); expect(clientHost.isConnected()).to.eql(false); - // Wait for dispose message to be emitted after delay - await waitForDispose(([arg]) => { - const message = arg.data as DisposeMessage; - expect(message.type).to.eql('dispose'); - expect(message.from).to.include('/client-host'); - expect(message.origin).to.include('/client-host'); - return true; + // Wait for dispose message (filtering out connection_disrupted) + await waitForMessage(([arg]) => { + const message = arg.data; + if (message.type === 'dispose') { + expect(message.from).to.include('/client-host'); + expect(message.origin).to.include('/client-host'); + return true; + } + return false; }); // Verify that connection can be established again after dispose From 39f00d6ca4a97905a42969cc71df644d9b8bc5ec Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 16 Nov 2025 13:53:19 +0200 Subject: [PATCH 3/6] refactor: reuse server options as an interface --- packages/runtime-node/src/node-env-manager.ts | 4 ++-- packages/runtime-node/src/ws-node-host.ts | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/runtime-node/src/node-env-manager.ts b/packages/runtime-node/src/node-env-manager.ts index 87a3042b8..ac3dd00c7 100644 --- a/packages/runtime-node/src/node-env-manager.ts +++ b/packages/runtime-node/src/node-env-manager.ts @@ -10,7 +10,7 @@ import { IDisposable, SetMultiMap } from '@dazl/patterns'; import { fileURLToPath } from 'node:url'; import { parseArgs } from 'node:util'; import { extname } from 'node:path'; -import { WsServerHost } from './ws-node-host.js'; +import { WsNodeOptions, WsServerHost } from './ws-node-host.js'; import { ILaunchHttpServerOptions, launchEngineHttpServer } from './launch-http-server.js'; import { workerThreadInitializer2 } from './worker-thread-initializer2.js'; import { bindMetricsListener, type PerformanceMetrics } from './metrics-utils.js'; @@ -50,7 +50,7 @@ export class NodeEnvManager implements 
IDisposable { public async autoLaunch( runtimeOptions: Map, serverOptions: ILaunchHttpServerOptions = {}, - hostOptions: { disposeGraceMs?: number } = {}, + hostOptions: WsNodeOptions = {}, ) { process.env.ENGINE_FLOW_V2_DIST_URL = this.importMeta.url; const disposeMetricsListener = bindMetricsListener(() => this.collectMetricsFromAllOpenEnvironments()); diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index d3d19a000..ffbaec57d 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -17,6 +17,10 @@ export class WsHost extends BaseHost { type ClientEnvId = string; type ClientId = string; +export interface WsNodeOptions { + disposeGraceMs?: number; +} + export class WsServerHost extends BaseHost implements IDisposable { private clients = new Map< ClientId, @@ -34,7 +38,7 @@ export class WsServerHost extends BaseHost implements IDisposable { constructor( private server: io.Server | io.Namespace, - config: { disposeGraceMs?: number } = {}, + config: WsNodeOptions = {}, ) { super(); this.disposeGraceMs = config.disposeGraceMs ?? 
120_000; From 1999f1e0e24394d85949da75a1e4db504e8e5ba1 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 16 Nov 2025 14:05:37 +0200 Subject: [PATCH 4/6] refactor: modify WsServerHost to remove reference to old socket --- packages/runtime-node/src/ws-node-host.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index ffbaec57d..23e09356a 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -25,7 +25,7 @@ export class WsServerHost extends BaseHost implements IDisposable { private clients = new Map< ClientId, { - socket: io.Socket; + socket?: io.Socket; namespacedEnvIds: Set; disposeTimer?: NodeJS.Timeout; disposed: boolean; @@ -100,7 +100,7 @@ export class WsServerHost extends BaseHost implements IDisposable { if (client) { data.to = parsed.envId; - client.socket.emit('message', data); + client.socket?.emit('message', data); return; } } @@ -127,9 +127,6 @@ export class WsServerHost extends BaseHost implements IDisposable { existingClient.disposeTimer = undefined; } - // remove old socket listeners - existingClient.socket.removeAllListeners(); - // Update socket reference existingClient.socket = socket; if (existingClient.disposed) { @@ -190,6 +187,9 @@ export class WsServerHost extends BaseHost implements IDisposable { if (!clientToDispose) return; clientToDispose.disposed = true; + clientToDispose.socket?.removeAllListeners(); + clientToDispose.socket = undefined; + this.emitDisposeMessagesForClient(clientToDispose.namespacedEnvIds); }, this.disposeGraceMs); }); From 8f1c96f1449082c9b9d952d4cdbe4edeb81685d6 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 16 Nov 2025 14:12:36 +0200 Subject: [PATCH 5/6] refactor: remove hardcoded socket client options and allow to pass them from the outside --- packages/core/src/com/hosts/ws-client-host.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 
deletions(-) diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index f08304a24..4d2da2464 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -1,4 +1,4 @@ -import { io, Socket, type SocketOptions } from 'socket.io-client'; +import { io, ManagerOptions, Socket, type SocketOptions } from 'socket.io-client'; import type { Message } from '../message-types.js'; import { BaseHost } from './base-host.js'; import { EventEmitter, IDisposable, SafeDisposable } from '@dazl/patterns'; @@ -19,7 +19,7 @@ export class WsClientHost extends BaseHost implements IDisposable { }>(); private stableClientId = crypto.randomUUID(); - constructor(url: string, options?: Partial) { + constructor(url: string, options?: Partial) { super(); this.disposables.add('close socket', () => this.socketClient.close()); this.disposables.add('clear subscribers', () => this.subscribers.clear()); @@ -38,10 +38,6 @@ export class WsClientHost extends BaseHost implements IDisposable { auth: { clientId: this.stableClientId, }, - randomizationFactor: 0.1, - reconnectionDelay: 100, - reconnectionDelayMax: 1000, - timeout: 3000, // Connection attempt timeout ...options, }); From 4e1a86dad728acc070632d3f826e60d44c3033fb Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Sun, 16 Nov 2025 15:46:02 +0200 Subject: [PATCH 6/6] feat: added initial connection timeout scaling --- packages/core/src/com/hosts/ws-client-host.ts | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index 4d2da2464..00871dfe3 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -2,14 +2,15 @@ import { io, ManagerOptions, Socket, type SocketOptions } from 'socket.io-client import type { Message } from '../message-types.js'; import { BaseHost } 
from './base-host.js'; import { EventEmitter, IDisposable, SafeDisposable } from '@dazl/patterns'; -import { deferred } from 'promise-assist'; +import { deferred, type PromiseRejectCb, type PromiseResolveCb } from 'promise-assist'; export class WsClientHost extends BaseHost implements IDisposable { + private reinitCount = 0; private disposables = new SafeDisposable(WsClientHost.name); dispose = this.disposables.dispose; isDisposed = this.disposables.isDisposed; public connected: Promise; - private socketClient: Socket; + private socketClient!: Socket; public subscribers = new EventEmitter<{ disconnect: string; reconnect: void; @@ -29,6 +30,17 @@ export class WsClientHost extends BaseHost implements IDisposable { const { promise, resolve, reject } = deferred(); this.connected = promise; + this.initSocketIO(url, path, query, options, reject, resolve); + } + + private initSocketIO( + url: string, + path: string | undefined, + query: { [k: string]: string }, + options: Partial | undefined, + reject: PromiseRejectCb, + resolve: PromiseResolveCb, + ) { this.socketClient = io(url, { transports: ['websocket'], withCredentials: true, // Pass Cookie to socket io connection @@ -42,10 +54,27 @@ export class WsClientHost extends BaseHost implements IDisposable { }); this.socketClient.once('connect_error', (error) => { + if (error.message === 'timeout' && this.reinitCount < 3) { + this.reinitCount++; + this.socketClient.close(); + this.initSocketIO( + url, + path, + query, + { + ...options, + timeout: (options?.timeout ?? 20000) * (this.reinitCount + 1), + }, + reject, + resolve, + ); + return; + } reject(new Error(`Failed to connect to socket server`, { cause: error })); }); this.socketClient.on('connect', () => { + this.reinitCount = Infinity; this.subscribers.emit('connect', undefined); resolve(); });