From e3c49b0ad8d94c753b913e3eb2227519ef426fa6 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 18:19:59 +0000 Subject: [PATCH 1/7] feat(gastown): add container_eviction event type and /container-eviction endpoint Add foundation for Tier 1.5 graceful container eviction: - Add container_eviction to TownEventType enum - Add POST /api/towns/:townId/container-eviction route with container JWT auth for the container to signal SIGTERM receipt - Add recordContainerEviction() RPC on TownDO that inserts the event and sets a draining flag in DO KV storage - Add isDraining() RPC for reconciler consumption - Clear draining flag on agent heartbeat (container alive after restart) - Handle container_eviction in reconciler applyEvent (no-op, audit only) --- .../src/db/tables/town-events.table.ts | 1 + cloudflare-gastown/src/dos/Town.do.ts | 34 +++++++++++++ cloudflare-gastown/src/dos/town/reconciler.ts | 7 +++ cloudflare-gastown/src/gastown.worker.ts | 12 +++++ .../src/handlers/town-eviction.handler.ts | 50 +++++++++++++++++++ 5 files changed, 104 insertions(+) create mode 100644 cloudflare-gastown/src/handlers/town-eviction.handler.ts diff --git a/cloudflare-gastown/src/db/tables/town-events.table.ts b/cloudflare-gastown/src/db/tables/town-events.table.ts index 95309e7a04..30be09c65b 100644 --- a/cloudflare-gastown/src/db/tables/town-events.table.ts +++ b/cloudflare-gastown/src/db/tables/town-events.table.ts @@ -5,6 +5,7 @@ export const TownEventType = z.enum([ 'agent_done', 'agent_completed', 'container_status', + 'container_eviction', 'pr_status_changed', 'bead_created', 'bead_cancelled', diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158c..6e8626ff41 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -504,6 +504,9 @@ export class TownDO extends DurableObject { const townConfig = await config.getTownConfig(this.ctx.storage); this._ownerUserId = 
townConfig.owner_user_id; + // Load persisted draining flag + this._draining = (await this.ctx.storage.get('town:draining')) ?? false; + // All tables are now initialized via beads.initBeadTables(): // beads, bead_events, bead_dependencies, agent_metadata, review_metadata, // escalation_metadata, convoy_metadata @@ -537,6 +540,7 @@ export class TownDO extends DurableObject { private _townId: string | null = null; private _lastReconcilerMetrics: reconciler.ReconcilerMetrics | null = null; private _dashboardContext: string | null = null; + private _draining = false; private get townId(): string { return this._townId ?? this.ctx.id.name ?? this.ctx.id.toString(); @@ -563,6 +567,27 @@ export class TownDO extends DurableObject { return this._dashboardContext; } + // ══════════════════════════════════════════════════════════════════ + // Container Eviction (graceful drain) + // ══════════════════════════════════════════════════════════════════ + + /** + * Record a container eviction event and set the draining flag. + * Called by the container when it receives SIGTERM. While draining, + * the reconciler skips dispatch to prevent new work from starting. + */ + async recordContainerEviction(): Promise { + events.insertEvent(this.sql, 'container_eviction', {}); + this._draining = true; + await this.ctx.storage.put('town:draining', true); + console.log(`${TOWN_LOG} recordContainerEviction: draining flag set`); + } + + /** Whether the town is in draining mode (container eviction in progress). */ + async isDraining(): Promise { + return this._draining; + } + // ══════════════════════════════════════════════════════════════════ // Town Configuration // ══════════════════════════════════════════════════════════════════ @@ -1114,6 +1139,15 @@ export class TownDO extends DurableObject { } ): Promise { agents.touchAgent(this.sql, agentId, watermark); + + // A heartbeat proves the container is alive. 
If a draining flag was + // set from a previous eviction, clear it so dispatch resumes. + if (this._draining) { + this._draining = false; + await this.ctx.storage.put('town:draining', false); + console.log(`${TOWN_LOG} touchAgentHeartbeat: cleared draining flag (container alive)`); + } + await this.armAlarmIfNeeded(); } diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d213..722e8b58be 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -290,6 +290,13 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { return; } + case 'container_eviction': { + // Draining flag is managed by the TownDO via KV storage. + // The reconciler reads it from there; no SQL state change needed here. + // The event is recorded for audit trail. + return; + } + case 'nudge_timeout': { // GUPP violations are handled by reconcileGUPP on the next pass. // The event just records the fact for audit trail. diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 32d676e0af..d4bc0d6af9 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -134,6 +134,7 @@ import { handleListEscalations, handleAcknowledgeEscalation, } from './handlers/town-escalations.handler'; +import { handleContainerEviction } from './handlers/town-eviction.handler'; export { GastownUserDO } from './dos/GastownUser.do'; export { GastownOrgDO } from './dos/GastownOrg.do'; @@ -478,6 +479,17 @@ app.post('/api/towns/:townId/rigs/:rigId/triage/resolve', c => ) ); +// ── Container Eviction ────────────────────────────────────────────────── +// Called by the container on SIGTERM. Uses container JWT auth (not kilo +// user auth), so it must be registered before the kiloAuthMiddleware +// wildcard below. 
+ +app.post('/api/towns/:townId/container-eviction', c => + instrumented(c, 'POST /api/towns/:townId/container-eviction', () => + handleContainerEviction(c, c.req.param()) + ) +); + // ── Kilo User Auth ────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. Container→worker routes use the agent JWT middleware instead diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts new file mode 100644 index 0000000000..9c283c7794 --- /dev/null +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -0,0 +1,50 @@ +import type { Context } from 'hono'; +import { extractBearerToken } from '@kilocode/worker-utils'; +import type { GastownEnv } from '../gastown.worker'; +import { getTownDOStub } from '../dos/Town.do'; +import { verifyContainerJWT } from '../util/jwt.util'; +import { resolveSecret } from '../util/secret.util'; +import { resSuccess, resError } from '../util/res.util'; + +/** + * POST /api/towns/:townId/container-eviction + * + * Called by the container's process-manager when the container receives + * SIGTERM. Inserts a `container_eviction` event and sets the draining + * flag so the reconciler stops dispatching new work. + * + * Authenticated with the container-scoped JWT (same token used for all + * container→worker calls). 
+ */ +export async function handleContainerEviction( + c: Context, + params: { townId: string } +): Promise { + // Authenticate with container JWT + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + console.error('[town-eviction] failed to resolve GASTOWN_JWT_SECRET'); + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + // Cross-town guard + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const town = getTownDOStub(c.env, params.townId); + await town.recordContainerEviction(); + + console.log(`[town-eviction] container eviction recorded for town=${params.townId}`); + return c.json(resSuccess({ acknowledged: true }), 200); +} From 4fde72dac6dba5fbab2c57a34ce65d8469e59deb Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 18:45:00 +0000 Subject: [PATCH 2/7] feat(gastown): skip dispatch_agent actions when town is draining Add draining flag check to reconciler dispatch rules for Tier 1.5 graceful container eviction. When the town is in draining state (container eviction in progress), all dispatch_agent emissions are suppressed with a log message. Non-dispatch rules (agent health, convoy progress, GUPP, GC) continue to function normally during drain. 
Guards added to: - reconcileBeads Rule 1 (unassigned open beads) - reconcileBeads Rule 2 (idle agents with hooks) - reconcileReviewQueue Rule 5 (refinery dispatch for open MRs) - reconcileReviewQueue Rule 6 (idle refinery re-dispatch) --- cloudflare-gastown/src/dos/Town.do.ts | 2 +- cloudflare-gastown/src/dos/town/reconciler.ts | 33 ++++++++++++++++--- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158c..6400275d40 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -3116,7 +3116,7 @@ export class TownDO extends DurableObject { // Phase 1: Reconcile — compute desired state vs actual state const sideEffects: Array<() => Promise> = []; try { - const actions = reconciler.reconcile(this.sql); + const actions = reconciler.reconcile(this.sql, { draining: this._draining }); metrics.actionsEmitted = actions.length; for (const a of actions) { metrics.actionsByType[a.type] = (metrics.actionsByType[a.type] ?? 0) + 1; diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d213..f5ddb4a06c 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -306,11 +306,12 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { // Top-level reconcile // ════════════════════════════════════════════════════════════════════ -export function reconcile(sql: SqlStorage): Action[] { +export function reconcile(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? 
false; const actions: Action[] = []; actions.push(...reconcileAgents(sql)); - actions.push(...reconcileBeads(sql)); - actions.push(...reconcileReviewQueue(sql)); + actions.push(...reconcileBeads(sql, { draining })); + actions.push(...reconcileReviewQueue(sql, { draining })); actions.push(...reconcileConvoys(sql)); actions.push(...reconcileGUPP(sql)); actions.push(...reconcileGC(sql)); @@ -457,7 +458,8 @@ export function reconcileAgents(sql: SqlStorage): Action[] { // reconcileBeads — handle unassigned beads, lost agents, stale reviews // ════════════════════════════════════════════════════════════════════ -export function reconcileBeads(sql: SqlStorage): Action[] { +export function reconcileBeads(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; // Rule 1: Open issue beads with no assignee, no blockers, not staged, not triage @@ -498,6 +500,10 @@ export function reconcileBeads(sql: SqlStorage): Action[] { for (const bead of unassigned) { if (!bead.rig_id) continue; + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${bead.bead_id}`); + continue; + } // In shadow mode we can't call getOrCreateAgent, so we just note // that a hook_agent + dispatch_agent is needed. // The action includes rig_id so Phase 3's applyAction can resolve the agent. 
@@ -594,6 +600,11 @@ export function reconcileBeads(sql: SqlStorage): Action[] { if (blockerCount[0]?.cnt > 0) continue; + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${agent.current_hook_bead_id}`); + continue; + } + actions.push({ type: 'dispatch_agent', agent_id: agent.bead_id, @@ -743,7 +754,8 @@ export function reconcileBeads(sql: SqlStorage): Action[] { // refinery dispatch // ════════════════════════════════════════════════════════════════════ -export function reconcileReviewQueue(sql: SqlStorage): Action[] { +export function reconcileReviewQueue(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; // Get all MR beads that need attention @@ -933,6 +945,12 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { if (oldestMr.length === 0) continue; + // Skip dispatch if the town is draining (container eviction in progress) + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${oldestMr[0].bead_id}`); + continue; + } + // If no refinery exists or it's busy, emit a dispatch_agent with empty // agent_id — applyAction will create the refinery via getOrCreateAgent. if (refinery.length === 0) { @@ -1044,6 +1062,11 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { const mr = mrRows[0]; if (mr.type !== 'merge_request' || mr.status !== 'in_progress') continue; + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${ref.current_hook_bead_id}`); + continue; + } + // Container status is checked at apply time (async). In shadow mode, // we just note that a dispatch is needed. 
actions.push({ From bae1ad6a70cd8a98e563ec2542f2478e76684c33 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 18:48:10 +0000 Subject: [PATCH 3/7] style: apply oxfmt formatting to reconciler --- cloudflare-gastown/src/dos/town/reconciler.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index 8409a3e79c..404d71d87b 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -608,7 +608,9 @@ export function reconcileBeads(sql: SqlStorage, opts?: { draining?: boolean }): if (blockerCount[0]?.cnt > 0) continue; if (draining) { - console.log(`${LOG} Town is draining, skipping dispatch for bead ${agent.current_hook_bead_id}`); + console.log( + `${LOG} Town is draining, skipping dispatch for bead ${agent.current_hook_bead_id}` + ); continue; } @@ -1070,7 +1072,9 @@ export function reconcileReviewQueue(sql: SqlStorage, opts?: { draining?: boolea if (mr.type !== 'merge_request' || mr.status !== 'in_progress') continue; if (draining) { - console.log(`${LOG} Town is draining, skipping dispatch for bead ${ref.current_hook_bead_id}`); + console.log( + `${LOG} Town is draining, skipping dispatch for bead ${ref.current_hook_bead_id}` + ); continue; } From 157ca27716259490ca6c3aa0ada2ddba7809d616 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 19:43:01 +0000 Subject: [PATCH 4/7] fix(gastown): replace heartbeat drain clearing with nonce-based /container-ready acknowledgment Heartbeats from the old (draining) container could race with the eviction signal and prematurely clear the drain flag, re-enabling dispatch into a container that was shutting down. 
Changes: - recordContainerEviction() now generates a drain nonce and returns it - New acknowledgeContainerReady(nonce) method validates the nonce before clearing drain - Heartbeat responses include the drainNonce when draining, so the replacement container can call /container-ready - New POST /container-ready worker endpoint - Drain auto-expires after 15 minutes as a safety net - Container heartbeat module detects drainNonce in responses and calls /container-ready to clear drain on the replacement container --- cloudflare-gastown/container/src/heartbeat.ts | 45 ++++++++++ cloudflare-gastown/src/dos/Town.do.ts | 83 ++++++++++++++++--- cloudflare-gastown/src/gastown.worker.ts | 8 +- .../src/handlers/rig-agents.handler.ts | 5 +- .../src/handlers/town-eviction.handler.ts | 67 ++++++++++++++- 5 files changed, 192 insertions(+), 16 deletions(-) diff --git a/cloudflare-gastown/container/src/heartbeat.ts b/cloudflare-gastown/container/src/heartbeat.ts index bd9dd8db32..b503804916 100644 --- a/cloudflare-gastown/container/src/heartbeat.ts +++ b/cloudflare-gastown/container/src/heartbeat.ts @@ -6,6 +6,8 @@ const HEARTBEAT_INTERVAL_MS = 30_000; let heartbeatTimer: ReturnType | null = null; let gastownApiUrl: string | null = null; let sessionToken: string | null = null; +/** Set once we've successfully acknowledged container-ready. */ +let containerReadyAcknowledged = false; /** * Configure and start the heartbeat reporter. @@ -38,6 +40,37 @@ export function stopHeartbeat(): void { console.log('Heartbeat reporter stopped'); } +/** + * Call POST /container-ready to acknowledge that this is a fresh + * container replacing an evicted one. Clears the TownDO drain flag + * so the reconciler can resume dispatching. + */ +async function acknowledgeContainerReady(townId: string, drainNonce: string): Promise { + const currentToken = process.env.GASTOWN_CONTAINER_TOKEN ?? 
sessionToken; + if (!gastownApiUrl || !currentToken) return; + + try { + const response = await fetch(`${gastownApiUrl}/api/towns/${townId}/container-ready`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${currentToken}`, + }, + body: JSON.stringify({ nonce: drainNonce }), + }); + if (response.ok) { + containerReadyAcknowledged = true; + console.log(`[heartbeat] container-ready acknowledged for town=${townId}`); + } else { + console.warn( + `[heartbeat] container-ready failed for town=${townId}: ${response.status} ${response.statusText}` + ); + } + } catch (err) { + console.warn(`[heartbeat] container-ready error for town=${townId}:`, err); + } +} + async function sendHeartbeats(): Promise { // Prefer the live container token (refreshed via POST /refresh-token) // over the token captured at startHeartbeat() time. @@ -77,6 +110,18 @@ async function sendHeartbeats(): Promise { console.warn( `Heartbeat failed for agent ${agent.agentId}: ${response.status} ${response.statusText}` ); + } else if (!containerReadyAcknowledged) { + // If the TownDO is draining, the heartbeat response includes a + // drainNonce. Use it to call /container-ready and clear drain. 
+ try { + const body = (await response.json()) as { data?: { drainNonce?: string } }; + const nonce = body?.data?.drainNonce; + if (nonce) { + void acknowledgeContainerReady(agent.townId, nonce); + } + } catch { + // Non-JSON or unexpected shape — ignore + } } } catch (err) { console.warn(`Heartbeat error for agent ${agent.agentId}:`, err); diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index d04713d9b5..92b517e58a 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -504,8 +504,10 @@ export class TownDO extends DurableObject { const townConfig = await config.getTownConfig(this.ctx.storage); this._ownerUserId = townConfig.owner_user_id; - // Load persisted draining flag + // Load persisted draining flag, nonce, and start time this._draining = (await this.ctx.storage.get('town:draining')) ?? false; + this._drainNonce = (await this.ctx.storage.get('town:drainNonce')) ?? null; + this._drainStartedAt = (await this.ctx.storage.get('town:drainStartedAt')) ?? null; // All tables are now initialized via beads.initBeadTables(): // beads, bead_events, bead_dependencies, agent_metadata, review_metadata, @@ -541,6 +543,8 @@ export class TownDO extends DurableObject { private _lastReconcilerMetrics: reconciler.ReconcilerMetrics | null = null; private _dashboardContext: string | null = null; private _draining = false; + private _drainNonce: string | null = null; + private _drainStartedAt: number | null = null; private get townId(): string { return this._townId ?? this.ctx.id.name ?? this.ctx.id.toString(); @@ -575,12 +579,52 @@ export class TownDO extends DurableObject { * Record a container eviction event and set the draining flag. * Called by the container when it receives SIGTERM. While draining, * the reconciler skips dispatch to prevent new work from starting. + * + * Returns a drain nonce that must be presented via + * `acknowledgeContainerReady()` to clear the drain flag. 
This + * prevents stale heartbeats from the dying container from + * prematurely re-enabling dispatch. */ - async recordContainerEviction(): Promise { + async recordContainerEviction(): Promise { events.insertEvent(this.sql, 'container_eviction', {}); + const nonce = crypto.randomUUID(); + const startedAt = Date.now(); this._draining = true; + this._drainNonce = nonce; + this._drainStartedAt = startedAt; await this.ctx.storage.put('town:draining', true); - console.log(`${TOWN_LOG} recordContainerEviction: draining flag set`); + await this.ctx.storage.put('town:drainNonce', nonce); + await this.ctx.storage.put('town:drainStartedAt', startedAt); + console.log(`${TOWN_LOG} recordContainerEviction: draining flag set, nonce=${nonce}`); + return nonce; + } + + /** + * Acknowledge that the replacement container is ready. Clears the + * draining flag only if the provided nonce matches the one generated + * during `recordContainerEviction()`. This ensures that only the + * new container (which received the nonce via startup config) can + * re-enable dispatch — not a stale heartbeat from the old container. + */ + async acknowledgeContainerReady(nonce: string): Promise { + if (!this._draining) { + console.log(`${TOWN_LOG} acknowledgeContainerReady: not draining, noop`); + return true; + } + if (nonce !== this._drainNonce) { + console.warn( + `${TOWN_LOG} acknowledgeContainerReady: nonce mismatch (got=${nonce}, expected=${this._drainNonce})` + ); + return false; + } + this._draining = false; + this._drainNonce = null; + this._drainStartedAt = null; + await this.ctx.storage.put('town:draining', false); + await this.ctx.storage.delete('town:drainNonce'); + await this.ctx.storage.delete('town:drainStartedAt'); + console.log(`${TOWN_LOG} acknowledgeContainerReady: draining flag cleared`); + return true; } /** Whether the town is in draining mode (container eviction in progress). 
*/ @@ -588,6 +632,11 @@ export class TownDO extends DurableObject { return this._draining; } + /** The current drain nonce (null when not draining). */ + async getDrainNonce(): Promise { + return this._drainNonce; + } + // ══════════════════════════════════════════════════════════════════ // Town Configuration // ══════════════════════════════════════════════════════════════════ @@ -1139,15 +1188,6 @@ export class TownDO extends DurableObject { } ): Promise { agents.touchAgent(this.sql, agentId, watermark); - - // A heartbeat proves the container is alive. If a draining flag was - // set from a previous eviction, clear it so dispatch resumes. - if (this._draining) { - this._draining = false; - await this.ctx.storage.put('town:draining', false); - console.log(`${TOWN_LOG} touchAgentHeartbeat: cleared draining flag (container alive)`); - } - await this.armAlarmIfNeeded(); } @@ -3147,6 +3187,25 @@ export class TownDO extends DurableObject { Sentry.captureException(err); } + // Auto-clear drain flag if it has been active for too long. + // The drain sequence (drainAll) waits up to 10 minutes, so 15 + // minutes is a generous upper bound. After this timeout the old + // container is certainly dead and it is safe to resume dispatch. 
+ const DRAIN_TIMEOUT_MS = 15 * 60 * 1000; + if ( + this._draining && + this._drainStartedAt && + Date.now() - this._drainStartedAt > DRAIN_TIMEOUT_MS + ) { + this._draining = false; + this._drainNonce = null; + this._drainStartedAt = null; + await this.ctx.storage.put('town:draining', false); + await this.ctx.storage.delete('town:drainNonce'); + await this.ctx.storage.delete('town:drainStartedAt'); + logger.info('reconciler: drain timeout exceeded, auto-clearing draining flag'); + } + // Phase 1: Reconcile — compute desired state vs actual state const sideEffects: Array<() => Promise> = []; try { diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index d4bc0d6af9..81be394ee4 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -134,7 +134,7 @@ import { handleListEscalations, handleAcknowledgeEscalation, } from './handlers/town-escalations.handler'; -import { handleContainerEviction } from './handlers/town-eviction.handler'; +import { handleContainerEviction, handleContainerReady } from './handlers/town-eviction.handler'; export { GastownUserDO } from './dos/GastownUser.do'; export { GastownOrgDO } from './dos/GastownOrg.do'; @@ -490,6 +490,12 @@ app.post('/api/towns/:townId/container-eviction', c => ) ); +app.post('/api/towns/:townId/container-ready', c => + instrumented(c, 'POST /api/towns/:townId/container-ready', () => + handleContainerReady(c, c.req.param()) + ) +); + // ── Kilo User Auth ────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. 
Container→worker routes use the agent JWT middleware instead diff --git a/cloudflare-gastown/src/handlers/rig-agents.handler.ts b/cloudflare-gastown/src/handlers/rig-agents.handler.ts index 24c429a4c9..9ba2ae4740 100644 --- a/cloudflare-gastown/src/handlers/rig-agents.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-agents.handler.ts @@ -232,7 +232,10 @@ export async function handleHeartbeat( : undefined ); - return c.json(resSuccess({ heartbeat: true })); + // When draining, include the drain nonce in the response so the + // replacement container can call /container-ready to clear drain. + const drainNonce = await town.getDrainNonce(); + return c.json(resSuccess({ heartbeat: true, ...(drainNonce ? { drainNonce } : {}) })); } const GetOrCreateAgentBody = z.object({ diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts index 9c283c7794..a35d0f9e9b 100644 --- a/cloudflare-gastown/src/handlers/town-eviction.handler.ts +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -13,6 +13,10 @@ import { resSuccess, resError } from '../util/res.util'; * SIGTERM. Inserts a `container_eviction` event and sets the draining * flag so the reconciler stops dispatching new work. * + * Returns a `drainNonce` that must be presented via `/container-ready` + * to clear the drain flag. This prevents stale heartbeats from the + * dying container from prematurely re-enabling dispatch. + * * Authenticated with the container-scoped JWT (same token used for all * container→worker calls). 
*/ @@ -43,8 +47,67 @@ export async function handleContainerEviction( } const town = getTownDOStub(c.env, params.townId); - await town.recordContainerEviction(); + const drainNonce = await town.recordContainerEviction(); console.log(`[town-eviction] container eviction recorded for town=${params.townId}`); - return c.json(resSuccess({ acknowledged: true }), 200); + return c.json(resSuccess({ acknowledged: true, drainNonce }), 200); +} + +/** + * POST /api/towns/:townId/container-ready + * + * Called by the replacement container on startup to signal readiness. + * Clears the draining flag only if the provided `drainNonce` matches + * the nonce generated during the eviction that triggered the drain. + * + * Authenticated with the container-scoped JWT. + */ +export async function handleContainerReady( + c: Context, + params: { townId: string } +): Promise { + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + console.error('[container-ready] failed to resolve GASTOWN_JWT_SECRET'); + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + let nonce: string | undefined; + try { + const body: unknown = await c.req.json(); + if ( + body && + typeof body === 'object' && + 'nonce' in body && + typeof (body as { nonce: unknown }).nonce === 'string' + ) { + nonce = (body as { nonce: string }).nonce; + } + } catch { + // No body or invalid JSON + } + + if (!nonce) { + return c.json(resError('Missing required field: nonce'), 400); + } + + const town = getTownDOStub(c.env, params.townId); + const cleared = await town.acknowledgeContainerReady(nonce); + + 
console.log(`[container-ready] town=${params.townId} nonce=${nonce} cleared=${cleared}`); + return c.json(resSuccess({ cleared }), 200); } From e4e9a721413f49f562ba01672fb3425dfda56067 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 20:13:52 +0000 Subject: [PATCH 5/7] fix(gastown): fix drain nonce race and idle container readiness Two issues fixed: 1. Heartbeat nonce TOCTOU race: touchAgentHeartbeat() now returns the drain nonce atomically in a single DO call instead of requiring a separate getDrainNonce() RPC. This prevents an in-flight heartbeat from the old container from observing a nonce generated between two separate calls. 2. Idle container readiness: ensureContainerReady() now passes X-Drain-Nonce and X-Town-Id headers to the container health check when draining. The container's /health endpoint reads these and calls /container-ready, handling the case where a replacement container has no running agents and the per-agent heartbeat loop has nothing to iterate. Also adds GET /drain-status endpoint for debugging. 
--- .../container/src/control-server.ts | 11 ++++- cloudflare-gastown/container/src/heartbeat.ts | 22 +++++++++- cloudflare-gastown/src/dos/Town.do.ts | 18 +++++++- cloudflare-gastown/src/gastown.worker.ts | 12 +++++- .../src/handlers/rig-agents.handler.ts | 9 ++-- .../src/handlers/town-eviction.handler.ts | 42 +++++++++++++++++++ 6 files changed, 105 insertions(+), 9 deletions(-) diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 70ba58c621..86b1d5c8de 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -13,7 +13,7 @@ import { getAgentEvents, registerEventSink, } from './process-manager'; -import { startHeartbeat, stopHeartbeat } from './heartbeat'; +import { startHeartbeat, stopHeartbeat, notifyContainerReady } from './heartbeat'; import { pushContext as pushDashboardContext } from './dashboard-context'; import { mergeBranch, setupRigBrowseWorktree } from './git-manager'; import { @@ -92,6 +92,15 @@ app.use('*', async (c, next) => { // GET /health app.get('/health', c => { + // When the TownDO is draining, it passes the drain nonce and town + // ID via headers so idle containers (no running agents) can + // acknowledge readiness and clear the drain flag. + const drainNonce = c.req.header('X-Drain-Nonce'); + const townId = c.req.header('X-Town-Id'); + if (drainNonce && townId) { + void notifyContainerReady(townId, drainNonce); + } + const response: HealthResponse = { status: 'ok', agents: activeAgentCount(), diff --git a/cloudflare-gastown/container/src/heartbeat.ts b/cloudflare-gastown/container/src/heartbeat.ts index b503804916..defbe7897c 100644 --- a/cloudflare-gastown/container/src/heartbeat.ts +++ b/cloudflare-gastown/container/src/heartbeat.ts @@ -40,17 +40,29 @@ export function stopHeartbeat(): void { console.log('Heartbeat reporter stopped'); } +/** + * Notify the TownDO that the replacement container is ready. 
+ * Exported so the health endpoint can trigger it when the TownDO + * passes the drain nonce via headers (handles idle containers that + * have no running agents and thus no per-agent heartbeats). + */ +export async function notifyContainerReady(townId: string, drainNonce: string): Promise { + if (containerReadyAcknowledged) return; + await acknowledgeContainerReady(townId, drainNonce); +} + /** * Call POST /container-ready to acknowledge that this is a fresh * container replacing an evicted one. Clears the TownDO drain flag * so the reconciler can resume dispatching. */ async function acknowledgeContainerReady(townId: string, drainNonce: string): Promise { + const apiUrl = gastownApiUrl ?? process.env.GASTOWN_API_URL; const currentToken = process.env.GASTOWN_CONTAINER_TOKEN ?? sessionToken; - if (!gastownApiUrl || !currentToken) return; + if (!apiUrl || !currentToken) return; try { - const response = await fetch(`${gastownApiUrl}/api/towns/${townId}/container-ready`, { + const response = await fetch(`${apiUrl}/api/towns/${townId}/container-ready`, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -79,6 +91,12 @@ async function sendHeartbeats(): Promise { const active = listAgents().filter(a => a.status === 'running' || a.status === 'starting'); + // When no agents are active, the per-agent heartbeat loop has + // nothing to send. Idle container drain acknowledgment is handled + // by the /health endpoint instead (the TownDO passes the nonce via + // X-Drain-Nonce headers in ensureContainerReady). 
+ if (active.length === 0) return; + for (const agent of active) { const payload: HeartbeatPayload = { agentId: agent.agentId, diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 92b517e58a..457cb59c8d 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -1179,6 +1179,13 @@ export class TownDO extends DurableObject { // ── Heartbeat ───────────────────────────────────────────────────── + /** + * Update an agent's heartbeat timestamp. Returns the current drain + * nonce (if draining) so the caller can include it in the HTTP + * response without a second RPC — preventing a TOCTOU race where + * an in-flight heartbeat from the old container could observe a + * nonce generated between two separate DO calls. + */ async touchAgentHeartbeat( agentId: string, watermark?: { @@ -1186,9 +1193,10 @@ export class TownDO extends DurableObject { lastEventAt?: string | null; activeTools?: string[]; } - ): Promise<void> { + ): Promise<{ drainNonce: string | null }> { agents.touchAgent(this.sql, agentId, watermark); await this.armAlarmIfNeeded(); + return { drainNonce: this._drainNonce }; } async updateAgentStatusMessage(agentId: string, message: string): Promise<void> { @@ -3756,8 +3764,16 @@ export class TownDO extends DurableObject { try { const container = getTownContainerStub(this.env, townId); + const headers: Record<string, string> = {}; + // When draining, pass the nonce and town ID so the container + // can call /container-ready even if it has no running agents.
+ if (this._draining && this._drainNonce) { + headers['X-Drain-Nonce'] = this._drainNonce; + headers['X-Town-Id'] = townId; + } await container.fetch('http://container/health', { signal: AbortSignal.timeout(5_000), + headers, }); } catch { // Container is starting up or unavailable — alarm will retry diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 81be394ee4..d5808e0afa 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -134,7 +134,11 @@ import { handleListEscalations, handleAcknowledgeEscalation, } from './handlers/town-escalations.handler'; -import { handleContainerEviction, handleContainerReady } from './handlers/town-eviction.handler'; +import { + handleContainerEviction, + handleContainerReady, + handleDrainStatus, +} from './handlers/town-eviction.handler'; export { GastownUserDO } from './dos/GastownUser.do'; export { GastownOrgDO } from './dos/GastownOrg.do'; @@ -496,6 +500,12 @@ app.post('/api/towns/:townId/container-ready', c => ) ); +app.get('/api/towns/:townId/drain-status', c => + instrumented(c, 'GET /api/towns/:townId/drain-status', () => + handleDrainStatus(c, c.req.param()) + ) +); + // ── Kilo User Auth ────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. 
Container→worker routes use the agent JWT middleware instead diff --git a/cloudflare-gastown/src/handlers/rig-agents.handler.ts b/cloudflare-gastown/src/handlers/rig-agents.handler.ts index 9ba2ae4740..f1dd8062a8 100644 --- a/cloudflare-gastown/src/handlers/rig-agents.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-agents.handler.ts @@ -221,7 +221,11 @@ export async function handleHeartbeat( // No body or invalid JSON — old container format, just touch } - await town.touchAgentHeartbeat( + // touchAgentHeartbeat returns the drain nonce atomically — no + // second RPC needed, which prevents a TOCTOU race where an + // in-flight heartbeat from the old container could observe a nonce + // generated between two separate DO calls. + const { drainNonce } = await town.touchAgentHeartbeat( params.agentId, watermark ? { @@ -232,9 +236,6 @@ export async function handleHeartbeat( : undefined ); - // When draining, include the drain nonce in the response so the - // replacement container can call /container-ready to clear drain. - const drainNonce = await town.getDrainNonce(); return c.json(resSuccess({ heartbeat: true, ...(drainNonce ? { drainNonce } : {}) })); } diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts index a35d0f9e9b..b608478885 100644 --- a/cloudflare-gastown/src/handlers/town-eviction.handler.ts +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -53,6 +53,48 @@ export async function handleContainerEviction( return c.json(resSuccess({ acknowledged: true, drainNonce }), 200); } +/** + * GET /api/towns/:townId/drain-status + * + * Lightweight endpoint for the container to poll drain state. Used by + * the heartbeat module when no agents are running — the per-agent + * heartbeat loop has nothing to iterate, so a separate check is needed + * to discover the drain nonce and call /container-ready. + * + * Authenticated with the container-scoped JWT. 
+ */ +export async function handleDrainStatus( + c: Context, + params: { townId: string } +): Promise<Response> { + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const town = getTownDOStub(c.env, params.townId); + const [draining, drainNonce] = await Promise.all([ + town.isDraining(), + town.getDrainNonce(), + ]); + + return c.json(resSuccess({ draining, drainNonce }), 200); +} + /** * POST /api/towns/:townId/container-ready * From f9b62d9b0c7a993c98934ea5cc884fd240f21eee Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 20:48:16 +0000 Subject: [PATCH 6/7] fix(gastown): delay drain nonce handoff via health check until old container exits ensureContainerReady() talks to whichever container is currently serving this town. During drain, that is still the old container. Passing X-Drain-Nonce immediately would let the old container clear drain before the replacement is up. Now the nonce is only passed via health check headers after 11 minutes (beyond the 10-min drainAll wait + exit), ensuring the old container has exited before the handoff.
--- cloudflare-gastown/src/dos/Town.do.ts | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 457cb59c8d..1b36425fd7 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -3765,9 +3765,20 @@ export class TownDO extends DurableObject { try { const container = getTownContainerStub(this.env, townId); const headers: Record<string, string> = {}; - // When draining, pass the nonce and town ID so the container - // can call /container-ready even if it has no running agents. - if (this._draining && this._drainNonce) { + // When draining AND enough time has passed for the old container + // to have exited (drainAll waits up to 10 min + exit), pass the + // nonce so the replacement container can acknowledge readiness. + // We only send the nonce after 11 minutes to avoid the old + // (still-draining) container receiving it and clearing drain + // prematurely — the health check goes to whichever container is + // currently serving this town. + const DRAIN_HANDOFF_DELAY_MS = 11 * 60 * 1000; + if ( + this._draining && + this._drainNonce && + this._drainStartedAt && + Date.now() - this._drainStartedAt > DRAIN_HANDOFF_DELAY_MS + ) { headers['X-Drain-Nonce'] = this._drainNonce; headers['X-Town-Id'] = townId; } From 3068a4bd9e308519406799d37f83c349f5ebb618 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 21:32:57 +0000 Subject: [PATCH 7/7] fix(gastown): skip ensureContainerReady early-return when draining ensureContainerReady() bails out early when there is no active work and the rig is not recently configured. This prevented the drain nonce handoff from ever reaching the replacement container in idle towns. Now the draining flag bypasses that early-return so the health check (with X-Drain-Nonce headers) always fires.
--- cloudflare-gastown/src/dos/Town.do.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 1b36425fd7..7cccd35ad5 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -3749,7 +3749,7 @@ export class TownDO extends DurableObject { if (!hasRigs) return; const hasWork = this.hasActiveWork(); - if (!hasWork) { + if (!hasWork && !this._draining) { const rigList = rigs.listRigs(this.sql); const newestRigAge = rigList.reduce((min, r) => { const age = Date.now() - new Date(r.created_at).getTime();