Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions services/gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1670,14 +1670,6 @@ export class TownDO extends DurableObject<Env> {
await this.escalateToActiveCadence();
}

/**
 * Pops the next review-queue entry for this town.
 *
 * Thin wrapper: forwards this object's SQL storage handle to
 * `reviewQueue.popReviewQueue` and returns its result unchanged.
 *
 * @returns The entry produced by the helper, or `null` when it returns none.
 */
async popReviewQueue(): Promise<ReviewQueueEntry | null> {
return reviewQueue.popReviewQueue(this.sql);
}

/**
 * Records the final status of a review entry.
 *
 * Thin wrapper: forwards to `reviewQueue.completeReview` with this object's
 * SQL storage handle. The helper is invoked without `await`; the method is
 * `async` only so that it exposes a Promise-returning signature.
 *
 * @param entryId - Identifier of the review entry to complete.
 * @param status - Final outcome to record: `'merged'` or `'failed'`.
 */
async completeReview(entryId: string, status: 'merged' | 'failed'): Promise<void> {
reviewQueue.completeReview(this.sql, entryId, status);
}

async completeReviewWithResult(input: {
Comment thread
jrf0110 marked this conversation as resolved.
entry_id: string;
status: 'merged' | 'failed' | 'conflict';
Expand Down Expand Up @@ -1712,10 +1704,9 @@ export class TownDO extends DurableObject<Env> {
});
}

// Rework is handled by the normal scheduling path: the failed/conflict
// Rework is handled by the reconciler's scheduling path: the failed/conflict
// path in completeReviewWithResult sets the source bead to 'open' with
// assignee cleared. feedStrandedConvoys or rehookOrphanedBeads will
// hook a polecat, and schedulePendingWork will dispatch it.
// assignee cleared. The reconciler will hook a polecat and dispatch it.
}

