Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 77 additions & 10 deletions services/gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ import { agent_metadata } from '../db/tables/agent-metadata.table';
import { escalation_metadata } from '../db/tables/escalation-metadata.table';
import { convoy_metadata } from '../db/tables/convoy-metadata.table';
import { bead_dependencies } from '../db/tables/bead-dependencies.table';
import { town_events, TownEventRecord } from '../db/tables/town-events.table';
import {
town_events,
TownEventRecord,
type TownEventType,
} from '../db/tables/town-events.table';
import {
agent_nudges,
AgentNudgeRecord,
Expand Down Expand Up @@ -3896,15 +3900,28 @@ export class TownDO extends DurableObject<Env> {
reconciler.applyEvent(this.sql, event, { townConfig });
events.markProcessed(this.sql, event.event_id);
} catch (err) {
logger.error('reconciler: applyEvent failed', {
eventId: event.event_id,
eventType: event.event_type,
error: err instanceof Error ? err.message : String(err),
});
// Event stays unprocessed — will be retried on the next alarm tick.
// Mark it processed anyway after 3 consecutive failures to prevent
// a poison event from blocking the entire queue forever.
// For now, we skip it and let the next tick retry.
const message = err instanceof Error ? err.message : String(err);
// Terminal errors referencing a missing bead/agent can never
// succeed on retry — mark them processed so the drain loop
// stops re-running them every alarm tick.
const isMissingEntity =
err instanceof Error &&
/\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message);
if (isMissingEntity) {
logger.warn('reconciler: applyEvent skipped (missing entity)', {
eventId: event.event_id,
eventType: event.event_type,
error: message,
});
events.markProcessed(this.sql, event.event_id);
} else {
logger.error('reconciler: applyEvent failed', {
eventId: event.event_id,
eventType: event.event_type,
error: message,
});
// Event stays unprocessed — will be retried on the next alarm tick.
}
}
}
} catch (err) {
Expand Down Expand Up @@ -5161,6 +5178,56 @@ export class TownDO extends DurableObject<Env> {
];
}

async debugTownEvents(): Promise<unknown[]> {
return [
...query(
this.sql,
/* sql */ `
SELECT ${town_events.event_id},
${town_events.event_type},
${town_events.agent_id},
${town_events.bead_id},
${town_events.processed_at}
FROM ${town_events}
ORDER BY ${town_events.created_at} ASC
`,
[]
),
];
}

/**
* Test-only helper: directly insert a row into the town_events queue
* without going through the producer APIs. Used to reproduce orphan
* events (referencing deleted beads/agents) in tests.
*/
async debugInsertTownEvent(input: {
event_type: TownEventType;
agent_id?: string | null;
bead_id?: string | null;
payload?: Record<string, unknown>;
}): Promise<string> {
const eventId = events.insertEvent(this.sql, input.event_type, {
agent_id: input.agent_id ?? null,
bead_id: input.bead_id ?? null,
payload: input.payload ?? {},
});
await this.armAlarmIfNeeded();
return eventId;
}

/**
* Test-only helper: insert a container_status event for a given agent.
* Mirrors the container observer's upsert so tests can verify that
* deleteBead sweeps agent-keyed events.
*/
async debugRecordContainerStatus(
agentId: string,
payload: { status: string; exit_reason?: string | null }
): Promise<void> {
events.upsertContainerStatus(this.sql, agentId, payload);
}

