Skip to content
Merged
11 changes: 10 additions & 1 deletion cloudflare-gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
getAgentEvents,
registerEventSink,
} from './process-manager';
import { startHeartbeat, stopHeartbeat } from './heartbeat';
import { startHeartbeat, stopHeartbeat, notifyContainerReady } from './heartbeat';
import { pushContext as pushDashboardContext } from './dashboard-context';
import { mergeBranch, setupRigBrowseWorktree } from './git-manager';
import {
Expand Down Expand Up @@ -92,6 +92,15 @@ app.use('*', async (c, next) => {

// GET /health
app.get('/health', c => {
// When the TownDO is draining, it passes the drain nonce and town
// ID via headers so idle containers (no running agents) can
// acknowledge readiness and clear the drain flag.
const drainNonce = c.req.header('X-Drain-Nonce');
const townId = c.req.header('X-Town-Id');
if (drainNonce && townId) {
void notifyContainerReady(townId, drainNonce);
}

const response: HealthResponse = {
status: 'ok',
agents: activeAgentCount(),
Expand Down
63 changes: 63 additions & 0 deletions cloudflare-gastown/container/src/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const HEARTBEAT_INTERVAL_MS = 30_000;
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
let gastownApiUrl: string | null = null;
let sessionToken: string | null = null;
/** Set once we've successfully acknowledged container-ready. */
let containerReadyAcknowledged = false;

/**
* Configure and start the heartbeat reporter.
Expand Down Expand Up @@ -38,6 +40,49 @@ export function stopHeartbeat(): void {
console.log('Heartbeat reporter stopped');
}

/**
* Notify the TownDO that the replacement container is ready.
* Exported so the health endpoint can trigger it when the TownDO
* passes the drain nonce via headers (handles idle containers that
* have no running agents and thus no per-agent heartbeats).
*/
export async function notifyContainerReady(townId: string, drainNonce: string): Promise<void> {
if (containerReadyAcknowledged) return;
await acknowledgeContainerReady(townId, drainNonce);
}

/**
* Call POST /container-ready to acknowledge that this is a fresh
* container replacing an evicted one. Clears the TownDO drain flag
* so the reconciler can resume dispatching.
*/
async function acknowledgeContainerReady(townId: string, drainNonce: string): Promise<void> {
const apiUrl = gastownApiUrl ?? process.env.GASTOWN_API_URL;
const currentToken = process.env.GASTOWN_CONTAINER_TOKEN ?? sessionToken;
if (!apiUrl || !currentToken) return;

try {
const response = await fetch(`${apiUrl}/api/towns/${townId}/container-ready`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${currentToken}`,
},
body: JSON.stringify({ nonce: drainNonce }),
});
if (response.ok) {
containerReadyAcknowledged = true;
console.log(`[heartbeat] container-ready acknowledged for town=${townId}`);
} else {
console.warn(
`[heartbeat] container-ready failed for town=${townId}: ${response.status} ${response.statusText}`
);
}
} catch (err) {
console.warn(`[heartbeat] container-ready error for town=${townId}:`, err);
}
}

async function sendHeartbeats(): Promise<void> {
// Prefer the live container token (refreshed via POST /refresh-token)
// over the token captured at startHeartbeat() time.
Expand All @@ -46,6 +91,12 @@ async function sendHeartbeats(): Promise<void> {

const active = listAgents().filter(a => a.status === 'running' || a.status === 'starting');

// When no agents are active, the per-agent heartbeat loop has
// nothing to send. Idle container drain acknowledgment is handled
// by the /health endpoint instead (the TownDO passes the nonce via
// X-Drain-Nonce headers in ensureContainerReady).
if (active.length === 0) return;

for (const agent of active) {
const payload: HeartbeatPayload = {
agentId: agent.agentId,
Expand Down Expand Up @@ -77,6 +128,18 @@ async function sendHeartbeats(): Promise<void> {
console.warn(
`Heartbeat failed for agent ${agent.agentId}: ${response.status} ${response.statusText}`
);
} else if (!containerReadyAcknowledged) {
Comment thread
jrf0110 marked this conversation as resolved.
// If the TownDO is draining, the heartbeat response includes a
// drainNonce. Use it to call /container-ready and clear drain.
try {
const body = (await response.json()) as { data?: { drainNonce?: string } };
const nonce = body?.data?.drainNonce;
if (nonce) {
void acknowledgeContainerReady(agent.townId, nonce);
}
} catch {
// Non-JSON or unexpected shape — ignore
}
}
} catch (err) {
console.warn(`Heartbeat error for agent ${agent.agentId}:`, err);
Expand Down
1 change: 1 addition & 0 deletions cloudflare-gastown/src/db/tables/town-events.table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const TownEventType = z.enum([
'agent_done',
'agent_completed',
'container_status',
'container_eviction',
'pr_status_changed',
'bead_created',
'bead_cancelled',
Expand Down
126 changes: 123 additions & 3 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ export class TownDO extends DurableObject<Env> {
const townConfig = await config.getTownConfig(this.ctx.storage);
this._ownerUserId = townConfig.owner_user_id;

// Load persisted draining flag, nonce, and start time
this._draining = (await this.ctx.storage.get<boolean>('town:draining')) ?? false;
this._drainNonce = (await this.ctx.storage.get<string>('town:drainNonce')) ?? null;
this._drainStartedAt = (await this.ctx.storage.get<number>('town:drainStartedAt')) ?? null;

// All tables are now initialized via beads.initBeadTables():
// beads, bead_events, bead_dependencies, agent_metadata, review_metadata,
// escalation_metadata, convoy_metadata
Expand Down Expand Up @@ -537,6 +542,9 @@ export class TownDO extends DurableObject<Env> {
private _townId: string | null = null;
private _lastReconcilerMetrics: reconciler.ReconcilerMetrics | null = null;
private _dashboardContext: string | null = null;
private _draining = false;
private _drainNonce: string | null = null;
private _drainStartedAt: number | null = null;

private get townId(): string {
return this._townId ?? this.ctx.id.name ?? this.ctx.id.toString();
Expand All @@ -563,6 +571,72 @@ export class TownDO extends DurableObject<Env> {
return this._dashboardContext;
}

// ══════════════════════════════════════════════════════════════════
// Container Eviction (graceful drain)
// ══════════════════════════════════════════════════════════════════

/**
* Record a container eviction event and set the draining flag.
* Called by the container when it receives SIGTERM. While draining,
* the reconciler skips dispatch to prevent new work from starting.
*
* Returns a drain nonce that must be presented via
* `acknowledgeContainerReady()` to clear the drain flag. This
* prevents stale heartbeats from the dying container from
* prematurely re-enabling dispatch.
*/
async recordContainerEviction(): Promise<string> {
events.insertEvent(this.sql, 'container_eviction', {});
const nonce = crypto.randomUUID();
const startedAt = Date.now();
this._draining = true;
this._drainNonce = nonce;
this._drainStartedAt = startedAt;
await this.ctx.storage.put('town:draining', true);
await this.ctx.storage.put('town:drainNonce', nonce);
await this.ctx.storage.put('town:drainStartedAt', startedAt);
console.log(`${TOWN_LOG} recordContainerEviction: draining flag set, nonce=${nonce}`);
return nonce;
}

/**
* Acknowledge that the replacement container is ready. Clears the
* draining flag only if the provided nonce matches the one generated
* during `recordContainerEviction()`. This ensures that only the
* new container (which received the nonce via startup config) can
* re-enable dispatch — not a stale heartbeat from the old container.
*/
async acknowledgeContainerReady(nonce: string): Promise<boolean> {
if (!this._draining) {
console.log(`${TOWN_LOG} acknowledgeContainerReady: not draining, noop`);
return true;
}
if (nonce !== this._drainNonce) {
console.warn(
`${TOWN_LOG} acknowledgeContainerReady: nonce mismatch (got=${nonce}, expected=${this._drainNonce})`
);
return false;
}
this._draining = false;
this._drainNonce = null;
this._drainStartedAt = null;
await this.ctx.storage.put('town:draining', false);
await this.ctx.storage.delete('town:drainNonce');
await this.ctx.storage.delete('town:drainStartedAt');
console.log(`${TOWN_LOG} acknowledgeContainerReady: draining flag cleared`);
return true;
}

/** Whether the town is in draining mode (container eviction in progress). */
async isDraining(): Promise<boolean> {
return this._draining;
}

/** The current drain nonce (null when not draining). */
async getDrainNonce(): Promise<string | null> {
return this._drainNonce;
}

// ══════════════════════════════════════════════════════════════════
// Town Configuration
// ══════════════════════════════════════════════════════════════════
Expand Down Expand Up @@ -1105,16 +1179,24 @@ export class TownDO extends DurableObject<Env> {

// ── Heartbeat ─────────────────────────────────────────────────────

/**
* Update an agent's heartbeat timestamp. Returns the current drain
* nonce (if draining) so the caller can include it in the HTTP
* response without a second RPC — preventing a TOCTOU race where
* an in-flight heartbeat from the old container could observe a
* nonce generated between two separate DO calls.
*/
async touchAgentHeartbeat(
agentId: string,
watermark?: {
lastEventType?: string | null;
lastEventAt?: string | null;
activeTools?: string[];
}
): Promise<void> {
): Promise<{ drainNonce: string | null }> {
agents.touchAgent(this.sql, agentId, watermark);
await this.armAlarmIfNeeded();
return { drainNonce: this._drainNonce };
}

async updateAgentStatusMessage(agentId: string, message: string): Promise<void> {
Expand Down Expand Up @@ -3113,10 +3195,29 @@ export class TownDO extends DurableObject<Env> {
Sentry.captureException(err);
}

// Auto-clear drain flag if it has been active for too long.
// The drain sequence (drainAll) waits up to 10 minutes, so 15
// minutes is a generous upper bound. After this timeout the old
// container is certainly dead and it is safe to resume dispatch.
const DRAIN_TIMEOUT_MS = 15 * 60 * 1000;
if (
this._draining &&
this._drainStartedAt &&
Date.now() - this._drainStartedAt > DRAIN_TIMEOUT_MS
) {
this._draining = false;
this._drainNonce = null;
this._drainStartedAt = null;
await this.ctx.storage.put('town:draining', false);
await this.ctx.storage.delete('town:drainNonce');
await this.ctx.storage.delete('town:drainStartedAt');
logger.info('reconciler: drain timeout exceeded, auto-clearing draining flag');
}

// Phase 1: Reconcile — compute desired state vs actual state
const sideEffects: Array<() => Promise<void>> = [];
try {
const actions = reconciler.reconcile(this.sql);
const actions = reconciler.reconcile(this.sql, { draining: this._draining });
metrics.actionsEmitted = actions.length;
for (const a of actions) {
metrics.actionsByType[a.type] = (metrics.actionsByType[a.type] ?? 0) + 1;
Expand Down Expand Up @@ -3648,7 +3749,7 @@ export class TownDO extends DurableObject<Env> {
if (!hasRigs) return;

const hasWork = this.hasActiveWork();
if (!hasWork) {
if (!hasWork && !this._draining) {
const rigList = rigs.listRigs(this.sql);
const newestRigAge = rigList.reduce((min, r) => {
const age = Date.now() - new Date(r.created_at).getTime();
Expand All @@ -3663,8 +3764,27 @@ export class TownDO extends DurableObject<Env> {

try {
const container = getTownContainerStub(this.env, townId);
const headers: Record<string, string> = {};
// When draining AND enough time has passed for the old container
// to have exited (drainAll waits up to 10 min + exit), pass the
// nonce so the replacement container can acknowledge readiness.
// We only send the nonce after 11 minutes to avoid the old
// (still-draining) container receiving it and clearing drain
// prematurely — the health check goes to whichever container is
// currently serving this town.
const DRAIN_HANDOFF_DELAY_MS = 11 * 60 * 1000;
Comment thread
jrf0110 marked this conversation as resolved.
if (
this._draining &&
this._drainNonce &&
this._drainStartedAt &&
Date.now() - this._drainStartedAt > DRAIN_HANDOFF_DELAY_MS
) {
headers['X-Drain-Nonce'] = this._drainNonce;
Comment thread
jrf0110 marked this conversation as resolved.
headers['X-Town-Id'] = townId;
}
await container.fetch('http://container/health', {
signal: AbortSignal.timeout(5_000),
headers,
});
} catch {
// Container is starting up or unavailable — alarm will retry
Expand Down
Loading