Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions cloudflare-gastown/container/src/completion-reporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,47 @@

import type { ManagedAgent } from './types';

/**
* Notify the TownDO that the mayor has finished processing a prompt and
* is now waiting for user input. This lets the TownDO transition the
* mayor from "working" to "waiting", which drops the alarm to the idle
* cadence and stops health-check pings that reset the container's
* sleepAfter timer.
*
* Best-effort: errors are logged but do not propagate.
*/
export async function reportMayorWaiting(agent: ManagedAgent): Promise<void> {
const apiUrl = agent.gastownApiUrl;
const authToken =
process.env.GASTOWN_CONTAINER_TOKEN ?? agent.gastownContainerToken ?? agent.gastownSessionToken;
if (!apiUrl || !authToken) {
console.warn(
`Cannot report mayor ${agent.agentId} waiting: no API credentials on agent record`
);
return;
}

const url = `${apiUrl}/api/towns/${agent.townId}/rigs/${agent.rigId}/agents/${agent.agentId}/waiting`;
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${authToken}`,
},
body: JSON.stringify({ agentId: agent.agentId, firedAt: Date.now() }),
});

if (!response.ok) {
console.warn(
`Failed to report mayor ${agent.agentId} waiting: ${response.status} ${response.statusText}`
);
}
} catch (err) {
console.warn(`Error reporting mayor ${agent.agentId} waiting:`, err);
}
}

/**
* Notify the Rig DO that an agent session has completed or failed.
* Best-effort: errors are logged but do not propagate.
Expand Down
7 changes: 6 additions & 1 deletion cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import { createKilo, type KiloClient } from '@kilocode/sdk';
import { z } from 'zod';
import type { ManagedAgent, StartAgentRequest } from './types';
import { reportAgentCompleted } from './completion-reporter';
import { reportAgentCompleted, reportMayorWaiting } from './completion-reporter';
import { buildKiloConfigContent } from './agent-runner';
import { log } from './logger';

Expand Down Expand Up @@ -549,6 +549,11 @@ async function subscribeToEvents(
if (event.type === 'session.idle') {
if (request.role === 'mayor') {
// Mayor agents are persistent — session.idle means "turn done", not exit.
// Notify the TownDO so it can transition the mayor to "waiting"
// (alive in container, not doing LLM work). This lets the alarm
// drop to the idle cadence and stops health-check pings that
// would reset the container's sleepAfter timer.
void reportMayorWaiting(agent);
continue;
}
// Non-mayor: check for pending nudges before deciding to exit.
Expand Down
2 changes: 1 addition & 1 deletion cloudflare-gastown/src/db/tables/agent-metadata.table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/
// queries parsing through AgentMetadataRecord don't throw on old rows.
// Application code should only create the known roles below.
const AgentRole = z.enum(['polecat', 'refinery', 'mayor']).or(z.string());
const AgentProcessStatus = z.enum(['idle', 'working', 'stalled', 'dead']).or(z.string());
const AgentProcessStatus = z.enum(['idle', 'working', 'waiting', 'stalled', 'dead']).or(z.string());

export const AgentMetadataRecord = z.object({
bead_id: z.string(),
Expand Down
85 changes: 74 additions & 11 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ function formatEventMessage(row: Record<string, unknown>): string {

// Alarm intervals
const ACTIVE_ALARM_INTERVAL_MS = 5_000; // 5s when agents are active
const IDLE_ALARM_INTERVAL_MS = 1 * 60_000; // 1m when idle
const IDLE_ALARM_INTERVAL_MS = 5 * 60_000; // 5m when idle (no working agents)
Comment thread
jrf0110 marked this conversation as resolved.

// Escalation constants
const STALE_ESCALATION_THRESHOLD_MS = 4 * 60 * 60 * 1000;
Expand Down Expand Up @@ -547,6 +547,10 @@ export class TownDO extends DurableObject<Env> {
private _townId: string | null = null;
private _lastReconcilerMetrics: reconciler.ReconcilerMetrics | null = null;
private _dashboardContext: string | null = null;
/** Monotonic timestamp of the last working → transition for the mayor.
* Used to reject stale session.idle callbacks that arrive after a new
* prompt has already re-activated the mayor. */
private _mayorWorkingSince = 0;
private _draining = false;
private _drainNonce: string | null = null;
private _drainStartedAt: number | null = null;
Expand Down Expand Up @@ -676,7 +680,7 @@ export class TownDO extends DurableObject<Env> {
const townConfig = await this.getTownConfig();
const userId = townConfig.owner_user_id ?? townId;
await dispatch.forceRefreshContainerToken(this.env, townId, userId);
this.lastContainerTokenRefreshAt = Date.now();
await this.ctx.storage.put('container:lastTokenRefreshAt', Date.now());
}

/**
Expand Down Expand Up @@ -1558,6 +1562,43 @@ export class TownDO extends DurableObject<Env> {
await this.armAlarmIfNeeded();
}

/**
* Transition the mayor from "working" to "waiting". Called by the
* container when the mayor's session goes idle (turn done, waiting for
* user input). The "waiting" status means the mayor is alive in the
* container but not doing LLM work — hasActiveWork() returns false,
* so the alarm drops to the idle cadence and health-check pings stop
* resetting the container's sleepAfter timer.
*
* @param firedAt - Timestamp (ms) when the container fired this
* callback. Used to reject stale session.idle callbacks from a
* previous turn that arrive after the mayor has already been
* re-activated by a new prompt.
*/
async mayorWaiting(agentId?: string, firedAt?: number): Promise<void> {
let resolvedAgentId = agentId;
if (!resolvedAgentId) {
const mayor = agents.listAgents(this.sql, { role: 'mayor' })[0];
if (mayor) resolvedAgentId = mayor.id;
}
if (!resolvedAgentId) return;

const agent = agents.getAgent(this.sql, resolvedAgentId);
if (!agent || agent.role !== 'mayor') return;

// Only transition from working → waiting. If the agent has already
// been set to idle/stalled/dead by another path, don't overwrite.
// Guard against stale session.idle callbacks: reportMayorWaiting is
// fire-and-forget, so a callback from a previous turn can arrive
// after sendMayorMessage has already re-activated the mayor. If the
// callback carries a firedAt timestamp that predates the last
// working transition, it belongs to an older turn — reject it.
if (agent.status === 'working') {
if (firedAt && firedAt < this._mayorWorkingSince) return;
agents.updateAgentStatus(this.sql, resolvedAgentId, 'waiting');
}
}

async agentCompleted(
agentId: string,
input: { status: 'completed' | 'failed'; reason?: string }
Expand Down Expand Up @@ -2106,7 +2147,20 @@ export class TownDO extends DurableObject<Env> {

if (isAlive) {
const sent = await dispatch.sendMessageToAgent(this.env, townId, mayor.id, combinedMessage);
sessionStatus = sent ? 'active' : 'idle';
if (sent) {
// Transition waiting → working so the alarm runs at the active cadence
// while the mayor processes this prompt. Also reschedule the alarm
// immediately — the idle alarm may be up to 5 min away, and we need
// the reconciler/health-check loop to resume promptly.
if (mayor.status === 'waiting') {
agents.updateAgentStatus(this.sql, mayor.id, 'working');
Comment thread
jrf0110 marked this conversation as resolved.
this._mayorWorkingSince = Date.now();
await this.ctx.storage.setAlarm(Date.now() + ACTIVE_ALARM_INTERVAL_MS);
Comment thread
jrf0110 marked this conversation as resolved.
}
sessionStatus = 'active';
} else {
sessionStatus = 'idle';
}
} else {
const townConfig = await this.getTownConfig();
const rigConfig = await this.getMayorRigConfig();
Expand Down Expand Up @@ -2152,6 +2206,7 @@ export class TownDO extends DurableObject<Env> {

if (started) {
agents.updateAgentStatus(this.sql, mayor.id, 'working');
this._mayorWorkingSince = Date.now();
sessionStatus = 'starting';
} else {
sessionStatus = 'idle';
Expand Down Expand Up @@ -2200,8 +2255,9 @@ export class TownDO extends DurableObject<Env> {
const isAlive = containerStatus.status === 'running' || containerStatus.status === 'starting';

if (isAlive) {
const status = mayor.status === 'working' || mayor.status === 'stalled' ? 'active' : 'idle';
return { agentId: mayor.id, sessionStatus: status };
const isActive =
mayor.status === 'working' || mayor.status === 'stalled' || mayor.status === 'waiting';
return { agentId: mayor.id, sessionStatus: isActive ? 'active' : 'idle' };
}

// Start the container with an idle mayor (no initial prompt)
Expand Down Expand Up @@ -2253,6 +2309,7 @@ export class TownDO extends DurableObject<Env> {

if (started) {
agents.updateAgentStatus(this.sql, mayor.id, 'working');
this._mayorWorkingSince = Date.now();
return { agentId: mayor.id, sessionStatus: 'starting' };
}

Expand Down Expand Up @@ -2317,7 +2374,7 @@ export class TownDO extends DurableObject<Env> {
const mapStatus = (agentStatus: string): 'idle' | 'active' | 'starting' => {
switch (agentStatus) {
case 'working':
return 'active';
case 'waiting':
case 'stalled':
return 'active';
default:
Expand Down Expand Up @@ -3543,12 +3600,17 @@ export class TownDO extends DurableObject<Env> {
* from the alarm handler, throttled to once per hour (tokens have
* 8h expiry). The TownContainerDO stores it as an env var so it's
* available to all agents in the container.
*
* The throttle timestamp is persisted in ctx.storage so it survives
* DO eviction. Without persistence, eviction resets the throttle to 0
* and the refresh fires immediately on the next alarm tick, sending
* requests that reset the container's sleepAfter timer (#1409).
*/
private lastContainerTokenRefreshAt = 0;
private async refreshContainerToken(): Promise<void> {
const TOKEN_REFRESH_INTERVAL_MS = 60 * 60_000; // 1 hour
const now = Date.now();
if (now - this.lastContainerTokenRefreshAt < TOKEN_REFRESH_INTERVAL_MS) return;
const lastRefresh = (await this.ctx.storage.get<number>('container:lastTokenRefreshAt')) ?? 0;
if (now - lastRefresh < TOKEN_REFRESH_INTERVAL_MS) return;

const townId = this.townId;
if (!townId) return;
Expand All @@ -3557,7 +3619,7 @@ export class TownDO extends DurableObject<Env> {
await dispatch.refreshContainerToken(this.env, townId, userId);
// Only mark as refreshed after success — failed refreshes should
// be retried on the next alarm tick, not throttled for an hour.
this.lastContainerTokenRefreshAt = now;
await this.ctx.storage.put('container:lastTokenRefreshAt', now);
}

/**
Expand Down Expand Up @@ -4114,6 +4176,7 @@ export class TownDO extends DurableObject<Env> {
};
agents: {
working: number;
waiting: number;
idle: number;
stalled: number;
dead: number;
Expand Down Expand Up @@ -4157,7 +4220,7 @@ export class TownDO extends DurableObject<Env> {
[]
),
];
const agentCounts = { working: 0, idle: 0, stalled: 0, dead: 0, total: 0 };
const agentCounts = { working: 0, waiting: 0, idle: 0, stalled: 0, dead: 0, total: 0 };
for (const row of agentRows) {
const s = `${row.status as string}`;
const c = Number(row.cnt);
Expand Down Expand Up @@ -4266,7 +4329,7 @@ export class TownDO extends DurableObject<Env> {
alarm: {
nextFireAt: currentAlarm ? new Date(Number(currentAlarm)).toISOString() : null,
intervalMs,
intervalLabel: active ? 'active (5s)' : 'idle (60s)',
intervalLabel: active ? 'active (5s)' : 'idle (5m)',
},
agents: agentCounts,
beads: beadCounts,
Expand Down
6 changes: 6 additions & 0 deletions cloudflare-gastown/src/gastown.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
handleAgentDone,
handleRequestChanges,
handleAgentCompleted,
handleAgentWaiting,
handleWriteCheckpoint,
handleWriteEvictionContext,
handleCheckMail,
Expand Down Expand Up @@ -422,6 +423,11 @@ app.post('/api/towns/:townId/rigs/:rigId/agents/:agentId/completed', c =>
handleAgentCompleted(c, c.req.param())
)
);
app.post('/api/towns/:townId/rigs/:rigId/agents/:agentId/waiting', c =>
instrumented(c, 'POST /api/towns/:townId/rigs/:rigId/agents/:agentId/waiting', () =>
handleAgentWaiting(c, c.req.param())
)
);
app.post('/api/towns/:townId/rigs/:rigId/agents/:agentId/checkpoint', c =>
instrumented(c, 'POST /api/towns/:townId/rigs/:rigId/agents/:agentId/checkpoint', () =>
handleWriteCheckpoint(c, c.req.param())
Expand Down
18 changes: 18 additions & 0 deletions cloudflare-gastown/src/handlers/rig-agents.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,24 @@ export async function handleAgentCompleted(
return c.json(resSuccess({ completed: true }));
}

/**
* Called by the container when the mayor's session goes idle (turn done,
* waiting for user input). Transitions the mayor from "working" to
* "waiting" so the alarm drops to the idle cadence and health-check
* pings stop resetting the container's sleepAfter timer.
*/
export async function handleAgentWaiting(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
) {
const body = (await parseJsonBody(c)) as Record<string, unknown>;
const firedAt = typeof body?.firedAt === 'number' ? body.firedAt : undefined;
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.mayorWaiting(params.agentId, firedAt);
return c.json(resSuccess({ acknowledged: true }));
}

export async function handleWriteCheckpoint(
c: Context<GastownEnv>,
params: { rigId: string; agentId: string }
Expand Down
2 changes: 1 addition & 1 deletion cloudflare-gastown/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export type BeadFilter = {
export const AgentRole = z.enum(['polecat', 'refinery', 'mayor']);
export type AgentRole = z.infer<typeof AgentRole>;

export const AgentStatus = z.enum(['idle', 'working', 'stalled', 'dead']);
export const AgentStatus = z.enum(['idle', 'working', 'waiting', 'stalled', 'dead']);
export type AgentStatus = z.infer<typeof AgentStatus>;

/**
Expand Down
Loading