From bb8a36f7970d7a925a08ee341ccb1d6e3b739c4b Mon Sep 17 00:00:00 2001 From: syn Date: Mon, 4 May 2026 09:30:35 -0500 Subject: [PATCH 1/9] refactor(kiloclaw): consolidate catch-all proxy blocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All four catch-all paths (/i/:instanceId/*, host-based, cookie-routed, default personal) now share a single `proxyThroughTarget` helper. Previously the host branch used the helper while the other three inlined the same ~130-line HTTP + WebSocket relay; this commit collapses them onto the helper. The helper gains optional `unreachableHint` / `startingUpHint` parameters so the default-personal branch can keep its user-facing hint strings (these are test-asserted). All other behavior is preserved — same status codes, same JSON shapes, identical WebSocket relay semantics. One pre-existing inconsistency is unified rather than preserved: the cookie branch used to return `{ error: 'WebSocket upgrade failed' }` with status 502 when the upstream returned no webSocket. The helper (and the /i and default branches) return the raw containerResponse in that case, which is strictly more informative. No test asserted the cookie-branch-specific response. src/index.ts: +48 / -393 (net -345). --- services/kiloclaw/src/index.ts | 441 ++++----------------------------- 1 file changed, 48 insertions(+), 393 deletions(-) diff --git a/services/kiloclaw/src/index.ts b/services/kiloclaw/src/index.ts index 1b4382462..f26d879cc 100644 --- a/services/kiloclaw/src/index.ts +++ b/services/kiloclaw/src/index.ts @@ -159,39 +159,49 @@ function routingTargetUrl(target: ProviderRoutingTarget, pathname: string, searc /** * Forward an HTTP or WebSocket request through to a provider-routed target. * - * Shared by the host-based branch of the catch-all proxy. The existing - * cookie-based branch and the `/i/:instanceId/*` route predate this helper - * and inline the same logic — they can be consolidated in a follow-up. + * Shared by all four catch-all proxy branches (`/i/:instanceId/*`, host-based, + * cookie-routed, and default personal). `logTag` is prepended to console logs + * so failing requests can be traced back to their originating branch. * - * `logTag` is prepended to console logs so failing requests can be traced - * back to their originating branch. + * Optional `unreachableHint` / `startingUpHint` are attached to the 503 JSON + * bodies the helper emits when the upstream fetch fails or the container is + * still booting. Only the default-personal branch surfaces hints today (tests + * assert the specific strings); branches that prefer the bare error + * (`{ "error": "Instance not reachable" }`) just omit them. */ async function proxyThroughTarget(opts: { request: Request; targetUrl: string; forwardHeaders: Headers; logTag: string; + unreachableHint?: string; + startingUpHint?: string; }): Promise { - const { request, targetUrl, forwardHeaders, logTag } = opts; + const { request, targetUrl, forwardHeaders, logTag, unreachableHint, startingUpHint } = opts; const isWebSocketRequest = request.headers.get('Upgrade')?.toLowerCase() === 'websocket'; + const unreachableBody: Record = { error: 'Instance not reachable' }; + if (unreachableHint) unreachableBody.hint = unreachableHint; + const startingUpBody: Record = { error: 'Instance is starting up' }; + if (startingUpHint) startingUpBody.hint = startingUpHint; + if (isWebSocketRequest) { let containerResponse: Response; try { containerResponse = await fetch(targetUrl, { headers: forwardHeaders }); } catch (err) { console.error(`${logTag} WS fetch failed:`, err); - return Response.json( - { error: 'Instance not reachable' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); + return Response.json(unreachableBody, { + status: 503, + headers: { 'Retry-After': '5' }, + }); } if (containerResponse.status === 502) { - return Response.json( - { error: 'Instance is starting up' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); + return Response.json(startingUpBody, { + status: 503, + headers: { 'Retry-After': '5' }, + }); } const containerWs = containerResponse.webSocket; @@ -279,10 +289,10 @@ async function proxyThroughTarget(opts: { return httpResponse; } catch (err) { console.error(`${logTag} HTTP fetch failed:`, err); - return Response.json( - { error: 'Instance not reachable' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); + return Response.json(unreachableBody, { + status: 503, + headers: { 'Retry-After': '5' }, + }); } } @@ -507,117 +517,12 @@ app.all('/i/:instanceId/*', async c => { status.runtimeId ); - const isWebSocketRequest = c.req.raw.headers.get('Upgrade')?.toLowerCase() === 'websocket'; - - if (isWebSocketRequest) { - let containerResponse: Response; - try { - containerResponse = await fetch(targetUrl, { headers: forwardHeaders }); - } catch (err) { - console.error('[PROXY /i] Fly Proxy fetch failed:', err); - return c.json( - { error: 'Instance not reachable' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - - if (containerResponse.status === 502) { - return c.json( - { error: 'Instance is starting up' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - - const containerWs = containerResponse.webSocket; - if (!containerWs) { - return containerResponse; - } - - const [clientWs, serverWs] = Object.values(new WebSocketPair()); - serverWs.accept(); - containerWs.accept(); - - let droppedToContainer = 0; - let droppedToClient = 0; - - serverWs.addEventListener('message', event => { - if (containerWs.readyState === WebSocket.OPEN) { - containerWs.send(event.data as string | ArrayBuffer); - } else { - droppedToContainer++; - if (droppedToContainer === 1) { - console.warn( - '[WS /i] First dropped client->container message (readyState:', - containerWs.readyState, - ')' - ); - } - } - }); - containerWs.addEventListener('message', event => { - const data = transformWsMessage(event.data as string | ArrayBuffer); - if (serverWs.readyState === WebSocket.OPEN) { - serverWs.send(data); - } else { - droppedToClient++; - if (droppedToClient === 1) { - console.warn( - '[WS /i] First dropped container->client message (readyState:', - serverWs.readyState, - ')' - ); - } - } - }); - - const logDropSummary = () => { - const totalDropped = droppedToClient + droppedToContainer; - if (totalDropped > 0) { - console.warn( - '[WS /i] Connection closed with', - totalDropped, - 'dropped messages (toClient:', - droppedToClient, - 'toContainer:', - droppedToContainer, - ')' - ); - } - }; - - serverWs.addEventListener('close', event => { - logDropSummary(); - safeClose(containerWs, event.code, event.reason); - }); - containerWs.addEventListener('close', event => { - logDropSummary(); - safeClose(serverWs, event.code, sanitizeCloseReason(event.reason)); - }); - serverWs.addEventListener('error', () => safeClose(containerWs, 1011, 'Client error')); - containerWs.addEventListener('error', () => safeClose(serverWs, 1011, 'Container error')); - - return new Response(null, { status: 101, webSocket: clientWs }); - } - - // HTTP proxy - const requestBody = c.req.raw.body ? await c.req.raw.arrayBuffer() : null; - try { - const httpResponse = await fetch(targetUrl, { - method: c.req.raw.method, - headers: forwardHeaders, - body: requestBody, - }); - if (httpResponse.status === 502) { - return startingUpPage(); - } - return httpResponse; - } catch (err) { - console.error('[PROXY /i] HTTP fetch failed:', err); - return c.json( - { error: 'Instance not reachable' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } + return proxyThroughTarget({ + request: c.req.raw, + targetUrl, + forwardHeaders, + logTag: '[PROXY /i]', + }); }); // ============================================================================= @@ -977,117 +882,12 @@ app.all('*', async c => { providerHeaders: routingTarget.headers, }); - const isWebSocketRequest = request.headers.get('Upgrade')?.toLowerCase() === 'websocket'; - - if (isWebSocketRequest) { - let containerResponse: Response; - try { - containerResponse = await fetch(targetUrl, { headers: forwardHeaders }); - } catch (err) { - console.error('[PROXY] Cookie-routed WS fetch failed:', err); - return c.json( - { error: 'Instance not reachable' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - - if (containerResponse.status === 502) { - return c.json( - { error: 'Instance is starting up' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - - const containerWs = containerResponse.webSocket; - if (!containerWs) { - return c.json({ error: 'WebSocket upgrade failed' }, 502); - } - containerWs.accept(); - const [clientWs, serverWs] = Object.values(new WebSocketPair()); - serverWs.accept(); - - let cookieDroppedToContainer = 0; - let cookieDroppedToClient = 0; - - serverWs.addEventListener('message', event => { - if (containerWs.readyState === WebSocket.OPEN) { - containerWs.send(event.data as string | ArrayBuffer); - } else { - cookieDroppedToContainer++; - if (cookieDroppedToContainer === 1) { - console.warn( - '[WS cookie] First dropped client->container message (readyState:', - containerWs.readyState, - ')' - ); - } - } - }); - containerWs.addEventListener('message', event => { - const data = transformWsMessage(event.data as string | ArrayBuffer); - if (serverWs.readyState === WebSocket.OPEN) { - serverWs.send(data); - } else { - cookieDroppedToClient++; - if (cookieDroppedToClient === 1) { - console.warn( - '[WS cookie] First dropped container->client message (readyState:', - serverWs.readyState, - ')' - ); - } - } - }); - - const logCookieDropSummary = () => { - const totalDropped = cookieDroppedToClient + cookieDroppedToContainer; - if (totalDropped > 0) { - console.warn( - '[WS cookie] Connection closed with', - totalDropped, - 'dropped messages (toClient:', - cookieDroppedToClient, - 'toContainer:', - cookieDroppedToContainer, - ')' - ); - } - }; - - serverWs.addEventListener('close', event => { - logCookieDropSummary(); - safeClose(containerWs, event.code, event.reason); - }); - containerWs.addEventListener('close', event => { - logCookieDropSummary(); - safeClose(serverWs, event.code, sanitizeCloseReason(event.reason)); - }); - serverWs.addEventListener('error', () => safeClose(containerWs, 1011, 'Client error')); - containerWs.addEventListener('error', () => - safeClose(serverWs, 1011, 'Container error') - ); - return new Response(null, { status: 101, webSocket: clientWs }); - } - - // HTTP proxy - const requestBody = request.body ? await request.arrayBuffer() : null; - try { - const httpResponse = await fetch(targetUrl, { - method: request.method, - headers: forwardHeaders, - body: requestBody, - }); - if (httpResponse.status === 502) { - return startingUpPage(); - } - return httpResponse; - } catch (err) { - console.error('[PROXY] Cookie-routed HTTP fetch failed:', err); - return c.json( - { error: 'Instance not reachable' }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } + return proxyThroughTarget({ + request, + targetUrl, + forwardHeaders, + logTag: '[PROXY cookie]', + }); } } } @@ -1155,8 +955,6 @@ app.all('*', async c => { console.log('[PROXY] Handling request:', url.pathname, 'runtime:', runtimeId); - const isWebSocketRequest = request.headers.get('Upgrade')?.toLowerCase() === 'websocket'; - if (!c.env.GATEWAY_TOKEN_SECRET) { console.error('[CONFIG] Missing required environment variables: GATEWAY_TOKEN_SECRET'); return c.json( @@ -1176,157 +974,14 @@ app.all('*', async c => { providerHeaders: routingTarget.headers, }); - // WebSocket proxy - if (isWebSocketRequest) { - console.log('[WS] Proxying WebSocket connection to OpenClaw via Fly Proxy'); - - let containerResponse: Response; - try { - containerResponse = await fetch(targetUrl, { - headers: forwardHeaders, - }); - } catch (err) { - console.error('[WS] Fly Proxy fetch failed:', err); - return c.json( - { - error: 'Instance not reachable', - hint: 'Your instance may not be running. Start it from the dashboard.', - }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - console.log('[WS] Fly Proxy response status:', containerResponse.status); - - // Gateway not ready yet — return a clear JSON error for WebSocket clients - if (containerResponse.status === 502) { - return c.json( - { - error: 'Instance is starting up', - hint: 'The gateway process is still initializing. Please retry shortly.', - }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - - const containerWs = containerResponse.webSocket; - if (!containerWs) { - console.error('[WS] No WebSocket in response - returning direct response'); - return containerResponse; - } - - const [clientWs, serverWs] = Object.values(new WebSocketPair()); - - serverWs.accept(); - containerWs.accept(); - - let catchAllDroppedToContainer = 0; - let catchAllDroppedToClient = 0; - - // Client -> Container relay - serverWs.addEventListener('message', event => { - if (containerWs.readyState === WebSocket.OPEN) { - containerWs.send(event.data as string | ArrayBuffer); - } else { - catchAllDroppedToContainer++; - if (catchAllDroppedToContainer === 1) { - console.warn( - '[WS] First dropped client->container message (readyState:', - containerWs.readyState, - ')' - ); - } - } - }); - - // Container -> Client relay with error transformation - containerWs.addEventListener('message', event => { - const data = transformWsMessage(event.data as string | ArrayBuffer); - if (serverWs.readyState === WebSocket.OPEN) { - serverWs.send(data); - } else { - catchAllDroppedToClient++; - if (catchAllDroppedToClient === 1) { - console.warn( - '[WS] First dropped container->client message (readyState:', - serverWs.readyState, - ')' - ); - } - } - }); - - const logCatchAllDropSummary = () => { - const totalDropped = catchAllDroppedToClient + catchAllDroppedToContainer; - if (totalDropped > 0) { - console.warn( - '[WS] Connection closed with', - totalDropped, - 'dropped messages (toClient:', - catchAllDroppedToClient, - 'toContainer:', - catchAllDroppedToContainer, - ')' - ); - } - }; - - // Close relay - serverWs.addEventListener('close', event => { - logCatchAllDropSummary(); - safeClose(containerWs, event.code, event.reason); - }); - - containerWs.addEventListener('close', event => { - logCatchAllDropSummary(); - safeClose(serverWs, event.code, sanitizeCloseReason(event.reason)); - }); - - // Error relay - serverWs.addEventListener('error', event => { - console.error('[WS] Client error:', event); - safeClose(containerWs, 1011, 'Client error'); - }); - - containerWs.addEventListener('error', event => { - console.error('[WS] Container error:', event); - safeClose(serverWs, 1011, 'Container error'); - }); - - return new Response(null, { - status: 101, - webSocket: clientWs, - }); - } - - // HTTP proxy - // Buffer body upfront so it can be replayed on crash-recovery retry (streams are one-shot). - const requestBody = request.body ? await request.arrayBuffer() : null; - console.log('[HTTP] Proxying:', url.pathname + url.search); - let httpResponse: Response; - try { - httpResponse = await fetch(targetUrl, { - method: request.method, - headers: forwardHeaders, - body: requestBody, - }); - } catch (err) { - console.error('[HTTP] Fly Proxy fetch failed:', err); - return c.json( - { - error: 'Instance not reachable', - hint: 'Your instance may not be running. Start it from the dashboard.', - }, - { status: 503, headers: { 'Retry-After': '5' } } - ); - } - console.log('[HTTP] Response status:', httpResponse.status); - - // Gateway not ready yet — show friendly "starting up" page instead of raw 502 - if (httpResponse.status === 502) { - return startingUpPage(); - } - - return httpResponse; + return proxyThroughTarget({ + request, + targetUrl, + forwardHeaders, + logTag: '[PROXY default]', + unreachableHint: 'Your instance may not be running. Start it from the dashboard.', + startingUpHint: 'The gateway process is still initializing. Please retry shortly.', + }); }); export default class extends WorkerEntrypoint { From 410191e84c9e79776e5b18077b1a0d8a87e86773 Mon Sep 17 00:00:00 2001 From: syn Date: Mon, 4 May 2026 09:55:20 -0500 Subject: [PATCH 2/9] refactor(worker-utils): host label + sandbox-id helpers for cross-package reuse Move the pure sandboxId <-> hostname-label logic (plus sandboxId <-> userId encoding) from `services/kiloclaw/src/auth/` into `@kilocode/worker-utils` so `apps/web` can use it to mint per-instance URLs in PR3 without duplicating the base64url / base32hex encoding. - New subpath exports: `@kilocode/worker-utils/hostname-label` and `@kilocode/worker-utils/sandbox-id`. - The existing `services/kiloclaw/src/auth/hostname-label.ts` and `sandbox-id.ts` become thin re-export shims so the many existing `./auth/hostname-label` / `./auth/sandbox-id` imports inside the worker don't have to migrate all at once. - Tests move with the implementation. No behaviour change. --- packages/worker-utils/package.json | 2 + .../worker-utils/src}/hostname-label.test.ts | 6 +- packages/worker-utils/src/hostname-label.ts | 240 +++++++++++++++++ packages/worker-utils/src/sandbox-id.ts | 50 ++++ services/kiloclaw/src/auth/hostname-label.ts | 251 ++---------------- services/kiloclaw/src/auth/sandbox-id.ts | 58 +--- 6 files changed, 322 insertions(+), 285 deletions(-) rename {services/kiloclaw/src/auth => packages/worker-utils/src}/hostname-label.test.ts (98%) create mode 100644 packages/worker-utils/src/hostname-label.ts create mode 100644 packages/worker-utils/src/sandbox-id.ts diff --git a/packages/worker-utils/package.json b/packages/worker-utils/package.json index 5a9540a46..8523d8263 100644 --- a/packages/worker-utils/package.json +++ b/packages/worker-utils/package.json @@ -6,6 +6,8 @@ "exports": { ".": "./src/index.ts", "./instance-id": "./src/instance-id.ts", + "./sandbox-id": "./src/sandbox-id.ts", + "./hostname-label": "./src/hostname-label.ts", "./redact-headers": "./src/redact-headers.ts", "./kiloclaw-billing-observability": "./src/kiloclaw-billing-observability.ts" }, diff --git a/services/kiloclaw/src/auth/hostname-label.test.ts b/packages/worker-utils/src/hostname-label.test.ts similarity index 98% rename from services/kiloclaw/src/auth/hostname-label.test.ts rename to packages/worker-utils/src/hostname-label.test.ts index 961347151..fb36408f7 100644 --- a/services/kiloclaw/src/auth/hostname-label.test.ts +++ b/packages/worker-utils/src/hostname-label.test.ts @@ -1,13 +1,13 @@ import { describe, it, expect } from 'vitest'; -import { sandboxIdFromUserId } from './sandbox-id'; -import { sandboxIdFromInstanceId } from '@kilocode/worker-utils/instance-id'; +import { sandboxIdFromUserId } from './sandbox-id.js'; +import { sandboxIdFromInstanceId } from './instance-id.js'; import { hostnameLabelFromSandboxId, sandboxIdFromHostnameLabel, instanceUrl, parseInstanceHost, MAX_HOSTNAME_LABEL_LENGTH, -} from './hostname-label'; +} from './hostname-label.js'; describe('hostnameLabelFromSandboxId', () => { it('maps an instance-keyed sandboxId to `i-{32hex}`', () => { diff --git a/packages/worker-utils/src/hostname-label.ts b/packages/worker-utils/src/hostname-label.ts new file mode 100644 index 000000000..5957fb78d --- /dev/null +++ b/packages/worker-utils/src/hostname-label.ts @@ -0,0 +1,240 @@ +/** + * Hostname label <-> sandboxId translation for per-instance virtual hosting + * on `*.kiloclaw.ai` (or a configured dev-suffix). + * + * Two instance shapes map to two label prefixes: + * + * instance-keyed sandboxId "ki_{32hex}" <-> "i-{32hex}" + * legacy sandboxId base64url(userId) <-> "u-{base32hex(userId)}" + * + * Prefix disambiguates the two cases without a database lookup. + * + * The per-instance URL used to inject per-instance origins into + * `OPENCLAW_ALLOWED_ORIGINS` and (post-PR2) to route incoming requests by + * `Host` is built from two env-configurable pieces: + * + * KILOCLAW_INSTANCE_HOST_SUFFIX default ".kiloclaw.ai" + * KILOCLAW_INSTANCE_URL_SCHEME default "https" + * + * Dev parity: set the suffix to `.kiloclaw.localhost:8795` and the scheme to + * `http` and the worker will both inject `http://