async destroy(): Promise<void> {
console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`);

Expand Down
22 changes: 22 additions & 0 deletions services/gastown/src/dos/town/beads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
createTableConvoyMetadata,
migrateConvoyMetadata,
} from '../../db/tables/convoy-metadata.table';
import { town_events } from '../../db/tables/town-events.table';
import { query } from '../../util/query.util';
import type {
CreateBeadInput,
Expand Down Expand Up @@ -903,6 +904,17 @@ export function deleteBead(sql: SqlStorage, beadId: string, rigId?: string): boo
beadId,
]);

// Remove any pending/processed reconciler events targeting this bead or
// this agent (agents are themselves beads, so deleteBead is used for both).
// Without this, bead_cancelled / container_status / … events that reference
// a deleted bead make applyEvent throw forever on every alarm tick.
query(
sql,
/* sql */ `DELETE FROM ${town_events}
WHERE ${town_events.bead_id} = ? OR ${town_events.agent_id} = ?`,
[beadId, beadId]
);

query(sql, /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} = ?`, [beadId]);
return true;
}
Expand Down Expand Up @@ -1003,6 +1015,16 @@ export function deleteBeads(sql: SqlStorage, beadIds: string[], rigId?: string):
...allIdsArr
);

// Remove any reconciler events referencing these beads/agents. See
// deleteBead above for rationale.
sql.exec(
/* sql */ `DELETE FROM ${town_events}
WHERE ${town_events.bead_id} IN (${placeholders})
OR ${town_events.agent_id} IN (${placeholders})`,
...allIdsArr,
...allIdsArr
);

// Delete the beads themselves
sql.exec(
/* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} IN (${placeholders})`,
Expand Down
11 changes: 11 additions & 0 deletions services/gastown/src/dos/town/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,17 @@ export function applyEvent(
console.warn(`${LOG} applyEvent: bead_cancelled missing bead_id`);
return;
}
// Tolerate the bead having been deleted after the event was enqueued.
// Without this guard updateBeadStatus throws `Bead <id> not found`,
// the drain loop can't mark the event processed, and the error
// recurs on every alarm tick forever.
const existing = beadOps.getBead(sql, event.bead_id);
if (!existing) {
console.warn(
`${LOG} applyEvent: bead_cancelled target bead ${event.bead_id} no longer exists — skipping`
);
return;
}
const cancelStatus =
payload.cancel_status === 'closed' || payload.cancel_status === 'failed'
? payload.cancel_status
Expand Down
183 changes: 183 additions & 0 deletions services/gastown/test/integration/event-cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Tests for the "orphaned bead_cancelled events retried forever" bug.
*
* Two independent fixes compose to eliminate the failure:
* Fix 1: deleteBead / deleteBeads purge town_events rows that reference
* the deleted bead (by bead_id or agent_id), so the drain loop
* never sees them.
* Fix 2a: reconciler.applyEvent('bead_cancelled') tolerates the bead
* being missing (returns early, logs warn — does not throw).
* Fix 2b: the Town.do.ts drain loop recognises "Bead/Agent ... not
* found" terminal errors and marks the offending event
* processed so it is not retried forever.
*/

import { env, runDurableObjectAlarm } from 'cloudflare:test';
import { describe, it, expect, beforeEach } from 'vitest';

function getTownStub(name: string) {
return env.TOWN.get(env.TOWN.idFromName(name));
}

describe('town_events cleanup on bead deletion (#fix-1)', () => {
let town: ReturnType<typeof getTownStub>;
let townName: string;

beforeEach(async () => {
townName = `evcleanup-${crypto.randomUUID()}`;
town = getTownStub(townName);
await town.setTownId(townName);
await town.addRig({
rigId: 'rig-1',
name: 'main-rig',
gitUrl: 'https://github.com/test/repo.git',
defaultBranch: 'main',
});
});

it('deleteBead removes pending town_events referencing the bead by bead_id', async () => {
const bead = await town.createBead({
type: 'issue',
title: 'To be deleted',
rig_id: 'rig-1',
});

// Transitioning to a terminal status enqueues a bead_cancelled event.
await town.updateBeadStatus(bead.bead_id, 'failed', 'system');

const pendingBefore = (await town.debugTownEvents()) as Array<{
bead_id: string | null;
processed_at: string | null;
}>;
expect(
pendingBefore.filter(e => e.bead_id === bead.bead_id && e.processed_at === null).length
).toBeGreaterThan(0);

await town.deleteBead(bead.bead_id);

const pendingAfter = (await town.debugTownEvents()) as Array<{
bead_id: string | null;
agent_id: string | null;
}>;
expect(
pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id)
).toBe(false);
});

it('deleteBead also removes events referencing the bead as agent_id (agents are beads)', async () => {
const agent = await town.registerAgent({
role: 'polecat',
name: 'P1',
identity: `ev-agent-${townName}`,
rig_id: 'rig-1',
});

// Upsert a container_status event keyed by agent_id — this is the shape
// of events that hang off an agent's bead row.
await town.debugRecordContainerStatus(agent.id, { status: 'running' });

const beforeRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>;
expect(beforeRows.some(e => e.agent_id === agent.id)).toBe(true);

// deleteBead is used for agents too (agents are beads).
await town.deleteBead(agent.id);

const afterRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>;
expect(afterRows.some(e => e.agent_id === agent.id)).toBe(false);
});

it('deleteBeads bulk path removes events for every deleted bead', async () => {
const a = await town.createBead({ type: 'issue', title: 'A', rig_id: 'rig-1' });
const b = await town.createBead({ type: 'issue', title: 'B', rig_id: 'rig-1' });

await town.updateBeadStatus(a.bead_id, 'failed', 'system');
await town.updateBeadStatus(b.bead_id, 'failed', 'system');

const before = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>;
expect(before.filter(e => e.bead_id === a.bead_id).length).toBeGreaterThan(0);
expect(before.filter(e => e.bead_id === b.bead_id).length).toBeGreaterThan(0);

await town.deleteBeads([a.bead_id, b.bead_id]);

const after = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>;
expect(after.some(e => e.bead_id === a.bead_id)).toBe(false);
expect(after.some(e => e.bead_id === b.bead_id)).toBe(false);
});
});

