Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 177 additions & 4 deletions apps/web/src/orchestrationRecovery.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { describe, expect, it } from "vitest";

import { createOrchestrationRecoveryCoordinator } from "./orchestrationRecovery";
import {
createOrchestrationRecoveryCoordinator,
deriveReplayRetryDecision,
} from "./orchestrationRecovery";

describe("createOrchestrationRecoveryCoordinator", () => {
it("defers live events until bootstrap completes and then requests replay", () => {
Expand Down Expand Up @@ -59,18 +62,24 @@ describe("createOrchestrationRecoveryCoordinator", () => {
coordinator.classifyDomainEvent(7);
coordinator.markEventBatchApplied([{ sequence: 4 }, { sequence: 5 }, { sequence: 6 }]);

expect(coordinator.completeReplayRecovery()).toBe(true);
expect(coordinator.completeReplayRecovery()).toEqual({
replayMadeProgress: true,
shouldReplay: true,
});
});

it("does not immediately replay again when replay returns no new events", () => {
it("retries replay when no progress was made but higher live sequences were observed", () => {
const coordinator = createOrchestrationRecoveryCoordinator();

coordinator.beginSnapshotRecovery("bootstrap");
coordinator.completeSnapshotRecovery(3);
coordinator.classifyDomainEvent(5);
coordinator.beginReplayRecovery("sequence-gap");

expect(coordinator.completeReplayRecovery()).toBe(false);
expect(coordinator.completeReplayRecovery()).toEqual({
replayMadeProgress: false,
shouldReplay: true,
});
expect(coordinator.getState()).toMatchObject({
latestSequence: 3,
highestObservedSequence: 5,
Expand All @@ -79,6 +88,19 @@ describe("createOrchestrationRecoveryCoordinator", () => {
});
});

it("does not request another replay when a replay made no progress and nothing newer was observed", () => {
const coordinator = createOrchestrationRecoveryCoordinator();

coordinator.beginSnapshotRecovery("bootstrap");
coordinator.completeSnapshotRecovery(3);
coordinator.beginReplayRecovery("sequence-gap");

expect(coordinator.completeReplayRecovery()).toEqual({
replayMadeProgress: false,
shouldReplay: false,
});
});

it("marks replay failure as unbootstrapped so snapshot fallback is recovery-only", () => {
const coordinator = createOrchestrationRecoveryCoordinator();

Expand Down Expand Up @@ -131,3 +153,154 @@ describe("createOrchestrationRecoveryCoordinator", () => {
});
});
});

// Covers the retry policy in isolation: immediate retry after progress,
// exponential back-off while stuck on one frontier, and budget reset when the
// frontier moves.
describe("deriveReplayRetryDecision", () => {
  type PreviousTracker = Parameters<typeof deriveReplayRetryDecision>[0]["previousTracker"];

  // Runs one decision for a replay that made no progress but is still wanted,
  // with latestSequence pinned at 3 and a configurable observed high-water mark.
  const decideWithoutProgress = (previousTracker: PreviousTracker, highestObservedSequence = 5) =>
    deriveReplayRetryDecision({
      previousTracker,
      completion: { replayMadeProgress: false, shouldReplay: true },
      recoveryState: { latestSequence: 3, highestObservedSequence },
      baseDelayMs: 100,
      maxNoProgressRetries: 3,
    });

  it("retries immediately when replay made progress", () => {
    const decision = deriveReplayRetryDecision({
      previousTracker: { attempts: 2, latestSequence: 3, highestObservedSequence: 5 },
      completion: { replayMadeProgress: true, shouldReplay: true },
      recoveryState: { latestSequence: 5, highestObservedSequence: 5 },
      baseDelayMs: 100,
      maxNoProgressRetries: 3,
    });

    expect(decision).toEqual({ shouldRetry: true, delayMs: 0, tracker: null });
  });

  it("caps no-progress retries for the same frontier", () => {
    // Each decision feeds its tracker into the next one, like the real caller does.
    const first = decideWithoutProgress(null);
    const second = decideWithoutProgress(first.tracker);
    const third = decideWithoutProgress(second.tracker);
    const fourth = decideWithoutProgress(third.tracker);

    expect(first).toEqual({
      shouldRetry: true,
      delayMs: 100,
      tracker: { attempts: 1, latestSequence: 3, highestObservedSequence: 5 },
    });
    expect(second).toEqual({
      shouldRetry: true,
      delayMs: 200,
      tracker: { attempts: 2, latestSequence: 3, highestObservedSequence: 5 },
    });
    expect(third).toEqual({
      shouldRetry: true,
      delayMs: 400,
      tracker: { attempts: 3, latestSequence: 3, highestObservedSequence: 5 },
    });
    // The fourth attempt exceeds the budget of three, so the policy gives up.
    expect(fourth).toEqual({ shouldRetry: false, delayMs: 0, tracker: null });
  });

  it("resets the retry budget when the replay frontier changes", () => {
    const exhausted = { attempts: 3, latestSequence: 3, highestObservedSequence: 5 };

    // A higher observed sequence is a different frontier, so counting restarts at 1.
    expect(decideWithoutProgress(exhausted, 6)).toEqual({
      shouldRetry: true,
      delayMs: 100,
      tracker: { attempts: 1, latestSequence: 3, highestObservedSequence: 6 },
    });
  });
});
78 changes: 72 additions & 6 deletions apps/web/src/orchestrationRecovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,74 @@ export interface OrchestrationRecoveryState {
inFlight: OrchestrationRecoveryPhase | null;
}

/**
 * Result returned by the coordinator's `completeReplayRecovery()` when a replay
 * attempt finishes.
 */
export interface ReplayRecoveryCompletion {
/** True when `latestSequence` ended up greater than the sequence captured in `replayStartSequence`. */
replayMadeProgress: boolean;
/** Whether the coordinator still reports a need for another replay after this one. */
shouldReplay: boolean;
}

/**
 * Bookkeeping carried between calls to `deriveReplayRetryDecision` so that
 * consecutive no-progress replays against the same frontier can be counted.
 */
export interface ReplayRetryTracker {
/** Number of no-progress retries already granted for this frontier (starts at 1). */
attempts: number;
/** `latestSequence` of the recovery state when the tracker was produced. */
latestSequence: number;
/** `highestObservedSequence` of the recovery state when the tracker was produced. */
highestObservedSequence: number;
}

/** Outcome of `deriveReplayRetryDecision`: whether to replay again and how long to wait first. */
export interface ReplayRetryDecision {
/** True when the caller should start another replay. */
shouldRetry: boolean;
/** Milliseconds to wait before retrying; 0 means retry immediately (or not at all when `shouldRetry` is false). */
delayMs: number;
/** Tracker to pass as `previousTracker` on the next decision, or null when no count needs to be carried over. */
tracker: ReplayRetryTracker | null;
}

/** Minimal read-only shape of an event that carries a numeric `sequence`. */
type SequencedEvent = Readonly<{ sequence: number }>;

/**
 * Decides whether a finished replay should be followed by another one, and
 * after what delay.
 *
 * - No replay wanted: stop, no tracker.
 * - Replay made progress: retry immediately and drop any tracker.
 * - No progress: count attempts against the current frontier (the pair of
 *   `latestSequence` and `highestObservedSequence`). The count continues from
 *   `previousTracker` only when that tracker describes the same frontier;
 *   otherwise it restarts at 1. Once the count exceeds
 *   `maxNoProgressRetries` the function stops retrying; until then the delay
 *   doubles with every attempt, starting at `baseDelayMs`.
 *
 * @param input.previousTracker Tracker returned by the previous decision, or null.
 * @param input.completion Result of the replay that just finished.
 * @param input.recoveryState Current frontier of the recovery coordinator.
 * @param input.baseDelayMs Delay used for the first no-progress retry.
 * @param input.maxNoProgressRetries Largest attempt count that is still retried.
 * @returns The retry decision together with the tracker for the next call.
 */
export function deriveReplayRetryDecision(input: {
  previousTracker: ReplayRetryTracker | null;
  completion: ReplayRecoveryCompletion;
  recoveryState: Pick<OrchestrationRecoveryState, "latestSequence" | "highestObservedSequence">;
  baseDelayMs: number;
  maxNoProgressRetries: number;
}): ReplayRetryDecision {
  const { completion } = input;

  if (!completion.shouldReplay) {
    return { shouldRetry: false, delayMs: 0, tracker: null };
  }

  if (completion.replayMadeProgress) {
    // Progress means the stream is moving; go again without back-off.
    return { shouldRetry: true, delayMs: 0, tracker: null };
  }

  const { latestSequence, highestObservedSequence } = input.recoveryState;
  const prior = input.previousTracker;

  // Continue the earlier count only if we are still stuck on the same frontier.
  let attempts = 1;
  if (
    prior !== null &&
    prior.latestSequence === latestSequence &&
    prior.highestObservedSequence === highestObservedSequence
  ) {
    attempts = prior.attempts + 1;
  }

  if (attempts > input.maxNoProgressRetries) {
    return { shouldRetry: false, delayMs: 0, tracker: null };
  }

  // Exponential back-off: base, 2x base, 4x base, ...
  const delayMs = input.baseDelayMs * 2 ** (attempts - 1);
  return {
    shouldRetry: true,
    delayMs,
    tracker: { attempts, latestSequence, highestObservedSequence },
  };
}

export function createOrchestrationRecoveryCoordinator() {
let state: OrchestrationRecoveryState = {
latestSequence: 0,
Expand Down Expand Up @@ -120,16 +186,16 @@ export function createOrchestrationRecoveryCoordinator() {
return true;
},

completeReplayRecovery(): boolean {
completeReplayRecovery(): ReplayRecoveryCompletion {
const replayMadeProgress =
replayStartSequence !== null && state.latestSequence > replayStartSequence;
replayStartSequence = null;
state.inFlight = null;
if (!replayMadeProgress) {
state.pendingReplay = false;
return false;
}
return resolveReplayNeedAfterRecovery().shouldReplay;
const replayResolution = resolveReplayNeedAfterRecovery();
return {
replayMadeProgress,
shouldReplay: replayResolution.shouldReplay,
};
},

failReplayRecovery(): void {
Expand Down
38 changes: 36 additions & 2 deletions apps/web/src/routes/__root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { projectQueryKeys } from "../lib/projectReactQuery";
import { collectActiveTerminalThreadIds } from "../lib/terminalStateCleanup";
import { deriveOrchestrationBatchEffects } from "../orchestrationEventEffects";
import { createOrchestrationRecoveryCoordinator } from "../orchestrationRecovery";
import { deriveReplayRetryDecision } from "../orchestrationRecovery";
import { getWsRpcClient } from "~/wsRpcClient";

export const Route = createRootRouteWithContext<{
Expand Down Expand Up @@ -189,6 +190,9 @@ function coalesceOrchestrationUiEvents(
return coalesced;
}

// Passed to deriveReplayRetryDecision as `baseDelayMs`: the wait before the first
// no-progress replay retry (that function doubles it on each further attempt).
const REPLAY_RECOVERY_RETRY_DELAY_MS = 100;
// Passed to deriveReplayRetryDecision as `maxNoProgressRetries`: how many
// no-progress retries are allowed for one frontier before recovery stops.
const MAX_NO_PROGRESS_REPLAY_RETRIES = 3;

function ServerStateBootstrap() {
useEffect(() => startServerStateSync(getWsRpcClient().server), []);

Expand Down Expand Up @@ -309,6 +313,7 @@ function EventRouter() {
let disposed = false;
disposedRef.current = false;
const recovery = createOrchestrationRecoveryCoordinator();
let replayRetryTracker: import("../orchestrationRecovery").ReplayRetryTracker | null = null;
let needsProviderInvalidation = false;
const pendingDomainEvents: OrchestrationEvent[] = [];
let flushPendingDomainEventsScheduled = false;
Expand Down Expand Up @@ -435,13 +440,42 @@ function EventRouter() {
applyEventBatch(events);
}
} catch {
replayRetryTracker = null;
recovery.failReplayRecovery();
void fallbackToSnapshotRecovery();
return;
}

if (!disposed && recovery.completeReplayRecovery()) {
void recoverFromSequenceGap();
if (!disposed) {
const replayCompletion = recovery.completeReplayRecovery();
const retryDecision = deriveReplayRetryDecision({
previousTracker: replayRetryTracker,
completion: replayCompletion,
recoveryState: recovery.getState(),
baseDelayMs: REPLAY_RECOVERY_RETRY_DELAY_MS,
maxNoProgressRetries: MAX_NO_PROGRESS_REPLAY_RETRIES,
});
replayRetryTracker = retryDecision.tracker;

if (retryDecision.shouldRetry) {
if (retryDecision.delayMs > 0) {
await new Promise<void>((resolve) => {
setTimeout(resolve, retryDecision.delayMs);
});
if (disposed) {
return;
}
}
void recoverFromSequenceGap();
} else if (replayCompletion.shouldReplay && import.meta.env.MODE !== "test") {
console.warn(
"[orchestration-recovery]",
"Stopping replay recovery after no-progress retries.",
{
state: recovery.getState(),
},
);
}
}
};

Expand Down
Loading