From efac6ed2e594dbf78172bf265569171626f647ea Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 03:52:37 +0000 Subject: [PATCH 1/7] fix(gastown): preserve org billing context across model changes (#1756) Store organizationId as a standalone env var (GASTOWN_ORGANIZATION_ID) that survives KILO_CONFIG_CONTENT rebuilds. Three defense layers: 1. Set GASTOWN_ORGANIZATION_ID on /agents/start and read it first in extractOrganizationId() before falling back to config extraction. 2. Pass organizationId in the PATCH /model request body so the container receives it even if X-Town-Config parsing fails. 3. Include organization_id in buildContainerConfig and sync it to process.env in the PATCH handler's X-Town-Config processing. --- .../container/src/control-server.ts | 17 +++++++++++++++++ .../container/src/process-manager.ts | 6 ++++++ cloudflare-gastown/container/src/types.ts | 2 ++ cloudflare-gastown/src/dos/Town.do.ts | 8 +++++++- cloudflare-gastown/src/dos/town/config.ts | 1 + .../src/dos/town/container-dispatch.ts | 4 +++- 6 files changed, 36 insertions(+), 2 deletions(-) diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 72e9dcbb79..6f3b0dd96f 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -93,6 +93,13 @@ function syncTownConfigToProcessEnv(): void { } else { delete process.env.GASTOWN_DISABLE_AI_COAUTHOR; } + + // Keep the standalone env var in sync with the town config so org + // billing context is never lost across model changes. + const orgId = cfg.organization_id; + if (typeof orgId === 'string' && orgId) { + process.env.GASTOWN_ORGANIZATION_ID = orgId; + } } export const app = new Hono(); @@ -216,6 +223,11 @@ app.post('/agents/start', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Persist the organization ID as a standalone env var so it survives + // config rebuilds (e.g. model hot-swap). The env var is the primary + // source of truth; KILO_CONFIG_CONTENT extraction is the fallback. + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId ?? ''; + console.log( `[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}` ); @@ -285,6 +297,11 @@ app.patch('/agents/:agentId/model', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Update org billing context from the request body if provided. + if (parsed.data.organizationId) { + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId; + } + // Sync config-derived env vars from X-Town-Config into process.env so // the SDK server restart picks up fresh tokens and git identity. // The middleware already parsed the header into lastKnownTownConfig. diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index 26f49b550f..c9967e1069 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -839,6 +839,12 @@ export async function sendMessage(agentId: string, prompt: string): Promise; diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 85bc47d0b7..399221e05c 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -2425,6 +2425,11 @@ export class TownDO extends DurableObject { // before restarting the SDK server (tokens, git identity, etc.). const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env); + // Resolve townConfig to thread the organization_id into the request body + // (belt-and-suspenders: ensures org billing survives even if X-Town-Config + // header parsing fails on the container side). + const townConfig = await config.getTownConfig(this.ctx.storage); + const updated = await dispatch.updateAgentModelInContainer( this.env, townId, @@ -2432,7 +2437,8 @@ export class TownDO extends DurableObject { model, smallModel, conversationHistory || undefined, - containerConfig + containerConfig, + townConfig.organization_id ); if (updated) { console.log( diff --git a/cloudflare-gastown/src/dos/town/config.ts b/cloudflare-gastown/src/dos/town/config.ts index 6c3c875f72..afc705a400 100644 --- a/cloudflare-gastown/src/dos/town/config.ts +++ b/cloudflare-gastown/src/dos/town/config.ts @@ -159,5 +159,6 @@ export async function buildContainerConfig( disable_ai_coauthor: config.disable_ai_coauthor, kilo_api_url: env.KILO_API_URL ?? '', gastown_api_url: env.GASTOWN_API_URL ?? '', + organization_id: config.organization_id, }; } diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts index 4932e92a50..65d6266aa4 100644 --- a/cloudflare-gastown/src/dos/town/container-dispatch.ts +++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts @@ -676,7 +676,8 @@ export async function updateAgentModelInContainer( model: string, smallModel?: string, conversationHistory?: string, - containerConfig?: Record + containerConfig?: Record, + organizationId?: string ): Promise { try { const container = getTownContainerStub(env, townId); @@ -691,6 +692,7 @@ export async function updateAgentModelInContainer( model, ...(smallModel ? { smallModel } : {}), ...(conversationHistory ? { conversationHistory } : {}), + ...(organizationId ? { organizationId } : {}), }), }); return response.ok; From 0c942ddb2fe35885332da2060a9a4852b0bce75f Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 03:49:57 +0000 Subject: [PATCH 2/7] fix(gastown): stop false-positive invariant violations spamming logs every 5s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Invariant 7: Exclude mayors from the 'working agent must have a hook' check — mayors are always working and intentionally hookless. Invariant 5: Widen valid convoy bead states to include in_progress and in_review, which are legitimate transient states while child beads are being worked on. Only flag truly unexpected states. Fixes #1364 --- cloudflare-gastown/src/dos/town/reconciler.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index 398b619916..38b1c1cddc 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -2009,6 +2009,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { const violations: Violation[] = []; // Invariant 7: Working agents must have hooks + // Mayors are always 'working' and intentionally have no hook — exclude them. const unhookedWorkers = z .object({ bead_id: z.string() }) .array() @@ -2020,6 +2021,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { FROM ${agent_metadata} WHERE ${agent_metadata.status} = 'working' AND ${agent_metadata.current_hook_bead_id} IS NULL + AND ${agent_metadata.role} != 'mayor' `, [] ), @@ -2031,26 +2033,27 @@ export function checkInvariants(sql: SqlStorage): Violation[] { }); } - // Invariant 5: Convoy beads should not be in_progress - const inProgressConvoys = z - .object({ bead_id: z.string() }) + // Invariant 5: Convoy beads should not be in unexpected states. + // Valid transient states: open, in_progress, in_review, closed. + const badStateConvoys = z + .object({ bead_id: z.string(), status: z.string() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${beads.bead_id} + SELECT ${beads.bead_id}, ${beads.status} FROM ${beads} WHERE ${beads.type} = 'convoy' - AND ${beads.status} = 'in_progress' + AND ${beads.status} NOT IN ('open', 'in_progress', 'in_review', 'closed') `, [] ), ]); - for (const c of inProgressConvoys) { + for (const c of badStateConvoys) { violations.push({ invariant: 5, - message: `Convoy bead ${c.bead_id} is in_progress (should only be open or closed)`, + message: `Convoy bead ${c.bead_id} is in unexpected state '${c.status}'`, }); } From 09a22c69d21571fef258685b39d46a7f2e9a11e4 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 04:27:56 +0000 Subject: [PATCH 3/7] fix(gastown): prevent MR beads stuck in_progress when github_token missing (#1632) - Fall back to github_cli_pat then platform integration token in checkPRStatus - Track consecutive null poll results; fail MR bead after 10 nulls - Rate-limit PR polling to once per minute per MR bead via last_poll_at metadata - Store failureReason and failureMessage in bead metadata on permanent failure --- cloudflare-gastown/src/dos/Town.do.ts | 19 +++- cloudflare-gastown/src/dos/town/actions.ts | 90 +++++++++++++++++-- cloudflare-gastown/src/dos/town/reconciler.ts | 23 +++-- 3 files changed, 116 insertions(+), 16 deletions(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 399221e05c..6e438e28f6 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -4013,9 +4013,24 @@ export class TownDO extends DurableObject { const ghMatch = prUrl.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/pull\/(\d+)/); if (ghMatch) { const [, owner, repo, numberStr] = ghMatch; - const token = townConfig.git_auth.github_token; + // Fix 1 & 2: Token fallback chain — github_token → github_cli_pat → platform integration + let token = townConfig.git_auth.github_token ?? townConfig.github_cli_pat; if (!token) { - console.warn(`${TOWN_LOG} checkPRStatus: no github_token configured, cannot poll ${prUrl}`); + // Try resolving from GitHub App installation as final fallback + const integrationId = townConfig.git_auth.platform_integration_id; + if (integrationId && this.env.GIT_TOKEN_SERVICE) { + try { + token = await this.env.GIT_TOKEN_SERVICE.getToken(integrationId); + } catch (err) { + console.warn( + `${TOWN_LOG} checkPRStatus: platform integration token lookup failed for ${integrationId}`, + err + ); + } + } + } + if (!token) { + console.warn(`${TOWN_LOG} checkPRStatus: no github token available, cannot poll ${prUrl}`); return null; } diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index d586d47532..397cf001a7 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -291,6 +291,12 @@ export type ApplyActionContext = { const LOG = '[actions]'; +/** Fail MR bead after this many consecutive null poll results (#1632). */ +const PR_POLL_NULL_THRESHOLD = 10; + +/** Minimum interval between PR polls per MR bead (ms) (#1632). */ +export const PR_POLL_INTERVAL_MS = 60_000; // 1 minute + function now(): string { return new Date().toISOString(); } @@ -576,29 +582,48 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro } case 'poll_pr': { - // Touch updated_at synchronously so the bead doesn't look stale - // to Rule 4 (orphaned PR review, 30 min timeout). Without this, - // active polling keeps the PR alive but updated_at was set once - // at PR creation and never refreshed, causing a false "orphaned" - // failure after 30 minutes. + // Touch updated_at and record last_poll_at synchronously so the bead + // doesn't look stale to Rule 4 (orphaned PR review, 30 min timeout). + // Without this, active polling keeps the PR alive but updated_at was + // set once at PR creation and never refreshed, causing a false + // "orphaned" failure after 30 minutes. + const timestamp = now(); query( sql, /* sql */ ` UPDATE ${beads} - SET ${beads.columns.updated_at} = ? + SET ${beads.columns.updated_at} = ?, + ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.last_poll_at', ? + ) WHERE ${beads.bead_id} = ? `, - [now(), action.bead_id] + [timestamp, timestamp, action.bead_id] ); return async () => { try { const status = await ctx.checkPRStatus(action.pr_url); if (status && status !== 'open') { + // Successful non-open status — reset null counter and emit event + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', 0 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); ctx.insertEvent('pr_status_changed', { bead_id: action.bead_id, payload: { pr_url: action.pr_url, pr_state: status }, }); +<<<<<<< HEAD return; } @@ -736,6 +761,57 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro ); } } + } else if (status === null) { + // Null result (e.g. no GitHub token) — increment consecutive null counter + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', + COALESCE( + json_extract(${beads.columns.metadata}, '$.poll_null_count'), + 0 + ) + 1 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + const rows = [ + ...query( + sql, + /* sql */ ` + SELECT json_extract(${beads.columns.metadata}, '$.poll_null_count') AS null_count + FROM ${beads} + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ), + ]; + const nullCount = Number(rows[0]?.null_count ?? 0); + if (nullCount >= PR_POLL_NULL_THRESHOLD) { + console.warn( + `${LOG} poll_pr: ${nullCount} consecutive null results for bead=${action.bead_id}, failing` + ); + beadOps.updateBeadStatus(sql, action.bead_id, 'failed', 'system'); + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.failureReason', 'no_github_token', + '$.failureMessage', 'Cannot poll PR status — no GitHub token configured. Please add a token in town settings.' + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + } + } + // status === 'open' — no action needed, poll again next tick } catch (err) { console.warn(`${LOG} poll_pr failed: bead=${action.bead_id} url=${action.pr_url}`, err); } diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index 38b1c1cddc..7d3ef58233 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -30,6 +30,7 @@ import * as reviewQueue from './review-queue'; import * as agents from './agents'; import * as beadOps from './beads'; import { getRig } from './rigs'; +import { PR_POLL_INTERVAL_MS } from './actions'; import type { Action } from './actions'; import type { TownEventRecord } from '../../db/tables/town-events.table'; @@ -1046,6 +1047,7 @@ export function reconcileReviewQueue( /* sql */ ` SELECT b.${beads.columns.bead_id}, b.${beads.columns.status}, b.${beads.columns.rig_id}, b.${beads.columns.updated_at}, + b.${beads.columns.metadata}, rm.${review_metadata.columns.pr_url}, b.${beads.columns.assignee_agent_bead_id}, b.${beads.columns.metadata} @@ -1059,14 +1061,21 @@ export function reconcileReviewQueue( ]); for (const mr of mrBeads) { - // Rule 1: PR-strategy MR beads in_progress need polling + // Rule 1: PR-strategy MR beads in_progress need polling. + // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago (#1632). if (mr.status === 'in_progress' && mr.pr_url) { - // Always poll for status changes (merged/closed by human, etc.) - actions.push({ - type: 'poll_pr', - bead_id: mr.bead_id, - pr_url: mr.pr_url, - }); + // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago + const lastPollAt = mr.metadata?.last_poll_at; + const msSinceLastPoll = + typeof lastPollAt === 'string' ? Date.now() - new Date(lastPollAt).getTime() : Infinity; + + if (msSinceLastPoll >= PR_POLL_INTERVAL_MS) { + actions.push({ + type: 'poll_pr', + bead_id: mr.bead_id, + pr_url: mr.pr_url, + }); + } // If auto-merge is pending, also attempt the merge if (mr.metadata?.auto_merge_pending) { actions.push({ From c4219172f8e59d8361a00ea541b16533440e866f Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 04:37:40 +0000 Subject: [PATCH 4/7] fix: format and lint errors in reconciler.ts - Add explicit 'unknown' type annotation to lastPollAt to fix no-unsafe-assignment lint error - Auto-format ternary expression per oxfmt rules --- cloudflare-gastown/src/dos/town/reconciler.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index 7d3ef58233..27b1e9f843 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -1064,8 +1064,7 @@ export function reconcileReviewQueue( // Rule 1: PR-strategy MR beads in_progress need polling. // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago (#1632). if (mr.status === 'in_progress' && mr.pr_url) { - // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago - const lastPollAt = mr.metadata?.last_poll_at; + const lastPollAt: unknown = mr.metadata?.last_poll_at; const msSinceLastPoll = typeof lastPollAt === 'string' ? Date.now() - new Date(lastPollAt).getTime() : Infinity; From d85a7266b66d4c970917552262c4400fa87215a2 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 03:56:26 +0000 Subject: [PATCH 5/7] fix(gastown): prevent duplicate session leak when restarting a starting agent Thread an AbortController through the startAgent() startup sequence so that when a restart is requested for an agent still in 'starting' status, the in-flight startup is cancelled before session.create() can produce an orphaned session. - Add startupAbortController field to ManagedAgent type - In startAgent: abort existing startup controller when agent is 'starting' - Check signal.aborted after each async step (ensureSDKServer, session.create, before session.prompt) - On abort: decrement sessionCount, remove agent entry, clean up SDK instance if no sessions remain - In stopAgent: also abort startupAbortController for 'starting' agents - Introduce StartupAbortedError to distinguish abort from real failures Fixes #1341 --- .../container/src/process-manager.ts | 71 +++++++++++++++++++ cloudflare-gastown/container/src/types.ts | 4 ++ 2 files changed, 75 insertions(+) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index c9967e1069..0b77787283 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -620,6 +620,16 @@ export async function startAgent( console.log( `${MANAGER_LOG} startAgent: stopping existing session for ${request.agentId} (status=${existing.status})` ); + + // If the agent is still starting, abort the in-flight startup to prevent + // an orphaned session from being created after stopAgent returns. + if (existing.status === 'starting' && existing.startupAbortController) { + console.log( + `${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}` + ); + existing.startupAbortController.abort(); + } + await stopAgent(request.agentId).catch(err => { console.warn( `${MANAGER_LOG} startAgent: failed to stop existing session for ${request.agentId}`, @@ -629,6 +639,7 @@ export async function startAgent( } const now = new Date().toISOString(); + const startupAbortController = new AbortController(); const agent: ManagedAgent = { agentId: request.agentId, rigId: request.rigId, @@ -653,15 +664,22 @@ export async function startAgent( completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null, model: request.model ?? null, startupEnv: env, + startupAbortController, }; agents.set(request.agentId, agent); + const { signal } = startupAbortController; let sessionCounted = false; try { // 1. Ensure SDK server is running for this workdir const { client, port } = await ensureSDKServer(workdir, env); agent.serverPort = port; + // Check if startup was cancelled while waiting for the SDK server + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // Track session count on the SDK instance const instance = sdkInstances.get(workdir); if (instance) { @@ -671,6 +689,13 @@ export async function startAgent( // 2. Create a session const sessionResult = await client.session.create({ body: {} }); + + // Check if startup was cancelled while creating the session — this is + // the critical window where an orphaned session would leak. + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + const rawSession: unknown = sessionResult.data ?? sessionResult; const parsed = SessionResponse.safeParse(rawSession); if (!parsed.success) { @@ -705,6 +730,11 @@ export async function startAgent( modelParam = { providerID: 'kilo', modelID: request.model }; } + // Final abort check before sending the prompt + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + await client.session.prompt({ path: { id: sessionId }, body: { @@ -722,6 +752,7 @@ export async function startAgent( sessionCounted = false; throw new Error('Event stream failed during initial prompt'); } + agent.startupAbortController = null; agent.messageCount = 1; @@ -735,7 +766,28 @@ export async function startAgent( return agent; } catch (err) { + // On abort, clean up silently — the new startAgent invocation will + // proceed with a fresh entry. + if (err instanceof StartupAbortedError) { + console.log( + `${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up` + ); + if (sessionCounted) { + const instance = sdkInstances.get(workdir); + if (instance) { + instance.sessionCount--; + if (instance.sessionCount <= 0) { + instance.server.close(); + sdkInstances.delete(workdir); + } + } + } + agents.delete(request.agentId); + throw err; + } + agent.status = 'failed'; + agent.startupAbortController = null; agent.exitReason = err instanceof Error ? err.message : String(err); if (sessionCounted) { const instance = sdkInstances.get(workdir); @@ -745,6 +797,18 @@ export async function startAgent( } } +/** + * Thrown when a startup sequence is cancelled via AbortController. + * Distinct from other errors so the catch block can clean up without + * marking the agent as failed (a new startup is taking over). + */ +class StartupAbortedError extends Error { + constructor(agentId: string) { + super(`Startup aborted for agent ${agentId}`); + this.name = 'StartupAbortedError'; + } +} + /** * Stop an agent by aborting its session. */ @@ -753,6 +817,13 @@ export async function stopAgent(agentId: string): Promise { if (!agent) throw new Error(`Agent ${agentId} not found`); if (agent.status !== 'running' && agent.status !== 'starting') return; + // If still starting, abort the in-flight startup so session.create() + // doesn't produce an orphaned session after we return. + if (agent.startupAbortController) { + agent.startupAbortController.abort(); + agent.startupAbortController = null; + } + agent.status = 'stopping'; // Cancel any pending idle timer diff --git a/cloudflare-gastown/container/src/types.ts b/cloudflare-gastown/container/src/types.ts index 309583bee8..a791d9786e 100644 --- a/cloudflare-gastown/container/src/types.ts +++ b/cloudflare-gastown/container/src/types.ts @@ -135,6 +135,10 @@ export type ManagedAgent = { model: string | null; /** Full env dict from buildAgentEnv, stored so model hot-swap can replay it. */ startupEnv: Record; + /** AbortController for the in-flight startup sequence. Aborted when a + * restart is requested while the agent is still in 'starting' status, + * preventing orphaned sessions from leaking. */ + startupAbortController: AbortController | null; }; export type AgentStatusResponse = { From fca2b591dfcf1127bad42ca046c2c200eecf273a Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 21:53:47 +0000 Subject: [PATCH 6/7] =?UTF-8?q?fix(gastown):=20address=20review=20comments?= =?UTF-8?q?=20=E2=80=94=20reset=20poll=5Fnull=5Fcount=20on=20all=20non-nul?= =?UTF-8?q?l=20polls,=20abort=20orphaned=20session=20on=20startup=20abort,?= =?UTF-8?q?=20guard=20agents.delete=20with=20identity=20check?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../container/src/process-manager.ts | 15 +- cloudflare-gastown/src/dos/town/actions.ts | 223 +++++++++--------- 2 files changed, 122 insertions(+), 116 deletions(-) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index 0b77787283..1a24154b1c 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -775,6 +775,17 @@ export async function startAgent( if (sessionCounted) { const instance = sdkInstances.get(workdir); if (instance) { + // Abort the orphaned session if one was created before the abort + if (agent.sessionId) { + try { + await instance.client.session.abort({ path: { id: agent.sessionId } }); + } catch (abortErr) { + console.error( + `${MANAGER_LOG} startAgent: failed to abort orphaned session ${agent.sessionId}:`, + abortErr + ); + } + } instance.sessionCount--; if (instance.sessionCount <= 0) { instance.server.close(); @@ -782,7 +793,9 @@ export async function startAgent( } } } - agents.delete(request.agentId); + if (agents.get(request.agentId) === agent) { + agents.delete(request.agentId); + } throw err; } diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index 397cf001a7..5b0a45779b 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -605,8 +605,8 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro return async () => { try { const status = await ctx.checkPRStatus(action.pr_url); - if (status && status !== 'open') { - // Successful non-open status — reset null counter and emit event + if (status !== null) { + // Any non-null result resets the consecutive null counter query( sql, /* sql */ ` @@ -619,149 +619,142 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro `, [action.bead_id] ); - ctx.insertEvent('pr_status_changed', { - bead_id: action.bead_id, - payload: { pr_url: action.pr_url, pr_state: status }, - }); -<<<<<<< HEAD - return; - } + if (status !== 'open') { + ctx.insertEvent('pr_status_changed', { + bead_id: action.bead_id, + payload: { pr_url: action.pr_url, pr_state: status }, + }); + return; + } - // PR is open — check for feedback and auto-merge if configured - const townConfig = await ctx.getTownConfig(); - const refineryConfig = townConfig.refinery; - if (!refineryConfig) return; + // PR is open — check for feedback and auto-merge if configured + const townConfig = await ctx.getTownConfig(); + const refineryConfig = townConfig.refinery; + if (!refineryConfig) return; + + // Auto-resolve PR feedback: detect unresolved comments and failing CI + if (refineryConfig.auto_resolve_pr_feedback) { + const feedback = await ctx.checkPRFeedback(action.pr_url); + if ( + feedback && + (feedback.hasUnresolvedComments || + feedback.hasFailingChecks || + feedback.hasUncheckedRuns) + ) { + const existingFeedback = hasExistingFeedbackBead(sql, action.bead_id); + if (!existingFeedback) { + const prMeta = parsePrUrl(action.pr_url); + const rmRows = z + .object({ branch: z.string() }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT ${review_metadata.columns.branch} + FROM ${review_metadata} + WHERE ${review_metadata.bead_id} = ? + `, + [action.bead_id] + ), + ]); + const branch = rmRows[0]?.branch ?? ''; + + ctx.insertEvent('pr_feedback_detected', { + bead_id: action.bead_id, + payload: { + mr_bead_id: action.bead_id, + pr_url: action.pr_url, + pr_number: prMeta?.prNumber ?? 0, + repo: prMeta?.repo ?? '', + branch, + has_unresolved_comments: feedback.hasUnresolvedComments, + has_failing_checks: feedback.hasFailingChecks, + has_unchecked_runs: feedback.hasUncheckedRuns, + }, + }); + } - // Auto-resolve PR feedback: detect unresolved comments and failing CI - if (refineryConfig.auto_resolve_pr_feedback) { - const feedback = await ctx.checkPRFeedback(action.pr_url); + query( + sql, + /* sql */ ` + UPDATE ${review_metadata} + SET ${review_metadata.columns.last_feedback_check_at} = ? + WHERE ${review_metadata.bead_id} = ? + `, + [now(), action.bead_id] + ); + } + } + + // Auto-merge timer: track grace period when everything is green. + // Requires both auto_merge enabled AND a delay configured. if ( - feedback && - (feedback.hasUnresolvedComments || - feedback.hasFailingChecks || - feedback.hasUncheckedRuns) + refineryConfig.auto_merge !== false && + refineryConfig.auto_merge_delay_minutes !== null && + refineryConfig.auto_merge_delay_minutes !== undefined ) { - // Check for existing non-terminal feedback bead to prevent duplicates - const existingFeedback = hasExistingFeedbackBead(sql, action.bead_id); - if (!existingFeedback) { - // Parse PR URL for repo/number metadata - const prMeta = parsePrUrl(action.pr_url); - const rmRows = z - .object({ branch: z.string() }) + const feedback = await ctx.checkPRFeedback(action.pr_url); + if (!feedback) return; + + const allGreen = + !feedback.hasUnresolvedComments && + !feedback.hasFailingChecks && + feedback.allChecksPass; + + if (allGreen) { + const readySinceRows = z + .object({ auto_merge_ready_since: z.string().nullable() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${review_metadata.columns.branch} + SELECT ${review_metadata.columns.auto_merge_ready_since} FROM ${review_metadata} WHERE ${review_metadata.bead_id} = ? `, [action.bead_id] ), ]); - const branch = rmRows[0]?.branch ?? ''; - - ctx.insertEvent('pr_feedback_detected', { - bead_id: action.bead_id, - payload: { - mr_bead_id: action.bead_id, - pr_url: action.pr_url, - pr_number: prMeta?.prNumber ?? 0, - repo: prMeta?.repo ?? '', - branch, - has_unresolved_comments: feedback.hasUnresolvedComments, - has_failing_checks: feedback.hasFailingChecks, - has_unchecked_runs: feedback.hasUncheckedRuns, - }, - }); - } - // Update last_feedback_check_at - query( - sql, - /* sql */ ` - UPDATE ${review_metadata} - SET ${review_metadata.columns.last_feedback_check_at} = ? - WHERE ${review_metadata.bead_id} = ? - `, - [now(), action.bead_id] - ); - } - } + const readySince = readySinceRows[0]?.auto_merge_ready_since; - // Auto-merge timer: track grace period when everything is green. - // Requires both auto_merge enabled AND a delay configured. - if ( - refineryConfig.auto_merge !== false && - refineryConfig.auto_merge_delay_minutes !== null && - refineryConfig.auto_merge_delay_minutes !== undefined - ) { - const feedback = await ctx.checkPRFeedback(action.pr_url); - if (!feedback) return; - - const allGreen = - !feedback.hasUnresolvedComments && - !feedback.hasFailingChecks && - feedback.allChecksPass; - - if (allGreen) { - // Check if timer is already running - const readySinceRows = z - .object({ auto_merge_ready_since: z.string().nullable() }) - .array() - .parse([ - ...query( + if (!readySince) { + query( sql, /* sql */ ` - SELECT ${review_metadata.columns.auto_merge_ready_since} - FROM ${review_metadata} + UPDATE ${review_metadata} + SET ${review_metadata.columns.auto_merge_ready_since} = ? WHERE ${review_metadata.bead_id} = ? `, - [action.bead_id] - ), - ]); - - const readySince = readySinceRows[0]?.auto_merge_ready_since; - - if (!readySince) { - // First tick where everything is green — start the timer + [now(), action.bead_id] + ); + } else { + const elapsed = Date.now() - new Date(readySince).getTime(); + if (elapsed >= refineryConfig.auto_merge_delay_minutes * 60_000) { + ctx.insertEvent('pr_auto_merge', { + bead_id: action.bead_id, + payload: { + mr_bead_id: action.bead_id, + pr_url: action.pr_url, + }, + }); + } + } + } else { query( sql, /* sql */ ` UPDATE ${review_metadata} - SET ${review_metadata.columns.auto_merge_ready_since} = ? + SET ${review_metadata.columns.auto_merge_ready_since} = NULL WHERE ${review_metadata.bead_id} = ? `, - [now(), action.bead_id] + [action.bead_id] ); - } else { - const elapsed = Date.now() - new Date(readySince).getTime(); - if (elapsed >= refineryConfig.auto_merge_delay_minutes * 60_000) { - // Grace period elapsed — emit merge event - ctx.insertEvent('pr_auto_merge', { - bead_id: action.bead_id, - payload: { - mr_bead_id: action.bead_id, - pr_url: action.pr_url, - }, - }); - } } - } else { - // Not all green — reset the timer - query( - sql, - /* sql */ ` - UPDATE ${review_metadata} - SET ${review_metadata.columns.auto_merge_ready_since} = NULL - WHERE ${review_metadata.bead_id} = ? - `, - [action.bead_id] - ); } - } - } else if (status === null) { + } else { // Null result (e.g. no GitHub token) — increment consecutive null counter query( sql, From 82231680b6b495b0b74adef00dcf68315556e14f Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 2 Apr 2026 14:53:22 +0000 Subject: [PATCH 7/7] fix(gastown): store session ID before abort check to prevent orphaned session leak After session.create() resolves, parse and store the session ID on the agent object BEFORE checking signal.aborted. This ensures the catch block has agent.sessionId available to call session.abort(), closing the race window where an abort during session.create() could leak an orphaned session. Also fixes format-check CI by running oxfmt. --- .../container/src/process-manager.ts | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index 1a24154b1c..d83c45e69b 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -624,9 +624,7 @@ export async function startAgent( // If the agent is still starting, abort the in-flight startup to prevent // an orphaned session from being created after stopAgent returns. if (existing.status === 'starting' && existing.startupAbortController) { - console.log( - `${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}` - ); + console.log(`${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}`); existing.startupAbortController.abort(); } @@ -690,12 +688,9 @@ export async function startAgent( // 2. Create a session const sessionResult = await client.session.create({ body: {} }); - // Check if startup was cancelled while creating the session — this is - // the critical window where an orphaned session would leak. - if (signal.aborted) { - throw new StartupAbortedError(request.agentId); - } - + // Parse and store the session ID immediately so the catch block can + // abort an orphaned session if startupAbortController fires during + // the await above. const rawSession: unknown = sessionResult.data ?? sessionResult; const parsed = SessionResponse.safeParse(rawSession); if (!parsed.success) { @@ -709,6 +704,12 @@ export async function startAgent( const sessionId = parsed.data.id; agent.sessionId = sessionId; + // Now check if startup was cancelled while creating the session. + // agent.sessionId is already set, so the catch block will abort it. + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // 3. Subscribe to events (async, runs in background) void subscribeToEvents(client, agent, request); @@ -769,9 +770,7 @@ export async function startAgent( // On abort, clean up silently — the new startAgent invocation will // proceed with a fresh entry. if (err instanceof StartupAbortedError) { - console.log( - `${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up` - ); + console.log(`${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up`); if (sessionCounted) { const instance = sdkInstances.get(workdir); if (instance) {