Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions cloudflare-gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Hono } from 'hono';
import { z } from 'zod';
import { runAgent } from './agent-runner';
import {
stopAgent,
Expand All @@ -24,29 +25,45 @@ import type {
const MAX_TICKETS = 1000;
const streamTickets = new Map<string, { agentId: string; expiresAt: number }>();

export const app = new Hono();
// Minimal Zod schema for the town config delivered via X-Town-Config header.
// Uses z.record() so any string-keyed object is accepted and future keys are preserved.
const TownConfigHeader = z.record(z.string(), z.unknown());

// Apply town config from X-Town-Config header (sent by TownDO on every request)
let currentTownConfig: Record<string, unknown> | null = null;
// Last-known-good town config. Updated on every request that carries the header.
// Used as a fallback by code that runs outside a request context (e.g. background tasks).
let lastKnownTownConfig: Record<string, unknown> | null = null;

/** Get the latest town config delivered via X-Town-Config header. */
export function getCurrentTownConfig(): Record<string, unknown> | null {
return currentTownConfig;
return lastKnownTownConfig;
}

export const app = new Hono();

// Parse and validate town config from X-Town-Config header (sent by TownDO on
// every request). The validated config is stored in a module-level cache
// accessible via getCurrentTownConfig().
app.use('*', async (c, next) => {
const configHeader = c.req.header('X-Town-Config');
if (configHeader) {
try {
const parsed = JSON.parse(configHeader);
currentTownConfig = parsed;
const hasToken =
typeof parsed.kilocode_token === 'string' && parsed.kilocode_token.length > 0;
console.log(
`[control-server] X-Town-Config received: hasKilocodeToken=${hasToken} keys=${Object.keys(parsed).join(',')}`
);
const raw: unknown = JSON.parse(configHeader);
const result = TownConfigHeader.safeParse(raw);
if (result.success) {
lastKnownTownConfig = result.data;
const hasToken =
typeof result.data.kilocode_token === 'string' && result.data.kilocode_token.length > 0;
console.log(
`[control-server] X-Town-Config received: hasKilocodeToken=${hasToken} keys=${Object.keys(result.data).join(',')}`
);
} else {
console.warn(
'[control-server] X-Town-Config header failed validation:',
result.error.issues
);
}
} catch {
console.warn('[control-server] X-Town-Config header malformed');
console.warn('[control-server] X-Town-Config header malformed (invalid JSON)');
}
}
await next();
Expand Down
19 changes: 16 additions & 3 deletions cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
*/

import { createOpencode, type OpencodeClient } from '@kilocode/sdk';
import { z } from 'zod';
import type { ManagedAgent, StartAgentRequest, KiloSSEEvent, KiloSSEEventData } from './types';
import { reportAgentCompleted } from './completion-reporter';

const MANAGER_LOG = '[process-manager]';

// Validates the shape returned by client.session.create() so we fail fast
// if the SDK changes its return type.
const SessionResponse = z.object({ id: z.string().min(1) }).passthrough();

type SDKInstance = {
client: OpencodeClient;
server: { url: string; close(): void };
Expand Down Expand Up @@ -298,9 +303,17 @@ export async function startAgent(

// 2. Create a session
const sessionResult = await client.session.create({ body: {} });
const session = sessionResult.data ?? sessionResult;
const sessionId =
typeof session === 'object' && session && 'id' in session ? String(session.id) : '';
const rawSession: unknown = sessionResult.data ?? sessionResult;
const parsed = SessionResponse.safeParse(rawSession);
if (!parsed.success) {
console.error(
`${MANAGER_LOG} SDK session.create returned unexpected shape:`,
JSON.stringify(rawSession).slice(0, 200),
parsed.error.issues
);
throw new Error('SDK session.create response missing required "id" field');
}
const sessionId = parsed.data.id;
agent.sessionId = sessionId;

// 3. Subscribe to events (async, runs in background)
Expand Down
1 change: 1 addition & 0 deletions cloudflare-gastown/src/db/tables/bead-events.table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const BeadEventType = z.enum([
'status_changed',
'closed',
'escalated',
'notification_failed',
'mail_sent',
'review_submitted',
'review_completed',
Expand Down
1 change: 1 addition & 0 deletions cloudflare-gastown/src/db/tables/rig-bead-events.table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const BeadEventType = z.enum([
'status_changed',
'closed',
'escalated',
'notification_failed',
'mail_sent',
'review_submitted',
'review_completed',
Expand Down
47 changes: 44 additions & 3 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,9 @@ export class TownDO extends DurableObject<Env> {

// Fire-and-forget dispatch so the sling call returns immediately.
// The alarm loop retries if this fails.
void this.dispatchAgent(hookedAgent, bead);
this.dispatchAgent(hookedAgent, bead).catch(err =>
console.error(`${TOWN_LOG} slingBead: fire-and-forget dispatchAgent failed:`, err)
);
await this.armAlarmIfNeeded();
return { bead, agent: hookedAgent };
}
Expand Down Expand Up @@ -1072,7 +1074,26 @@ export class TownDO extends DurableObject<Env> {
if (input.severity !== 'low') {
this.sendMayorMessage(
`[Escalation:${input.severity}] rig=${input.source_rig_id} ${input.message}`
).catch(err => console.warn(`${TOWN_LOG} routeEscalation: failed to notify mayor:`, err));
).catch(err => {
console.warn(`${TOWN_LOG} routeEscalation: failed to notify mayor:`, err);
try {
beadOps.logBeadEvent(this.sql, {
beadId,
agentId: input.source_agent_id ?? null,
eventType: 'notification_failed',
metadata: {
target: 'mayor',
reason: err instanceof Error ? err.message : String(err),
severity: input.severity,
},
});
} catch (logErr) {
console.error(
`${TOWN_LOG} routeEscalation: failed to log notification_failed event:`,
logErr
);
}
});
}

return escalation;
Expand Down Expand Up @@ -1539,7 +1560,27 @@ export class TownDO extends DurableObject<Env> {
if (newSeverity !== 'low') {
this.sendMayorMessage(
`[Re-Escalation:${newSeverity}] rig=${esc.source_rig_id} ${esc.message}`
).catch(() => {});
).catch(err => {
console.warn(`${TOWN_LOG} re-escalation: failed to notify mayor:`, err);
try {
beadOps.logBeadEvent(this.sql, {
beadId: esc.id,
agentId: null,
eventType: 'notification_failed',
metadata: {
target: 'mayor',
reason: err instanceof Error ? err.message : String(err),
severity: newSeverity,
re_escalation: true,
},
});
} catch (logErr) {
console.error(
`${TOWN_LOG} re-escalation: failed to log notification_failed event:`,
logErr
);
}
});
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions cloudflare-gastown/src/gastown.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { withCloudflareAccess, validateCfAccessRequest } from './middleware/cf-a
import {
authMiddleware,
agentOnlyMiddleware,
townIdMiddleware,
type AuthVariables,
} from './middleware/auth.middleware';
import {
Expand Down Expand Up @@ -134,10 +135,13 @@ app.get('/', c => c.html(dashboardHtml()));

app.get('/health', c => c.json({ status: 'ok' }));

// ── Auth ────────────────────────────────────────────────────────────────
// ── Town ID + Auth ──────────────────────────────────────────────────────
// All rig routes live under /api/towns/:townId/rigs/:rigId so the townId
// is always available from the URL path. Auth middleware skipped in dev.
// is always available from the URL path.
// townIdMiddleware always runs (even in dev) so c.get('townId') is
// guaranteed for handlers. Auth middleware is skipped in dev.

app.use('/api/towns/:townId/rigs/:rigId/*', townIdMiddleware);
app.use('/api/towns/:townId/rigs/:rigId/*', async (c, next) =>
c.env.ENVIRONMENT === 'development' ? next() : authMiddleware(c, next)
);
Expand Down
8 changes: 3 additions & 5 deletions cloudflare-gastown/src/handlers/rig-agent-events.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from 'zod';
import { getTownDOStub } from '../dos/Town.do';
import { resSuccess, resError } from '../util/res.util';
import { parseJsonBody } from '../util/parse-json-body.util';
import { getEnforcedAgentId, getTownId } from '../middleware/auth.middleware';
import { getEnforcedAgentId } from '../middleware/auth.middleware';
import type { GastownEnv } from '../gastown.worker';

const AppendEventBody = z.object({
Expand Down Expand Up @@ -34,8 +34,7 @@ export async function handleAppendAgentEvent(c: Context<GastownEnv>, params: { r
return c.json(resError('agent_id does not match authenticated agent'), 403);
}

const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.appendAgentEvent(parsed.data.agent_id, parsed.data.event_type, parsed.data.data);
return c.json(resSuccess({ appended: true }), 201);
Expand All @@ -58,8 +57,7 @@ export async function handleGetAgentEvents(
return c.json(resError('Invalid query parameters'), 400);
}

const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const events = await town.getAgentEvents(
params.agentId,
Expand Down
40 changes: 13 additions & 27 deletions cloudflare-gastown/src/handlers/rig-agents.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { z } from 'zod';
import { getTownDOStub } from '../dos/Town.do';
import { resSuccess, resError } from '../util/res.util';
import { parseJsonBody } from '../util/parse-json-body.util';
import { getTownId } from '../middleware/auth.middleware';
import { AgentRole, AgentStatus } from '../types';
import type { GastownEnv } from '../gastown.worker';

Expand Down Expand Up @@ -42,8 +41,7 @@ export async function handleRegisterAgent(c: Context<GastownEnv>, params: { rigI
400
);
}
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const agent = await town.registerAgent({ ...parsed.data, rig_id: params.rigId });
return c.json(resSuccess(agent), 201);
Expand All @@ -58,8 +56,7 @@ export async function handleListAgents(c: Context<GastownEnv>, params: { rigId:
return c.json(resError('Invalid role or status filter'), 400);
}

const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const agents = await town.listAgents({
role: role?.data,
Expand All @@ -73,8 +70,7 @@ export async function handleGetAgent(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const agent = await town.getAgentAsync(params.agentId);
if (!agent || agent.rig_id !== params.rigId) return c.json(resError('Agent not found'), 404);
Expand All @@ -96,8 +92,7 @@ export async function handleHookBead(
console.log(
`${AGENT_LOG} handleHookBead: rigId=${params.rigId} agentId=${params.agentId} beadId=${parsed.data.bead_id}`
);
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.hookBead(params.agentId, parsed.data.bead_id);
console.log(`${AGENT_LOG} handleHookBead: hooked successfully`);
Expand All @@ -108,8 +103,7 @@ export async function handleUnhookBead(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.unhookBead(params.agentId);
return c.json(resSuccess({ unhooked: true }));
Expand All @@ -119,8 +113,7 @@ export async function handlePrime(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const context = await town.prime(params.agentId);
return c.json(resSuccess(context));
Expand All @@ -137,8 +130,7 @@ export async function handleAgentDone(
400
);
}
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.agentDone(params.agentId, parsed.data);
return c.json(resSuccess({ done: true }));
Expand All @@ -159,8 +151,7 @@ export async function handleAgentCompleted(
400
);
}
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.agentCompleted(params.agentId, parsed.data);
return c.json(resSuccess({ completed: true }));
Expand All @@ -177,8 +168,7 @@ export async function handleWriteCheckpoint(
400
);
}
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.writeCheckpoint(params.agentId, parsed.data.data);
return c.json(resSuccess({ written: true }));
Expand All @@ -188,8 +178,7 @@ export async function handleCheckMail(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const messages = await town.checkMail(params.agentId);
return c.json(resSuccess(messages));
Expand All @@ -203,8 +192,7 @@ export async function handleHeartbeat(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.touchAgentHeartbeat(params.agentId);
return c.json(resSuccess({ heartbeat: true }));
Expand All @@ -230,8 +218,7 @@ export async function handleGetOrCreateAgent(c: Context<GastownEnv>, params: { r
console.log(
`${AGENT_LOG} handleGetOrCreateAgent: rigId=${params.rigId} role=${parsed.data.role}`
);
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const agent = await town.getOrCreateAgent(parsed.data.role, params.rigId);
console.log(`${AGENT_LOG} handleGetOrCreateAgent: result=${JSON.stringify(agent).slice(0, 200)}`);
Expand All @@ -242,8 +229,7 @@ export async function handleDeleteAgent(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const agent = await town.getAgentAsync(params.agentId);
if (!agent || agent.rig_id !== params.rigId) return c.json(resError('Agent not found'), 404);
Expand Down
6 changes: 2 additions & 4 deletions cloudflare-gastown/src/handlers/rig-bead-events.handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Context } from 'hono';
import { getTownDOStub } from '../dos/Town.do';
import { resSuccess, resError } from '../util/res.util';
import { getTownId } from '../middleware/auth.middleware';
import { resSuccess } from '../util/res.util';
import type { GastownEnv } from '../gastown.worker';

export async function handleListBeadEvents(c: Context<GastownEnv>, params: { rigId: string }) {
Expand All @@ -14,8 +13,7 @@ export async function handleListBeadEvents(c: Context<GastownEnv>, params: { rig
? parsedLimit
: undefined;

const townId = getTownId(c);
if (!townId) return c.json(resError('Missing townId'), 400);
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
const events = await town.listBeadEvents({ beadId, since, limit });
return c.json(resSuccess(events));
Expand Down
Loading