diff --git a/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx b/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx index e0f4b67fb6..9e2b22ec4c 100644 --- a/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx +++ b/apps/web/src/app/(app)/gastown/onboarding/OnboardingStepRepo.tsx @@ -1,7 +1,9 @@ 'use client'; -import { useState, useMemo, useCallback } from 'react'; +import { useState, useMemo, useCallback, useEffect } from 'react'; import { useQuery } from '@tanstack/react-query'; +import { useSearchParams } from 'next/navigation'; +import { toast } from 'sonner'; import { useTRPC } from '@/lib/trpc/utils'; import { useUser } from '@/hooks/useUser'; import { RepositoryCombobox, type RepositoryOption } from '@/components/shared/RepositoryCombobox'; @@ -63,11 +65,23 @@ export function OnboardingStepRepo() { const githubAppName = process.env.NEXT_PUBLIC_GITHUB_APP_NAME || 'KiloConnect'; const handleInstallGithub = useCallback(() => { - const installState = orgId ? `org_${orgId}` : `user_${user?.id}`; - const installUrl = `https://github.com/apps/${githubAppName}/installations/new?state=${installState}`; - window.open(installUrl, '_blank', 'noopener'); + const owner = orgId ? `org_${orgId}` : `user_${user?.id}`; + const returnPath = `/gastown/onboarding?step=repo${orgId ? `&orgId=${orgId}` : ''}`; + const state = `${owner}|return=${encodeURIComponent(returnPath)}`; + const installUrl = `https://github.com/apps/${githubAppName}/installations/new?state=${encodeURIComponent(state)}`; + window.location.href = installUrl; }, [orgId, user?.id, githubAppName]); + const githubInstallParam = useSearchParams().get('github_install'); + const { refetch: refetchGithubRepos } = githubReposQuery; + + useEffect(() => { + if (githubInstallParam === 'success') { + refetchGithubRepos(); + toast.success('GitHub app installed. 
Select a repo to continue.'); + } + }, [githubInstallParam, refetchGithubRepos]); + const handleRepoSelect = useCallback( (fullName: string) => { setSelectedRepoFullName(fullName); diff --git a/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx b/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx index 22cc538436..889c71a7bf 100644 --- a/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx +++ b/apps/web/src/app/(app)/gastown/onboarding/OnboardingWizardClient.tsx @@ -20,6 +20,8 @@ const STEPS = [ type StepKey = (typeof STEPS)[number]['key']; +const VALID_STEP_KEYS = new Set(STEPS.map(s => s.key)); + function StepIndicator({ currentIndex }: { currentIndex: number }) { return (
@@ -146,7 +148,16 @@ function CancelButton() { function WizardContent() { const searchParams = useSearchParams(); const orgId = searchParams.get('orgId'); - const [currentStepKey, setCurrentStepKey] = useState('name'); + + const initialStep: StepKey = (() => { + const stepParam = searchParams.get('step'); + if (stepParam && VALID_STEP_KEYS.has(stepParam)) { + return stepParam as StepKey; + } + return 'name'; + })(); + + const [currentStepKey, setCurrentStepKey] = useState(initialStep); const currentIndex = STEPS.findIndex(s => s.key === currentStepKey); diff --git a/apps/web/src/app/api/integrations/github/callback/route.ts b/apps/web/src/app/api/integrations/github/callback/route.ts index d5ff99be02..45598327c2 100644 --- a/apps/web/src/app/api/integrations/github/callback/route.ts +++ b/apps/web/src/app/api/integrations/github/callback/route.ts @@ -19,6 +19,7 @@ import type { IntegrationPermissions, Owner, } from '@/lib/integrations/core/types'; +import { parseStateReturn } from '@/lib/integrations/validate-return-path'; import { captureException, captureMessage } from '@sentry/nextjs'; /** @@ -40,23 +41,25 @@ export async function GET(request: NextRequest) { const searchParams = request.nextUrl.searchParams; const installationId = searchParams.get('installation_id') ?? ''; const setupAction = searchParams.get('setup_action'); - const state = searchParams.get('state'); // Contains owner info (org_ID or user_ID) + const rawState = searchParams.get('state'); + + // 3. Parse owner from state (with optional |return= suffix) + const { ownerToken, returnTo } = parseStateReturn(rawState); - // 3. 
Parse owner from state let owner: Owner; let ownerId: string; - if (state?.startsWith('org_')) { - ownerId = state.replace('org_', ''); + if (ownerToken.startsWith('org_')) { + ownerId = ownerToken.slice(4); owner = { type: 'org', id: ownerId }; - } else if (state?.startsWith('user_')) { - ownerId = state.replace('user_', ''); + } else if (ownerToken.startsWith('user_')) { + ownerId = ownerToken.slice(5); owner = { type: 'user', id: ownerId }; } else { captureMessage('GitHub callback missing or invalid owner in state', { level: 'warning', tags: { endpoint: 'github/callback', source: 'github_app_installation' }, - extra: { installationId, state, allParams: Object.fromEntries(searchParams.entries()) }, + extra: { installationId, rawState, allParams: Object.fromEntries(searchParams.entries()) }, }); return NextResponse.redirect(new URL('/', request.url)); } @@ -172,7 +175,7 @@ export async function GET(request: NextRequest) { captureMessage('GitHub callback missing installation_id', { level: 'warning', tags: { endpoint: 'github/callback', source: 'github_app_installation' }, - extra: { setupAction, state, allParams: Object.fromEntries(searchParams.entries()) }, + extra: { setupAction, rawState, allParams: Object.fromEntries(searchParams.entries()) }, }); const redirectPath = @@ -291,8 +294,9 @@ export async function GET(request: NextRequest) { } // 9. Redirect to success page - const successPath = - owner.type === 'org' + const successPath = returnTo + ? `${returnTo}${returnTo.includes('?') ? '&' : '?'}github_install=success` + : owner.type === 'org' ? 
`/organizations/${owner.id}/integrations/github?success=installed` : `/integrations/github?success=installed`; @@ -302,7 +306,7 @@ export async function GET(request: NextRequest) { // Capture error to Sentry with context for debugging const searchParams = request.nextUrl.searchParams; - const state = searchParams.get('state'); + const rawState = searchParams.get('state'); captureException(error, { tags: { @@ -312,17 +316,18 @@ export async function GET(request: NextRequest) { extra: { installationId: searchParams.get('installation_id'), setupAction: searchParams.get('setup_action'), - state, + rawState, }, }); - // Determine redirect path based on state parameter + const { ownerToken: errorOwnerToken } = parseStateReturn(rawState); + let redirectPath = '/?error=installation_failed'; - if (state?.startsWith('org_')) { - const orgId = state.replace('org_', ''); + if (errorOwnerToken.startsWith('org_')) { + const orgId = errorOwnerToken.slice(4); redirectPath = `/organizations/${orgId}/integrations/github?error=installation_failed`; - } else if (state?.startsWith('user_')) { + } else if (errorOwnerToken.startsWith('user_')) { redirectPath = `/integrations/github?error=installation_failed`; } diff --git a/apps/web/src/components/gastown/MayorChat.tsx b/apps/web/src/components/gastown/MayorChat.tsx index e84309fc19..48d9c29a4d 100644 --- a/apps/web/src/components/gastown/MayorChat.tsx +++ b/apps/web/src/components/gastown/MayorChat.tsx @@ -22,7 +22,22 @@ export function MayorChat({ townId }: MayorChatProps) { // Eagerly ensure mayor agent + container on mount const ensureMayor = useMutation( trpc.gastown.ensureMayor.mutationOptions({ - onSuccess: () => { + onSuccess: data => { + queryClient.setQueryData( + trpc.gastown.getMayorStatus.queryKey({ townId }), + (old: { configured?: boolean; townId?: string; session?: { agentId?: string; sessionId?: string; status?: string; lastActivityAt?: string } } | undefined) => ({ + ...(old ?? 
{}), + configured: true, + townId, + session: { + ...(old?.session ?? {}), + agentId: data.agentId, + sessionId: data.agentId, + status: data.sessionStatus, + lastActivityAt: old?.session?.lastActivityAt ?? new Date().toISOString(), + }, + }) + ); void queryClient.invalidateQueries({ queryKey: trpc.gastown.getMayorStatus.queryKey(), }); diff --git a/apps/web/src/components/gastown/TerminalBar.tsx b/apps/web/src/components/gastown/TerminalBar.tsx index ad2af05ca6..f51f42cf08 100644 --- a/apps/web/src/components/gastown/TerminalBar.tsx +++ b/apps/web/src/components/gastown/TerminalBar.tsx @@ -1319,7 +1319,22 @@ function MayorTerminalPane({ townId, collapsed }: { townId: string; collapsed: b const ensureMayor = useMutation( trpc.gastown.ensureMayor.mutationOptions({ - onSuccess: () => { + onSuccess: data => { + queryClient.setQueryData( + trpc.gastown.getMayorStatus.queryKey({ townId }), + (old: { configured?: boolean; townId?: string; session?: { agentId?: string; sessionId?: string; status?: string; lastActivityAt?: string } } | undefined) => ({ + ...(old ?? {}), + configured: true, + townId, + session: { + ...(old?.session ?? {}), + agentId: data.agentId, + sessionId: data.agentId, + status: data.sessionStatus, + lastActivityAt: old?.session?.lastActivityAt ?? 
new Date().toISOString(), + }, + }) + ); void queryClient.invalidateQueries({ queryKey: trpc.gastown.getMayorStatus.queryKey(), }); diff --git a/apps/web/src/lib/integrations/validate-return-path.test.ts b/apps/web/src/lib/integrations/validate-return-path.test.ts new file mode 100644 index 0000000000..ffd238c210 --- /dev/null +++ b/apps/web/src/lib/integrations/validate-return-path.test.ts @@ -0,0 +1,110 @@ +import { validateReturnPath, parseStateReturn } from './validate-return-path'; + +describe('validateReturnPath', () => { + it('accepts a simple internal path', () => { + expect(validateReturnPath('/gastown/onboarding')).toBe('/gastown/onboarding'); + }); + + it('accepts a path with query params', () => { + expect(validateReturnPath('/gastown/onboarding?step=repo&orgId=123')).toBe( + '/gastown/onboarding?step=repo&orgId=123' + ); + }); + + it('rejects protocol-relative URLs', () => { + expect(validateReturnPath('//evil.com')).toBeNull(); + }); + + it('rejects absolute URLs', () => { + expect(validateReturnPath('https://evil.com')).toBeNull(); + }); + + it('rejects backslash-prefixed paths', () => { + expect(validateReturnPath('/\\evil.com')).toBeNull(); + }); + + it('rejects paths with carriage return', () => { + expect(validateReturnPath('/foo\rbar')).toBeNull(); + }); + + it('rejects paths with newline', () => { + expect(validateReturnPath('/foo\nbar')).toBeNull(); + }); + + it('rejects paths without leading slash', () => { + expect(validateReturnPath('foo/bar')).toBeNull(); + }); + + it('rejects empty string', () => { + expect(validateReturnPath('')).toBeNull(); + }); + + it('accepts root path', () => { + expect(validateReturnPath('/')).toBe('/'); + }); + + it('rejects triple-slash paths', () => { + expect(validateReturnPath('///foo')).toBeNull(); + }); +}); + +describe('parseStateReturn', () => { + it('parses state with return suffix', () => { + const encoded = encodeURIComponent('/gastown/onboarding?step=repo'); + const result = 
parseStateReturn(`user_abc|return=${encoded}`); + expect(result).toEqual({ + ownerToken: 'user_abc', + returnTo: '/gastown/onboarding?step=repo', + }); + }); + + it('parses org state with return suffix', () => { + const encoded = encodeURIComponent('/gastown/onboarding?step=repo&orgId=123'); + const result = parseStateReturn(`org_123|return=${encoded}`); + expect(result).toEqual({ + ownerToken: 'org_123', + returnTo: '/gastown/onboarding?step=repo&orgId=123', + }); + }); + + it('parses state without return suffix (backwards compat)', () => { + const result = parseStateReturn('user_abc'); + expect(result).toEqual({ + ownerToken: 'user_abc', + returnTo: null, + }); + }); + + it('parses org state without return suffix', () => { + const result = parseStateReturn('org_123'); + expect(result).toEqual({ + ownerToken: 'org_123', + returnTo: null, + }); + }); + + it('returns null returnTo when return path is invalid', () => { + const encoded = encodeURIComponent('//evil.com'); + const result = parseStateReturn(`user_abc|return=${encoded}`); + expect(result).toEqual({ + ownerToken: 'user_abc', + returnTo: null, + }); + }); + + it('handles null state', () => { + const result = parseStateReturn(null); + expect(result).toEqual({ + ownerToken: '', + returnTo: null, + }); + }); + + it('returns null returnTo when return suffix has malformed percent-encoding', () => { + const result = parseStateReturn('user_abc|return=%ZZ'); + expect(result).toEqual({ + ownerToken: 'user_abc', + returnTo: null, + }); + }); +}); diff --git a/apps/web/src/lib/integrations/validate-return-path.ts b/apps/web/src/lib/integrations/validate-return-path.ts new file mode 100644 index 0000000000..e0cc67008c --- /dev/null +++ b/apps/web/src/lib/integrations/validate-return-path.ts @@ -0,0 +1,31 @@ +const RETURN_PATH_RE = /^\/(?![\/\\])[^\r\n]*$/; + +export function validateReturnPath(candidate: string): string | null { + if (!RETURN_PATH_RE.test(candidate) || candidate.startsWith('//')) { + return null; + } + 
return candidate; +} + +export function parseStateReturn(rawState: string | null): { + ownerToken: string; + returnTo: string | null; +} { + let ownerToken = rawState ?? ''; + let returnTo: string | null = null; + + if (rawState) { + const sepIdx = rawState.indexOf('|return='); + if (sepIdx !== -1) { + ownerToken = rawState.slice(0, sepIdx); + try { + const candidate = decodeURIComponent(rawState.slice(sepIdx + '|return='.length)); + returnTo = validateReturnPath(candidate); + } catch { + returnTo = null; + } + } + } + + return { ownerToken, returnTo }; +} diff --git a/services/gastown/container/src/agent-runner.ts b/services/gastown/container/src/agent-runner.ts index 647e6d94ee..d31511a68c 100644 --- a/services/gastown/container/src/agent-runner.ts +++ b/services/gastown/container/src/agent-runner.ts @@ -372,6 +372,10 @@ async function verifyGitCredentials( * kilo serve requires a git repo in the working directory, so we init * a bare local repo with an empty initial commit. */ +export function mayorWorkdirForTown(townId: string): string { + return `/workspace/rigs/mayor-${townId}/mayor-workspace`; +} + async function createLightweightWorkspace(label: string, rigId: string): Promise { const { mkdir: mkdirAsync } = await import('node:fs/promises'); const { existsSync } = await import('node:fs'); diff --git a/services/gastown/container/src/main.ts b/services/gastown/container/src/main.ts index 5b12a1aa19..4abff0666e 100644 --- a/services/gastown/container/src/main.ts +++ b/services/gastown/container/src/main.ts @@ -1,11 +1,53 @@ import { startControlServer } from './control-server'; import { log } from './logger'; -import { bootHydration, getUptime } from './process-manager'; +import { activeAgentCount, bootHydration, getUptime, listAgents } from './process-manager'; -log.info('container.cold_start', { uptime: getUptime(), ts: new Date().toISOString() }); +// Container-scoped identifiers for crash/diagnostic logs. 
The container is +// pinned to a single town for its lifetime (see GASTOWN_TOWN_ID injection in +// the deployer), so reading these once at module init is safe and lets us +// emit them even when no agents are registered yet. +const TOWN_ID = process.env.GASTOWN_TOWN_ID ?? null; + +log.info('container.cold_start', { + uptime: getUptime(), + ts: new Date().toISOString(), + townId: TOWN_ID, +}); + +// Bun (like Node) will ignore unhandled promise rejections unless a handler +// is registered. Without this handler a rejection in a fire-and-forget path +// (e.g. `void saveDbSnapshot(...)`, `void subscribeToEvents(...)`, +// `setInterval(() => void fn())`) is effectively invisible — making the +// root cause of container crashes impossible to diagnose from logs. +// +// We deliberately DO NOT call process.exit here: visibility is the goal. +// If a specific rejection turns out to be fatal state corruption we can +// escalate it individually. +process.on('unhandledRejection', reason => { + const err = + reason instanceof Error + ? { message: reason.message, stack: reason.stack, name: reason.name } + : { message: String(reason) }; + log.error('container.unhandled_rejection', { + ...err, + townId: TOWN_ID, + uptimeMs: getUptime(), + activeAgents: activeAgentCount(), + }); +}); process.on('uncaughtException', err => { - log.error('container.uncaught_exception', { error: err.message, stack: err.stack }); + log.error('container.uncaught_exception', { + message: err.message, + stack: err.stack, + name: err.name, + townId: TOWN_ID, + uptimeMs: getUptime(), + activeAgents: activeAgentCount(), + }); + // Keep the existing fatal behaviour for truly uncaught synchronous errors. + // An unhandled rejection is handled separately above without exit so we + // can observe the crash class before deciding whether to remain fatal. 
process.exit(1); }); @@ -13,8 +55,45 @@ process.on('SIGTERM', () => { console.log('SIGTERM received — starting graceful drain...'); }); +// Periodically log RSS memory so we can correlate OOM-class failures +// (external SIGKILL from Cloudflare Containers runtime when a memory +// ceiling is hit) with steady-state memory growth. 30s cadence matches +// the heartbeat interval and is cheap. +const MEMORY_LOG_INTERVAL_MS = 30_000; +setInterval(() => { + try { + const mem = process.memoryUsage(); + log.info('container.memory_usage', { + rssMB: Math.round(mem.rss / 1024 / 1024), + heapUsedMB: Math.round(mem.heapUsed / 1024 / 1024), + heapTotalMB: Math.round(mem.heapTotal / 1024 / 1024), + externalMB: Math.round(mem.external / 1024 / 1024), + townId: TOWN_ID, + uptimeMs: getUptime(), + agents: listAgents().length, + activeAgents: activeAgentCount(), + }); + } catch (err) { + log.warn('container.memory_usage_failed', { + error: err instanceof Error ? err.message : String(err), + }); + } +}, MEMORY_LOG_INTERVAL_MS); + startControlServer(); void (async () => { - await bootHydration(); + try { + await bootHydration(); + } catch (err) { + // bootHydration has its own try/catch for the registry fetch path but + // the inner startAgent loop can still throw on rare synchronous errors + // before its first await. Log rather than crash so the next /agents/start + // request can recover. + log.error('container.boot_hydration_failed', { + message: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? 
err.stack : undefined, + townId: TOWN_ID, + }); + } })(); diff --git a/services/gastown/container/src/process-manager.test.ts b/services/gastown/container/src/process-manager.test.ts new file mode 100644 index 0000000000..d2c8906c2a --- /dev/null +++ b/services/gastown/container/src/process-manager.test.ts @@ -0,0 +1,180 @@ +import { describe, it, expect, vi } from 'vitest'; + +// Mock heavy imports so the module can be loaded without spinning up +// a real SDK server or hono app. +vi.mock('@kilocode/sdk', () => ({ + createKilo: vi.fn(), +})); +vi.mock('./agent-runner', () => ({ + runAgent: vi.fn(), + buildKiloConfigContent: vi.fn(), + resolveGitCredentials: vi.fn(), + writeMayorSystemPromptToAgentsMd: vi.fn(), +})); +vi.mock('./control-server', () => ({ + getCurrentTownConfig: vi.fn(() => ({})), + getLastAppliedEnvVarKeys: vi.fn(() => new Set()), + RESERVED_ENV_KEYS: new Set(), +})); +vi.mock('./completion-reporter', () => ({ + reportAgentCompleted: vi.fn(), + reportMayorWaiting: vi.fn(), +})); +vi.mock('./token-refresh', () => ({ + refreshTokenIfNearExpiry: vi.fn(), +})); + +const { applyModelToSession, withStartAgentLock } = await import('./process-manager'); + +type PromptCall = { + path: { id: string }; + body: { + parts: Array<{ type: 'text'; text: string }>; + model: { providerID: string; modelID: string }; + noReply?: boolean; + }; +}; + +function makeClient(impl?: (args: PromptCall) => Promise) { + const calls: PromptCall[] = []; + const prompt = vi.fn(async (args: PromptCall) => { + calls.push(args); + if (impl) return impl(args); + return {}; + }); + return { client: { session: { prompt } }, calls, prompt }; +} + +describe('applyModelToSession', () => { + it('sends the startup prompt with the model for a fresh session', async () => { + const { client, calls } = makeClient(); + await applyModelToSession({ + client, + sessionId: 'sess-new', + model: 'anthropic/claude-sonnet-4.6', + prompt: 'STARTUP PROMPT', + resumedSession: false, + }); + 
expect(calls).toHaveLength(1); + expect(calls[0].path).toEqual({ id: 'sess-new' }); + expect(calls[0].body.parts).toEqual([{ type: 'text', text: 'STARTUP PROMPT' }]); + expect(calls[0].body.model).toEqual({ + providerID: 'kilo', + modelID: 'anthropic/claude-sonnet-4.6', + }); + expect(calls[0].body.noReply).toBeUndefined(); + }); + + it('pushes the new model with noReply:true for a resumed session without replaying the startup prompt', async () => { + const { client, calls } = makeClient(); + await applyModelToSession({ + client, + sessionId: 'sess-resumed', + model: 'anthropic/claude-opus-4.7', + prompt: 'STARTUP PROMPT (must not be sent)', + resumedSession: true, + }); + expect(calls).toHaveLength(1); + expect(calls[0].path).toEqual({ id: 'sess-resumed' }); + expect(calls[0].body.model).toEqual({ + providerID: 'kilo', + modelID: 'anthropic/claude-opus-4.7', + }); + expect(calls[0].body.noReply).toBe(true); + expect(calls[0].body.parts).toEqual([{ type: 'text', text: '' }]); + // Ensure the MAYOR_STARTUP_PROMPT is NOT replayed on resume. + expect(calls[0].body.parts[0].text).not.toContain('STARTUP PROMPT'); + }); + + it('swallows errors from the resumed-session prompt so the hot-swap can continue', async () => { + const { client } = makeClient(async () => { + throw new Error('simulated SDK failure'); + }); + // Should not throw — errors on the noReply path are logged and ignored. 
+ await expect( + applyModelToSession({ + client, + sessionId: 'sess-resumed', + model: 'anthropic/claude-opus-4.7', + prompt: 'STARTUP PROMPT', + resumedSession: true, + }) + ).resolves.toBeUndefined(); + }); + + it('propagates errors for a fresh session (so the hot-swap can roll back)', async () => { + const { client } = makeClient(async () => { + throw new Error('simulated SDK failure'); + }); + await expect( + applyModelToSession({ + client, + sessionId: 'sess-new', + model: 'anthropic/claude-sonnet-4.6', + prompt: 'STARTUP PROMPT', + resumedSession: false, + }) + ).rejects.toThrow('simulated SDK failure'); + }); +}); + +describe('withStartAgentLock', () => { + it('serialises concurrent callers for the same agentId', async () => { + const order: string[] = []; + let secondStartedBeforeFirstFinished = false; + + // Fire both in the same microtask so they race on the lock. + const first = withStartAgentLock('agent-1', async () => { + order.push('first:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('first:end'); + return 1; + }); + const second = withStartAgentLock('agent-1', async () => { + // If the lock works, `first:end` has already been pushed. 
+ if (!order.includes('first:end')) { + secondStartedBeforeFirstFinished = true; + } + order.push('second:start'); + order.push('second:end'); + return 2; + }); + + const [r1, r2] = await Promise.all([first, second]); + expect(r1).toBe(1); + expect(r2).toBe(2); + expect(secondStartedBeforeFirstFinished).toBe(false); + expect(order).toEqual(['first:start', 'first:end', 'second:start', 'second:end']); + }); + + it('runs concurrently for different agentIds', async () => { + const order: string[] = []; + + const a = withStartAgentLock('agent-a', async () => { + order.push('a:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('a:end'); + }); + const b = withStartAgentLock('agent-b', async () => { + order.push('b:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('b:end'); + }); + + await Promise.all([a, b]); + + // Both should have started before either ended (no serialisation across ids). + expect(order.indexOf('b:start')).toBeLessThan(order.indexOf('a:end')); + }); + + it('releases the lock when the fn throws so subsequent callers can proceed', async () => { + await expect( + withStartAgentLock('agent-err', async () => { + throw new Error('boom'); + }) + ).rejects.toThrow('boom'); + + const result = await withStartAgentLock('agent-err', async () => 'ok'); + expect(result).toBe('ok'); + }); +}); diff --git a/services/gastown/container/src/process-manager.ts b/services/gastown/container/src/process-manager.ts index 2c58efa795..71af0067c6 100644 --- a/services/gastown/container/src/process-manager.ts +++ b/services/gastown/container/src/process-manager.ts @@ -11,7 +11,7 @@ import { z } from 'zod'; import * as fs from 'node:fs/promises'; import type { ManagedAgent, StartAgentRequest } from './types'; import { reportAgentCompleted, reportMayorWaiting } from './completion-reporter'; -import { buildKiloConfigContent } from './agent-runner'; +import { buildKiloConfigContent, mayorWorkdirForTown } from './agent-runner'; import { 
getCurrentTownConfig, getLastAppliedEnvVarKeys, @@ -30,6 +30,7 @@ type SDKInstance = { client: KiloClient; server: { url: string; close(): void }; sessionCount: number; + configContent?: string; }; const agents = new Map(); @@ -71,6 +72,47 @@ export function isDraining(): boolean { // once created, the SDK instance is reused without locking. let sdkServerLock: Promise = Promise.resolve(); +// Per-agentId mutex for startAgent. Without this, two concurrent POST +// /agents/start calls for the same agentId (observed in production: two +// `[control-server] /agents/start:` log lines at the same millisecond) +// both pass the re-entrancy check at the top of startAgent before either +// has committed a 'starting' record. The second invocation aborts the +// first's startupAbortController and both paths race on session creation, +// idle timers, and SDK instance reference counts — leaving the agent in +// an inconsistent state (orphaned sessions, leaked sessionCount, etc). +// +// Serialising per agentId means the second caller waits for the first to +// complete (or abort) before proceeding, and then observes a consistent +// snapshot in `agents.get(agentId)`. +const startAgentLocks = new Map>(); + +// Exported for tests that exercise the locking behaviour directly without +// bringing up the whole SDK/process harness. Production callers should use +// `startAgent` (which wraps `startAgentImpl` with this lock). +export async function withStartAgentLock(agentId: string, fn: () => Promise): Promise { + const previous = startAgentLocks.get(agentId) ?? Promise.resolve(); + // Use the same explicit `new Promise` pattern as `sdkServerLock` above + // instead of `Promise.withResolvers`, which is not available on older + // Bun runtimes. This module is imported during container startup, so a + // missing global here would throw before the crash handlers are + // registered and prevent the control server from starting. 
+ let releaseLock!: () => void; + const lockPromise = new Promise(resolve => { + releaseLock = resolve; + }); + startAgentLocks.set(agentId, lockPromise); + try { + await previous.catch(() => {}); + return await fn(); + } finally { + releaseLock(); + // Only clear the slot if no newer caller has queued behind us. + if (startAgentLocks.get(agentId) === lockPromise) { + startAgentLocks.delete(agentId); + } + } +} + export function getUptime(): number { return Date.now() - startTime; } @@ -540,6 +582,12 @@ function broadcastEvent(agentId: string, event: string, data: unknown): void { * corrupting each other's globals. Once created, the SDK instance is * cached and returned without locking. */ +const PERSIST_ENV_KEYS = new Set([ + 'KILO_CONFIG_CONTENT', + 'OPENCODE_CONFIG_CONTENT', + 'GASTOWN_ORGANIZATION_ID', +]); + async function ensureSDKServer( workdir: string, env: Record @@ -547,10 +595,23 @@ async function ensureSDKServer( // Fast path: reuse existing instance without locking. const existing = sdkInstances.get(workdir); if (existing) { - return { - client: existing.client, - port: parseInt(new URL(existing.server.url).port), - }; + const newConfig = env.KILO_CONFIG_CONTENT; + if (newConfig && newConfig !== existing.configContent) { + console.log( + `${MANAGER_LOG} ensureSDKServer: config mismatch for ${workdir}, evicting prewarmed server` + ); + existing.server.close(); + sdkInstances.delete(workdir); + } else { + for (const key of PERSIST_ENV_KEYS) { + const value = env[key]; + if (value) process.env[key] = value; + } + return { + client: existing.client, + port: parseInt(new URL(existing.server.url).port), + }; + } } // Slow path: serialize server creation. createKilo() reads process.cwd() @@ -570,26 +631,28 @@ async function ensureSDKServer( // Re-check after acquiring lock — another caller may have created it. 
const cached = sdkInstances.get(workdir); if (cached) { - return { - client: cached.client, - port: parseInt(new URL(cached.server.url).port), - }; + const newConfig = env.KILO_CONFIG_CONTENT; + if (newConfig && newConfig !== cached.configContent) { + console.log( + `${MANAGER_LOG} ensureSDKServer: config mismatch for ${workdir} (locked), evicting prewarmed server` + ); + cached.server.close(); + sdkInstances.delete(workdir); + } else { + for (const key of PERSIST_ENV_KEYS) { + const value = env[key]; + if (value) process.env[key] = value; + } + return { + client: cached.client, + port: parseInt(new URL(cached.server.url).port), + }; + } } const port = nextPort++; console.log(`${MANAGER_LOG} Starting SDK server on port ${port} for ${workdir}`); - // Keys that must persist on process.env after the SDK server starts. - // KILO_CONFIG_CONTENT / OPENCODE_CONFIG_CONTENT carry the kilo provider - // auth config (including organizationId) and must survive the snapshot - // restore so extractOrganizationId() and subsequent model hot-swaps can - // read them. GASTOWN_ORGANIZATION_ID is the standalone org ID env var. - const PERSIST_ENV_KEYS = new Set([ - 'KILO_CONFIG_CONTENT', - 'OPENCODE_CONFIG_CONTENT', - 'GASTOWN_ORGANIZATION_ID', - ]); - const envSnapshot: Record = {}; for (const key of Object.keys(env)) { envSnapshot[key] = process.env[key]; @@ -605,7 +668,12 @@ async function ensureSDKServer( timeout: 30_000, }); - const instance: SDKInstance = { client, server, sessionCount: 0 }; + const instance: SDKInstance = { + client, + server, + sessionCount: 0, + configContent: env.KILO_CONFIG_CONTENT, + }; sdkInstances.set(workdir, instance); console.log(`${MANAGER_LOG} SDK server started: ${server.url}`); @@ -1005,11 +1073,22 @@ async function subscribeToEvents( /** * Start an agent: ensure SDK server, create session, subscribe to events, * send initial prompt. 
+ * + * Serialises concurrent callers for the same agentId so the re-entrancy + * handling inside `startAgentImpl` observes a consistent snapshot. */ export async function startAgent( request: StartAgentRequest, workdir: string, env: Record +): Promise { + return withStartAgentLock(request.agentId, () => startAgentImpl(request, workdir, env)); +} + +async function startAgentImpl( + request: StartAgentRequest, + workdir: string, + env: Record ): Promise { const existing = agents.get(request.agentId); if (existing && (existing.status === 'running' || existing.status === 'starting')) { @@ -1082,8 +1161,15 @@ export async function startAgent( phase: 'db_hydrated', elapsedMs: tDbDone - t0, }); + postEventToWorker('agent.startup_phase', { + agentId: request.agentId, + role: request.role, + label: 'db_hydrated', + elapsedMs: tDbDone - t0, + }); // 1. Ensure SDK server is running for this workdir + const sdkExistedBefore = sdkInstances.has(workdir); const { client, port } = await ensureSDKServer(workdir, env); agent.serverPort = port; const tSdkDone = Date.now(); @@ -1091,7 +1177,15 @@ export async function startAgent( agentId: request.agentId, phase: 'sdk_ready', elapsedMs: tSdkDone - t0, - phaseMs: tSdkDone - tDbDone, + phaseMs: sdkExistedBefore ? 0 : tSdkDone - tDbDone, + prewarmed: sdkExistedBefore, + }); + postEventToWorker('agent.startup_phase', { + agentId: request.agentId, + role: request.role, + label: 'sdk_ready', + elapsedMs: tSdkDone - t0, + phaseMs: sdkExistedBefore ? 0 : tSdkDone - tDbDone, }); // Check if startup was cancelled while waiting for the SDK server @@ -1148,6 +1242,13 @@ export async function startAgent( phaseMs: tSessionDone - tSdkDone, resumed, }); + postEventToWorker('agent.startup_phase', { + agentId: request.agentId, + role: request.role, + label: 'session_created', + elapsedMs: tSessionDone - t0, + phaseMs: tSessionDone - tSdkDone, + }); // Now check if startup was cancelled while creating the session. 
// agent.sessionId is already set, so the catch block will abort it. @@ -1815,6 +1916,80 @@ export async function refreshTokenForAllAgents(): Promise< return Promise.all(snapshot.map(restartAgent)); } +/** + * Minimal shape of `client.session` needed by {@link applyModelToSession}. + * Defined structurally so tests can pass a fake without pulling in the + * whole KiloClient type. + */ +type SessionPromptClient = { + session: { + prompt: (args: { + path: { id: string }; + body: { + parts: Array<{ type: 'text'; text: string }>; + model: { providerID: string; modelID: string }; + noReply?: boolean; + }; + }) => Promise; + }; +}; + +/** + * Push a model selection onto a mayor session. + * + * For a freshly created session, sends the startup prompt together with + * the model param so the first turn runs the configured model. + * + * For a resumed session the startup prompt MUST NOT be replayed (it + * would recreate the duplicate turn regression fixed by 9785570b9), + * but the per-session model on the SDK server still needs to be updated + * so the next user turn uses the newly-selected model. We do this by + * sending a `noReply: true` prompt that carries only the model param; + * the SDK treats this as a state update and does not trigger the model. + * + * Errors on the resumed path are swallowed: if pushing the model fails, + * the mayor falls back to whichever model the SDK server loaded from + * KILO_CONFIG_CONTENT at startup, which we have already updated. 
+ */ +export async function applyModelToSession(params: { + client: SessionPromptClient; + sessionId: string; + model: string; + prompt: string; + resumedSession: boolean; +}): Promise { + const { client, sessionId, model, prompt, resumedSession } = params; + const modelParam = { providerID: 'kilo', modelID: model }; + if (!resumedSession) { + await client.session.prompt({ + path: { id: sessionId }, + body: { + parts: [{ type: 'text', text: prompt }], + model: modelParam, + }, + }); + return; + } + try { + await client.session.prompt({ + path: { id: sessionId }, + body: { + parts: [{ type: 'text', text: '' }], + model: modelParam, + noReply: true, + }, + }); + console.log( + `${MANAGER_LOG} updateAgentModel: pushed model=${model} to resumed session ${sessionId}` + ); + } catch (err) { + console.warn( + `${MANAGER_LOG} updateAgentModel: failed to push model to resumed session ${sessionId}:`, + err + ); + } +} + /** * Update the model for a running agent by restarting its SDK server with * new KILO_CONFIG_CONTENT. The kilo serve child process reads the model @@ -1958,16 +2133,13 @@ export async function updateAgentModel( const prompt = conversationHistory ? `${conversationHistory}\n\n${MAYOR_STARTUP_PROMPT}` : MAYOR_STARTUP_PROMPT; - if (!resumedSession) { - const modelParam = { providerID: 'kilo', modelID: model }; - await client.session.prompt({ - path: { id: agent.sessionId }, - body: { - parts: [{ type: 'text', text: prompt }], - model: modelParam, - }, - }); - } + await applyModelToSession({ + client, + sessionId: agent.sessionId, + model, + prompt, + resumedSession, + }); agent.messageCount = 1; // 6. New server is healthy — now tear down the old one. 
@@ -2419,6 +2591,146 @@ export async function stopAll(): Promise<void> {
   sdkInstances.clear();
 }
 
+function postEventToWorker(
+  event: string,
+  data: Record<string, unknown>
+): void {
+  const apiUrl = process.env.GASTOWN_API_URL;
+  const townId = process.env.GASTOWN_TOWN_ID;
+  const token = process.env.GASTOWN_CONTAINER_TOKEN;
+  if (!apiUrl || !townId || !token) return;
+
+  fetch(`${apiUrl}/api/towns/${townId}/container-events`, {
+    method: 'POST',
+    headers: {
+      Authorization: `Bearer ${token}`,
+      'Content-Type': 'application/json',
+    },
+    body: JSON.stringify({ event, townId, ...data }),
+  }).catch(err => {
+    console.warn(`${MANAGER_LOG} postEventToWorker failed for ${event}:`, err);
+  });
+}
+
+async function fetchMayorAgentId(
+  townId: string,
+  apiUrl: string,
+  token: string
+): Promise<string | null> {
+  try {
+    const resp = await fetch(`${apiUrl}/api/towns/${townId}/mayor-id`, {
+      headers: { Authorization: `Bearer ${token}` },
+      signal: AbortSignal.timeout(10_000),
+    });
+    if (!resp.ok) {
+      console.log(`${MANAGER_LOG} fetchMayorAgentId: ${resp.status} for town ${townId}`);
+      return null;
+    }
+    const json: unknown = await resp.json();
+    if (
+      typeof json === 'object' &&
+      json !== null &&
+      'agentId' in json &&
+      typeof (json as { agentId: unknown }).agentId === 'string'
+    ) {
+      return (json as { agentId: string }).agentId;
+    }
+    return null;
+  } catch (err) {
+    console.warn(`${MANAGER_LOG} fetchMayorAgentId failed:`, err);
+    return null;
+  }
+}
+
+function buildPrewarmEnv(mayorAgentId: string): Record<string, string> {
+  const env: Record<string, string> = {
+    KILO_TEST_HOME: `/tmp/agent-home-${mayorAgentId}`,
+    XDG_DATA_HOME: `/tmp/agent-home-${mayorAgentId}/.local/share`,
+  };
+  const keys = [
+    'GASTOWN_API_URL',
+    'GASTOWN_CONTAINER_TOKEN',
+    'GASTOWN_TOWN_ID',
+    'KILOCODE_TOKEN',
+    'GASTOWN_ORGANIZATION_ID',
+    'KILO_API_URL',
+    'KILO_OPENROUTER_BASE',
+  ];
+  for (const key of keys) {
+    const value = process.env[key];
+    if (value) env[key] = value;
+  }
+
+  const kilocodeToken = env.KILOCODE_TOKEN;
+  if (kilocodeToken) {
+    const organizationId = env.GASTOWN_ORGANIZATION_ID || undefined;
+    const configJson = buildKiloConfigContent(
+      kilocodeToken,
+      'anthropic/claude-sonnet-4.6',
+      'anthropic/claude-haiku-4.5',
+      organizationId
+    );
+    env.KILO_CONFIG_CONTENT = configJson;
+    env.OPENCODE_CONFIG_CONTENT = configJson;
+  }
+
+  return env;
+}
+
+async function prewarmMayorSDK(
+  townId: string,
+  apiUrl: string,
+  token: string
+): Promise<void> {
+  const t0 = Date.now();
+
+  const mayorAgentId = await fetchMayorAgentId(townId, apiUrl, token);
+  if (!mayorAgentId) {
+    console.log(`${MANAGER_LOG} prewarmMayorSDK: no mayor agent for town ${townId}`);
+    return;
+  }
+
+  const workdir = mayorWorkdirForTown(townId);
+
+  await hydrateDbFromSnapshot(mayorAgentId, apiUrl, token, `mayor-${townId}`, townId);
+
+  const env = buildPrewarmEnv(mayorAgentId);
+
+  const existing = sdkInstances.get(workdir);
+  if (existing) {
+    const durationMs = Date.now() - t0;
+    log.info('mayor.prewarm_complete', {
+      agentId: mayorAgentId,
+      townId,
+      port: parseInt(new URL(existing.server.url).port),
+      durationMs,
+      alreadyRunning: true,
+    });
+    postEventToWorker('mayor.prewarm_complete', {
+      agentId: mayorAgentId,
+      role: 'mayor',
+      durationMs,
+    });
+    return;
+  }
+
+  const { port } = await ensureSDKServer(workdir, env);
+
+  const durationMs = Date.now() - t0;
+  log.info('mayor.prewarm_complete', {
+    agentId: mayorAgentId,
+    townId,
+    port,
+    durationMs,
+    alreadyRunning: false,
+  });
+  postEventToWorker('mayor.prewarm_complete', {
+    agentId: mayorAgentId,
+    role: 'mayor',
+    durationMs,
+  });
+}
+
 /**
  * Boot-time agent hydration — fetches the container registry from the
  * Gastown worker and resumes all registered agents.
@@ -2469,34 +2781,49 @@ export async function bootHydration(): Promise { if (!Array.isArray(registry) || registry.length === 0) { console.log(`${LOG} No agents in registry — nothing to hydrate`); - return; - } + } else { + console.log(`${LOG} Resuming ${registry.length} agent(s) from registry`); - console.log(`${LOG} Resuming ${registry.length} agent(s) from registry`); + for (const entry of registry as Record[]) { + const agentId = entry.agentId as string | undefined; + const agentRequest = entry.request as StartAgentRequest | undefined; + const workdir = entry.workdir as string | undefined; + const env = entry.env as Record | undefined; - for (const entry of registry as Record[]) { - const agentId = entry.agentId as string | undefined; - const agentRequest = entry.request as StartAgentRequest | undefined; - const workdir = entry.workdir as string | undefined; - const env = entry.env as Record | undefined; + if (!agentId || !agentRequest || !workdir || !env) { + console.warn(`${LOG} Skipping malformed registry entry:`, entry); + continue; + } - if (!agentId || !agentRequest || !workdir || !env) { - console.warn(`${LOG} Skipping malformed registry entry:`, entry); - continue; - } + // Registry entries were written with the token snapshot at dispatch + // time. If we just refreshed, overlay the fresh value so the hydrated + // kilo serve child inherits the current token. + const hydratedEnv = { ...env, GASTOWN_CONTAINER_TOKEN: token }; - // Registry entries were written with the token snapshot at dispatch - // time. If we just refreshed, overlay the fresh value so the hydrated - // kilo serve child inherits the current token. - const hydratedEnv = { ...env, GASTOWN_CONTAINER_TOKEN: token }; + console.log(`${LOG} Resuming agent ${agentId} in ${workdir}`); + try { + await startAgent(agentRequest, workdir, hydratedEnv); + console.log(`${LOG} Agent ${agentId} resumed`); + } catch (err) { + const msg = err instanceof Error ? 
err.message : String(err); + console.error(`${LOG} Failed to resume agent ${agentId}:`, msg); + } + } + } - console.log(`${LOG} Resuming agent ${agentId} in ${workdir}`); + const mayorAlreadyResumed = (Array.isArray(registry) ? registry : []).some( + (e: unknown) => + typeof e === 'object' && + e !== null && + 'request' in e && + typeof (e as { request?: { role?: string } }).request?.role === 'string' && + (e as { request: { role: string } }).request.role === 'mayor' + ); + if (!mayorAlreadyResumed) { try { - await startAgent(agentRequest, workdir, hydratedEnv); - console.log(`${LOG} Agent ${agentId} resumed`); + await prewarmMayorSDK(townId, apiUrl, token); } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - console.error(`${LOG} Failed to resume agent ${agentId}:`, msg); + console.warn(`${LOG} Mayor SDK prewarm failed:`, err); } } } diff --git a/services/gastown/container/vitest.config.ts b/services/gastown/container/vitest.config.ts index 468ee375fe..32b052a10d 100644 --- a/services/gastown/container/vitest.config.ts +++ b/services/gastown/container/vitest.config.ts @@ -3,6 +3,6 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { globals: false, - include: ['plugin/**/*.test.ts'], + include: ['plugin/**/*.test.ts', 'src/**/*.test.ts'], }, }); diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts index bdbaf03575..a4ac4089d6 100644 --- a/services/gastown/src/dos/Town.do.ts +++ b/services/gastown/src/dos/Town.do.ts @@ -29,6 +29,7 @@ import * as dispatch from './town/container-dispatch'; import * as patrol from './town/patrol'; import * as scheduling from './town/scheduling'; import * as events from './town/events'; +import { stopContainerIfIdle as _stopContainerIfIdle } from './town/container-idle-stop'; import * as scm from './town/town-scm'; import * as reconciler from './town/reconciler'; import { applyAction } from './town/actions'; @@ -47,7 +48,7 @@ import { 
agent_metadata } from '../db/tables/agent-metadata.table'; import { escalation_metadata } from '../db/tables/escalation-metadata.table'; import { convoy_metadata } from '../db/tables/convoy-metadata.table'; import { bead_dependencies } from '../db/tables/bead-dependencies.table'; -import { town_events, TownEventRecord } from '../db/tables/town-events.table'; +import { town_events, TownEventRecord, type TownEventType } from '../db/tables/town-events.table'; import { agent_nudges, AgentNudgeRecord, @@ -2653,6 +2654,11 @@ export class TownDO extends DurableObject { * Called eagerly on page load so the terminal is available immediately * without requiring the user to send a message first. */ + async getMayorAgentId(): Promise { + const mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null; + return mayor?.id ?? null; + } + async ensureMayor(): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting'; @@ -2681,16 +2687,47 @@ export class TownDO extends DurableObject { logger.setTags({ agentId: mayor.id }); - // Check if the container is already running + // Check if the container is already running AND the SDK has a live + // session for the mayor. The SDK can be torn down (serverPort=0, + // sessionId='') after stream errors or drain while the agent record + // still says "running" — in that case we must fall through to a + // fresh dispatch instead of returning early. const containerStatus = await dispatch.checkAgentContainerStatus(this.env, townId, mayor.id); const isAlive = containerStatus.status === 'running' || containerStatus.status === 'starting'; + const sdkAlive = isAlive && (containerStatus.serverPort ?? 0) > 0 && Boolean(containerStatus.sessionId); - if (isAlive) { + if (sdkAlive) { const isActive = mayor.status === 'working' || mayor.status === 'stalled' || mayor.status === 'waiting'; + writeEvent(this.env, { + event: 'mayor.ensure_decision', + townId, + agentId: mayor.id, + role: 'mayor', + label: isActive ? 
'short_circuit_warm' : 'short_circuit_idle', + }); return { agentId: mayor.id, sessionStatus: isActive ? 'active' : 'idle' }; } + // Container says running/starting but SDK has no port/session — the + // SDK was torn down (e.g. stream error, drain). Fall through to a + // fresh dispatch so the user doesn't have to manually refresh. + if (isAlive && !sdkAlive) { + logger.info('ensureMayor: container alive but SDK torn down, redispatching', { + agentId: mayor.id, + containerStatus: containerStatus.status, + serverPort: containerStatus.serverPort, + sessionId: containerStatus.sessionId, + }); + writeEvent(this.env, { + event: 'mayor.ensure_decision', + townId, + agentId: mayor.id, + role: 'mayor', + label: 'sdk_dead_redispatch', + }); + } + // Start the container with an idle mayor (no initial prompt) const townConfig = await this.getTownConfig(); const rigConfig = await this.getMayorRigConfig(); @@ -2708,6 +2745,14 @@ export class TownDO extends DurableObject { return { agentId: mayor.id, sessionStatus: 'idle' }; } + writeEvent(this.env, { + event: 'mayor.ensure_decision', + townId, + agentId: mayor.id, + role: 'mayor', + label: 'fresh_dispatch', + }); + try { const containerStub = getTownContainerStub(this.env, townId); await containerStub.setEnvVar('KILOCODE_TOKEN', kilocodeToken); @@ -3896,15 +3941,27 @@ export class TownDO extends DurableObject { reconciler.applyEvent(this.sql, event, { townConfig }); events.markProcessed(this.sql, event.event_id); } catch (err) { - logger.error('reconciler: applyEvent failed', { - eventId: event.event_id, - eventType: event.event_type, - error: err instanceof Error ? err.message : String(err), - }); - // Event stays unprocessed — will be retried on the next alarm tick. - // Mark it processed anyway after 3 consecutive failures to prevent - // a poison event from blocking the entire queue forever. - // For now, we skip it and let the next tick retry. + const message = err instanceof Error ? 
err.message : String(err); + // Terminal errors referencing a missing bead/agent can never + // succeed on retry — mark them processed so the drain loop + // stops re-running them every alarm tick. + const isMissingEntity = + err instanceof Error && /\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message); + if (isMissingEntity) { + logger.warn('reconciler: applyEvent skipped (missing entity)', { + eventId: event.event_id, + eventType: event.event_type, + error: message, + }); + events.markProcessed(this.sql, event.event_id); + } else { + logger.error('reconciler: applyEvent failed', { + eventId: event.event_id, + eventType: event.event_type, + error: message, + }); + // Event stays unprocessed — will be retried on the next alarm tick. + } } } } catch (err) { @@ -4092,6 +4149,12 @@ export class TownDO extends DurableObject { }), ]); + await this.stopContainerIfIdle().catch(err => + logger.warn('alarm: stopContainerIfIdle failed', { + error: err instanceof Error ? err.message : String(err), + }) + ); + // Re-arm: fast when active, slow when idle const interval = activeWork ? ACTIVE_ALARM_INTERVAL_MS : IDLE_ALARM_INTERVAL_MS; await this.ctx.storage.setAlarm(Date.now() + interval); @@ -4152,6 +4215,27 @@ export class TownDO extends DurableObject { await this.ctx.storage.put('container:lastTokenRefreshAt', now); } + /** + * Proactively stop the town container when the town is idle. + * + * Cloudflare's sleepAfter timer resets on any port-8080 traffic (including + * long-lived PTY WebSockets), so containers can stay awake for hours after + * all real work finishes. Delegates to container-idle-stop sub-module. + */ + private async stopContainerIfIdle(): Promise { + await _stopContainerIfIdle({ + hasActiveWork: () => this.hasActiveWork(), + isDraining: () => this._draining, + getMayor: () => agents.listAgents(this.sql, { role: 'mayor' })[0] ?? 
null, + getTownId: () => this.townId, + getLastIdleStopAt: () => this.ctx.storage.get('container:lastIdleStopAt'), + setLastIdleStopAt: (value) => this.ctx.storage.put('container:lastIdleStopAt', value), + getContainerStub: (townId) => getTownContainerStub(this.env, townId), + writeEventFn: (data) => writeEvent(this.env, data), + now: () => Date.now(), + }); + } + /** * Proactively remint KILOCODE_TOKEN when it's approaching expiry. * Throttled to once per day — the 30-day token is refreshed when @@ -5161,6 +5245,56 @@ export class TownDO extends DurableObject { ]; } + async debugTownEvents(): Promise { + return [ + ...query( + this.sql, + /* sql */ ` + SELECT ${town_events.event_id}, + ${town_events.event_type}, + ${town_events.agent_id}, + ${town_events.bead_id}, + ${town_events.processed_at} + FROM ${town_events} + ORDER BY ${town_events.created_at} ASC + `, + [] + ), + ]; + } + + /** + * Test-only helper: directly insert a row into the town_events queue + * without going through the producer APIs. Used to reproduce orphan + * events (referencing deleted beads/agents) in tests. + */ + async debugInsertTownEvent(input: { + event_type: TownEventType; + agent_id?: string | null; + bead_id?: string | null; + payload?: Record; + }): Promise { + const eventId = events.insertEvent(this.sql, input.event_type, { + agent_id: input.agent_id ?? null, + bead_id: input.bead_id ?? null, + payload: input.payload ?? {}, + }); + await this.armAlarmIfNeeded(); + return eventId; + } + + /** + * Test-only helper: insert a container_status event for a given agent. + * Mirrors the container observer's upsert so tests can verify that + * deleteBead sweeps agent-keyed events. 
+ */ + async debugRecordContainerStatus( + agentId: string, + payload: { status: string; exit_reason?: string | null } + ): Promise { + events.upsertContainerStatus(this.sql, agentId, payload); + } + async destroy(): Promise { console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`); diff --git a/services/gastown/src/dos/town/beads.ts b/services/gastown/src/dos/town/beads.ts index a3faa75437..945679cf1f 100644 --- a/services/gastown/src/dos/town/beads.ts +++ b/services/gastown/src/dos/town/beads.ts @@ -41,6 +41,7 @@ import { createTableConvoyMetadata, migrateConvoyMetadata, } from '../../db/tables/convoy-metadata.table'; +import { town_events } from '../../db/tables/town-events.table'; import { query } from '../../util/query.util'; import type { CreateBeadInput, @@ -903,6 +904,17 @@ export function deleteBead(sql: SqlStorage, beadId: string, rigId?: string): boo beadId, ]); + // Remove any pending/processed reconciler events targeting this bead or + // this agent (agents are themselves beads, so deleteBead is used for both). + // Without this, bead_cancelled / container_status / … events that reference + // a deleted bead make applyEvent throw forever on every alarm tick. + query( + sql, + /* sql */ `DELETE FROM ${town_events} + WHERE ${town_events.bead_id} = ? OR ${town_events.agent_id} = ?`, + [beadId, beadId] + ); + query(sql, /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} = ?`, [beadId]); return true; } @@ -1003,6 +1015,16 @@ export function deleteBeads(sql: SqlStorage, beadIds: string[], rigId?: string): ...allIdsArr ); + // Remove any reconciler events referencing these beads/agents. See + // deleteBead above for rationale. 
+ sql.exec( + /* sql */ `DELETE FROM ${town_events} + WHERE ${town_events.bead_id} IN (${placeholders}) + OR ${town_events.agent_id} IN (${placeholders})`, + ...allIdsArr, + ...allIdsArr + ); + // Delete the beads themselves sql.exec( /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} IN (${placeholders})`, diff --git a/services/gastown/src/dos/town/container-dispatch.ts b/services/gastown/src/dos/town/container-dispatch.ts index e2559f5d27..4c37825376 100644 --- a/services/gastown/src/dos/town/container-dispatch.ts +++ b/services/gastown/src/dos/town/container-dispatch.ts @@ -669,7 +669,7 @@ export async function checkAgentContainerStatus( env: Env, townId: string, agentId: string -): Promise<{ status: string; exitReason?: string }> { +): Promise<{ status: string; exitReason?: string; serverPort?: number; sessionId?: string }> { try { const container = getTownContainerStub(env, townId); const response = await container.fetch(`http://container/agents/${agentId}/status`, { @@ -689,9 +689,15 @@ export async function checkAgentContainerStatus( const status = (data as { status: unknown }).status; const exitReason = 'exitReason' in data ? (data as { exitReason: unknown }).exitReason : undefined; + const serverPort = + 'serverPort' in data ? (data as { serverPort: unknown }).serverPort : undefined; + const sessionId = + 'sessionId' in data ? (data as { sessionId: unknown }).sessionId : undefined; return { status: typeof status === 'string' ? status : 'unknown', exitReason: typeof exitReason === 'string' ? exitReason : undefined, + serverPort: typeof serverPort === 'number' ? serverPort : undefined, + sessionId: typeof sessionId === 'string' && sessionId.length > 0 ? 
sessionId : undefined,
     };
   }
   return { status: 'unknown' };
