diff --git a/cloudflare-gastown/AGENTS.md b/cloudflare-gastown/AGENTS.md
index 563a9a66fc..f06633c061 100644
--- a/cloudflare-gastown/AGENTS.md
+++ b/cloudflare-gastown/AGENTS.md
@@ -8,6 +8,37 @@
 ## Durable Objects
 - Each DO module must export a `get{ClassName}Stub` helper function (e.g. `getRigDOStub`) that centralizes how that DO namespace creates instances. Callers should use this helper instead of accessing the namespace binding directly.
+- **Sub-modules for large DOs**: When a Durable Object grows beyond a few hundred lines, extract domain logic into sub-modules under a sibling directory named after the DO. For example, `Town.do.ts` delegates to modules in `town/`:
+
+  ```
+  dos/
+    Town.do.ts              # Class definition, RPC methods, alarm loop
+    town/
+      agents.ts             # Agent CRUD, hook management
+      beads.ts              # Bead CRUD, convoy progress
+      scheduling.ts         # Agent dispatch, pending work scheduling
+      review-queue.ts       # Review lifecycle, recovery
+      patrol.ts             # Zombie detection, stale hook recovery
+      config.ts             # Town configuration
+      rigs.ts               # Rig registry
+      mail.ts               # Inter-agent mail
+      container-dispatch.ts # Container start/stop/status
+  ```
+
+  Each sub-module exports plain functions (not classes) that accept `SqlStorage` and any other required context as arguments. The DO imports them with the `import * as X` pattern:
+
+  ```ts
+  import * as beadOps from './town/beads';
+  import * as agents from './town/agents';
+  import * as scheduling from './town/scheduling';
+
+  // In the DO class:
+  beadOps.updateBeadStatus(this.sql, beadId, 'closed', agentId);
+  agents.getOrCreateAgent(this.sql, 'polecat', rigId, this.townId);
+  await scheduling.schedulePendingWork(this.schedulingCtx);
+  ```
+
+  This keeps the DO class thin (RPC surface + orchestration) while sub-modules own the business logic. The `import * as X` pattern makes call sites self-documenting — you can always tell which domain a function belongs to.
 
 ## IO boundaries
diff --git a/cloudflare-gastown/container/Dockerfile b/cloudflare-gastown/container/Dockerfile
index 4c653a7485..0916754f59 100644
--- a/cloudflare-gastown/container/Dockerfile
+++ b/cloudflare-gastown/container/Dockerfile
@@ -44,6 +44,14 @@ RUN cd /opt/gastown-plugin && npm install --omit=dev && \
     ln -s /opt/gastown-plugin/index.ts /home/agent/.config/kilo/plugins/gastown.ts && \
     chown -R agent:agent /home/agent/.config
 
+# ── Git config for agent user ───────────────────────────────────────
+# Skip LFS smudge filter: agents don't need binary assets and LFS
+# downloads can fail when credentials don't cover the batch endpoint.
+# Also disable LFS fetch entirely so clone/worktree never stalls.
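+# For reference, the printf below (with %% escaping to a literal %)
+# renders ~/.gitconfig roughly as:
+#   [filter "lfs"]
+#       smudge = git-lfs smudge --skip -- %f
+#       process = git-lfs filter-process --skip
+#       clean = git-lfs clean -- %f
+#       required = true
+#   [lfs]
+#       fetchexclude = *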
+RUN printf '[filter "lfs"]\n\tsmudge = git-lfs smudge --skip -- %%f\n\tprocess = git-lfs filter-process --skip\n\tclean = git-lfs clean -- %%f\n\trequired = true\n[lfs]\n\tfetchexclude = *\n' \ + > /home/agent/.gitconfig && \ + chown agent:agent /home/agent/.gitconfig + WORKDIR /app # ── Install production deps via pnpm ──────────────────────────────── diff --git a/cloudflare-gastown/container/src/git-manager.ts b/cloudflare-gastown/container/src/git-manager.ts index 24b312d43c..a65d34370b 100644 --- a/cloudflare-gastown/container/src/git-manager.ts +++ b/cloudflare-gastown/container/src/git-manager.ts @@ -1,4 +1,4 @@ -import { mkdir, realpath, rm, stat } from 'node:fs/promises'; +import { mkdir, realpath, rm, stat, writeFile } from 'node:fs/promises'; import { join, resolve } from 'node:path'; import type { CloneOptions, WorktreeOptions } from './types'; @@ -105,6 +105,49 @@ function authenticateGitUrl(gitUrl: string, envVars?: Record): s return gitUrl; } +/** + * Configure a credential-store helper on the bare repo so that worktree + * operations (checkout, reset, lfs smudge) can resolve credentials + * through the standard git credential chain. + * + * Without this, git-lfs smudge filters triggered by `git worktree add` + * or `git reset --hard` fail with "Smudge error" because the LFS batch + * API request has no credentials. The token is embedded in the remote + * URL, but some git-lfs versions require the credential helper for the + * LFS batch endpoint (which uses a different URL path). + */ +async function configureRepoCredentials( + repoDir: string, + gitUrl: string, + envVars?: Record +): Promise { + if (!envVars) return; + + const token = envVars.GIT_TOKEN ?? envVars.GITHUB_TOKEN; + const gitlabToken = envVars.GITLAB_TOKEN; + if (!token && !gitlabToken) return; + + try { + const url = new URL(gitUrl); + const credentialLine = + gitlabToken && (url.hostname.includes('gitlab') || envVars.GITLAB_INSTANCE_URL) + ? `https://oauth2:${gitlabToken}@${url.hostname}` + : token + ? `https://x-access-token:${token}@${url.hostname}` + : null; + + if (!credentialLine) return; + + // Write to a per-repo credential file outside the repo itself + const credFile = `/tmp/.git-credentials-repo-${repoDir.replace(/[^a-zA-Z0-9]/g, '-')}`; + await writeFile(credFile, credentialLine + '\n', { mode: 0o600 }); + + await exec('git', ['config', 'credential.helper', `store --file=${credFile}`], repoDir); + } catch (err) { + console.warn(`Failed to configure repo credentials for ${repoDir}:`, err); + } +} + /** * Validate a branch name — block control characters and shell metacharacters. */ @@ -148,6 +191,11 @@ async function exec(cmd: string, args: string[], cwd?: string): Promise // Public repos clone without auth; private repos fail fast with // a clear error instead of hanging on a username prompt. GIT_TERMINAL_PROMPT: '0', + // Skip LFS smudge filter during checkout/worktree operations. + // Agents don't need binary assets (videos, images, etc.) and + // LFS downloads can fail when the credential helper doesn't + // cover the LFS batch endpoint, blocking worktree creation. 
+ GIT_LFS_SKIP_SMUDGE: '1', }, }); @@ -211,6 +259,7 @@ async function cloneRepoInner( await exec('git', ['remote', 'set-url', 'origin', authUrl], dir).catch(err => { console.warn(`Failed to update remote URL for rig ${options.rigId}:`, err); }); + await configureRepoCredentials(dir, options.gitUrl, options.envVars); await exec('git', ['fetch', '--all', '--prune'], dir); console.log(`Fetched latest for rig ${options.rigId}`); return dir; @@ -228,6 +277,7 @@ async function cloneRepoInner( await mkdir(dir, { recursive: true }); await exec('git', ['clone', '--no-checkout', '--branch', options.defaultBranch, authUrl, dir]); + await configureRepoCredentials(dir, options.gitUrl, options.envVars); console.log(`Cloned repo for rig ${options.rigId}`); return dir; } diff --git a/cloudflare-gastown/scripts/monitor-town.sh b/cloudflare-gastown/scripts/monitor-town.sh new file mode 100755 index 0000000000..8b21095adb --- /dev/null +++ b/cloudflare-gastown/scripts/monitor-town.sh @@ -0,0 +1,84 @@ +#!/bin/bash +# Continuously monitor a town's state via the debug endpoint. +# Usage: ./scripts/monitor-town.sh [townId] [interval_seconds] + +TOWN_ID="${1:-8a6f9375-b806-4ee0-ad6e-1697ea2dbfff}" +INTERVAL="${2:-15}" +BASE_URL="${GASTOWN_URL:-https://gastown.kiloapps.io}" +URL="${BASE_URL}/debug/towns/${TOWN_ID}/status" + +echo "Monitoring town ${TOWN_ID} every ${INTERVAL}s" +echo "Endpoint: ${URL}" +echo "Press Ctrl+C to stop" +echo "==========================================" + +while true; do + RESP=$(curl -s --max-time 10 "${URL}" 2>/dev/null) + if [ -z "$RESP" ]; then + echo "$(date -u +%H:%M:%S) [ERROR] No response from ${URL}" + sleep "$INTERVAL" + continue + fi + + echo "$RESP" | python3 -c " +import sys, json, datetime + +try: + d = json.load(sys.stdin) +except: + print('$(date -u +%H:%M:%S) [ERROR] Invalid JSON response') + sys.exit(0) + +ts = datetime.datetime.utcnow().strftime('%H:%M:%S') +alarm = d.get('alarmStatus', {}) +agents_info = alarm.get('agents', {}) +beads_info = alarm.get('beads', {}) +patrol_info = alarm.get('patrol', {}) +events = alarm.get('recentEvents', []) + +working = agents_info.get('working', 0) +idle = agents_info.get('idle', 0) +op = beads_info.get('open', 0) +ip = beads_info.get('inProgress', 0) +ir = beads_info.get('inReview', 0) +failed = beads_info.get('failed', 0) +orphaned = patrol_info.get('orphanedHooks', 0) + +# Agent details +agents = d.get('agentMeta', []) +hooked_agents = [a for a in agents if a.get('current_hook_bead_id')] +refinery = [a for a in agents if a.get('role') == 'refinery'] + +# Non-terminal beads +beads = d.get('beadSummary', []) + +print(f'{ts} W={working} I={idle} | open={op} prog={ip} review={ir} fail={failed} | hooks={orphaned} hooked={len(hooked_agents)}') + +# Show refinery state +for r in refinery: + hook = r.get('current_hook_bead_id', 'NULL') or 'NULL' + print(f' refinery: status={r.get(\"status\",\"?\"):8s} hook={hook[:12]:12s} dispatch={r.get(\"dispatch_attempts\",0)}') + +# Show non-terminal beads +if beads: + for b in beads[:8]: + assignee = str(b.get('assignee_agent_bead_id', '') or '')[:8] + print(f' {b.get(\"status\",\"?\"):12s} {b.get(\"type\",\"?\"):16s} {str(b.get(\"bead_id\",\"\"))[:8]} agent={assignee:8s} {str(b.get(\"title\",\"\"))[:50]}') + if len(beads) > 8: + print(f' ... 
and {len(beads) - 8} more')
+
+# Show most recent event
+if events:
+    e = events[0]
+    print(f' last: {e.get(\"time\",\"\")[:19]} {e.get(\"type\",\"\"):20s} {e.get(\"message\",\"\")[:70]}')
+
+# Show review outcomes
+review_events = [e for e in events if e.get('type') == 'review_completed']
+for e in review_events[:2]:
+    print(f' REVIEW: {e.get(\"time\",\"\")[:19]} {e.get(\"message\",\"\")[:70]}')
+
+print()
+" 2>/dev/null
+
+  sleep "$INTERVAL"
+done
diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts
index d73cfdded9..e6a0400578 100644
--- a/cloudflare-gastown/src/dos/Town.do.ts
+++ b/cloudflare-gastown/src/dos/Town.do.ts
@@ -26,13 +26,13 @@ import * as config from './town/config';
 import * as rigs from './town/rigs';
 import * as dispatch from './town/container-dispatch';
 import * as patrol from './town/patrol';
+import * as scheduling from './town/scheduling';
 
 import { GitHubPRStatusSchema, GitLabMRStatusSchema } from '../util/platform-pr.util';
 
 // Table imports for beads-centric operations
 import {
   beads,
   BeadRecord,
-  AgentBeadRecord,
   EscalationBeadRecord,
   ConvoyBeadRecord,
 } from '../db/tables/beads.table';
@@ -115,8 +115,6 @@ function formatEventMessage(row: Record<string, unknown>): string {
 // Alarm intervals
 const ACTIVE_ALARM_INTERVAL_MS = 5_000; // 5s when agents are active
 const IDLE_ALARM_INTERVAL_MS = 1 * 60_000; // 1m when idle
-const DISPATCH_COOLDOWN_MS = 2 * 60_000; // 2 min — skip agents with recent dispatch activity
-const MAX_DISPATCH_ATTEMPTS = 5;
 
 // Escalation constants
 const STALE_ESCALATION_THRESHOLD_MS = 4 * 60 * 60 * 1000;
@@ -240,6 +238,20 @@ export class TownDO extends DurableObject {
     writeEvent(this.env, { ...data, delivery: 'internal', userId: this._ownerUserId });
   }
 
+  /** Build the context object used by the scheduling sub-module. */
+  private get schedulingCtx(): Parameters<typeof scheduling.schedulePendingWork>[0] {
+    return {
+      sql: this.sql,
+      env: this.env,
+      storage: this.ctx.storage,
+      townId: this.townId,
+      getTownConfig: () => this.getTownConfig(),
+      getRigConfig: (rigId: string) => this.getRigConfig(rigId),
+      resolveKilocodeToken: () => this.resolveKilocodeToken(),
+      emitEvent: data => this.emitEvent(data),
+    };
+  }
+
   // ── WebSocket: status broadcast ──────────────────────────────────────
 
   /**
@@ -252,7 +264,6 @@ export class TownDO extends DurableObject {
       url.pathname.endsWith('/status/ws') &&
       request.headers.get('Upgrade')?.toLowerCase() === 'websocket'
     ) {
-      await this.ensureInitialized();
       const pair = new WebSocketPair();
       const [client, server] = [pair[0], pair[1]];
       this.ctx.acceptWebSocket(server, ['status']);
@@ -393,7 +404,6 @@ export class TownDO extends DurableObject {
    * Called by the mayor via the /mayor/ui-action HTTP route.
*/ async broadcastUiAction(action: UiAction): Promise { - await this.ensureInitialized(); const sockets = this.ctx.getWebSockets('status'); if (sockets.length === 0) return; const frame = JSON.stringify({ channel: 'ui_action', action, ts: now() }); @@ -557,12 +567,10 @@ export class TownDO extends DurableObject { gitUrl: string; defaultBranch: string; }): Promise { - await this.ensureInitialized(); return rigs.addRig(this.sql, input); } async removeRig(rigId: string): Promise { - await this.ensureInitialized(); rigs.removeRig(this.sql, rigId); await this.ctx.storage.delete(`rig:${rigId}:config`); // Delete all beads belonging to this rig (cascades to satellite tables via deleteBead) @@ -581,12 +589,10 @@ export class TownDO extends DurableObject { } async listRigs(): Promise { - await this.ensureInitialized(); return rigs.listRigs(this.sql); } async getRigAsync(rigId: string): Promise { - await this.ensureInitialized(); return rigs.getRig(this.sql, rigId); } @@ -596,9 +602,6 @@ export class TownDO extends DurableObject { console.log( `${TOWN_LOG} configureRig: rigId=${rigConfig.rigId} hasKilocodeToken=${!!rigConfig.kilocodeToken}` ); - if (rigConfig.townId) { - await this.setTownId(rigConfig.townId); - } await this.ctx.storage.put(`rig:${rigConfig.rigId}:config`, rigConfig); if (rigConfig.kilocodeToken) { @@ -696,7 +699,6 @@ export class TownDO extends DurableObject { // ══════════════════════════════════════════════════════════════════ async createBead(input: CreateBeadInput): Promise { - await this.ensureInitialized(); const bead = beadOps.createBead(this.sql, input); this.emitEvent({ event: 'bead.created', @@ -716,17 +718,14 @@ export class TownDO extends DurableObject { } async getBeadAsync(beadId: string): Promise { - await this.ensureInitialized(); return beadOps.getBead(this.sql, beadId); } async listBeads(filter: BeadFilter): Promise { - await this.ensureInitialized(); return beadOps.listBeads(this.sql, filter); } async updateBeadStatus(beadId: string, status: string, agentId: string): Promise { - await this.ensureInitialized(); // Convoy progress is updated automatically inside beadOps.updateBeadStatus // when the bead reaches a terminal status (closed/failed). const bead = beadOps.updateBeadStatus(this.sql, beadId, status, agentId); @@ -792,7 +791,6 @@ export class TownDO extends DurableObject { } async deleteBead(beadId: string): Promise { - await this.ensureInitialized(); beadOps.deleteBead(this.sql, beadId); } @@ -801,7 +799,6 @@ export class TownDO extends DurableObject { since?: string; limit?: number; }): Promise { - await this.ensureInitialized(); return beadOps.listBeadEvents(this.sql, options); } @@ -822,7 +819,6 @@ export class TownDO extends DurableObject { }>, actorId: string ): Promise { - await this.ensureInitialized(); const bead = beadOps.updateBeadFields(this.sql, beadId, fields, actorId); // When a bead closes via field update, check for newly unblocked beads @@ -839,8 +835,6 @@ export class TownDO extends DurableObject { * Writes a bead_event for auditability. 
*/ async resetAgent(agentId: string): Promise { - await this.ensureInitialized(); - const agent = agents.getAgent(this.sql, agentId); if (!agent) throw new Error(`Agent ${agentId} not found`); @@ -879,8 +873,6 @@ export class TownDO extends DurableObject { convoyId: string, fields: Partial<{ merge_mode: ConvoyMergeMode; feature_branch: string }> ): Promise { - await this.ensureInitialized(); - const convoy = this.getConvoy(convoyId); if (!convoy) return null; @@ -925,32 +917,26 @@ export class TownDO extends DurableObject { // ══════════════════════════════════════════════════════════════════ async registerAgent(input: RegisterAgentInput): Promise { - await this.ensureInitialized(); return agents.registerAgent(this.sql, input); } async getAgentAsync(agentId: string): Promise { - await this.ensureInitialized(); return agents.getAgent(this.sql, agentId); } async getAgentByIdentity(identity: string): Promise { - await this.ensureInitialized(); return agents.getAgentByIdentity(this.sql, identity); } async listAgents(filter?: AgentFilter): Promise { - await this.ensureInitialized(); return agents.listAgents(this.sql, filter); } async updateAgentStatus(agentId: string, status: string): Promise { - await this.ensureInitialized(); agents.updateAgentStatus(this.sql, agentId, status); } async deleteAgent(agentId: string): Promise { - await this.ensureInitialized(); agents.deleteAgent(this.sql, agentId); try { const agentDO = getAgentDOStub(this.env, agentId); @@ -961,23 +947,19 @@ export class TownDO extends DurableObject { } async hookBead(agentId: string, beadId: string): Promise { - await this.ensureInitialized(); agents.hookBead(this.sql, agentId, beadId); await this.armAlarmIfNeeded(); } async unhookBead(agentId: string): Promise { - await this.ensureInitialized(); agents.unhookBead(this.sql, agentId); } async getHookedBead(agentId: string): Promise { - await this.ensureInitialized(); return agents.getHookedBead(this.sql, agentId); } async getOrCreateAgent(role: AgentRole, rigId: string): Promise { - await this.ensureInitialized(); return agents.getOrCreateAgent(this.sql, role, rigId, this.townId); } @@ -996,30 +978,25 @@ export class TownDO extends DurableObject { // ── Prime & Checkpoint ──────────────────────────────────────────── async prime(agentId: string): Promise { - await this.ensureInitialized(); return agents.prime(this.sql, agentId); } async writeCheckpoint(agentId: string, data: unknown): Promise { - await this.ensureInitialized(); agents.writeCheckpoint(this.sql, agentId, data); } async readCheckpoint(agentId: string): Promise { - await this.ensureInitialized(); return agents.readCheckpoint(this.sql, agentId); } // ── Heartbeat ───────────────────────────────────────────────────── async touchAgentHeartbeat(agentId: string): Promise { - await this.ensureInitialized(); agents.touchAgent(this.sql, agentId); await this.armAlarmIfNeeded(); } async updateAgentStatusMessage(agentId: string, message: string): Promise { - await this.ensureInitialized(); agents.updateAgentStatusMessage(this.sql, agentId, message); const agent = agents.getAgent(this.sql, agentId); if (agent?.current_hook_bead_id) { @@ -1046,12 +1023,10 @@ export class TownDO extends DurableObject { // ══════════════════════════════════════════════════════════════════ async sendMail(input: SendMailInput): Promise { - await this.ensureInitialized(); mail.sendMail(this.sql, input); } async checkMail(agentId: string): Promise { - await this.ensureInitialized(); return mail.checkMail(this.sql, agentId); } @@ -1074,8 +1049,6 @@ 
export class TownDO extends DurableObject { ttlSeconds?: number; } ): Promise { - await this.ensureInitialized(); - const nudgeId = crypto.randomUUID(); const mode = options?.mode ?? 'wait-idle'; const priority = options?.priority ?? 'normal'; @@ -1143,8 +1116,6 @@ export class TownDO extends DurableObject { ): Promise< { nudge_id: string; message: string; mode: string; priority: string; source: string }[] > { - await this.ensureInitialized(); - const rows = [ ...query( this.sql, @@ -1180,8 +1151,6 @@ export class TownDO extends DurableObject { /** Mark a nudge as delivered. */ async markNudgeDelivered(nudgeId: string): Promise { - await this.ensureInitialized(); - query( this.sql, /* sql */ ` @@ -1198,8 +1167,6 @@ export class TownDO extends DurableObject { * Called from the alarm loop. Returns the count of nudges expired. */ async expireStaleNudges(): Promise { - await this.ensureInitialized(); - const result = [ ...query( this.sql, @@ -1223,7 +1190,6 @@ export class TownDO extends DurableObject { // ══════════════════════════════════════════════════════════════════ async submitToReviewQueue(input: ReviewQueueInput): Promise { - await this.ensureInitialized(); reviewQueue.submitToReviewQueue(this.sql, input); this.emitEvent({ event: 'review.submitted', @@ -1235,12 +1201,10 @@ export class TownDO extends DurableObject { } async popReviewQueue(): Promise { - await this.ensureInitialized(); return reviewQueue.popReviewQueue(this.sql); } async completeReview(entryId: string, status: 'merged' | 'failed'): Promise { - await this.ensureInitialized(); reviewQueue.completeReview(this.sql, entryId, status); } @@ -1250,8 +1214,6 @@ export class TownDO extends DurableObject { message?: string; commit_sha?: string; }): Promise { - await this.ensureInitialized(); - // Resolve the source bead ID before completing the review, so we can // trigger dispatchUnblockedBeads for it after the MR closes. const mrBead = beadOps.getBead(this.sql, input.entry_id); @@ -1280,39 +1242,13 @@ export class TownDO extends DurableObject { }); } - // When a review fails or conflicts (rework), the source bead was - // returned to in_progress. Re-hook a polecat and re-dispatch so the - // rework starts automatically. The original polecat may already be - // working on something else, so fall back to getOrCreateAgent. - if ((input.status === 'failed' || input.status === 'conflict') && sourceBeadId) { - const sourceBead = beadOps.getBead(this.sql, sourceBeadId); - if (sourceBead?.rig_id) { - try { - const reworkAgent = agents.getOrCreateAgent( - this.sql, - 'polecat', - sourceBead.rig_id, - this.townId - ); - agents.hookBead(this.sql, reworkAgent.id, sourceBeadId); - this.dispatchAgent(reworkAgent, sourceBead).catch(err => - console.error( - `${TOWN_LOG} completeReviewWithResult: fire-and-forget rework dispatch failed for bead=${sourceBeadId}`, - err - ) - ); - } catch (err) { - console.warn( - `${TOWN_LOG} completeReviewWithResult: could not dispatch rework for bead=${sourceBeadId}:`, - err - ); - } - } - } + // Rework is handled by the normal scheduling path: the failed/conflict + // path in completeReviewWithResult sets the source bead to 'open' with + // assignee cleared. feedStrandedConvoys or rehookOrphanedBeads will + // hook a polecat, and schedulePendingWork will dispatch it. 
} async agentDone(agentId: string, input: AgentDoneInput): Promise { - await this.ensureInitialized(); reviewQueue.agentDone(this.sql, agentId, input); await this.armAlarmIfNeeded(); } @@ -1321,7 +1257,6 @@ export class TownDO extends DurableObject { agentId: string, input: { status: 'completed' | 'failed'; reason?: string } ): Promise { - await this.ensureInitialized(); let resolvedAgentId = agentId; if (!resolvedAgentId) { const mayor = agents.listAgents(this.sql, { role: 'mayor' })[0]; @@ -1379,7 +1314,6 @@ export class TownDO extends DurableObject { action: string; resolution_notes: string; }): Promise { - await this.ensureInitialized(); const triageBead = beadOps.getBead(this.sql, input.triage_request_bead_id); if (!triageBead) throw new Error(`Triage request bead ${input.triage_request_bead_id} not found`); @@ -1605,19 +1539,16 @@ export class TownDO extends DurableObject { } async createMolecule(beadId: string, formula: unknown): Promise { - await this.ensureInitialized(); return reviewQueue.createMolecule(this.sql, beadId, formula); } async getMoleculeCurrentStep( agentId: string ): Promise<{ molecule: Molecule; step: unknown } | null> { - await this.ensureInitialized(); return reviewQueue.getMoleculeCurrentStep(this.sql, agentId); } async advanceMoleculeStep(agentId: string, summary: string): Promise { - await this.ensureInitialized(); return reviewQueue.advanceMoleculeStep(this.sql, agentId, summary); } @@ -1632,8 +1563,6 @@ export class TownDO extends DurableObject { priority?: string; metadata?: Record; }): Promise<{ bead: Bead; agent: Agent }> { - await this.ensureInitialized(); - const createdBead = beadOps.createBead(this.sql, { type: 'issue', title: input.title, @@ -1686,7 +1615,6 @@ export class TownDO extends DurableObject { _model?: string, uiContext?: string ): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting' }> { - await this.ensureInitialized(); const townId = this.townId; let mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null; @@ -1771,7 +1699,6 @@ export class TownDO extends DurableObject { * without requiring the user to send a message first. */ async ensureMayor(): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting' }> { - await this.ensureInitialized(); const townId = this.townId; let mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null; @@ -1855,7 +1782,6 @@ export class TownDO extends DurableObject { lastActivityAt: string; } | null; }> { - await this.ensureInitialized(); const mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null; const mapStatus = (agentStatus: string): 'idle' | 'active' | 'starting' => { @@ -1914,7 +1840,6 @@ export class TownDO extends DurableObject { beads: Array<{ bead_id: string; rig_id: string }>; created_by?: string; }): Promise { - await this.ensureInitialized(); const parsed = z .object({ title: z.string().min(1), @@ -1996,8 +1921,6 @@ export class TownDO extends DurableObject { } async onBeadClosed(input: { convoyId: string; beadId: string }): Promise { - await this.ensureInitialized(); - // Count closed tracked beads const closedRows = [ ...query( @@ -2063,8 +1986,6 @@ export class TownDO extends DurableObject { * still assigned to those beads so they return to the idle pool. 
*/ async closeConvoy(convoyId: string): Promise { - await this.ensureInitialized(); - const convoy = this.getConvoy(convoyId); if (!convoy) return null; @@ -2152,8 +2073,6 @@ export class TownDO extends DurableObject { merge_mode?: 'review-then-land' | 'review-and-merge'; staged?: boolean; }): Promise<{ convoy: ConvoyEntry; beads: Array<{ bead: Bead; agent: Agent | null }> }> { - await this.ensureInitialized(); - // Resolve staged: explicit request wins, otherwise fall back to town config default. const townConfig = await this.getTownConfig(); const isStaged = input.staged ?? townConfig.staged_convoys_default; @@ -2362,8 +2281,6 @@ export class TownDO extends DurableObject { async startConvoy( convoyId: string ): Promise<{ convoy: ConvoyEntry; beads: Array<{ bead: Bead; agent: Agent }> }> { - await this.ensureInitialized(); - const convoy = this.getConvoy(convoyId); if (!convoy) throw new Error(`Convoy not found: ${convoyId}`); if (!convoy.staged) throw new Error(`Convoy is not staged: ${convoyId}`); @@ -2458,7 +2375,6 @@ export class TownDO extends DurableObject { * List active convoys with progress counts. */ async listConvoys(): Promise { - await this.ensureInitialized(); const rows = [ ...query( this.sql, @@ -2492,7 +2408,6 @@ export class TownDO extends DurableObject { } > > { - await this.ensureInitialized(); const convoys = await this.listConvoys(); const detailed = []; for (const convoy of convoys) { @@ -2521,7 +2436,6 @@ export class TownDO extends DurableObject { }) | null > { - await this.ensureInitialized(); const convoy = this.getConvoy(convoyId); if (!convoy) return null; @@ -2579,7 +2493,6 @@ export class TownDO extends DurableObject { // ══════════════════════════════════════════════════════════════════ async acknowledgeEscalation(escalationId: string): Promise { - await this.ensureInitialized(); query( this.sql, /* sql */ ` @@ -2606,7 +2519,6 @@ export class TownDO extends DurableObject { } async listEscalations(filter?: { acknowledged?: boolean }): Promise { - await this.ensureInitialized(); const rows = filter?.acknowledged !== undefined ? [ @@ -2634,7 +2546,6 @@ export class TownDO extends DurableObject { category?: string; message: string; }): Promise { - await this.ensureInitialized(); const beadId = generateId(); const timestamp = now(); @@ -2787,7 +2698,6 @@ export class TownDO extends DurableObject { return; } - await this.ensureInitialized(); const townId = this.townId; console.log(`${TOWN_LOG} alarm: fired for town=${townId}`); @@ -2812,59 +2722,68 @@ export class TownDO extends DurableObject { } } - // Process reviews FIRST so the refinery gets assigned before the - // scheduler dispatches new polecats. This prevents downstream beads - // from starting before upstream reviews are merged. 
- try { - await this.processReviewQueue(); - } catch (err) { - console.error(`${TOWN_LOG} alarm: processReviewQueue failed`, err); - Sentry.captureException(err); - } - try { - await this.processConvoyLandings(); - } catch (err) { - console.error(`${TOWN_LOG} alarm: processConvoyLandings failed`, err); - Sentry.captureException(err); - } - try { - await this.schedulePendingWork(); - } catch (err) { - console.error(`${TOWN_LOG} alarm: schedulePendingWork failed`, err); - Sentry.captureException(err); - } - try { - await this.witnessPatrol(); - } catch (err) { - console.error(`${TOWN_LOG} alarm: witnessPatrol failed`, err); - Sentry.captureException(err); - } - try { - this.deaconPatrol(); - } catch (err) { - console.error(`${TOWN_LOG} alarm: deaconPatrol failed`, err); - Sentry.captureException(err); - } - try { - await this.deliverPendingMail(); - } catch (err) { - console.warn(`${TOWN_LOG} alarm: deliverPendingMail failed`, err); - } - try { - await this.expireStaleNudges(); - } catch (err) { - console.warn(`${TOWN_LOG} alarm: expireStaleNudges failed`, err); - } - try { - await this.reEscalateStaleEscalations(); - } catch (err) { - console.warn(`${TOWN_LOG} alarm: reEscalation failed`, err); - } - try { - await this.maybeDispatchTriageAgent(); - } catch (err) { - console.warn(`${TOWN_LOG} alarm: maybeDispatchTriageAgent failed`, err); - } + // ── Phase 1: Patrols (detect dead agents, recover stale state) ─── + // Patrols run first so that zombie agents are reset to idle and + // stale hooks are cleared before the scheduler tries to dispatch. + // This lets recovered agents dispatch in the same alarm tick + // instead of waiting for the next one. + await Promise.allSettled([ + this.witnessPatrol().catch(err => { + console.error(`${TOWN_LOG} alarm: witnessPatrol failed`, err); + Sentry.captureException(err); + }), + // deaconPatrol is sync — wrap in a resolved promise for allSettled + Promise.resolve().then(() => { + try { + this.deaconPatrol(); + } catch (err) { + console.error(`${TOWN_LOG} alarm: deaconPatrol failed`, err); + Sentry.captureException(err); + } + }), + ]); + + // ── Phase 2: Review pipeline + scheduling (dispatches agents) ── + // processReviewQueue and processConvoyLandings share the review + // queue, so they run sequentially. schedulePendingWork runs in + // parallel — it only handles non-refinery agents and reads + // disjoint state. 
+ await Promise.allSettled([ + (async () => { + try { + await this.processReviewQueue(); + } catch (err) { + console.error(`${TOWN_LOG} alarm: processReviewQueue failed`, err); + Sentry.captureException(err); + } + try { + await this.processConvoyLandings(); + } catch (err) { + console.error(`${TOWN_LOG} alarm: processConvoyLandings failed`, err); + Sentry.captureException(err); + } + })(), + scheduling.schedulePendingWork(this.schedulingCtx).catch(err => { + console.error(`${TOWN_LOG} alarm: schedulePendingWork failed`, err); + Sentry.captureException(err); + }), + ]); + + // ── Phase 3: Housekeeping (independent, all parallelizable) ──── + await Promise.allSettled([ + this.deliverPendingMail().catch(err => + console.warn(`${TOWN_LOG} alarm: deliverPendingMail failed`, err) + ), + this.expireStaleNudges().catch(err => + console.warn(`${TOWN_LOG} alarm: expireStaleNudges failed`, err) + ), + this.reEscalateStaleEscalations().catch(err => + console.warn(`${TOWN_LOG} alarm: reEscalation failed`, err) + ), + this.maybeDispatchTriageAgent().catch(err => + console.warn(`${TOWN_LOG} alarm: maybeDispatchTriageAgent failed`, err) + ), + ]); // Re-arm: fast when active, slow when idle const active = this.hasActiveWork(); const interval = active ? ACTIVE_ALARM_INTERVAL_MS : IDLE_ALARM_INTERVAL_MS; @@ -2902,305 +2821,21 @@ export class TownDO extends DurableObject { } private hasActiveWork(): boolean { - const activeAgentRows = [ - ...query( - this.sql, - /* sql */ `SELECT COUNT(*) as cnt FROM ${agent_metadata} WHERE ${agent_metadata.status} IN ('working', 'stalled')`, - [] - ), - ]; - const pendingBeadRows = [ - ...query( - this.sql, - /* sql */ `SELECT COUNT(*) as cnt FROM ${agent_metadata} WHERE ${agent_metadata.status} = 'idle' AND ${agent_metadata.current_hook_bead_id} IS NOT NULL`, - [] - ), - ]; - const pendingReviewRows = [ - ...query( - this.sql, - /* sql */ `SELECT COUNT(*) as cnt FROM ${beads} WHERE ${beads.type} = 'merge_request' AND ${beads.status} IN ('open', 'in_progress')`, - [] - ), - ]; - const pendingTriageRows = [ - ...query( - this.sql, - /* sql */ `SELECT COUNT(*) as cnt FROM ${beads} WHERE ${beads.type} = 'issue' AND ${beads.labels} LIKE ? AND ${beads.status} = 'open'`, - [patrol.TRIAGE_LABEL_LIKE] - ), - ]; - return ( - Number(activeAgentRows[0]?.cnt ?? 0) > 0 || - Number(pendingBeadRows[0]?.cnt ?? 0) > 0 || - Number(pendingReviewRows[0]?.cnt ?? 0) > 0 || - Number(pendingTriageRows[0]?.cnt ?? 0) > 0 - ); + return scheduling.hasActiveWork(this.sql); } - /** - * Dispatch a single agent to the container. Used for eager dispatch from - * slingBead (so agents start immediately) and from schedulePendingWork - * (periodic recovery). Returns true if the agent was started. - */ - private async dispatchAgent(agent: Agent, bead: Bead): Promise { - try { - const rigId = agent.rig_id ?? rigs.listRigs(this.sql)[0]?.id ?? ''; - const rigConfig = rigId ? await this.getRigConfig(rigId) : null; - if (!rigConfig) { - console.warn(`${TOWN_LOG} dispatchAgent: no rig config for agent=${agent.id} rig=${rigId}`); - return false; - } - - const townConfig = await this.getTownConfig(); - const kilocodeToken = await this.resolveKilocodeToken(); - - // Check if this bead belongs to a convoy and resolve its feature branch. - // Convoy beads branch from the feature branch, not from defaultBranch. - const convoyId = beadOps.getConvoyForBead(this.sql, bead.bead_id); - const convoyFeatureBranch = convoyId - ? 
beadOps.getConvoyFeatureBranch(this.sql, convoyId) - : null; - - // Transition the bead to in_progress BEFORE starting the container. - // This must happen synchronously within the DO's I/O gate — the - // fire-and-forget pattern used by slingBead/slingConvoy means the - // calling RPC may return before startAgentInContainer completes, - // closing the I/O gate and preventing further SQL writes. - const currentBead = beadOps.getBead(this.sql, bead.bead_id); - if ( - currentBead && - currentBead.status !== 'in_progress' && - currentBead.status !== 'closed' && - currentBead.status !== 'failed' - ) { - beadOps.updateBeadStatus(this.sql, bead.bead_id, 'in_progress', agent.id); - } - - // Set status to 'working' BEFORE the async container start. This - // must happen synchronously so the SQL write executes while the I/O - // gate is still open. When dispatchAgent is called fire-and-forget - // (from slingBead, slingConvoy, dispatchUnblockedBeads), any SQL - // writes after the first `await` may be silently dropped because - // the DO's RPC response closes the I/O gate. If the container fails - // to start, we roll back to 'idle'. - const timestamp = now(); - query( - this.sql, - /* sql */ ` - UPDATE ${agent_metadata} - SET ${agent_metadata.columns.status} = 'working', - ${agent_metadata.columns.dispatch_attempts} = ${agent_metadata.columns.dispatch_attempts} + 1, - ${agent_metadata.columns.last_activity_at} = ? - WHERE ${agent_metadata.bead_id} = ? - `, - [timestamp, agent.id] - ); - - const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { - townId: this.townId, - rigId, - userId: rigConfig.userId, - agentId: agent.id, - agentName: agent.name, - role: agent.role, - identity: agent.identity, - beadId: bead.bead_id, - beadTitle: bead.title, - beadBody: bead.body ?? '', - checkpoint: agent.checkpoint, - gitUrl: rigConfig.gitUrl, - defaultBranch: rigConfig.defaultBranch, - kilocodeToken, - townConfig, - platformIntegrationId: rigConfig.platformIntegrationId, - convoyFeatureBranch: convoyFeatureBranch ?? undefined, - }); - - if (started) { - // Reset dispatch_attempts on success (best-effort — may be - // dropped if the I/O gate is already closed, but that's fine - // because the agent is already 'working'). - query( - this.sql, - /* sql */ ` - UPDATE ${agent_metadata} - SET ${agent_metadata.columns.dispatch_attempts} = 0 - WHERE ${agent_metadata.bead_id} = ? - `, - [agent.id] - ); - console.log(`${TOWN_LOG} dispatchAgent: started agent=${agent.name}(${agent.id})`); - this.emitEvent({ - event: 'agent.spawned', - townId: this.townId, - rigId, - agentId: agent.id, - beadId: bead.bead_id, - role: agent.role, - }); - } else { - // Container failed to start — roll back to idle - query( - this.sql, - /* sql */ ` - UPDATE ${agent_metadata} - SET ${agent_metadata.columns.status} = 'idle' - WHERE ${agent_metadata.bead_id} = ? - `, - [agent.id] - ); - this.emitEvent({ - event: 'agent.dispatch_failed', - townId: this.townId, - rigId, - agentId: agent.id, - beadId: bead.bead_id, - role: agent.role, - }); - } - return started; - } catch (err) { - console.error(`${TOWN_LOG} dispatchAgent: failed for agent=${agent.id}:`, err); - Sentry.captureException(err, { extra: { agentId: agent.id, beadId: bead.bead_id } }); - // Roll back agent and bead to prevent them from being stuck in - // working/in_progress state when the container call throws. 
- try { - query( - this.sql, - /* sql */ ` - UPDATE ${agent_metadata} - SET ${agent_metadata.columns.status} = 'idle' - WHERE ${agent_metadata.bead_id} = ? - `, - [agent.id] - ); - if (agent.current_hook_bead_id) { - beadOps.updateBeadStatus(this.sql, agent.current_hook_bead_id, 'open', agent.id); - } - } catch (rollbackErr) { - console.error(`${TOWN_LOG} dispatchAgent: rollback also failed:`, rollbackErr); - } - this.emitEvent({ - event: 'agent.dispatch_failed', - townId: this.townId, - agentId: agent.id, - beadId: bead.bead_id, - role: agent.role, - }); - return false; - } + /** Dispatch a single agent to the container. Delegates to scheduling module. */ + private dispatchAgent( + agent: Agent, + bead: Bead, + options?: { systemPromptOverride?: string } + ): Promise { + return scheduling.dispatchAgent(this.schedulingCtx, agent, bead, options); } - /** - * When a bead closes, find beads that were blocked by it and are now - * fully unblocked (all 'blocks' dependencies resolved). Dispatch their - * assigned agents. - */ + /** When a bead closes, dispatch any beads it was blocking. */ private dispatchUnblockedBeads(closedBeadId: string): void { - const unblockedIds = beadOps.getNewlyUnblockedBeads(this.sql, closedBeadId); - if (unblockedIds.length === 0) return; - - console.log( - `${TOWN_LOG} dispatchUnblockedBeads: ${unblockedIds.length} beads unblocked by ${closedBeadId}` - ); - - for (const beadId of unblockedIds) { - const bead = beadOps.getBead(this.sql, beadId); - if (!bead || bead.status === 'closed' || bead.status === 'failed') continue; - - // Find the agent hooked to this bead - if (!bead.assignee_agent_bead_id) continue; - const agent = agents.getAgent(this.sql, bead.assignee_agent_bead_id); - if (!agent || agent.status !== 'idle') continue; - - this.dispatchAgent(agent, bead).catch(err => - console.error( - `${TOWN_LOG} dispatchUnblockedBeads: fire-and-forget dispatch failed for bead=${beadId}`, - err - ) - ); - } - } - - /** - * Find idle agents with hooked beads and dispatch them to the container. - * Agents whose last_activity_at is within the dispatch cooldown are - * skipped — they have a fire-and-forget dispatch already in flight. - */ - private async schedulePendingWork(): Promise { - const cooldownCutoff = new Date(Date.now() - DISPATCH_COOLDOWN_MS).toISOString(); - const rows = [ - ...query( - this.sql, - /* sql */ ` - SELECT ${beads}.*, - ${agent_metadata.role}, ${agent_metadata.identity}, - ${agent_metadata.container_process_id}, - ${agent_metadata.status} AS status, - ${agent_metadata.current_hook_bead_id}, - ${agent_metadata.dispatch_attempts}, ${agent_metadata.last_activity_at}, - ${agent_metadata.checkpoint}, - ${agent_metadata.agent_status_message}, ${agent_metadata.agent_status_updated_at} - FROM ${beads} - INNER JOIN ${agent_metadata} ON ${beads.bead_id} = ${agent_metadata.bead_id} - WHERE ${agent_metadata.status} = 'idle' - AND ${agent_metadata.current_hook_bead_id} IS NOT NULL - AND (${agent_metadata.last_activity_at} IS NULL OR ${agent_metadata.last_activity_at} < ?) 
- `, - [cooldownCutoff] - ), - ]; - const pendingAgents: Agent[] = AgentBeadRecord.array() - .parse(rows) - .map(row => ({ - id: row.bead_id, - rig_id: row.rig_id, - role: row.role, - name: row.title, - identity: row.identity, - status: row.status, - current_hook_bead_id: row.current_hook_bead_id, - dispatch_attempts: row.dispatch_attempts, - last_activity_at: row.last_activity_at, - checkpoint: row.checkpoint, - created_at: row.created_at, - agent_status_message: row.agent_status_message, - agent_status_updated_at: row.agent_status_updated_at, - })); - - console.log(`${TOWN_LOG} schedulePendingWork: found ${pendingAgents.length} pending agents`); - if (pendingAgents.length === 0) return; - - const dispatchTasks: Array<() => Promise> = []; - - for (const agent of pendingAgents) { - const beadId = agent.current_hook_bead_id; - if (!beadId) continue; - const bead = beadOps.getBead(this.sql, beadId); - if (!bead) continue; - - if (agent.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { - beadOps.updateBeadStatus(this.sql, beadId, 'failed', agent.id); - agents.unhookBead(this.sql, agent.id); - continue; - } - - // Skip beads that still have unresolved 'blocks' dependencies — - // they'll be dispatched by dispatchUnblockedBeads when their - // blockers close. - if (beadOps.hasUnresolvedBlockers(this.sql, beadId)) { - continue; - } - - dispatchTasks.push(async () => { - await this.dispatchAgent(agent, bead); - }); - } - - if (dispatchTasks.length > 0) { - await Promise.allSettled(dispatchTasks.map(fn => fn())); - } + scheduling.dispatchUnblockedBeads(this.schedulingCtx, closedBeadId); } /** @@ -3232,27 +2867,62 @@ export class TownDO extends DurableObject { ), ]); - for (const working of workingAgents) { + // Check container status for all working agents in parallel to + // avoid serial network round-trips (one per agent). + const statusChecks = workingAgents.map(async working => { const agentId = working.bead_id; - const containerInfo = await dispatch.checkAgentContainerStatus(this.env, townId, agentId); + return { agentId, containerInfo }; + }); + const statusResults = await Promise.allSettled(statusChecks); + + for (const result of statusResults) { + if (result.status !== 'fulfilled') continue; + const { agentId, containerInfo } = result.value; if (containerInfo.status === 'not_found' || containerInfo.status === 'exited') { - if (containerInfo.exitReason === 'completed') { + const agent = agents.getAgent(this.sql, agentId); + if (!agent) continue; + + if (agent.role === 'refinery') { + // For refineries: set to idle, keep hook intact. + // - gt_done may still arrive and close the MR bead normally + // - recoverStuckReviews (guard: no WORKING agent) will recover + // after 30 min if gt_done never arrives + // - If MR bead is already closed (gt_done already ran), unhook + query( + this.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle', + ${agent_metadata.columns.dispatch_attempts} = 0 + WHERE ${agent_metadata.bead_id} = ? 
+ `, + [agentId] + ); + if (agent.current_hook_bead_id) { + const mrBead = beadOps.getBead(this.sql, agent.current_hook_bead_id); + if (mrBead && (mrBead.status === 'closed' || mrBead.status === 'failed')) { + agents.unhookBead(this.sql, agentId); + } + } + } else if (containerInfo.exitReason === 'completed') { + // Non-refinery normal exit — route through agentCompleted reviewQueue.agentCompleted(this.sql, agentId, { status: 'completed' }); - continue; + } else { + // Non-refinery abnormal death — reset to idle, keep hook + // so schedulePendingWork re-dispatches on next tick + query( + this.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle', + ${agent_metadata.columns.dispatch_attempts} = 0 + WHERE ${agent_metadata.bead_id} = ? + `, + [agentId] + ); } - query( - this.sql, - /* sql */ ` - UPDATE ${agent_metadata} - SET ${agent_metadata.columns.status} = 'idle', - ${agent_metadata.columns.last_activity_at} = ? - WHERE ${agent_metadata.bead_id} = ? - `, - [now(), agentId] - ); - continue; } } @@ -3325,6 +2995,9 @@ export class TownDO extends DurableObject { // ── Stranded convoy feeding ──────────────────────────────────── patrol.feedStrandedConvoys(this.sql, this.townId); + // ── Orphaned bead re-hooking ─────────────────────────────────── + patrol.rehookOrphanedBeads(this.sql, this.townId); + // ── Crash loop detection ─────────────────────────────────────── patrol.detectCrashLoops(this.sql); } @@ -3347,7 +3020,7 @@ export class TownDO extends DurableObject { patrol.TRIAGE_REQUEST_LABEL, patrol.TRIAGE_BATCH_LABEL ); - const cooldownCutoff = new Date(Date.now() - DISPATCH_COOLDOWN_MS).toISOString(); + const cooldownCutoff = new Date(Date.now() - scheduling.DISPATCH_COOLDOWN_MS).toISOString(); const existingBatch = [ ...query( this.sql, @@ -3504,10 +3177,63 @@ export class TownDO extends DurableObject { private async processReviewQueue(): Promise { reviewQueue.recoverStuckReviews(this.sql); reviewQueue.closeOrphanedReviewBeads(this.sql); + reviewQueue.recoverOrphanedSourceBeads(this.sql); // Poll open PRs created by the 'pr' strategy await this.pollPendingPRs(); + // Cleanup: if the refinery is hooked to a terminal MR bead (closed/failed), + // check if the container session has actually ended before making it + // available. The refinery may still be running (finishing its LLM turn + // after gt_done returned). If still running, skip — wait for + // agentCompleted to fire, then clean up on the next tick. + const existingRefinery = agents.listAgents(this.sql, { role: 'refinery' })[0]; + if (existingRefinery?.current_hook_bead_id) { + const hookedMr = beadOps.getBead(this.sql, existingRefinery.current_hook_bead_id); + if (hookedMr && (hookedMr.status === 'closed' || hookedMr.status === 'failed')) { + const containerStatus = await dispatch.checkAgentContainerStatus( + this.env, this.townId, existingRefinery.id + ); + if (containerStatus.status === 'running') { + // Session still active — don't unhook or pop new MR this tick + return; + } + agents.unhookBead(this.sql, existingRefinery.id); + agents.updateAgentStatus(this.sql, existingRefinery.id, 'idle'); + agents.writeCheckpoint(this.sql, existingRefinery.id, null); + console.log( + `${TOWN_LOG} processReviewQueue: unhooked refinery from terminal MR bead=${hookedMr.bead_id}` + ); + } + } + + // Retry: if the refinery is idle but still hooked to an in_progress + // MR bead, either the previous dispatch failed (container not ready) + // or the dispatch succeeded but we got a false negative (timeout). 
+ // Check the container first — if the agent is running, just restore + // the working status. If not, re-dispatch. + const refineryForRetry = agents.listAgents(this.sql, { role: 'refinery' })[0]; + if ( + refineryForRetry?.status === 'idle' && + refineryForRetry.current_hook_bead_id + ) { + const hookedRetryMr = beadOps.getBead(this.sql, refineryForRetry.current_hook_bead_id); + if (hookedRetryMr?.status === 'in_progress' && hookedRetryMr.type === 'merge_request') { + const containerStatus = await dispatch.checkAgentContainerStatus( + this.env, this.townId, refineryForRetry.id + ); + if (containerStatus.status === 'running') { + agents.updateAgentStatus(this.sql, refineryForRetry.id, 'working'); + return; + } + console.log( + `${TOWN_LOG} processReviewQueue: retrying refinery dispatch for MR bead=${hookedRetryMr.bead_id}` + ); + await this.dispatchAgent(refineryForRetry, hookedRetryMr); + return; + } + } + const entry = reviewQueue.popReviewQueue(this.sql); if (!entry) return; @@ -3516,12 +3242,15 @@ export class TownDO extends DurableObject { const rigId = entry.rig_id; if (!rigId) { console.error(`${TOWN_LOG} processReviewQueue: entry ${entry.id} has no rig_id, skipping`); - reviewQueue.completeReview(this.sql, entry.id, 'failed'); + this.failReviewWithRework(entry, 'MR bead has no rig_id'); return; } const rigConfig = await this.getRigConfig(rigId); if (!rigConfig) { - reviewQueue.completeReview(this.sql, entry.id, 'failed'); + console.error( + `${TOWN_LOG} processReviewQueue: no rig config for rig=${rigId}, entry=${entry.id}` + ); + this.failReviewWithRework(entry, `No rig config found for rig ${rigId}`); return; } @@ -3572,7 +3301,8 @@ export class TownDO extends DurableObject { // Get or create the per-rig refinery. If it already exists and is busy // (processing another review), put the entry back to 'open' so it gets - // retried on the next alarm cycle. + // retried on the next alarm cycle. Re-fetch since the cleanup block + // above may have changed it. const refineryAgent = agents.getOrCreateAgent(this.sql, 'refinery', rigId, this.townId); if (refineryAgent.status !== 'idle') { console.log( @@ -3603,6 +3333,12 @@ export class TownDO extends DurableObject { // Hook the refinery to the MR bead (entry.id), not the source bead // (entry.bead_id). The source bead stays closed with its original // polecat assignee preserved. + // If the refinery is still hooked to a previous MR bead (agentCompleted + // preserves hooks for refineries), unhook first and clear stale checkpoint. + if (refineryAgent.current_hook_bead_id && refineryAgent.current_hook_bead_id !== entry.id) { + agents.unhookBead(this.sql, refineryAgent.id); + agents.writeCheckpoint(this.sql, refineryAgent.id, null); + } agents.hookBead(this.sql, refineryAgent.id, entry.id); // Mark as working before the async container start (same I/O gate @@ -3626,22 +3362,51 @@ export class TownDO extends DurableObject { // may be a convoy feature branch that doesn't exist on the remote yet. // The refinery's system prompt tells it which branch to merge into. defaultBranch: rigConfig.defaultBranch, - kilocodeToken: rigConfig.kilocodeToken, + kilocodeToken: rigConfig.kilocodeToken ?? (await this.resolveKilocodeToken()), townConfig, systemPromptOverride: systemPrompt, platformIntegrationId: rigConfig.platformIntegrationId, }); if (!started) { - agents.unhookBead(this.sql, refineryAgent.id); + // Keep hook intact — the retry block at the top of processReviewQueue + // will re-dispatch on the next alarm tick. 
By preserving the hook, + // we prevent the cascade of popping new MRs when the container is + // temporarily unavailable. recoverStuckReviews clears after 30 min + // if retries never succeed. agents.updateAgentStatus(this.sql, refineryAgent.id, 'idle'); - console.error( - `${TOWN_LOG} processReviewQueue: refinery agent failed to start for entry=${entry.id}` + + const containerError = dispatch.getLastStartError() ?? 'unknown'; + console.warn( + `${TOWN_LOG} processReviewQueue: refinery start returned false for entry=${entry.id} — ` + + `leaving MR bead in_progress for recovery. error=${containerError}` ); - reviewQueue.completeReview(this.sql, entry.id, 'failed'); + agents.updateAgentStatusMessage(this.sql, refineryAgent.id, + `[start_uncertain] ${containerError}`); } } + /** + * Fail an MR bead via completeReviewWithResult. The source bead is + * returned to 'open' with its assignee cleared, so the normal + * scheduling path (feedStrandedConvoys → hookBead → schedulePendingWork) + * handles rework. No fire-and-forget dispatch — that pattern was prone + * to races with patrol recovery functions. + */ + private failReviewWithRework(entry: ReviewQueueEntry, reason: string): void { + reviewQueue.completeReviewWithResult(this.sql, { + entry_id: entry.id, + status: 'failed', + message: reason, + }); + + this.emitEvent({ + event: 'review.failed', + townId: this.townId, + beadId: entry.id, + }); + } + /** * Process convoys whose tracked beads are all closed and that have a * feature branch waiting to be landed. Creates a final merge_request bead @@ -3827,14 +3592,16 @@ export class TownDO extends DurableObject { if (!status) continue; if (status === 'merged') { - reviewQueue.completeReviewWithResult(this.sql, { + // Use the TownDO wrapper (not the module function directly) + // so dispatchUnblockedBeads fires and events are emitted. + await this.completeReviewWithResult({ entry_id: review.bead_id, status: 'merged', message: 'PR merged externally', }); console.log(`${TOWN_LOG} pollPendingPRs: PR merged for entry=${review.bead_id}`); } else if (status === 'closed') { - reviewQueue.completeReviewWithResult(this.sql, { + await this.completeReviewWithResult({ entry_id: review.bead_id, status: 'failed', message: 'PR closed without merge', @@ -4027,7 +3794,9 @@ export class TownDO extends DurableObject { try { const container = getTownContainerStub(this.env, townId); - await container.fetch('http://container/health'); + await container.fetch('http://container/health', { + signal: AbortSignal.timeout(5_000), + }); } catch { // Container is starting up or unavailable — alarm will retry } @@ -4062,7 +3831,6 @@ export class TownDO extends DurableObject { activeAgents: number; pendingBeads: number; }> { - await this.ensureInitialized(); const townId = this.townId; // Check if alarm is set @@ -4134,8 +3902,6 @@ export class TownDO extends DurableObject { message: string; }>; }> { - await this.ensureInitialized(); - const currentAlarm = await this.ctx.storage.getAlarm(); const active = this.hasActiveWork(); const intervalMs = active ? 
ACTIVE_ALARM_INTERVAL_MS : IDLE_ALARM_INTERVAL_MS;
@@ -4281,6 +4047,47 @@ export class TownDO extends DurableObject {
     };
   }
 
+  // DEBUG: concise non-terminal bead summary — remove after debugging
+  async debugBeadSummary(): Promise<unknown[]> {
+    return [
+      ...query(
+        this.sql,
+        /* sql */ `
+          SELECT ${beads.bead_id},
+                 ${beads.type},
+                 ${beads.status},
+                 ${beads.title},
+                 ${beads.assignee_agent_bead_id},
+                 ${beads.updated_at}
+          FROM ${beads}
+          WHERE ${beads.status} NOT IN ('closed', 'failed')
+            AND ${beads.type} != 'agent'
+          ORDER BY ${beads.type}, ${beads.status}
+        `,
+        []
+      ),
+    ];
+  }
+
+  // DEBUG: raw agent_metadata dump — remove after debugging
+  async debugAgentMetadata(): Promise<unknown[]> {
+    return [
+      ...query(
+        this.sql,
+        /* sql */ `
+          SELECT ${agent_metadata.bead_id},
+                 ${agent_metadata.role},
+                 ${agent_metadata.status},
+                 ${agent_metadata.current_hook_bead_id},
+                 ${agent_metadata.dispatch_attempts},
+                 ${agent_metadata.last_activity_at}
+          FROM ${agent_metadata}
+        `,
+        []
+      ),
+    ];
+  }
+
   async destroy(): Promise<void> {
     console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`);
diff --git a/cloudflare-gastown/src/dos/town/beads.ts b/cloudflare-gastown/src/dos/town/beads.ts
index cf0f009346..0095b444e6 100644
--- a/cloudflare-gastown/src/dos/town/beads.ts
+++ b/cloudflare-gastown/src/dos/town/beads.ts
@@ -258,6 +258,17 @@ export function updateBeadStatus(
   // No-op if already in the target status — avoids redundant events
   if (bead.status === status) return bead;
 
+  // HARD INVARIANT: terminal states (closed/failed) are immutable.
+  // Once a bead reaches a terminal state, no recovery function, stale MR
+  // failure, or race condition should ever change its status. Return the
+  // bead as-is (no-op, not an error) so callers don't need to pre-check.
+  if (bead.status === 'closed' || bead.status === 'failed') {
+    console.warn(
+      `[beads] updateBeadStatus: blocked ${bead.status} → ${status} for bead=${beadId} — terminal state is immutable`
+    );
+    return bead;
+  }
+
   const oldStatus = bead.status;
   const timestamp = now();
   const closedAt = status === 'closed' ? timestamp : bead.closed_at;
diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts
index 5f56a0608c..4ada979758 100644
--- a/cloudflare-gastown/src/dos/town/container-dispatch.ts
+++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts
@@ -12,6 +12,13 @@ import { buildContainerConfig, resolveModel, resolveSmallModel } from './config'
 
 const TOWN_LOG = '[Town.do]';
 
+// Module-level diagnostic: stores the last container start error so
+// callers can surface it via the admin API. Reset on each call.
+let lastStartError: string | null = null;
+export function getLastStartError(): string | null {
+  return lastStartError;
+}
+
 /**
  * Resolve the GASTOWN_JWT_SECRET binding to a string.
*/ @@ -101,6 +108,7 @@ export async function ensureContainerToken( try { const resp = await container.fetch('http://container/refresh-token', { method: 'POST', + signal: AbortSignal.timeout(10_000), headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ token }), }); @@ -303,6 +311,7 @@ export async function startAgentInContainer( }>; } ): Promise { + lastStartError = null; console.log( `${TOWN_LOG} startAgentInContainer: agentId=${params.agentId} role=${params.role} name=${params.agentName}` ); @@ -376,6 +385,7 @@ export async function startAgentInContainer( const response = await container.fetch('http://container/agents/start', { method: 'POST', + signal: AbortSignal.timeout(60_000), headers: { 'Content-Type': 'application/json', 'X-Town-Config': JSON.stringify(containerConfig), @@ -427,11 +437,27 @@ export async function startAgentInContainer( if (!response.ok) { const text = await response.text().catch(() => '(unreadable)'); - console.error(`${TOWN_LOG} startAgentInContainer: error response: ${text.slice(0, 500)}`); + // "Already running" means a previous dispatch succeeded — the agent + // IS alive in the container. Treat as success so the DO marks the + // agent as working and stops retrying. + if (response.status === 500 && text.includes('already running')) { + console.log( + `${TOWN_LOG} startAgentInContainer: agent ${params.agentId} already running — treating as success` + ); + return true; + } + const errorMsg = `(${response.status}) ${text.slice(0, 300)}`; + console.error( + `${TOWN_LOG} startAgentInContainer: error response for ` + + `agent=${params.agentId} role=${params.role}: ${errorMsg}` + ); + lastStartError = errorMsg; } return response.ok; } catch (err) { + const message = err instanceof Error ? err.message : String(err); console.error(`${TOWN_LOG} startAgentInContainer: EXCEPTION for agent ${params.agentId}:`, err); + lastStartError = `EXCEPTION: ${message.slice(0, 300)}`; return false; } } @@ -533,13 +559,17 @@ export async function checkAgentContainerStatus( ): Promise<{ status: string; exitReason?: string }> { try { const container = getTownContainerStub(env, townId); - // TODO: Generally you should use containerFetch which waits for ports to be available - const response = await container.fetch(`http://container/agents/${agentId}/status`); + const response = await container.fetch(`http://container/agents/${agentId}/status`, { + signal: AbortSignal.timeout(10_000), + }); // 404 means the container is running but has no record of this agent // (e.g. after container eviction). Report as 'not_found' so // witnessPatrol can immediately reset and redispatch the agent // instead of waiting for the 2-hour GUPP timeout. if (response.status === 404) return { status: 'not_found' }; + // Non-OK but not 404 — container is having issues but may still + // have the agent running. Return 'unknown' so witnessPatrol doesn't + // falsely reset a working agent. if (!response.ok) return { status: 'unknown' }; const data: unknown = await response.json(); if (typeof data === 'object' && data !== null && 'status' in data) { @@ -553,6 +583,10 @@ export async function checkAgentContainerStatus( } return { status: 'unknown' }; } catch { + // Timeout, network error, or container starting up — return + // 'unknown' so witnessPatrol doesn't falsely reset working agents. + // True zombies will be caught after repeated 'unknown' results + // once the GIPP/heartbeat timeout expires. 
return { status: 'unknown' }; } } diff --git a/cloudflare-gastown/src/dos/town/patrol.ts b/cloudflare-gastown/src/dos/town/patrol.ts index 5fb78db442..4e18cbd6f2 100644 --- a/cloudflare-gastown/src/dos/town/patrol.ts +++ b/cloudflare-gastown/src/dos/town/patrol.ts @@ -586,6 +586,96 @@ export function feedStrandedConvoys(sql: SqlStorage, townId: string): void { } } +/** + * Recover beads whose assigned agent is no longer hooked to them. + * + * After container restarts, review failures, or rework cycles, beads + * can end up in 'open' or 'in_progress' with a stale + * assignee_agent_bead_id — the agent has been unhooked but the bead + * still references it. Neither feedStrandedConvoys (requires assignee + * IS NULL) nor schedulePendingWork (requires agent hooked) will pick + * these up. + * + * For each orphaned bead: + * - If in_progress, reset to open (no agent is actually working on it) + * - Hook a polecat so schedulePendingWork dispatches it on the next tick + */ +export function rehookOrphanedBeads(sql: SqlStorage, townId: string): void { + const OrphanedBeadRow = z.object({ + bead_id: z.string(), + bead_status: z.string(), + rig_id: z.string().nullable(), + assignee_agent_bead_id: z.string(), + }); + + // Find open/in_progress issue beads where the assigned agent's + // current_hook_bead_id does NOT point back to this bead (either NULL + // or hooked elsewhere). Also require the agent to NOT be 'working' — + // if the agent is working, the hook mismatch may be a transient race + // during dispatch rather than a real orphan. Time guard: only touch + // beads orphaned for >2 min to avoid interfering with in-flight + // transitions (dispatch, gt_done, review completion). + const cutoff = new Date(Date.now() - 2 * 60_000).toISOString(); + const rows = OrphanedBeadRow.array().parse([ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id}, + ${beads.status} AS bead_status, + ${beads.rig_id}, + ${beads.assignee_agent_bead_id} + FROM ${beads} + INNER JOIN ${agent_metadata} + ON ${agent_metadata.bead_id} = ${beads.assignee_agent_bead_id} + WHERE ${beads.status} IN ('open', 'in_progress') + AND ${beads.type} = 'issue' + AND ${beads.assignee_agent_bead_id} IS NOT NULL + AND ${beads.updated_at} < ? + AND ${agent_metadata.status} != 'working' + AND ( + ${agent_metadata.current_hook_bead_id} IS NULL + OR ${agent_metadata.current_hook_bead_id} != ${beads.bead_id} + ) + `, + [cutoff] + ), + ]); + + if (rows.length === 0) return; + + console.log(`${LOG} rehookOrphanedBeads: found ${rows.length} orphaned bead(s)`); + + for (const row of rows) { + const rigId = row.rig_id; + if (!rigId) continue; + + try { + // If the bead is in_progress but no agent is working on it, + // reset to open so the dispatch flow starts cleanly. + if (row.bead_status === 'in_progress') { + updateBeadStatus(sql, row.bead_id, 'open', 'system'); + } + + const agent = getOrCreateAgent(sql, 'polecat', rigId, townId); + hookBead(sql, agent.id, row.bead_id); + query( + sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.last_activity_at} = NULL + WHERE ${agent_metadata.bead_id} = ? + `, + [agent.id] + ); + console.log( + `${LOG} rehookOrphanedBeads: re-hooked agent=${agent.id} to bead=${row.bead_id} (was ${row.bead_status})` + ); + } catch (err) { + console.warn(`${LOG} rehookOrphanedBeads: failed to re-hook bead=${row.bead_id}:`, err); + } + } +} + /** * Detect crash loops: agents that have failed repeatedly within a * short window. Creates a triage request for LLM assessment. 
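The patrol passes above and the review-queue recovery passes below are plain synchronous functions over `SqlStorage`, so an alarm tick can run them back to back. A minimal wiring sketch (hypothetical — the real call site and ordering live in `Town.do.ts`, which this diff doesn't touch; only the imported function names are from this patch):

```ts
// Hypothetical recovery pass — the wrapper itself is illustrative.
import * as patrol from './town/patrol';
import * as reviewQueue from './town/review-queue';

function runRecoveryPass(sql: SqlStorage, townId: string): void {
  // Review-side recovery first, so source beads land back in 'open'
  // before the bead-side passes look for schedulable work.
  reviewQueue.recoverStuckReviews(sql);
  reviewQueue.closeOrphanedReviewBeads(sql);
  reviewQueue.recoverOrphanedSourceBeads(sql);

  // Bead-side recovery: feed convoys with no assignee, then re-hook
  // beads whose assigned agent lost its hook.
  patrol.feedStrandedConvoys(sql, townId);
  patrol.rehookOrphanedBeads(sql, townId);
}
```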
diff --git a/cloudflare-gastown/src/dos/town/review-queue.ts b/cloudflare-gastown/src/dos/town/review-queue.ts
index 26df5a6ddf..979028b020 100644
--- a/cloudflare-gastown/src/dos/town/review-queue.ts
+++ b/cloudflare-gastown/src/dos/town/review-queue.ts
@@ -28,8 +28,11 @@ import { getAgent, unhookBead } from './agents';
 import { getRig } from './rigs';
 import type { ReviewQueueInput, ReviewQueueEntry, AgentDoneInput, Molecule } from '../../types';

-// Review entries stuck in 'running' past this timeout are reset to 'pending'
-const REVIEW_RUNNING_TIMEOUT_MS = 5 * 60 * 1000;
+// Review entries stuck in 'running' past this timeout are reset to 'pending'.
+// Only applies when no working agent is hooked to the MR bead.
+// Set to 30 min — reviews can legitimately take 10-15 min for clone + build
+// + test + merge, and the refinery hook guard is the primary protection.
+const REVIEW_RUNNING_TIMEOUT_MS = 30 * 60 * 1000;

 function generateId(): string {
   return crypto.randomUUID();
@@ -186,12 +189,26 @@ export function submitToReviewQueue(sql: SqlStorage, input: ReviewQueueInput): v
 }

 export function popReviewQueue(sql: SqlStorage): ReviewQueueEntry | null {
+  // Pop the oldest open MR bead, but skip any MR whose rig already has
+  // an MR in_progress (i.e. a refinery is already reviewing on that
+  // rig). This is coarser than a per-source-bead check, but it prevents
+  // popping stale MR beads and triggering failReviewWithRework while an
+  // active review is in flight.
   const rows = [
     ...query(
       sql,
       /* sql */ `
         ${REVIEW_JOIN}
         WHERE ${beads.status} = 'open'
+          AND NOT EXISTS (
+            SELECT 1 FROM ${beads} AS active_mr
+            WHERE active_mr.${beads.columns.type} = 'merge_request'
+              AND active_mr.${beads.columns.status} = 'in_progress'
+              AND active_mr.${beads.columns.rig_id} = ${beads.rig_id}
+          )
         ORDER BY ${beads.created_at} ASC
         LIMIT 1
       `,
@@ -223,6 +240,16 @@ export function completeReview(
   entryId: string,
   status: 'merged' | 'failed'
 ): void {
+  // Guard: don't overwrite terminal states (a closed MR bead that was
+  // already merged should never be set to 'failed' by a stale call)
+  const current = getBead(sql, entryId);
+  if (current && (current.status === 'closed' || current.status === 'failed')) {
+    console.warn(
+      `[review-queue] completeReview: bead ${entryId} already ${current.status}, skipping`
+    );
+    return;
+  }
+
   const beadStatus = status === 'merged' ? 'closed' : 'failed';
   const timestamp = now();
   query(
@@ -275,8 +302,38 @@ export function completeReviewWithResult(

   if (input.status === 'merged') {
     const mergeTimestamp = now();
+    console.log(
+      `[review-queue] completeReviewWithResult MERGED: entry_id=${input.entry_id} ` +
+        `entry.bead_id (source)=${entry.bead_id} entry.id (MR)=${entry.id} — ` +
+        `calling closeBead on source`
+    );
     closeBead(sql, entry.bead_id, entry.agent_id);

+    // Close ALL other open/in_progress/failed MR beads for the same
+    // source bead. During rework cycles, multiple MR beads accumulate.
+    // Without this cleanup, stale MR beads trigger failReviewWithRework
+    // on the next alarm tick, reopening the source bead that was just
+    // closed by this merge. The MR → source link is bead_dependencies
+    // with dependency_type='tracks' (bead_id = MR bead,
+    // depends_on_bead_id = source bead).
+    query(
+      sql,
+      /* sql */ `
+        UPDATE ${beads}
+        SET ${beads.columns.status} = 'closed',
+            ${beads.columns.updated_at} = ?,
+            ${beads.columns.closed_at} = ?
+        WHERE ${beads.type} = 'merge_request'
+          AND ${beads.bead_id} != ?
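+          -- Placeholders bind in order: updated_at, closed_at, the MR bead
+          -- that just merged (excluded from this cleanup), and the shared
+          -- source bead used by the dependency subquery below.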
+ AND ${beads.status} NOT IN ('closed') + AND ${beads.bead_id} IN ( + SELECT dep.${bead_dependencies.columns.bead_id} + FROM ${bead_dependencies} AS dep + WHERE dep.${bead_dependencies.columns.depends_on_bead_id} = ? + AND dep.${bead_dependencies.columns.dependency_type} = 'tracks' + ) + `, + [mergeTimestamp, mergeTimestamp, input.entry_id, entry.bead_id] + ); + // closeBead → updateBeadStatus short-circuits when completeReview already // set the status to 'closed' via direct SQL, so updateConvoyProgress is // never reached transitively. Call it explicitly to ensure the convoy @@ -310,13 +367,41 @@ export function completeReviewWithResult( conflict: true, }, }); - // Return source bead to in_progress so the polecat can be re-dispatched - // to resolve the conflict (in_review → in_progress rework flow). - updateBeadStatus(sql, entry.bead_id, 'in_progress', entry.agent_id); + // Return source bead to open so the normal scheduling path handles + // rework. Clear assignee so feedStrandedConvoys can match. + const conflictSourceBead = getBead(sql, entry.bead_id); + if (conflictSourceBead && conflictSourceBead.status !== 'closed' && conflictSourceBead.status !== 'failed') { + updateBeadStatus(sql, entry.bead_id, 'open', entry.agent_id); + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.assignee_agent_bead_id} = NULL + WHERE ${beads.bead_id} = ? + `, + [entry.bead_id] + ); + } } else if (input.status === 'failed') { - // Review failed (rework requested): return source bead to in_progress - // so it can be re-dispatched (in_review → in_progress rework flow). - updateBeadStatus(sql, entry.bead_id, 'in_progress', entry.agent_id); + // Review failed (rework requested): return source bead to open so + // the normal scheduling path (feedStrandedConvoys → hookBead → + // schedulePendingWork → dispatch) handles rework. Clear the stale + // assignee so feedStrandedConvoys can match (requires assignee IS NULL). + // This avoids the fire-and-forget rework dispatch race in TownDO + // where the dispatch fails and rehookOrphanedBeads churn. + const sourceBead = getBead(sql, entry.bead_id); + if (sourceBead && sourceBead.status !== 'closed' && sourceBead.status !== 'failed') { + updateBeadStatus(sql, entry.bead_id, 'open', entry.agent_id); + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.assignee_agent_bead_id} = NULL + WHERE ${beads.bead_id} = ? + `, + [entry.bead_id] + ); + } } } @@ -400,23 +485,113 @@ export function listPendingPRReviews(sql: SqlStorage): MergeRequestBeadRecord[] */ export function recoverStuckReviews(sql: SqlStorage): void { const timeout = new Date(Date.now() - REVIEW_RUNNING_TIMEOUT_MS).toISOString(); - query( - sql, - /* sql */ ` - UPDATE ${beads} - SET ${beads.columns.status} = 'open', - ${beads.columns.updated_at} = ? - WHERE ${beads.type} = 'merge_request' - AND ${beads.status} = 'in_progress' - AND ${beads.updated_at} < ? - AND ${beads.bead_id} NOT IN ( - SELECT ${review_metadata.bead_id} - FROM ${review_metadata} - WHERE ${review_metadata.pr_url} IS NOT NULL - ) - `, - [now(), timeout] - ); + const timestamp = now(); + + // Find stuck MR beads: in_progress past the timeout, no pr_url, and + // no WORKING agent hooked. An idle agent hooked to the MR means the + // refinery died (witnessPatrol set it to idle) — the review should be + // recovered. Only skip if the agent is actively working. 
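+  // Decision table for an in_progress MR past the timeout (no pr_url):
+  //   hooked agent is 'working' → skip; the review is still in flight
+  //   hooked agent is 'idle'    → reset the MR below and unhook the agent
+  //   no agent hooked at all    → fast path further down (2-min cutoff)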
+ const stuckMrRows = BeadRecord.pick({ bead_id: true }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id} + FROM ${beads} + WHERE ${beads.type} = 'merge_request' + AND ${beads.status} = 'in_progress' + AND ${beads.updated_at} < ? + AND ${beads.bead_id} NOT IN ( + SELECT ${review_metadata.bead_id} + FROM ${review_metadata} + WHERE ${review_metadata.pr_url} IS NOT NULL + ) + AND NOT EXISTS ( + SELECT 1 FROM ${agent_metadata} + WHERE ${agent_metadata.current_hook_bead_id} = ${beads.bead_id} + AND ${agent_metadata.status} = 'working' + ) + `, + [timeout] + ), + ]); + + for (const row of stuckMrRows) { + // Reset MR bead to open for re-processing + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.status} = 'open', + ${beads.columns.updated_at} = ? + WHERE ${beads.bead_id} = ? + `, + [timestamp, row.bead_id] + ); + + // Unhook any idle refinery still pointing at this MR bead so it + // can be reused for the next processReviewQueue cycle + query( + sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.current_hook_bead_id} = NULL + WHERE ${agent_metadata.current_hook_bead_id} = ? + AND ${agent_metadata.status} = 'idle' + `, + [row.bead_id] + ); + + console.log( + `[review-queue] recoverStuckReviews: reset MR bead=${row.bead_id} to open, unhooked idle agents` + ); + } + + // Fast recovery: MR beads in_progress with NO agent hooked at all are + // clearly abandoned (hookBead threw or the refinery was unhooked). + // Recover after 2 min instead of waiting for the 30-min timeout. + const abandonedCutoff = new Date(Date.now() - 2 * 60 * 1000).toISOString(); + const abandonedRows = BeadRecord.pick({ bead_id: true }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id} + FROM ${beads} + WHERE ${beads.type} = 'merge_request' + AND ${beads.status} = 'in_progress' + AND ${beads.updated_at} < ? + AND NOT EXISTS ( + SELECT 1 FROM ${agent_metadata} + WHERE ${agent_metadata.current_hook_bead_id} = ${beads.bead_id} + ) + AND ${beads.bead_id} NOT IN ( + SELECT ${review_metadata.bead_id} + FROM ${review_metadata} + WHERE ${review_metadata.pr_url} IS NOT NULL + ) + `, + [abandonedCutoff] + ), + ]); + + for (const row of abandonedRows) { + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.status} = 'open', + ${beads.columns.updated_at} = ? + WHERE ${beads.bead_id} = ? + `, + [timestamp, row.bead_id] + ); + console.log( + `[review-queue] recoverStuckReviews: fast-recovered abandoned MR bead=${row.bead_id} (no agent hooked)` + ); + } } /** @@ -433,6 +608,12 @@ const ORPHAN_REVIEW_TIMEOUT_MS = 30 * 60 * 1000; export function closeOrphanedReviewBeads(sql: SqlStorage): void { const cutoff = new Date(Date.now() - ORPHAN_REVIEW_TIMEOUT_MS).toISOString(); + // Match MR beads with pr_url that are open OR in_progress, stale beyond + // the timeout, and whose refinery agent is idle/dead/missing. The + // in_progress case covers a gap where the refinery dies mid-review + // on a PR-strategy bead: recoverStuckReviews excludes pr_url beads + // and schedulePendingWork excludes refineries, so nothing else + // recovers them. 
 const orphanRows = [
    ...query(
      sql,
@@ -442,7 +623,7 @@
       INNER JOIN ${review_metadata} ON ${beads.bead_id} = ${review_metadata.bead_id}
       LEFT JOIN ${agent_metadata} ON ${beads.assignee_agent_bead_id} = ${agent_metadata.bead_id}
       WHERE ${beads.type} = 'merge_request'
-        AND ${beads.status} = 'open'
+        AND ${beads.status} IN ('open', 'in_progress')
         AND ${review_metadata.pr_url} IS NOT NULL
         AND ${beads.updated_at} < ?
         AND (
@@ -459,9 +640,16 @@
       .object({ bead_id: z.string(), assignee_agent_bead_id: z.string().nullable() })
       .parse(row);
     try {
-      closeBead(sql, parsed.bead_id, parsed.assignee_agent_bead_id ?? 'system');
+      // Use completeReviewWithResult instead of closeBead so the source
+      // bead is also transitioned (closeBead only closes the MR bead
+      // itself, leaving the source stuck in in_review).
+      completeReviewWithResult(sql, {
+        entry_id: parsed.bead_id,
+        status: 'failed',
+        message: 'PR review orphaned — agent died and polling could not resolve',
+      });
       console.log(
-        `[review-queue] closeOrphanedReviewBeads: closed orphaned MR bead=${parsed.bead_id}`
+        `[review-queue] closeOrphanedReviewBeads: failed orphaned MR bead=${parsed.bead_id}`
       );
     } catch (err) {
       console.warn(
@@ -472,12 +660,146 @@
   }
 }

+/**
+ * Recover source beads stuck in 'in_review' whose MR beads have all
+ * reached a terminal state (closed/failed) with no pending review still
+ * in flight. This can happen when an MR bead failure path bypasses
+ * completeReviewWithResult (the only path that routes the source bead
+ * back to the scheduler for rework).
+ *
+ * Returns beads to 'open' (not 'in_progress') so the scheduler can
+ * assign and dispatch a fresh polecat — by this point the original
+ * agent has already been unhooked.
+ *
+ * Only affects source beads that have been stuck for longer than the
+ * recovery timeout, to avoid interfering with in-flight reviews.
+ */
+export function recoverOrphanedSourceBeads(sql: SqlStorage): void {
+  // Use a shorter timeout than REVIEW_RUNNING_TIMEOUT_MS — by the time
+  // this runs, ALL MR beads for the source are already terminal (the
+  // NOT EXISTS guard below ensures this). The 5-min window just avoids
+  // interfering with in-flight transitions.
+  const cutoff = new Date(Date.now() - 5 * 60 * 1000).toISOString();
+
+  const stuckRows = [
+    ...query(
+      sql,
+      /* sql */ `
+        SELECT src.${beads.columns.bead_id} AS source_bead_id
+        FROM ${beads} AS src
+        INNER JOIN ${bead_dependencies} AS dep
+          ON dep.${bead_dependencies.columns.depends_on_bead_id} = src.${beads.columns.bead_id}
+          AND dep.${bead_dependencies.columns.dependency_type} = 'tracks'
+        INNER JOIN ${beads} AS mr
+          ON mr.${beads.columns.bead_id} = dep.${bead_dependencies.columns.bead_id}
+          AND mr.${beads.columns.type} = 'merge_request'
+        WHERE src.${beads.columns.status} = 'in_review'
+          AND src.${beads.columns.updated_at} < ?
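+          -- A qualifying source bead needs at least one terminal MR (the
+          -- join row) and, per the NOT EXISTS guard below, no other MR
+          -- for the same source still open or in_progress.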
+ AND mr.${beads.columns.status} IN ('closed', 'failed') + AND NOT EXISTS ( + SELECT 1 FROM ${bead_dependencies} AS dep2 + INNER JOIN ${beads} AS mr2 + ON mr2.${beads.columns.bead_id} = dep2.${bead_dependencies.columns.bead_id} + WHERE dep2.${bead_dependencies.columns.depends_on_bead_id} = src.${beads.columns.bead_id} + AND dep2.${bead_dependencies.columns.dependency_type} = 'tracks' + AND mr2.${beads.columns.type} = 'merge_request' + AND mr2.${beads.columns.status} IN ('open', 'in_progress') + ) + `, + [cutoff] + ), + ]; + + for (const row of stuckRows) { + const parsed = z.object({ source_bead_id: z.string() }).parse(row); + try { + updateBeadStatus(sql, parsed.source_bead_id, 'open', 'system'); + // Clear the stale assignee so feedStrandedConvoys (which requires + // assignee IS NULL) can pick up convoy beads, and rehookOrphanedBeads + // or feedStrandedConvoys can assign a fresh agent. + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.assignee_agent_bead_id} = NULL + WHERE ${beads.bead_id} = ? + `, + [parsed.source_bead_id] + ); + console.log( + `[review-queue] recoverOrphanedSourceBeads: returned bead=${parsed.source_bead_id} to open (assignee cleared)` + ); + } catch (err) { + console.warn( + `[review-queue] recoverOrphanedSourceBeads: failed to recover bead=${parsed.source_bead_id}`, + err + ); + } + } +} + // ── Agent Done ────────────────────────────────────────────────────── export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInput): void { const agent = getAgent(sql, agentId); if (!agent) throw new Error(`Agent ${agentId} not found`); - if (!agent.current_hook_bead_id) throw new Error(`Agent ${agentId} has no hooked bead`); + if (!agent.current_hook_bead_id) { + // The agent was unhooked by a recovery path (witnessPatrol, + // rehookOrphanedBeads) between when the agent finished work and + // when it called gt_done. + // + // For refineries, this is critical: the refinery successfully merged + // but the hook was cleared by zombie detection. We MUST still complete + // the review — otherwise the source bead stays open forever. Find the + // most recent non-closed MR bead assigned to this agent and complete it. + if (agent.role === 'refinery') { + const recentMrRows = [ + ...query( + sql, + /* sql */ ` + SELECT ${beads.bead_id} + FROM ${beads} + WHERE ${beads.type} = 'merge_request' + AND ${beads.assignee_agent_bead_id} = ? + AND ${beads.status} NOT IN ('closed', 'failed') + ORDER BY ${beads.updated_at} DESC + LIMIT 1 + `, + [agentId] + ), + ]; + if (recentMrRows.length > 0) { + const mrBeadId = z.object({ bead_id: z.string() }).parse(recentMrRows[0]).bead_id; + console.log( + `[review-queue] agentDone: unhooked refinery ${agentId} — recovering MR bead ${mrBeadId}` + ); + if (input.pr_url) { + const stored = setReviewPrUrl(sql, mrBeadId, input.pr_url); + if (stored) { + markReviewInReview(sql, mrBeadId); + } else { + completeReviewWithResult(sql, { + entry_id: mrBeadId, + status: 'failed', + message: `Refinery provided invalid pr_url: ${input.pr_url}`, + }); + } + } else { + completeReviewWithResult(sql, { + entry_id: mrBeadId, + status: 'merged', + message: input.summary ?? 'Merged by refinery agent (recovered from unhook)', + }); + } + return; + } + } + + console.warn( + `[review-queue] agentDone: agent ${agentId} (role=${agent.role}) has no hooked bead — ignoring` + ); + return; + } // Triage batch beads don't produce code — close and unhook without // submitting to the review queue. 
Only applies to system-created triage @@ -596,29 +918,19 @@ export function agentCompleted( if (!agent) return result; if (agent.current_hook_bead_id) { - // When a refinery exits with 'completed' but the MR bead is still - // in_progress (not closed/merged), it means the refinery requested - // rework. Route through completeReviewWithResult so the source bead - // is returned to in_progress for re-dispatch. - if (agent.role === 'refinery' && input.status === 'completed') { - const mrBead = getBead(sql, agent.current_hook_bead_id); - if (mrBead && mrBead.status !== 'closed') { - const sourceBeadId = - typeof mrBead.metadata?.source_bead_id === 'string' - ? mrBead.metadata.source_bead_id - : null; - completeReviewWithResult(sql, { - entry_id: agent.current_hook_bead_id, - status: 'failed', - message: input.reason ?? 'Refinery exited without merge — rework needed', - }); - result.reworkSourceBeadId = sourceBeadId; - unhookBead(sql, agentId); - // Mark agent idle (below) - } else { - // MR was already closed (merged) — normal completion - unhookBead(sql, agentId); - } + if (agent.role === 'refinery') { + // NEVER fail or unhook a refinery from agentCompleted. + // agentCompleted races with gt_done: the process exits, the + // container sends /completed, but gt_done's HTTP request may + // still be in flight. If we unhook here, recoverStuckReviews + // can fire between agentCompleted and gt_done, resetting the + // MR bead that's about to be closed by gt_done. + // + // Leave the hook intact. gt_done will close + unhook if the + // merge succeeded. recoverStuckReviews (which checks for + // status='working') handles the case where gt_done never arrives. + // + // No-op for the bead — just fall through to mark agent idle. } else { const beadStatus = input.status === 'completed' ? 'closed' : 'failed'; updateBeadStatus(sql, agent.current_hook_bead_id, beadStatus, agentId); diff --git a/cloudflare-gastown/src/dos/town/scheduling.ts b/cloudflare-gastown/src/dos/town/scheduling.ts new file mode 100644 index 0000000000..3f631dab65 --- /dev/null +++ b/cloudflare-gastown/src/dos/town/scheduling.ts @@ -0,0 +1,374 @@ +/** + * Agent scheduling and dispatch for the Town DO alarm loop. + * + * Owns the core dispatch/retry logic that was previously inline in + * Town.do.ts. The Town DO delegates to these pure(ish) functions, + * passing its SQL handle and env bindings. 
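+ *
+ * Delegation sketch (illustrative — the real SchedulingContext literal
+ * is assembled inside Town.do.ts, which this diff doesn't show):
+ *
+ *   const ctx: SchedulingContext = {
+ *     sql: this.sql,
+ *     env: this.env,
+ *     storage: this.ctx.storage,
+ *     townId: this.townId,
+ *     // ...config/token resolvers and emitEvent
+ *   };
+ *   await scheduling.schedulePendingWork(ctx);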
+ */
+
+import * as Sentry from '@sentry/cloudflare';
+import { z } from 'zod';
+import { beads, AgentBeadRecord } from '../../db/tables/beads.table';
+import { agent_metadata } from '../../db/tables/agent-metadata.table';
+import { query } from '../../util/query.util';
+import * as beadOps from './beads';
+import * as agents from './agents';
+import * as rigs from './rigs';
+import * as dispatch from './container-dispatch';
+import * as patrol from './patrol';
+import type { Agent, Bead, TownConfig } from '../../types';
+import type { GastownEventData } from '../../util/analytics.util';
+
+const LOG = '[scheduling]';
+
+// ── Constants ──────────────────────────────────────────────────────────
+
+export const DISPATCH_COOLDOWN_MS = 2 * 60_000; // 2 min
+export const MAX_DISPATCH_ATTEMPTS = 20;
+
+// ── Context passed by the Town DO ──────────────────────────────────────
+
+type SchedulingContext = {
+  sql: SqlStorage;
+  env: Env;
+  storage: DurableObjectStorage;
+  townId: string;
+  getTownConfig: () => Promise<TownConfig>;
+  getRigConfig: (rigId: string) => Promise<RigConfig | null>;
+  resolveKilocodeToken: () => Promise<string>;
+  emitEvent: (data: Omit<GastownEventData, 'timestamp'>) => void;
+};
+
+type RigConfig = {
+  townId: string;
+  rigId: string;
+  gitUrl: string;
+  defaultBranch: string;
+  userId: string;
+  kilocodeToken?: string;
+  platformIntegrationId?: string;
+  merge_strategy?: string;
+};
+
+function now(): string {
+  return new Date().toISOString();
+}
+
+// ── dispatchAgent ──────────────────────────────────────────────────────
+
+/**
+ * Dispatch a single agent to the container. Transitions the bead to
+ * in_progress and the agent to working BEFORE the async network call
+ * (I/O gate safety for fire-and-forget callers). Returns true if the
+ * container accepted the agent.
+ */
+export async function dispatchAgent(
+  ctx: SchedulingContext,
+  agent: Agent,
+  bead: Bead,
+  options?: { systemPromptOverride?: string }
+): Promise<boolean> {
+  try {
+    const rigId = agent.rig_id ?? rigs.listRigs(ctx.sql)[0]?.id ?? '';
+    const rigConfig = rigId ? await ctx.getRigConfig(rigId) : null;
+    if (!rigConfig) {
+      console.warn(`${LOG} dispatchAgent: no rig config for agent=${agent.id} rig=${rigId}`);
+      return false;
+    }
+
+    const townConfig = await ctx.getTownConfig();
+    const kilocodeToken = await ctx.resolveKilocodeToken();
+
+    const convoyId = beadOps.getConvoyForBead(ctx.sql, bead.bead_id);
+    const convoyFeatureBranch = convoyId ? beadOps.getConvoyFeatureBranch(ctx.sql, convoyId) : null;
+
+    // Transition bead to in_progress BEFORE the async container start.
+    // Must happen synchronously within the I/O gate — fire-and-forget
+    // callers (slingBead, slingConvoy) close the gate before the
+    // network call completes.
+    const currentBead = beadOps.getBead(ctx.sql, bead.bead_id);
+    if (
+      currentBead &&
+      currentBead.status !== 'in_progress' &&
+      currentBead.status !== 'closed' &&
+      currentBead.status !== 'failed'
+    ) {
+      beadOps.updateBeadStatus(ctx.sql, bead.bead_id, 'in_progress', agent.id);
+    }
+
+    // Set agent to 'working' BEFORE the async container start (same
+    // I/O gate rationale).
+    const timestamp = now();
+    query(
+      ctx.sql,
+      /* sql */ `
+        UPDATE ${agent_metadata}
+        SET ${agent_metadata.columns.status} = 'working',
+            ${agent_metadata.columns.dispatch_attempts} = ${agent_metadata.columns.dispatch_attempts} + 1,
+            ${agent_metadata.columns.last_activity_at} = ?
+        WHERE ${agent_metadata.bead_id} = ?
+ `, + [timestamp, agent.id] + ); + + const started = await dispatch.startAgentInContainer(ctx.env, ctx.storage, { + townId: ctx.townId, + rigId, + userId: rigConfig.userId, + agentId: agent.id, + agentName: agent.name, + role: agent.role, + identity: agent.identity, + beadId: bead.bead_id, + beadTitle: bead.title, + beadBody: bead.body ?? '', + checkpoint: agent.checkpoint, + gitUrl: rigConfig.gitUrl, + defaultBranch: rigConfig.defaultBranch, + kilocodeToken, + townConfig, + platformIntegrationId: rigConfig.platformIntegrationId, + convoyFeatureBranch: convoyFeatureBranch ?? undefined, + systemPromptOverride: options?.systemPromptOverride, + }); + + if (started) { + // Best-effort: may be dropped if I/O gate is closed + query( + ctx.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.dispatch_attempts} = 0 + WHERE ${agent_metadata.bead_id} = ? + `, + [agent.id] + ); + console.log(`${LOG} dispatchAgent: started agent=${agent.name}(${agent.id})`); + ctx.emitEvent({ + event: 'agent.spawned', + townId: ctx.townId, + rigId, + agentId: agent.id, + beadId: bead.bead_id, + role: agent.role, + }); + } else { + // Container start returned false — but the container may have + // actually started the agent (timeout race). DON'T roll back + // the bead to open. Leave it in_progress with the agent idle+hooked. + // If the agent truly failed: rehookOrphanedBeads recovers after 2 min. + // If the agent actually started: it works and calls gt_done normally. + query( + ctx.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle', + ${agent_metadata.columns.last_activity_at} = ? + WHERE ${agent_metadata.bead_id} = ? + `, + [now(), agent.id] + ); + ctx.emitEvent({ + event: 'agent.dispatch_failed', + townId: ctx.townId, + rigId, + agentId: agent.id, + beadId: bead.bead_id, + role: agent.role, + }); + } + return started; + } catch (err) { + console.error(`${LOG} dispatchAgent: failed for agent=${agent.id}:`, err); + Sentry.captureException(err, { extra: { agentId: agent.id, beadId: bead.bead_id } }); + try { + query( + ctx.sql, + /* sql */ ` + UPDATE ${agent_metadata} + SET ${agent_metadata.columns.status} = 'idle', + ${agent_metadata.columns.last_activity_at} = ? + WHERE ${agent_metadata.bead_id} = ? + `, + [now(), agent.id] + ); + // Don't roll back bead to open — same timeout race rationale + } catch (rollbackErr) { + console.error(`${LOG} dispatchAgent: rollback also failed:`, rollbackErr); + } + ctx.emitEvent({ + event: 'agent.dispatch_failed', + townId: ctx.townId, + agentId: agent.id, + beadId: bead.bead_id, + role: agent.role, + }); + return false; + } +} + +// ── dispatchUnblockedBeads ───────────────────────────────────────────── + +/** + * When a bead closes, find beads that were blocked by it and are now + * fully unblocked. Dispatch their assigned agents (fire-and-forget). 
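+ *
+ * Typical call site (illustrative — the real caller is the bead-close
+ * path in the Town DO):
+ *   beadOps.updateBeadStatus(ctx.sql, beadId, 'closed', agentId);
+ *   dispatchUnblockedBeads(ctx, beadId);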
+ */ +export function dispatchUnblockedBeads(ctx: SchedulingContext, closedBeadId: string): void { + const unblockedIds = beadOps.getNewlyUnblockedBeads(ctx.sql, closedBeadId); + if (unblockedIds.length === 0) return; + + console.log( + `${LOG} dispatchUnblockedBeads: ${unblockedIds.length} beads unblocked by ${closedBeadId}` + ); + + for (const beadId of unblockedIds) { + const bead = beadOps.getBead(ctx.sql, beadId); + if (!bead || bead.status === 'closed' || bead.status === 'failed') continue; + + if (!bead.assignee_agent_bead_id) continue; + const agent = agents.getAgent(ctx.sql, bead.assignee_agent_bead_id); + if (!agent || agent.status !== 'idle') continue; + + dispatchAgent(ctx, agent, bead).catch(err => + console.error( + `${LOG} dispatchUnblockedBeads: fire-and-forget dispatch failed for bead=${beadId}`, + err + ) + ); + } +} + +// ── schedulePendingWork ──────────────────────────────────────────────── + +/** + * Find idle agents with hooked beads and dispatch them. Agents within + * the dispatch cooldown are skipped (fire-and-forget dispatch in flight). + * + * Refineries are excluded — they must go through processReviewQueue so + * they receive the full system prompt with branch, strategy, and gate + * context. recoverStuckReviews resets their MR bead to 'open' after the + * timeout, and processReviewQueue re-pops it with the correct prompt. + */ +export async function schedulePendingWork(ctx: SchedulingContext): Promise { + const cooldownCutoff = new Date(Date.now() - DISPATCH_COOLDOWN_MS).toISOString(); + const rows = [ + ...query( + ctx.sql, + /* sql */ ` + SELECT ${beads}.*, + ${agent_metadata.role}, ${agent_metadata.identity}, + ${agent_metadata.container_process_id}, + ${agent_metadata.status} AS agent_status, + ${agent_metadata.current_hook_bead_id}, + ${agent_metadata.dispatch_attempts}, ${agent_metadata.last_activity_at}, + ${agent_metadata.checkpoint}, + ${agent_metadata.agent_status_message}, ${agent_metadata.agent_status_updated_at} + FROM ${beads} + INNER JOIN ${agent_metadata} ON ${beads.bead_id} = ${agent_metadata.bead_id} + WHERE ${agent_metadata.status} = 'idle' + AND ${agent_metadata.current_hook_bead_id} IS NOT NULL + AND ${agent_metadata.role} != 'refinery' + AND (${agent_metadata.last_activity_at} IS NULL OR ${agent_metadata.last_activity_at} < ?) + `, + [cooldownCutoff] + ), + ]; + // Parse rows as AgentBeadRecord — the agent_metadata.status is aliased + // as agent_status (not status) to avoid overwriting beads.status which + // has a different enum (open/in_progress/... vs idle/working/...). 
+  const parsed = z
+    .array(AgentBeadRecord.extend({ agent_status: z.string() }))
+    .safeParse(rows);
+  if (!parsed.success) {
+    console.error(`${LOG} schedulePendingWork: Zod parse failed:`, parsed.error.issues.slice(0, 3));
+    return;
+  }
+  const pendingAgents: Agent[] = parsed.data.map(row => ({
+    id: row.bead_id,
+    rig_id: row.rig_id,
+    role: row.role,
+    name: row.title,
+    identity: row.identity,
+    status: row.agent_status,
+    current_hook_bead_id: row.current_hook_bead_id,
+    dispatch_attempts: row.dispatch_attempts,
+    last_activity_at: row.last_activity_at,
+    checkpoint: row.checkpoint,
+    created_at: row.created_at,
+    agent_status_message: row.agent_status_message,
+    agent_status_updated_at: row.agent_status_updated_at,
+  }));
+
+  console.log(`${LOG} schedulePendingWork: found ${pendingAgents.length} pending agents`);
+  if (pendingAgents.length === 0) return;
+
+  const dispatchTasks: Array<() => Promise<void>> = [];
+
+  for (const agent of pendingAgents) {
+    const beadId = agent.current_hook_bead_id;
+    if (!beadId) continue;
+    const bead = beadOps.getBead(ctx.sql, beadId);
+    if (!bead) continue;
+
+    if (agent.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) {
+      beadOps.updateBeadStatus(ctx.sql, beadId, 'failed', agent.id);
+      agents.unhookBead(ctx.sql, agent.id);
+      continue;
+    }
+
+    if (beadOps.hasUnresolvedBlockers(ctx.sql, beadId)) {
+      continue;
+    }
+
+    dispatchTasks.push(async () => {
+      await dispatchAgent(ctx, agent, bead);
+    });
+  }
+
+  if (dispatchTasks.length > 0) {
+    await Promise.allSettled(dispatchTasks.map(fn => fn()));
+  }
+}
+
+// ── hasActiveWork ──────────────────────────────────────────────────────
+
+/**
+ * Returns true if the town has work that requires the fast (5s) alarm
+ * interval. Used to decide between active and idle alarm cadence.
+ */
+export function hasActiveWork(sql: SqlStorage): boolean {
+  const activeAgentRows = [
+    ...query(
+      sql,
+      /* sql */ `SELECT COUNT(*) as cnt FROM ${agent_metadata} WHERE ${agent_metadata.status} IN ('working', 'stalled')`,
+      []
+    ),
+  ];
+  const pendingBeadRows = [
+    ...query(
+      sql,
+      /* sql */ `SELECT COUNT(*) as cnt FROM ${agent_metadata} WHERE ${agent_metadata.status} = 'idle' AND ${agent_metadata.current_hook_bead_id} IS NOT NULL`,
+      []
+    ),
+  ];
+  const pendingReviewRows = [
+    ...query(
+      sql,
+      /* sql */ `SELECT COUNT(*) as cnt FROM ${beads} WHERE ${beads.type} = 'merge_request' AND ${beads.status} IN ('open', 'in_progress')`,
+      []
+    ),
+  ];
+  const pendingTriageRows = [
+    ...query(
+      sql,
+      /* sql */ `SELECT COUNT(*) as cnt FROM ${beads} WHERE ${beads.type} = 'issue' AND ${beads.labels} LIKE ? AND ${beads.status} = 'open'`,
+      [patrol.TRIAGE_LABEL_LIKE]
+    ),
+  ];
+  return (
+    Number(activeAgentRows[0]?.cnt ?? 0) > 0 ||
+    Number(pendingBeadRows[0]?.cnt ?? 0) > 0 ||
+    Number(pendingReviewRows[0]?.cnt ?? 0) > 0 ||
+    Number(pendingTriageRows[0]?.cnt ??
0) > 0 + ); +} diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 8282446cdf..ca8de34285 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -193,6 +193,18 @@ app.get('/', c => c.html(dashboardHtml())); app.get('/health', c => c.json({ status: 'ok' })); +// ── DEBUG: unauthenticated town introspection — REMOVE after debugging ── +app.get('/debug/towns/:townId/status', async c => { + const townId = c.req.param('townId'); + const town = getTownDOStub(c.env, townId); + const alarmStatus = await town.getAlarmStatus(); + // eslint-disable-next-line @typescript-eslint/await-thenable -- DO RPC returns promise at runtime + const agentMeta = await town.debugAgentMetadata(); + // eslint-disable-next-line @typescript-eslint/await-thenable + const beadSummary = await town.debugBeadSummary(); + return c.json({ alarmStatus, agentMeta, beadSummary }); +}); + // ── Town ID + Auth ────────────────────────────────────────────────────── // All rig routes live under /api/towns/:townId/rigs/:rigId so the townId // is always available from the URL path. diff --git a/cloudflare-gastown/src/handlers/mayor.handler.ts b/cloudflare-gastown/src/handlers/mayor.handler.ts index 0d6740a0f7..cbe0470450 100644 --- a/cloudflare-gastown/src/handlers/mayor.handler.ts +++ b/cloudflare-gastown/src/handlers/mayor.handler.ts @@ -50,9 +50,6 @@ export async function handleSendMayorMessage(c: Context, params: { t ); const town = getTownDOStub(c.env, params.townId); - // Ensure the TownDO knows its real UUID (ctx.id.name is unreliable in local dev) - // TODO: This should only be done on town creation. Why are we doing it here? - await town.setTownId(params.townId); const result = await town.sendMayorMessage( parsed.data.message, parsed.data.model, @@ -67,7 +64,6 @@ export async function handleSendMayorMessage(c: Context, params: { t */ export async function handleGetMayorStatus(c: Context, params: { townId: string }) { const town = getTownDOStub(c.env, params.townId); - await town.setTownId(params.townId); const status = await town.getMayorStatus(); return c.json(resSuccess(status), 200); } @@ -80,7 +76,6 @@ export async function handleGetMayorStatus(c: Context, params: { tow export async function handleEnsureMayor(c: Context, params: { townId: string }) { console.log(`${MAYOR_HANDLER_LOG} handleEnsureMayor: townId=${params.townId}`); const town = getTownDOStub(c.env, params.townId); - await town.setTownId(params.townId); const result = await town.ensureMayor(); return c.json(resSuccess(result), 200); } @@ -156,7 +151,6 @@ export async function handleSetDashboardContext( } const town = getTownDOStub(c.env, params.townId); - await town.setTownId(params.townId); await town.setDashboardContext(parsed.data.context); return c.json(resSuccess({ stored: true }), 200); } @@ -184,7 +178,6 @@ export async function handleBroadcastUiAction(c: Context, params: { const action = normalizeUiAction(parsed.data.action, params.townId); const town = getTownDOStub(c.env, params.townId); - await town.setTownId(params.townId); // Validate that the referenced rig belongs to this town const rigId = uiActionRigId(action); diff --git a/cloudflare-gastown/src/handlers/org-towns.handler.ts b/cloudflare-gastown/src/handlers/org-towns.handler.ts index 478959671b..a875e8375d 100644 --- a/cloudflare-gastown/src/handlers/org-towns.handler.ts +++ b/cloudflare-gastown/src/handlers/org-towns.handler.ts @@ -100,7 +100,6 @@ export async function handleCreateOrgRig(c: 
Context, params: { orgId // If this fails, roll back the rig creation to avoid an orphaned record. try { const townDOStub = getTownDOStub(c.env, parsed.data.town_id); - await townDOStub.setTownId(parsed.data.town_id); await townDOStub.configureRig({ rigId: rig.id, townId: parsed.data.town_id, diff --git a/cloudflare-gastown/src/trpc/router.ts b/cloudflare-gastown/src/trpc/router.ts index 45ee0260d0..6d88f34422 100644 --- a/cloudflare-gastown/src/trpc/router.ts +++ b/cloudflare-gastown/src/trpc/router.ts @@ -341,7 +341,6 @@ export const gastownRouter = router({ const ownerStub = ownership.stub; const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); // For org towns, use the town owner's identity for credentials; // for personal towns the caller is always the owner. @@ -583,7 +582,6 @@ export const gastownRouter = router({ } const townStub = getTownDOStub(ctx.env, rig.town_id); - await townStub.setTownId(rig.town_id); return townStub.slingBead({ rigId: rig.id, title: input.title, @@ -609,7 +607,6 @@ export const gastownRouter = router({ await verifyTownOwnership(ctx.env, ctx.userId, input.townId, ctx.orgMemberships); const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); return townStub.sendMayorMessage(input.message, input.model, input.uiContext); }), @@ -619,7 +616,6 @@ export const gastownRouter = router({ .query(async ({ ctx, input }) => { await verifyTownOwnership(ctx.env, ctx.userId, input.townId, ctx.orgMemberships); const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); return townStub.getMayorStatus(); }), @@ -629,7 +625,6 @@ export const gastownRouter = router({ .query(async ({ ctx, input }) => { await verifyTownOwnership(ctx.env, ctx.userId, input.townId, ctx.orgMemberships); const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); return townStub.getAlarmStatus(); }), @@ -666,7 +661,6 @@ export const gastownRouter = router({ } const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); return townStub.ensureMayor(); }), @@ -861,7 +855,6 @@ export const gastownRouter = router({ .mutation(async ({ ctx, input }) => { await verifyTownOwnership(ctx.env, ctx.userId, input.townId, ctx.orgMemberships); const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); await townStub.forceRefreshContainerToken(); }), @@ -1068,8 +1061,6 @@ export const gastownRouter = router({ if (!town) throw new TRPCError({ code: 'NOT_FOUND', message: 'Town not found' }); const townStub = getTownDOStub(ctx.env, input.townId); - await townStub.setTownId(input.townId); - // Use the town owner's identity for credentials. Only re-mint the // kilocode token if the caller is the owner (they have their pepper // in ctx). For non-owner members, keep the existing town token. 
@@ -1201,7 +1192,6 @@ export const gastownRouter = router({
     .output(RpcAlarmStatusOutput)
     .query(async ({ ctx, input }) => {
       const townStub = getTownDOStub(ctx.env, input.townId);
-      await townStub.setTownId(input.townId);
       return townStub.getAlarmStatus();
     }),

@@ -1231,6 +1221,14 @@ export const gastownRouter = router({
       const townStub = getTownDOStub(ctx.env, input.townId);
       return townStub.getBeadAsync(input.beadId);
     }),
+
+  // DEBUG: raw agent_metadata dump — remove after debugging
+  debugAgentMetadata: adminProcedure
+    .input(z.object({ townId: z.string().uuid() }))
+    .query(async ({ ctx, input }) => {
+      const townStub = getTownDOStub(ctx.env, input.townId);
+      return townStub.debugAgentMetadata();
+    }),
 });

 export type GastownRouter = typeof gastownRouter;
diff --git a/cloudflare-gastown/test/integration/review-failure.test.ts b/cloudflare-gastown/test/integration/review-failure.test.ts
new file mode 100644
index 0000000000..1c17b6ec43
--- /dev/null
+++ b/cloudflare-gastown/test/integration/review-failure.test.ts
@@ -0,0 +1,222 @@
+import { env } from 'cloudflare:test';
+import { describe, it, expect, beforeEach } from 'vitest';
+
+function getTownStub(name = 'test-town') {
+  const id = env.TOWN.idFromName(name);
+  return env.TOWN.get(id);
+}
+
+describe('Review failure paths — convoy progress and source bead recovery', () => {
+  let town: ReturnType<typeof getTownStub>;
+
+  beforeEach(() => {
+    town = getTownStub(`review-failure-${crypto.randomUUID()}`);
+  });
+
+  async function setupConvoyWithMR() {
+    await town.addRig({
+      rigId: 'rig-1',
+      name: 'main-rig',
+      gitUrl: 'https://github.com/test/repo.git',
+      defaultBranch: 'main',
+    });
+
+    const result = await town.slingConvoy({
+      rigId: 'rig-1',
+      convoyTitle: 'Review Failure Test',
+      tasks: [{ title: 'Task 1' }],
+    });
+
+    const beadId = result.beads[0].bead.bead_id;
+    const agentId = result.beads[0].agent.id;
+
+    // Simulate agent completing work — creates an MR bead in review queue
+    await town.agentDone(agentId, {
+      branch: 'gt/polecat/test-branch',
+      summary: 'Completed task',
+    });
+
+    // Source bead should now be in_review (waiting for refinery)
+    const sourceBead = await town.getBeadAsync(beadId);
+    expect(sourceBead?.status).toBe('in_review');
+
+    // Find the MR bead
+    const allBeads = await town.listBeads({ type: 'merge_request' });
+    const mrBead = allBeads.find(b => b.metadata?.source_bead_id === beadId);
+    expect(mrBead).toBeTruthy();
+
+    return { result, beadId, agentId, mrBeadId: mrBead!.bead_id, convoyId: result.convoy.id };
+  }
+
+  // ── completeReviewWithResult properly updates convoy progress ───────
+
+  describe('completeReviewWithResult on MR failure', () => {
+    it('should return source bead to open when MR bead fails', async () => {
+      const { beadId, mrBeadId } = await setupConvoyWithMR();
+
+      // Fail the review via completeReviewWithResult (the fixed path)
+      await town.completeReviewWithResult({
+        entry_id: mrBeadId,
+        status: 'failed',
+        message: 'Refinery container failed to start',
+      });
+
+      // MR bead should be failed
+      const mrBead = await town.getBeadAsync(mrBeadId);
+      expect(mrBead?.status).toBe('failed');
+
+      // Source bead should be returned to open for rework (not stuck in
+      // in_review)
+      const sourceBead = await town.getBeadAsync(beadId);
+      expect(sourceBead?.status).toBe('open');
+    });
+
+    it('should update convoy progress when MR bead is merged', async () => {
+      const { beadId, mrBeadId, convoyId } = await setupConvoyWithMR();
+
+      // Complete the review successfully
+      await town.completeReviewWithResult({
+        entry_id: mrBeadId,
+        status: 'merged',
+        message: 'Merged by refinery',
+      });
+
+      // Source bead should be closed
+      const sourceBead = await town.getBeadAsync(beadId);
+      expect(sourceBead?.status).toBe('closed');
+
+      // MR bead should be closed
+      const mrBead = await town.getBeadAsync(mrBeadId);
+      expect(mrBead?.status).toBe('closed');
+
+      // Convoy progress should reflect the closed bead
+      const convoyStatus = await town.getConvoyStatus(convoyId);
+      expect(convoyStatus?.closed_beads).toBe(1);
+    });
+  });
+
+  // ── Multi-bead convoy: failed MR doesn't stall the convoy ──────────
+
+  describe('convoy progress with mixed outcomes', () => {
+    it('should not stall convoy when one MR fails and another merges', async () => {
+      await town.addRig({
+        rigId: 'rig-1',
+        name: 'main-rig',
+        gitUrl: 'https://github.com/test/repo.git',
+        defaultBranch: 'main',
+      });
+
+      const result = await town.slingConvoy({
+        rigId: 'rig-1',
+        convoyTitle: 'Two-Task Convoy',
+        tasks: [{ title: 'Task 1' }, { title: 'Task 2' }],
+      });
+
+      const bead0Id = result.beads[0].bead.bead_id;
+      const agent0Id = result.beads[0].agent.id;
+      const bead1Id = result.beads[1].bead.bead_id;
+      const agent1Id = result.beads[1].agent.id;
+
+      // Both agents complete work
+      await town.agentDone(agent0Id, {
+        branch: 'gt/polecat/task-1',
+        summary: 'Task 1 done',
+      });
+      await town.agentDone(agent1Id, {
+        branch: 'gt/polecat/task-2',
+        summary: 'Task 2 done',
+      });
+
+      // Find MR beads
+      const mrBeads = await town.listBeads({ type: 'merge_request' });
+      const mr0 = mrBeads.find(b => b.metadata?.source_bead_id === bead0Id);
+      const mr1 = mrBeads.find(b => b.metadata?.source_bead_id === bead1Id);
+      expect(mr0).toBeTruthy();
+      expect(mr1).toBeTruthy();
+
+      // Fail MR for task 1 via completeReviewWithResult
+      await town.completeReviewWithResult({
+        entry_id: mr0!.bead_id,
+        status: 'failed',
+        message: 'Review failed',
+      });
+
+      // Source bead 0 should be back to open (ready for rework via the
+      // normal scheduling path)
+      const source0 = await town.getBeadAsync(bead0Id);
+      expect(source0?.status).toBe('open');
+
+      // Merge MR for task 2
+      await town.completeReviewWithResult({
+        entry_id: mr1!.bead_id,
+        status: 'merged',
+        message: 'Merged',
+      });
+
+      // Source bead 1 should be closed
+      const source1 = await town.getBeadAsync(bead1Id);
+      expect(source1?.status).toBe('closed');
+
+      // Convoy should show 1 closed bead (task 2 merged; task 1 is open
+      // awaiting rework, its MR is failed but the source isn't terminal yet)
+      const convoyStatus = await town.getConvoyStatus(result.convoy.id);
+      expect(convoyStatus?.closed_beads).toBe(1);
+    });
+  });
+
+  // ── Direct completeReview leaves source bead orphaned (regression) ─
+
+  describe('completeReview bypass (regression guard)', () => {
+    it('should leave source bead stuck in in_review when completeReview is called directly', async () => {
+      const { beadId, mrBeadId } = await setupConvoyWithMR();
+
+      // Call completeReview directly (the OLD broken path) —
+      // this is the raw SQL update that bypasses lifecycle events.
+      // We use this to verify the regression scenario.
+      await town.completeReview(mrBeadId, 'failed');
+
+      // MR bead should be failed
+      const mrBead = await town.getBeadAsync(mrBeadId);
+      expect(mrBead?.status).toBe('failed');
+
+      // Source bead is STILL in_review — this is the bug this PR fixes
+      // in processReviewQueue. The direct completeReview call doesn't
+      // route the source bead back for rework.
+ const sourceBead = await town.getBeadAsync(beadId); + expect(sourceBead?.status).toBe('in_review'); + }); + }); + + // ── Source bead in_review after agentDone ────────────────────────── + + describe('agentDone transitions source bead to in_review', () => { + it('should set source bead to in_review after polecat calls agentDone', async () => { + await town.addRig({ + rigId: 'rig-1', + name: 'main-rig', + gitUrl: 'https://github.com/test/repo.git', + defaultBranch: 'main', + }); + + const result = await town.slingConvoy({ + rigId: 'rig-1', + convoyTitle: 'Agent Done Test', + tasks: [{ title: 'Single Task' }], + }); + + const beadId = result.beads[0].bead.bead_id; + const agentId = result.beads[0].agent.id; + + await town.agentDone(agentId, { + branch: 'gt/polecat/test', + summary: 'Done', + }); + + const bead = await town.getBeadAsync(beadId); + expect(bead?.status).toBe('in_review'); + + // An MR bead should have been created + const mrBeads = await town.listBeads({ type: 'merge_request' }); + expect(mrBeads.length).toBeGreaterThan(0); + expect(mrBeads.some(b => b.metadata?.source_bead_id === beadId)).toBe(true); + }); + }); +}); diff --git a/cloudflare-gastown/wrangler.jsonc b/cloudflare-gastown/wrangler.jsonc index 5644cd45cd..683e4e58d6 100644 --- a/cloudflare-gastown/wrangler.jsonc +++ b/cloudflare-gastown/wrangler.jsonc @@ -5,7 +5,21 @@ "compatibility_date": "2026-02-24", "compatibility_flags": ["nodejs_compat"], "placement": { "mode": "smart" }, - "observability": { "enabled": true }, + "observability": { + "enabled": true, + "head_sampling_rate": 1, + "logs": { + "enabled": true, + "head_sampling_rate": 1, + "persist": true, + "invocation_logs": true, + }, + "traces": { + "enabled": true, + "persist": true, + "head_sampling_rate": 1, + }, + }, "upload_source_maps": true, "version_metadata": { "binding": "CF_VERSION_METADATA" }, "routes": [