diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index c717ce364e..c1f9a3d593 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -1446,8 +1446,14 @@ export class TownDO extends DurableObject { const entry = reviewQueue.popReviewQueue(this.sql); if (!entry) return; - const rigList = rigs.listRigs(this.sql); - const rigId = rigList[0]?.id ?? ''; + // Resolve rig from the merge_request bead — not rigList[0] which would + // pick the wrong rig in multi-rig towns. + const rigId = entry.rig_id; + if (!rigId) { + console.error(`${TOWN_LOG} processReviewQueue: entry ${entry.id} has no rig_id, skipping`); + reviewQueue.completeReview(this.sql, entry.id, 'failed'); + return; + } const rigConfig = await this.getRigConfig(rigId); if (!rigConfig) { reviewQueue.completeReview(this.sql, entry.id, 'failed'); @@ -1471,7 +1477,10 @@ export class TownDO extends DurableObject { polecatAgentId: entry.agent_id, }); - agents.hookBead(this.sql, refineryAgent.id, entry.bead_id); + // Hook the refinery to the MR bead (entry.id), not the source bead + // (entry.bead_id). The source bead stays closed with its original + // polecat assignee preserved. + agents.hookBead(this.sql, refineryAgent.id, entry.id); const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, { townId: this.townId, @@ -1481,7 +1490,7 @@ export class TownDO extends DurableObject { agentName: refineryAgent.name, role: 'refinery', identity: refineryAgent.identity, - beadId: entry.bead_id, + beadId: entry.id, beadTitle: `Review merge: ${entry.branch} → ${rigConfig.defaultBranch}`, beadBody: entry.summary ?? '', checkpoint: null, diff --git a/cloudflare-gastown/src/dos/town/agents.ts b/cloudflare-gastown/src/dos/town/agents.ts index 79864d6230..890ca26d0a 100644 --- a/cloudflare-gastown/src/dos/town/agents.ts +++ b/cloudflare-gastown/src/dos/town/agents.ts @@ -342,14 +342,14 @@ export function getOrCreateAgent( rigId: string, townId: string ): Agent { - const singletonRoles = ['witness', 'refinery', 'mayor']; + // Town-wide singletons: one per town, not tied to a rig. + const townSingletonRoles = ['witness', 'mayor']; - if (singletonRoles.includes(role)) { - // Try to find an existing agent with this role + if (townSingletonRoles.includes(role)) { const existing = listAgents(sql, { role }); if (existing.length > 0) return existing[0]; } else { - // For polecats, try to find an idle one without a hook in the SAME rig. + // Per-rig agents (polecat, refinery): reuse an idle one in the SAME rig. // Agents are tied to a rig's worktree/repo — reusing one from a different // rig would dispatch it into the wrong repository. const idle = [ @@ -357,13 +357,13 @@ export function getOrCreateAgent( sql, /* sql */ ` ${AGENT_JOIN} - WHERE ${agent_metadata.role} = 'polecat' + WHERE ${agent_metadata.role} = ? AND ${agent_metadata.status} = 'idle' AND ${agent_metadata.current_hook_bead_id} IS NULL AND ${beads.rig_id} = ? LIMIT 1 `, - [rigId] + [role, rigId] ), ]; if (idle.length > 0) return toAgent(AgentBeadRecord.parse(idle[0])); diff --git a/cloudflare-gastown/src/dos/town/beads.ts b/cloudflare-gastown/src/dos/town/beads.ts index fbd10ecc27..4aa991e9cc 100644 --- a/cloudflare-gastown/src/dos/town/beads.ts +++ b/cloudflare-gastown/src/dos/town/beads.ts @@ -169,6 +169,9 @@ export function updateBeadStatus( const bead = getBead(sql, beadId); if (!bead) throw new Error(`Bead ${beadId} not found`); + // No-op if already in the target status — avoids redundant events + if (bead.status === status) return bead; + const oldStatus = bead.status; const timestamp = now(); const closedAt = status === 'closed' ? timestamp : bead.closed_at; diff --git a/cloudflare-gastown/src/dos/town/review-queue.ts b/cloudflare-gastown/src/dos/town/review-queue.ts index a2d27f89f7..7268ab94df 100644 --- a/cloudflare-gastown/src/dos/town/review-queue.ts +++ b/cloudflare-gastown/src/dos/town/review-queue.ts @@ -47,9 +47,15 @@ const REVIEW_JOIN = /* sql */ ` function toReviewQueueEntry(row: MergeRequestBeadRecord): ReviewQueueEntry { return { id: row.bead_id, - agent_id: row.assignee_agent_bead_id ?? row.created_by ?? '', + // The polecat that submitted the review — stored in metadata (not assignee, + // which is set to the refinery when it claims the MR bead via hookBead). + agent_id: + typeof row.metadata?.source_agent_id === 'string' + ? row.metadata.source_agent_id + : (row.created_by ?? ''), bead_id: typeof row.metadata?.source_bead_id === 'string' ? row.metadata.source_bead_id : row.bead_id, + rig_id: row.rig_id ?? '', branch: row.branch, pr_url: row.pr_url, status: @@ -89,19 +95,32 @@ export function submitToReviewQueue(sql: SqlStorage, input: ReviewQueueInput): v 'open', `Review: ${input.branch}`, input.summary ?? null, + input.rig_id, null, - null, - input.agent_id, + null, // assignee left null — refinery claims it via hookBead 'medium', JSON.stringify(['gt:merge-request']), - JSON.stringify({ source_bead_id: input.bead_id }), - input.agent_id, + JSON.stringify({ source_bead_id: input.bead_id, source_agent_id: input.agent_id }), + input.agent_id, // created_by records who submitted timestamp, timestamp, null, ] ); + // Link MR bead → source bead via bead_dependencies so the DAG is queryable + query( + sql, + /* sql */ ` + INSERT INTO ${bead_dependencies} ( + ${bead_dependencies.columns.bead_id}, + ${bead_dependencies.columns.depends_on_bead_id}, + ${bead_dependencies.columns.dependency_type} + ) VALUES (?, ?, 'tracks') + `, + [id, input.bead_id] + ); + // Create the review_metadata satellite query( sql, @@ -255,65 +274,65 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu if (!agent.current_hook_bead_id) throw new Error(`Agent ${agentId} has no hooked bead`); if (agent.role === 'refinery') { - // Refinery agents merge the code themselves then call gt_done. - // Find the in-progress review entry whose source_bead_id matches the - // hooked bead and complete it, which also closes the original bead. - completeReviewForSourceBead(sql, agent.current_hook_bead_id, agentId); + // The refinery is hooked to the MR bead. Mark it as merged and log + // the review_completed event on the source bead. + const mrBeadId = agent.current_hook_bead_id; + completeReviewFromMRBead(sql, mrBeadId, agentId); unhookBead(sql, agentId); return; } + const sourceBead = agent.current_hook_bead_id; + + if (!agent.rig_id) { + console.warn( + `[review-queue] agentDone: agent ${agentId} has null rig_id — review entry may fail in processReviewQueue` + ); + } + submitToReviewQueue(sql, { agent_id: agentId, - bead_id: agent.current_hook_bead_id, + bead_id: sourceBead, + rig_id: agent.rig_id ?? '', branch: input.branch, pr_url: input.pr_url, summary: input.summary, }); + // Close the source bead (matches upstream gt done behavior). The polecat's + // work is done — the MR bead now tracks the merge lifecycle. The source + // bead retains its assignee so we know which agent worked on it. unhookBead(sql, agentId); + closeBead(sql, sourceBead, agentId); } /** - * Find the merge_request bead whose metadata.source_bead_id matches the - * given bead and complete it as 'merged'. Also closes the original bead. - * - * Used when a refinery agent finishes: it has already merged the code - * itself, so we just need to mark the review + source bead as done. + * Complete a review given the MR bead id directly (the refinery is hooked + * to the MR bead). Marks the MR as merged and logs a review_completed + * event on the source bead. The source bead itself is already closed by + * the polecat's agentDone path. */ -function completeReviewForSourceBead(sql: SqlStorage, sourceBeadId: string, agentId: string): void { - // Find the merge_request bead for this source bead (most recent first) - const rows = [ - ...query( - sql, - /* sql */ ` - ${REVIEW_JOIN} - WHERE ${beads.status} IN ('open', 'in_progress') - AND json_extract(${beads.metadata}, '$.source_bead_id') = ? - ORDER BY ${beads.created_at} DESC - LIMIT 1 - `, - [sourceBeadId] - ), - ]; +function completeReviewFromMRBead(sql: SqlStorage, mrBeadId: string, agentId: string): void { + const mrBead = getBead(sql, mrBeadId); + if (!mrBead) { + console.error( + `[review-queue] completeReviewFromMRBead: MR bead ${mrBeadId} not found — data integrity issue` + ); + return; + } + const sourceBeadId = mrBead.metadata?.source_bead_id; - if (rows.length > 0) { - const parsed = MergeRequestBeadRecord.parse(rows[0]); - const entry = toReviewQueueEntry(parsed); - completeReview(sql, entry.id, 'merged'); + completeReview(sql, mrBeadId, 'merged'); + if (typeof sourceBeadId === 'string') { logBeadEvent(sql, { beadId: sourceBeadId, agentId, eventType: 'review_completed', newValue: 'merged', - metadata: { completedBy: 'refinery' }, + metadata: { completedBy: 'refinery', mr_bead_id: mrBeadId }, }); } - - // Close the original bead regardless of whether we found a review entry. - // The refinery confirmed the work is merged — the source bead is done. - closeBead(sql, sourceBeadId, agentId); } /** diff --git a/cloudflare-gastown/src/handlers/rig-review-queue.handler.ts b/cloudflare-gastown/src/handlers/rig-review-queue.handler.ts index 0998a6e2fd..ae275e728c 100644 --- a/cloudflare-gastown/src/handlers/rig-review-queue.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-review-queue.handler.ts @@ -28,7 +28,7 @@ export async function handleSubmitToReviewQueue(c: Context, params: } const townId = c.get('townId'); const town = getTownDOStub(c.env, townId); - await town.submitToReviewQueue(parsed.data); + await town.submitToReviewQueue({ ...parsed.data, rig_id: params.rigId }); return c.json(resSuccess({ submitted: true }), 201); } diff --git a/cloudflare-gastown/src/types.ts b/cloudflare-gastown/src/types.ts index c39d2c76cc..f8bc0b118a 100644 --- a/cloudflare-gastown/src/types.ts +++ b/cloudflare-gastown/src/types.ts @@ -119,6 +119,7 @@ export type ReviewQueueEntry = { id: string; agent_id: string; bead_id: string; + rig_id: string; branch: string; pr_url: string | null; status: ReviewStatus; @@ -130,6 +131,7 @@ export type ReviewQueueEntry = { export type ReviewQueueInput = { agent_id: string; bead_id: string; + rig_id: string; branch: string; pr_url?: string; summary?: string; diff --git a/cloudflare-gastown/test/integration/rig-alarm.test.ts b/cloudflare-gastown/test/integration/rig-alarm.test.ts index effc5f82b0..a80cfc6b5f 100644 --- a/cloudflare-gastown/test/integration/rig-alarm.test.ts +++ b/cloudflare-gastown/test/integration/rig-alarm.test.ts @@ -150,6 +150,7 @@ describe('Town DO Alarm', () => { await town.submitToReviewQueue({ agent_id: agent.id, bead_id: bead.id, + rig_id: 'test-rig', branch: 'feature/review', }); diff --git a/cloudflare-gastown/test/integration/rig-do.test.ts b/cloudflare-gastown/test/integration/rig-do.test.ts index eed7a872be..38ba3642a2 100644 --- a/cloudflare-gastown/test/integration/rig-do.test.ts +++ b/cloudflare-gastown/test/integration/rig-do.test.ts @@ -367,6 +367,7 @@ describe('TownDO', () => { await town.submitToReviewQueue({ agent_id: agent.id, bead_id: bead.id, + rig_id: 'test-rig', branch: 'feature/fix-widget', pr_url: 'https://github.com/org/repo/pull/1', summary: 'Fixed the widget', @@ -394,6 +395,7 @@ describe('TownDO', () => { await town.submitToReviewQueue({ agent_id: agent.id, bead_id: bead.id, + rig_id: 'test-rig', branch: 'feature/fix', }); @@ -418,6 +420,7 @@ describe('TownDO', () => { await town.submitToReviewQueue({ agent_id: agent.id, bead_id: bead.id, + rig_id: 'test-rig', branch: 'feature/merge-test', }); @@ -452,6 +455,7 @@ describe('TownDO', () => { await town.submitToReviewQueue({ agent_id: agent.id, bead_id: bead.id, + rig_id: 'test-rig', branch: 'feature/conflict-test', }); @@ -751,6 +755,7 @@ describe('TownDO', () => { await town.submitToReviewQueue({ agent_id: agent.id, bead_id: bead.id, + rig_id: 'test-rig', branch: 'feature/test', }); diff --git a/src/components/gastown/drawer-panels/BeadPanel.tsx b/src/components/gastown/drawer-panels/BeadPanel.tsx index 22b150dee8..664cfe4088 100644 --- a/src/components/gastown/drawer-panels/BeadPanel.tsx +++ b/src/components/gastown/drawer-panels/BeadPanel.tsx @@ -18,6 +18,10 @@ import { GitBranch, ChevronRight, Bot, + Network, + ArrowDownRight, + GitPullRequest, + CircleDot, } from 'lucide-react'; import ReactMarkdown from 'react-markdown'; import remarkGfm from 'remark-gfm'; @@ -66,6 +70,10 @@ export function BeadPanel({ const townId = rigQuery.data?.town_id; + // Build related beads from the flat list (no extra API needed) + const allBeads = beadsQuery.data ?? []; + const relatedBeads = buildRelatedBeads(bead, allBeads); + return (
{/* Title area */} @@ -156,6 +164,46 @@ export function BeadPanel({ )}
+ {/* Related Beads DAG */} + {relatedBeads.length > 0 && ( +
+
+ + + Related Beads + +
+
+ {relatedBeads.map(rel => ( + + ))} +
+
+ )} + {/* Body (markdown) */} {bead.body && bead.body.trim().length > 0 && (
@@ -210,3 +258,57 @@ function MetaCell({
); } + +// ── Related beads DAG ───────────────────────────────────────────────── + +type BeadLike = { + bead_id: string; + type: string; + status: string; + title: string; + parent_bead_id: string | null; + metadata: Record; +}; + +type RelatedBead = { + relation: string; + label: string; + icon: typeof Clock; + bead: BeadLike; +}; + +/** Compute the DAG neighborhood of a bead from the flat list. */ +function buildRelatedBeads(bead: BeadLike, allBeads: BeadLike[]): RelatedBead[] { + const related: RelatedBead[] = []; + + // Parent bead (already shown in metadata grid, but include in DAG for completeness) + // Skip — the metadata grid already renders a clickable parent link. + + // Child beads (beads whose parent_bead_id = this bead) + for (const b of allBeads) { + if (b.parent_bead_id === bead.bead_id) { + related.push({ relation: 'child', label: 'Child', icon: ArrowDownRight, bead: b }); + } + } + + // For merge_request beads: link back to the source bead + if (bead.type === 'merge_request' && typeof bead.metadata?.source_bead_id === 'string') { + const source = allBeads.find(b => b.bead_id === bead.metadata.source_bead_id); + if (source) { + related.push({ relation: 'source', label: 'Source Work', icon: CircleDot, bead: source }); + } + } + + // For non-MR beads: find any MR beads that track this bead + if (bead.type !== 'merge_request') { + for (const b of allBeads) { + if (b.type === 'merge_request' && b.metadata?.source_bead_id === bead.bead_id) { + related.push({ relation: 'review', label: 'Review', icon: GitPullRequest, bead: b }); + } + } + } + + // Beads that share the same parent (siblings) — skip, too noisy for now + + return related; +}