Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions services/gastown/src/dos/town/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ const CloseConvoy = z.object({
convoy_id: z.string(),
});

const FailConvoy = z.object({
type: z.literal('fail_convoy'),
convoy_id: z.string(),
reason: z.string(),
});

// ── Side effects (deferred) ─────────────────────────────────────────

const DispatchAgent = z.object({
Expand Down Expand Up @@ -206,6 +212,7 @@ export const Action = z.discriminatedUnion('type', [
UpdateConvoyProgress,
SetConvoyReadyToLand,
CloseConvoy,
FailConvoy,
// Side effects
DispatchAgent,
StopAgent,
Expand Down Expand Up @@ -239,6 +246,7 @@ export type DeleteAgent = z.infer<typeof DeleteAgent>;
export type UpdateConvoyProgress = z.infer<typeof UpdateConvoyProgress>;
export type SetConvoyReadyToLand = z.infer<typeof SetConvoyReadyToLand>;
export type CloseConvoy = z.infer<typeof CloseConvoy>;
export type FailConvoy = z.infer<typeof FailConvoy>;
export type DispatchAgent = z.infer<typeof DispatchAgent>;
export type StopAgent = z.infer<typeof StopAgent>;
export type PollPr = z.infer<typeof PollPr>;
Expand Down Expand Up @@ -397,7 +405,22 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro
}

case 'create_landing_mr': {
// Create an MR bead for the landing merge (feature branch → main)
const timestamp = now();
query(
sql,
/* sql */ `
UPDATE ${beads}
SET ${beads.columns.metadata} = json_set(
COALESCE(${beads.columns.metadata}, '{}'),
'$.landing_mr_attempts',
COALESCE(json_extract(${beads.columns.metadata}, '$.landing_mr_attempts'), 0) + 1,
'$.last_landing_mr_attempt_at', ?
),
${beads.columns.updated_at} = ?
WHERE ${beads.bead_id} = ?
`,
[timestamp, timestamp, action.convoy_id]
);
reviewQueue.submitToReviewQueue(sql, {
agent_id: 'system',
bead_id: action.convoy_id,
Expand Down Expand Up @@ -592,7 +615,6 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro
}

case 'close_convoy': {
// Use updateBeadStatus for terminal state guard + bead event logging
beadOps.updateBeadStatus(sql, action.convoy_id, 'closed', 'system');
query(
sql,
Expand All @@ -606,6 +628,25 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro
return null;
}

case 'fail_convoy': {
beadOps.updateBeadStatus(sql, action.convoy_id, 'failed', 'system');
query(
sql,
/* sql */ `
UPDATE ${beads}
SET ${beads.columns.metadata} = json_set(
COALESCE(${beads.columns.metadata}, '{}'),
'$.failureReason', 'landing_mr_exhausted',
'$.failureMessage', ?
),
${beads.columns.updated_at} = ?
WHERE ${beads.bead_id} = ?
`,
[action.reason, now(), action.convoy_id]
);
return null;
}

// ── Side effects (deferred) ─────────────────────────────────

case 'dispatch_agent': {
Expand Down
223 changes: 146 additions & 77 deletions services/gastown/src/dos/town/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ const CIRCUIT_BREAKER_FAILURE_THRESHOLD = 20;
/** Window in minutes for counting dispatch failures. */
const CIRCUIT_BREAKER_WINDOW_MINUTES = 30;

/** Max landing MR creation attempts before failing the convoy (#2260). */
const MAX_LANDING_MR_ATTEMPTS = 5;

/** Base cooldown for landing MR retry: min(2^attempts * BASE, MAX) (#2260). */
const LANDING_MR_COOLDOWN_BASE_MS = 30_000; // 30s

/** Max cooldown for landing MR retry (#2260). */
const LANDING_MR_COOLDOWN_MAX_MS = 30 * 60_000; // 30 min

/**
* Town-level dispatch circuit breaker. Counts beads with at least one
* dispatch attempt in the recent window that have not yet closed
Expand Down Expand Up @@ -1723,14 +1732,19 @@ export function reconcileConvoys(sql: SqlStorage): Action[] {
if (progressRows.length === 0) continue;
const { closed_count, total_count } = progressRows[0];

// Update progress if stale
if (closed_count !== convoy.closed_beads) {
actions.push({
type: 'update_convoy_progress',
convoy_id: convoy.bead_id,
closed_beads: closed_count,
});
// Parse convoy metadata for landing MR tracking fields (#2260)
let parsedMeta: Record<string, unknown> = {};
try {
parsedMeta = JSON.parse(convoy.metadata) as Record<string, unknown>;
} catch {
/* ignore */
}
const landingMrAttempts =
typeof parsedMeta.landing_mr_attempts === 'number' ? parsedMeta.landing_mr_attempts : 0;
const lastLandingMrAttemptAt =
typeof parsedMeta.last_landing_mr_attempt_at === 'string'
? parsedMeta.last_landing_mr_attempt_at
: null;

// Check for in-flight MR beads (open or in_progress) for tracked issue beads
const inFlightMrCount = z
Expand Down Expand Up @@ -1759,68 +1773,124 @@ export function reconcileConvoys(sql: SqlStorage): Action[] {
const hasInFlightReviews = (inFlightMrCount[0]?.cnt ?? 0) > 0;

// Check if all beads done
if (closed_count >= total_count && total_count > 0 && !hasInFlightReviews) {
let parsedMeta: Record<string, unknown> = {};
try {
parsedMeta = JSON.parse(convoy.metadata) as Record<string, unknown>;
} catch {
/* ignore */
}
const allBeadsDone = closed_count >= total_count && total_count > 0 && !hasInFlightReviews;

if (convoy.merge_mode === 'review-then-land' && convoy.feature_branch) {
if (!parsedMeta.ready_to_land) {
actions.push({
type: 'set_convoy_ready_to_land',
convoy_id: convoy.bead_id,
});
}
// Update progress if stale (skip if we're failing/closing the convoy this tick)
if (closed_count !== convoy.closed_beads) {
actions.push({
type: 'update_convoy_progress',
convoy_id: convoy.bead_id,
closed_beads: closed_count,
});
}

if (parsedMeta.ready_to_land) {
// Check if a landing MR already exists (any status)
const landingMrs = z
.object({ status: z.string() })
.array()
.parse([
...query(
sql,
/* sql */ `
if (!allBeadsDone) continue;

if (convoy.merge_mode === 'review-then-land' && convoy.feature_branch) {
if (!parsedMeta.ready_to_land) {
actions.push({
type: 'set_convoy_ready_to_land',
convoy_id: convoy.bead_id,
});
}

if (parsedMeta.ready_to_land) {
// Check if a landing MR already exists (any status)
const landingMrs = z
.object({ status: z.string() })
.array()
.parse([
...query(
sql,
/* sql */ `
SELECT mr.${beads.columns.status}
FROM ${bead_dependencies} bd
INNER JOIN ${beads} mr ON mr.${beads.columns.bead_id} = bd.${bead_dependencies.columns.bead_id}
WHERE bd.${bead_dependencies.columns.depends_on_bead_id} = ?
AND bd.${bead_dependencies.columns.dependency_type} = 'tracks'
AND mr.${beads.columns.type} = 'merge_request'
`,
[convoy.bead_id]
),
]);
[convoy.bead_id]
),
]);

// If a landing MR was already merged (closed), close the convoy
const hasMergedLanding = landingMrs.some(mr => mr.status === 'closed');
if (hasMergedLanding) {
actions.push({
type: 'close_convoy',
convoy_id: convoy.bead_id,
});
continue;
}
// If a landing MR was already merged (closed), close the convoy
const hasMergedLanding = landingMrs.some(mr => mr.status === 'closed');
if (hasMergedLanding) {
actions.push({
type: 'close_convoy',
convoy_id: convoy.bead_id,
});
continue;
}

// Fix 1 (#2260): If a landing MR is active (open or in_progress), wait — don't create another
const hasActiveLanding = landingMrs.some(
mr => mr.status === 'open' || mr.status === 'in_progress'
);
if (hasActiveLanding) continue;

// Fix 2 (#2260): If max landing MR attempts exceeded and no landing MR is
// active or merged, fail the convoy. Checked after landing MR status lookup
// so the final allowed attempt can still succeed.
if (landingMrAttempts >= MAX_LANDING_MR_ATTEMPTS) {
actions.push({
type: 'fail_convoy',
convoy_id: convoy.bead_id,
reason: `Landing MR creation failed after ${MAX_LANDING_MR_ATTEMPTS} attempts`,
});
continue;
}

// Fix 2 (#2260): Apply exponential cooldown between landing MR attempts
if (landingMrAttempts > 0 && lastLandingMrAttemptAt) {
const elapsed = Date.now() - new Date(lastLandingMrAttemptAt).getTime();
const cooldownMs = Math.min(
Math.pow(2, landingMrAttempts) * LANDING_MR_COOLDOWN_BASE_MS,
LANDING_MR_COOLDOWN_MAX_MS
);
if (elapsed < cooldownMs) continue;
}

// Fix 3 (#2260): Check that tracked beads have at least one MR with a PR URL
const convoyBeadsWithPr = z
.object({ cnt: z.number() })
.array()
.parse([
...query(
sql,
/* sql */ `
SELECT count(*) as cnt
FROM ${bead_dependencies} track_dep
INNER JOIN ${bead_dependencies} mr_dep
ON mr_dep.${bead_dependencies.columns.depends_on_bead_id} = track_dep.${bead_dependencies.columns.bead_id}
INNER JOIN ${review_metadata} rm
ON rm.${review_metadata.columns.bead_id} = mr_dep.${bead_dependencies.columns.bead_id}
WHERE track_dep.${bead_dependencies.columns.depends_on_bead_id} = ?
AND track_dep.${bead_dependencies.columns.dependency_type} = 'tracks'
AND mr_dep.${bead_dependencies.columns.dependency_type} = 'tracks'
AND rm.${review_metadata.columns.pr_url} IS NOT NULL
`,
[convoy.bead_id]
),
]);

// If a landing MR is active (open or in_progress), wait for it
const hasActiveLanding = landingMrs.some(
mr => mr.status === 'open' || mr.status === 'in_progress'
if ((convoyBeadsWithPr[0]?.cnt ?? 0) === 0) {
console.warn(
`${LOG} convoy ${convoy.bead_id} has no beads with pr_url — skipping create_landing_mr`
);
if (hasActiveLanding) continue;

// No landing MR exists yet — create one
{
// Need rig_id from one of the tracked beads
const rigRows = z
.object({ rig_id: z.string() })
.array()
.parse([
...query(
sql,
/* sql */ `
continue;
}

// No landing MR exists yet and cooldown has passed — create one
{
const rigRows = z
.object({ rig_id: z.string() })
.array()
.parse([
...query(
sql,
/* sql */ `
SELECT DISTINCT tracked.${beads.columns.rig_id} as rig_id
FROM ${bead_dependencies} bd
INNER JOIN ${beads} tracked ON tracked.${beads.columns.bead_id} = bd.${bead_dependencies.columns.bead_id}
Expand All @@ -1829,29 +1899,28 @@ export function reconcileConvoys(sql: SqlStorage): Action[] {
AND tracked.${beads.columns.rig_id} IS NOT NULL
LIMIT 1
`,
[convoy.bead_id]
),
]);

if (rigRows.length > 0) {
const rig = getRig(sql, rigRows[0].rig_id);
actions.push({
type: 'create_landing_mr',
convoy_id: convoy.bead_id,
rig_id: rigRows[0].rig_id,
feature_branch: convoy.feature_branch,
target_branch: rig?.default_branch ?? 'main',
});
}
[convoy.bead_id]
),
]);

if (rigRows.length > 0) {
const rig = getRig(sql, rigRows[0].rig_id);
actions.push({
type: 'create_landing_mr',
convoy_id: convoy.bead_id,
rig_id: rigRows[0].rig_id,
feature_branch: convoy.feature_branch,
target_branch: rig?.default_branch ?? 'main',
});
}
}
} else {
// review-and-merge or no feature branch — auto-close
actions.push({
type: 'close_convoy',
convoy_id: convoy.bead_id,
});
}
} else {
// review-and-merge or no feature branch — auto-close
actions.push({
type: 'close_convoy',
convoy_id: convoy.bead_id,
});
}
}

Expand Down