describe('applyEvent tolerance + drain loop marks missing-entity events processed (#fix-2)', () => {
let town: ReturnType<typeof getTownStub>;
let townName: string;

beforeEach(async () => {
townName = `evtolerate-${crypto.randomUUID()}`;
town = getTownStub(townName);
await town.setTownId(townName);
await town.addRig({
rigId: 'rig-1',
name: 'main-rig',
gitUrl: 'https://github.com/test/repo.git',
defaultBranch: 'main',
});
});

it('drain loop marks a bead_cancelled event processed when the bead is gone', async () => {
// Simulate the historical orphan: enqueue a bead_cancelled event whose
// bead has been deleted (or never existed). Before Fix 2, applyEvent
// would throw `Bead <id> not found` forever on every alarm tick.
await town.debugInsertTownEvent({
event_type: 'bead_cancelled',
bead_id: '00000000-0000-4000-8000-000000000001',
payload: { cancel_status: 'failed' },
});

const beforeDrain = (await town.debugTownEvents()) as Array<{
event_type: string;
bead_id: string | null;
processed_at: string | null;
}>;
const orphan = beforeDrain.find(
e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001'
);
expect(orphan?.processed_at).toBeNull();

await runDurableObjectAlarm(town);

// After the alarm, the orphan event should be processed — not retried.
const afterDrain = (await town.debugTownEvents()) as Array<{
event_type: string;
bead_id: string | null;
processed_at: string | null;
}>;
const orphanAfter = afterDrain.find(
e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001'
);
// If retention GC already pruned it, that's also acceptable — the key
// invariant is that it is no longer pending.
if (orphanAfter) {
expect(orphanAfter.processed_at).not.toBeNull();
}
});

it('drain loop marks an agent-missing event processed too', async () => {
await town.debugInsertTownEvent({
event_type: 'agent_done',
agent_id: '00000000-0000-4000-8000-0000000000aa',
payload: { branch: 'gt/ghost' },
});

await runDurableObjectAlarm(town);

const after = (await town.debugTownEvents()) as Array<{
event_type: string;
agent_id: string | null;
processed_at: string | null;
}>;
const orphanAfter = after.find(
e => e.event_type === 'agent_done' && e.agent_id === '00000000-0000-4000-8000-0000000000aa'
);
if (orphanAfter) {
expect(orphanAfter.processed_at).not.toBeNull();
}
});
});