diff --git a/services/gastown/src/dos/town/container-idle-stop.test.ts b/services/gastown/src/dos/town/container-idle-stop.test.ts
new file mode 100644
index 0000000000..be75051ffd
--- /dev/null
+++ b/services/gastown/src/dos/town/container-idle-stop.test.ts
@@ -0,0 +1,205 @@
+import { describe, it, expect, vi } from 'vitest';
+import {
+  stopContainerIfIdle,
+  CONTAINER_IDLE_STOP_THRESHOLD_MS,
+  CONTAINER_IDLE_STOP_THROTTLE_MS,
+  type IdleStopDeps,
+} from './container-idle-stop';
+
+function makeMayor(overrides: Partial<{ status: string; last_activity_at: string }> = {}) {
+  return {
+    id: 'mayor-1',
+    rig_id: null,
+    role: 'mayor' as const,
+    name: 'Mayor',
+    identity: 'Mayor@test',
+    status: overrides.status ?? 'idle',
+    current_hook_bead_id: null,
+    dispatch_attempts: 0,
+    last_activity_at: overrides.last_activity_at ?? new Date().toISOString(),
+    checkpoint: null,
+    created_at: new Date().toISOString(),
+    agent_status_message: null,
+    agent_status_updated_at: null,
+  };
+}
+
+type TestDeps = IdleStopDeps & {
+  _stopFn: ReturnType<typeof vi.fn>;
+  _getStateFn: ReturnType<typeof vi.fn>;
+  _store: Map<string, number>;
+  _events: Array<{ event: string; townId: string; reason: string; error?: string }>;
+};
+
+function makeDeps(overrides: Partial<TestDeps> = {}): TestDeps {
+  const stopFn = vi.fn().mockResolvedValue(undefined);
+  const getStateFn = vi.fn().mockResolvedValue({ status: 'running' });
+  const store = new Map<string, number>();
+  const events: Array<{ event: string; townId: string; reason: string; error?: string }> = [];
+
+  return {
+    hasActiveWork: overrides.hasActiveWork ?? (() => false),
+    isDraining: overrides.isDraining ?? (() => false),
+    getMayor: overrides.getMayor ?? (() => null),
+    getTownId: overrides.getTownId ?? (() => 'town-1'),
+    getLastIdleStopAt:
+      overrides.getLastIdleStopAt ?? (() => Promise.resolve(store.get('container:lastIdleStopAt'))),
+    setLastIdleStopAt:
+      overrides.setLastIdleStopAt ??
+ ((value: number) => { + store.set('container:lastIdleStopAt', value); + return Promise.resolve(); + }), + getContainerStub: + overrides.getContainerStub ?? + (() => ({ + getState: getStateFn, + stop: stopFn, + })), + writeEventFn: + overrides.writeEventFn ?? + ((data) => { + events.push(data); + }), + now: overrides.now ?? (() => Date.now()), + _stopFn: stopFn, + _getStateFn: getStateFn, + _store: store, + _events: events, + } as TestDeps; +} + +describe('stopContainerIfIdle', () => { + it('does not stop when town has active work', async () => { + const deps = makeDeps({ hasActiveWork: () => true }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when draining', async () => { + const deps = makeDeps({ isDraining: () => true }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when mayor is working', async () => { + const deps = makeDeps({ getMayor: () => makeMayor({ status: 'working' }) }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when mayor is stalled', async () => { + const deps = makeDeps({ getMayor: () => makeMayor({ status: 'stalled' }) }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('does not stop when mayor last_activity_at is within threshold', async () => { + const recentActivity = new Date(Date.now() - 60_000).toISOString(); + const deps = makeDeps({ getMayor: () => makeMayor({ last_activity_at: recentActivity }) }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('stops container when mayor idle beyond threshold and container is running', async () => { + const oldActivity = new Date(Date.now() - CONTAINER_IDLE_STOP_THRESHOLD_MS - 60_000).toISOString(); + const deps = makeDeps({ getMayor: () => makeMayor({ last_activity_at: oldActivity }) }); + await 
stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + expect(deps._events).toHaveLength(1); + expect(deps._events[0].event).toBe('container.idle_stop'); + expect(deps._events[0].reason).toMatch(/^mayor_idle_\d+m$/); + }); + + it('stops container when no mayor exists (no_active_work reason)', async () => { + const deps = makeDeps({ getMayor: () => null }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + expect(deps._events[0].reason).toBe('no_active_work'); + }); + + it('stops container when container is healthy', async () => { + const stopFn = vi.fn().mockResolvedValue(undefined); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockResolvedValue({ status: 'healthy' }), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + expect(stopFn).toHaveBeenCalledTimes(1); + expect(deps._events[0].reason).toBe('no_active_work'); + }); + + it('does not stop when container is already stopped', async () => { + const stopFn = vi.fn().mockResolvedValue(undefined); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockResolvedValue({ status: 'stopped' }), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + expect(stopFn).not.toHaveBeenCalled(); + }); + + it('throttles: calling twice within throttle window stops only once', async () => { + const deps = makeDeps({ getMayor: () => null }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + }); + + it('allows stop again after throttle window passes', async () => { + let currentTime = Date.now(); + const deps = makeDeps({ + getMayor: () => null, + now: () => currentTime, + }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(1); + + currentTime += CONTAINER_IDLE_STOP_THROTTLE_MS + 1; + await 
stopContainerIfIdle(deps); + expect(deps._stopFn).toHaveBeenCalledTimes(2); + }); + + it('logs error and does not set throttle when stop() throws', async () => { + const stopFn = vi.fn().mockRejectedValue(new Error('stop failed')); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockResolvedValue({ status: 'running' }), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + + expect(deps._events).toHaveLength(1); + expect(deps._events[0].error).toBe('stop failed'); + expect(deps._store.has('container:lastIdleStopAt')).toBe(false); + }); + + it('returns without stopping when townId is null', async () => { + const deps = makeDeps({ getTownId: () => null, getMayor: () => null }); + await stopContainerIfIdle(deps); + expect(deps._stopFn).not.toHaveBeenCalled(); + }); + + it('returns without stopping when getState() throws', async () => { + const stopFn = vi.fn().mockResolvedValue(undefined); + const deps = makeDeps({ + getMayor: () => null, + getContainerStub: () => ({ + getState: vi.fn().mockRejectedValue(new Error('rpc failed')), + stop: stopFn, + }), + }); + await stopContainerIfIdle(deps); + expect(stopFn).not.toHaveBeenCalled(); + }); +}); diff --git a/services/gastown/src/dos/town/container-idle-stop.ts b/services/gastown/src/dos/town/container-idle-stop.ts new file mode 100644 index 0000000000..eadb313468 --- /dev/null +++ b/services/gastown/src/dos/town/container-idle-stop.ts @@ -0,0 +1,81 @@ +/** + * Proactive idle-container stop logic. + * + * Cloudflare's sleepAfter timer resets on any port-8080 traffic (including + * long-lived PTY WebSockets), so containers can stay awake for hours after + * all real work finishes. This module provides the decision logic for + * stopping the container from the TownDO alarm when the town is truly idle. 
+ */
+
+import { logger } from '../../util/log.util';
+import type { Agent } from '../../types';
+
+export const CONTAINER_IDLE_STOP_THRESHOLD_MS = 5 * 60_000;
+export const CONTAINER_IDLE_STOP_THROTTLE_MS = 2 * 60_000;
+
+export type IdleStopDeps = {
+  hasActiveWork: () => boolean;
+  isDraining: () => boolean;
+  getMayor: () => Agent | null;
+  getTownId: () => string | null;
+  getLastIdleStopAt: () => Promise<number | undefined>;
+  setLastIdleStopAt: (value: number) => Promise<void>;
+  getContainerStub: (townId: string) => { getState: () => Promise<{ status: string }>; stop: () => Promise<void> };
+  writeEventFn: (data: { event: string; townId: string; reason: string; error?: string }) => void;
+  now: () => number;
+};
+
+export async function stopContainerIfIdle(deps: IdleStopDeps): Promise<void> {
+  if (deps.hasActiveWork()) return;
+  if (deps.isDraining()) return;
+
+  const mayor = deps.getMayor();
+  const mayorAlive = mayor && (mayor.status === 'working' || mayor.status === 'stalled');
+  if (mayorAlive) return;
+
+  if (mayor && mayor.last_activity_at != null) {
+    const lastActivity = new Date(mayor.last_activity_at).getTime();
+    if (deps.now() - lastActivity <= CONTAINER_IDLE_STOP_THRESHOLD_MS) return;
+  }
+
+  const townId = deps.getTownId();
+  if (!townId) return;
+
+  const now = deps.now();
+  const lastIdleStop = (await deps.getLastIdleStopAt()) ?? 0;
+  if (now - lastIdleStop < CONTAINER_IDLE_STOP_THROTTLE_MS) return;
+
+  const stub = deps.getContainerStub(townId);
+  let state: { status: string };
+  try {
+    state = await stub.getState();
+  } catch (err) {
+    logger.warn('stopContainerIfIdle: getState() failed', {
+      error: err instanceof Error ? err.message : String(err),
+    });
+    return;
+  }
+
+  if (state.status !== 'running' && state.status !== 'healthy') return;
+
+  const idleMinutes = mayor?.last_activity_at != null
+    ? Math.round((deps.now() - new Date(mayor.last_activity_at).getTime()) / 60_000)
+    : 0;
+  const reason = mayor ?
`mayor_idle_${idleMinutes}m` : 'no_active_work'; + + try { + await stub.stop(); + await deps.setLastIdleStopAt(now); + deps.writeEventFn({ event: 'container.idle_stop', townId, reason }); + } catch (err) { + logger.warn('stopContainerIfIdle: stop() failed', { + error: err instanceof Error ? err.message : String(err), + }); + deps.writeEventFn({ + event: 'container.idle_stop', + townId, + reason, + error: err instanceof Error ? err.message.slice(0, 300) : String(err).slice(0, 300), + }); + } +} diff --git a/services/gastown/src/dos/town/reconciler.ts b/services/gastown/src/dos/town/reconciler.ts index 8e9b1d4671..f7a62d0cf3 100644 --- a/services/gastown/src/dos/town/reconciler.ts +++ b/services/gastown/src/dos/town/reconciler.ts @@ -289,6 +289,17 @@ export function applyEvent( console.warn(`${LOG} applyEvent: bead_cancelled missing bead_id`); return; } + // Tolerate the bead having been deleted after the event was enqueued. + // Without this guard updateBeadStatus throws `Bead not found`, + // the drain loop can't mark the event processed, and the error + // recurs on every alarm tick forever. + const existing = beadOps.getBead(sql, event.bead_id); + if (!existing) { + console.warn( + `${LOG} applyEvent: bead_cancelled target bead ${event.bead_id} no longer exists — skipping` + ); + return; + } const cancelStatus = payload.cancel_status === 'closed' || payload.cancel_status === 'failed' ? 
payload.cancel_status diff --git a/services/gastown/src/gastown.worker.ts b/services/gastown/src/gastown.worker.ts index 0ac039a284..e743e7c753 100644 --- a/services/gastown/src/gastown.worker.ts +++ b/services/gastown/src/gastown.worker.ts @@ -8,6 +8,7 @@ import { getTownContainerStub } from './dos/TownContainer.do'; import { getTownDOStub } from './dos/Town.do'; import { TownConfigUpdateSchema } from './types'; import { resError } from './util/res.util'; +import { writeEvent } from './util/analytics.util'; import { authMiddleware, agentOnlyMiddleware, @@ -659,6 +660,56 @@ app.delete('/api/towns/:townId/rigs/:rigId/agents/:agentId/db-snapshot', async c return c.json({ success: true }); }); +// ── Mayor Agent ID ────────────────────────────────────────────────────── +// Returns the mayor's agent ID for a town so the container can prewarm +// the mayor's SDK server during bootHydration. Protected by authMiddleware +// (accepts container JWTs), not kiloAuthMiddleware. + +app.use('/api/towns/:townId/mayor-id', async (c: Context, next) => + c.env.ENVIRONMENT === 'development' ? next() : authMiddleware(c, next) +); + +app.get('/api/towns/:townId/mayor-id', async c => { + const townId = c.req.param('townId'); + const town = getTownDOStub(c.env, townId); + const agentId = await town.getMayorAgentId(); + return c.json({ success: true, agentId }); +}); + +// ── Container Events ───────────────────────────────────────────────────── +// Container-to-worker event proxy. The container can't call writeEvent +// directly (it's worker-side), so it POSTs events here. Protected by +// authMiddleware (accepts container JWTs), not kiloAuthMiddleware. + +app.use('/api/towns/:townId/container-events', async (c: Context, next) => + c.env.ENVIRONMENT === 'development' ? 
next() : authMiddleware(c, next) +); + +app.post('/api/towns/:townId/container-events', async c => { + const townId = c.req.param('townId'); + const body: unknown = await c.req.json(); + if ( + typeof body !== 'object' || + body === null || + !('event' in body) || + typeof (body as { event: unknown }).event !== 'string' + ) { + return c.json({ success: false, error: 'Missing event name' }, 400); + } + const data = body as { event: string; [key: string]: unknown }; + writeEvent(c.env, { + event: data.event, + townId, + agentId: typeof data.agentId === 'string' ? data.agentId : undefined, + durationMs: typeof data.durationMs === 'number' ? data.durationMs : undefined, + role: typeof data.role === 'string' ? data.role : undefined, + label: typeof data.label === 'string' ? data.label : undefined, + double3: typeof data.phaseMs === 'number' ? data.phaseMs : undefined, + double4: typeof data.elapsedMs === 'number' ? data.elapsedMs : undefined, + }); + return c.json({ success: true }); +}); + // ── Kilo User Auth ────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. Container→worker routes use the agent JWT middleware instead @@ -671,7 +722,7 @@ app.use('/api/users/*', async (c: Context, next) => // Skip for container-registry and db-snapshot routes which use authMiddleware with container JWT support. 
app.use('/api/towns/:townId/*', async (c: Context, next) => { const path = c.req.path; - if (path.includes('/container-registry') || path.includes('/db-snapshot')) { + if (path.includes('/container-registry') || path.includes('/db-snapshot') || path.includes('/mayor-id') || path.includes('/container-events')) { return next(); } await kiloAuthMiddleware(c, async () => { diff --git a/services/gastown/test/integration/event-cleanup.test.ts b/services/gastown/test/integration/event-cleanup.test.ts new file mode 100644 index 0000000000..bb4f3d3b3c --- /dev/null +++ b/services/gastown/test/integration/event-cleanup.test.ts @@ -0,0 +1,183 @@ +/** + * Tests for the "orphaned bead_cancelled events retried forever" bug. + * + * Two independent fixes compose to eliminate the failure: + * Fix 1: deleteBead / deleteBeads purge town_events rows that reference + * the deleted bead (by bead_id or agent_id), so the drain loop + * never sees them. + * Fix 2a: reconciler.applyEvent('bead_cancelled') tolerates the bead + * being missing (returns early, logs warn — does not throw). + * Fix 2b: the Town.do.ts drain loop recognises "Bead/Agent ... not + * found" terminal errors and marks the offending event + * processed so it is not retried forever. 
+ */
+
+import { env, runDurableObjectAlarm } from 'cloudflare:test';
+import { describe, it, expect, beforeEach } from 'vitest';
+
+function getTownStub(name: string) {
+  return env.TOWN.get(env.TOWN.idFromName(name));
+}
+
+describe('town_events cleanup on bead deletion (#fix-1)', () => {
+  let town: ReturnType<typeof getTownStub>;
+  let townName: string;
+
+  beforeEach(async () => {
+    townName = `evcleanup-${crypto.randomUUID()}`;
+    town = getTownStub(townName);
+    await town.setTownId(townName);
+    await town.addRig({
+      rigId: 'rig-1',
+      name: 'main-rig',
+      gitUrl: 'https://github.com/test/repo.git',
+      defaultBranch: 'main',
+    });
+  });
+
+  it('deleteBead removes pending town_events referencing the bead by bead_id', async () => {
+    const bead = await town.createBead({
+      type: 'issue',
+      title: 'To be deleted',
+      rig_id: 'rig-1',
+    });
+
+    // Transitioning to a terminal status enqueues a bead_cancelled event.
+    await town.updateBeadStatus(bead.bead_id, 'failed', 'system');
+
+    const pendingBefore = (await town.debugTownEvents()) as Array<{
+      bead_id: string | null;
+      processed_at: string | null;
+    }>;
+    expect(
+      pendingBefore.filter(e => e.bead_id === bead.bead_id && e.processed_at === null).length
+    ).toBeGreaterThan(0);
+
+    await town.deleteBead(bead.bead_id);
+
+    const pendingAfter = (await town.debugTownEvents()) as Array<{
+      bead_id: string | null;
+      agent_id: string | null;
+    }>;
+    expect(pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id)).toBe(
+      false
+    );
+  });
+
+  it('deleteBead also removes events referencing the bead as agent_id (agents are beads)', async () => {
+    const agent = await town.registerAgent({
+      role: 'polecat',
+      name: 'P1',
+      identity: `ev-agent-${townName}`,
+      rig_id: 'rig-1',
+    });
+
+    // Upsert a container_status event keyed by agent_id — this is the shape
+    // of events that hang off an agent's bead row.
+    await town.debugRecordContainerStatus(agent.id, { status: 'running' });
+
+    const beforeRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>;
+    expect(beforeRows.some(e => e.agent_id === agent.id)).toBe(true);
+
+    // deleteBead is used for agents too (agents are beads).
+    await town.deleteBead(agent.id);
+
+    const afterRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>;
+    expect(afterRows.some(e => e.agent_id === agent.id)).toBe(false);
+  });
+
+  it('deleteBeads bulk path removes events for every deleted bead', async () => {
+    const a = await town.createBead({ type: 'issue', title: 'A', rig_id: 'rig-1' });
+    const b = await town.createBead({ type: 'issue', title: 'B', rig_id: 'rig-1' });
+
+    await town.updateBeadStatus(a.bead_id, 'failed', 'system');
+    await town.updateBeadStatus(b.bead_id, 'failed', 'system');
+
+    const before = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>;
+    expect(before.filter(e => e.bead_id === a.bead_id).length).toBeGreaterThan(0);
+    expect(before.filter(e => e.bead_id === b.bead_id).length).toBeGreaterThan(0);
+
+    await town.deleteBeads([a.bead_id, b.bead_id]);
+
+    const after = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>;
+    expect(after.some(e => e.bead_id === a.bead_id)).toBe(false);
+    expect(after.some(e => e.bead_id === b.bead_id)).toBe(false);
+  });
+});
+
+describe('applyEvent tolerance + drain loop marks missing-entity events processed (#fix-2)', () => {
+  let town: ReturnType<typeof getTownStub>;
+  let townName: string;
+
+  beforeEach(async () => {
+    townName = `evtolerate-${crypto.randomUUID()}`;
+    town = getTownStub(townName);
+    await town.setTownId(townName);
+    await town.addRig({
+      rigId: 'rig-1',
+      name: 'main-rig',
+      gitUrl: 'https://github.com/test/repo.git',
+      defaultBranch: 'main',
+    });
+  });
+
+  it('drain loop marks a bead_cancelled event processed when the bead is gone', async () => {
+    // Simulate the historical orphan: enqueue a bead_cancelled
event whose + // bead has been deleted (or never existed). Before Fix 2, applyEvent + // would throw `Bead not found` forever on every alarm tick. + await town.debugInsertTownEvent({ + event_type: 'bead_cancelled', + bead_id: '00000000-0000-4000-8000-000000000001', + payload: { cancel_status: 'failed' }, + }); + + const beforeDrain = (await town.debugTownEvents()) as Array<{ + event_type: string; + bead_id: string | null; + processed_at: string | null; + }>; + const orphan = beforeDrain.find( + e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001' + ); + expect(orphan?.processed_at).toBeNull(); + + await runDurableObjectAlarm(town); + + // After the alarm, the orphan event should be processed — not retried. + const afterDrain = (await town.debugTownEvents()) as Array<{ + event_type: string; + bead_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = afterDrain.find( + e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001' + ); + // If retention GC already pruned it, that's also acceptable — the key + // invariant is that it is no longer pending. 
+ if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); + + it('drain loop marks an agent-missing event processed too', async () => { + await town.debugInsertTownEvent({ + event_type: 'agent_done', + agent_id: '00000000-0000-4000-8000-0000000000aa', + payload: { branch: 'gt/ghost' }, + }); + + await runDurableObjectAlarm(town); + + const after = (await town.debugTownEvents()) as Array<{ + event_type: string; + agent_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = after.find( + e => e.event_type === 'agent_done' && e.agent_id === '00000000-0000-4000-8000-0000000000aa' + ); + if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); +}); diff --git a/services/gastown/test/integration/mayor-sdk-fallthrough.test.ts b/services/gastown/test/integration/mayor-sdk-fallthrough.test.ts new file mode 100644 index 0000000000..e52d074bb4 --- /dev/null +++ b/services/gastown/test/integration/mayor-sdk-fallthrough.test.ts @@ -0,0 +1,99 @@ +/** + * Integration tests for the torn-down-SDK fall-through in _ensureMayor. + * + * Change 3 of the mayor startup optimization: when the container reports + * the mayor as "running"/"starting" but the SDK has no serverPort or + * sessionId (torn down after stream errors or drain), _ensureMayor must + * fall through to a fresh dispatch instead of returning early. + * + * In the test environment there's no real container, so + * checkAgentContainerStatus returns { status: 'unknown' } or + * { status: 'not_found' }. These tests verify that: + * 1. ensureMayor falls through when the container status is not "running"/"starting" + * 2. checkAgentContainerStatus surfaces serverPort and sessionId when available + * 3. 
The sdkAlive check correctly rejects zero/empty port/session values
+ */
+
+import { env } from 'cloudflare:test';
+import { describe, it, expect, beforeEach } from 'vitest';
+
+function getTownStub(name = 'test-town') {
+  const id = env.TOWN.idFromName(name);
+  return env.TOWN.get(id);
+}
+
+describe('ensureMayor torn-down-SDK fall-through', () => {
+  let town: ReturnType<typeof getTownStub>;
+  let townName: string;
+
+  beforeEach(async () => {
+    townName = `sdk-fallthrough-${crypto.randomUUID()}`;
+    town = getTownStub(townName);
+    await town.setTownId(townName);
+    await town.addRig({
+      rigId: 'rig-1',
+      name: 'main-rig',
+      gitUrl: 'https://github.com/test/repo.git',
+      defaultBranch: 'main',
+    });
+  });
+
+  describe('container not available (test env baseline)', () => {
+    it('should fall through when container status is not running/starting', async () => {
+      const result = await town.ensureMayor();
+      expect(result.agentId).toBeTruthy();
+      expect(result.sessionStatus).toBe('idle');
+    });
+
+    it('should return the same agentId on repeated ensureMayor calls', async () => {
+      const first = await town.ensureMayor();
+      const second = await town.ensureMayor();
+      expect(first.agentId).toBe(second.agentId);
+    });
+  });
+
+  describe('sdkAlive validation logic', () => {
+    it('should reject zero serverPort (SDK torn down)', () => {
+      const isAlive = true;
+      const serverPort = 0;
+      const sessionId = 'some-session';
+      const sdkAlive = isAlive && (serverPort ?? 0) > 0 && Boolean(sessionId);
+      expect(sdkAlive).toBe(false);
+    });
+
+    it('should reject empty sessionId (SDK torn down)', () => {
+      const isAlive = true;
+      const serverPort = 8080;
+      const sessionId = '';
+      const sdkAlive = isAlive && (serverPort ?? 0) > 0 && Boolean(sessionId);
+      expect(sdkAlive).toBe(false);
+    });
+
+    it('should accept valid serverPort and sessionId', () => {
+      const isAlive = true;
+      const serverPort = 8080;
+      const sessionId = 'session-123';
+      const sdkAlive = isAlive && (serverPort ??
0) > 0 && Boolean(sessionId); + expect(sdkAlive).toBe(true); + }); + + it('should reject when container says not alive', () => { + const isAlive = false; + const serverPort = 8080; + const sessionId = 'session-123'; + const sdkAlive = isAlive && (serverPort ?? 0) > 0 && Boolean(sessionId); + expect(sdkAlive).toBe(false); + }); + }); + + describe('checkAgentContainerStatus response parsing', () => { + it('should include serverPort and sessionId from container response', async () => { + const agentId = (await town.ensureMayor()).agentId; + const container = env.TOWN_CONTAINER.get(env.TOWN_CONTAINER.idFromName(townName)); + const response = await container.fetch(`http://container/agents/${agentId}/status`, { + signal: AbortSignal.timeout(5_000), + }); + expect(response.status).toBe(404); + }); + }); +}); diff --git a/services/gastown/wrangler.jsonc b/services/gastown/wrangler.jsonc index 4cc1410810..e613dc666a 100644 --- a/services/gastown/wrangler.jsonc +++ b/services/gastown/wrangler.jsonc @@ -148,7 +148,7 @@ "services": [ { "binding": "GIT_TOKEN_SERVICE", - "service": "git-token-service", + "service": "git-token-service-dev", "entrypoint": "GitTokenRPCEntrypoint", }, ],