diff --git a/cloudflare-gastown/container/plugin/client.ts b/cloudflare-gastown/container/plugin/client.ts index 94e3f73efe..1627c2a16f 100644 --- a/cloudflare-gastown/container/plugin/client.ts +++ b/cloudflare-gastown/container/plugin/client.ts @@ -174,6 +174,21 @@ export class GastownClient { body: JSON.stringify({ summary }), }); } + + /** + * Resolve a triage_request bead with the chosen action and notes. + * The TownDO closes the triage request and executes any side effects. + */ + async resolveTriage(input: { + triage_request_bead_id: string; + action: string; + resolution_notes: string; + }): Promise { + return this.request(this.rigPath('/triage/resolve'), { + method: 'POST', + body: JSON.stringify(input), + }); + } } /** diff --git a/cloudflare-gastown/container/plugin/tools.ts b/cloudflare-gastown/container/plugin/tools.ts index f18480d17d..ead4bf4cea 100644 --- a/cloudflare-gastown/container/plugin/tools.ts +++ b/cloudflare-gastown/container/plugin/tools.ts @@ -184,5 +184,32 @@ export function createTools(client: GastownClient) { return `Advanced to step ${result.currentStep + 1} of ${result.totalSteps}.`; }, }), + + gt_triage_resolve: tool({ + description: + 'Resolve a triage request with your chosen action. The TownDO will execute ' + + 'the action (restart agent, close bead, escalate, etc.) and close the triage request.', + args: { + triage_request_bead_id: tool.schema + .string() + .describe('The UUID of the triage_request bead to resolve'), + action: tool.schema + .string() + .describe( + 'The chosen action from the available options (e.g. 
RESTART, ESCALATE_TO_MAYOR, CLOSE_BEAD)' + ), + resolution_notes: tool.schema + .string() + .describe('Brief explanation of your reasoning for choosing this action'), + }, + async execute(args) { + const bead = await client.resolveTriage({ + triage_request_bead_id: args.triage_request_bead_id, + action: args.action, + resolution_notes: args.resolution_notes, + }); + return `Triage request ${args.triage_request_bead_id} resolved with action: ${args.action}`; + }, + }), }; } diff --git a/cloudflare-gastown/container/plugin/types.ts b/cloudflare-gastown/container/plugin/types.ts index 7a78c08aa0..2a2ad85a93 100644 --- a/cloudflare-gastown/container/plugin/types.ts +++ b/cloudflare-gastown/container/plugin/types.ts @@ -30,7 +30,7 @@ export type Bead = { closed_at: string | null; }; -export type AgentRole = 'polecat' | 'refinery' | 'mayor' | 'witness'; +export type AgentRole = 'polecat' | 'refinery' | 'mayor'; export type AgentStatus = 'idle' | 'working' | 'stalled' | 'dead'; export type Agent = { diff --git a/cloudflare-gastown/container/src/agent-runner.ts b/cloudflare-gastown/container/src/agent-runner.ts index aec33a652d..391444e1fa 100644 --- a/cloudflare-gastown/container/src/agent-runner.ts +++ b/cloudflare-gastown/container/src/agent-runner.ts @@ -1,7 +1,7 @@ import type { Config } from '@kilocode/sdk'; import { z } from 'zod'; import { writeFile } from 'node:fs/promises'; -import { cloneRepo, createWorktree } from './git-manager'; +import { cloneRepo, createWorktree, setupRigBrowseWorktree } from './git-manager'; import { startAgent } from './process-manager'; import { getCurrentTownConfig } from './control-server'; import type { ManagedAgent, StartAgentRequest } from './types'; @@ -210,15 +210,16 @@ async function configureGitCredentials( * is available, call the Next.js server to resolve fresh credentials. * Returns the (potentially enriched) envVars. 
*/ -async function resolveGitCredentialsIfMissing( - request: StartAgentRequest -): Promise> { - const envVars = { ...(request.envVars ?? {}) }; +export async function resolveGitCredentials(params: { + envVars?: Record; + platformIntegrationId?: string; +}): Promise> { + const envVars = { ...(params.envVars ?? {}) }; const hasToken = !!(envVars.GIT_TOKEN || envVars.GITHUB_TOKEN || envVars.GITLAB_TOKEN); if (hasToken) return envVars; - const integrationId = request.platformIntegrationId; + const integrationId = params.platformIntegrationId; const kiloToken = envVars.KILOCODE_TOKEN; // The Next.js server URL — in dev it's localhost:3000, in prod it's the main app URL. // We derive it from KILO_API_URL (the gateway URL) or fall back to localhost. @@ -360,6 +361,58 @@ async function createMayorWorkspace(rigId: string): Promise { return dir; } +/** + * Write the mayor's system prompt to AGENTS.md in the workspace. + * + * kilo/opencode reads AGENTS.md from the project root for ALL sessions, + * including built-in sub-agents (explore, general). By writing the full + * system prompt here instead of passing it via the session.prompt API, + * the mayor and all its sub-agents share the exact same instructions. + * + * The system prompt comes from the TownDO (buildMayorSystemPrompt) and + * is the single source of truth. When it changes (gastown updates, + * user customization), the TownDO sends the updated prompt and we + * rewrite this file. + */ +async function writeMayorSystemPromptToAgentsMd( + workspaceDir: string, + systemPrompt: string +): Promise { + const { writeFile, readdir, stat } = await import('node:fs/promises'); + const path = await import('node:path'); + + // Append a dynamic section listing discovered browse worktrees so + // sub-agents know where to find rig codebases. 
+ const rigsRoot = '/workspace/rigs'; + let rigDirs: string[] = []; + try { + rigDirs = await readdir(rigsRoot); + } catch { + // No rigs directory yet + } + + const browseEntries: string[] = []; + for (const entry of rigDirs) { + if (entry.startsWith('mayor-')) continue; + const browseDir = path.join(rigsRoot, entry, 'browse'); + try { + const s = await stat(browseDir); + if (s.isDirectory()) { + browseEntries.push(`- **${entry}**: \`${browseDir}\``); + } + } catch { + // No browse worktree yet + } + } + + const browseSuffix = + browseEntries.length > 0 + ? `\n\n## Discovered Browse Worktrees\n\n${browseEntries.join('\n')}` + : ''; + + await writeFile(path.join(workspaceDir, 'AGENTS.md'), systemPrompt + browseSuffix); +} + /** * Run the full agent startup sequence: * 1. Clone/fetch the rig's git repo (or create minimal workspace for mayor) @@ -368,17 +421,54 @@ async function createMayorWorkspace(rigId: string): Promise { * 4. Start a kilo serve instance for the worktree (or reuse existing) * 5. Create a session and send the initial prompt via HTTP API */ -export async function runAgent(request: StartAgentRequest): Promise { +export async function runAgent(originalRequest: StartAgentRequest): Promise { + let request = originalRequest; let workdir: string; if (request.role === 'mayor') { // Mayor doesn't need a repo clone — just a git-initialized directory workdir = await createMayorWorkspace(request.rigId); + + // On fresh containers the browse worktrees won't exist yet. Set them + // up for all known rigs before writing AGENTS.md so the mayor (and its + // sub-agents) can immediately browse codebases. + if (request.rigs?.length) { + const envVars = await resolveGitCredentials(request); + await Promise.allSettled( + request.rigs.map(async rig => { + try { + await setupRigBrowseWorktree({ + rigId: rig.rigId, + gitUrl: rig.gitUrl, + defaultBranch: rig.defaultBranch, + envVars, + }); + } catch (err) { + const msg = err instanceof Error ? 
err.message.split('\n')[0] : String(err); + console.warn(`[runAgent] browse worktree setup failed for rig=${rig.rigId}: ${msg}`); + } + }) + ); + } + + // Write the system prompt to AGENTS.md so the mayor AND its built-in + // sub-agents (explore, general) all share the same instructions. + // The system prompt is NOT passed via the session.prompt API — AGENTS.md + // is the sole source of truth for the mayor's instructions. + if (request.systemPrompt) { + await writeMayorSystemPromptToAgentsMd(workdir, request.systemPrompt); + } } else { // Resolve git credentials if missing. When the town config doesn't have // a token (common on first dispatch after rig creation), fetch one from // the Next.js server using the platform_integration_id. - const envVars = await resolveGitCredentialsIfMissing(request); + const envVars = await resolveGitCredentials(request); + + // Merge resolved credentials back into the request so buildAgentEnv + // can propagate GIT_TOKEN/GH_TOKEN to the spawned kilo serve process. + // Without this, rigs using platformIntegrationId would clone successfully + // but the agent session itself would lack git push / gh credentials. + request = { ...request, envVars }; await cloneRepo({ rigId: request.rigId, @@ -390,6 +480,8 @@ export async function runAgent(request: StartAgentRequest): Promise { console.log( `[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}` ); - console.log(`[control-server] system prompt length: ${parsed.data.systemPrompt.length}`); + console.log(`[control-server] system prompt length: ${parsed.data.systemPrompt?.length ?? 
0}`); try { const agent = await runAgent(parsed.data); @@ -228,6 +228,53 @@ export function consumeStreamTicket(ticket: string): string | null { return entry.agentId; } +// POST /repos/setup +// Proactively clone a rig's repo and create a browse worktree so the +// mayor (and future agents) have immediate access to the codebase. +// Called by the TownDO when a new rig is added. +app.post('/repos/setup', async c => { + const body: unknown = await c.req.json().catch(() => null); + const parsed = SetupRepoRequest.safeParse(body); + if (!parsed.success) { + return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); + } + + const req = parsed.data; + console.log(`[control-server] /repos/setup: rigId=${req.rigId} gitUrl=${req.gitUrl}`); + + // Run in background so we return 202 immediately. + // Errors are caught and logged — never propagated as unhandled rejections. + const doSetup = async () => { + try { + // Resolve git credentials from platformIntegrationId if no token + // is present in envVars (e.g. rigs using GitHub App installations). + const envVars = await resolveGitCredentials({ + envVars: req.envVars, + platformIntegrationId: req.platformIntegrationId, + }); + + const browseDir = await setupRigBrowseWorktree({ + rigId: req.rigId, + gitUrl: req.gitUrl, + defaultBranch: req.defaultBranch, + envVars, + }); + console.log(`[control-server] /repos/setup: done rigId=${req.rigId} browse=${browseDir}`); + } catch (err) { + // Log as a warning, not an error — this is a best-effort background + // operation. The mayor and agents can still function without the + // browse worktree; it will be retried on the next agent dispatch. + const message = err instanceof Error ? 
err.message : String(err); + console.warn( + `[control-server] /repos/setup: FAILED for rigId=${req.rigId}: ${message.split('\n')[0]}` + ); + } + }; + doSetup().catch(() => {}); + + return c.json({ status: 'accepted', message: 'Repo setup started' }, 202); +}); + // POST /git/merge // Deterministic merge of a polecat branch into the target branch. // Called by the Rig DO's processReviewQueue → startMergeInContainer. diff --git a/cloudflare-gastown/container/src/git-manager.ts b/cloudflare-gastown/container/src/git-manager.ts index 57aec3c401..b74d425e48 100644 --- a/cloudflare-gastown/container/src/git-manager.ts +++ b/cloudflare-gastown/container/src/git-manager.ts @@ -142,6 +142,13 @@ async function exec(cmd: string, args: string[], cwd?: string): Promise cwd, stdout: 'pipe', stderr: 'pipe', + env: { + ...process.env, + // Prevent git from prompting for credentials in the container. + // Public repos clone without auth; private repos fail fast with + // a clear error instead of hanging on a username prompt. + GIT_TERMINAL_PROMPT: '0', + }, }); const exitCode = await proc.exited; @@ -246,10 +253,27 @@ async function createWorktreeInner(options: WorktreeOptions): Promise { return dir; } + // When a startPoint is provided (e.g. a convoy feature branch), create + // the new branch from that ref so the agent begins with the latest + // merged work from upstream. Without a startPoint, try to track the + // remote branch or fall back to the repo's current HEAD. 
+ const startPoint = options.startPoint; try { - await exec('git', ['branch', '--track', options.branch, `origin/${options.branch}`], repo); + if (startPoint) { + await exec('git', ['branch', options.branch, startPoint], repo); + } else { + await exec('git', ['branch', '--track', options.branch, `origin/${options.branch}`], repo); + } } catch { - await exec('git', ['branch', options.branch], repo); + // Fall back to origin/ so we always branch from the + // latest remote tip rather than the repo's local HEAD (which may be + // stale in a --no-checkout bare clone). + const fallback = options.defaultBranch ? `origin/${options.defaultBranch}` : undefined; + if (fallback) { + await exec('git', ['branch', options.branch, fallback], repo); + } else { + await exec('git', ['branch', options.branch], repo); + } } await exec('git', ['worktree', 'add', dir, options.branch], repo); @@ -286,6 +310,64 @@ export async function listWorktrees(rigId: string): Promise { .map(line => line.replace('worktree ', '')); } +/** + * Create (or update) a read-only browse worktree for a rig on its default branch. + * This gives the mayor agent a checked-out view of the codebase at + * `/workspace/rigs//browse/` that it can navigate into via external_directory. + * + * If the browse worktree already exists, pulls latest from the remote. + */ +export function setupRigBrowseWorktree( + options: CloneOptions & { envVars?: Record } +): Promise { + return withRigLock(options.rigId, async () => { + // Ensure the repo is cloned/up-to-date first + await cloneRepoInner(options); + return setupBrowseWorktreeInner(options.rigId, options.defaultBranch); + }); +} + +async function setupBrowseWorktreeInner(rigId: string, defaultBranch: string): Promise { + validatePathSegment(rigId, 'rigId'); + const repo = await repoDir(rigId); + const browseDir = resolve(WORKSPACE_ROOT, rigId, 'browse'); + await assertInsideWorkspace(browseDir); + + if (await pathExists(browseDir)) { + // Already exists — pull latest. 
Log failures so callers know the + // browse worktree may contain stale code. + try { + await exec('git', ['checkout', defaultBranch], browseDir); + await exec('git', ['pull', '--rebase', '--autostash'], browseDir); + console.log(`Updated browse worktree for rig ${rigId} at ${browseDir}`); + } catch (err) { + const msg = err instanceof Error ? err.message.split('\n')[0] : String(err); + console.warn(`Browse worktree refresh failed for rig ${rigId} (may be stale): ${msg}`); + } + return browseDir; + } + + // Create a worktree on the default branch for browsing. + // Force-create (or reset) the tracking branch to origin/ + // so a recreated browse worktree always starts from the latest remote + // tip rather than a stale local ref. + const trackingBranch = `browse-${rigId.slice(0, 8)}`; + try { + await exec( + 'git', + ['branch', '--force', '--track', trackingBranch, `origin/${defaultBranch}`], + repo + ); + } catch { + // --force --track may fail on very old git; fall back to create-or-reset + await exec('git', ['branch', '-f', trackingBranch, `origin/${defaultBranch}`], repo); + } + + await exec('git', ['worktree', 'add', browseDir, trackingBranch], repo); + console.log(`Created browse worktree for rig ${rigId} at ${browseDir}`); + return browseDir; +} + export type MergeOutcome = { status: 'merged' | 'conflict'; message: string; diff --git a/cloudflare-gastown/container/src/types.ts b/cloudflare-gastown/container/src/types.ts index 55ae3847c0..70c1d46d8a 100644 --- a/cloudflare-gastown/container/src/types.ts +++ b/cloudflare-gastown/container/src/types.ts @@ -2,7 +2,7 @@ import { z } from 'zod'; // ── Agent roles (mirrors worker types) ────────────────────────────────── -export const AgentRole = z.enum(['mayor', 'polecat', 'refinery', 'witness']); +export const AgentRole = z.enum(['mayor', 'polecat', 'refinery']); export type AgentRole = z.infer; // ── Control server request/response schemas ───────────────────────────── @@ -18,13 +18,25 @@ export const 
StartAgentRequest = z.object({ model: z.string(), /** Lightweight model for title generation, explore subagent, etc. */ smallModel: z.string().optional(), - systemPrompt: z.string(), + systemPrompt: z.string().optional(), gitUrl: z.string(), branch: z.string(), defaultBranch: z.string(), envVars: z.record(z.string(), z.string()).optional(), /** Platform integration ID for resolving fresh git credentials at startup */ platformIntegrationId: z.string().optional(), + /** Git ref to branch from (e.g. convoy feature branch). Falls back to HEAD if absent. */ + startPoint: z.string().optional(), + /** Rig list for mayor agents — used to set up browse worktrees on fresh containers. */ + rigs: z + .array( + z.object({ + rigId: z.string(), + gitUrl: z.string(), + defaultBranch: z.string(), + }) + ) + .optional(), }); export type StartAgentRequest = z.infer; @@ -254,8 +266,24 @@ export type CloneOptions = { export type WorktreeOptions = { rigId: string; branch: string; + /** Optional start point for the new branch (e.g. 'origin/main' or a feature branch ref). */ + startPoint?: string; + /** Default branch name, used as fallback start point (e.g. 'main'). 
*/ + defaultBranch?: string; }; +// ── Repo setup (proactive clone + browse worktree) ────────────────────── + +export const SetupRepoRequest = z.object({ + rigId: z.string().min(1), + gitUrl: z.string().min(1), + defaultBranch: z.string().min(1), + envVars: z.record(z.string(), z.string()).optional(), + /** Platform integration ID for resolving git credentials when no token is in envVars */ + platformIntegrationId: z.string().optional(), +}); +export type SetupRepoRequest = z.infer; + // ── Heartbeat ─────────────────────────────────────────────────────────── export type HeartbeatPayload = { diff --git a/cloudflare-gastown/src/db/tables/agent-metadata.table.ts b/cloudflare-gastown/src/db/tables/agent-metadata.table.ts index 7be7c62e34..cc4064cce9 100644 --- a/cloudflare-gastown/src/db/tables/agent-metadata.table.ts +++ b/cloudflare-gastown/src/db/tables/agent-metadata.table.ts @@ -1,8 +1,11 @@ import { z } from 'zod'; import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/table'; -const AgentRole = z.enum(['polecat', 'refinery', 'mayor', 'witness']); -const AgentProcessStatus = z.enum(['idle', 'working', 'stalled', 'dead']); +// Accept legacy role values (e.g. 'witness' from pre-#442 towns) so that +// queries parsing through AgentMetadataRecord don't throw on old rows. +// Application code should only create the known roles below. +const AgentRole = z.enum(['polecat', 'refinery', 'mayor']).or(z.string()); +const AgentProcessStatus = z.enum(['idle', 'working', 'stalled', 'dead']).or(z.string()); export const AgentMetadataRecord = z.object({ bead_id: z.string(), @@ -32,13 +35,16 @@ export type AgentMetadataRecord = z.output; export const agent_metadata = getTableFromZodSchema('agent_metadata', AgentMetadataRecord); +// CHECK constraints are intentionally omitted — Cloudflare DO SQLite +// provides no way to alter CHECK constraints on existing tables, and +// Zod validates all values at the application layer. See #442. 
export function createTableAgentMetadata(): string { return getCreateTableQueryFromTable(agent_metadata, { bead_id: `text primary key references beads(bead_id)`, - role: `text not null check(role in ('polecat', 'refinery', 'mayor', 'witness'))`, + role: `text not null`, identity: `text not null unique`, container_process_id: `text`, - status: `text not null default 'idle' check(status in ('idle', 'working', 'stalled', 'dead'))`, + status: `text not null default 'idle'`, current_hook_bead_id: `text references beads(bead_id)`, dispatch_attempts: `integer not null default 0`, checkpoint: `text`, diff --git a/cloudflare-gastown/src/db/tables/beads.table.ts b/cloudflare-gastown/src/db/tables/beads.table.ts index fa7827e779..9358791614 100644 --- a/cloudflare-gastown/src/db/tables/beads.table.ts +++ b/cloudflare-gastown/src/db/tables/beads.table.ts @@ -110,17 +110,20 @@ export type BeadRecordWithMetadata = z.output; export const beads = getTableFromZodSchema('beads', BeadRecord); +// CHECK constraints are intentionally omitted — Cloudflare DO SQLite +// provides no way to alter CHECK constraints on existing tables, and +// Zod validates all values at the application layer. See #442. 
export function createTableBeads(): string { return getCreateTableQueryFromTable(beads, { bead_id: `text primary key`, - type: `text not null check(type in ('issue', 'message', 'escalation', 'merge_request', 'convoy', 'molecule', 'agent'))`, - status: `text not null default 'open' check(status in ('open', 'in_progress', 'closed', 'failed'))`, + type: `text not null`, + status: `text not null default 'open'`, title: `text not null`, body: `text`, rig_id: `text`, parent_bead_id: `text references beads(bead_id)`, assignee_agent_bead_id: `text`, - priority: `text default 'medium' check(priority in ('low', 'medium', 'high', 'critical'))`, + priority: `text default 'medium'`, labels: `text default '[]'`, metadata: `text default '{}'`, created_by: `text`, diff --git a/cloudflare-gastown/src/db/tables/rig-agents.table.ts b/cloudflare-gastown/src/db/tables/rig-agents.table.ts index 4f3dfc8a20..cab0311450 100644 --- a/cloudflare-gastown/src/db/tables/rig-agents.table.ts +++ b/cloudflare-gastown/src/db/tables/rig-agents.table.ts @@ -1,7 +1,7 @@ import { z } from 'zod'; import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/table'; -const AgentRole = z.enum(['polecat', 'refinery', 'mayor', 'witness']); +const AgentRole = z.enum(['polecat', 'refinery', 'mayor']); const AgentStatus = z.enum(['idle', 'working', 'blocked', 'dead']); export const RigAgentRecord = z.object({ @@ -31,10 +31,10 @@ export function createTableRigAgents(): string { return getCreateTableQueryFromTable(rig_agents, { id: `text primary key`, rig_id: `text`, - role: `text not null check(role in ('polecat', 'refinery', 'mayor', 'witness'))`, + role: `text not null`, name: `text not null`, identity: `text not null unique`, - status: `text not null default 'idle' check(status in ('idle', 'working', 'blocked', 'dead'))`, + status: `text not null default 'idle'`, current_hook_bead_id: `text references rig_beads(id)`, dispatch_attempts: `integer not null default 0`, last_activity_at: `text`, 
diff --git a/cloudflare-gastown/src/dos/GastownUser.do.ts b/cloudflare-gastown/src/dos/GastownUser.do.ts index d0c7730a27..a6759ad184 100644 --- a/cloudflare-gastown/src/dos/GastownUser.do.ts +++ b/cloudflare-gastown/src/dos/GastownUser.do.ts @@ -2,9 +2,13 @@ import { DurableObject } from 'cloudflare:workers'; import { createTableUserTowns, user_towns, UserTownRecord } from '../db/tables/user-towns.table'; import { createTableUserRigs, user_rigs, UserRigRecord } from '../db/tables/user-rigs.table'; import { query } from '../util/query.util'; +import { getTownDOStub } from './Town.do'; const USER_LOG = '[GastownUser.do]'; +/** Health watchdog interval — check town alarms every 5 minutes */ +const WATCHDOG_INTERVAL_MS = 5 * 60_000; + function generateId(): string { return crypto.randomUUID(); } @@ -49,6 +53,9 @@ export class GastownUserDO extends DurableObject { private async initializeDatabase(): Promise { query(this.sql, createTableUserTowns(), []); query(this.sql, createTableUserRigs(), []); + // Arm the watchdog on every initialization so existing users (who + // created towns before the watchdog was added) get health checks. + await this.armWatchdogIfNeeded(); } // ── Towns ───────────────────────────────────────────────────────────── @@ -79,6 +86,9 @@ export class GastownUserDO extends DurableObject { // TODO: Should create the Town DO now, call setTownId, and then some function like ensureContainer // In the background, this way the town will likely be ready to go when the user gets to the UI + // Arm the health watchdog so it starts checking this town's alarm + await this.armWatchdogIfNeeded(); + return town; } @@ -223,6 +233,57 @@ export class GastownUserDO extends DurableObject { async ping(): Promise { return 'pong'; } + + // ── Health Watchdog ─────────────────────────────────────────────────── + + /** + * Arm the watchdog alarm if this user has any towns. Called after + * creating a town to ensure the watchdog runs. 
+ */ + private async armWatchdogIfNeeded(): Promise { + const currentAlarm = await this.ctx.storage.getAlarm(); + if (!currentAlarm || currentAlarm < Date.now()) { + const towns = UserTownRecord.array().parse([ + ...query(this.sql, /* sql */ `SELECT * FROM ${user_towns} LIMIT 1`, []), + ]); + if (towns.length > 0) { + await this.ctx.storage.setAlarm(Date.now() + WATCHDOG_INTERVAL_MS); + } + } + } + + /** + * Watchdog alarm: periodically ping each town's TownDO to verify its + * alarm is firing and re-arm it if not. This is the external observer + * that catches a silently broken alarm handler. + * + * See #442 — replaces the Boot agent's role from local Gastown. + */ + async alarm(): Promise { + await this.ensureInitialized(); + const towns = UserTownRecord.array().parse([ + ...query(this.sql, /* sql */ `SELECT * FROM ${user_towns}`, []), + ]); + + if (towns.length === 0) return; + + console.log(`${USER_LOG} watchdog: checking ${towns.length} town(s)`); + + for (const town of towns) { + try { + const townStub = getTownDOStub(this.env, town.id); + const health = await townStub.healthCheck(); + if (!health.alarmSet) { + console.warn(`${USER_LOG} watchdog: re-armed alarm for town=${town.id} (was missing)`); + } + } catch (err) { + console.error(`${USER_LOG} watchdog: healthCheck failed for town=${town.id}:`, err); + } + } + + // Re-arm the watchdog + await this.ctx.storage.setAlarm(Date.now() + WATCHDOG_INTERVAL_MS); + } } export function getGastownUserStub(env: Env, userId: string) { diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 5d3cb2cab0..020e0ac0e2 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -24,6 +24,7 @@ import * as reviewQueue from './town/review-queue'; import * as config from './town/config'; import * as rigs from './town/rigs'; import * as dispatch from './town/container-dispatch'; +import * as patrol from './town/patrol'; import { GitHubPRStatusSchema, 
GitLabMRStatusSchema } from '../util/platform-pr.util'; // Table imports for beads-centric operations @@ -67,11 +68,39 @@ import type { const TOWN_LOG = '[Town.do]'; +/** Format a bead_events row into a human-readable message for the status feed. */ +function formatEventMessage(row: Record): string { + const eventType = String(row.event_type ?? ''); + const beadTitle = row.bead_title ? String(row.bead_title) : null; + const newValue = row.new_value ? String(row.new_value) : null; + const agentId = row.agent_id ? String(row.agent_id).slice(0, 8) : null; + const beadId = row.bead_id ? String(row.bead_id).slice(0, 8) : null; + + const target = beadTitle ? `"${beadTitle}"` : beadId ? `bead ${beadId}…` : 'unknown'; + const actor = agentId ? `agent ${agentId}…` : 'system'; + + switch (eventType) { + case 'status_changed': + return `${target} → ${newValue ?? '?'} (by ${actor})`; + case 'assigned': + return `${target} assigned to ${actor}`; + case 'pr_created': + return `PR created for ${target}`; + case 'pr_merged': + return `PR merged for ${target}`; + case 'pr_creation_failed': + return `PR creation failed for ${target}`; + case 'escalation_created': + return `Escalation created: ${target}`; + default: + return `${eventType}: ${target}`; + } +} + // Alarm intervals const ACTIVE_ALARM_INTERVAL_MS = 5_000; // 5s when agents are active const IDLE_ALARM_INTERVAL_MS = 1 * 60_000; // 1m when idle const DISPATCH_COOLDOWN_MS = 2 * 60_000; // 2 min — skip agents with recent dispatch activity -const GUPP_THRESHOLD_MS = 30 * 60_000; // 30 min const MAX_DISPATCH_ATTEMPTS = 5; // Escalation constants @@ -189,6 +218,86 @@ export class TownDO extends DurableObject { }); } + // ── WebSocket: status broadcast ────────────────────────────────────── + + /** + * Handle HTTP requests to the DO. Only used for the /status/ws + * WebSocket upgrade — all other requests use RPC methods. 
+ */ + async fetch(request: Request): Promise { + const url = new URL(request.url); + if ( + url.pathname.endsWith('/status/ws') && + request.headers.get('Upgrade')?.toLowerCase() === 'websocket' + ) { + await this.ensureInitialized(); + const pair = new WebSocketPair(); + const [client, server] = [pair[0], pair[1]]; + this.ctx.acceptWebSocket(server, ['status']); + + // Send an initial snapshot immediately so the client doesn't + // wait for the next alarm tick. + try { + const snapshot = await this.getAlarmStatus(); + server.send(JSON.stringify(snapshot)); + } catch { + // Best-effort — snapshot will come on next tick + } + + return new Response(null, { status: 101, webSocket: client }); + } + + return new Response('Not found', { status: 404 }); + } + + /** Called by the runtime when a hibernated WebSocket receives a message. */ + async webSocketMessage(_ws: WebSocket, _message: string | ArrayBuffer): Promise { + // Status WebSocket is server-push only — ignore client messages. + } + + /** Called by the runtime when a hibernated WebSocket is closed. */ + async webSocketClose( + ws: WebSocket, + _code: number, + _reason: string, + _wasClean: boolean + ): Promise { + try { + ws.close(); + } catch { + // Already closed + } + } + + /** Called by the runtime when a hibernated WebSocket errors. */ + async webSocketError(ws: WebSocket, _error: unknown): Promise { + try { + ws.close(1011, 'WebSocket error'); + } catch { + // Already closed + } + } + + /** + * Broadcast the alarm status snapshot to all connected status WebSocket + * clients. Called at the end of each alarm tick. 
+ */ + private broadcastAlarmStatus(snapshot: Awaited>): void { + const sockets = this.ctx.getWebSockets('status'); + if (sockets.length === 0) return; + + const payload = JSON.stringify(snapshot); + for (const ws of sockets) { + try { + ws.send(payload); + } catch { + // Client disconnected — will be cleaned up by webSocketClose + } + } + } + + // ── Initialization ────────────────────────────────────────────────── + private async ensureInitialized(): Promise { if (!this.initPromise) { this.initPromise = this.initializeDatabase(); @@ -331,6 +440,63 @@ export class TownDO extends DurableObject { } catch { // Container may take a moment to start — the alarm will retry } + + // Proactively clone the rig's repo and create a browse worktree so + // the mayor has immediate access to the codebase without waiting for + // the first agent dispatch. + this.setupRigRepoInContainer(rigConfig).catch(err => + console.warn(`${TOWN_LOG} configureRig: background repo setup failed:`, err) + ); + } + + /** + * Tell the container to clone a rig's repo and create a browse worktree. + * Fire-and-forget — failures are logged but don't block the caller. + */ + private async setupRigRepoInContainer(rigConfig: RigConfig): Promise { + const townConfig = await this.getTownConfig(); + const envVars: Record = {}; + if (townConfig.git_auth?.github_token) { + envVars.GIT_TOKEN = townConfig.git_auth.github_token; + } + if (townConfig.git_auth?.gitlab_token) { + envVars.GITLAB_TOKEN = townConfig.git_auth.gitlab_token; + } + if (townConfig.git_auth?.gitlab_instance_url) { + envVars.GITLAB_INSTANCE_URL = townConfig.git_auth.gitlab_instance_url; + } + // resolveGitCredentials in the container needs KILOCODE_TOKEN to + // authenticate against the credential API for platform integrations. + const kilocodeToken = rigConfig.kilocodeToken ?? 
townConfig.kilocode_token; + if (kilocodeToken) { + envVars.KILOCODE_TOKEN = kilocodeToken; + } + + const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env); + const container = getTownContainerStub(this.env, this.townId); + const response = await container.fetch('http://container/repos/setup', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Town-Config': JSON.stringify(containerConfig), + }, + body: JSON.stringify({ + rigId: rigConfig.rigId, + gitUrl: rigConfig.gitUrl, + defaultBranch: rigConfig.defaultBranch, + envVars: Object.keys(envVars).length > 0 ? envVars : undefined, + platformIntegrationId: rigConfig.platformIntegrationId, + }), + }); + + if (!response.ok) { + const text = await response.text().catch(() => '(unreadable)'); + console.warn( + `${TOWN_LOG} setupRigRepoInContainer: failed for rig=${rigConfig.rigId}: ${response.status} ${text.slice(0, 200)}` + ); + } else { + console.log(`${TOWN_LOG} setupRigRepoInContainer: accepted for rig=${rigConfig.rigId}`); + } } async getRigConfig(rigId: string): Promise { @@ -565,6 +731,199 @@ export class TownDO extends DurableObject { } } + /** + * Resolve a triage_request bead. Called by the triage agent via the + * gt_triage_resolve tool. Applies the chosen action, closes the + * triage request, and logs the resolution. 
+ */ + async resolveTriage(input: { + agent_id: string; + triage_request_bead_id: string; + action: string; + resolution_notes: string; + }): Promise<Bead> { + await this.ensureInitialized(); + const triageBead = beadOps.getBead(this.sql, input.triage_request_bead_id); + if (!triageBead) + throw new Error(`Triage request bead ${input.triage_request_bead_id} not found`); + if (!triageBead.labels.includes(patrol.TRIAGE_REQUEST_LABEL)) { + throw new Error(`Bead ${input.triage_request_bead_id} is not a triage request`); + } + if (triageBead.status !== 'open') { + throw new Error( + `Triage request ${input.triage_request_bead_id} is already ${triageBead.status} — cannot resolve again` + ); + } + + // ── Apply the chosen action ──────────────────────────────────── + const targetAgentId = + typeof triageBead.metadata?.agent_bead_id === 'string' + ? triageBead.metadata.agent_bead_id + : null; + // Use the hooked bead ID captured when the triage request was created, + // not the agent's current hook (which may have changed since then). + const snapshotHookedBeadId = + typeof triageBead.metadata?.hooked_bead_id === 'string' + ? triageBead.metadata.hooked_bead_id + : null; + const action = input.action.toUpperCase(); + + if (targetAgentId) { + const targetAgent = agents.getAgent(this.sql, targetAgentId); + + switch (action) { + case 'RESTART': + case 'RESTART_WITH_BACKOFF': { + // Stop the agent in the container, reset to idle so the + // scheduler picks it up again on the next alarm cycle. + if (targetAgent?.status === 'working' || targetAgent?.status === 'stalled') { + dispatch.stopAgentInContainer(this.env, this.townId, targetAgentId).catch(() => {}); + } + if (targetAgent) { + // RESTART clears last_activity_at so the scheduler picks it + // up immediately. RESTART_WITH_BACKOFF sets it to now() so + // the dispatch cooldown (DISPATCH_COOLDOWN_MS) delays the + // next attempt, preventing immediate restart of crash loops. + const activityAt = action === 'RESTART_WITH_BACKOFF' ? 
now() : null; + query( + this.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle', + ${agent_metadata.columns.last_activity_at} = ? + WHERE ${agent_metadata.bead_id} = ? + `, + [activityAt, targetAgentId] + ); + } + break; + } + case 'CLOSE_BEAD': { + // Stop the agent in the container first so it can't push stale + // changes after the bead is marked failed. + if (targetAgent?.status === 'working' || targetAgent?.status === 'stalled') { + dispatch.stopAgentInContainer(this.env, this.townId, targetAgentId).catch(() => {}); + } + // Fail the bead that was hooked when the triage request was + // created (not the agent's current hook, which may differ). + const beadToClose = snapshotHookedBeadId ?? targetAgent?.current_hook_bead_id; + if (beadToClose) { + beadOps.updateBeadStatus(this.sql, beadToClose, 'failed', input.agent_id); + // Only unhook if the agent is still hooked to this bead + if (targetAgent?.current_hook_bead_id === beadToClose) { + agents.unhookBead(this.sql, targetAgentId); + } + } + break; + } + case 'ESCALATE_TO_MAYOR': + case 'ESCALATE': { + const message = input.resolution_notes || triageBead.title || 'Triage escalation'; + this.sendMayorMessage(`[Triage Escalation] ${message}`).catch(err => + console.warn(`${TOWN_LOG} resolveTriage: mayor notification failed:`, err) + ); + break; + } + case 'NUDGE': { + // Send a nudge message to the stuck agent + if (targetAgent) { + mail.sendMail(this.sql, { + from_agent_id: 'patrol', + to_agent_id: targetAgentId, + subject: 'TRIAGE_NUDGE', + body: + input.resolution_notes || + 'The triage system has flagged you as potentially stuck. Please report your status.', + }); + } + break; + } + case 'REASSIGN_BEAD': { + // Stop the agent in the container before reassigning so the old + // polecat can't push changes for work being handed off. 
+ if (targetAgent?.status === 'working' || targetAgent?.status === 'stalled') { + dispatch.stopAgentInContainer(this.env, this.townId, targetAgentId).catch(() => {}); + } + // Target the bead from the triage snapshot, not the agent's current hook. + const beadToReassign = snapshotHookedBeadId ?? targetAgent?.current_hook_bead_id; + if (beadToReassign) { + // Only unhook if the agent is still hooked to this bead + if (targetAgent?.current_hook_bead_id === beadToReassign) { + agents.unhookBead(this.sql, targetAgentId); + } + // Reset the bead to open so the scheduler can re-assign it + query( + this.sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.assignee_agent_bead_id} = NULL, + ${beads.columns.status} = 'open', + ${beads.columns.updated_at} = ? + WHERE ${beads.bead_id} = ? + AND ${beads.status} != 'closed' + AND ${beads.status} != 'failed' + `, + [now(), beadToReassign] + ); + } + break; + } + // DISCARD, PROVIDE_GUIDANCE, and other informational actions + // are handled by the triage agent itself via gt_mail_send + // before calling gt_triage_resolve — no server-side effect needed. + default: + break; + } + } + + // ── Close the triage request bead with resolution metadata ────── + const timestamp = now(); + query( + this.sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.status} = 'closed', + ${beads.columns.closed_at} = ?, + ${beads.columns.updated_at} = ?, + ${beads.columns.metadata} = json_set( + COALESCE(${beads.metadata}, '{}'), + '$.resolution_action', ?, + '$.resolution_notes', ?, + '$.resolved_by', ? + ) + WHERE ${beads.bead_id} = ? 
+ `, + [ + timestamp, + timestamp, + input.action, + input.resolution_notes, + input.agent_id, + input.triage_request_bead_id, + ] + ); + + beadOps.logBeadEvent(this.sql, { + beadId: input.triage_request_bead_id, + agentId: input.agent_id, + eventType: 'status_changed', + oldValue: triageBead.status, + newValue: 'closed', + metadata: { + action: input.action, + resolution_notes: input.resolution_notes, + }, + }); + + console.log( + `${TOWN_LOG} resolveTriage: bead=${input.triage_request_bead_id} action=${input.action}` + ); + + const updated = beadOps.getBead(this.sql, input.triage_request_bead_id); + if (!updated) throw new Error('Triage bead not found after update'); + return updated; + } + async createMolecule(beadId: string, formula: unknown): Promise { await this.ensureInitialized(); return reviewQueue.createMolecule(this.sql, beadId, formula); @@ -620,6 +979,15 @@ export class TownDO extends DurableObject { return { bead, agent: hookedAgent }; } + /** Build the rig list for mayor agent startup (browse worktree setup on fresh containers). */ + private rigListForMayor(): Array<{ rigId: string; gitUrl: string; defaultBranch: string }> { + return rigs.listRigs(this.sql).map(r => ({ + rigId: r.id, + gitUrl: r.git_url, + defaultBranch: r.default_branch, + })); + } + // ══════════════════════════════════════════════════════════════════ // Mayor (just another agent) // ══════════════════════════════════════════════════════════════════ @@ -687,6 +1055,7 @@ export class TownDO extends DurableObject { defaultBranch: rigConfig?.defaultBranch ?? 'main', kilocodeToken, townConfig, + rigs: this.rigListForMayor(), }); if (started) { @@ -769,6 +1138,7 @@ export class TownDO extends DurableObject { defaultBranch: rigConfig?.defaultBranch ?? 
'main', kilocodeToken, townConfig, + rigs: this.rigListForMayor(), }); if (started) { @@ -1552,6 +1922,11 @@ export class TownDO extends DurableObject { } catch (err) { console.error(`${TOWN_LOG} alarm: witnessPatrol failed`, err); } + try { + this.deaconPatrol(); + } catch (err) { + console.error(`${TOWN_LOG} alarm: deaconPatrol failed`, err); + } try { await this.deliverPendingMail(); } catch (err) { @@ -1562,11 +1937,24 @@ export class TownDO extends DurableObject { } catch (err) { console.warn(`${TOWN_LOG} alarm: reEscalation failed`, err); } + try { + await this.maybeDispatchTriageAgent(); + } catch (err) { + console.warn(`${TOWN_LOG} alarm: maybeDispatchTriageAgent failed`, err); + } // Re-arm: fast when active, slow when idle const active = this.hasActiveWork(); const interval = active ? ACTIVE_ALARM_INTERVAL_MS : IDLE_ALARM_INTERVAL_MS; await this.ctx.storage.setAlarm(Date.now() + interval); + + // Broadcast status snapshot to connected WebSocket clients + try { + const snapshot = await this.getAlarmStatus(); + this.broadcastAlarmStatus(snapshot); + } catch (err) { + console.warn(`${TOWN_LOG} alarm: status broadcast failed`, err); + } } private hasActiveWork(): boolean { @@ -1591,10 +1979,18 @@ export class TownDO extends DurableObject { [] ), ]; + const pendingTriageRows = [ + ...query( + this.sql, + /* sql */ `SELECT COUNT(*) as cnt FROM ${beads} WHERE ${beads.type} = 'issue' AND ${beads.labels} LIKE ? AND ${beads.status} = 'open'`, + [patrol.TRIAGE_LABEL_LIKE] + ), + ]; return ( Number(activeAgentRows[0]?.cnt ?? 0) > 0 || Number(pendingBeadRows[0]?.cnt ?? 0) > 0 || - Number(pendingReviewRows[0]?.cnt ?? 0) > 0 + Number(pendingReviewRows[0]?.cnt ?? 0) > 0 || + Number(pendingTriageRows[0]?.cnt ?? 
0) > 0 ); } @@ -1637,18 +2033,24 @@ export class TownDO extends DurableObject { beadOps.updateBeadStatus(this.sql, bead.bead_id, 'in_progress', agent.id); } - // Mark dispatch in progress: set last_activity_at so schedulePendingWork - // skips this agent while the container start is in flight, and bump - // dispatch_attempts for the retry budget. + // Set status to 'working' BEFORE the async container start. This + // must happen synchronously so the SQL write executes while the I/O + // gate is still open. When dispatchAgent is called fire-and-forget + // (from slingBead, slingConvoy, dispatchUnblockedBeads), any SQL + // writes after the first `await` may be silently dropped because + // the DO's RPC response closes the I/O gate. If the container fails + // to start, we roll back to 'idle'. + const timestamp = now(); query( this.sql, /* sql */ ` UPDATE ${agent_metadata} - SET ${agent_metadata.columns.dispatch_attempts} = ${agent_metadata.columns.dispatch_attempts} + 1, + SET ${agent_metadata.columns.status} = 'working', + ${agent_metadata.columns.dispatch_attempts} = ${agent_metadata.columns.dispatch_attempts} + 1, ${agent_metadata.columns.last_activity_at} = ? WHERE ${agent_metadata.bead_id} = ? `, - [now(), agent.id] + [timestamp, agent.id] ); const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { @@ -1672,23 +2074,52 @@ export class TownDO extends DurableObject { }); if (started) { - const timestamp = now(); + // Reset dispatch_attempts on success (best-effort — may be + // dropped if the I/O gate is already closed, but that's fine + // because the agent is already 'working'). query( this.sql, /* sql */ ` UPDATE ${agent_metadata} - SET ${agent_metadata.columns.status} = 'working', - ${agent_metadata.columns.dispatch_attempts} = 0, - ${agent_metadata.columns.last_activity_at} = ? + SET ${agent_metadata.columns.dispatch_attempts} = 0 WHERE ${agent_metadata.bead_id} = ? 
`, - [timestamp, agent.id] + [agent.id] ); console.log(`${TOWN_LOG} dispatchAgent: started agent=${agent.name}(${agent.id})`); + } else { + // Container failed to start — roll back to idle + query( + this.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle' + WHERE ${agent_metadata.bead_id} = ? + `, + [agent.id] + ); } return started; } catch (err) { console.error(`${TOWN_LOG} dispatchAgent: failed for agent=${agent.id}:`, err); + // Roll back agent and bead to prevent them from being stuck in + // working/in_progress state when the container call throws. + try { + query( + this.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle' + WHERE ${agent_metadata.bead_id} = ? + `, + [agent.id] + ); + if (agent.current_hook_bead_id) { + beadOps.updateBeadStatus(this.sql, agent.current_hook_bead_id, 'open', agent.id); + } + } catch (rollbackErr) { + console.error(`${TOWN_LOG} dispatchAgent: rollback also failed:`, rollbackErr); + } return false; } } @@ -1802,12 +2233,17 @@ export class TownDO extends DurableObject { } /** - * Witness patrol: detect dead/stale agents, orphaned beads. + * Witness patrol: detect dead/stale agents, GUPP violations with + * tiered escalation, orphaned work, agent GC, per-bead timeouts. + * + * Mechanical checks run as deterministic code. Ambiguous situations + * produce triage_request beads for the on-demand triage agent. + * See #442. 
*/ private async witnessPatrol(): Promise { const townId = this.townId; - const guppThreshold = new Date(Date.now() - GUPP_THRESHOLD_MS).toISOString(); + // ── Zombie detection (container status reconciliation) ────────── const WorkingAgentRow = AgentMetadataRecord.pick({ bead_id: true, current_hook_bead_id: true, @@ -1827,7 +2263,6 @@ export class TownDO extends DurableObject { for (const working of workingAgents) { const agentId = working.bead_id; - const lastActivity = working.last_activity_at; const containerInfo = await dispatch.checkAgentContainerStatus(this.env, townId, agentId); @@ -1838,41 +2273,193 @@ export class TownDO extends DurableObject { } query( this.sql, - /* sql */ `UPDATE ${agent_metadata} SET ${agent_metadata.columns.status} = 'idle', ${agent_metadata.columns.last_activity_at} = ? WHERE ${agent_metadata.bead_id} = ?`, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle', + ${agent_metadata.columns.last_activity_at} = ? + WHERE ${agent_metadata.bead_id} = ? + `, [now(), agentId] ); continue; } + } - // GUPP violation check - if (lastActivity && lastActivity < guppThreshold) { - // Check for existing GUPP mail - const existingGupp = [ - ...query( - this.sql, - /* sql */ ` - SELECT ${beads.bead_id} FROM ${beads} - WHERE ${beads.type} = 'message' - AND ${beads.assignee_agent_bead_id} = ? - AND ${beads.title} = 'GUPP_CHECK' - AND ${beads.status} = 'open' - LIMIT 1 - `, - [agentId] - ), - ]; - if (existingGupp.length === 0) { - mail.sendMail(this.sql, { - from_agent_id: 'witness', - to_agent_id: agentId, - subject: 'GUPP_CHECK', - body: 'You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.', - }); - } + // ── Tiered GUPP violation handling ────────────────────────────── + // Re-query to get the current set of working agents (some may have + // been reset to idle by zombie detection above). 
+ const currentWorking = WorkingAgentRow.array().parse([ + ...query( + this.sql, + /* sql */ ` + SELECT ${agent_metadata.bead_id}, ${agent_metadata.current_hook_bead_id}, ${agent_metadata.last_activity_at} + FROM ${agent_metadata} + WHERE ${agent_metadata.status} IN ('working', 'stalled') + `, + [] + ), + ]); + + const forceStopIds = patrol.detectGUPPViolations(this.sql, currentWorking); + + // Force-stop agents in the container (best-effort) + for (const agentId of forceStopIds) { + dispatch + .stopAgentInContainer(this.env, townId, agentId) + .catch(err => + console.warn(`${TOWN_LOG} witnessPatrol: force-stop failed for agent=${agentId}`, err) + ); + } + + // ── Orphaned work detection ──────────────────────────────────── + patrol.detectOrphanedWork(this.sql); + + // ── Agent garbage collection ─────────────────────────────────── + const gcCount = patrol.agentGC(this.sql); + if (gcCount > 0) { + // Clean up AgentDO storage for GC'd agents (best-effort) + console.log(`${TOWN_LOG} witnessPatrol: GC'd ${gcCount} agent(s)`); + } + + // ── Per-bead timeout enforcement ─────────────────────────────── + const timedOut = patrol.checkTimerGates(this.sql); + for (const { agentId } of timedOut) { + if (agentId) { + dispatch + .stopAgentInContainer(this.env, townId, agentId) + .catch(err => + console.warn( + `${TOWN_LOG} witnessPatrol: failed to stop timed-out agent=${agentId}`, + err + ) + ); } } } + /** + * Deacon patrol: stale hook detection, stranded convoy feeding, + * crash loop detection. + * + * Mechanical checks that complement the witness patrol. See #442. 
+ */ + private deaconPatrol(): void { + // ── Stale hook detection ─────────────────────────────────────── + patrol.detectStaleHooks(this.sql); + + // ── Stranded convoy feeding ──────────────────────────────────── + patrol.feedStrandedConvoys(this.sql, this.townId); + + // ── Crash loop detection ─────────────────────────────────────── + patrol.detectCrashLoops(this.sql); + } + + /** + * If triage_request beads are queued, dispatch a short-lived triage + * agent to process them. The triage agent gets a focused prompt with + * the pending requests and a narrow tool set. + * + * Skips dispatch if a triage agent is already working. + */ + private async maybeDispatchTriageAgent(): Promise<void> { + const pendingCount = patrol.countPendingTriageRequests(this.sql); + if (pendingCount === 0) return; + + // Check if a triage batch bead is already in progress (meaning a + // triage agent is working). We can't filter by role since triage + // uses polecat role; instead check for an open gt:triage batch bead. + const existingBatch = [ + ...query( + this.sql, + /* sql */ ` + SELECT ${beads.bead_id} FROM ${beads} + WHERE ${beads.type} = 'issue' + AND ${beads.labels} LIKE ? + AND ${beads.status} IN ('open', 'in_progress') + AND ${beads.created_by} = 'patrol' + LIMIT 1 + `, + [patrol.TRIAGE_LABEL_LIKE.replace(patrol.TRIAGE_REQUEST_LABEL, patrol.TRIAGE_BATCH_LABEL)] + ), + ]; + if (existingBatch.length > 0) { + console.log( + `${TOWN_LOG} maybeDispatchTriageAgent: triage agent already working, skipping (${pendingCount} pending)` + ); + return; + } + + // Validate preconditions before creating any beads to avoid + // leaked phantom issue beads on early-return paths. 
+ const rigList = rigs.listRigs(this.sql); + if (rigList.length === 0) { + console.warn(`${TOWN_LOG} maybeDispatchTriageAgent: no rigs available, skipping`); + return; + } + const rigId = rigList[0].id; + + const rigConfig = await this.getRigConfig(rigId); + if (!rigConfig) { + console.warn(`${TOWN_LOG} maybeDispatchTriageAgent: no rig config for rig=${rigId}`); + return; + } + + console.log( + `${TOWN_LOG} maybeDispatchTriageAgent: ${pendingCount} pending triage request(s), dispatching agent` + ); + + const townConfig = await this.getTownConfig(); + const kilocodeToken = await this.resolveKilocodeToken(); + + // Build the triage prompt from pending requests + const pendingRequests = patrol.listPendingTriageRequests(this.sql); + const { buildTriageSystemPrompt } = await import('../prompts/triage-system.prompt'); + const systemPrompt = buildTriageSystemPrompt(pendingRequests); + + // Only now create the synthetic bead — preconditions are verified. + const triageBead = beadOps.createBead(this.sql, { + type: 'issue', + title: `Triage batch: ${pendingCount} request(s)`, + body: 'Process all pending triage request beads and resolve each one.', + priority: 'high', + labels: [patrol.TRIAGE_BATCH_LABEL], + created_by: 'patrol', + }); + + const triageAgent = agents.getOrCreateAgent(this.sql, 'polecat', rigId, this.townId); + agents.hookBead(this.sql, triageAgent.id, triageBead.bead_id); + + const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { + townId: this.townId, + rigId, + userId: rigConfig.userId, + agentId: triageAgent.id, + agentName: triageAgent.name, + role: 'polecat', + identity: triageAgent.identity, + beadId: triageBead.bead_id, + beadTitle: triageBead.title, + beadBody: triageBead.body ?? 
'', + checkpoint: null, + gitUrl: rigConfig.gitUrl, + defaultBranch: rigConfig.defaultBranch, + kilocodeToken, + townConfig, + systemPromptOverride: systemPrompt, + platformIntegrationId: rigConfig.platformIntegrationId, + }); + + if (started) { + // Mark the agent as working so the duplicate-guard on the next + // alarm tick sees it and skips dispatch. + agents.updateAgentStatus(this.sql, triageAgent.id, 'working'); + } else { + agents.unhookBead(this.sql, triageAgent.id); + beadOps.updateBeadStatus(this.sql, triageBead.bead_id, 'failed', triageAgent.id); + console.error(`${TOWN_LOG} maybeDispatchTriageAgent: triage agent failed to start`); + } + } + /** * Push undelivered mail to agents that are currently running in the * container. For each working agent with open message beads, we format @@ -2009,6 +2596,10 @@ export class TownDO extends DurableObject { // polecat assignee preserved. agents.hookBead(this.sql, refineryAgent.id, entry.id); + // Mark as working before the async container start (same I/O gate + // rationale as dispatchAgent — see comment there). + agents.updateAgentStatus(this.sql, refineryAgent.id, 'working'); + const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { townId: this.townId, rigId, @@ -2445,6 +3036,234 @@ export class TownDO extends DurableObject { // Cleanup // ══════════════════════════════════════════════════════════════════ + /** + * Health check: verify the alarm is set and return basic town status. + * Called by the GastownUserDO watchdog alarm to ensure each town's + * alarm loop is firing. Re-arms the alarm if it's missing. 
+ */ + async healthCheck(): Promise<{ + townId: string; + alarmSet: boolean; + activeAgents: number; + pendingBeads: number; + }> { + await this.ensureInitialized(); + const townId = this.townId; + + // Check if alarm is set + const currentAlarm = await this.ctx.storage.getAlarm(); + const alarmSet = currentAlarm !== null && currentAlarm > Date.now(); + + // Re-arm if missing — this is the whole point of the watchdog + if (!alarmSet) { + console.warn(`${TOWN_LOG} healthCheck: alarm not set for town=${townId}, re-arming`); + await this.ctx.storage.setAlarm(Date.now() + ACTIVE_ALARM_INTERVAL_MS); + } + + const activeAgents = Number( + [ + ...query( + this.sql, + /* sql */ `SELECT COUNT(*) AS cnt FROM ${agent_metadata} WHERE ${agent_metadata.status} IN ('working', 'stalled')`, + [] + ), + ][0]?.cnt ?? 0 + ); + + const pendingBeads = Number( + [ + ...query( + this.sql, + /* sql */ `SELECT COUNT(*) AS cnt FROM ${beads} WHERE ${beads.status} IN ('open', 'in_progress') AND ${beads.type} NOT IN ('agent', 'message')`, + [] + ), + ][0]?.cnt ?? 0 + ); + + return { townId, alarmSet, activeAgents, pendingBeads }; + } + + /** + * Return a structured snapshot of the alarm loop and patrol state + * for the dashboard Status tab. + */ + async getAlarmStatus(): Promise<{ + alarm: { + nextFireAt: string | null; + intervalMs: number; + intervalLabel: string; + }; + agents: { + working: number; + idle: number; + stalled: number; + dead: number; + total: number; + }; + beads: { + open: number; + inProgress: number; + failed: number; + triageRequests: number; + }; + patrol: { + guppWarnings: number; + guppEscalations: number; + stalledAgents: number; + orphanedHooks: number; + }; + recentEvents: Array<{ + time: string; + type: string; + message: string; + }>; + }> { + await this.ensureInitialized(); + + const currentAlarm = await this.ctx.storage.getAlarm(); + const active = this.hasActiveWork(); + const intervalMs = active ? 
ACTIVE_ALARM_INTERVAL_MS : IDLE_ALARM_INTERVAL_MS; + + // Agent counts by status + const agentRows = [ + ...query( + this.sql, + /* sql */ ` + SELECT ${agent_metadata.status} AS status, COUNT(*) AS cnt + FROM ${agent_metadata} + GROUP BY ${agent_metadata.status} + `, + [] + ), + ]; + const agentCounts = { working: 0, idle: 0, stalled: 0, dead: 0, total: 0 }; + for (const row of agentRows) { + const s = String(row.status); + const c = Number(row.cnt); + if (s in agentCounts) (agentCounts as Record<string, number>)[s] = c; + agentCounts.total += c; + } + + // Bead counts + const beadRows = [ + ...query( + this.sql, + /* sql */ ` + SELECT ${beads.status} AS status, COUNT(*) AS cnt + FROM ${beads} + WHERE ${beads.type} NOT IN ('agent', 'message') + GROUP BY ${beads.status} + `, + [] + ), + ]; + const beadCounts = { open: 0, inProgress: 0, failed: 0, triageRequests: 0 }; + for (const row of beadRows) { + const s = String(row.status); + const c = Number(row.cnt); + if (s === 'open') beadCounts.open = c; + else if (s === 'in_progress') beadCounts.inProgress = c; + else if (s === 'failed') beadCounts.failed = c; + } + + // Triage request count (issue beads with gt:triage-request label) + beadCounts.triageRequests = patrol.countPendingTriageRequests(this.sql); + + // Patrol indicators — count active warnings/issues + const guppWarnings = Number( + [ + ...query( + this.sql, + /* sql */ ` + SELECT COUNT(*) AS cnt FROM ${beads} + WHERE ${beads.type} = 'message' + AND ${beads.title} = 'GUPP_CHECK' + AND ${beads.status} = 'open' + `, + [] + ), + ][0]?.cnt ?? 0 + ); + + const guppEscalations = Number( + [ + ...query( + this.sql, + /* sql */ ` + SELECT COUNT(*) AS cnt FROM ${beads} + WHERE ${beads.type} = 'message' + AND ${beads.title} = 'GUPP_ESCALATION' + AND ${beads.status} = 'open' + `, + [] + ), + ][0]?.cnt ?? 0 + ); + + const stalledAgents = agentCounts.stalled; + + // Only count idle+hooked agents as orphaned if they've been idle for + // longer than the dispatch cooldown. 
Agents that were just hooked by + // feedStrandedConvoys or restarted with backoff are legitimately + // waiting for the next scheduler tick. + const orphanedHooks = Number( + [ + ...query( + this.sql, + /* sql */ ` + SELECT COUNT(*) AS cnt FROM ${agent_metadata} + WHERE ${agent_metadata.status} = 'idle' + AND ${agent_metadata.current_hook_bead_id} IS NOT NULL + AND ( + ${agent_metadata.last_activity_at} IS NULL + OR ${agent_metadata.last_activity_at} < datetime('now', '-5 minutes') + ) + `, + [] + ), + ][0]?.cnt ?? 0 + ); + + // Recent bead events (last 20) for the activity feed + const recentRows = [ + ...query( + this.sql, + /* sql */ ` + SELECT be.created_at, be.event_type, be.new_value, be.agent_id, be.bead_id, + b.${beads.columns.title} AS bead_title + FROM bead_events AS be + LEFT JOIN ${beads} AS b ON be.bead_id = b.${beads.columns.bead_id} + ORDER BY be.created_at DESC + LIMIT 20 + `, + [] + ), + ]; + + const recentEvents = recentRows.map(row => ({ + time: String(row.created_at), + type: String(row.event_type), + message: formatEventMessage(row), + })); + + return { + alarm: { + nextFireAt: currentAlarm ? new Date(Number(currentAlarm)).toISOString() : null, + intervalMs, + intervalLabel: active ? 'active (5s)' : 'idle (60s)', + }, + agents: agentCounts, + beads: beadCounts, + patrol: { + guppWarnings, + guppEscalations, + stalledAgents, + orphanedHooks, + }, + recentEvents, + }; + } + async destroy(): Promise { console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`); diff --git a/cloudflare-gastown/src/dos/town/agents.ts b/cloudflare-gastown/src/dos/town/agents.ts index 8f43289fcf..3260a92218 100644 --- a/cloudflare-gastown/src/dos/town/agents.ts +++ b/cloudflare-gastown/src/dos/town/agents.ts @@ -335,7 +335,7 @@ export function allocatePolecatName(sql: SqlStorage): string { /** * Find an idle agent of the given role, or create one. - * For singleton roles (witness, refinery, mayor), reuse existing. 
+ * For singleton roles (mayor), reuse existing. * For polecats, create a new one. */ export function getOrCreateAgent( @@ -345,7 +345,7 @@ export function getOrCreateAgent( townId: string ): Agent { // Town-wide singletons: one per town, not tied to a rig. - const townSingletonRoles = ['witness', 'mayor']; + const townSingletonRoles = ['mayor']; if (townSingletonRoles.includes(role)) { const existing = listAgents(sql, { role }); diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts index 8a71016b52..085872cc1b 100644 --- a/cloudflare-gastown/src/dos/town/container-dispatch.ts +++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts @@ -99,8 +99,6 @@ export function systemPromptForRole(params: { switch (params.role) { case 'refinery': return `${base} You review code quality and merge PRs. Check for correctness, style, and test coverage.`; - case 'witness': - return `${base} You monitor agent health and report anomalies.`; default: return base; } @@ -172,6 +170,8 @@ export async function startAgentInContainer( platformIntegrationId?: string; /** For convoy beads: the convoy's feature branch to branch from instead of defaultBranch. */ convoyFeatureBranch?: string; + /** All rigs in the town (mayor only) — used to set up browse worktrees on fresh containers. */ + rigs?: Array<{ rigId: string; gitUrl: string; defaultBranch: string }>; } ): Promise { console.log( @@ -259,6 +259,10 @@ export async function startAgentInContainer( defaultBranch: params.defaultBranch, envVars, platformIntegrationId: params.platformIntegrationId, + // For convoy agents, start from the convoy's feature branch so the + // worktree includes all previously merged convoy work. + startPoint: params.convoyFeatureBranch ? 
`origin/${params.convoyFeatureBranch}` : undefined, + rigs: params.rigs, }), }); diff --git a/cloudflare-gastown/src/dos/town/patrol.ts b/cloudflare-gastown/src/dos/town/patrol.ts new file mode 100644 index 0000000000..79dbedf8fc --- /dev/null +++ b/cloudflare-gastown/src/dos/town/patrol.ts @@ -0,0 +1,633 @@ +/** + * Witness & Deacon patrol functions for the TownDO alarm handler. + * + * All mechanical checks run as deterministic code. Ambiguous situations + * produce triage request beads (type='issue', label='gt:triage-request') + * with structured context for an on-demand LLM triage agent to resolve. + * + * See #442 for the full design. + */ + +import { z } from 'zod'; +import { beads, BeadRecord as BeadRecordSchema } from '../../db/tables/beads.table'; +import { agent_metadata, AgentMetadataRecord } from '../../db/tables/agent-metadata.table'; +import { bead_dependencies } from '../../db/tables/bead-dependencies.table'; +import { query } from '../../util/query.util'; +import { sendMail } from './mail'; +import { deleteAgent, getOrCreateAgent, hookBead, unhookBead } from './agents'; +import { createBead, logBeadEvent, updateBeadStatus } from './beads'; + +const LOG = '[patrol]'; + +// ── Thresholds ────────────────────────────────────────────────────── + +/** First GUPP warning (existing behavior) */ +export const GUPP_WARN_MS = 30 * 60_000; // 30 min +/** Escalate to mayor after second threshold */ +export const GUPP_ESCALATE_MS = 60 * 60_000; // 1h +/** Force-stop agent after third threshold */ +export const GUPP_FORCE_STOP_MS = 2 * 60 * 60_000; // 2h +/** Agents dead/completed for longer than this are GC'd */ +export const AGENT_GC_RETENTION_MS = 24 * 60 * 60_000; // 24h +/** Per-bead timeout (if metadata.timeout_ms is set) */ +export const DEFAULT_BEAD_TIMEOUT_MS = 4 * 60 * 60_000; // 4h fallback +/** Hook considered stale after this duration with no dispatch activity */ +export const STALE_HOOK_MS = 30 * 60_000; // 30 min +/** Agent failing repeatedly 
within this window is a crash loop */ +export const CRASH_LOOP_WINDOW_MS = 30 * 60_000; // 30 min +/** Minimum failures within the window to flag a crash loop */ +export const CRASH_LOOP_THRESHOLD = 3; + +// ── Triage request types ──────────────────────────────────────────── + +export type TriageType = + | 'dirty_polecat' + | 'stuck_agent' + | 'help_request' + | 'zombie_confirm' + | 'crash_loop'; + +export type TriageRequestMetadata = { + triage_type: TriageType; + agent_bead_id: string | null; + /** The bead the agent was hooked to when the triage request was created. + * Resolve actions should target this bead, not the agent's current hook + * (which may have changed by the time the triage agent resolves it). */ + hooked_bead_id: string | null; + context: Record<string, unknown>; + options: string[]; +}; + +// ── Triage request creation ───────────────────────────────────────── + +/** Label used to identify triage request beads (type='issue'). */ +export const TRIAGE_REQUEST_LABEL = 'gt:triage-request'; + +/** Label used to identify the triage agent's batch bead. */ +export const TRIAGE_BATCH_LABEL = 'gt:triage'; + +/** SQL LIKE pattern for querying triage request beads by label. */ +export const TRIAGE_LABEL_LIKE = `%"${TRIAGE_REQUEST_LABEL}"%`; + +/** Create a triage request bead for the LLM triage agent to resolve. */ +export function createTriageRequest( + sql: SqlStorage, + params: { + triageType: TriageType; + agentBeadId: string | null; + /** The bead the agent was hooked to at the time of the request. */ + hookedBeadId?: string | null; + title: string; + context: Record<string, unknown>; + options: string[]; + rigId?: string; + } +): void { + // Deduplicate: skip if an open triage request of the same type already + // exists for this agent + if (params.agentBeadId) { + const existing = [ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id} FROM ${beads} + WHERE ${beads.type} = 'issue' + AND ${beads.labels} LIKE ? 
+ AND ${beads.status} = 'open' + AND ${beads.assignee_agent_bead_id} = ? + AND json_extract(${beads.metadata}, '$.triage_type') = ? + LIMIT 1 + `, + [TRIAGE_LABEL_LIKE, params.agentBeadId, params.triageType] + ), + ]; + if (existing.length > 0) return; + } + + const metadata: TriageRequestMetadata = { + triage_type: params.triageType, + agent_bead_id: params.agentBeadId, + hooked_bead_id: params.hookedBeadId ?? null, + context: params.context, + options: params.options, + }; + + createBead(sql, { + type: 'issue', + title: params.title, + body: JSON.stringify(params.context), + priority: 'medium', + metadata, + labels: [TRIAGE_REQUEST_LABEL], + assignee_agent_bead_id: params.agentBeadId ?? undefined, + rig_id: params.rigId, + }); + + console.log( + `${LOG} createTriageRequest: type=${params.triageType} agent=${params.agentBeadId ?? 'none'}` + ); +} + +// ── Witness patrol sub-checks ─────────────────────────────────────── + +/** + * Tiered GUPP violation handling: + * - 30 min: send GUPP_CHECK mail (existing behavior) + * - 1h: escalate to mayor + * - 2h: force-stop agent, create triage request for dirty polecat + * + * Returns agent IDs that were force-stopped (caller should stop them + * in the container). 
+ */ +export function detectGUPPViolations( + sql: SqlStorage, + workingAgents: Array<{ + bead_id: string; + current_hook_bead_id: string | null; + last_activity_at: string | null; + }> +): string[] { + const nowMs = Date.now(); + const forceStopIds: string[] = []; + + for (const agent of workingAgents) { + if (!agent.last_activity_at) continue; + const staleMs = nowMs - new Date(agent.last_activity_at).getTime(); + + if (staleMs >= GUPP_FORCE_STOP_MS) { + // Tier 3: force-stop and flag for triage + forceStopIds.push(agent.bead_id); + + createTriageRequest(sql, { + triageType: 'stuck_agent', + agentBeadId: agent.bead_id, + hookedBeadId: agent.current_hook_bead_id, + title: `Force-stopped agent after ${Math.round(staleMs / 60_000)}min GUPP violation`, + context: { + last_activity_at: agent.last_activity_at, + stale_minutes: Math.round(staleMs / 60_000), + action_taken: 'force_stop', + }, + options: ['RESTART', 'ESCALATE_TO_MAYOR', 'CLOSE_BEAD'], + }); + + // Mark agent as stalled + query( + sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'stalled' + WHERE ${agent_metadata.bead_id} = ? + `, + [agent.bead_id] + ); + + console.log( + `${LOG} GUPP force-stop: agent=${agent.bead_id} stale=${Math.round(staleMs / 60_000)}min` + ); + } else if (staleMs >= GUPP_ESCALATE_MS) { + // Tier 2: create a triage request for the stuck agent. The triage + // agent (or mayor, if escalated) will decide whether to restart, + // nudge, or force-stop. Also warn the stuck agent directly. + const existingEsc = [ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id} FROM ${beads} + WHERE ${beads.type} = 'message' + AND ${beads.assignee_agent_bead_id} = ? 
+ AND ${beads.title} = 'GUPP_ESCALATION' + AND ${beads.status} = 'open' + LIMIT 1 + `, + [agent.bead_id] + ), + ]; + if (existingEsc.length === 0) { + // Notify the stuck agent + sendMail(sql, { + from_agent_id: 'patrol', + to_agent_id: agent.bead_id, + subject: 'GUPP_ESCALATION', + body: `You have been inactive for ${Math.round(staleMs / 60_000)} minutes. This has been escalated. You will be force-stopped if inactivity continues.`, + }); + + // Create a triage request so the triage agent (or mayor) is aware + createTriageRequest(sql, { + triageType: 'stuck_agent', + agentBeadId: agent.bead_id, + hookedBeadId: agent.current_hook_bead_id, + title: `Agent inactive for ${Math.round(staleMs / 60_000)}min — GUPP escalation`, + context: { + last_activity_at: agent.last_activity_at, + stale_minutes: Math.round(staleMs / 60_000), + tier: 'escalation', + }, + options: ['RESTART', 'NUDGE', 'ESCALATE_TO_MAYOR', 'CLOSE_BEAD'], + }); + + console.log(`${LOG} GUPP escalation: agent=${agent.bead_id}`); + } + } else if (staleMs >= GUPP_WARN_MS) { + // Tier 1: send warning mail (existing behavior, idempotent) + const existingGupp = [ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id} FROM ${beads} + WHERE ${beads.type} = 'message' + AND ${beads.assignee_agent_bead_id} = ? + AND ${beads.title} = 'GUPP_CHECK' + AND ${beads.status} = 'open' + LIMIT 1 + `, + [agent.bead_id] + ), + ]; + if (existingGupp.length === 0) { + sendMail(sql, { + from_agent_id: 'patrol', + to_agent_id: agent.bead_id, + subject: 'GUPP_CHECK', + body: 'You have had work hooked for 30+ minutes with no activity. Are you stuck? If so, call gt_escalate.', + }); + } + } + } + + return forceStopIds; +} + +/** + * Detect orphaned work: idle agents with a hooked bead but no recent + * dispatch activity. These agents were assigned work but never started. 
+ * + * Different from schedulePendingWork which handles the cooldown/retry + * loop — this catches agents that have been idle+hooked for an + * unreasonably long time (beyond what the scheduler would tolerate). + */ +export function detectOrphanedWork(sql: SqlStorage): void { + const cutoff = new Date(Date.now() - STALE_HOOK_MS).toISOString(); + + const rows = AgentMetadataRecord.pick({ + bead_id: true, + current_hook_bead_id: true, + dispatch_attempts: true, + last_activity_at: true, + }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${agent_metadata.bead_id}, + ${agent_metadata.current_hook_bead_id}, + ${agent_metadata.dispatch_attempts}, + ${agent_metadata.last_activity_at} + FROM ${agent_metadata} + WHERE ${agent_metadata.status} = 'idle' + AND ${agent_metadata.current_hook_bead_id} IS NOT NULL + AND ${agent_metadata.dispatch_attempts} >= 5 + AND (${agent_metadata.last_activity_at} IS NULL OR ${agent_metadata.last_activity_at} < ?) + `, + [cutoff] + ), + ]); + + for (const row of rows) { + // These agents have exhausted dispatch attempts AND are still hooked. + // schedulePendingWork should have failed the bead — this is a safety net. + console.log( + `${LOG} orphaned work detected: agent=${row.bead_id} hook=${row.current_hook_bead_id} attempts=${row.dispatch_attempts}` + ); + + // Actually fail the bead and unhook the agent (matching schedulePendingWork behavior) + if (row.current_hook_bead_id) { + updateBeadStatus(sql, row.current_hook_bead_id, 'failed', row.bead_id); + unhookBead(sql, row.bead_id); + } + } +} + +/** + * Garbage-collect dead/completed agents past the retention period. + * Agents in 'idle' status with no hooked bead whose creation time + * exceeds the retention threshold and that have been idle for longer + * than the retention period are deleted. + * + * Only targets polecats and refinery agents — the mayor singleton + * is never GC'd. 
+ */ +export function agentGC(sql: SqlStorage): number { + const cutoff = new Date(Date.now() - AGENT_GC_RETENTION_MS).toISOString(); + + // Find agents eligible for GC: idle polecats/refinery with no hook, + // whose last activity is older than the retention period + const rows = AgentMetadataRecord.pick({ bead_id: true }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${agent_metadata.bead_id} + FROM ${agent_metadata} + WHERE ${agent_metadata.status} IN ('idle', 'dead') + AND ${agent_metadata.current_hook_bead_id} IS NULL + AND ${agent_metadata.role} IN ('polecat', 'refinery') + AND ( + ${agent_metadata.last_activity_at} IS NOT NULL + AND ${agent_metadata.last_activity_at} < ? + ) + `, + [cutoff] + ), + ]); + + for (const row of rows) { + console.log(`${LOG} agentGC: deleting agent=${row.bead_id}`); + deleteAgent(sql, row.bead_id); + } + + return rows.length; +} + +/** + * Enforce per-bead timeouts. Beads with metadata.timeout_ms that have + * been in_progress for longer than their timeout are failed. + * + * Returns timed-out bead IDs and their assigned agent IDs (so the + * caller can stop the agent processes in the container). + */ +export function checkTimerGates( + sql: SqlStorage +): Array<{ beadId: string; agentId: string | null }> { + const nowMs = Date.now(); + const timedOut: Array<{ beadId: string; agentId: string | null }> = []; + + // Find in_progress beads with a timeout_ms in metadata + const rows = BeadRecordSchema.pick({ + bead_id: true, + metadata: true, + updated_at: true, + assignee_agent_bead_id: true, + }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id}, ${beads.metadata}, ${beads.updated_at}, ${beads.assignee_agent_bead_id} + FROM ${beads} + WHERE ${beads.status} = 'in_progress' + AND ${beads.type} IN ('issue', 'molecule') + AND json_extract(${beads.metadata}, '$.timeout_ms') IS NOT NULL + `, + [] + ), + ]); + + for (const row of rows) { + const timeoutMs = Number(row.metadata?.timeout_ms ?? 
DEFAULT_BEAD_TIMEOUT_MS); + if (!timeoutMs || isNaN(timeoutMs) || timeoutMs <= 0) continue; + + const elapsedMs = nowMs - new Date(row.updated_at).getTime(); + if (elapsedMs > timeoutMs) { + // Fail the bead and unhook the assigned agent so the scheduler + // can recover the slot (matching schedulePendingWork's failure path). + // updateBeadStatus already logs a status_changed event internally, + // so no additional logBeadEvent call is needed here. + updateBeadStatus(sql, row.bead_id, 'failed', row.assignee_agent_bead_id ?? 'patrol'); + + if (row.assignee_agent_bead_id) { + unhookBead(sql, row.assignee_agent_bead_id); + } + + timedOut.push({ beadId: row.bead_id, agentId: row.assignee_agent_bead_id ?? null }); + console.log( + `${LOG} checkTimerGates: bead=${row.bead_id} timed out after ${Math.round(elapsedMs / 60_000)}min (limit=${Math.round(timeoutMs / 60_000)}min)` + ); + } + } + + return timedOut; +} + +// ── Deacon patrol sub-checks ──────────────────────────────────────── + +/** + * Detect stale hooks: agents that have been idle with a hook for an + * extended period without any dispatch activity. This catches cases + * where schedulePendingWork's cooldown/retry loop failed silently. + * + * Different from detectOrphanedWork (which catches exhausted retries) — + * this catches agents that are hooked+idle but haven't even been + * attempted recently. 
+ */ +export function detectStaleHooks(sql: SqlStorage): void { + const cutoff = new Date(Date.now() - STALE_HOOK_MS).toISOString(); + + const rows = AgentMetadataRecord.pick({ + bead_id: true, + current_hook_bead_id: true, + dispatch_attempts: true, + last_activity_at: true, + }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${agent_metadata.bead_id}, + ${agent_metadata.current_hook_bead_id}, + ${agent_metadata.dispatch_attempts}, + ${agent_metadata.last_activity_at} + FROM ${agent_metadata} + WHERE ${agent_metadata.status} = 'idle' + AND ${agent_metadata.current_hook_bead_id} IS NOT NULL + AND ${agent_metadata.dispatch_attempts} < 5 + AND (${agent_metadata.last_activity_at} IS NULL OR ${agent_metadata.last_activity_at} < ?) + `, + [cutoff] + ), + ]); + + for (const row of rows) { + // Reset last_activity_at to trigger schedulePendingWork to pick it up + // on the next alarm cycle (it skips agents with recent activity). + query( + sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.last_activity_at} = NULL + WHERE ${agent_metadata.bead_id} = ? + `, + [row.bead_id] + ); + + console.log( + `${LOG} stale hook nudge: agent=${row.bead_id} hook=${row.current_hook_bead_id} attempts=${row.dispatch_attempts}` + ); + } +} + +/** + * Feed stranded convoys: find active convoys that have open beads with + * no assigned agent. Auto-sling by assigning idle polecats. + */ +export function feedStrandedConvoys(sql: SqlStorage, townId: string): void { + // Find open issue beads that: + // 1. Belong to an active convoy (tracked by a convoy bead) + // 2. 
Have no assigned agent + const StrandedBeadRow = z.object({ + bead_id: z.string(), + rig_id: z.string().nullable(), + convoy_bead_id: z.string(), + }); + + const rows = StrandedBeadRow.array().parse([ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id}, + ${beads.rig_id}, + ${bead_dependencies.depends_on_bead_id} AS convoy_bead_id + FROM ${bead_dependencies} + INNER JOIN ${beads} ON ${bead_dependencies.bead_id} = ${beads.bead_id} + INNER JOIN ${beads} AS convoy ON ${bead_dependencies.depends_on_bead_id} = convoy.${beads.columns.bead_id} + WHERE ${bead_dependencies.dependency_type} = 'tracks' + AND convoy.${beads.columns.type} = 'convoy' + AND convoy.${beads.columns.status} = 'open' + AND ${beads.status} = 'open' + AND ${beads.type} = 'issue' + AND ${beads.assignee_agent_bead_id} IS NULL + `, + [] + ), + ]); + + if (rows.length === 0) return; + + console.log(`${LOG} feedStrandedConvoys: found ${rows.length} unassigned convoy bead(s)`); + + // For each stranded bead, find or create an idle polecat in the same rig + // and hook it. The next schedulePendingWork cycle will dispatch it. + // We import getOrCreateAgent inline to avoid circular dependency issues. + for (const row of rows) { + const rigId = row.rig_id; + if (!rigId) continue; + + try { + const agent = getOrCreateAgent(sql, 'polecat', rigId, townId); + hookBead(sql, agent.id, row.bead_id); + // Clear last_activity_at so schedulePendingWork picks this up on + // the next alarm tick instead of waiting for the dispatch cooldown. + query( + sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.last_activity_at} = NULL + WHERE ${agent_metadata.bead_id} = ? 
+ `, + [agent.id] + ); + console.log( + `${LOG} feedStrandedConvoys: assigned agent=${agent.id} to bead=${row.bead_id} in convoy=${row.convoy_bead_id}` + ); + } catch (err) { + console.warn( + `${LOG} feedStrandedConvoys: failed to assign agent to bead=${row.bead_id}:`, + err + ); + } + } +} + +/** + * Detect crash loops: agents that have failed repeatedly within a + * short window. Creates a triage request for LLM assessment. + * + * Crash loop detection uses the bead_events table to count recent + * status_changed events to 'failed' for each agent. + */ +export function detectCrashLoops(sql: SqlStorage): void { + const windowCutoff = new Date(Date.now() - CRASH_LOOP_WINDOW_MS).toISOString(); + + // Count recent failure events per agent + const CrashRow = z.object({ + agent_id: z.string(), + fail_count: z.number(), + }); + + const rows = CrashRow.array().parse([ + ...query( + sql, + /* sql */ ` + SELECT be.agent_id, COUNT(*) AS fail_count + FROM bead_events AS be + WHERE be.event_type = 'status_changed' + AND be.new_value = 'failed' + AND be.agent_id IS NOT NULL + AND be.created_at > ? + GROUP BY be.agent_id + HAVING fail_count >= ? + `, + [windowCutoff, CRASH_LOOP_THRESHOLD] + ), + ]); + + for (const row of rows) { + createTriageRequest(sql, { + triageType: 'crash_loop', + agentBeadId: row.agent_id, + title: `Crash loop detected: ${row.fail_count} failures in ${CRASH_LOOP_WINDOW_MS / 60_000}min`, + context: { + agent_id: row.agent_id, + fail_count: row.fail_count, + window_minutes: CRASH_LOOP_WINDOW_MS / 60_000, + }, + options: ['RESTART_WITH_BACKOFF', 'REASSIGN_BEAD', 'ESCALATE_TO_MAYOR'], + }); + + console.log( + `${LOG} crash loop: agent=${row.agent_id} failures=${row.fail_count} in ${CRASH_LOOP_WINDOW_MS / 60_000}min` + ); + } +} + +// ── Pending triage requests ───────────────────────────────────────── + +/** Count open triage request beads (issue beads with gt:triage-request label). 
*/
+export function countPendingTriageRequests(sql: SqlStorage): number {
+  const rows = [
+    ...query(
+      sql,
+      /* sql */ `
+        SELECT COUNT(*) AS cnt FROM ${beads}
+        WHERE ${beads.type} = 'issue'
+          AND ${beads.labels} LIKE ?
+          AND ${beads.status} = 'open'
+      `,
+      [TRIAGE_LABEL_LIKE]
+    ),
+  ];
+  return Number(z.object({ cnt: z.number() }).parse(rows[0]).cnt);
+}
+
+/** List open triage request beads for the triage agent prompt. */
+export function listPendingTriageRequests(sql: SqlStorage): z.output<typeof BeadRecordSchema>[] {
+  const rows = [
+    ...query(
+      sql,
+      /* sql */ `
+        SELECT * FROM ${beads}
+        WHERE ${beads.type} = 'issue'
+          AND ${beads.labels} LIKE ?
+          AND ${beads.status} = 'open'
+        ORDER BY ${beads.created_at} ASC
+        LIMIT 20
+      `,
+      [TRIAGE_LABEL_LIKE]
+    ),
+  ];
+  return BeadRecordSchema.array().parse(rows);
+}
diff --git a/cloudflare-gastown/src/dos/town/review-queue.ts b/cloudflare-gastown/src/dos/town/review-queue.ts
index 381deb2f56..f24c72a693 100644
--- a/cloudflare-gastown/src/dos/town/review-queue.ts
+++ b/cloudflare-gastown/src/dos/town/review-queue.ts
@@ -464,6 +464,17 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu
   if (!agent) throw new Error(`Agent ${agentId} not found`);
   if (!agent.current_hook_bead_id) throw new Error(`Agent ${agentId} has no hooked bead`);
 
+  // Triage batch beads don't produce code — close and unhook without
+  // submitting to the review queue. Only applies to system-created triage
+  // beads (created_by = 'patrol'). User-created beads that happen to carry
+  // the gt:triage label go through normal review flow.
+  const hookedBead = getBead(sql, agent.current_hook_bead_id);
+  if (hookedBead?.labels.includes('gt:triage') && hookedBead.created_by === 'patrol') {
+    closeBead(sql, agent.current_hook_bead_id, agentId);
+    unhookBead(sql, agentId);
+    return;
+  }
+
   if (agent.role === 'refinery') {
     // The refinery handles merging (direct strategy) or PR creation (pr strategy)
     // itself.
When it calls gt_done: diff --git a/cloudflare-gastown/src/dos/town/rigs.ts b/cloudflare-gastown/src/dos/town/rigs.ts index e07b410580..113737de16 100644 --- a/cloudflare-gastown/src/dos/town/rigs.ts +++ b/cloudflare-gastown/src/dos/town/rigs.ts @@ -54,18 +54,61 @@ export function addRig( } ): RigRecord { const timestamp = new Date().toISOString(); - query( - sql, - /* sql */ ` - INSERT INTO rigs (id, name, git_url, default_branch, config, created_at) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - name = excluded.name, - git_url = excluded.git_url, - default_branch = excluded.default_branch - `, - [input.rigId, input.name, input.gitUrl, input.defaultBranch, '{}', timestamp] - ); + try { + query( + sql, + /* sql */ ` + INSERT INTO rigs (id, name, git_url, default_branch, config, created_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + name = excluded.name, + git_url = excluded.git_url, + default_branch = excluded.default_branch + `, + [input.rigId, input.name, input.gitUrl, input.defaultBranch, '{}', timestamp] + ); + } catch (err) { + if (err instanceof Error && err.message.includes('UNIQUE constraint failed: rigs.name')) { + // A rig with this name already exists under a different ID. + // Clean up the stale entry only if it's truly orphaned (has no + // open or in-progress beads). Otherwise surface the conflict. + const staleRig = query(sql, /* sql */ `SELECT id FROM rigs WHERE name = ? AND id != ?`, [ + input.name, + input.rigId, + ]); + const staleId = [...staleRig][0]?.id as string | undefined; + if (staleId) { + const activeBeads = query( + sql, + /* sql */ `SELECT COUNT(*) AS cnt FROM beads WHERE rig_id = ? AND status IN ('open', 'in_progress')`, + [staleId] + ); + const count = Number([...activeBeads][0]?.cnt ?? 0); + if (count > 0) { + throw new Error( + `A rig named "${input.name}" already exists (id=${staleId}) with ${count} active bead(s). 
` + + `Delete it first before creating a new rig with the same name.` + ); + } + // Truly orphaned — safe to remove + query(sql, /* sql */ `DELETE FROM rigs WHERE id = ?`, [staleId]); + } + query( + sql, + /* sql */ ` + INSERT INTO rigs (id, name, git_url, default_branch, config, created_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + name = excluded.name, + git_url = excluded.git_url, + default_branch = excluded.default_branch + `, + [input.rigId, input.name, input.gitUrl, input.defaultBranch, '{}', timestamp] + ); + } else { + throw err; + } + } const rig = getRig(sql, input.rigId); if (!rig) throw new Error('Failed to create rig'); diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 7f0cb3018c..4abe264405 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -2,6 +2,7 @@ import { Hono } from 'hono'; import type { Context } from 'hono'; import { cors } from 'hono/cors'; import { getTownContainerStub } from './dos/TownContainer.do'; +import { getTownDOStub } from './dos/Town.do'; import { resError } from './util/res.util'; import { dashboardHtml } from './ui/dashboard.ui'; import { @@ -44,6 +45,7 @@ import { handleCompleteReview, } from './handlers/rig-review-queue.handler'; import { handleCreateEscalation } from './handlers/rig-escalations.handler'; +import { handleResolveTriage } from './handlers/rig-triage.handler'; import { handleListBeadEvents } from './handlers/rig-bead-events.handler'; import { handleListTownEvents } from './handlers/town-events.handler'; import { @@ -265,6 +267,12 @@ app.post('/api/towns/:townId/rigs/:rigId/escalations', c => handleCreateEscalation(c, c.req.param()) ); +// ── Triage ────────────────────────────────────────────────────────────── + +app.post('/api/towns/:townId/rigs/:rigId/triage/resolve', c => + handleResolveTriage(c, c.req.param()) +); + // ── Kilo User Auth 
────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. Skip in development. Container→worker routes use the agent JWT @@ -440,6 +448,7 @@ app.onError((err, c) => { const WS_STREAM_PATTERN = /^\/api\/towns\/([^/]+)\/container\/agents\/([^/]+)\/stream$/; const WS_PTY_PATTERN = /^\/api\/towns\/([^/]+)\/container\/agents\/([^/]+)\/pty\/([^/]+)\/connect$/; +const WS_STATUS_PATTERN = /^\/api\/towns\/([^/]+)\/status\/ws$/; export default { async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { @@ -475,6 +484,15 @@ export default { const stub = getTownContainerStub(env, townId); return stub.fetch(request); } + + // Town alarm status (real-time push) + const statusMatch = url.pathname.match(WS_STATUS_PATTERN); + if (statusMatch) { + const townId = statusMatch[1]; + console.log(`[gastown-worker] WS upgrade (status): townId=${townId}`); + const stub = getTownDOStub(env, townId); + return stub.fetch(request); + } } // All other requests go through Hono diff --git a/cloudflare-gastown/src/handlers/rig-triage.handler.ts b/cloudflare-gastown/src/handlers/rig-triage.handler.ts new file mode 100644 index 0000000000..29965bdd1c --- /dev/null +++ b/cloudflare-gastown/src/handlers/rig-triage.handler.ts @@ -0,0 +1,64 @@ +import type { Context } from 'hono'; +import { z } from 'zod'; +import { getTownDOStub } from '../dos/Town.do'; +import { resSuccess, resError } from '../util/res.util'; +import { parseJsonBody } from '../util/parse-json-body.util'; +import { getEnforcedAgentId } from '../middleware/auth.middleware'; +import type { GastownEnv } from '../gastown.worker'; + +const VALID_TRIAGE_ACTIONS = [ + 'RESTART', + 'RESTART_WITH_BACKOFF', + 'CLOSE_BEAD', + 'ESCALATE', + 'ESCALATE_TO_MAYOR', + 'NUDGE', + 'REASSIGN_BEAD', + 'DISCARD', + 'PROVIDE_GUIDANCE', +] as const; + +const ResolveTriageBody = z.object({ + triage_request_bead_id: z.string().min(1), + action: z + .string() + 
.min(1) + .transform(v => v.toUpperCase()) + .pipe(z.enum(VALID_TRIAGE_ACTIONS)), + resolution_notes: z.string(), +}); + +export async function handleResolveTriage(c: Context, params: { rigId: string }) { + const agentId = getEnforcedAgentId(c); + if (!agentId) { + return c.json(resError('Agent authentication required'), 401); + } + + const parsed = ResolveTriageBody.safeParse(await parseJsonBody(c)); + if (!parsed.success) { + return c.json( + { success: false, error: 'Invalid request body', issues: parsed.error.issues }, + 400 + ); + } + const townId = c.get('townId'); + const town = getTownDOStub(c.env, townId); + + // Verify this agent is actually working on a triage batch. Without this + // check, any rig agent (polecat, refinery) could call the endpoint and + // trigger restart/close/escalate side effects on other agents. + const hookedBead = await town.getHookedBead(agentId); + if ( + !hookedBead || + !hookedBead.labels.includes('gt:triage') || + hookedBead.created_by !== 'patrol' + ) { + return c.json(resError('Only triage agents can resolve triage requests'), 403); + } + + const bead = await town.resolveTriage({ + agent_id: agentId, + ...parsed.data, + }); + return c.json(resSuccess(bead), 200); +} diff --git a/cloudflare-gastown/src/prompts/mayor-system.prompt.ts b/cloudflare-gastown/src/prompts/mayor-system.prompt.ts index 6819211233..6e360ee67b 100644 --- a/cloudflare-gastown/src/prompts/mayor-system.prompt.ts +++ b/cloudflare-gastown/src/prompts/mayor-system.prompt.ts @@ -166,6 +166,41 @@ When calling gt_sling, write clear, actionable descriptions: The polecat works autonomously — it cannot ask you questions mid-task. Front-load ALL necessary context in the body. A polecat with a detailed body succeeds. A polecat with a vague body flounders. +## Browsing Rig Codebases + +You have READ-ONLY access to every rig's codebase in your town. 
Each rig has a checked-out copy of its default branch at: + +\`/workspace/rigs//browse/\` + +Use gt_list_rigs to discover rig IDs, then browse the codebase using your standard file-reading tools (Read, Grep, Glob). This is how you build context to write better bead descriptions. + +**Before browsing a rig, always pull the latest changes first:** +\`\`\`bash +cd /workspace/rigs//browse && git pull --rebase --autostash +\`\`\` + +**Browsing workflow:** +1. Call gt_list_rigs to get rig IDs +2. Pull latest in the browse directory +3. Explore the codebase (directory structure, key files, dependencies, etc.) +4. Use what you learn to write precise, context-rich bead descriptions when slinging + +This is especially useful when: +- The user's request is vague and you need to understand the codebase structure first +- You need to identify the right files, components, or modules for a task +- You want to include specific file paths and function names in bead bodies +- You need to understand existing patterns before delegating work + +## DO NOT EDIT FILES + +**You must NEVER edit, write, create, or delete files.** Your role is coordination, not implementation. All code changes must go through polecats via gt_sling or gt_sling_batch. + +If you catch yourself about to use Edit, Write, or any file-modification tool — STOP. Instead, sling a bead describing the change you want. + +You may READ files to build context. You must NEVER WRITE files. The browse worktrees are for observation only. Any change you make there will be lost and could corrupt the shared repo state. + +The only exception is running \`git pull\` in a browse directory to get the latest code. + ## Important - You maintain context across messages. This is a continuous conversation. 
diff --git a/cloudflare-gastown/src/prompts/refinery-system.prompt.ts b/cloudflare-gastown/src/prompts/refinery-system.prompt.ts index 09b934c883..2f54dde46e 100644 --- a/cloudflare-gastown/src/prompts/refinery-system.prompt.ts +++ b/cloudflare-gastown/src/prompts/refinery-system.prompt.ts @@ -107,6 +107,18 @@ This bead is part of a **review-then-land** convoy. Your job for this intermedia 3. **If changes needed, send rework request** — the polecat will fix and resubmit. The final merge/PR to main happens automatically once ALL beads in the convoy are done. Do NOT create a PR for this intermediate step.`; + } + if (ctx.mergeMode === 'review-then-land' && !ctx.isIntermediateStep) { + return ` +## Convoy Context — Final Landing +This is the **final landing merge** for a review-then-land convoy. All individual beads have been reviewed and merged into the convoy's feature branch. Your job is to: + +1. **Review the combined diff** — the feature branch contains the accumulated work of all convoy beads. Review the full diff against the target branch to ensure everything integrates correctly. +2. **Run quality gates** — this is the last check before the work lands on the target branch. All gates must pass against the combined changes. +3. **If approved, perform the merge or create a PR** — use the merge strategy shown above. This IS the final landing, so follow the merge/PR instructions below to land the convoy on the target branch. +4. **If changes needed, escalate** — since there's no single polecat to send rework to, call \`gt_escalate\` with severity "medium" describing the issue. Do NOT attempt to fix the code yourself. + +This merge represents all the work in the convoy landing together. 
Treat it with the same rigor as a large feature branch merge.`;
   }
   if (ctx.mergeMode === 'review-and-merge') {
     return `
diff --git a/cloudflare-gastown/src/prompts/triage-system.prompt.ts b/cloudflare-gastown/src/prompts/triage-system.prompt.ts
new file mode 100644
index 0000000000..7e593d8ef5
--- /dev/null
+++ b/cloudflare-gastown/src/prompts/triage-system.prompt.ts
@@ -0,0 +1,73 @@
+/**
+ * Build the system prompt for a triage agent.
+ *
+ * The triage agent is a short-lived LLM session spawned by the TownDO
+ * alarm handler when mechanical patrol checks produce ambiguous results.
+ * It processes a batch of triage_request beads, makes a judgment call
+ * for each, and exits. See #442.
+ */
+
+type TriageRequestBead = {
+  bead_id: string;
+  title: string;
+  body: string | null;
+  metadata: Record<string, unknown>;
+};
+
+export function buildTriageSystemPrompt(pendingRequests: TriageRequestBead[]): string {
+  const situations = pendingRequests
+    .map((req, i) => {
+      const meta = req.metadata ?? {};
+      const triageType = String(meta.triage_type ?? 'unknown').toUpperCase();
+      const options = Array.isArray(meta.options) ? (meta.options as string[]).join(' | ') : 'N/A';
+      const context =
+        typeof meta.context === 'object' && meta.context !== null
+          ? JSON.stringify(meta.context, null, 2)
+          : (req.body ?? 'No additional context');
+
+      return `${i + 1}. [${triageType}] ${req.title}
+   Triage request ID: ${req.bead_id}
+   Context:
+${context
+  .split('\n')
+  .map(line => `   ${line}`)
+  .join('\n')}
+   Options: ${options}`;
+    })
+    .join('\n\n');
+
+  return `You are a Gastown triage agent. Your job is to assess ambiguous situations
+that the mechanical patrol checks could not resolve automatically.
+
+You will be given a list of situations. For each one:
+1. Read the context carefully.
+2. Assess the situation and choose the best action from the available options.
+3. Call gt_triage_resolve with the triage request bead ID and your chosen action.
+4. 
Briefly explain your reasoning in the resolution_notes field. + +When you have resolved all situations, call gt_bead_close on your hooked bead to signal completion. +Do NOT call gt_done — it submits work to the review queue, which is not appropriate for triage. + +## Guidelines + +- **Be decisive.** The system is waiting on your judgment. Do not deliberate excessively. +- **Prefer least-disruptive actions.** RESTART over CLOSE_BEAD. NUDGE over ESCALATE. +- **Escalate genuinely hard problems.** If a situation requires human context you don't have, escalate rather than guess. +- **Never skip a triage request.** Every pending request must be resolved. + +## Available Tools + +- **gt_triage_resolve** — Resolve a triage request. Provide the triage_request_bead_id, chosen action, and brief notes. +- **gt_mail_send** — Send guidance to a stuck agent. +- **gt_escalate** — Forward a problem to the Mayor or human operators. +- **gt_bead_close** — Close your hooked bead when all triage requests have been processed. + +## Situations to Assess + +${situations} + +--- + +Process each situation above. For each one, call gt_triage_resolve with your decision. +When all are resolved, call gt_bead_close on your hooked bead to signal completion.`; +} diff --git a/cloudflare-gastown/src/trpc/router.ts b/cloudflare-gastown/src/trpc/router.ts index 4c344edbcd..4fbe8a5909 100644 --- a/cloudflare-gastown/src/trpc/router.ts +++ b/cloudflare-gastown/src/trpc/router.ts @@ -28,6 +28,7 @@ import { RpcSlingResultOutput, RpcRigDetailOutput, RpcConvoyDetailOutput, + RpcAlarmStatusOutput, } from './schemas'; import type { TRPCContext } from './init'; @@ -185,6 +186,15 @@ export const gastownRouter = router({ await townStub.setTownId(input.townId); await townStub.updateTownConfig({ kilocode_token: kilocodeToken }); + // Resolve git credentials BEFORE configureRig so that + // townConfig.git_auth.github_token is populated when + // setupRigRepoInContainer reads it for the proactive clone. 
+ try { + await refreshGitCredentials(ctx.env, input.townId, input.gitUrl, user.id); + } catch (err) { + console.warn('[gastown-trpc] createRig: git credential refresh failed', err); + } + const userStub = getGastownUserStub(ctx.env, user.id); const rig = await userStub.createRig({ town_id: input.townId, @@ -225,13 +235,6 @@ export const gastownRouter = router({ throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Failed to configure rig' }); } - // Best-effort: resolve git credentials from the git-token-service - try { - await refreshGitCredentials(ctx.env, input.townId, input.gitUrl, user.id); - } catch (err) { - console.warn('[gastown-trpc] createRig: git credential refresh failed', err); - } - return rig; }), @@ -259,7 +262,12 @@ export const gastownRouter = router({ deleteRig: gastownProcedure .input(z.object({ rigId: z.string().uuid() })) .mutation(async ({ ctx, input }) => { - await verifyRigOwnership(ctx.env, ctx.userId, input.rigId); + const rig = await verifyRigOwnership(ctx.env, ctx.userId, input.rigId); + // Remove from Town DO first so the name is freed before the user + // record is deleted. If this fails the user record is still intact + // and the user can retry. 
+ const townStub = getTownDOStub(ctx.env, rig.town_id); + await townStub.removeRig(input.rigId); const userStub = getGastownUserStub(ctx.env, ctx.userId); await userStub.deleteRig(input.rigId); }), @@ -370,6 +378,16 @@ export const gastownRouter = router({ return townStub.getMayorStatus(); }), + getAlarmStatus: gastownProcedure + .input(z.object({ townId: z.string().uuid() })) + .output(RpcAlarmStatusOutput) + .query(async ({ ctx, input }) => { + await verifyTownOwnership(ctx.env, ctx.userId, input.townId); + const townStub = getTownDOStub(ctx.env, input.townId); + await townStub.setTownId(input.townId); + return townStub.getAlarmStatus(); + }), + ensureMayor: gastownProcedure .input(z.object({ townId: z.string().uuid() })) .output(RpcMayorSendResultOutput) diff --git a/cloudflare-gastown/src/trpc/schemas.ts b/cloudflare-gastown/src/trpc/schemas.ts index e0ae802a3e..c849dd84c9 100644 --- a/cloudflare-gastown/src/trpc/schemas.ts +++ b/cloudflare-gastown/src/trpc/schemas.ts @@ -53,10 +53,10 @@ export const BeadOutput = z.object({ export const AgentOutput = z.object({ id: z.string(), rig_id: z.string().nullable(), - role: z.enum(['polecat', 'refinery', 'mayor', 'witness']), + role: z.enum(['polecat', 'refinery', 'mayor']).or(z.string()), name: z.string(), identity: z.string(), - status: z.enum(['idle', 'working', 'stalled', 'dead']), + status: z.enum(['idle', 'working', 'stalled', 'dead']).or(z.string()), current_hook_bead_id: z.string().nullable(), dispatch_attempts: z.number().default(0), last_activity_at: z.string().nullable(), @@ -184,4 +184,40 @@ export const RpcPtySessionOutput = rpcSafe(PtySessionOutput); export const RpcConvoyOutput = rpcSafe(ConvoyOutput); export const RpcConvoyDetailOutput = rpcSafe(ConvoyDetailOutput); export const RpcSlingResultOutput = rpcSafe(SlingResultOutput); + +// Alarm status +const AlarmStatusOutput = z.object({ + alarm: z.object({ + nextFireAt: z.string().nullable(), + intervalMs: z.number(), + intervalLabel: z.string(), + }), + 
agents: z.object({ + working: z.number(), + idle: z.number(), + stalled: z.number(), + dead: z.number(), + total: z.number(), + }), + beads: z.object({ + open: z.number(), + inProgress: z.number(), + failed: z.number(), + triageRequests: z.number(), + }), + patrol: z.object({ + guppWarnings: z.number(), + guppEscalations: z.number(), + stalledAgents: z.number(), + orphanedHooks: z.number(), + }), + recentEvents: z.array( + z.object({ + time: z.string(), + type: z.string(), + message: z.string(), + }) + ), +}); +export const RpcAlarmStatusOutput = rpcSafe(AlarmStatusOutput); export const RpcRigDetailOutput = rpcSafe(RigDetailOutput); diff --git a/cloudflare-gastown/src/types.ts b/cloudflare-gastown/src/types.ts index 46f7b5b543..a775283462 100644 --- a/cloudflare-gastown/src/types.ts +++ b/cloudflare-gastown/src/types.ts @@ -48,7 +48,7 @@ export type BeadFilter = { // -- Agents (now beads + agent_metadata) -- -export const AgentRole = z.enum(['polecat', 'refinery', 'mayor', 'witness']); +export const AgentRole = z.enum(['polecat', 'refinery', 'mayor']); export type AgentRole = z.infer<typeof AgentRole>; export const AgentStatus = z.enum(['idle', 'working', 'stalled', 'dead']); diff --git a/cloudflare-gastown/src/ui/dashboard.ui.ts b/cloudflare-gastown/src/ui/dashboard.ui.ts index 51ca2d7d39..1fee579ed9 100644 --- a/cloudflare-gastown/src/ui/dashboard.ui.ts +++ b/cloudflare-gastown/src/ui/dashboard.ui.ts @@ -93,7 +93,7 @@ export function dashboardHtml(): string { - + diff --git a/src/components/gastown/ConvoyTimeline.tsx b/src/components/gastown/ConvoyTimeline.tsx index 7371bb459c..840c178438 100644 --- a/src/components/gastown/ConvoyTimeline.tsx +++ b/src/components/gastown/ConvoyTimeline.tsx @@ -158,23 +158,26 @@ function ConvoyCard({ {/* Convoy header */}
- + CONVOY - + {convoy.title} {convoy.feature_branch && ( - - {convoy.feature_branch} + + {convoy.feature_branch} )}
-
+
{convoy.closed_beads}/{convoy.total_beads} diff --git a/src/components/gastown/TerminalBar.tsx b/src/components/gastown/TerminalBar.tsx index f7ce4a2878..066f30cdc6 100644 --- a/src/components/gastown/TerminalBar.tsx +++ b/src/components/gastown/TerminalBar.tsx @@ -1,11 +1,12 @@ 'use client'; -import { useEffect, useRef, useState } from 'react'; +import { useCallback, useEffect, useRef, useState } from 'react'; import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; import { useGastownTRPC, gastownWsUrl } from '@/lib/gastown/trpc'; + import { useSidebar } from '@/components/ui/sidebar'; import { useTerminalBar } from './TerminalBarContext'; -import { ChevronDown, ChevronUp, Crown, Terminal as TerminalIcon, X } from 'lucide-react'; +import { ChevronDown, ChevronUp, Crown, Activity, Terminal as TerminalIcon, X } from 'lucide-react'; import { motion, AnimatePresence } from 'motion/react'; import type { Terminal } from '@xterm/xterm'; import type { FitAddon } from '@xterm/addon-fit'; @@ -35,6 +36,7 @@ export function TerminalBar({ townId }: TerminalBarProps) { const sidebarLeft = isMobile ? '0px' : sidebarState === 'expanded' ? '16rem' : '3rem'; const allTabs = [ + { id: 'status', label: 'Status', kind: 'status' as const, agentId: '' }, { id: 'mayor', label: 'Mayor', kind: 'mayor' as const, agentId: '' }, ...agentTabs, ]; @@ -93,8 +95,11 @@ export function TerminalBar({ townId }: TerminalBarProps) { {isMayor && ( )} + {tab.kind === 'status' && ( + + )} {tab.label} - {!isMayor && ( + {!isMayor && tab.kind !== 'status' && (