diff --git a/cloudflare-gastown/container/plugin/client.test.ts b/cloudflare-gastown/container/plugin/client.test.ts index 35f05bb6b5..340f403c93 100644 --- a/cloudflare-gastown/container/plugin/client.test.ts +++ b/cloudflare-gastown/container/plugin/client.test.ts @@ -1,5 +1,12 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { GastownClient, MayorGastownClient, GastownApiError, createClientFromEnv } from './client'; +import { + GastownClient, + MayorGastownClient, + GastownApiError, + createClientFromEnv, + registerPluginClient, + refreshPluginClientTokens, +} from './client'; import type { GastownEnv, MayorGastownEnv } from './types'; const TEST_ENV: GastownEnv = { @@ -7,6 +14,7 @@ const TEST_ENV: GastownEnv = { sessionToken: 'test-jwt-token', agentId: 'agent-111', rigId: 'rig-222', + townId: 'town-333', }; function mockFetch(data: unknown, status = 200) { @@ -48,7 +56,9 @@ describe('GastownClient', () => { expect(fetchMock).toHaveBeenCalledTimes(1); const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/agents/agent-111/prime'); + expect(url).toBe( + 'https://gastown.example.com/api/towns/town-333/rigs/rig-222/agents/agent-111/prime' + ); const headers = new Headers(init.headers); expect(headers.get('Authorization')).toBe('Bearer test-jwt-token'); expect(headers.get('Content-Type')).toBe('application/json'); @@ -81,7 +91,7 @@ describe('GastownClient', () => { expect(result).toEqual(bead); const [url] = (globalThis.fetch as ReturnType).mock.calls[0] as [string]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/beads/bead-1'); + expect(url).toBe('https://gastown.example.com/api/towns/town-333/rigs/rig-222/beads/bead-1'); }); it('closeBead() sends agent_id in body', async () => { @@ -94,7 +104,9 @@ describe('GastownClient', () => { string, RequestInit, ]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/beads/bead-1/close'); + expect(url).toBe( + 'https://gastown.example.com/api/towns/town-333/rigs/rig-222/beads/bead-1/close' + ); expect(init.method).toBe('POST'); expect(JSON.parse(init.body as string)).toEqual({ agent_id: 'agent-111' }); }); @@ -112,7 +124,9 @@ describe('GastownClient', () => { string, RequestInit, ]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/agents/agent-111/done'); + expect(url).toBe( + 'https://gastown.example.com/api/towns/town-333/rigs/rig-222/agents/agent-111/done' + ); expect(JSON.parse(init.body as string)).toEqual({ branch: 'feat/test', pr_url: 'https://github.com/pr/1', @@ -145,7 +159,9 @@ describe('GastownClient', () => { expect(result).toEqual(mail); const [url] = (globalThis.fetch as ReturnType).mock.calls[0] as [string]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/agents/agent-111/mail'); + expect(url).toBe( + 'https://gastown.example.com/api/towns/town-333/rigs/rig-222/agents/agent-111/mail' + ); }); it('writeCheckpoint() posts data to checkpoint endpoint', async () => { @@ -157,7 +173,9 @@ describe('GastownClient', () => { string, RequestInit, ]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/agents/agent-111/checkpoint'); + expect(url).toBe( + 'https://gastown.example.com/api/towns/town-333/rigs/rig-222/agents/agent-111/checkpoint' + ); expect(JSON.parse(init.body as string)).toEqual({ data: { step: 3, files: ['a.ts'] } }); }); @@ -172,7 +190,7 @@ describe('GastownClient', () => { string, RequestInit, ]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/escalations'); + expect(url).toBe('https://gastown.example.com/api/towns/town-333/rigs/rig-222/escalations'); expect(JSON.parse(init.body as string)).toEqual({ title: 'blocked', priority: 'high' }); }); @@ -246,7 +264,9 @@ describe('GastownClient', () => { // Verify no double slashes in the URL by calling prime void c.prime(); const [url] = (globalThis.fetch as ReturnType).mock.calls[0] as [string]; - expect(url).toBe('https://gastown.example.com/api/rigs/rig-222/agents/agent-111/prime'); + expect(url).toBe( + 'https://gastown.example.com/api/towns/town-333/rigs/rig-222/agents/agent-111/prime' + ); }); }); @@ -262,6 +282,7 @@ describe('createClientFromEnv', () => { process.env.GASTOWN_SESSION_TOKEN = 'tok'; process.env.GASTOWN_AGENT_ID = 'agent-1'; process.env.GASTOWN_RIG_ID = 'rig-1'; + process.env.GASTOWN_TOWN_ID = 'town-1'; const client = createClientFromEnv(); expect(client).toBeInstanceOf(GastownClient); @@ -358,3 +379,115 @@ describe('MayorGastownClient', () => { expect(url).toBe('https://gastown.example.com/api/mayor/town-1/tools/convoys/convoy-1'); }); }); + +// ── Token refresh tests ───────────────────────────────────────────────── + +describe('GastownClient.setToken', () => { + const originalFetch = globalThis.fetch; + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('uses the new token after setToken()', async () => { + const env: GastownEnv = { + apiUrl: 'https://gastown.example.com', + sessionToken: 'old-token', + agentId: 'agent-1', + rigId: 'rig-1', + townId: 'town-1', + }; + const client = new GastownClient(env); + + // Replace token + client.setToken('new-token'); + + const fetchMock = mockFetch({ + agent: {}, + hooked_bead: null, + undelivered_mail: [], + open_beads: [], + }); + globalThis.fetch = fetchMock; + + await client.prime(); + + const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + const headers = new Headers(init.headers); + expect(headers.get('Authorization')).toBe('Bearer new-token'); + }); +}); + +describe('MayorGastownClient.setToken', () => { + const originalFetch = globalThis.fetch; + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('uses the new token after setToken()', async () => { + const env: MayorGastownEnv = { + apiUrl: 'https://gastown.example.com', + sessionToken: 'old-mayor-token', + agentId: 'mayor-1', + townId: 'town-1', + }; + const client = new MayorGastownClient(env); + + client.setToken('fresh-mayor-token'); + + const fetchMock = mockFetch([]); + globalThis.fetch = fetchMock; + + await client.listRigs(); + + const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + const headers = new Headers(init.headers); + expect(headers.get('Authorization')).toBe('Bearer fresh-mayor-token'); + }); +}); + +describe('plugin client registry', () => { + const REGISTRY_KEY = Symbol.for('gastown.pluginClientRegistry'); + + beforeEach(() => { + // Clear the global registry before each test + (globalThis as Record)[REGISTRY_KEY] = []; + }); + + it('registerPluginClient + refreshPluginClientTokens updates all clients', () => { + const client1 = new GastownClient({ + apiUrl: 'https://example.com', + sessionToken: 'old-1', + agentId: 'a1', + rigId: 'r1', + townId: 't1', + }); + const client2 = new MayorGastownClient({ + apiUrl: 'https://example.com', + sessionToken: 'old-2', + agentId: 'a2', + townId: 't2', + }); + + registerPluginClient(client1); + registerPluginClient(client2); + + refreshPluginClientTokens('refreshed-token'); + + // Verify by making a request and checking the auth header + const fetchMock = mockFetch([]); + const originalFetch = globalThis.fetch; + globalThis.fetch = fetchMock; + + void client1.checkMail(); + const [, init1] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(new Headers(init1.headers).get('Authorization')).toBe('Bearer refreshed-token'); + + void client2.listRigs(); + const [, init2] = fetchMock.mock.calls[1] as [string, RequestInit]; + expect(new Headers(init2.headers).get('Authorization')).toBe('Bearer refreshed-token'); + + globalThis.fetch = originalFetch; + }); +}); diff --git a/cloudflare-gastown/container/plugin/client.ts b/cloudflare-gastown/container/plugin/client.ts index 1627c2a16f..9f0c0c7a72 100644 --- a/cloudflare-gastown/container/plugin/client.ts +++ b/cloudflare-gastown/container/plugin/client.ts @@ -38,6 +38,11 @@ export class GastownClient { this.townId = env.townId; } + /** Hot-swap the session token for subsequent API calls. */ + setToken(token: string): void { + this.token = token; + } + private rigPath(path: string): string { return `${this.baseUrl}/api/towns/${this.townId}/rigs/${this.rigId}${path}`; } @@ -46,11 +51,16 @@ export class GastownClient { return this.rigPath(`/agents/${this.agentId}${path}`); } + /** Returns the most up-to-date token: explicit setToken() value first, then this.token. */ + private currentToken(): string { + return this.token; + } + private async request(url: string, init?: RequestInit): Promise { // Normalize headers so callers can pass plain objects, Headers instances, or tuples const headers = new Headers(init?.headers); headers.set('Content-Type', 'application/json'); - headers.set('Authorization', `Bearer ${this.token}`); + headers.set('Authorization', `Bearer ${this.currentToken()}`); let response: Response; try { @@ -208,14 +218,24 @@ export class MayorGastownClient { this.townId = env.townId; } + /** Hot-swap the session token for subsequent API calls. */ + setToken(token: string): void { + this.token = token; + } + private mayorPath(path: string): string { return `${this.baseUrl}/api/mayor/${this.townId}/tools${path}`; } + /** Returns the most up-to-date token. */ + private currentToken(): string { + return this.token; + } + private async request(url: string, init?: RequestInit): Promise { const headers = new Headers(init?.headers); headers.set('Content-Type', 'application/json'); - headers.set('Authorization', `Bearer ${this.token}`); + headers.set('Authorization', `Bearer ${this.currentToken()}`); let response: Response; try { @@ -322,6 +342,40 @@ export class MayorGastownClient { } } +// ── Plugin client token registry ───────────────────────────────────────── +// The plugin client runs inside the kilo SDK plugin (loaded by createKilo()), +// while the control server runs in the container's main module. They share +// the same Bun process but different TypeScript project roots, so they can't +// cross-import. We use a globalThis registry keyed by a well-known symbol so +// the control server can push fresh tokens into plugin client instances. + +type TokenRefreshable = { setToken(token: string): void }; + +const REGISTRY_KEY = Symbol.for('gastown.pluginClientRegistry'); + +function getRegistry(): TokenRefreshable[] { + const g = globalThis as Record; + if (!Array.isArray(g[REGISTRY_KEY])) { + g[REGISTRY_KEY] = []; + } + return g[REGISTRY_KEY] as TokenRefreshable[]; +} + +/** Register a plugin client so its token can be refreshed externally. */ +export function registerPluginClient(client: TokenRefreshable): void { + getRegistry().push(client); +} + +/** + * Update the session token on all registered plugin clients. + * Accessible from any module in the same process via the global symbol. + */ +export function refreshPluginClientTokens(token: string): void { + for (const client of getRegistry()) { + client.setToken(token); + } +} + export class GastownApiError extends Error { readonly status: number; diff --git a/cloudflare-gastown/container/plugin/index.ts b/cloudflare-gastown/container/plugin/index.ts index c8d8b3c094..40dae20a57 100644 --- a/cloudflare-gastown/container/plugin/index.ts +++ b/cloudflare-gastown/container/plugin/index.ts @@ -1,5 +1,10 @@ import type { Plugin } from '@kilocode/plugin'; -import { createClientFromEnv, createMayorClientFromEnv, GastownApiError } from './client'; +import { + createClientFromEnv, + createMayorClientFromEnv, + GastownApiError, + registerPluginClient, +} from './client'; import { createTools } from './tools'; import { createMayorTools } from './mayor-tools'; @@ -29,6 +34,7 @@ export const GastownPlugin: Plugin = async ({ client }) => { if (isMayor) { try { mayorClient = createMayorClientFromEnv(); + registerPluginClient(mayorClient); } catch (err) { const message = err instanceof Error ? err.message : String(err); console.error( @@ -41,6 +47,7 @@ export const GastownPlugin: Plugin = async ({ client }) => { } else { try { gastownClient = createClientFromEnv(); + registerPluginClient(gastownClient); } catch (err) { const message = err instanceof Error ? err.message : String(err); console.error( diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index d86461156b..cd73cd021f 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -11,10 +11,17 @@ import { stopAll, getAgentEvents, registerEventSink, + updateAgentToken, } from './process-manager'; -import { startHeartbeat, stopHeartbeat } from './heartbeat'; +import { startHeartbeat, stopHeartbeat, updateHeartbeatToken } from './heartbeat'; import { mergeBranch, setupRigBrowseWorktree } from './git-manager'; -import { StartAgentRequest, SendMessageRequest, MergeRequest, SetupRepoRequest } from './types'; +import { + StartAgentRequest, + SendMessageRequest, + MergeRequest, + SetupRepoRequest, + RefreshTokenRequest, +} from './types'; import type { AgentStatusResponse, HealthResponse, @@ -153,6 +160,34 @@ app.post('/agents/:agentId/message', async c => { return c.json({ sent: true }); }); +// POST /agents/:agentId/refresh-token +// Hot-swap the session JWT on a running agent. Called by the TownDO +// to push a fresh token before the current one expires. +app.post('/agents/:agentId/refresh-token', async c => { + const { agentId } = c.req.param(); + if (!getAgentStatus(agentId)) { + return c.json({ error: `Agent ${agentId} not found` }, 404); + } + const body: unknown = await c.req.json().catch(() => null); + const parsed = RefreshTokenRequest.safeParse(body); + if (!parsed.success) { + return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); + } + + const updated = updateAgentToken(agentId, parsed.data.token); + if (!updated) { + return c.json({ error: `Agent ${agentId} not found` }, 404); + } + + // Also update the heartbeat module's token so heartbeat POSTs use the + // fresh JWT. The heartbeat uses a single module-level token shared + // across all agents, so any refresh keeps heartbeats alive. + updateHeartbeatToken(parsed.data.token); + + console.log(`[control-server] Refreshed token for agent ${agentId}`); + return c.json({ refreshed: true }); +}); + // GET /agents/:agentId/status app.get('/agents/:agentId/status', c => { const { agentId } = c.req.param(); diff --git a/cloudflare-gastown/container/src/heartbeat.ts b/cloudflare-gastown/container/src/heartbeat.ts index b09531207b..d1718d72b5 100644 --- a/cloudflare-gastown/container/src/heartbeat.ts +++ b/cloudflare-gastown/container/src/heartbeat.ts @@ -27,6 +27,14 @@ export function startHeartbeat(apiUrl: string, token: string): void { console.log(`Heartbeat reporter started (interval=${HEARTBEAT_INTERVAL_MS}ms)`); } +/** + * Update the session token used for heartbeat requests. + * Called when the TownDO pushes a fresh JWT to the container. + */ +export function updateHeartbeatToken(token: string): void { + sessionToken = token; +} + /** * Stop the heartbeat reporter. */ diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index ea4afcfda9..6867b47484 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -451,6 +451,40 @@ export async function sendMessage(agentId: string, prompt: string): Promise; + const registry = g[PLUGIN_REGISTRY_KEY]; + if (!Array.isArray(registry)) return; + for (const client of registry) { + if (client && typeof client === 'object' && 'setToken' in client) { + (client as { setToken(t: string): void }).setToken(token); + } + } +} + +/** + * Hot-swap the session token on a running agent. + * Updates the ManagedAgent record (for broadcastEvent, completion reporting) + * AND all registered plugin clients (for tool-call API requests). + */ +export function updateAgentToken(agentId: string, token: string): boolean { + const agent = agents.get(agentId); + if (!agent) return false; + agent.gastownSessionToken = token; + // Also push the fresh token to plugin client instances in this process + refreshPluginClientTokens(token); + return true; +} + export function getAgentStatus(agentId: string): ManagedAgent | null { return agents.get(agentId) ?? null; } diff --git a/cloudflare-gastown/container/src/types.ts b/cloudflare-gastown/container/src/types.ts index 7da0d95778..6d7b350206 100644 --- a/cloudflare-gastown/container/src/types.ts +++ b/cloudflare-gastown/container/src/types.ts @@ -71,6 +71,11 @@ export const SendMessageRequest = z.object({ }); export type SendMessageRequest = z.infer; +export const RefreshTokenRequest = z.object({ + token: z.string().min(1), +}); +export type RefreshTokenRequest = z.infer; + // ── Agent lifecycle ───────────────────────────────────────────────────── export const AgentStatus = z.enum(['starting', 'running', 'stopping', 'exited', 'failed']); diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 6a7e719cd3..b10d630a5d 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -1045,7 +1045,24 @@ export class TownDO extends DurableObject { let sessionStatus: 'idle' | 'active' | 'starting'; if (isAlive) { - const sent = await dispatch.sendMessageToAgent(this.env, townId, mayor.id, message); + // Mint a fresh JWT so the running agent has a valid token for any + // tool calls triggered by this message. This is the primary fix for + // #923 — without this, persistent agents (Mayor) get 401s after 8h. + const townConfig = await this.getTownConfig(); + const freshToken = await dispatch.mintAgentToken(this.env, { + agentId: mayor.id, + rigId: `mayor-${townId}`, + townId, + userId: townConfig.owner_user_id ?? townId, + }); + + const sent = await dispatch.sendMessageToAgent( + this.env, + townId, + mayor.id, + message, + freshToken ?? undefined + ); sessionStatus = sent ? 'active' : 'idle'; } else { const townConfig = await this.getTownConfig(); @@ -1121,6 +1138,19 @@ export class TownDO extends DurableObject { const isAlive = containerStatus.status === 'running' || containerStatus.status === 'starting'; if (isAlive) { + // Proactively refresh the agent's JWT so it doesn't go stale while + // the user keeps the page open (which keeps the container alive). + const townConfig = await this.getTownConfig(); + const freshToken = await dispatch.mintAgentToken(this.env, { + agentId: mayor.id, + rigId: `mayor-${townId}`, + townId, + userId: townConfig.owner_user_id ?? townId, + }); + if (freshToken) { + void dispatch.refreshAgentToken(this.env, townId, mayor.id, freshToken); + } + const status = mayor.status === 'working' || mayor.status === 'stalled' ? 'active' : 'idle'; return { agentId: mayor.id, sessionStatus: status }; } @@ -1964,6 +1994,14 @@ export class TownDO extends DurableObject { } } + // Refresh agent JWTs before any work that might trigger API calls. + // Throttled internally to once per hour (tokens have 8h expiry). + try { + await this.refreshRunningAgentTokens(); + } catch (err) { + console.warn(`${TOWN_LOG} alarm: refreshRunningAgentTokens failed`, err); + } + // Process reviews FIRST so the refinery gets assigned before the // scheduler dispatches new polecats. This prevents downstream beads // from starting before upstream reviews are merged. @@ -2022,6 +2060,50 @@ export class TownDO extends DurableObject { } } + /** + * Proactively refresh JWTs for all running agents. Called from the alarm + * handler but throttled to once per hour (tokens have 8h expiry, so + * hourly refresh provides ample safety margin). This is the safety net + * for agents that stay alive without receiving user messages — e.g. the + * refinery during a long review, or a mayor in a town with automated + * sling triggers. + */ + private lastTokenRefreshAt = 0; + private async refreshRunningAgentTokens(): Promise { + const TOKEN_REFRESH_INTERVAL_MS = 60 * 60_000; // 1 hour + const now = Date.now(); + if (now - this.lastTokenRefreshAt < TOKEN_REFRESH_INTERVAL_MS) return; + this.lastTokenRefreshAt = now; + + const townId = this.townId; + const activeAgents = agents.listAgents(this.sql, { status: 'working' }); + const stalledAgents = agents.listAgents(this.sql, { status: 'stalled' }); + const allActive = [...activeAgents, ...stalledAgents]; + + if (allActive.length === 0) return; + + const townConfig = await this.getTownConfig(); + const userId = townConfig.owner_user_id ?? townId; + + for (const agent of allActive) { + const rigId = agent.rig_id ?? `mayor-${townId}`; + const token = await dispatch.mintAgentToken(this.env, { + agentId: agent.id, + rigId, + townId, + userId, + }); + if (token) { + const ok = await dispatch.refreshAgentToken(this.env, townId, agent.id, token); + if (ok) { + console.log( + `${TOWN_LOG} refreshRunningAgentTokens: refreshed token for ${agent.identity}` + ); + } + } + } + } + private hasActiveWork(): boolean { const activeAgentRows = [ ...query( diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts index 841ae4e713..d5b4127ba0 100644 --- a/cloudflare-gastown/src/dos/town/container-dispatch.ts +++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts @@ -412,17 +412,56 @@ export async function stopAgentInContainer( } } +/** + * Push a fresh JWT to an already-running agent in the container. + * Updates the agent's ManagedAgent record, heartbeat module, and plugin + * client tokens so all subsequent API calls use the new JWT. + */ +export async function refreshAgentToken( + env: Env, + townId: string, + agentId: string, + token: string +): Promise { + try { + const container = getTownContainerStub(env, townId); + const response = await container.fetch(`http://container/agents/${agentId}/refresh-token`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ token }), + }); + return response.ok; + } catch { + return false; + } +} + /** * Send a follow-up message to an existing agent in the container. + * Optionally refreshes the agent's JWT before sending the message. */ export async function sendMessageToAgent( env: Env, townId: string, agentId: string, - message: string + message: string, + /** Fresh JWT to push to the agent before sending the message. */ + token?: string ): Promise { try { const container = getTownContainerStub(env, townId); + + // Refresh the token first so the agent has a valid JWT for any tool + // calls triggered by processing this message. + if (token) { + const refreshed = await refreshAgentToken(env, townId, agentId, token); + if (!refreshed) { + console.warn( + `${TOWN_LOG} sendMessageToAgent: token refresh failed for agent ${agentId}, sending message anyway` + ); + } + } + const response = await container.fetch(`http://container/agents/${agentId}/message`, { method: 'POST', headers: { 'Content-Type': 'application/json' },