From 03fe7fee32f1a821e0b9597d2de20c452315a257 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Wed, 5 Nov 2025 15:13:33 +0200 Subject: [PATCH 01/16] feat(ws): implement reconnection logic and client connection disposal handling --- packages/core/src/com/hosts/ws-client-host.ts | 30 +++- packages/runtime-node/src/ws-node-host.ts | 149 ++++++++++++++---- packages/runtime-node/test/node-com.unit.ts | 97 +++++++++++- 3 files changed, 236 insertions(+), 40 deletions(-) diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index ffb874529..a3744f393 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -10,7 +10,8 @@ export class WsClientHost extends BaseHost implements IDisposable { isDisposed = this.disposables.isDisposed; public connected: Promise; private socketClient: Socket; - public subscribers = new EventEmitter<{ disconnect: void; reconnect: void }>(); + public subscribers = new EventEmitter<{ disconnect: void; reconnect: void; connect: void }>(); + private stableClientId = globalThis.crypto.randomUUID(); constructor(url: string, options?: Partial) { super(); @@ -24,10 +25,12 @@ export class WsClientHost extends BaseHost implements IDisposable { this.socketClient = io(url, { transports: ['websocket'], - forceNew: true, withCredentials: true, // Pass Cookie to socket io connection path, query, + auth: { + clientId: this.stableClientId, + }, ...options, }); @@ -36,15 +39,16 @@ export class WsClientHost extends BaseHost implements IDisposable { }); this.socketClient.on('connect', () => { - this.socketClient.on('message', (data: unknown) => { - this.emitMessageHandlers(data as Message); - }); + this.subscribers.emit('connect', undefined); resolve(); }); + this.socketClient.on('message', (data: unknown) => { + this.emitMessageHandlers(data as Message); + }); + this.socketClient.on('disconnect', () => { this.subscribers.emit('disconnect', undefined); - this.socketClient.close(); }); this.socketClient.on('reconnect', () => { @@ -57,4 +61,18 @@ export class WsClientHost extends BaseHost implements IDisposable { public postMessage(data: any) { this.socketClient.emit('message', data); } + + disconnectSocket() { + if (this.socketClient.connected) { + this.socketClient.disconnect(); + } + } + reconnectSocket() { + if (!this.socketClient.connected) { + this.socketClient.connect(); + } + } + isConnected(): boolean { + return this.socketClient.connected; + } } diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 81b0e3b23..ad3751c60 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -14,47 +14,132 @@ export class WsHost extends BaseHost { } } +type EnvId = string; +type StableId = string; + export class WsServerHost extends BaseHost implements IDisposable { - private socketToEnvId = new Map(); + private clients = new Map< + StableId, + { + socket: io.Socket; + namespacedEnvIds: Set; + disposeTimer?: NodeJS.Timeout; + } + >(); private disposables = new SafeDisposable(WsServerHost.name); dispose = this.disposables.dispose; isDisposed = this.disposables.isDisposed; + private disposeDelayMs: number; - constructor(private server: io.Server | io.Namespace) { + constructor( + private server: io.Server | io.Namespace, + config: { disposeDelayMs?: number } = {}, + ) { super(); + this.disposeDelayMs = config.disposeDelayMs ?? 120000; // 2 minutes default this.server.on('connection', this.onConnection); this.disposables.add('connection', () => this.server.off('connection', this.onConnection)); this.disposables.add('clear handlers', () => this.handlers.clear()); + this.disposables.add('dispose clients', () => { + // clear pending dispose timers and emit dispose messages for all env IDs + for (const client of this.clients.values()) { + if (client.disposeTimer) { + clearTimeout(client.disposeTimer); + } + this.emitDisposeMessagesForClient(client.namespacedEnvIds); + } + this.clients.clear(); + }); + } + + private extractClientIdAndEnvId(namespacedId: string): { stableClientId: string; envId: string } | null { + const slashIndex = namespacedId.indexOf('/'); + if (slashIndex === -1) { + return null; + } + return { + stableClientId: namespacedId.substring(0, slashIndex), + envId: namespacedId.substring(slashIndex + 1), + }; + } + + private emitDisposeMessagesForClient(namespacedEnvIds: Set): void { + for (const envId of namespacedEnvIds) { + this.emitMessageHandlers({ + type: 'dispose', + from: envId, + origin: envId, + to: '*', + forwardingChain: [], + }); + } } public postMessage(data: Message) { if (data.to !== '*') { - if (this.socketToEnvId.has(data.to)) { - const { socket, clientID } = this.socketToEnvId.get(data.to)!; - data.to = clientID; - socket.emit('message', data); - } else { - this.server.emit('message', data); + const parsed = this.extractClientIdAndEnvId(data.to); + if (parsed) { + const client = this.clients.get(parsed.stableClientId); + + if (client) { + data.to = parsed.envId; + client.socket.emit('message', data); + return; + } } + // If not found in any client, broadcast + this.server.emit('message', data); } else { this.server.emit('message', data); } } private onConnection = (socket: io.Socket): void => { - const nameSpace = (original: string) => `${socket.id}/${original}`; + const stableClientId = socket.handshake.auth?.clientId as string | undefined; + + if (!stableClientId) { + throw new Error('Client must provide a stable client ID in socket.handshake.auth.clientId'); + } + + // Handle reconnection: update socket and clear dispose timer + const existingClient = this.clients.get(stableClientId); + if (existingClient) { + console.log(`Client reconnected! Stable Client ID: ${stableClientId}`); + + // Clear dispose timer if exists + if (existingClient.disposeTimer) { + console.log(` Clearing dispose timer for client: ${stableClientId}`); + clearTimeout(existingClient.disposeTimer); + existingClient.disposeTimer = undefined; + } + + // remove old socket listeners + existingClient.socket.removeAllListeners(); + // Update socket reference + existingClient.socket = socket; + console.log(`✅ Reconnection detected - dispose cancelled for stable client: ${stableClientId}`); + } else { + // New connection: create client entry + this.clients.set(stableClientId, { + socket, + namespacedEnvIds: new Set(), + }); + } + const onMessage = (message: Message): void => { - // this mapping should not be here because of forwarding of messages - // maybe change message forwarding to have 'forward destination' and correct 'from' - // also maybe we can put the init of the map on 'connection' event - // maybe we can notify from client about the new connected id - const originId = nameSpace(message.origin); - const fromId = nameSpace(message.from); - this.socketToEnvId.set(fromId, { socket, clientID: message.from }); - this.socketToEnvId.set(originId, { socket, clientID: message.origin }); - // modify message to be able to forward it - message.from = fromId; - message.origin = originId; + const client = this.clients.get(stableClientId); + if (!client) return; + // Namespace the env IDs with stableClientId to differentiate between clients + const namespacedFrom = `${stableClientId}/${message.from}`; + const namespacedOrigin = `${stableClientId}/${message.origin}`; + + // Track namespaced env IDs for this client + client.namespacedEnvIds.add(namespacedFrom); + client.namespacedEnvIds.add(namespacedOrigin); + + // Modify message with namespaced IDs for routing + message.from = namespacedFrom; + message.origin = namespacedOrigin; this.emitMessageHandlers(message); }; @@ -62,18 +147,18 @@ export class WsServerHost extends BaseHost implements IDisposable { socket.once('disconnect', () => { socket.off('message', onMessage); - for (const [envId, { socket: soc }] of this.socketToEnvId.entries()) { - if (socket === soc) { - this.socketToEnvId.delete(envId); - this.emitMessageHandlers({ - type: 'dispose', - from: envId, - origin: envId, - to: '*', - forwardingChain: [], - }); - } - } + + const client = this.clients.get(stableClientId); + if (!client) return; + + // Delay dispose to allow for socket recovery + client.disposeTimer = setTimeout(() => { + const clientToDispose = this.clients.get(stableClientId); + if (!clientToDispose) return; + + this.clients.delete(stableClientId); + this.emitDisposeMessagesForClient(clientToDispose.namespacedEnvIds); + }, this.disposeDelayMs); }); }; } diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index 11c592ba5..e33718c0d 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -14,7 +14,7 @@ import { expect } from 'chai'; import { safeListeningHttpServer } from 'create-listening-server'; import { fork } from 'node:child_process'; import type { Socket } from 'node:net'; -import { waitFor } from 'promise-assist'; +import { sleep, waitFor } from 'promise-assist'; import sinon, { spy } from 'sinon'; import * as io from 'socket.io'; @@ -56,7 +56,7 @@ describe('Socket communication', () => { }); clientHost = new WsClientHost(serverTopology['server-host']); - serverHost = new WsServerHost(nameSpace); + serverHost = new WsServerHost(nameSpace, { disposeDelayMs: 10 }); // 10ms remove disconnected clients for test speed await clientHost.connected; }); @@ -284,6 +284,99 @@ describe('Socket communication', () => { expect(message.from).to.equal('server-host'); }); }); + + it('should handle client reconnection and cancel delayed dispose', async () => { + const COMMUNICATION_ID = 'reconnect-test'; + const { spy: disposeSpy } = createWaitForCall<(ev: { data: Message }) => void>('dispose'); + const { waitForCall: waitForConnect, spy: connectSpy } = createWaitForCall<() => void>('connect'); + + const clientCom = new Communication(clientHost, 'client-host', serverTopology); + const serverCom = new Communication(serverHost, 'server-host'); + + serverCom.registerAPI( + { id: COMMUNICATION_ID }, + { + sayHello: () => 'hello', + sayHelloWithDataAndParams: (name: string) => `hello ${name}`, + }, + ); + + // Test communication works + const methods = clientCom.apiProxy({ id: 'server-host' }, { id: COMMUNICATION_ID }); + expect(await methods.sayHello()).to.eql('hello'); + + // Listen for dispose & reconnect messages + serverHost.addEventListener('message', disposeSpy); + clientHost.subscribers.on('connect', connectSpy); + + // Disconnect and quickly reconnect (before dispose delay expires) + clientHost.disconnectSocket(); + expect(clientHost.isConnected()).to.eql(false); + + // Reconnect immediately (within the 10ms dispose delay) + clientHost.reconnectSocket(); + await waitForConnect(() => true); + + // Wait a bit more than the dispose delay to ensure dispose timer would have fired + await sleep(50); + + // Verify no dispose message was sent (since reconnection cancelled it) + expect(disposeSpy.callCount).to.eql(0); + + // Verify communication still works after reconnection + expect(await methods.sayHello()).to.eql('hello'); + expect(await methods.sayHelloWithDataAndParams('reconnected')).to.eq('hello reconnected'); + }); + + it('should emit dispose message if client does not reconnect within dispose delay', async () => { + const COMMUNICATION_ID = 'dispose-test'; + const { waitForCall: waitForDispose, spy: disposeSpy } = + createWaitForCall<(ev: { data: Message }) => void>('dispose'); + + const clientCom = new Communication(clientHost, 'client-host', serverTopology); + const serverCom = new Communication(serverHost, 'server-host'); + + serverCom.registerAPI( + { id: COMMUNICATION_ID }, + { + sayHello: () => 'hello', + sayHelloWithDataAndParams: (name: string) => `hello ${name}`, + }, + ); + + // Test communication works + const methods = clientCom.apiProxy({ id: 'server-host' }, { id: COMMUNICATION_ID }); + expect(await methods.sayHello()).to.eql('hello'); + + // Register dispose listener after initial communication + serverHost.addEventListener('message', disposeSpy); + + // Disconnect without reconnecting + clientHost.disconnectSocket(); + expect(clientHost.isConnected()).to.eql(false); + + // Wait for dispose message to be emitted after delay + await waitForDispose(([arg]) => { + const message = arg.data as DisposeMessage; + expect(message.type).to.eql('dispose'); + expect(message.from).to.include('/client-host'); + expect(message.origin).to.include('/client-host'); + return true; + }); + + // Verify that connection can be established again after dispose + const { waitForCall: waitForConnect, spy: connectSpy } = + createWaitForCall<() => void>('reconnect-after-dispose'); + clientHost.subscribers.on('connect', connectSpy); + + clientHost.reconnectSocket(); + await waitForConnect(() => true); + expect(clientHost.isConnected()).to.eql(true); + + // Verify communication works again after reconnection + expect(await methods.sayHello()).to.eql('hello'); + expect(await methods.sayHelloWithDataAndParams('after-dispose')).to.eq('hello after-dispose'); + }); }); describe('IPC communication', () => { From 7e8320e5058126733ecdc83274501a74479d843c Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Wed, 5 Nov 2025 15:26:07 +0200 Subject: [PATCH 02/16] feat: add reason for disconnect event --- packages/core/src/com/hosts/ws-client-host.ts | 6 +++--- packages/runtime-node/test/node-com.unit.ts | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index a3744f393..20afd1de9 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -10,7 +10,7 @@ export class WsClientHost extends BaseHost implements IDisposable { isDisposed = this.disposables.isDisposed; public connected: Promise; private socketClient: Socket; - public subscribers = new EventEmitter<{ disconnect: void; reconnect: void; connect: void }>(); + public subscribers = new EventEmitter<{ disconnect: string; reconnect: void; connect: void }>(); private stableClientId = globalThis.crypto.randomUUID(); constructor(url: string, options?: Partial) { @@ -47,8 +47,8 @@ export class WsClientHost extends BaseHost implements IDisposable { this.emitMessageHandlers(data as Message); }); - this.socketClient.on('disconnect', () => { - this.subscribers.emit('disconnect', undefined); + this.socketClient.on('disconnect', (reason: string) => { + this.subscribers.emit('disconnect', reason); }); this.socketClient.on('reconnect', () => { diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index e33718c0d..cfce8b0b5 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -238,6 +238,7 @@ describe('Socket communication', () => { await waitFor( () => { expect(spy.callCount).to.be.eq(1); + expect(spy.firstCall.args[0]).to.be.a('string'); }, { timeout: 2_000, @@ -387,7 +388,6 @@ describe('IPC communication', () => { const mainHost = new BaseHost(); const communication = new Communication(mainHost, 'main'); const forked = fork(new URL('./process-entry.js', import.meta.url)); - disposables.add(() => forked.kill()); const host = new IPCHost(forked); communication.registerEnv('process', host); communication.registerMessageHandler(host); @@ -399,6 +399,9 @@ describe('IPC communication', () => { ); expect(await proxy.echo()).to.eq('yo'); + + // Clean up after the test completes + forked.kill(); }); it('handles forked process closing', async () => { From 7f3959fdf098b70a9b89a19d3154fd2dd6258dd2 Mon Sep 17 00:00:00 2001 From: Ido Rosenthal Date: Wed, 5 Nov 2025 16:18:01 +0200 Subject: [PATCH 03/16] chore: remove debug logs --- packages/runtime-node/src/ws-node-host.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index ad3751c60..0fe613758 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -104,11 +104,8 @@ export class WsServerHost extends BaseHost implements IDisposable { // Handle reconnection: update socket and clear dispose timer const existingClient = this.clients.get(stableClientId); if (existingClient) { - console.log(`Client reconnected! Stable Client ID: ${stableClientId}`); - // Clear dispose timer if exists if (existingClient.disposeTimer) { - console.log(` Clearing dispose timer for client: ${stableClientId}`); clearTimeout(existingClient.disposeTimer); existingClient.disposeTimer = undefined; } @@ -117,7 +114,6 @@ export class WsServerHost extends BaseHost implements IDisposable { existingClient.socket.removeAllListeners(); // Update socket reference existingClient.socket = socket; - console.log(`✅ Reconnection detected - dispose cancelled for stable client: ${stableClientId}`); } else { // New connection: create client entry this.clients.set(stableClientId, { From 02e8ecf2f2a1b3b637bcbb81421208a20e67a4e6 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:18:53 +0200 Subject: [PATCH 04/16] test: ensure every listening abstraction is closed otherwise timers don't get cleaned up properly --- packages/runtime-node/test/node-com.unit.ts | 18 +++++++---- .../test/node-env.manager.unit.ts | 31 +++++++++++-------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index cfce8b0b5..b53944f32 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -56,7 +56,9 @@ describe('Socket communication', () => { }); clientHost = new WsClientHost(serverTopology['server-host']); + disposables.add(() => clientHost.dispose()); serverHost = new WsServerHost(nameSpace, { disposeDelayMs: 10 }); // 10ms remove disconnected clients for test speed + disposables.add(() => serverHost.dispose()); await clientHost.connected; }); @@ -225,14 +227,14 @@ describe('Socket communication', () => { it('notifies if environment is disconnected', async () => { const spy = sinon.spy(); const clientCom = new Communication(clientHost, 'client-host', serverTopology); - const { id } = await socketClientInitializer({ + const socketClient = await socketClientInitializer({ communication: clientCom, env: new Environment('server-host', 'node', 'single'), }); + disposables.add(() => socketClient.dispose()); + expect(socketClient.id).to.not.eq(undefined); - expect(id).to.not.eq(undefined); - - const host = clientCom.getEnvironmentHost(id); + const host = clientCom.getEnvironmentHost(socketClient.id); (host as WsClientHost).subscribers.on('disconnect', spy); await socketServer.close(); await waitFor( @@ -252,22 +254,26 @@ describe('Socket communication', () => { const { waitForCall: waitForClient1Call, spy: spyClient1 } = createWaitForCall<(ev: { data: Message }) => void>('client'); const clientHost1 = new WsClientHost(serverTopology['server-host']!); + disposables.add(() => clientHost1.dispose()); const clientHost2 = new WsClientHost(serverTopology['server-host']!); + const clientCom1 = new Communication(clientHost1, 'client-host1', serverTopology); const clientCom2 = new Communication(clientHost2, 'client-host2', serverTopology); new Communication(serverHost, 'server-host'); - await socketClientInitializer({ + const socketClient1 = await socketClientInitializer({ communication: clientCom1, env: { env: 'server-host', }, }); - await socketClientInitializer({ + disposables.add(() => socketClient1.dispose()); + const socketClient2 = await socketClientInitializer({ communication: clientCom2, env: { env: 'server-host', }, }); + disposables.add(() => socketClient2.dispose()); clientCom1.registerEnv('client-host2', clientCom1.getEnvironmentHost('server-host')!); serverHost.addEventListener('message', spyServer); clientHost1.addEventListener('message', spyClient1); diff --git a/packages/runtime-node/test/node-env.manager.unit.ts b/packages/runtime-node/test/node-env.manager.unit.ts index d63c61406..280d31d0d 100644 --- a/packages/runtime-node/test/node-env.manager.unit.ts +++ b/packages/runtime-node/test/node-env.manager.unit.ts @@ -12,6 +12,14 @@ import { runEnv as runAEnv } from '../test-kit/entrypoints/a.node.js'; import testFeature from '../test-kit/feature/test-feature.js'; describe('NodeEnvManager', () => { + const disposables = new Set<() => Promise | void>(); + afterEach(async () => { + for (const dispose of Array.from(disposables).reverse()) { + await dispose(); + } + disposables.clear(); + }); + const meta = { url: import.meta.resolve('../test-kit/entrypoints/') }; const testCommunicationId = 'test'; @@ -39,14 +47,11 @@ describe('NodeEnvManager', () => { }; manager = new NodeEnvManager(meta, featureEnvironmentsMapping); + disposables.add(() => manager.dispose()); const { port } = await manager.autoLaunch(new Map([['feature', 'test-feature']])); nodeEnvsPort = port; communication = getClientCom(port); - }); - - afterEach(async () => { - await communication.dispose(); - await manager.dispose(); + disposables.add(() => communication.dispose()); }); it('should reach env "a"', async () => { @@ -68,7 +73,9 @@ describe('NodeEnvManager', () => { it('should handle two communication with the same', async () => { // setup new com instance with the same id const communication2 = new Communication(new BaseHost(), testCommunicationId); + disposables.add(() => communication2.dispose()); const host = new WsClientHost('http://localhost:' + nodeEnvsPort, {}); + disposables.add(() => host.dispose()); communication2.registerEnv(aEnv.env, host); communication2.registerEnv(bEnv.env, host); @@ -85,20 +92,21 @@ describe('NodeEnvManager', () => { }); describe('NodeEnvManager with 2 node envs, one remote the other in a worker thread', () => { - let closeEnvA: () => Promise; let nodeEnvsManager: NodeEnvManager; let communication: Communication; beforeEach(async () => { const { port: aPort, socketServer, close } = await launchEngineHttpServer(); - closeEnvA = close; + disposables.add(() => close()); + const wsServerHost = new WsServerHost(socketServer); + disposables.add(() => wsServerHost.dispose()); await runAEnv({ Feature: testFeature, topLevelConfig: [ COM.configure({ config: { - host: new WsServerHost(socketServer), + host: wsServerHost, id: aEnv.env, }, }), @@ -125,13 +133,10 @@ describe('NodeEnvManager', () => { }; nodeEnvsManager = new NodeEnvManager(meta, featureEnvironmentsMapping); + disposables.add(() => nodeEnvsManager.dispose()); const { port } = await nodeEnvsManager.autoLaunch(new Map([['feature', 'test-feature']])); communication = getClientCom(port); - }); - afterEach(async () => { - await communication.dispose(); - await closeEnvA(); - await nodeEnvsManager.dispose(); + disposables.add(() => communication.dispose()); }); it('should reach env "a"', async () => { From 61c49bc70f7d7f4494a8959e15f845897f8ec6e4 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:22:22 +0200 Subject: [PATCH 05/16] refactor: can access global symbol directly --- packages/core/src/com/hosts/ws-client-host.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index 20afd1de9..707986d34 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -11,7 +11,7 @@ export class WsClientHost extends BaseHost implements IDisposable { public connected: Promise; private socketClient: Socket; public subscribers = new EventEmitter<{ disconnect: string; reconnect: void; connect: void }>(); - private stableClientId = globalThis.crypto.randomUUID(); + private stableClientId = crypto.randomUUID(); constructor(url: string, options?: Partial) { super(); From 23ab5478ce0e8886e079bb8a3acabd01b319b802 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:49:25 +0200 Subject: [PATCH 06/16] refactor: rename type for clarity --- packages/runtime-node/src/ws-node-host.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 0fe613758..b3c179bef 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -15,11 +15,11 @@ export class WsHost extends BaseHost { } type EnvId = string; -type StableId = string; +type ClientId = string; export class WsServerHost extends BaseHost implements IDisposable { private clients = new Map< - StableId, + ClientId, { socket: io.Socket; namespacedEnvIds: Set; From 34937818e43e6cde4e9440f16b717863919e0ce9 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:50:38 +0200 Subject: [PATCH 07/16] refactor: add separator for number clarity --- packages/runtime-node/src/ws-node-host.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index b3c179bef..74385f90c 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -36,7 +36,7 @@ export class WsServerHost extends BaseHost implements IDisposable { config: { disposeDelayMs?: number } = {}, ) { super(); - this.disposeDelayMs = config.disposeDelayMs ?? 120000; // 2 minutes default + this.disposeDelayMs = config.disposeDelayMs ?? 120_000; this.server.on('connection', this.onConnection); this.disposables.add('connection', () => this.server.off('connection', this.onConnection)); this.disposables.add('clear handlers', () => this.handlers.clear()); From 45f8581d9b4d4e93cd9f0e8531a5b87118692ea1 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:52:21 +0200 Subject: [PATCH 08/16] refactor: rename type for clarity --- packages/runtime-node/src/ws-node-host.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 74385f90c..44abcd792 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -14,7 +14,7 @@ export class WsHost extends BaseHost { } } -type EnvId = string; +type ClientEnvId = string; type ClientId = string; export class WsServerHost extends BaseHost implements IDisposable { @@ -22,7 +22,7 @@ export class WsServerHost extends BaseHost implements IDisposable { ClientId, { socket: io.Socket; - namespacedEnvIds: Set; + namespacedEnvIds: Set; disposeTimer?: NodeJS.Timeout; } >(); @@ -63,7 +63,7 @@ export class WsServerHost extends BaseHost implements IDisposable { }; } - private emitDisposeMessagesForClient(namespacedEnvIds: Set): void { + private emitDisposeMessagesForClient(namespacedEnvIds: Set): void { for (const envId of namespacedEnvIds) { this.emitMessageHandlers({ type: 'dispose', From 09be97067788a1775ac21a6a562f45e8b6a8717b Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:54:34 +0200 Subject: [PATCH 09/16] refactor: use slice because we're chads --- packages/runtime-node/src/ws-node-host.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 44abcd792..45e0a7dc7 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -58,8 +58,8 @@ export class WsServerHost extends BaseHost implements IDisposable { return null; } return { - stableClientId: namespacedId.substring(0, slashIndex), - envId: namespacedId.substring(slashIndex + 1), + stableClientId: namespacedId.slice(0, slashIndex), + envId: namespacedId.slice(slashIndex + 1), }; } From 0322c520960a890f9aed4a7ef21f2ec67e1b1db3 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:56:15 +0200 Subject: [PATCH 10/16] refactor: use undefined instead of the 1 billion dollar mistake --- packages/runtime-node/src/ws-node-host.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 45e0a7dc7..76b1f5051 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -52,10 +52,10 @@ export class WsServerHost extends BaseHost implements IDisposable { }); } - private extractClientIdAndEnvId(namespacedId: string): { stableClientId: string; envId: string } | null { + private extractClientIdAndEnvId(namespacedId: string): { stableClientId: string; envId: string } | undefined { const slashIndex = namespacedId.indexOf('/'); if (slashIndex === -1) { - return null; + return undefined; } return { stableClientId: namespacedId.slice(0, slashIndex), From 8869420adbfe2a5e6f455f776e6108e429848846 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 12:59:19 +0200 Subject: [PATCH 11/16] refactor: rename for clarity --- packages/runtime-node/src/ws-node-host.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 76b1f5051..10c8474ed 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -95,14 +95,14 @@ export class WsServerHost extends BaseHost implements IDisposable { } private onConnection = (socket: io.Socket): void => { - const stableClientId = socket.handshake.auth?.clientId as string | undefined; + const clientId = socket.handshake.auth?.clientId as string | undefined; - if (!stableClientId) { - throw new Error('Client must provide a stable client ID in socket.handshake.auth.clientId'); + if (!clientId) { + throw new Error('Client must provide a client ID in socket.handshake.auth.clientId'); } // Handle reconnection: update socket and clear dispose timer - const existingClient = this.clients.get(stableClientId); + const existingClient = this.clients.get(clientId); if (existingClient) { // Clear dispose timer if exists if (existingClient.disposeTimer) { @@ -116,18 +116,18 @@ export class WsServerHost extends BaseHost implements IDisposable { existingClient.socket = socket; } else { // New connection: create client entry - this.clients.set(stableClientId, { + this.clients.set(clientId, { socket, namespacedEnvIds: new Set(), }); } const onMessage = (message: Message): void => { - const client = this.clients.get(stableClientId); + const client = this.clients.get(clientId); if (!client) return; // Namespace the env IDs with stableClientId to differentiate between clients - const namespacedFrom = `${stableClientId}/${message.from}`; - const namespacedOrigin = `${stableClientId}/${message.origin}`; + const namespacedFrom = `${clientId}/${message.from}`; + const namespacedOrigin = `${clientId}/${message.origin}`; // Track namespaced env IDs for this client client.namespacedEnvIds.add(namespacedFrom); @@ -144,15 +144,15 @@ export class WsServerHost extends BaseHost implements IDisposable { socket.once('disconnect', () => { socket.off('message', onMessage); - const client = this.clients.get(stableClientId); + const client = this.clients.get(clientId); if (!client) return; // Delay dispose to allow for socket recovery client.disposeTimer = setTimeout(() => { - const clientToDispose = this.clients.get(stableClientId); + const clientToDispose = this.clients.get(clientId); if (!clientToDispose) return; - this.clients.delete(stableClientId); + this.clients.delete(clientId); this.emitDisposeMessagesForClient(clientToDispose.namespacedEnvIds); }, this.disposeDelayMs); }); From 4d7359853d095c608d79fdb83868653d42000361 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 13:02:19 +0200 Subject: [PATCH 12/16] fix: use socket.id for backwards compatibility previously, the unstable socket.id was used to identify a client. now clients pass a stable uuid they generate during code evaluation. --- packages/runtime-node/src/ws-node-host.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 10c8474ed..3d0e3dfa7 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -95,11 +95,7 @@ export class WsServerHost extends BaseHost implements IDisposable { } private onConnection = (socket: io.Socket): void => { - const clientId = socket.handshake.auth?.clientId as string | undefined; - - if (!clientId) { - throw new Error('Client must provide a client ID in socket.handshake.auth.clientId'); - } + const clientId = socket.handshake.auth?.clientId ?? socket.id; // Handle reconnection: update socket and clear dispose timer const existingClient = this.clients.get(clientId); From d4dccd46acd7531d91e164c3feeda63c5bbd2475 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 14:04:38 +0200 Subject: [PATCH 13/16] refactor: extract and reuse dispose delay timer --- packages/runtime-node/test/node-com.unit.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index b53944f32..033e37f24 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -24,6 +24,7 @@ interface ICommunicationTestApi { } describe('Socket communication', () => { + const disposeDelayMs = 10; let clientHost: WsClientHost; let serverHost: WsServerHost; let socketServer: io.Server; @@ -57,7 +58,7 @@ describe('Socket communication', () => { clientHost = new WsClientHost(serverTopology['server-host']); disposables.add(() => clientHost.dispose()); - serverHost = new WsServerHost(nameSpace, { disposeDelayMs: 10 }); // 10ms remove disconnected clients for test speed + serverHost = new WsServerHost(nameSpace, { disposeDelayMs }); disposables.add(() => serverHost.dispose()); await clientHost.connected; }); @@ -325,7 +326,7 @@ describe('Socket communication', () => { await waitForConnect(() => true); // Wait a bit more than the dispose delay to ensure dispose timer would have fired - await sleep(50); + await sleep(disposeDelayMs * 2); // Verify no dispose message was sent (since reconnection cancelled it) expect(disposeSpy.callCount).to.eql(0); From 097ed03a532ced0bb86dabe32e7b6dbff3a58e23 Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 14:05:42 +0200 Subject: [PATCH 14/16] refactor: rename variable for clarity the delay is used as grace time to be able to reconnect --- packages/runtime-node/src/ws-node-host.ts | 8 ++++---- packages/runtime-node/test/node-com.unit.ts | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 3d0e3dfa7..f6cfdaf53 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -29,14 +29,14 @@ export class WsServerHost extends BaseHost implements IDisposable { private disposables = new SafeDisposable(WsServerHost.name); dispose = this.disposables.dispose; isDisposed = this.disposables.isDisposed; - private disposeDelayMs: number; + private disposeGraceMs: number; constructor( private server: io.Server | io.Namespace, - config: { disposeDelayMs?: number } = {}, + config: { disposeGraceMs?: number } = {}, ) { super(); - this.disposeDelayMs = config.disposeDelayMs ?? 120_000; + this.disposeGraceMs = config.disposeGraceMs ?? 120_000; this.server.on('connection', this.onConnection); this.disposables.add('connection', () => this.server.off('connection', this.onConnection)); this.disposables.add('clear handlers', () => this.handlers.clear()); @@ -150,7 +150,7 @@ export class WsServerHost extends BaseHost implements IDisposable { this.clients.delete(clientId); this.emitDisposeMessagesForClient(clientToDispose.namespacedEnvIds); - }, this.disposeDelayMs); + }, this.disposeGraceMs); }); }; } diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index 033e37f24..af201824e 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -24,7 +24,7 @@ interface ICommunicationTestApi { } describe('Socket communication', () => { - const disposeDelayMs = 10; + const disposeGraceMs = 10; let clientHost: WsClientHost; let serverHost: WsServerHost; let socketServer: io.Server; @@ -58,7 +58,7 @@ describe('Socket communication', () => { clientHost = new WsClientHost(serverTopology['server-host']); disposables.add(() => clientHost.dispose()); - serverHost = new WsServerHost(nameSpace, { disposeDelayMs }); + serverHost = new WsServerHost(nameSpace, { disposeGraceMs }); disposables.add(() => serverHost.dispose()); await clientHost.connected; }); @@ -326,7 +326,7 @@ describe('Socket communication', () => { await waitForConnect(() => true); // Wait a bit more than the dispose delay to ensure dispose timer would have fired - await sleep(disposeDelayMs * 2); + await sleep(disposeGraceMs * 2); // Verify no dispose message was sent (since reconnection cancelled it) expect(disposeSpy.callCount).to.eql(0); From bacb2efa78a4d40c30d91156677cb6d33cd01f9a Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 14:11:12 +0200 Subject: [PATCH 15/16] feat: close sockets without clientId --- packages/runtime-node/src/ws-node-host.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index f6cfdaf53..be4a31a20 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -95,7 +95,11 @@ export class WsServerHost extends BaseHost implements IDisposable { } private onConnection = (socket: io.Socket): void => { - const clientId = socket.handshake.auth?.clientId ?? socket.id; + const clientId = socket.handshake.auth?.clientId; + if (!clientId) { + socket.disconnect(true); + return; + } // Handle reconnection: update socket and clear dispose timer const existingClient = this.clients.get(clientId); From ee9034c9cd1c03e167ea150fe8dccc3f88e10aeb Mon Sep 17 00:00:00 2001 From: Avi Vahl Date: Thu, 6 Nov 2025 14:23:04 +0200 Subject: [PATCH 16/16] test: revert killing only if test succeeds --- packages/runtime-node/test/node-com.unit.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index af201824e..4a1a927e4 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -395,6 +395,7 @@ describe('IPC communication', () => { const mainHost = new BaseHost(); const communication = new Communication(mainHost, 'main'); const forked = fork(new URL('./process-entry.js', import.meta.url)); + disposables.add(() => forked.kill()); const host = new IPCHost(forked); communication.registerEnv('process', host); communication.registerMessageHandler(host); @@ -406,9 +407,6 @@ describe('IPC communication', () => { ); expect(await proxy.echo()).to.eq('yo'); - - // Clean up after the test completes - forked.kill(); }); it('handles forked process closing', async () => {