diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts index bdbaf03575..a6be9aecdb 100644 --- a/services/gastown/src/dos/Town.do.ts +++ b/services/gastown/src/dos/Town.do.ts @@ -47,7 +47,11 @@ import { agent_metadata } from '../db/tables/agent-metadata.table'; import { escalation_metadata } from '../db/tables/escalation-metadata.table'; import { convoy_metadata } from '../db/tables/convoy-metadata.table'; import { bead_dependencies } from '../db/tables/bead-dependencies.table'; -import { town_events, TownEventRecord } from '../db/tables/town-events.table'; +import { + town_events, + TownEventRecord, + type TownEventType, +} from '../db/tables/town-events.table'; import { agent_nudges, AgentNudgeRecord, @@ -3896,15 +3900,28 @@ export class TownDO extends DurableObject { reconciler.applyEvent(this.sql, event, { townConfig }); events.markProcessed(this.sql, event.event_id); } catch (err) { - logger.error('reconciler: applyEvent failed', { - eventId: event.event_id, - eventType: event.event_type, - error: err instanceof Error ? err.message : String(err), - }); - // Event stays unprocessed — will be retried on the next alarm tick. - // Mark it processed anyway after 3 consecutive failures to prevent - // a poison event from blocking the entire queue forever. - // For now, we skip it and let the next tick retry. + const message = err instanceof Error ? err.message : String(err); + // Terminal errors referencing a missing bead/agent can never + // succeed on retry — mark them processed so the drain loop + // stops re-running them every alarm tick. + const isMissingEntity = + err instanceof Error && + /\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message); + if (isMissingEntity) { + logger.warn('reconciler: applyEvent skipped (missing entity)', { + eventId: event.event_id, + eventType: event.event_type, + error: message, + }); + events.markProcessed(this.sql, event.event_id); + } else { + logger.error('reconciler: applyEvent failed', { + eventId: event.event_id, + eventType: event.event_type, + error: message, + }); + // Event stays unprocessed — will be retried on the next alarm tick. + } } } } catch (err) { @@ -5161,6 +5178,56 @@ export class TownDO extends DurableObject { ]; } + async debugTownEvents(): Promise { + return [ + ...query( + this.sql, + /* sql */ ` + SELECT ${town_events.event_id}, + ${town_events.event_type}, + ${town_events.agent_id}, + ${town_events.bead_id}, + ${town_events.processed_at} + FROM ${town_events} + ORDER BY ${town_events.created_at} ASC + `, + [] + ), + ]; + } + + /** + * Test-only helper: directly insert a row into the town_events queue + * without going through the producer APIs. Used to reproduce orphan + * events (referencing deleted beads/agents) in tests. + */ + async debugInsertTownEvent(input: { + event_type: TownEventType; + agent_id?: string | null; + bead_id?: string | null; + payload?: Record; + }): Promise { + const eventId = events.insertEvent(this.sql, input.event_type, { + agent_id: input.agent_id ?? null, + bead_id: input.bead_id ?? null, + payload: input.payload ?? {}, + }); + await this.armAlarmIfNeeded(); + return eventId; + } + + /** + * Test-only helper: insert a container_status event for a given agent. + * Mirrors the container observer's upsert so tests can verify that + * deleteBead sweeps agent-keyed events. + */ + async debugRecordContainerStatus( + agentId: string, + payload: { status: string; exit_reason?: string | null } + ): Promise { + events.upsertContainerStatus(this.sql, agentId, payload); + } + async destroy(): Promise { console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`); diff --git a/services/gastown/src/dos/town/beads.ts b/services/gastown/src/dos/town/beads.ts index a3faa75437..945679cf1f 100644 --- a/services/gastown/src/dos/town/beads.ts +++ b/services/gastown/src/dos/town/beads.ts @@ -41,6 +41,7 @@ import { createTableConvoyMetadata, migrateConvoyMetadata, } from '../../db/tables/convoy-metadata.table'; +import { town_events } from '../../db/tables/town-events.table'; import { query } from '../../util/query.util'; import type { CreateBeadInput, @@ -903,6 +904,17 @@ export function deleteBead(sql: SqlStorage, beadId: string, rigId?: string): boo beadId, ]); + // Remove any pending/processed reconciler events targeting this bead or + // this agent (agents are themselves beads, so deleteBead is used for both). + // Without this, bead_cancelled / container_status / … events that reference + // a deleted bead make applyEvent throw forever on every alarm tick. + query( + sql, + /* sql */ `DELETE FROM ${town_events} + WHERE ${town_events.bead_id} = ? OR ${town_events.agent_id} = ?`, + [beadId, beadId] + ); + query(sql, /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} = ?`, [beadId]); return true; } @@ -1003,6 +1015,16 @@ export function deleteBeads(sql: SqlStorage, beadIds: string[], rigId?: string): ...allIdsArr ); + // Remove any reconciler events referencing these beads/agents. See + // deleteBead above for rationale. + sql.exec( + /* sql */ `DELETE FROM ${town_events} + WHERE ${town_events.bead_id} IN (${placeholders}) + OR ${town_events.agent_id} IN (${placeholders})`, + ...allIdsArr, + ...allIdsArr + ); + // Delete the beads themselves sql.exec( /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} IN (${placeholders})`, diff --git a/services/gastown/src/dos/town/reconciler.ts b/services/gastown/src/dos/town/reconciler.ts index 8e9b1d4671..f7a62d0cf3 100644 --- a/services/gastown/src/dos/town/reconciler.ts +++ b/services/gastown/src/dos/town/reconciler.ts @@ -289,6 +289,17 @@ export function applyEvent( console.warn(`${LOG} applyEvent: bead_cancelled missing bead_id`); return; } + // Tolerate the bead having been deleted after the event was enqueued. + // Without this guard updateBeadStatus throws `Bead not found`, + // the drain loop can't mark the event processed, and the error + // recurs on every alarm tick forever. + const existing = beadOps.getBead(sql, event.bead_id); + if (!existing) { + console.warn( + `${LOG} applyEvent: bead_cancelled target bead ${event.bead_id} no longer exists — skipping` + ); + return; + } const cancelStatus = payload.cancel_status === 'closed' || payload.cancel_status === 'failed' ? payload.cancel_status diff --git a/services/gastown/test/integration/event-cleanup.test.ts b/services/gastown/test/integration/event-cleanup.test.ts new file mode 100644 index 0000000000..0d964e44c9 --- /dev/null +++ b/services/gastown/test/integration/event-cleanup.test.ts @@ -0,0 +1,183 @@ +/** + * Tests for the "orphaned bead_cancelled events retried forever" bug. + * + * Two independent fixes compose to eliminate the failure: + * Fix 1: deleteBead / deleteBeads purge town_events rows that reference + * the deleted bead (by bead_id or agent_id), so the drain loop + * never sees them. + * Fix 2a: reconciler.applyEvent('bead_cancelled') tolerates the bead + * being missing (returns early, logs warn — does not throw). + * Fix 2b: the Town.do.ts drain loop recognises "Bead/Agent ... not + * found" terminal errors and marks the offending event + * processed so it is not retried forever. + */ + +import { env, runDurableObjectAlarm } from 'cloudflare:test'; +import { describe, it, expect, beforeEach } from 'vitest'; + +function getTownStub(name: string) { + return env.TOWN.get(env.TOWN.idFromName(name)); +} + +describe('town_events cleanup on bead deletion (#fix-1)', () => { + let town: ReturnType; + let townName: string; + + beforeEach(async () => { + townName = `evcleanup-${crypto.randomUUID()}`; + town = getTownStub(townName); + await town.setTownId(townName); + await town.addRig({ + rigId: 'rig-1', + name: 'main-rig', + gitUrl: 'https://github.com/test/repo.git', + defaultBranch: 'main', + }); + }); + + it('deleteBead removes pending town_events referencing the bead by bead_id', async () => { + const bead = await town.createBead({ + type: 'issue', + title: 'To be deleted', + rig_id: 'rig-1', + }); + + // Transitioning to a terminal status enqueues a bead_cancelled event. + await town.updateBeadStatus(bead.bead_id, 'failed', 'system'); + + const pendingBefore = (await town.debugTownEvents()) as Array<{ + bead_id: string | null; + processed_at: string | null; + }>; + expect( + pendingBefore.filter(e => e.bead_id === bead.bead_id && e.processed_at === null).length + ).toBeGreaterThan(0); + + await town.deleteBead(bead.bead_id); + + const pendingAfter = (await town.debugTownEvents()) as Array<{ + bead_id: string | null; + agent_id: string | null; + }>; + expect( + pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id) + ).toBe(false); + }); + + it('deleteBead also removes events referencing the bead as agent_id (agents are beads)', async () => { + const agent = await town.registerAgent({ + role: 'polecat', + name: 'P1', + identity: `ev-agent-${townName}`, + rig_id: 'rig-1', + }); + + // Upsert a container_status event keyed by agent_id — this is the shape + // of events that hang off an agent's bead row. + await town.debugRecordContainerStatus(agent.id, { status: 'running' }); + + const beforeRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>; + expect(beforeRows.some(e => e.agent_id === agent.id)).toBe(true); + + // deleteBead is used for agents too (agents are beads). + await town.deleteBead(agent.id); + + const afterRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>; + expect(afterRows.some(e => e.agent_id === agent.id)).toBe(false); + }); + + it('deleteBeads bulk path removes events for every deleted bead', async () => { + const a = await town.createBead({ type: 'issue', title: 'A', rig_id: 'rig-1' }); + const b = await town.createBead({ type: 'issue', title: 'B', rig_id: 'rig-1' }); + + await town.updateBeadStatus(a.bead_id, 'failed', 'system'); + await town.updateBeadStatus(b.bead_id, 'failed', 'system'); + + const before = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>; + expect(before.filter(e => e.bead_id === a.bead_id).length).toBeGreaterThan(0); + expect(before.filter(e => e.bead_id === b.bead_id).length).toBeGreaterThan(0); + + await town.deleteBeads([a.bead_id, b.bead_id]); + + const after = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>; + expect(after.some(e => e.bead_id === a.bead_id)).toBe(false); + expect(after.some(e => e.bead_id === b.bead_id)).toBe(false); + }); +}); + +describe('applyEvent tolerance + drain loop marks missing-entity events processed (#fix-2)', () => { + let town: ReturnType; + let townName: string; + + beforeEach(async () => { + townName = `evtolerate-${crypto.randomUUID()}`; + town = getTownStub(townName); + await town.setTownId(townName); + await town.addRig({ + rigId: 'rig-1', + name: 'main-rig', + gitUrl: 'https://github.com/test/repo.git', + defaultBranch: 'main', + }); + }); + + it('drain loop marks a bead_cancelled event processed when the bead is gone', async () => { + // Simulate the historical orphan: enqueue a bead_cancelled event whose + // bead has been deleted (or never existed). Before Fix 2, applyEvent + // would throw `Bead not found` forever on every alarm tick. + await town.debugInsertTownEvent({ + event_type: 'bead_cancelled', + bead_id: '00000000-0000-4000-8000-000000000001', + payload: { cancel_status: 'failed' }, + }); + + const beforeDrain = (await town.debugTownEvents()) as Array<{ + event_type: string; + bead_id: string | null; + processed_at: string | null; + }>; + const orphan = beforeDrain.find( + e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001' + ); + expect(orphan?.processed_at).toBeNull(); + + await runDurableObjectAlarm(town); + + // After the alarm, the orphan event should be processed — not retried. + const afterDrain = (await town.debugTownEvents()) as Array<{ + event_type: string; + bead_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = afterDrain.find( + e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001' + ); + // If retention GC already pruned it, that's also acceptable — the key + // invariant is that it is no longer pending. + if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); + + it('drain loop marks an agent-missing event processed too', async () => { + await town.debugInsertTownEvent({ + event_type: 'agent_done', + agent_id: '00000000-0000-4000-8000-0000000000aa', + payload: { branch: 'gt/ghost' }, + }); + + await runDurableObjectAlarm(town); + + const after = (await town.debugTownEvents()) as Array<{ + event_type: string; + agent_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = after.find( + e => e.event_type === 'agent_done' && e.agent_id === '00000000-0000-4000-8000-0000000000aa' + ); + if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); +});