diff --git a/services/gastown/src/dos/town/actions.ts b/services/gastown/src/dos/town/actions.ts index d7fdfee532..bb1482d035 100644 --- a/services/gastown/src/dos/town/actions.ts +++ b/services/gastown/src/dos/town/actions.ts @@ -132,6 +132,12 @@ const CloseConvoy = z.object({ convoy_id: z.string(), }); +const FailConvoy = z.object({ + type: z.literal('fail_convoy'), + convoy_id: z.string(), + reason: z.string(), +}); + // ── Side effects (deferred) ───────────────────────────────────────── const DispatchAgent = z.object({ @@ -206,6 +212,7 @@ export const Action = z.discriminatedUnion('type', [ UpdateConvoyProgress, SetConvoyReadyToLand, CloseConvoy, + FailConvoy, // Side effects DispatchAgent, StopAgent, @@ -239,6 +246,7 @@ export type DeleteAgent = z.infer; export type UpdateConvoyProgress = z.infer; export type SetConvoyReadyToLand = z.infer; export type CloseConvoy = z.infer; +export type FailConvoy = z.infer; export type DispatchAgent = z.infer; export type StopAgent = z.infer; export type PollPr = z.infer; @@ -397,7 +405,22 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro } case 'create_landing_mr': { - // Create an MR bead for the landing merge (feature branch → main) + const timestamp = now(); + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.landing_mr_attempts', + COALESCE(json_extract(${beads.columns.metadata}, '$.landing_mr_attempts'), 0) + 1, + '$.last_landing_mr_attempt_at', ? + ), + ${beads.columns.updated_at} = ? + WHERE ${beads.bead_id} = ? + `, + [timestamp, timestamp, action.convoy_id] + ); reviewQueue.submitToReviewQueue(sql, { agent_id: 'system', bead_id: action.convoy_id, @@ -592,7 +615,6 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro } case 'close_convoy': { - // Use updateBeadStatus for terminal state guard + bead event logging beadOps.updateBeadStatus(sql, action.convoy_id, 'closed', 'system'); query( sql, @@ -606,6 +628,25 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro return null; } + case 'fail_convoy': { + beadOps.updateBeadStatus(sql, action.convoy_id, 'failed', 'system'); + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.failureReason', 'landing_mr_exhausted', + '$.failureMessage', ? + ), + ${beads.columns.updated_at} = ? + WHERE ${beads.bead_id} = ? + `, + [action.reason, now(), action.convoy_id] + ); + return null; + } + // ── Side effects (deferred) ───────────────────────────────── case 'dispatch_agent': { diff --git a/services/gastown/src/dos/town/reconciler.ts b/services/gastown/src/dos/town/reconciler.ts index 111473fd4e..91884509e5 100644 --- a/services/gastown/src/dos/town/reconciler.ts +++ b/services/gastown/src/dos/town/reconciler.ts @@ -45,6 +45,15 @@ const CIRCUIT_BREAKER_FAILURE_THRESHOLD = 20; /** Window in minutes for counting dispatch failures. */ const CIRCUIT_BREAKER_WINDOW_MINUTES = 30; +/** Max landing MR creation attempts before failing the convoy (#2260). */ +const MAX_LANDING_MR_ATTEMPTS = 5; + +/** Base cooldown for landing MR retry: min(2^attempts * BASE, MAX) (#2260). */ +const LANDING_MR_COOLDOWN_BASE_MS = 30_000; // 30s + +/** Max cooldown for landing MR retry (#2260). */ +const LANDING_MR_COOLDOWN_MAX_MS = 30 * 60_000; // 30 min + /** * Town-level dispatch circuit breaker. Counts beads with at least one * dispatch attempt in the recent window that have not yet closed @@ -1723,14 +1732,19 @@ export function reconcileConvoys(sql: SqlStorage): Action[] { if (progressRows.length === 0) continue; const { closed_count, total_count } = progressRows[0]; - // Update progress if stale - if (closed_count !== convoy.closed_beads) { - actions.push({ - type: 'update_convoy_progress', - convoy_id: convoy.bead_id, - closed_beads: closed_count, - }); + // Parse convoy metadata for landing MR tracking fields (#2260) + let parsedMeta: Record = {}; + try { + parsedMeta = JSON.parse(convoy.metadata) as Record; + } catch { + /* ignore */ } + const landingMrAttempts = + typeof parsedMeta.landing_mr_attempts === 'number' ? parsedMeta.landing_mr_attempts : 0; + const lastLandingMrAttemptAt = + typeof parsedMeta.last_landing_mr_attempt_at === 'string' + ? parsedMeta.last_landing_mr_attempt_at + : null; // Check for in-flight MR beads (open or in_progress) for tracked issue beads const inFlightMrCount = z @@ -1759,31 +1773,36 @@ export function reconcileConvoys(sql: SqlStorage): Action[] { const hasInFlightReviews = (inFlightMrCount[0]?.cnt ?? 0) > 0; // Check if all beads done - if (closed_count >= total_count && total_count > 0 && !hasInFlightReviews) { - let parsedMeta: Record = {}; - try { - parsedMeta = JSON.parse(convoy.metadata) as Record; - } catch { - /* ignore */ - } + const allBeadsDone = closed_count >= total_count && total_count > 0 && !hasInFlightReviews; - if (convoy.merge_mode === 'review-then-land' && convoy.feature_branch) { - if (!parsedMeta.ready_to_land) { - actions.push({ - type: 'set_convoy_ready_to_land', - convoy_id: convoy.bead_id, - }); - } + // Update progress if stale (skip if we're failing/closing the convoy this tick) + if (closed_count !== convoy.closed_beads) { + actions.push({ + type: 'update_convoy_progress', + convoy_id: convoy.bead_id, + closed_beads: closed_count, + }); + } - if (parsedMeta.ready_to_land) { - // Check if a landing MR already exists (any status) - const landingMrs = z - .object({ status: z.string() }) - .array() - .parse([ - ...query( - sql, - /* sql */ ` + if (!allBeadsDone) continue; + + if (convoy.merge_mode === 'review-then-land' && convoy.feature_branch) { + if (!parsedMeta.ready_to_land) { + actions.push({ + type: 'set_convoy_ready_to_land', + convoy_id: convoy.bead_id, + }); + } + + if (parsedMeta.ready_to_land) { + // Check if a landing MR already exists (any status) + const landingMrs = z + .object({ status: z.string() }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` SELECT mr.${beads.columns.status} FROM ${bead_dependencies} bd INNER JOIN ${beads} mr ON mr.${beads.columns.bead_id} = bd.${bead_dependencies.columns.bead_id} @@ -1791,36 +1810,87 @@ export function reconcileConvoys(sql: SqlStorage): Action[] { AND bd.${bead_dependencies.columns.dependency_type} = 'tracks' AND mr.${beads.columns.type} = 'merge_request' `, - [convoy.bead_id] - ), - ]); + [convoy.bead_id] + ), + ]); - // If a landing MR was already merged (closed), close the convoy - const hasMergedLanding = landingMrs.some(mr => mr.status === 'closed'); - if (hasMergedLanding) { - actions.push({ - type: 'close_convoy', - convoy_id: convoy.bead_id, - }); - continue; - } + // If a landing MR was already merged (closed), close the convoy + const hasMergedLanding = landingMrs.some(mr => mr.status === 'closed'); + if (hasMergedLanding) { + actions.push({ + type: 'close_convoy', + convoy_id: convoy.bead_id, + }); + continue; + } + + // Fix 1 (#2260): If a landing MR is active (open or in_progress), wait — don't create another + const hasActiveLanding = landingMrs.some( + mr => mr.status === 'open' || mr.status === 'in_progress' + ); + if (hasActiveLanding) continue; + + // Fix 2 (#2260): If max landing MR attempts exceeded and no landing MR is + // active or merged, fail the convoy. Checked after landing MR status lookup + // so the final allowed attempt can still succeed. + if (landingMrAttempts >= MAX_LANDING_MR_ATTEMPTS) { + actions.push({ + type: 'fail_convoy', + convoy_id: convoy.bead_id, + reason: `Landing MR creation failed after ${MAX_LANDING_MR_ATTEMPTS} attempts`, + }); + continue; + } + + // Fix 2 (#2260): Apply exponential cooldown between landing MR attempts + if (landingMrAttempts > 0 && lastLandingMrAttemptAt) { + const elapsed = Date.now() - new Date(lastLandingMrAttemptAt).getTime(); + const cooldownMs = Math.min( + Math.pow(2, landingMrAttempts) * LANDING_MR_COOLDOWN_BASE_MS, + LANDING_MR_COOLDOWN_MAX_MS + ); + if (elapsed < cooldownMs) continue; + } + + // Fix 3 (#2260): Check that tracked beads have at least one MR with a PR URL + const convoyBeadsWithPr = z + .object({ cnt: z.number() }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT count(*) as cnt + FROM ${bead_dependencies} track_dep + INNER JOIN ${bead_dependencies} mr_dep + ON mr_dep.${bead_dependencies.columns.depends_on_bead_id} = track_dep.${bead_dependencies.columns.bead_id} + INNER JOIN ${review_metadata} rm + ON rm.${review_metadata.columns.bead_id} = mr_dep.${bead_dependencies.columns.bead_id} + WHERE track_dep.${bead_dependencies.columns.depends_on_bead_id} = ? + AND track_dep.${bead_dependencies.columns.dependency_type} = 'tracks' + AND mr_dep.${bead_dependencies.columns.dependency_type} = 'tracks' + AND rm.${review_metadata.columns.pr_url} IS NOT NULL + `, + [convoy.bead_id] + ), + ]); - // If a landing MR is active (open or in_progress), wait for it - const hasActiveLanding = landingMrs.some( - mr => mr.status === 'open' || mr.status === 'in_progress' + if ((convoyBeadsWithPr[0]?.cnt ?? 0) === 0) { + console.warn( + `${LOG} convoy ${convoy.bead_id} has no beads with pr_url — skipping create_landing_mr` ); - if (hasActiveLanding) continue; - - // No landing MR exists yet — create one - { - // Need rig_id from one of the tracked beads - const rigRows = z - .object({ rig_id: z.string() }) - .array() - .parse([ - ...query( - sql, - /* sql */ ` + continue; + } + + // No landing MR exists yet and cooldown has passed — create one + { + const rigRows = z + .object({ rig_id: z.string() }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` SELECT DISTINCT tracked.${beads.columns.rig_id} as rig_id FROM ${bead_dependencies} bd INNER JOIN ${beads} tracked ON tracked.${beads.columns.bead_id} = bd.${bead_dependencies.columns.bead_id} @@ -1829,29 +1899,28 @@ export function reconcileConvoys(sql: SqlStorage): Action[] { AND tracked.${beads.columns.rig_id} IS NOT NULL LIMIT 1 `, - [convoy.bead_id] - ), - ]); - - if (rigRows.length > 0) { - const rig = getRig(sql, rigRows[0].rig_id); - actions.push({ - type: 'create_landing_mr', - convoy_id: convoy.bead_id, - rig_id: rigRows[0].rig_id, - feature_branch: convoy.feature_branch, - target_branch: rig?.default_branch ?? 'main', - }); - } + [convoy.bead_id] + ), + ]); + + if (rigRows.length > 0) { + const rig = getRig(sql, rigRows[0].rig_id); + actions.push({ + type: 'create_landing_mr', + convoy_id: convoy.bead_id, + rig_id: rigRows[0].rig_id, + feature_branch: convoy.feature_branch, + target_branch: rig?.default_branch ?? 'main', + }); } } - } else { - // review-and-merge or no feature branch — auto-close - actions.push({ - type: 'close_convoy', - convoy_id: convoy.bead_id, - }); } + } else { + // review-and-merge or no feature branch — auto-close + actions.push({ + type: 'close_convoy', + convoy_id: convoy.bead_id, + }); } }