From 21a14d0490a4bbe3bc94533879dfee9ac6548105 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 30 Apr 2026 13:19:39 -0500 Subject: [PATCH 1/8] fix(gastown): point dev GIT_TOKEN_SERVICE binding at git-token-service-dev git-token-service's wrangler env.dev overrides the worker name to 'git-token-service-dev', but gastown's env.dev.services binding was still referencing the base 'git-token-service' name. Wrangler's local dev registry does exact-name matching, so the binding showed as [not connected] whenever both workers were running side by side. Every other consumer in the repo (cloud-agent-next, security-sync, security-auto-analysis) already uses 'git-token-service-dev' in their env.dev block; gastown was the outlier. --- services/gastown/wrangler.jsonc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/gastown/wrangler.jsonc b/services/gastown/wrangler.jsonc index 4cc1410810..e613dc666a 100644 --- a/services/gastown/wrangler.jsonc +++ b/services/gastown/wrangler.jsonc @@ -148,7 +148,7 @@ "services": [ { "binding": "GIT_TOKEN_SERVICE", - "service": "git-token-service", + "service": "git-token-service-dev", "entrypoint": "GitTokenRPCEntrypoint", }, ], From c532f40e5627062a80077b30a2f0d60d0bc8d3b5 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Fri, 1 May 2026 11:22:07 -0500 Subject: [PATCH 2/8] fix(gastown): push new model onto resumed mayor session on hot-swap (#2999) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user changes the mayor's model in town settings, updateAgentModel restarts the SDK server with new KILO_CONFIG_CONTENT and resumes the existing session from kilo.db. Commit 9785570b9 intentionally stopped sending any session.prompt on resume to avoid duplicating the MAYOR_STARTUP_PROMPT, but that also dropped the model param — so the resumed session kept its prior per-session model until the user ran /model manually. Extract the fresh vs. 
resumed session-prompt logic into applyModelToSession and on resume send a noReply:true prompt carrying only the new model param. This updates the SDK server's per-session model without replaying the startup prompt. Errors on the resume path are swallowed so the hot-swap still succeeds; the SDK server fell back to the config-loaded model at startup, which was already updated. Add container tests covering both fresh and resumed paths. Co-authored-by: John Fawcett --- .../container/src/process-manager.test.ts | 119 ++++++++++++++++++ .../gastown/container/src/process-manager.ts | 91 ++++++++++++-- services/gastown/container/vitest.config.ts | 2 +- 3 files changed, 201 insertions(+), 11 deletions(-) create mode 100644 services/gastown/container/src/process-manager.test.ts diff --git a/services/gastown/container/src/process-manager.test.ts b/services/gastown/container/src/process-manager.test.ts new file mode 100644 index 0000000000..91ffdcce4c --- /dev/null +++ b/services/gastown/container/src/process-manager.test.ts @@ -0,0 +1,119 @@ +import { describe, it, expect, vi } from 'vitest'; + +// Mock heavy imports so the module can be loaded without spinning up +// a real SDK server or hono app. 
+vi.mock('@kilocode/sdk', () => ({ + createKilo: vi.fn(), +})); +vi.mock('./agent-runner', () => ({ + runAgent: vi.fn(), + buildKiloConfigContent: vi.fn(), + resolveGitCredentials: vi.fn(), + writeMayorSystemPromptToAgentsMd: vi.fn(), +})); +vi.mock('./control-server', () => ({ + getCurrentTownConfig: vi.fn(() => ({})), + getLastAppliedEnvVarKeys: vi.fn(() => new Set()), + RESERVED_ENV_KEYS: new Set(), +})); +vi.mock('./completion-reporter', () => ({ + reportAgentCompleted: vi.fn(), + reportMayorWaiting: vi.fn(), +})); +vi.mock('./token-refresh', () => ({ + refreshTokenIfNearExpiry: vi.fn(), +})); + +const { applyModelToSession } = await import('./process-manager'); + +type PromptCall = { + path: { id: string }; + body: { + parts: Array<{ type: 'text'; text: string }>; + model: { providerID: string; modelID: string }; + noReply?: boolean; + }; +}; + +function makeClient(impl?: (args: PromptCall) => Promise) { + const calls: PromptCall[] = []; + const prompt = vi.fn(async (args: PromptCall) => { + calls.push(args); + if (impl) return impl(args); + return {}; + }); + return { client: { session: { prompt } }, calls, prompt }; +} + +describe('applyModelToSession', () => { + it('sends the startup prompt with the model for a fresh session', async () => { + const { client, calls } = makeClient(); + await applyModelToSession({ + client, + sessionId: 'sess-new', + model: 'anthropic/claude-sonnet-4.6', + prompt: 'STARTUP PROMPT', + resumedSession: false, + }); + expect(calls).toHaveLength(1); + expect(calls[0].path).toEqual({ id: 'sess-new' }); + expect(calls[0].body.parts).toEqual([{ type: 'text', text: 'STARTUP PROMPT' }]); + expect(calls[0].body.model).toEqual({ + providerID: 'kilo', + modelID: 'anthropic/claude-sonnet-4.6', + }); + expect(calls[0].body.noReply).toBeUndefined(); + }); + + it('pushes the new model with noReply:true for a resumed session without replaying the startup prompt', async () => { + const { client, calls } = makeClient(); + await 
applyModelToSession({ + client, + sessionId: 'sess-resumed', + model: 'anthropic/claude-opus-4.7', + prompt: 'STARTUP PROMPT (must not be sent)', + resumedSession: true, + }); + expect(calls).toHaveLength(1); + expect(calls[0].path).toEqual({ id: 'sess-resumed' }); + expect(calls[0].body.model).toEqual({ + providerID: 'kilo', + modelID: 'anthropic/claude-opus-4.7', + }); + expect(calls[0].body.noReply).toBe(true); + expect(calls[0].body.parts).toEqual([{ type: 'text', text: '' }]); + // Ensure the MAYOR_STARTUP_PROMPT is NOT replayed on resume. + expect(calls[0].body.parts[0].text).not.toContain('STARTUP PROMPT'); + }); + + it('swallows errors from the resumed-session prompt so the hot-swap can continue', async () => { + const { client } = makeClient(async () => { + throw new Error('simulated SDK failure'); + }); + // Should not throw — errors on the noReply path are logged and ignored. + await expect( + applyModelToSession({ + client, + sessionId: 'sess-resumed', + model: 'anthropic/claude-opus-4.7', + prompt: 'STARTUP PROMPT', + resumedSession: true, + }) + ).resolves.toBeUndefined(); + }); + + it('propagates errors for a fresh session (so the hot-swap can roll back)', async () => { + const { client } = makeClient(async () => { + throw new Error('simulated SDK failure'); + }); + await expect( + applyModelToSession({ + client, + sessionId: 'sess-new', + model: 'anthropic/claude-sonnet-4.6', + prompt: 'STARTUP PROMPT', + resumedSession: false, + }) + ).rejects.toThrow('simulated SDK failure'); + }); +}); diff --git a/services/gastown/container/src/process-manager.ts b/services/gastown/container/src/process-manager.ts index 2c58efa795..33a37fa29a 100644 --- a/services/gastown/container/src/process-manager.ts +++ b/services/gastown/container/src/process-manager.ts @@ -1815,6 +1815,80 @@ export async function refreshTokenForAllAgents(): Promise< return Promise.all(snapshot.map(restartAgent)); } +/** + * Minimal shape of `client.session` needed by {@link 
applyModelToSession}. + * Defined structurally so tests can pass a fake without pulling in the + * whole KiloClient type. + */ +type SessionPromptClient = { + session: { + prompt: (args: { + path: { id: string }; + body: { + parts: Array<{ type: 'text'; text: string }>; + model: { providerID: string; modelID: string }; + noReply?: boolean; + }; + }) => Promise; + }; +}; + +/** + * Push a model selection onto a mayor session. + * + * For a freshly created session, sends the startup prompt together with + * the model param so the first turn runs the configured model. + * + * For a resumed session the startup prompt MUST NOT be replayed (it + * would recreate the duplicate turn regression fixed by 9785570b9), + * but the per-session model on the SDK server still needs to be updated + * so the next user turn uses the newly-selected model. We do this by + * sending a `noReply: true` prompt that carries only the model param; + * the SDK treats this as a state update and does not trigger the model. + * + * Errors on the resumed path are swallowed: if pushing the model fails, + * the mayor falls back to whichever model the SDK server loaded from + * KILO_CONFIG_CONTENT at startup, which we have already updated. 
+ */ +export async function applyModelToSession(params: { + client: SessionPromptClient; + sessionId: string; + model: string; + prompt: string; + resumedSession: boolean; +}): Promise { + const { client, sessionId, model, prompt, resumedSession } = params; + const modelParam = { providerID: 'kilo', modelID: model }; + if (!resumedSession) { + await client.session.prompt({ + path: { id: sessionId }, + body: { + parts: [{ type: 'text', text: prompt }], + model: modelParam, + }, + }); + return; + } + try { + await client.session.prompt({ + path: { id: sessionId }, + body: { + parts: [{ type: 'text', text: '' }], + model: modelParam, + noReply: true, + }, + }); + console.log( + `${MANAGER_LOG} updateAgentModel: pushed model=${model} to resumed session ${sessionId}` + ); + } catch (err) { + console.warn( + `${MANAGER_LOG} updateAgentModel: failed to push model to resumed session ${sessionId}:`, + err + ); + } +} + /** * Update the model for a running agent by restarting its SDK server with * new KILO_CONFIG_CONTENT. The kilo serve child process reads the model @@ -1958,16 +2032,13 @@ export async function updateAgentModel( const prompt = conversationHistory ? `${conversationHistory}\n\n${MAYOR_STARTUP_PROMPT}` : MAYOR_STARTUP_PROMPT; - if (!resumedSession) { - const modelParam = { providerID: 'kilo', modelID: model }; - await client.session.prompt({ - path: { id: agent.sessionId }, - body: { - parts: [{ type: 'text', text: prompt }], - model: modelParam, - }, - }); - } + await applyModelToSession({ + client, + sessionId: agent.sessionId, + model, + prompt, + resumedSession, + }); agent.messageCount = 1; // 6. New server is healthy — now tear down the old one. 
diff --git a/services/gastown/container/vitest.config.ts b/services/gastown/container/vitest.config.ts index 468ee375fe..32b052a10d 100644 --- a/services/gastown/container/vitest.config.ts +++ b/services/gastown/container/vitest.config.ts @@ -3,6 +3,6 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { globals: false, - include: ['plugin/**/*.test.ts'], + include: ['plugin/**/*.test.ts', 'src/**/*.test.ts'], }, }); From db58406603e6278b53484928f5fe0eae9bb2f76f Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 5 May 2026 12:11:55 -0500 Subject: [PATCH 3/8] fix(gastown): stop reconciler log spam from orphaned bead_cancelled events (#3047) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent bugs compose to flood production logs every alarm tick with 'Bead not found' errors: 1. deleteBead / deleteBeads did not clean up the town_events queue, leaving bead_cancelled and container_status rows pointing at deleted beads/agents. 2. applyEvent threw on missing beads and the drain loop never marked the failing event processed — so it retried forever. Fix 1: purge town_events rows (by bead_id OR agent_id, since agents are beads) from deleteBead and the deleteBeads bulk path. Fix 2a: reconciler.applyEvent('bead_cancelled') checks for the target bead up front and returns (with a warn) when it's missing, instead of throwing. Fix 2b: the Town.do.ts drain loop recognises 'Bead/Agent not found' terminal errors, logs them at warn, and marks the offending event processed so it stops retrying. Adds debug RPCs (debugTownEvents, debugInsertTownEvent, debugRecordContainerStatus) and integration coverage in event-cleanup.test.ts. 
Co-authored-by: John Fawcett --- services/gastown/src/dos/Town.do.ts | 87 ++++++++- services/gastown/src/dos/town/beads.ts | 22 +++ services/gastown/src/dos/town/reconciler.ts | 11 ++ .../test/integration/event-cleanup.test.ts | 183 ++++++++++++++++++ 4 files changed, 293 insertions(+), 10 deletions(-) create mode 100644 services/gastown/test/integration/event-cleanup.test.ts diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts index bdbaf03575..a6be9aecdb 100644 --- a/services/gastown/src/dos/Town.do.ts +++ b/services/gastown/src/dos/Town.do.ts @@ -47,7 +47,11 @@ import { agent_metadata } from '../db/tables/agent-metadata.table'; import { escalation_metadata } from '../db/tables/escalation-metadata.table'; import { convoy_metadata } from '../db/tables/convoy-metadata.table'; import { bead_dependencies } from '../db/tables/bead-dependencies.table'; -import { town_events, TownEventRecord } from '../db/tables/town-events.table'; +import { + town_events, + TownEventRecord, + type TownEventType, +} from '../db/tables/town-events.table'; import { agent_nudges, AgentNudgeRecord, @@ -3896,15 +3900,28 @@ export class TownDO extends DurableObject { reconciler.applyEvent(this.sql, event, { townConfig }); events.markProcessed(this.sql, event.event_id); } catch (err) { - logger.error('reconciler: applyEvent failed', { - eventId: event.event_id, - eventType: event.event_type, - error: err instanceof Error ? err.message : String(err), - }); - // Event stays unprocessed — will be retried on the next alarm tick. - // Mark it processed anyway after 3 consecutive failures to prevent - // a poison event from blocking the entire queue forever. - // For now, we skip it and let the next tick retry. + const message = err instanceof Error ? err.message : String(err); + // Terminal errors referencing a missing bead/agent can never + // succeed on retry — mark them processed so the drain loop + // stops re-running them every alarm tick. 
+ const isMissingEntity = + err instanceof Error && + /\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message); + if (isMissingEntity) { + logger.warn('reconciler: applyEvent skipped (missing entity)', { + eventId: event.event_id, + eventType: event.event_type, + error: message, + }); + events.markProcessed(this.sql, event.event_id); + } else { + logger.error('reconciler: applyEvent failed', { + eventId: event.event_id, + eventType: event.event_type, + error: message, + }); + // Event stays unprocessed — will be retried on the next alarm tick. + } } } } catch (err) { @@ -5161,6 +5178,56 @@ export class TownDO extends DurableObject { ]; } + async debugTownEvents(): Promise { + return [ + ...query( + this.sql, + /* sql */ ` + SELECT ${town_events.event_id}, + ${town_events.event_type}, + ${town_events.agent_id}, + ${town_events.bead_id}, + ${town_events.processed_at} + FROM ${town_events} + ORDER BY ${town_events.created_at} ASC + `, + [] + ), + ]; + } + + /** + * Test-only helper: directly insert a row into the town_events queue + * without going through the producer APIs. Used to reproduce orphan + * events (referencing deleted beads/agents) in tests. + */ + async debugInsertTownEvent(input: { + event_type: TownEventType; + agent_id?: string | null; + bead_id?: string | null; + payload?: Record; + }): Promise { + const eventId = events.insertEvent(this.sql, input.event_type, { + agent_id: input.agent_id ?? null, + bead_id: input.bead_id ?? null, + payload: input.payload ?? {}, + }); + await this.armAlarmIfNeeded(); + return eventId; + } + + /** + * Test-only helper: insert a container_status event for a given agent. + * Mirrors the container observer's upsert so tests can verify that + * deleteBead sweeps agent-keyed events. 
+ */ + async debugRecordContainerStatus( + agentId: string, + payload: { status: string; exit_reason?: string | null } + ): Promise { + events.upsertContainerStatus(this.sql, agentId, payload); + } + async destroy(): Promise { console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`); diff --git a/services/gastown/src/dos/town/beads.ts b/services/gastown/src/dos/town/beads.ts index a3faa75437..945679cf1f 100644 --- a/services/gastown/src/dos/town/beads.ts +++ b/services/gastown/src/dos/town/beads.ts @@ -41,6 +41,7 @@ import { createTableConvoyMetadata, migrateConvoyMetadata, } from '../../db/tables/convoy-metadata.table'; +import { town_events } from '../../db/tables/town-events.table'; import { query } from '../../util/query.util'; import type { CreateBeadInput, @@ -903,6 +904,17 @@ export function deleteBead(sql: SqlStorage, beadId: string, rigId?: string): boo beadId, ]); + // Remove any pending/processed reconciler events targeting this bead or + // this agent (agents are themselves beads, so deleteBead is used for both). + // Without this, bead_cancelled / container_status / … events that reference + // a deleted bead make applyEvent throw forever on every alarm tick. + query( + sql, + /* sql */ `DELETE FROM ${town_events} + WHERE ${town_events.bead_id} = ? OR ${town_events.agent_id} = ?`, + [beadId, beadId] + ); + query(sql, /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} = ?`, [beadId]); return true; } @@ -1003,6 +1015,16 @@ export function deleteBeads(sql: SqlStorage, beadIds: string[], rigId?: string): ...allIdsArr ); + // Remove any reconciler events referencing these beads/agents. See + // deleteBead above for rationale. 
+ sql.exec( + /* sql */ `DELETE FROM ${town_events} + WHERE ${town_events.bead_id} IN (${placeholders}) + OR ${town_events.agent_id} IN (${placeholders})`, + ...allIdsArr, + ...allIdsArr + ); + // Delete the beads themselves sql.exec( /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} IN (${placeholders})`, diff --git a/services/gastown/src/dos/town/reconciler.ts b/services/gastown/src/dos/town/reconciler.ts index 8e9b1d4671..f7a62d0cf3 100644 --- a/services/gastown/src/dos/town/reconciler.ts +++ b/services/gastown/src/dos/town/reconciler.ts @@ -289,6 +289,17 @@ export function applyEvent( console.warn(`${LOG} applyEvent: bead_cancelled missing bead_id`); return; } + // Tolerate the bead having been deleted after the event was enqueued. + // Without this guard updateBeadStatus throws `Bead not found`, + // the drain loop can't mark the event processed, and the error + // recurs on every alarm tick forever. + const existing = beadOps.getBead(sql, event.bead_id); + if (!existing) { + console.warn( + `${LOG} applyEvent: bead_cancelled target bead ${event.bead_id} no longer exists — skipping` + ); + return; + } const cancelStatus = payload.cancel_status === 'closed' || payload.cancel_status === 'failed' ? payload.cancel_status diff --git a/services/gastown/test/integration/event-cleanup.test.ts b/services/gastown/test/integration/event-cleanup.test.ts new file mode 100644 index 0000000000..0d964e44c9 --- /dev/null +++ b/services/gastown/test/integration/event-cleanup.test.ts @@ -0,0 +1,183 @@ +/** + * Tests for the "orphaned bead_cancelled events retried forever" bug. + * + * Two independent fixes compose to eliminate the failure: + * Fix 1: deleteBead / deleteBeads purge town_events rows that reference + * the deleted bead (by bead_id or agent_id), so the drain loop + * never sees them. + * Fix 2a: reconciler.applyEvent('bead_cancelled') tolerates the bead + * being missing (returns early, logs warn — does not throw). 
+ * Fix 2b: the Town.do.ts drain loop recognises "Bead/Agent ... not + * found" terminal errors and marks the offending event + * processed so it is not retried forever. + */ + +import { env, runDurableObjectAlarm } from 'cloudflare:test'; +import { describe, it, expect, beforeEach } from 'vitest'; + +function getTownStub(name: string) { + return env.TOWN.get(env.TOWN.idFromName(name)); +} + +describe('town_events cleanup on bead deletion (#fix-1)', () => { + let town: ReturnType; + let townName: string; + + beforeEach(async () => { + townName = `evcleanup-${crypto.randomUUID()}`; + town = getTownStub(townName); + await town.setTownId(townName); + await town.addRig({ + rigId: 'rig-1', + name: 'main-rig', + gitUrl: 'https://github.com/test/repo.git', + defaultBranch: 'main', + }); + }); + + it('deleteBead removes pending town_events referencing the bead by bead_id', async () => { + const bead = await town.createBead({ + type: 'issue', + title: 'To be deleted', + rig_id: 'rig-1', + }); + + // Transitioning to a terminal status enqueues a bead_cancelled event. 
+ await town.updateBeadStatus(bead.bead_id, 'failed', 'system'); + + const pendingBefore = (await town.debugTownEvents()) as Array<{ + bead_id: string | null; + processed_at: string | null; + }>; + expect( + pendingBefore.filter(e => e.bead_id === bead.bead_id && e.processed_at === null).length + ).toBeGreaterThan(0); + + await town.deleteBead(bead.bead_id); + + const pendingAfter = (await town.debugTownEvents()) as Array<{ + bead_id: string | null; + agent_id: string | null; + }>; + expect( + pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id) + ).toBe(false); + }); + + it('deleteBead also removes events referencing the bead as agent_id (agents are beads)', async () => { + const agent = await town.registerAgent({ + role: 'polecat', + name: 'P1', + identity: `ev-agent-${townName}`, + rig_id: 'rig-1', + }); + + // Upsert a container_status event keyed by agent_id — this is the shape + // of events that hang off an agent's bead row. + await town.debugRecordContainerStatus(agent.id, { status: 'running' }); + + const beforeRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>; + expect(beforeRows.some(e => e.agent_id === agent.id)).toBe(true); + + // deleteBead is used for agents too (agents are beads). 
+ await town.deleteBead(agent.id); + + const afterRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>; + expect(afterRows.some(e => e.agent_id === agent.id)).toBe(false); + }); + + it('deleteBeads bulk path removes events for every deleted bead', async () => { + const a = await town.createBead({ type: 'issue', title: 'A', rig_id: 'rig-1' }); + const b = await town.createBead({ type: 'issue', title: 'B', rig_id: 'rig-1' }); + + await town.updateBeadStatus(a.bead_id, 'failed', 'system'); + await town.updateBeadStatus(b.bead_id, 'failed', 'system'); + + const before = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>; + expect(before.filter(e => e.bead_id === a.bead_id).length).toBeGreaterThan(0); + expect(before.filter(e => e.bead_id === b.bead_id).length).toBeGreaterThan(0); + + await town.deleteBeads([a.bead_id, b.bead_id]); + + const after = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>; + expect(after.some(e => e.bead_id === a.bead_id)).toBe(false); + expect(after.some(e => e.bead_id === b.bead_id)).toBe(false); + }); +}); + +describe('applyEvent tolerance + drain loop marks missing-entity events processed (#fix-2)', () => { + let town: ReturnType; + let townName: string; + + beforeEach(async () => { + townName = `evtolerate-${crypto.randomUUID()}`; + town = getTownStub(townName); + await town.setTownId(townName); + await town.addRig({ + rigId: 'rig-1', + name: 'main-rig', + gitUrl: 'https://github.com/test/repo.git', + defaultBranch: 'main', + }); + }); + + it('drain loop marks a bead_cancelled event processed when the bead is gone', async () => { + // Simulate the historical orphan: enqueue a bead_cancelled event whose + // bead has been deleted (or never existed). Before Fix 2, applyEvent + // would throw `Bead not found` forever on every alarm tick. 
+ await town.debugInsertTownEvent({ + event_type: 'bead_cancelled', + bead_id: '00000000-0000-4000-8000-000000000001', + payload: { cancel_status: 'failed' }, + }); + + const beforeDrain = (await town.debugTownEvents()) as Array<{ + event_type: string; + bead_id: string | null; + processed_at: string | null; + }>; + const orphan = beforeDrain.find( + e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001' + ); + expect(orphan?.processed_at).toBeNull(); + + await runDurableObjectAlarm(town); + + // After the alarm, the orphan event should be processed — not retried. + const afterDrain = (await town.debugTownEvents()) as Array<{ + event_type: string; + bead_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = afterDrain.find( + e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001' + ); + // If retention GC already pruned it, that's also acceptable — the key + // invariant is that it is no longer pending. 
+ if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); + + it('drain loop marks an agent-missing event processed too', async () => { + await town.debugInsertTownEvent({ + event_type: 'agent_done', + agent_id: '00000000-0000-4000-8000-0000000000aa', + payload: { branch: 'gt/ghost' }, + }); + + await runDurableObjectAlarm(town); + + const after = (await town.debugTownEvents()) as Array<{ + event_type: string; + agent_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = after.find( + e => e.event_type === 'agent_done' && e.agent_id === '00000000-0000-4000-8000-0000000000aa' + ); + if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); +}); From 47e95a8b83fe750b563ffab9676ce6bed77eda99 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 5 May 2026 15:23:07 -0500 Subject: [PATCH 4/8] feat(gastown-container): add crash visibility + per-agent start mutex (#3055) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(gastown-container): add crash visibility + per-agent start mutex Diagnostic changes to investigate frequent container restarts for town 4d82f099-ccb7-4eaf-8676-73562e0a27eb (~1.5–2 min boot-hydration loops). - main.ts: add unhandledRejection listener that logs full error/stack without exiting (Bun/Node silently drop rejections without a handler, making fire-and-forget failures like void saveDbSnapshot()/void subscribeToEvents() invisible). Include uptime and active-agent count for correlation. - main.ts: improve uncaughtException log with name/uptime/agent count. - main.ts: 30s periodic container.memory_usage log (rss/heap/external) so OOM-class failures (external SIGKILL from Cloudflare Containers runtime when the memory ceiling is hit) become observable — these leave no exception behind. - main.ts: wrap bootHydration() in try/catch so a rare synchronous throw before the first await doesn't crash the process. 
- process-manager.ts: add per-agentId mutex for startAgent. Production logs show two /agents/start requests for the same agentId logged at the same millisecond; both pass the re-entrancy check before either commits a 'starting' record, then race on startupAbortController, session creation, idle timers, and SDK sessionCount. Serialising per agentId makes the re-entrant path observe a consistent snapshot. - process-manager.test.ts: three tests for the mutex — same-id serialisation, different-id concurrency, lock release on throw. * fix(container): replace Promise.withResolvers with explicit new Promise Promise.withResolvers is a newer API not available on older Bun runtimes. Since process-manager.ts is imported during container startup, a missing global would throw before crash handlers are registered and prevent the control server from starting. Use the same explicit new Promise pattern as the existing sdkServerLock. * feat(gastown/container): include townId in crash and memory logs Per review feedback, attach the container's GASTOWN_TOWN_ID to unhandled_rejection, uncaught_exception, cold_start, memory_usage, and boot_hydration_failed log entries so production crash logs can be correlated with a specific town without needing to also have an agent registered. 
--------- Co-authored-by: John Fawcett --- services/gastown/container/src/main.ts | 88 ++++++++++++++++++- .../container/src/process-manager.test.ts | 63 ++++++++++++- .../gastown/container/src/process-manager.ts | 52 +++++++++++ 3 files changed, 198 insertions(+), 5 deletions(-) diff --git a/services/gastown/container/src/main.ts b/services/gastown/container/src/main.ts index 5b12a1aa19..e351a91731 100644 --- a/services/gastown/container/src/main.ts +++ b/services/gastown/container/src/main.ts @@ -1,11 +1,54 @@ import { startControlServer } from './control-server'; import { log } from './logger'; -import { bootHydration, getUptime } from './process-manager'; +import { activeAgentCount, bootHydration, getUptime, listAgents } from './process-manager'; -log.info('container.cold_start', { uptime: getUptime(), ts: new Date().toISOString() }); +// Container-scoped identifiers for crash/diagnostic logs. The container is +// pinned to a single town for its lifetime (see GASTOWN_TOWN_ID injection in +// the deployer), so reading these once at module init is safe and lets us +// emit them even when no agents are registered yet. +const TOWN_ID = process.env.GASTOWN_TOWN_ID ?? null; + +log.info('container.cold_start', { + uptime: getUptime(), + ts: new Date().toISOString(), + townId: TOWN_ID, +}); + +// Bun (like Node) will ignore unhandled promise rejections unless a handler +// is registered. Without this handler a rejection in a fire-and-forget path +// (e.g. `void saveDbSnapshot(...)`, `void subscribeToEvents(...)`, +// `setInterval(() => void fn())`) is effectively invisible — making the +// root cause of container crashes impossible to diagnose from logs. +// +// We deliberately DO NOT call process.exit here: visibility is the goal. +// If a specific rejection turns out to be fatal state corruption we can +// escalate it individually. +process.on('unhandledRejection', (reason, promise) => { + const err = + reason instanceof Error + ? 
{ message: reason.message, stack: reason.stack, name: reason.name } + : { message: String(reason) }; + log.error('container.unhandled_rejection', { + ...err, + townId: TOWN_ID, + uptimeMs: getUptime(), + activeAgents: activeAgentCount(), + promise: String(promise), + }); +}); process.on('uncaughtException', err => { - log.error('container.uncaught_exception', { error: err.message, stack: err.stack }); + log.error('container.uncaught_exception', { + message: err.message, + stack: err.stack, + name: err.name, + townId: TOWN_ID, + uptimeMs: getUptime(), + activeAgents: activeAgentCount(), + }); + // Keep the existing fatal behaviour for truly uncaught synchronous errors. + // An unhandled rejection is handled separately above without exit so we + // can observe the crash class before deciding whether to remain fatal. process.exit(1); }); @@ -13,8 +56,45 @@ process.on('SIGTERM', () => { console.log('SIGTERM received — starting graceful drain...'); }); +// Periodically log RSS memory so we can correlate OOM-class failures +// (external SIGKILL from Cloudflare Containers runtime when a memory +// ceiling is hit) with steady-state memory growth. 30s cadence matches +// the heartbeat interval and is cheap. +const MEMORY_LOG_INTERVAL_MS = 30_000; +setInterval(() => { + try { + const mem = process.memoryUsage(); + log.info('container.memory_usage', { + rssMB: Math.round(mem.rss / 1024 / 1024), + heapUsedMB: Math.round(mem.heapUsed / 1024 / 1024), + heapTotalMB: Math.round(mem.heapTotal / 1024 / 1024), + externalMB: Math.round(mem.external / 1024 / 1024), + townId: TOWN_ID, + uptimeMs: getUptime(), + agents: listAgents().length, + activeAgents: activeAgentCount(), + }); + } catch (err) { + log.warn('container.memory_usage_failed', { + error: err instanceof Error ? 
err.message : String(err), + }); + } +}, MEMORY_LOG_INTERVAL_MS); + startControlServer(); void (async () => { - await bootHydration(); + try { + await bootHydration(); + } catch (err) { + // bootHydration has its own try/catch for the registry fetch path but + // the inner startAgent loop can still throw on rare synchronous errors + // before its first await. Log rather than crash so the next /agents/start + // request can recover. + log.error('container.boot_hydration_failed', { + message: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined, + townId: TOWN_ID, + }); + } })(); diff --git a/services/gastown/container/src/process-manager.test.ts b/services/gastown/container/src/process-manager.test.ts index 91ffdcce4c..d2c8906c2a 100644 --- a/services/gastown/container/src/process-manager.test.ts +++ b/services/gastown/container/src/process-manager.test.ts @@ -24,7 +24,7 @@ vi.mock('./token-refresh', () => ({ refreshTokenIfNearExpiry: vi.fn(), })); -const { applyModelToSession } = await import('./process-manager'); +const { applyModelToSession, withStartAgentLock } = await import('./process-manager'); type PromptCall = { path: { id: string }; @@ -117,3 +117,64 @@ describe('applyModelToSession', () => { ).rejects.toThrow('simulated SDK failure'); }); }); + +describe('withStartAgentLock', () => { + it('serialises concurrent callers for the same agentId', async () => { + const order: string[] = []; + let secondStartedBeforeFirstFinished = false; + + // Fire both in the same microtask so they race on the lock. + const first = withStartAgentLock('agent-1', async () => { + order.push('first:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('first:end'); + return 1; + }); + const second = withStartAgentLock('agent-1', async () => { + // If the lock works, `first:end` has already been pushed. 
+ if (!order.includes('first:end')) { + secondStartedBeforeFirstFinished = true; + } + order.push('second:start'); + order.push('second:end'); + return 2; + }); + + const [r1, r2] = await Promise.all([first, second]); + expect(r1).toBe(1); + expect(r2).toBe(2); + expect(secondStartedBeforeFirstFinished).toBe(false); + expect(order).toEqual(['first:start', 'first:end', 'second:start', 'second:end']); + }); + + it('runs concurrently for different agentIds', async () => { + const order: string[] = []; + + const a = withStartAgentLock('agent-a', async () => { + order.push('a:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('a:end'); + }); + const b = withStartAgentLock('agent-b', async () => { + order.push('b:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('b:end'); + }); + + await Promise.all([a, b]); + + // Both should have started before either ended (no serialisation across ids). + expect(order.indexOf('b:start')).toBeLessThan(order.indexOf('a:end')); + }); + + it('releases the lock when the fn throws so subsequent callers can proceed', async () => { + await expect( + withStartAgentLock('agent-err', async () => { + throw new Error('boom'); + }) + ).rejects.toThrow('boom'); + + const result = await withStartAgentLock('agent-err', async () => 'ok'); + expect(result).toBe('ok'); + }); +}); diff --git a/services/gastown/container/src/process-manager.ts b/services/gastown/container/src/process-manager.ts index 33a37fa29a..02eb63a0f7 100644 --- a/services/gastown/container/src/process-manager.ts +++ b/services/gastown/container/src/process-manager.ts @@ -71,6 +71,47 @@ export function isDraining(): boolean { // once created, the SDK instance is reused without locking. let sdkServerLock: Promise = Promise.resolve(); +// Per-agentId mutex for startAgent. 
Without this, two concurrent POST +// /agents/start calls for the same agentId (observed in production: two +// `[control-server] /agents/start:` log lines at the same millisecond) +// both pass the re-entrancy check at the top of startAgent before either +// has committed a 'starting' record. The second invocation aborts the +// first's startupAbortController and both paths race on session creation, +// idle timers, and SDK instance reference counts — leaving the agent in +// an inconsistent state (orphaned sessions, leaked sessionCount, etc). +// +// Serialising per agentId means the second caller waits for the first to +// complete (or abort) before proceeding, and then observes a consistent +// snapshot in `agents.get(agentId)`. +const startAgentLocks = new Map<string, Promise<void>>(); + +// Exported for tests that exercise the locking behaviour directly without +// bringing up the whole SDK/process harness. Production callers should use +// `startAgent` (which wraps `startAgentImpl` with this lock). +export async function withStartAgentLock<T>(agentId: string, fn: () => Promise<T>): Promise<T> { + const previous = startAgentLocks.get(agentId) ?? Promise.resolve(); + // Use the same explicit `new Promise` pattern as `sdkServerLock` above + // instead of `Promise.withResolvers`, which is not available on older + // Bun runtimes. This module is imported during container startup, so a + // missing global here would throw before the crash handlers are + // registered and prevent the control server from starting. + let releaseLock!: () => void; + const lockPromise = new Promise<void>(resolve => { + releaseLock = resolve; + }); + startAgentLocks.set(agentId, lockPromise); + try { + await previous.catch(() => {}); + return await fn(); + } finally { + releaseLock(); + // Only clear the slot if no newer caller has queued behind us.
+ if (startAgentLocks.get(agentId) === lockPromise) { + startAgentLocks.delete(agentId); + } + } +} + export function getUptime(): number { return Date.now() - startTime; } @@ -1005,11 +1046,22 @@ async function subscribeToEvents( /** * Start an agent: ensure SDK server, create session, subscribe to events, * send initial prompt. + * + * Serialises concurrent callers for the same agentId so the re-entrancy + * handling inside `startAgentImpl` observes a consistent snapshot. */ export async function startAgent( request: StartAgentRequest, workdir: string, env: Record +): Promise { + return withStartAgentLock(request.agentId, () => startAgentImpl(request, workdir, env)); +} + +async function startAgentImpl( + request: StartAgentRequest, + workdir: string, + env: Record ): Promise { const existing = agents.get(request.agentId); if (existing && (existing.status === 'running' || existing.status === 'starting')) { From 0ebac0f1fe433f033e7b25c2ba4a8496d237c8b2 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 6 May 2026 10:46:39 -0500 Subject: [PATCH 5/8] chore(gastown): fix format and lint CI failures (#3072) chore(gastown): fix format and lint CI failures on staging Co-authored-by: John Fawcett --- services/gastown/container/src/main.ts | 2 +- services/gastown/src/dos/Town.do.ts | 9 ++------- services/gastown/test/integration/event-cleanup.test.ts | 6 +++--- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/services/gastown/container/src/main.ts b/services/gastown/container/src/main.ts index e351a91731..8c738965cb 100644 --- a/services/gastown/container/src/main.ts +++ b/services/gastown/container/src/main.ts @@ -33,7 +33,7 @@ process.on('unhandledRejection', (reason, promise) => { townId: TOWN_ID, uptimeMs: getUptime(), activeAgents: activeAgentCount(), - promise: String(promise), + event: 'unhandled_rejection', }); }); diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts index a6be9aecdb..2d807aef4d 100644 --- 
a/services/gastown/src/dos/Town.do.ts +++ b/services/gastown/src/dos/Town.do.ts @@ -47,11 +47,7 @@ import { agent_metadata } from '../db/tables/agent-metadata.table'; import { escalation_metadata } from '../db/tables/escalation-metadata.table'; import { convoy_metadata } from '../db/tables/convoy-metadata.table'; import { bead_dependencies } from '../db/tables/bead-dependencies.table'; -import { - town_events, - TownEventRecord, - type TownEventType, -} from '../db/tables/town-events.table'; +import { town_events, TownEventRecord, type TownEventType } from '../db/tables/town-events.table'; import { agent_nudges, AgentNudgeRecord, @@ -3905,8 +3901,7 @@ export class TownDO extends DurableObject { // succeed on retry — mark them processed so the drain loop // stops re-running them every alarm tick. const isMissingEntity = - err instanceof Error && - /\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message); + err instanceof Error && /\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message); if (isMissingEntity) { logger.warn('reconciler: applyEvent skipped (missing entity)', { eventId: event.event_id, diff --git a/services/gastown/test/integration/event-cleanup.test.ts b/services/gastown/test/integration/event-cleanup.test.ts index 0d964e44c9..bb4f3d3b3c 100644 --- a/services/gastown/test/integration/event-cleanup.test.ts +++ b/services/gastown/test/integration/event-cleanup.test.ts @@ -59,9 +59,9 @@ describe('town_events cleanup on bead deletion (#fix-1)', () => { bead_id: string | null; agent_id: string | null; }>; - expect( - pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id) - ).toBe(false); + expect(pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id)).toBe( + false + ); }); it('deleteBead also removes events referencing the bead as agent_id (agents are beads)', async () => { From e20d04f7d0060c3a8b2128a953362082cd7d75b0 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 6 May 2026 10:56:53 
-0500 Subject: [PATCH 6/8] chore(gastown): drop unused promise param from unhandledRejection handler (#3074) Co-authored-by: John Fawcett --- services/gastown/container/src/main.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/gastown/container/src/main.ts b/services/gastown/container/src/main.ts index 8c738965cb..4abff0666e 100644 --- a/services/gastown/container/src/main.ts +++ b/services/gastown/container/src/main.ts @@ -23,7 +23,7 @@ log.info('container.cold_start', { // We deliberately DO NOT call process.exit here: visibility is the goal. // If a specific rejection turns out to be fatal state corruption we can // escalate it individually. -process.on('unhandledRejection', (reason, promise) => { +process.on('unhandledRejection', reason => { const err = reason instanceof Error ? { message: reason.message, stack: reason.stack, name: reason.name } @@ -33,7 +33,6 @@ process.on('unhandledRejection', (reason, promise) => { townId: TOWN_ID, uptimeMs: getUptime(), activeAgents: activeAgentCount(), - event: 'unhandled_rejection', }); }); From 62e1f1355ede3aa2f2df380aec242020171e19a5 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 7 May 2026 11:24:19 -0500 Subject: [PATCH 7/8] feat(gastown): add proactive idle-container stop in TownDO alarm (#3113) * feat(gastown): add proactive idle-container stop in TownDO alarm When a town has no active work and the mayor has been idle for >5min, the alarm handler now calls container.stop() to force a graceful SIGTERM drain instead of waiting for Cloudflare's port-idle timer (which gets reset by PTY WebSocket keep-alives). This targets the root cause of 300+ active containers for ~100 active users. 
- Add stopContainerIfIdle() with dependency-injected logic in town/container-idle-stop.ts for testability - Wire into alarm handler just before the re-arm block - Emit container.idle_stop event with reason for observability - 2min throttle prevents thrash; failed stops are retried next tick - 13 unit tests covering all branches * fix(gastown): remove non-null assertions from container-idle-stop Replace mayor.last_activity_at! with null-safe checks using mayor.last_activity_at != null guards, consistent with coding style that forbids ! non-null assertions. * fix: allow stopping healthy containers in idle-stop guard The state guard only checked for 'running', but containers can also report 'healthy' as an active state (consistent with gastown.worker.ts). Added 'healthy' to the guard and a corresponding test. --------- Co-authored-by: John Fawcett --- services/gastown/src/dos/Town.do.ts | 28 +++ .../src/dos/town/container-idle-stop.test.ts | 205 ++++++++++++++++++ .../src/dos/town/container-idle-stop.ts | 81 +++++++ 3 files changed, 314 insertions(+) create mode 100644 services/gastown/src/dos/town/container-idle-stop.test.ts create mode 100644 services/gastown/src/dos/town/container-idle-stop.ts diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts index 2d807aef4d..2fab945fd9 100644 --- a/services/gastown/src/dos/Town.do.ts +++ b/services/gastown/src/dos/Town.do.ts @@ -29,6 +29,7 @@ import * as dispatch from './town/container-dispatch'; import * as patrol from './town/patrol'; import * as scheduling from './town/scheduling'; import * as events from './town/events'; +import { stopContainerIfIdle as _stopContainerIfIdle } from './town/container-idle-stop'; import * as scm from './town/town-scm'; import * as reconciler from './town/reconciler'; import { applyAction } from './town/actions'; @@ -4104,6 +4105,12 @@ export class TownDO extends DurableObject { }), ]); + await this.stopContainerIfIdle().catch(err => + logger.warn('alarm: 
stopContainerIfIdle failed', { + error: err instanceof Error ? err.message : String(err), + }) + ); + // Re-arm: fast when active, slow when idle const interval = activeWork ? ACTIVE_ALARM_INTERVAL_MS : IDLE_ALARM_INTERVAL_MS; await this.ctx.storage.setAlarm(Date.now() + interval); @@ -4164,6 +4171,27 @@ export class TownDO extends DurableObject { await this.ctx.storage.put('container:lastTokenRefreshAt', now); } + /** + * Proactively stop the town container when the town is idle. + * + * Cloudflare's sleepAfter timer resets on any port-8080 traffic (including + * long-lived PTY WebSockets), so containers can stay awake for hours after + * all real work finishes. Delegates to container-idle-stop sub-module. + */ + private async stopContainerIfIdle(): Promise { + await _stopContainerIfIdle({ + hasActiveWork: () => this.hasActiveWork(), + isDraining: () => this._draining, + getMayor: () => agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null, + getTownId: () => this.townId, + getLastIdleStopAt: () => this.ctx.storage.get('container:lastIdleStopAt'), + setLastIdleStopAt: (value) => this.ctx.storage.put('container:lastIdleStopAt', value), + getContainerStub: (townId) => getTownContainerStub(this.env, townId), + writeEventFn: (data) => writeEvent(this.env, data), + now: () => Date.now(), + }); + } + /** * Proactively remint KILOCODE_TOKEN when it's approaching expiry. 
* Throttled to once per day — the 30-day token is refreshed when diff --git a/services/gastown/src/dos/town/container-idle-stop.test.ts b/services/gastown/src/dos/town/container-idle-stop.test.ts new file mode 100644 index 0000000000..be75051ffd --- /dev/null +++ b/services/gastown/src/dos/town/container-idle-stop.test.ts @@ -0,0 +1,205 @@ +import { describe, it, expect, vi } from 'vitest'; +import { + stopContainerIfIdle, + CONTAINER_IDLE_STOP_THRESHOLD_MS, + CONTAINER_IDLE_STOP_THROTTLE_MS, + type IdleStopDeps, +} from './container-idle-stop'; + +function makeMayor(overrides: Partial<{ status: string; last_activity_at: string }> = {}) { + return { + id: 'mayor-1', + rig_id: null, + role: 'mayor' as const, + name: 'Mayor', + identity: 'Mayor@test', + status: overrides.status ?? 'idle', + current_hook_bead_id: null, + dispatch_attempts: 0, + last_activity_at: overrides.last_activity_at ?? new Date().toISOString(), + checkpoint: null, + created_at: new Date().toISOString(), + agent_status_message: null, + agent_status_updated_at: null, + }; +} + +type TestDeps = IdleStopDeps & { + _stopFn: ReturnType; + _getStateFn: ReturnType; + _store: Map; + _events: Array<{ event: string; townId: string; reason: string; error?: string }>; +}; + +function makeDeps(overrides: Partial = {}): TestDeps { + const stopFn = vi.fn().mockResolvedValue(undefined); + const getStateFn = vi.fn().mockResolvedValue({ status: 'running' }); + const store = new Map(); + const events: Array<{ event: string; townId: string; reason: string; error?: string }> = []; + + return { + hasActiveWork: overrides.hasActiveWork ?? (() => false), + isDraining: overrides.isDraining ?? (() => false), + getMayor: overrides.getMayor ?? (() => null), + getTownId: overrides.getTownId ?? (() => 'town-1'), + getLastIdleStopAt: + overrides.getLastIdleStopAt ?? (() => Promise.resolve(store.get('container:lastIdleStopAt'))), + setLastIdleStopAt: + overrides.setLastIdleStopAt ?? 
+ ((value: number) => { + store.set('container:lastIdleStopAt', value); + return Promise.resolve(); + }), + getContainerStub: + overrides.getContainerStub ?? + (() => ({ + getState: getStateFn, + stop: stopFn, + })), + writeEventFn: + overrides.writeEventFn ?? + ((data) => { + events.push(data); + }), + now: overrides.now ?? (() => Date.now()), + _stopFn: stopFn, + _getStateFn: getStateFn, + _store: store, + _events: events, + } as TestDeps; +} + +describe('stopContainerIfIdle', () => { + it('does not stop when town has active work', async () => { + const deps = makeDeps({ hasActiveWork: () => true }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when draining', async () => { + const deps = makeDeps({ isDraining: () => true }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when mayor is working', async () => { + const deps = makeDeps({ getMayor: () => makeMayor({ status: 'working' }) }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when mayor is stalled', async () => { + const deps = makeDeps({ getMayor: () => makeMayor({ status: 'stalled' }) }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when mayor last_activity_at is within threshold', async () => { + const recentActivity = new Date(Date.now() - 60_000).toISOString(); + const deps = makeDeps({ getMayor: () => makeMayor({ last_activity_at: recentActivity }) }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('stops container when mayor idle beyond threshold and container is running', async () => { + const oldActivity = new Date(Date.now() - CONTAINER_IDLE_STOP_THRESHOLD_MS - 60_000).toISOString(); + const deps = makeDeps({ getMayor: () => makeMayor({ last_activity_at: oldActivity }) }); + await 
stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + expect(deps._events).toHaveLength(1); + expect(deps._events[0].event).toBe('container.idle_stop'); + expect(deps._events[0].reason).toMatch(/^mayor_idle_\d+m$/); + }); + + it('stops container when no mayor exists (no_active_work reason)', async () => { + const deps = makeDeps({ getMayor: () => null }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + expect(deps._events[0].reason).toBe('no_active_work'); + }); + + it('stops container when container is healthy', async () => { + const stopFn = vi.fn().mockResolvedValue(undefined); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockResolvedValue({ status: 'healthy' }), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + expect(stopFn).toHaveBeenCalledTimes(1); + expect(deps._events[0].reason).toBe('no_active_work'); + }); + + it('does not stop when container is already stopped', async () => { + const stopFn = vi.fn().mockResolvedValue(undefined); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockResolvedValue({ status: 'stopped' }), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + expect(stopFn).not.toHaveBeenCalled(); + }); + + it('throttles: calling twice within throttle window stops only once', async () => { + const deps = makeDeps({ getMayor: () => null }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + }); + + it('allows stop again after throttle window passes', async () => { + let currentTime = Date.now(); + const deps = makeDeps({ + getMayor: () => null, + now: () => currentTime, + }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + + currentTime += CONTAINER_IDLE_STOP_THROTTLE_MS + 1; + await 
stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(2); + }); + + it('logs error and does not set throttle when stop() throws', async () => { + const stopFn = vi.fn().mockRejectedValue(new Error('stop failed')); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockResolvedValue({ status: 'running' }), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + + expect(deps._events).toHaveLength(1); + expect(deps._events[0].error).toBe('stop failed'); + expect(deps._store.has('container:lastIdleStopAt')).toBe(false); + }); + + it('returns without stopping when townId is null', async () => { + const deps = makeDeps({ getTownId: () => null, getMayor: () => null }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('returns without stopping when getState() throws', async () => { + const stopFn = vi.fn().mockResolvedValue(undefined); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockRejectedValue(new Error('rpc failed')), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + expect(stopFn).not.toHaveBeenCalled(); + }); +}); diff --git a/services/gastown/src/dos/town/container-idle-stop.ts b/services/gastown/src/dos/town/container-idle-stop.ts new file mode 100644 index 0000000000..eadb313468 --- /dev/null +++ b/services/gastown/src/dos/town/container-idle-stop.ts @@ -0,0 +1,81 @@ +/** + * Proactive idle-container stop logic. + * + * Cloudflare's sleepAfter timer resets on any port-8080 traffic (including + * long-lived PTY WebSockets), so containers can stay awake for hours after + * all real work finishes. This module provides the decision logic for + * stopping the container from the TownDO alarm when the town is truly idle. 
+ */ + +import { logger } from '../../util/log.util'; +import type { Agent } from '../../types'; + +export const CONTAINER_IDLE_STOP_THRESHOLD_MS = 5 * 60_000; +export const CONTAINER_IDLE_STOP_THROTTLE_MS = 2 * 60_000; + +export type IdleStopDeps = { + hasActiveWork: () => boolean; + isDraining: () => boolean; + getMayor: () => Agent | null; + getTownId: () => string | null; + getLastIdleStopAt: () => Promise; + setLastIdleStopAt: (value: number) => Promise; + getContainerStub: (townId: string) => { getState: () => Promise<{ status: string }>; stop: () => Promise }; + writeEventFn: (data: { event: string; townId: string; reason: string; error?: string }) => void; + now: () => number; +}; + +export async function stopContainerIfIdle(deps: IdleStopDeps): Promise { + if (deps.hasActiveWork()) return; + if (deps.isDraining()) return; + + const mayor = deps.getMayor(); + const mayorAlive = mayor && (mayor.status === 'working' || mayor.status === 'stalled'); + if (mayorAlive) return; + + if (mayor && mayor.last_activity_at != null) { + const lastActivity = new Date(mayor.last_activity_at).getTime(); + if (deps.now() - lastActivity <= CONTAINER_IDLE_STOP_THRESHOLD_MS) return; + } + + const townId = deps.getTownId(); + if (!townId) return; + + const now = deps.now(); + const lastIdleStop = (await deps.getLastIdleStopAt()) ?? 0; + if (now - lastIdleStop < CONTAINER_IDLE_STOP_THROTTLE_MS) return; + + const stub = deps.getContainerStub(townId); + let state: { status: string }; + try { + state = await stub.getState(); + } catch (err) { + logger.warn('stopContainerIfIdle: getState() failed', { + error: err instanceof Error ? err.message : String(err), + }); + return; + } + + if (state.status !== 'running' && state.status !== 'healthy') return; + + const idleMinutes = mayor?.last_activity_at != null + ? Math.round((deps.now() - new Date(mayor.last_activity_at).getTime()) / 60_000) + : 0; + const reason = mayor ? 
`mayor_idle_${idleMinutes}m` : 'no_active_work'; + + try { + await stub.stop(); + await deps.setLastIdleStopAt(now); + deps.writeEventFn({ event: 'container.idle_stop', townId, reason }); + } catch (err) { + logger.warn('stopContainerIfIdle: stop() failed', { + error: err instanceof Error ? err.message : String(err), + }); + deps.writeEventFn({ + event: 'container.idle_stop', + townId, + reason, + error: err instanceof Error ? err.message.slice(0, 300) : String(err).slice(0, 300), + }); + } +} From 4793373794ec23abb0895a6949b9897154b72ce3 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 7 May 2026 14:42:48 -0500 Subject: [PATCH 8/8] fix(onboarding): redirect back to onboarding after GitHub app install (#3119) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(onboarding): redirect back to onboarding after GitHub app install * fix: address PR review feedback — stable effect deps, parsed error state, URIError guard - OnboardingStepRepo: use stable refetch reference and scalar param instead of full query object to prevent duplicate toasts/refetches - GitHub callback: parse owner token from state in error handler so |return= suffix doesn't leak into org redirect URLs - validate-return-path: catch URIError from malformed percent-encoding and treat as invalid return path (null) instead of throwing --------- Co-authored-by: John Fawcett --- .../gastown/onboarding/OnboardingStepRepo.tsx | 22 +++- .../onboarding/OnboardingWizardClient.tsx | 13 ++- .../api/integrations/github/callback/route.ts | 37 +++--- .../integrations/validate-return-path.test.ts | 110 ++++++++++++++++++ .../lib/integrations/validate-return-path.ts | 31 +++++ 5 files changed, 192 insertions(+), 21 deletions(-) create mode 100644 apps/web/src/lib/integrations/validate-return-path.test.ts create mode 100644 apps/web/src/lib/integrations/validate-return-path.ts diff --git a/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx 
b/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx index e0f4b67fb6..9e2b22ec4c 100644 --- a/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx +++ b/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx @@ -1,7 +1,9 @@ 'use client'; -import { useState, useMemo, useCallback } from 'react'; +import { useState, useMemo, useCallback, useEffect } from 'react'; import { useQuery } from '@tanstack/react-query'; +import { useSearchParams } from 'next/navigation'; +import { toast } from 'sonner'; import { useTRPC } from '@/lib/trpc/utils'; import { useUser } from '@/hooks/useUser'; import { RepositoryCombobox, type RepositoryOption } from '@/components/shared/RepositoryCombobox'; @@ -63,11 +65,23 @@ export function OnboardingStepRepo() { const githubAppName = process.env.NEXT_PUBLIC_GITHUB_APP_NAME || 'KiloConnect'; const handleInstallGithub = useCallback(() => { - const installState = orgId ? `org_${orgId}` : `user_${user?.id}`; - const installUrl = `https://github.com/apps/${githubAppName}/installations/new?state=${installState}`; - window.open(installUrl, '_blank', 'noopener'); + const owner = orgId ? `org_${orgId}` : `user_${user?.id}`; + const returnPath = `/gastown/onboarding?step=repo${orgId ? `&orgId=${orgId}` : ''}`; + const state = `${owner}|return=${encodeURIComponent(returnPath)}`; + const installUrl = `https://github.com/apps/${githubAppName}/installations/new?state=${encodeURIComponent(state)}`; + window.location.href = installUrl; }, [orgId, user?.id, githubAppName]); + const githubInstallParam = useSearchParams().get('github_install'); + const { refetch: refetchGithubRepos } = githubReposQuery; + + useEffect(() => { + if (githubInstallParam === 'success') { + refetchGithubRepos(); + toast.success('GitHub app installed. 
Select a repo to continue.'); + } + }, [githubInstallParam, refetchGithubRepos]); + const handleRepoSelect = useCallback( (fullName: string) => { setSelectedRepoFullName(fullName); diff --git a/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx b/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx index 22cc538436..889c71a7bf 100644 --- a/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx +++ b/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx @@ -20,6 +20,8 @@ const STEPS = [ type StepKey = (typeof STEPS)[number]['key']; +const VALID_STEP_KEYS = new Set(STEPS.map(s => s.key)); + function StepIndicator({ currentIndex }: { currentIndex: number }) { return (
@@ -146,7 +148,16 @@ function CancelButton() { function WizardContent() { const searchParams = useSearchParams(); const orgId = searchParams.get('orgId'); - const [currentStepKey, setCurrentStepKey] = useState('name'); + + const initialStep: StepKey = (() => { + const stepParam = searchParams.get('step'); + if (stepParam && VALID_STEP_KEYS.has(stepParam)) { + return stepParam as StepKey; + } + return 'name'; + })(); + + const [currentStepKey, setCurrentStepKey] = useState(initialStep); const currentIndex = STEPS.findIndex(s => s.key === currentStepKey); diff --git a/apps/web/src/app/api/integrations/github/callback/route.ts b/apps/web/src/app/api/integrations/github/callback/route.ts index d5ff99be02..45598327c2 100644 --- a/apps/web/src/app/api/integrations/github/callback/route.ts +++ b/apps/web/src/app/api/integrations/github/callback/route.ts @@ -19,6 +19,7 @@ import type { IntegrationPermissions, Owner, } from '@/lib/integrations/core/types'; +import { parseStateReturn } from '@/lib/integrations/validate-return-path'; import { captureException, captureMessage } from '@sentry/nextjs'; /** @@ -40,23 +41,25 @@ export async function GET(request: NextRequest) { const searchParams = request.nextUrl.searchParams; const installationId = searchParams.get('installation_id') ?? ''; const setupAction = searchParams.get('setup_action'); - const state = searchParams.get('state'); // Contains owner info (org_ID or user_ID) + const rawState = searchParams.get('state'); + + // 3. Parse owner from state (with optional |return= suffix) + const { ownerToken, returnTo } = parseStateReturn(rawState); - // 3. 
Parse owner from state let owner: Owner; let ownerId: string; - if (state?.startsWith('org_')) { - ownerId = state.replace('org_', ''); + if (ownerToken.startsWith('org_')) { + ownerId = ownerToken.slice(4); owner = { type: 'org', id: ownerId }; - } else if (state?.startsWith('user_')) { - ownerId = state.replace('user_', ''); + } else if (ownerToken.startsWith('user_')) { + ownerId = ownerToken.slice(5); owner = { type: 'user', id: ownerId }; } else { captureMessage('GitHub callback missing or invalid owner in state', { level: 'warning', tags: { endpoint: 'github/callback', source: 'github_app_installation' }, - extra: { installationId, state, allParams: Object.fromEntries(searchParams.entries()) }, + extra: { installationId, rawState, allParams: Object.fromEntries(searchParams.entries()) }, }); return NextResponse.redirect(new URL('/', request.url)); } @@ -172,7 +175,7 @@ export async function GET(request: NextRequest) { captureMessage('GitHub callback missing installation_id', { level: 'warning', tags: { endpoint: 'github/callback', source: 'github_app_installation' }, - extra: { setupAction, state, allParams: Object.fromEntries(searchParams.entries()) }, + extra: { setupAction, rawState, allParams: Object.fromEntries(searchParams.entries()) }, }); const redirectPath = @@ -291,8 +294,9 @@ export async function GET(request: NextRequest) { } // 9. Redirect to success page - const successPath = - owner.type === 'org' + const successPath = returnTo + ? `${returnTo}${returnTo.includes('?') ? '&' : '?'}github_install=success` + : owner.type === 'org' ? 
`/organizations/${owner.id}/integrations/github?success=installed` : `/integrations/github?success=installed`; @@ -302,7 +306,7 @@ export async function GET(request: NextRequest) { // Capture error to Sentry with context for debugging const searchParams = request.nextUrl.searchParams; - const state = searchParams.get('state'); + const rawState = searchParams.get('state'); captureException(error, { tags: { @@ -312,17 +316,18 @@ export async function GET(request: NextRequest) { extra: { installationId: searchParams.get('installation_id'), setupAction: searchParams.get('setup_action'), - state, + rawState, }, }); - // Determine redirect path based on state parameter + const { ownerToken: errorOwnerToken } = parseStateReturn(rawState); + let redirectPath = '/?error=installation_failed'; - if (state?.startsWith('org_')) { - const orgId = state.replace('org_', ''); + if (errorOwnerToken.startsWith('org_')) { + const orgId = errorOwnerToken.slice(4); redirectPath = `/organizations/${orgId}/integrations/github?error=installation_failed`; - } else if (state?.startsWith('user_')) { + } else if (errorOwnerToken.startsWith('user_')) { redirectPath = `/integrations/github?error=installation_failed`; } diff --git a/apps/web/src/lib/integrations/validate-return-path.test.ts b/apps/web/src/lib/integrations/validate-return-path.test.ts new file mode 100644 index 0000000000..ffd238c210 --- /dev/null +++ b/apps/web/src/lib/integrations/validate-return-path.test.ts @@ -0,0 +1,110 @@ +import { validateReturnPath, parseStateReturn } from './validate-return-path'; + +describe('validateReturnPath', () => { + it('accepts a simple internal path', () => { + expect(validateReturnPath('/gastown/onboarding')).toBe('/gastown/onboarding'); + }); + + it('accepts a path with query params', () => { + expect(validateReturnPath('/gastown/onboarding?step=repo&orgId=123')).toBe( + '/gastown/onboarding?step=repo&orgId=123' + ); + }); + + it('rejects protocol-relative URLs', () => { + 
expect(validateReturnPath('//evil.com')).toBeNull(); + }); + + it('rejects absolute URLs', () => { + expect(validateReturnPath('https://evil.com')).toBeNull(); + }); + + it('rejects backslash-prefixed paths', () => { + expect(validateReturnPath('/\\evil.com')).toBeNull(); + }); + + it('rejects paths with carriage return', () => { + expect(validateReturnPath('/foo\rbar')).toBeNull(); + }); + + it('rejects paths with newline', () => { + expect(validateReturnPath('/foo\nbar')).toBeNull(); + }); + + it('rejects paths without leading slash', () => { + expect(validateReturnPath('foo/bar')).toBeNull(); + }); + + it('rejects empty string', () => { + expect(validateReturnPath('')).toBeNull(); + }); + + it('accepts root path', () => { + expect(validateReturnPath('/')).toBe('/'); + }); + + it('rejects triple-slash paths', () => { + expect(validateReturnPath('///foo')).toBeNull(); + }); +}); + +describe('parseStateReturn', () => { + it('parses state with return suffix', () => { + const encoded = encodeURIComponent('/gastown/onboarding?step=repo'); + const result = parseStateReturn(`user_abc|return=${encoded}`); + expect(result).toEqual({ + ownerToken: 'user_abc', + returnTo: '/gastown/onboarding?step=repo', + }); + }); + + it('parses org state with return suffix', () => { + const encoded = encodeURIComponent('/gastown/onboarding?step=repo&orgId=123'); + const result = parseStateReturn(`org_123|return=${encoded}`); + expect(result).toEqual({ + ownerToken: 'org_123', + returnTo: '/gastown/onboarding?step=repo&orgId=123', + }); + }); + + it('parses state without return suffix (backwards compat)', () => { + const result = parseStateReturn('user_abc'); + expect(result).toEqual({ + ownerToken: 'user_abc', + returnTo: null, + }); + }); + + it('parses org state without return suffix', () => { + const result = parseStateReturn('org_123'); + expect(result).toEqual({ + ownerToken: 'org_123', + returnTo: null, + }); + }); + + it('returns null returnTo when return path is invalid', () => { 
+    const encoded = encodeURIComponent('//evil.com');
+    const result = parseStateReturn(`user_abc|return=${encoded}`);
+    expect(result).toEqual({
+      ownerToken: 'user_abc',
+      returnTo: null,
+    });
+  });
+
+  it('handles null state', () => {
+    const result = parseStateReturn(null);
+    expect(result).toEqual({
+      ownerToken: '',
+      returnTo: null,
+    });
+  });
+
+  it('returns null returnTo when return suffix has malformed percent-encoding', () => {
+    const result = parseStateReturn('user_abc|return=%ZZ');
+    expect(result).toEqual({
+      ownerToken: 'user_abc',
+      returnTo: null,
+    });
+  });
+});
diff --git a/apps/web/src/lib/integrations/validate-return-path.ts b/apps/web/src/lib/integrations/validate-return-path.ts
new file mode 100644
index 0000000000..e0cc67008c
--- /dev/null
+++ b/apps/web/src/lib/integrations/validate-return-path.ts
@@ -0,0 +1,52 @@
+/**
+ * Matches a same-origin return path: exactly one leading `/` (the next
+ * character may not be `/` or `\`) and no control characters anywhere.
+ *
+ * Control characters are rejected because WHATWG URL parsing removes ASCII
+ * tab, LF and CR from the input before parsing, so `/\t/evil.com` would
+ * otherwise resolve as the protocol-relative URL `//evil.com`.
+ */
+const RETURN_PATH_RE = /^\/(?![/\\])\P{Cc}*$/u;
+
+/**
+ * Validates a redirect target recovered from the install `state` parameter.
+ *
+ * @param candidate - Already-decoded path to check.
+ * @returns `candidate` unchanged when it is an internal absolute path,
+ *   otherwise `null`.
+ */
+export function validateReturnPath(candidate: string): string | null {
+  return RETURN_PATH_RE.test(candidate) ? candidate : null;
+}
+
+/**
+ * Splits a `state` value of the form `<owner>|return=<encoded path>`.
+ *
+ * @param rawState - Raw `state` query parameter, or `null` when absent.
+ * @returns `ownerToken` is the text before the first `|return=` (the whole
+ *   state when there is no suffix, `''` for `null`). `returnTo` is the decoded
+ *   suffix when it passes `validateReturnPath`, otherwise `null`.
+ */
+export function parseStateReturn(rawState: string | null): {
+  ownerToken: string;
+  returnTo: string | null;
+} {
+  let ownerToken = rawState ?? '';
+  let returnTo: string | null = null;
+
+  if (rawState) {
+    const sepIdx = rawState.indexOf('|return=');
+    if (sepIdx !== -1) {
+      ownerToken = rawState.slice(0, sepIdx);
+      try {
+        const candidate = decodeURIComponent(rawState.slice(sepIdx + '|return='.length));
+        returnTo = validateReturnPath(candidate);
+      } catch {
+        // decodeURIComponent throws URIError on malformed percent-encoding.
+        returnTo = null;
+      }
+    }
+  }
+
+  return { ownerToken, returnTo };
+}