async agentDone(agentId: string, input: AgentDoneInput): Promise<void> {
Expand Down Expand Up @@ -3558,9 +3549,9 @@ export class TownDO extends DurableObject<Env> {
}

// ── Pre-phase: Observe container status for working agents ────────
// Replaces witnessPatrol's zombie detection. Poll the container for
// each working/stalled agent and emit container_status events. These
// are drained in Phase 0 and applied before reconciliation.
// Poll the container for each working/stalled agent and emit
// container_status events. These are drained in Phase 0 and applied
// before reconciliation.
try {
const workingAgentRows = z
.object({ bead_id: z.string() })
Expand Down Expand Up @@ -4487,8 +4478,8 @@ export class TownDO extends DurableObject<Env> {

// Only count idle+hooked agents as orphaned if they've been idle for
// longer than the dispatch cooldown. Agents that were just hooked by
// feedStrandedConvoys or restarted with backoff are legitimately
// waiting for the next scheduler tick.
// the reconciler or restarted with backoff are legitimately waiting
// for the next scheduler tick.
const orphanedHooks = Number(
[
...query(
Expand Down
4 changes: 2 additions & 2 deletions services/gastown/src/dos/town/beads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ export function updateConvoyProgress(sql: SqlStorage, beadId: string, timestamp:

if (featureBranch && mergeMode === 'review-then-land') {
// Mark the convoy as ready to land by storing a flag in metadata.
// The alarm loop's processReviewQueue will detect this and create
// the final landing MR (feature branch → main).
// The reconciler will detect this and create the final landing
// MR (feature branch → main).
query(
sql,
/* sql */ `
Expand Down
8 changes: 4 additions & 4 deletions services/gastown/src/dos/town/container-dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -647,12 +647,12 @@ export async function checkAgentContainerStatus(
signal: AbortSignal.timeout(10_000),
});
// 404 means the container is running but has no record of this agent
// (e.g. after container eviction). Report as 'not_found' so
// witnessPatrol can immediately reset and redispatch the agent
// (e.g. after container eviction). Report as 'not_found' so the
// reconciler can immediately reset and redispatch the agent
// instead of waiting for the 2-hour GUPP timeout.
if (response.status === 404) return { status: 'not_found' };
// Non-OK but not 404 — container is having issues but may still
// have the agent running. Return 'unknown' so witnessPatrol doesn't
// have the agent running. Return 'unknown' so the reconciler doesn't
// falsely reset a working agent.
if (!response.ok) return { status: 'unknown' };
const data: unknown = await response.json();
Expand All @@ -668,7 +668,7 @@ export async function checkAgentContainerStatus(
return { status: 'unknown' };
} catch {
// Timeout, network error, or container starting up — return
// 'unknown' so witnessPatrol doesn't falsely reset working agents.
// 'unknown' so the reconciler doesn't falsely reset working agents.
// True zombies will be caught after repeated 'unknown' results
// once the GUPP/heartbeat timeout expires.
return { status: 'unknown' };
Expand Down
4 changes: 2 additions & 2 deletions services/gastown/src/dos/town/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ export function reconcileAgents(sql: SqlStorage, opts?: { draining?: boolean }):
// Agent is working with fresh heartbeat but no hook — it's running
// in the container but has no bead to work on (gt_done already ran,
// or the hook was cleared by another code path). Set to idle so
// processReviewQueue / schedulePendingWork can use it.
// the reconciler can dispatch it to new work.
actions.push({
type: 'transition_agent',
agent_id: agent.bead_id,
Expand Down Expand Up @@ -810,7 +810,7 @@ export function reconcileBeads(
});
}

// Rule 2: Idle agents with hooks need dispatch (schedulePendingWork equivalent)
// Rule 2: Idle agents with hooks need dispatch
const idleHooked = AgentRow.array().parse([
...query(
sql,
Expand Down
83 changes: 17 additions & 66 deletions services/gastown/src/dos/town/review-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,53 +208,6 @@ export function submitToReviewQueue(sql: SqlStorage, input: ReviewQueueInput): v
});
}

/**
 * Claims the oldest open entry in the review queue and marks it in progress.
 *
 * Selection: the first row (ordered by `created_at` ascending) produced by
 * `REVIEW_JOIN` whose status is `'open'`, provided that no `merge_request`
 * bead with status `'in_progress'` exists for the same `rig_id`. This keeps
 * a second MR from being popped while a review is already in flight.
 *
 * NOTE(review): an earlier comment on this function described the guard as
 * being scoped to the *source bead* (linked through `bead_dependencies` with
 * `dependency_type='tracks'`), but the predicate below compares `rig_id`.
 * Confirm which scoping is intended.
 *
 * @param sql - SQL storage handle the queue lives in.
 * @returns The claimed entry with `status: 'running'` and `processed_at` set,
 *   or `null` when no eligible entry exists.
 */
export function popReviewQueue(sql: SqlStorage): ReviewQueueEntry | null {
  const rows = [
    ...query(
      sql,
      /* sql */ `
${REVIEW_JOIN}
WHERE ${beads.status} = 'open'
AND NOT EXISTS (
SELECT 1 FROM ${beads} AS active_mr
WHERE active_mr.${beads.columns.type} = 'merge_request'
AND active_mr.${beads.columns.status} = 'in_progress'
AND active_mr.${beads.columns.rig_id} = ${beads.rig_id}
)
ORDER BY ${beads.created_at} ASC
LIMIT 1
`,
      []
    ),
  ];

  if (rows.length === 0) return null;
  const parsed = MergeRequestBeadRecord.parse(rows[0]);
  const entry = toReviewQueueEntry(parsed);

  // Capture the timestamp once so the persisted `updated_at` and the returned
  // `processed_at` always agree (previously `now()` was evaluated twice).
  const claimedAt = now();

  // Mark as running (in_progress)
  query(
    sql,
    /* sql */ `
UPDATE ${beads}
SET ${beads.columns.status} = 'in_progress',
${beads.columns.updated_at} = ?
WHERE ${beads.bead_id} = ?
`,
    [claimedAt, entry.id]
  );

  return { ...entry, status: 'running', processed_at: claimedAt };
}

export function completeReview(
sql: SqlStorage,
entryId: string,
Expand Down Expand Up @@ -369,8 +322,8 @@ export function completeReviewWithResult(
conflict: true,
},
});
// Return source bead to open so the normal scheduling path handles
// rework. Clear assignee so feedStrandedConvoys can match.
// Return source bead to open so the reconciler's scheduling path handles
// rework. Clear assignee so the reconciler can match it for dispatch.
const conflictSourceBead = getBead(sql, entry.bead_id);
if (
conflictSourceBead &&
Expand All @@ -390,11 +343,10 @@ export function completeReviewWithResult(
}
} else if (input.status === 'failed') {
// Review failed (rework requested): return source bead to open so
// the normal scheduling path (feedStrandedConvoys → hookBead →
// schedulePendingWork → dispatch) handles rework. Clear the stale
// assignee so feedStrandedConvoys can match (requires assignee IS NULL).
// This avoids the fire-and-forget rework dispatch race in TownDO
// where the dispatch fails and rehookOrphanedBeads churn.
// the reconciler's scheduling path handles rework. Clear the stale
// assignee so the reconciler can match it for dispatch (requires
// assignee IS NULL). This avoids a fire-and-forget rework dispatch
// race where the dispatch fails and the bead churns.
const sourceBead = getBead(sql, entry.bead_id);
if (sourceBead && sourceBead.status !== 'closed' && sourceBead.status !== 'failed') {
updateBeadStatus(sql, entry.bead_id, 'open', entry.agent_id);
Expand Down Expand Up @@ -498,9 +450,8 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu
const agent = getAgent(sql, agentId);
if (!agent) throw new Error(`Agent ${agentId} not found`);
if (!agent.current_hook_bead_id) {
// The agent was unhooked by a recovery path (witnessPatrol,
// rehookOrphanedBeads) between when the agent finished work and
// when it called gt_done.
// The agent was unhooked by a recovery path between when the agent
// finished work and when it called gt_done.
//
// For refineries, this is critical: the refinery successfully merged
// but the hook was cleared by zombie detection. We MUST still complete
Expand Down Expand Up @@ -648,9 +599,9 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu

unhookBead(sql, agentId);
// Set refinery to idle immediately — the review is done and the
// refinery is available for new work. Without this, processReviewQueue
// sees the refinery as 'working' and won't pop the next MR bead until
// agentCompleted fires (when the container process eventually exits).
// refinery is available for new work. Without this, the reconciler
// sees the refinery as 'working' and won't dispatch the next MR bead
// until agentCompleted fires (when the container process eventually exits).
updateAgentStatus(sql, agentId, 'idle');
return;
}
Expand All @@ -659,7 +610,7 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu

if (!agent.rig_id) {
console.warn(
`[review-queue] agentDone: agent ${agentId} has null rig_id — review entry may fail in processReviewQueue`
`[review-queue] agentDone: agent ${agentId} has null rig_id — review entry may fail in submitToReviewQueue`
);
}

Expand Down Expand Up @@ -718,13 +669,13 @@ export function agentCompleted(
// NEVER fail or unhook a refinery from agentCompleted.
// agentCompleted races with gt_done: the process exits, the
// container sends /completed, but gt_done's HTTP request may
// still be in flight. If we unhook here, recoverStuckReviews
// can fire between agentCompleted and gt_done, resetting the
// MR bead that's about to be closed by gt_done.
// still be in flight. If we unhook here, a recovery path can
// fire between agentCompleted and gt_done, resetting the MR bead
// that's about to be closed by gt_done.
//
// Leave the hook intact. gt_done will close + unhook if the
// merge succeeded. recoverStuckReviews (which checks for
// status='working') handles the case where gt_done never arrives.
// merge succeeded. The reconciler (which checks for status='working')
// handles the case where gt_done never arrives.
//
// No-op for the bead — just fall through to mark agent idle.
} else {
Expand Down
23 changes: 0 additions & 23 deletions services/gastown/test/integration/review-failure.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,29 +182,6 @@ describe('Review failure paths — convoy progress and source bead recovery', ()
});
});

// ── Direct completeReview leaves source bead orphaned (regression) ─

// Regression guard: documents what happens when the low-level
// `completeReview` is invoked directly instead of going through the
// lifecycle-aware completion path. The MR bead is marked failed, but the
// source bead is expected to remain in `in_review`.
describe('completeReview bypass (regression guard)', () => {
it('should leave source bead stuck in in_review when completeReview is called directly', async () => {
const { beadId, mrBeadId } = await setupConvoyWithMR();

// Call completeReview directly (the OLD broken path) —
// this is the raw SQL update that bypasses lifecycle events.
// We use this to verify the regression scenario.
await town.completeReview(mrBeadId, 'failed');

// MR bead should be failed
const mrBead = await town.getBeadAsync(mrBeadId);
expect(mrBead?.status).toBe('failed');

// Source bead is STILL in_review — this is the bug this PR fixes
// in processReviewQueue. The direct completeReview call doesn't
// return the source bead to in_progress.
const sourceBead = await town.getBeadAsync(beadId);
expect(sourceBead?.status).toBe('in_review');
});
});

// ── Source bead in_review after agentDone ──────────────────────────

describe('agentDone transitions source bead to in_review', () => {
Expand Down
14 changes: 8 additions & 6 deletions services/gastown/test/integration/rig-alarm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ describe('Town DO Alarm', () => {
// fail gracefully and mark the review as 'failed'
await runDurableObjectAlarm(town);

// The pending entry should have been popped (no more pending entries)
const nextEntry = await town.popReviewQueue();
expect(nextEntry).toBeNull();
// The MR bead should no longer be open (alarm processed it)
const mrBeads = await town.listBeads({ type: 'merge_request' });
expect(mrBeads).toHaveLength(1);
expect(mrBeads[0].status).not.toBe('open');
});
});

Expand Down Expand Up @@ -293,9 +294,10 @@ describe('Town DO Alarm', () => {
// (will fail at container level but that's expected in tests)
await runDurableObjectAlarm(town);

// Review queue entry should have been popped and processed (failed in test env)
const reviewEntry = await town.popReviewQueue();
expect(reviewEntry).toBeNull();
// MR bead should have been picked up and processed (failed in test env)
const mrBeads = await town.listBeads({ type: 'merge_request' });
expect(mrBeads).toHaveLength(1);
expect(mrBeads[0].status).not.toBe('open');
});
});
});
Loading