diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 72e9dcbb79..6f3b0dd96f 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -93,6 +93,13 @@ function syncTownConfigToProcessEnv(): void { } else { delete process.env.GASTOWN_DISABLE_AI_COAUTHOR; } + + // Keep the standalone env var in sync with the town config so org + // billing context is never lost across model changes. + const orgId = cfg.organization_id; + if (typeof orgId === 'string' && orgId) { + process.env.GASTOWN_ORGANIZATION_ID = orgId; + } } export const app = new Hono(); @@ -216,6 +223,11 @@ app.post('/agents/start', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Persist the organization ID as a standalone env var so it survives + // config rebuilds (e.g. model hot-swap). The env var is the primary + // source of truth; KILO_CONFIG_CONTENT extraction is the fallback. + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId ?? ''; + console.log( `[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}` ); @@ -285,6 +297,11 @@ app.patch('/agents/:agentId/model', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Update org billing context from the request body if provided. + if (parsed.data.organizationId) { + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId; + } + // Sync config-derived env vars from X-Town-Config into process.env so // the SDK server restart picks up fresh tokens and git identity. // The middleware already parsed the header into lastKnownTownConfig. diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index 26f49b550f..d83c45e69b 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -620,6 +620,14 @@ export async function startAgent( console.log( `${MANAGER_LOG} startAgent: stopping existing session for ${request.agentId} (status=${existing.status})` ); + + // If the agent is still starting, abort the in-flight startup to prevent + // an orphaned session from being created after stopAgent returns. + if (existing.status === 'starting' && existing.startupAbortController) { + console.log(`${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}`); + existing.startupAbortController.abort(); + } + await stopAgent(request.agentId).catch(err => { console.warn( `${MANAGER_LOG} startAgent: failed to stop existing session for ${request.agentId}`, @@ -629,6 +637,7 @@ export async function startAgent( } const now = new Date().toISOString(); + const startupAbortController = new AbortController(); const agent: ManagedAgent = { agentId: request.agentId, rigId: request.rigId, @@ -653,15 +662,22 @@ export async function startAgent( completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null, model: request.model ?? null, startupEnv: env, + startupAbortController, }; agents.set(request.agentId, agent); + const { signal } = startupAbortController; let sessionCounted = false; try { // 1. Ensure SDK server is running for this workdir const { client, port } = await ensureSDKServer(workdir, env); agent.serverPort = port; + // Check if startup was cancelled while waiting for the SDK server + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // Track session count on the SDK instance const instance = sdkInstances.get(workdir); if (instance) { @@ -671,6 +687,10 @@ export async function startAgent( // 2. Create a session const sessionResult = await client.session.create({ body: {} }); + + // Parse and store the session ID immediately so the catch block can + // abort an orphaned session if startupAbortController fires during + // the await above. const rawSession: unknown = sessionResult.data ?? sessionResult; const parsed = SessionResponse.safeParse(rawSession); if (!parsed.success) { @@ -684,6 +704,12 @@ export async function startAgent( const sessionId = parsed.data.id; agent.sessionId = sessionId; + // Now check if startup was cancelled while creating the session. + // agent.sessionId is already set, so the catch block will abort it. + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // 3. Subscribe to events (async, runs in background) void subscribeToEvents(client, agent, request); @@ -705,6 +731,11 @@ export async function startAgent( modelParam = { providerID: 'kilo', modelID: request.model }; } + // Final abort check before sending the prompt + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + await client.session.prompt({ path: { id: sessionId }, body: { @@ -722,6 +753,7 @@ export async function startAgent( sessionCounted = false; throw new Error('Event stream failed during initial prompt'); } + agent.startupAbortController = null; agent.messageCount = 1; @@ -735,7 +767,39 @@ export async function startAgent( return agent; } catch (err) { + // On abort, clean up silently — the new startAgent invocation will + // proceed with a fresh entry. + if (err instanceof StartupAbortedError) { + console.log(`${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up`); + if (sessionCounted) { + const instance = sdkInstances.get(workdir); + if (instance) { + // Abort the orphaned session if one was created before the abort + if (agent.sessionId) { + try { + await instance.client.session.abort({ path: { id: agent.sessionId } }); + } catch (abortErr) { + console.error( + `${MANAGER_LOG} startAgent: failed to abort orphaned session ${agent.sessionId}:`, + abortErr + ); + } + } + instance.sessionCount--; + if (instance.sessionCount <= 0) { + instance.server.close(); + sdkInstances.delete(workdir); + } + } + } + if (agents.get(request.agentId) === agent) { + agents.delete(request.agentId); + } + throw err; + } + agent.status = 'failed'; + agent.startupAbortController = null; agent.exitReason = err instanceof Error ? err.message : String(err); if (sessionCounted) { const instance = sdkInstances.get(workdir); @@ -745,6 +809,18 @@ export async function startAgent( } } +/** + * Thrown when a startup sequence is cancelled via AbortController. + * Distinct from other errors so the catch block can clean up without + * marking the agent as failed (a new startup is taking over). + */ +class StartupAbortedError extends Error { + constructor(agentId: string) { + super(`Startup aborted for agent ${agentId}`); + this.name = 'StartupAbortedError'; + } +} + /** * Stop an agent by aborting its session. */ @@ -753,6 +829,13 @@ export async function stopAgent(agentId: string): Promise { if (!agent) throw new Error(`Agent ${agentId} not found`); if (agent.status !== 'running' && agent.status !== 'starting') return; + // If still starting, abort the in-flight startup so session.create() + // doesn't produce an orphaned session after we return. + if (agent.startupAbortController) { + agent.startupAbortController.abort(); + agent.startupAbortController = null; + } + agent.status = 'stopping'; // Cancel any pending idle timer @@ -839,6 +922,12 @@ export async function sendMessage(agentId: string, prompt: string): Promise; @@ -133,6 +135,10 @@ export type ManagedAgent = { model: string | null; /** Full env dict from buildAgentEnv, stored so model hot-swap can replay it. */ startupEnv: Record; + /** AbortController for the in-flight startup sequence. Aborted when a + * restart is requested while the agent is still in 'starting' status, + * preventing orphaned sessions from leaking. */ + startupAbortController: AbortController | null; }; export type AgentStatusResponse = { diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 85bc47d0b7..6e438e28f6 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -2425,6 +2425,11 @@ export class TownDO extends DurableObject { // before restarting the SDK server (tokens, git identity, etc.). const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env); + // Resolve townConfig to thread the organization_id into the request body + // (belt-and-suspenders: ensures org billing survives even if X-Town-Config + // header parsing fails on the container side). + const townConfig = await config.getTownConfig(this.ctx.storage); + const updated = await dispatch.updateAgentModelInContainer( this.env, townId, @@ -2432,7 +2437,8 @@ export class TownDO extends DurableObject { model, smallModel, conversationHistory || undefined, - containerConfig + containerConfig, + townConfig.organization_id ); if (updated) { console.log( @@ -4007,9 +4013,24 @@ export class TownDO extends DurableObject { const ghMatch = prUrl.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/pull\/(\d+)/); if (ghMatch) { const [, owner, repo, numberStr] = ghMatch; - const token = townConfig.git_auth.github_token; + // Fix 1 & 2: Token fallback chain — github_token → github_cli_pat → platform integration + let token = townConfig.git_auth.github_token ?? townConfig.github_cli_pat; + if (!token) { + // Try resolving from GitHub App installation as final fallback + const integrationId = townConfig.git_auth.platform_integration_id; + if (integrationId && this.env.GIT_TOKEN_SERVICE) { + try { + token = await this.env.GIT_TOKEN_SERVICE.getToken(integrationId); + } catch (err) { + console.warn( + `${TOWN_LOG} checkPRStatus: platform integration token lookup failed for ${integrationId}`, + err + ); + } + } + } if (!token) { - console.warn(`${TOWN_LOG} checkPRStatus: no github_token configured, cannot poll ${prUrl}`); + console.warn(`${TOWN_LOG} checkPRStatus: no github token available, cannot poll ${prUrl}`); return null; } diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index d586d47532..5b0a45779b 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -291,6 +291,12 @@ export type ApplyActionContext = { const LOG = '[actions]'; +/** Fail MR bead after this many consecutive null poll results (#1632). */ +const PR_POLL_NULL_THRESHOLD = 10; + +/** Minimum interval between PR polls per MR bead (ms) (#1632). */ +export const PR_POLL_INTERVAL_MS = 60_000; // 1 minute + function now(): string { return new Date().toISOString(); } @@ -576,166 +582,229 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro } case 'poll_pr': { - // Touch updated_at synchronously so the bead doesn't look stale - // to Rule 4 (orphaned PR review, 30 min timeout). Without this, - // active polling keeps the PR alive but updated_at was set once - // at PR creation and never refreshed, causing a false "orphaned" - // failure after 30 minutes. + // Touch updated_at and record last_poll_at synchronously so the bead + // doesn't look stale to Rule 4 (orphaned PR review, 30 min timeout). + // Without this, active polling keeps the PR alive but updated_at was + // set once at PR creation and never refreshed, causing a false + // "orphaned" failure after 30 minutes. + const timestamp = now(); query( sql, /* sql */ ` UPDATE ${beads} - SET ${beads.columns.updated_at} = ? + SET ${beads.columns.updated_at} = ?, + ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.last_poll_at', ? + ) WHERE ${beads.bead_id} = ? `, - [now(), action.bead_id] + [timestamp, timestamp, action.bead_id] ); return async () => { try { const status = await ctx.checkPRStatus(action.pr_url); - if (status && status !== 'open') { - ctx.insertEvent('pr_status_changed', { - bead_id: action.bead_id, - payload: { pr_url: action.pr_url, pr_state: status }, - }); - return; - } + if (status !== null) { + // Any non-null result resets the consecutive null counter + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', 0 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + if (status !== 'open') { + ctx.insertEvent('pr_status_changed', { + bead_id: action.bead_id, + payload: { pr_url: action.pr_url, pr_state: status }, + }); + return; + } - // PR is open — check for feedback and auto-merge if configured - const townConfig = await ctx.getTownConfig(); - const refineryConfig = townConfig.refinery; - if (!refineryConfig) return; + // PR is open — check for feedback and auto-merge if configured + const townConfig = await ctx.getTownConfig(); + const refineryConfig = townConfig.refinery; + if (!refineryConfig) return; + + // Auto-resolve PR feedback: detect unresolved comments and failing CI + if (refineryConfig.auto_resolve_pr_feedback) { + const feedback = await ctx.checkPRFeedback(action.pr_url); + if ( + feedback && + (feedback.hasUnresolvedComments || + feedback.hasFailingChecks || + feedback.hasUncheckedRuns) + ) { + const existingFeedback = hasExistingFeedbackBead(sql, action.bead_id); + if (!existingFeedback) { + const prMeta = parsePrUrl(action.pr_url); + const rmRows = z + .object({ branch: z.string() }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${review_metadata.columns.branch} + FROM ${review_metadata} + WHERE ${review_metadata.bead_id} = ? + `, + [action.bead_id] + ), + ]); + const branch = rmRows[0]?.branch ?? ''; + + ctx.insertEvent('pr_feedback_detected', { + bead_id: action.bead_id, + payload: { + mr_bead_id: action.bead_id, + pr_url: action.pr_url, + pr_number: prMeta?.prNumber ?? 0, + repo: prMeta?.repo ?? '', + branch, + has_unresolved_comments: feedback.hasUnresolvedComments, + has_failing_checks: feedback.hasFailingChecks, + has_unchecked_runs: feedback.hasUncheckedRuns, + }, + }); + } + + query( + sql, + /* sql */ ` + UPDATE ${review_metadata} + SET ${review_metadata.columns.last_feedback_check_at} = ? + WHERE ${review_metadata.bead_id} = ? + `, + [now(), action.bead_id] + ); + } + } - // Auto-resolve PR feedback: detect unresolved comments and failing CI - if (refineryConfig.auto_resolve_pr_feedback) { - const feedback = await ctx.checkPRFeedback(action.pr_url); + // Auto-merge timer: track grace period when everything is green. + // Requires both auto_merge enabled AND a delay configured. if ( - feedback && - (feedback.hasUnresolvedComments || - feedback.hasFailingChecks || - feedback.hasUncheckedRuns) + refineryConfig.auto_merge !== false && + refineryConfig.auto_merge_delay_minutes !== null && + refineryConfig.auto_merge_delay_minutes !== undefined ) { - // Check for existing non-terminal feedback bead to prevent duplicates - const existingFeedback = hasExistingFeedbackBead(sql, action.bead_id); - if (!existingFeedback) { - // Parse PR URL for repo/number metadata - const prMeta = parsePrUrl(action.pr_url); - const rmRows = z - .object({ branch: z.string() }) + const feedback = await ctx.checkPRFeedback(action.pr_url); + if (!feedback) return; + + const allGreen = + !feedback.hasUnresolvedComments && + !feedback.hasFailingChecks && + feedback.allChecksPass; + + if (allGreen) { + const readySinceRows = z + .object({ auto_merge_ready_since: z.string().nullable() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${review_metadata.columns.branch} + SELECT ${review_metadata.columns.auto_merge_ready_since} FROM ${review_metadata} WHERE ${review_metadata.bead_id} = ? `, [action.bead_id] ), ]); - const branch = rmRows[0]?.branch ?? ''; - - ctx.insertEvent('pr_feedback_detected', { - bead_id: action.bead_id, - payload: { - mr_bead_id: action.bead_id, - pr_url: action.pr_url, - pr_number: prMeta?.prNumber ?? 0, - repo: prMeta?.repo ?? '', - branch, - has_unresolved_comments: feedback.hasUnresolvedComments, - has_failing_checks: feedback.hasFailingChecks, - has_unchecked_runs: feedback.hasUncheckedRuns, - }, - }); - } - // Update last_feedback_check_at - query( - sql, - /* sql */ ` - UPDATE ${review_metadata} - SET ${review_metadata.columns.last_feedback_check_at} = ? - WHERE ${review_metadata.bead_id} = ? - `, - [now(), action.bead_id] - ); - } - } + const readySince = readySinceRows[0]?.auto_merge_ready_since; - // Auto-merge timer: track grace period when everything is green. - // Requires both auto_merge enabled AND a delay configured. - if ( - refineryConfig.auto_merge !== false && - refineryConfig.auto_merge_delay_minutes !== null && - refineryConfig.auto_merge_delay_minutes !== undefined - ) { - const feedback = await ctx.checkPRFeedback(action.pr_url); - if (!feedback) return; - - const allGreen = - !feedback.hasUnresolvedComments && - !feedback.hasFailingChecks && - feedback.allChecksPass; - - if (allGreen) { - // Check if timer is already running - const readySinceRows = z - .object({ auto_merge_ready_since: z.string().nullable() }) - .array() - .parse([ - ...query( + if (!readySince) { + query( sql, /* sql */ ` - SELECT ${review_metadata.columns.auto_merge_ready_since} - FROM ${review_metadata} + UPDATE ${review_metadata} + SET ${review_metadata.columns.auto_merge_ready_since} = ? WHERE ${review_metadata.bead_id} = ? `, - [action.bead_id] - ), - ]); - - const readySince = readySinceRows[0]?.auto_merge_ready_since; - - if (!readySince) { - // First tick where everything is green — start the timer + [now(), action.bead_id] + ); + } else { + const elapsed = Date.now() - new Date(readySince).getTime(); + if (elapsed >= refineryConfig.auto_merge_delay_minutes * 60_000) { + ctx.insertEvent('pr_auto_merge', { + bead_id: action.bead_id, + payload: { + mr_bead_id: action.bead_id, + pr_url: action.pr_url, + }, + }); + } + } + } else { query( sql, /* sql */ ` UPDATE ${review_metadata} - SET ${review_metadata.columns.auto_merge_ready_since} = ? + SET ${review_metadata.columns.auto_merge_ready_since} = NULL WHERE ${review_metadata.bead_id} = ? `, - [now(), action.bead_id] + [action.bead_id] ); - } else { - const elapsed = Date.now() - new Date(readySince).getTime(); - if (elapsed >= refineryConfig.auto_merge_delay_minutes * 60_000) { - // Grace period elapsed — emit merge event - ctx.insertEvent('pr_auto_merge', { - bead_id: action.bead_id, - payload: { - mr_bead_id: action.bead_id, - pr_url: action.pr_url, - }, - }); - } } - } else { - // Not all green — reset the timer + } + } else { + // Null result (e.g. no GitHub token) — increment consecutive null counter + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', + COALESCE( + json_extract(${beads.columns.metadata}, '$.poll_null_count'), + 0 + ) + 1 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + const rows = [ + ...query( + sql, + /* sql */ ` + SELECT json_extract(${beads.columns.metadata}, '$.poll_null_count') AS null_count + FROM ${beads} + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ), + ]; + const nullCount = Number(rows[0]?.null_count ?? 0); + if (nullCount >= PR_POLL_NULL_THRESHOLD) { + console.warn( + `${LOG} poll_pr: ${nullCount} consecutive null results for bead=${action.bead_id}, failing` + ); + beadOps.updateBeadStatus(sql, action.bead_id, 'failed', 'system'); query( sql, /* sql */ ` - UPDATE ${review_metadata} - SET ${review_metadata.columns.auto_merge_ready_since} = NULL - WHERE ${review_metadata.bead_id} = ? + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.failureReason', 'no_github_token', + '$.failureMessage', 'Cannot poll PR status — no GitHub token configured. Please add a token in town settings.' + ) + WHERE ${beads.bead_id} = ? `, [action.bead_id] ); } } + // status === 'open' — no action needed, poll again next tick } catch (err) { console.warn(`${LOG} poll_pr failed: bead=${action.bead_id} url=${action.pr_url}`, err); } diff --git a/cloudflare-gastown/src/dos/town/config.ts b/cloudflare-gastown/src/dos/town/config.ts index 6c3c875f72..afc705a400 100644 --- a/cloudflare-gastown/src/dos/town/config.ts +++ b/cloudflare-gastown/src/dos/town/config.ts @@ -159,5 +159,6 @@ export async function buildContainerConfig( disable_ai_coauthor: config.disable_ai_coauthor, kilo_api_url: env.KILO_API_URL ?? '', gastown_api_url: env.GASTOWN_API_URL ?? '', + organization_id: config.organization_id, }; } diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts index 4932e92a50..65d6266aa4 100644 --- a/cloudflare-gastown/src/dos/town/container-dispatch.ts +++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts @@ -676,7 +676,8 @@ export async function updateAgentModelInContainer( model: string, smallModel?: string, conversationHistory?: string, - containerConfig?: Record + containerConfig?: Record, + organizationId?: string ): Promise { try { const container = getTownContainerStub(env, townId); @@ -691,6 +692,7 @@ export async function updateAgentModelInContainer( model, ...(smallModel ? { smallModel } : {}), ...(conversationHistory ? { conversationHistory } : {}), + ...(organizationId ? { organizationId } : {}), }), }); return response.ok; diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index 398b619916..27b1e9f843 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -30,6 +30,7 @@ import * as reviewQueue from './review-queue'; import * as agents from './agents'; import * as beadOps from './beads'; import { getRig } from './rigs'; +import { PR_POLL_INTERVAL_MS } from './actions'; import type { Action } from './actions'; import type { TownEventRecord } from '../../db/tables/town-events.table'; @@ -1046,6 +1047,7 @@ export function reconcileReviewQueue( /* sql */ ` SELECT b.${beads.columns.bead_id}, b.${beads.columns.status}, b.${beads.columns.rig_id}, b.${beads.columns.updated_at}, + b.${beads.columns.metadata}, rm.${review_metadata.columns.pr_url}, b.${beads.columns.assignee_agent_bead_id}, b.${beads.columns.metadata} @@ -1059,14 +1061,20 @@ export function reconcileReviewQueue( ]); for (const mr of mrBeads) { - // Rule 1: PR-strategy MR beads in_progress need polling + // Rule 1: PR-strategy MR beads in_progress need polling. + // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago (#1632). if (mr.status === 'in_progress' && mr.pr_url) { - // Always poll for status changes (merged/closed by human, etc.) - actions.push({ - type: 'poll_pr', - bead_id: mr.bead_id, - pr_url: mr.pr_url, - }); + const lastPollAt: unknown = mr.metadata?.last_poll_at; + const msSinceLastPoll = + typeof lastPollAt === 'string' ? Date.now() - new Date(lastPollAt).getTime() : Infinity; + + if (msSinceLastPoll >= PR_POLL_INTERVAL_MS) { + actions.push({ + type: 'poll_pr', + bead_id: mr.bead_id, + pr_url: mr.pr_url, + }); + } // If auto-merge is pending, also attempt the merge if (mr.metadata?.auto_merge_pending) { actions.push({ @@ -2009,6 +2017,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { const violations: Violation[] = []; // Invariant 7: Working agents must have hooks + // Mayors are always 'working' and intentionally have no hook — exclude them. const unhookedWorkers = z .object({ bead_id: z.string() }) .array() @@ -2020,6 +2029,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { FROM ${agent_metadata} WHERE ${agent_metadata.status} = 'working' AND ${agent_metadata.current_hook_bead_id} IS NULL + AND ${agent_metadata.role} != 'mayor' `, [] ), @@ -2031,26 +2041,27 @@ export function checkInvariants(sql: SqlStorage): Violation[] { }); } - // Invariant 5: Convoy beads should not be in_progress - const inProgressConvoys = z - .object({ bead_id: z.string() }) + // Invariant 5: Convoy beads should not be in unexpected states. + // Valid transient states: open, in_progress, in_review, closed. + const badStateConvoys = z + .object({ bead_id: z.string(), status: z.string() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${beads.bead_id} + SELECT ${beads.bead_id}, ${beads.status} FROM ${beads} WHERE ${beads.type} = 'convoy' - AND ${beads.status} = 'in_progress' + AND ${beads.status} NOT IN ('open', 'in_progress', 'in_review', 'closed') `, [] ), ]); - for (const c of inProgressConvoys) { + for (const c of badStateConvoys) { violations.push({ invariant: 5, - message: `Convoy bead ${c.bead_id} is in_progress (should only be open or closed)`, + message: `Convoy bead ${c.bead_id} is in unexpected state '${c.status}'`, }); }