diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts index 75a654ccf1..f274cebcfa 100644 --- a/services/gastown/src/dos/Town.do.ts +++ b/services/gastown/src/dos/Town.do.ts @@ -1670,14 +1670,6 @@ export class TownDO extends DurableObject { await this.escalateToActiveCadence(); } - async popReviewQueue(): Promise { - return reviewQueue.popReviewQueue(this.sql); - } - - async completeReview(entryId: string, status: 'merged' | 'failed'): Promise { - reviewQueue.completeReview(this.sql, entryId, status); - } - async completeReviewWithResult(input: { entry_id: string; status: 'merged' | 'failed' | 'conflict'; @@ -1712,10 +1704,9 @@ export class TownDO extends DurableObject { }); } - // Rework is handled by the normal scheduling path: the failed/conflict + // Rework is handled by the reconciler's scheduling path: the failed/conflict // path in completeReviewWithResult sets the source bead to 'open' with - // assignee cleared. feedStrandedConvoys or rehookOrphanedBeads will - // hook a polecat, and schedulePendingWork will dispatch it. + // assignee cleared. The reconciler will hook a polecat and dispatch it. } async agentDone(agentId: string, input: AgentDoneInput): Promise { @@ -3558,9 +3549,9 @@ export class TownDO extends DurableObject { } // ── Pre-phase: Observe container status for working agents ──────── - // Replaces witnessPatrol's zombie detection. Poll the container for - // each working/stalled agent and emit container_status events. These - // are drained in Phase 0 and applied before reconciliation. + // Poll the container for each working/stalled agent and emit + // container_status events. These are drained in Phase 0 and applied + // before reconciliation. try { const workingAgentRows = z .object({ bead_id: z.string() }) @@ -4487,8 +4478,8 @@ export class TownDO extends DurableObject { // Only count idle+hooked agents as orphaned if they've been idle for // longer than the dispatch cooldown. Agents that were just hooked by - // feedStrandedConvoys or restarted with backoff are legitimately - // waiting for the next scheduler tick. + // the reconciler or restarted with backoff are legitimately waiting + // for the next scheduler tick. const orphanedHooks = Number( [ ...query( diff --git a/services/gastown/src/dos/town/beads.ts b/services/gastown/src/dos/town/beads.ts index 25d76d8a3b..1f1409c337 100644 --- a/services/gastown/src/dos/town/beads.ts +++ b/services/gastown/src/dos/town/beads.ts @@ -421,8 +421,8 @@ export function updateConvoyProgress(sql: SqlStorage, beadId: string, timestamp: if (featureBranch && mergeMode === 'review-then-land') { // Mark the convoy as ready to land by storing a flag in metadata. - // The alarm loop's processReviewQueue will detect this and create - // the final landing MR (feature branch → main). + // The reconciler will detect this and create the final landing + // MR (feature branch → main). query( sql, /* sql */ ` diff --git a/services/gastown/src/dos/town/container-dispatch.ts b/services/gastown/src/dos/town/container-dispatch.ts index e113bc4174..f89300abbd 100644 --- a/services/gastown/src/dos/town/container-dispatch.ts +++ b/services/gastown/src/dos/town/container-dispatch.ts @@ -647,12 +647,12 @@ export async function checkAgentContainerStatus( signal: AbortSignal.timeout(10_000), }); // 404 means the container is running but has no record of this agent - // (e.g. after container eviction). Report as 'not_found' so - // witnessPatrol can immediately reset and redispatch the agent + // (e.g. after container eviction). Report as 'not_found' so the + // reconciler can immediately reset and redispatch the agent // instead of waiting for the 2-hour GUPP timeout. if (response.status === 404) return { status: 'not_found' }; // Non-OK but not 404 — container is having issues but may still - // have the agent running. Return 'unknown' so witnessPatrol doesn't + // have the agent running. Return 'unknown' so the reconciler doesn't // falsely reset a working agent. if (!response.ok) return { status: 'unknown' }; const data: unknown = await response.json(); @@ -668,7 +668,7 @@ export async function checkAgentContainerStatus( return { status: 'unknown' }; } catch { // Timeout, network error, or container starting up — return - // 'unknown' so witnessPatrol doesn't falsely reset working agents. + // 'unknown' so the reconciler doesn't falsely reset working agents. // True zombies will be caught after repeated 'unknown' results // once the GIPP/heartbeat timeout expires. return { status: 'unknown' }; diff --git a/services/gastown/src/dos/town/reconciler.ts b/services/gastown/src/dos/town/reconciler.ts index a7bfcc9acb..111473fd4e 100644 --- a/services/gastown/src/dos/town/reconciler.ts +++ b/services/gastown/src/dos/town/reconciler.ts @@ -539,7 +539,7 @@ export function reconcileAgents(sql: SqlStorage, opts?: { draining?: boolean }): // Agent is working with fresh heartbeat but no hook — it's running // in the container but has no bead to work on (gt_done already ran, // or the hook was cleared by another code path). Set to idle so - // processReviewQueue / schedulePendingWork can use it. + // the reconciler can dispatch it to new work. actions.push({ type: 'transition_agent', agent_id: agent.bead_id, @@ -810,7 +810,7 @@ export function reconcileBeads( }); } - // Rule 2: Idle agents with hooks need dispatch (schedulePendingWork equivalent) + // Rule 2: Idle agents with hooks need dispatch const idleHooked = AgentRow.array().parse([ ...query( sql, diff --git a/services/gastown/src/dos/town/review-queue.ts b/services/gastown/src/dos/town/review-queue.ts index e25819107b..da6402496c 100644 --- a/services/gastown/src/dos/town/review-queue.ts +++ b/services/gastown/src/dos/town/review-queue.ts @@ -208,53 +208,6 @@ export function submitToReviewQueue(sql: SqlStorage, input: ReviewQueueInput): v }); } -export function popReviewQueue(sql: SqlStorage): ReviewQueueEntry | null { - // Pop the oldest open MR bead, but skip any whose source bead already - // has another MR in_progress (i.e. a refinery is already reviewing it). - // This prevents popping stale MR beads and triggering failReviewWithRework - // while an active review is in flight for the same source. - // - // The source bead is linked via bead_dependencies (dependency_type='tracks'): - // bead_dependencies.bead_id = MR bead - // bead_dependencies.depends_on_bead_id = source bead - const rows = [ - ...query( - sql, - /* sql */ ` - ${REVIEW_JOIN} - WHERE ${beads.status} = 'open' - AND NOT EXISTS ( - SELECT 1 FROM ${beads} AS active_mr - WHERE active_mr.${beads.columns.type} = 'merge_request' - AND active_mr.${beads.columns.status} = 'in_progress' - AND active_mr.${beads.columns.rig_id} = ${beads.rig_id} - ) - ORDER BY ${beads.created_at} ASC - LIMIT 1 - `, - [] - ), - ]; - - if (rows.length === 0) return null; - const parsed = MergeRequestBeadRecord.parse(rows[0]); - const entry = toReviewQueueEntry(parsed); - - // Mark as running (in_progress) - query( - sql, - /* sql */ ` - UPDATE ${beads} - SET ${beads.columns.status} = 'in_progress', - ${beads.columns.updated_at} = ? - WHERE ${beads.bead_id} = ? - `, - [now(), entry.id] - ); - - return { ...entry, status: 'running', processed_at: now() }; -} - export function completeReview( sql: SqlStorage, entryId: string, @@ -369,8 +322,8 @@ export function completeReviewWithResult( conflict: true, }, }); - // Return source bead to open so the normal scheduling path handles - // rework. Clear assignee so feedStrandedConvoys can match. + // Return source bead to open so the reconciler's scheduling path handles + // rework. Clear assignee so the reconciler can match it for dispatch. const conflictSourceBead = getBead(sql, entry.bead_id); if ( conflictSourceBead && @@ -390,11 +343,10 @@ export function completeReviewWithResult( } } else if (input.status === 'failed') { // Review failed (rework requested): return source bead to open so - // the normal scheduling path (feedStrandedConvoys → hookBead → - // schedulePendingWork → dispatch) handles rework. Clear the stale - // assignee so feedStrandedConvoys can match (requires assignee IS NULL). - // This avoids the fire-and-forget rework dispatch race in TownDO - // where the dispatch fails and rehookOrphanedBeads churn. + // the reconciler's scheduling path handles rework. Clear the stale + // assignee so the reconciler can match it for dispatch (requires + // assignee IS NULL). This avoids a fire-and-forget rework dispatch + // race where the dispatch fails and the bead churns. const sourceBead = getBead(sql, entry.bead_id); if (sourceBead && sourceBead.status !== 'closed' && sourceBead.status !== 'failed') { updateBeadStatus(sql, entry.bead_id, 'open', entry.agent_id); @@ -498,9 +450,8 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu const agent = getAgent(sql, agentId); if (!agent) throw new Error(`Agent ${agentId} not found`); if (!agent.current_hook_bead_id) { - // The agent was unhooked by a recovery path (witnessPatrol, - // rehookOrphanedBeads) between when the agent finished work and - // when it called gt_done. + // The agent was unhooked by a recovery path between when the agent + // finished work and when it called gt_done. // // For refineries, this is critical: the refinery successfully merged // but the hook was cleared by zombie detection. We MUST still complete @@ -648,9 +599,9 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu unhookBead(sql, agentId); // Set refinery to idle immediately — the review is done and the - // refinery is available for new work. Without this, processReviewQueue - // sees the refinery as 'working' and won't pop the next MR bead until - // agentCompleted fires (when the container process eventually exits). + // refinery is available for new work. Without this, the reconciler + // sees the refinery as 'working' and won't dispatch the next MR bead + // until agentCompleted fires (when the container process eventually exits). updateAgentStatus(sql, agentId, 'idle'); return; } @@ -659,7 +610,7 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu if (!agent.rig_id) { console.warn( - `[review-queue] agentDone: agent ${agentId} has null rig_id — review entry may fail in processReviewQueue` + `[review-queue] agentDone: agent ${agentId} has null rig_id — review entry may fail in submitToReviewQueue` ); } @@ -718,13 +669,13 @@ export function agentCompleted( // NEVER fail or unhook a refinery from agentCompleted. // agentCompleted races with gt_done: the process exits, the // container sends /completed, but gt_done's HTTP request may - // still be in flight. If we unhook here, recoverStuckReviews - // can fire between agentCompleted and gt_done, resetting the - // MR bead that's about to be closed by gt_done. + // still be in flight. If we unhook here, a recovery path can + // fire between agentCompleted and gt_done, resetting the MR bead + // that's about to be closed by gt_done. // // Leave the hook intact. gt_done will close + unhook if the - // merge succeeded. recoverStuckReviews (which checks for - // status='working') handles the case where gt_done never arrives. + // merge succeeded. The reconciler (which checks for status='working') + // handles the case where gt_done never arrives. // // No-op for the bead — just fall through to mark agent idle. } else { diff --git a/services/gastown/test/integration/review-failure.test.ts b/services/gastown/test/integration/review-failure.test.ts index d5b7773c00..8fcb07cb0e 100644 --- a/services/gastown/test/integration/review-failure.test.ts +++ b/services/gastown/test/integration/review-failure.test.ts @@ -182,29 +182,6 @@ describe('Review failure paths — convoy progress and source bead recovery', () }); }); - // ── Direct completeReview leaves source bead orphaned (regression) ─ - - describe('completeReview bypass (regression guard)', () => { - it('should leave source bead stuck in in_review when completeReview is called directly', async () => { - const { beadId, mrBeadId } = await setupConvoyWithMR(); - - // Call completeReview directly (the OLD broken path) — - // this is the raw SQL update that bypasses lifecycle events. - // We use this to verify the regression scenario. - await town.completeReview(mrBeadId, 'failed'); - - // MR bead should be failed - const mrBead = await town.getBeadAsync(mrBeadId); - expect(mrBead?.status).toBe('failed'); - - // Source bead is STILL in_review — this is the bug this PR fixes - // in processReviewQueue. The direct completeReview call doesn't - // return the source bead to in_progress. - const sourceBead = await town.getBeadAsync(beadId); - expect(sourceBead?.status).toBe('in_review'); - }); - }); - // ── Source bead in_review after agentDone ────────────────────────── describe('agentDone transitions source bead to in_review', () => { diff --git a/services/gastown/test/integration/rig-alarm.test.ts b/services/gastown/test/integration/rig-alarm.test.ts index a80cfc6b5f..1ec79e6962 100644 --- a/services/gastown/test/integration/rig-alarm.test.ts +++ b/services/gastown/test/integration/rig-alarm.test.ts @@ -158,9 +158,10 @@ describe('Town DO Alarm', () => { // fail gracefully and mark the review as 'failed' await runDurableObjectAlarm(town); - // The pending entry should have been popped (no more pending entries) - const nextEntry = await town.popReviewQueue(); - expect(nextEntry).toBeNull(); + // The MR bead should no longer be open (alarm processed it) + const mrBeads = await town.listBeads({ type: 'merge_request' }); + expect(mrBeads).toHaveLength(1); + expect(mrBeads[0].status).not.toBe('open'); }); }); @@ -293,9 +294,10 @@ describe('Town DO Alarm', () => { // (will fail at container level but that's expected in tests) await runDurableObjectAlarm(town); - // Review queue entry should have been popped and processed (failed in test env) - const reviewEntry = await town.popReviewQueue(); - expect(reviewEntry).toBeNull(); + // MR bead should have been picked up and processed (failed in test env) + const mrBeads = await town.listBeads({ type: 'merge_request' }); + expect(mrBeads).toHaveLength(1); + expect(mrBeads[0].status).not.toBe('open'); }); }); }); diff --git a/services/gastown/test/integration/rig-do.test.ts b/services/gastown/test/integration/rig-do.test.ts index eb22196fd0..221f5bce67 100644 --- a/services/gastown/test/integration/rig-do.test.ts +++ b/services/gastown/test/integration/rig-do.test.ts @@ -356,7 +356,7 @@ describe('TownDO', () => { // ── Review Queue ─────────────────────────────────────────────────────── describe('review queue', () => { - it('should submit to and pop from review queue', async () => { + it('should submit to review queue and create an open merge_request bead', async () => { const agent = await town.registerAgent({ role: 'polecat', name: 'P1', @@ -373,40 +373,12 @@ describe('TownDO', () => { summary: 'Fixed the widget', }); - const entry = await town.popReviewQueue(); - expect(entry).toBeDefined(); - expect(entry?.branch).toBe('feature/fix-widget'); - expect(entry?.pr_url).toBe('https://github.com/org/repo/pull/1'); - expect(entry?.status).toBe('running'); - - // Pop again should return null (nothing pending) - const empty = await town.popReviewQueue(); - expect(empty).toBeNull(); - }); - - it('should complete a review', async () => { - const agent = await town.registerAgent({ - role: 'polecat', - name: 'P1', - identity: `complete-review-${townName}`, - }); - const bead = await town.createBead({ type: 'issue', title: 'Review complete' }); - - await town.submitToReviewQueue({ - agent_id: agent.id, - bead_id: bead.bead_id, - rig_id: 'test-rig', - branch: 'feature/fix', - }); - - const entry = await town.popReviewQueue(); - expect(entry).toBeDefined(); - - await town.completeReview(entry!.id, 'merged'); - - // Pop again should be null - const empty = await town.popReviewQueue(); - expect(empty).toBeNull(); + // submitToReviewQueue creates an open merge_request bead + const mrBeads = await town.listBeads({ type: 'merge_request' }); + expect(mrBeads).toHaveLength(1); + expect(mrBeads[0].status).toBe('open'); + expect(mrBeads[0].metadata?.pr_url).toBe('https://github.com/org/repo/pull/1'); + expect(mrBeads[0].metadata?.source_bead_id).toBe(bead.bead_id); }); it('should close bead on successful merge via completeReviewWithResult', async () => { @@ -424,11 +396,12 @@ describe('TownDO', () => { branch: 'feature/merge-test', }); - const entry = await town.popReviewQueue(); - expect(entry).toBeDefined(); + const mrBeads = await town.listBeads({ type: 'merge_request' }); + expect(mrBeads).toHaveLength(1); + const mrBeadId = mrBeads[0].bead_id; await town.completeReviewWithResult({ - entry_id: entry!.id, + entry_id: mrBeadId, status: 'merged', message: 'Merge successful', commit_sha: 'abc123', @@ -439,9 +412,9 @@ describe('TownDO', () => { expect(updatedBead?.status).toBe('closed'); expect(updatedBead?.closed_at).toBeDefined(); - // Review queue should be empty - const empty = await town.popReviewQueue(); - expect(empty).toBeNull(); + // MR bead should be closed + const updatedMr = await town.getBeadAsync(mrBeadId); + expect(updatedMr?.status).toBe('closed'); }); it('should create escalation bead on merge conflict via completeReviewWithResult', async () => { @@ -459,11 +432,12 @@ describe('TownDO', () => { branch: 'feature/conflict-test', }); - const entry = await town.popReviewQueue(); - expect(entry).toBeDefined(); + const mrBeads = await town.listBeads({ type: 'merge_request' }); + expect(mrBeads).toHaveLength(1); + const mrBeadId = mrBeads[0].bead_id; await town.completeReviewWithResult({ - entry_id: entry!.id, + entry_id: mrBeadId, status: 'conflict', message: 'CONFLICT (content): Merge conflict in src/index.ts', }); @@ -484,9 +458,9 @@ describe('TownDO', () => { agent_id: agent.id, }); - // Review queue entry should be marked as failed - const empty = await town.popReviewQueue(); - expect(empty).toBeNull(); + // MR bead should be marked as failed + const updatedMr = await town.getBeadAsync(mrBeadId); + expect(updatedMr?.status).toBe('failed'); }); });