Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1446,8 +1446,14 @@ export class TownDO extends DurableObject<Env> {
const entry = reviewQueue.popReviewQueue(this.sql);
if (!entry) return;

const rigList = rigs.listRigs(this.sql);
const rigId = rigList[0]?.id ?? '';
// Resolve rig from the merge_request bead — not rigList[0] which would
// pick the wrong rig in multi-rig towns.
const rigId = entry.rig_id;
if (!rigId) {
console.error(`${TOWN_LOG} processReviewQueue: entry ${entry.id} has no rig_id, skipping`);
reviewQueue.completeReview(this.sql, entry.id, 'failed');
return;
}
const rigConfig = await this.getRigConfig(rigId);
if (!rigConfig) {
reviewQueue.completeReview(this.sql, entry.id, 'failed');
Expand All @@ -1471,7 +1477,10 @@ export class TownDO extends DurableObject<Env> {
polecatAgentId: entry.agent_id,
});

agents.hookBead(this.sql, refineryAgent.id, entry.bead_id);
// Hook the refinery to the MR bead (entry.id), not the source bead
// (entry.bead_id). The source bead stays closed with its original
// polecat assignee preserved.
agents.hookBead(this.sql, refineryAgent.id, entry.id);

const started = await dispatch.startAgentInContainer(this.env, this.ctx.storage, {
townId: this.townId,
Expand All @@ -1481,7 +1490,7 @@ export class TownDO extends DurableObject<Env> {
agentName: refineryAgent.name,
role: 'refinery',
identity: refineryAgent.identity,
beadId: entry.bead_id,
beadId: entry.id,
beadTitle: `Review merge: ${entry.branch} → ${rigConfig.defaultBranch}`,
beadBody: entry.summary ?? '',
checkpoint: null,
Expand Down
12 changes: 6 additions & 6 deletions cloudflare-gastown/src/dos/town/agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,28 +342,28 @@ export function getOrCreateAgent(
rigId: string,
townId: string
): Agent {
const singletonRoles = ['witness', 'refinery', 'mayor'];
// Town-wide singletons: one per town, not tied to a rig.
const townSingletonRoles = ['witness', 'mayor'];

if (singletonRoles.includes(role)) {
// Try to find an existing agent with this role
if (townSingletonRoles.includes(role)) {
const existing = listAgents(sql, { role });
if (existing.length > 0) return existing[0];
} else {
// For polecats, try to find an idle one without a hook in the SAME rig.
// Per-rig agents (polecat, refinery): reuse an idle one in the SAME rig.
// Agents are tied to a rig's worktree/repo — reusing one from a different
// rig would dispatch it into the wrong repository.
const idle = [
...query(
sql,
/* sql */ `
${AGENT_JOIN}
WHERE ${agent_metadata.role} = 'polecat'
WHERE ${agent_metadata.role} = ?
AND ${agent_metadata.status} = 'idle'
AND ${agent_metadata.current_hook_bead_id} IS NULL
AND ${beads.rig_id} = ?
LIMIT 1
`,
[rigId]
[role, rigId]
),
];
if (idle.length > 0) return toAgent(AgentBeadRecord.parse(idle[0]));
Expand Down
3 changes: 3 additions & 0 deletions cloudflare-gastown/src/dos/town/beads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ export function updateBeadStatus(
const bead = getBead(sql, beadId);
if (!bead) throw new Error(`Bead ${beadId} not found`);

// No-op if already in the target status — avoids redundant events
if (bead.status === status) return bead;

const oldStatus = bead.status;
const timestamp = now();
const closedAt = status === 'closed' ? timestamp : bead.closed_at;
Expand Down
97 changes: 58 additions & 39 deletions cloudflare-gastown/src/dos/town/review-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ const REVIEW_JOIN = /* sql */ `
function toReviewQueueEntry(row: MergeRequestBeadRecord): ReviewQueueEntry {
return {
id: row.bead_id,
agent_id: row.assignee_agent_bead_id ?? row.created_by ?? '',
// The polecat that submitted the review — stored in metadata (not assignee,
// which is set to the refinery when it claims the MR bead via hookBead).
agent_id:
typeof row.metadata?.source_agent_id === 'string'
? row.metadata.source_agent_id
: (row.created_by ?? ''),
bead_id:
typeof row.metadata?.source_bead_id === 'string' ? row.metadata.source_bead_id : row.bead_id,
rig_id: row.rig_id ?? '',
Comment thread
jrf0110 marked this conversation as resolved.
branch: row.branch,
pr_url: row.pr_url,
status:
Expand Down Expand Up @@ -89,19 +95,32 @@ export function submitToReviewQueue(sql: SqlStorage, input: ReviewQueueInput): v
'open',
`Review: ${input.branch}`,
input.summary ?? null,
input.rig_id,
null,
null,
input.agent_id,
null, // assignee left null — refinery claims it via hookBead
'medium',
JSON.stringify(['gt:merge-request']),
JSON.stringify({ source_bead_id: input.bead_id }),
input.agent_id,
JSON.stringify({ source_bead_id: input.bead_id, source_agent_id: input.agent_id }),
input.agent_id, // created_by records who submitted
timestamp,
timestamp,
null,
]
);

// Link MR bead → source bead via bead_dependencies so the DAG is queryable
query(
sql,
/* sql */ `
INSERT INTO ${bead_dependencies} (
${bead_dependencies.columns.bead_id},
${bead_dependencies.columns.depends_on_bead_id},
${bead_dependencies.columns.dependency_type}
) VALUES (?, ?, 'tracks')
`,
[id, input.bead_id]
);

// Create the review_metadata satellite
query(
sql,
Expand Down Expand Up @@ -255,65 +274,65 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu
if (!agent.current_hook_bead_id) throw new Error(`Agent ${agentId} has no hooked bead`);

if (agent.role === 'refinery') {
// Refinery agents merge the code themselves then call gt_done.
// Find the in-progress review entry whose source_bead_id matches the
// hooked bead and complete it, which also closes the original bead.
completeReviewForSourceBead(sql, agent.current_hook_bead_id, agentId);
// The refinery is hooked to the MR bead. Mark it as merged and log
// the review_completed event on the source bead.
const mrBeadId = agent.current_hook_bead_id;
completeReviewFromMRBead(sql, mrBeadId, agentId);
unhookBead(sql, agentId);
return;
}

const sourceBead = agent.current_hook_bead_id;

if (!agent.rig_id) {
console.warn(
`[review-queue] agentDone: agent ${agentId} has null rig_id — review entry may fail in processReviewQueue`
);
}

submitToReviewQueue(sql, {
agent_id: agentId,
bead_id: agent.current_hook_bead_id,
bead_id: sourceBead,
rig_id: agent.rig_id ?? '',
Comment thread
jrf0110 marked this conversation as resolved.
branch: input.branch,
pr_url: input.pr_url,
summary: input.summary,
});

// Close the source bead (matches upstream gt done behavior). The polecat's
// work is done — the MR bead now tracks the merge lifecycle. The source
// bead retains its assignee so we know which agent worked on it.
unhookBead(sql, agentId);
closeBead(sql, sourceBead, agentId);
Comment thread
jrf0110 marked this conversation as resolved.
}

/**
* Find the merge_request bead whose metadata.source_bead_id matches the
* given bead and complete it as 'merged'. Also closes the original bead.
*
* Used when a refinery agent finishes: it has already merged the code
* itself, so we just need to mark the review + source bead as done.
* Complete a review given the MR bead id directly (the refinery is hooked
* to the MR bead). Marks the MR as merged and logs a review_completed
* event on the source bead. The source bead itself is already closed by
* the polecat's agentDone path.
*/
function completeReviewForSourceBead(sql: SqlStorage, sourceBeadId: string, agentId: string): void {
// Find the merge_request bead for this source bead (most recent first)
const rows = [
...query(
sql,
/* sql */ `
${REVIEW_JOIN}
WHERE ${beads.status} IN ('open', 'in_progress')
AND json_extract(${beads.metadata}, '$.source_bead_id') = ?
ORDER BY ${beads.created_at} DESC
LIMIT 1
`,
[sourceBeadId]
),
];
function completeReviewFromMRBead(sql: SqlStorage, mrBeadId: string, agentId: string): void {
const mrBead = getBead(sql, mrBeadId);
Comment thread
jrf0110 marked this conversation as resolved.
if (!mrBead) {
console.error(
`[review-queue] completeReviewFromMRBead: MR bead ${mrBeadId} not found — data integrity issue`
);
return;
}
const sourceBeadId = mrBead.metadata?.source_bead_id;

if (rows.length > 0) {
const parsed = MergeRequestBeadRecord.parse(rows[0]);
const entry = toReviewQueueEntry(parsed);
completeReview(sql, entry.id, 'merged');
completeReview(sql, mrBeadId, 'merged');

if (typeof sourceBeadId === 'string') {
logBeadEvent(sql, {
beadId: sourceBeadId,
agentId,
eventType: 'review_completed',
newValue: 'merged',
metadata: { completedBy: 'refinery' },
metadata: { completedBy: 'refinery', mr_bead_id: mrBeadId },
});
}

// Close the original bead regardless of whether we found a review entry.
// The refinery confirmed the work is merged — the source bead is done.
closeBead(sql, sourceBeadId, agentId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export async function handleSubmitToReviewQueue(c: Context<GastownEnv>, params:
}
const townId = c.get('townId');
const town = getTownDOStub(c.env, townId);
await town.submitToReviewQueue(parsed.data);
await town.submitToReviewQueue({ ...parsed.data, rig_id: params.rigId });
return c.json(resSuccess({ submitted: true }), 201);
}

Expand Down
2 changes: 2 additions & 0 deletions cloudflare-gastown/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export type ReviewQueueEntry = {
id: string;
agent_id: string;
bead_id: string;
rig_id: string;
branch: string;
pr_url: string | null;
status: ReviewStatus;
Expand All @@ -130,6 +131,7 @@ export type ReviewQueueEntry = {
export type ReviewQueueInput = {
agent_id: string;
bead_id: string;
rig_id: string;
branch: string;
pr_url?: string;
summary?: string;
Expand Down
1 change: 1 addition & 0 deletions cloudflare-gastown/test/integration/rig-alarm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ describe('Town DO Alarm', () => {
await town.submitToReviewQueue({
agent_id: agent.id,
bead_id: bead.id,
rig_id: 'test-rig',
branch: 'feature/review',
});

Expand Down
5 changes: 5 additions & 0 deletions cloudflare-gastown/test/integration/rig-do.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ describe('TownDO', () => {
await town.submitToReviewQueue({
agent_id: agent.id,
bead_id: bead.id,
rig_id: 'test-rig',
branch: 'feature/fix-widget',
pr_url: 'https://github.com/org/repo/pull/1',
summary: 'Fixed the widget',
Expand Down Expand Up @@ -394,6 +395,7 @@ describe('TownDO', () => {
await town.submitToReviewQueue({
agent_id: agent.id,
bead_id: bead.id,
rig_id: 'test-rig',
branch: 'feature/fix',
});

Expand All @@ -418,6 +420,7 @@ describe('TownDO', () => {
await town.submitToReviewQueue({
agent_id: agent.id,
bead_id: bead.id,
rig_id: 'test-rig',
branch: 'feature/merge-test',
});

Expand Down Expand Up @@ -452,6 +455,7 @@ describe('TownDO', () => {
await town.submitToReviewQueue({
agent_id: agent.id,
bead_id: bead.id,
rig_id: 'test-rig',
branch: 'feature/conflict-test',
});

Expand Down Expand Up @@ -751,6 +755,7 @@ describe('TownDO', () => {
await town.submitToReviewQueue({
agent_id: agent.id,
bead_id: bead.id,
rig_id: 'test-rig',
branch: 'feature/test',
});

Expand Down
Loading