From 7fc3b4781dca2f226ace8745354dc1341809f29e Mon Sep 17 00:00:00 2001 From: eggfriedrice Date: Mon, 9 Mar 2026 02:08:21 +0000 Subject: [PATCH 1/3] fix: checkpoint diffs never resolve due to shared PubSub subscriptions Stream.fromPubSub() creates a subscription eagerly. Storing the result as a property meant all consumers shared one queue, so only the first reader (wsServer/ProviderRuntimeIngestion) got events. CheckpointReactor received zero events and never captured git checkpoints. Fix: change streamDomainEvents and streamEvents to getters so each consumer gets its own subscription. Also add a domain event handler that replaces placeholder checkpoints with real git captures, and extract shared CWD resolution and capture logic to reduce duplication. Closes #585 --- .../orchestration/Layers/CheckpointReactor.ts | 334 ++++++++++++------ .../Layers/OrchestrationEngine.ts | 10 +- .../src/provider/Layers/ProviderService.ts | 7 +- 3 files changed, 238 insertions(+), 113 deletions(-) diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index 52243248fb..da484794ef 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -2,6 +2,7 @@ import { CommandId, EventId, MessageId, + type ProjectId, ThreadId, TurnId, type OrchestrationEvent, @@ -147,84 +148,79 @@ const make = Effect.gen(function* () { const isGitWorkspace = (cwd: string) => isGitRepository(cwd); - const captureCheckpointFromTurnCompletion = Effect.fnUntraced(function* ( - event: Extract, - ) { - const turnId = toTurnId(event.turnId); - if (!turnId) { - return; - } - - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.threadId); - if (!thread) { - return; - } - - // When a primary turn is active, only that turn may produce completion checkpoints. 
- if (thread.session?.activeTurnId && !sameId(thread.session.activeTurnId, turnId)) { - return; - } + // Resolves the workspace CWD for checkpoint operations, preferring the + // active provider session CWD and falling back to the thread/project config. + // Returns undefined when no CWD can be determined or the workspace is not + // a git repository. + const resolveCheckpointCwd = Effect.fnUntraced(function* (input: { + readonly threadId: ThreadId; + readonly thread: { readonly projectId: ProjectId; readonly worktreePath: string | null }; + readonly projects: ReadonlyArray<{ readonly id: ProjectId; readonly workspaceRoot: string }>; + readonly preferSessionRuntime: boolean; + }): Effect.fn.Return { + const fromSession = yield* resolveSessionRuntimeForThread(input.threadId); + const fromThread = resolveThreadWorkspaceCwd({ + thread: input.thread, + projects: input.projects, + }); - if (thread.checkpoints.some((checkpoint) => checkpoint.turnId === turnId)) { - return; - } + const cwd = input.preferSessionRuntime + ? Option.match(fromSession, { + onNone: () => undefined, + onSome: (runtime) => runtime.cwd, + }) ?? fromThread + : fromThread ?? + Option.match(fromSession, { + onNone: () => undefined, + onSome: (runtime) => runtime.cwd, + }); - const sessionRuntime = yield* resolveSessionRuntimeForThread(thread.id); - const checkpointCwd = - Option.match(sessionRuntime, { - onNone: () => undefined, - onSome: (runtime) => runtime.cwd, - }) ?? 
- resolveThreadWorkspaceCwd({ - thread, - projects: readModel.projects, - }); - if (!checkpointCwd) { - yield* Effect.logWarning("checkpoint capture skipped: no active provider session cwd", { - threadId: thread.id, - turnId, - }); - return; + if (!cwd) { + return undefined; } - if (!isGitWorkspace(checkpointCwd)) { - yield* Effect.logDebug("checkpoint capture skipped for non-git workspace", { - threadId: thread.id, - turnId, - cwd: checkpointCwd, - }); - return; + if (!isGitWorkspace(cwd)) { + return undefined; } + return cwd; + }); - const currentTurnCount = thread.checkpoints.reduce( - (maxTurnCount, checkpoint) => Math.max(maxTurnCount, checkpoint.checkpointTurnCount), - 0, - ); - const nextTurnCount = currentTurnCount + 1; - const fromTurnCount = Math.max(0, nextTurnCount - 1); - const fromCheckpointRef = checkpointRefForThreadTurn(thread.id, fromTurnCount); - const targetCheckpointRef = checkpointRefForThreadTurn(thread.id, nextTurnCount); + // Shared tail for both capture paths: creates the git checkpoint ref, diffs + // it against the previous turn, then dispatches the domain events to update + // the orchestration read model. 
+ const captureAndDispatchCheckpoint = Effect.fnUntraced(function* (input: { + readonly threadId: ThreadId; + readonly turnId: TurnId; + readonly thread: { readonly messages: ReadonlyArray<{ readonly id: MessageId; readonly role: string; readonly turnId: TurnId | null }> }; + readonly cwd: string; + readonly turnCount: number; + readonly status: "ready" | "missing" | "error"; + readonly assistantMessageId: MessageId | undefined; + readonly createdAt: string; + }) { + const fromTurnCount = Math.max(0, input.turnCount - 1); + const fromCheckpointRef = checkpointRefForThreadTurn(input.threadId, fromTurnCount); + const targetCheckpointRef = checkpointRefForThreadTurn(input.threadId, input.turnCount); const fromCheckpointExists = yield* checkpointStore.hasCheckpointRef({ - cwd: checkpointCwd, + cwd: input.cwd, checkpointRef: fromCheckpointRef, }); if (!fromCheckpointExists) { - yield* Effect.logWarning("checkpoint completion missing pre-turn baseline", { - threadId: thread.id, - turnId, + yield* Effect.logWarning("checkpoint capture missing pre-turn baseline", { + threadId: input.threadId, + turnId: input.turnId, fromTurnCount, }); } yield* checkpointStore.captureCheckpoint({ - cwd: checkpointCwd, + cwd: input.cwd, checkpointRef: targetCheckpointRef, }); const files = yield* checkpointStore .diffCheckpoints({ - cwd: checkpointCwd, + cwd: input.cwd, fromCheckpointRef, toCheckpointRef: targetCheckpointRef, fallbackFromToHead: false, @@ -240,60 +236,183 @@ const make = Effect.gen(function* () { ), Effect.tapError((error) => appendCaptureFailureActivity({ - threadId: thread.id, - turnId, + threadId: input.threadId, + turnId: input.turnId, detail: `Checkpoint captured, but turn diff summary is unavailable: ${error.message}`, - createdAt: event.createdAt, + createdAt: input.createdAt, }), ), Effect.catch((error) => Effect.logWarning("failed to derive checkpoint file summary", { - threadId: thread.id, - turnId, - turnCount: nextTurnCount, + threadId: input.threadId, + turnId: 
input.turnId, + turnCount: input.turnCount, detail: error.message, }).pipe(Effect.as([])), ), ); const assistantMessageId = - thread.messages + input.assistantMessageId ?? + input.thread.messages .toReversed() - .find((entry) => entry.role === "assistant" && entry.turnId === turnId)?.id ?? - MessageId.makeUnsafe(`assistant:${turnId}`); + .find((entry) => entry.role === "assistant" && entry.turnId === input.turnId)?.id ?? + MessageId.makeUnsafe(`assistant:${input.turnId}`); - const now = event.createdAt; yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", commandId: serverCommandId("checkpoint-turn-diff-complete"), - threadId: thread.id, - turnId, - completedAt: now, + threadId: input.threadId, + turnId: input.turnId, + completedAt: input.createdAt, checkpointRef: targetCheckpointRef, - status: checkpointStatusFromRuntime(event.payload.state), + status: input.status, files, assistantMessageId, - checkpointTurnCount: nextTurnCount, - createdAt: now, + checkpointTurnCount: input.turnCount, + createdAt: input.createdAt, }); yield* orchestrationEngine.dispatch({ type: "thread.activity.append", commandId: serverCommandId("checkpoint-captured-activity"), - threadId: thread.id, + threadId: input.threadId, activity: { id: EventId.makeUnsafe(crypto.randomUUID()), tone: "info", kind: "checkpoint.captured", summary: "Checkpoint captured", payload: { - turnCount: nextTurnCount, - status: event.payload.state, + turnCount: input.turnCount, + status: input.status, }, - turnId, - createdAt: now, + turnId: input.turnId, + createdAt: input.createdAt, }, - createdAt: now, + createdAt: input.createdAt, + }); + }); + + // Captures a real git checkpoint when a turn completes via a runtime event. 
+ const captureCheckpointFromTurnCompletion = Effect.fnUntraced(function* ( + event: Extract, + ) { + const turnId = toTurnId(event.turnId); + if (!turnId) { + return; + } + + const readModel = yield* orchestrationEngine.getReadModel(); + const thread = readModel.threads.find((entry) => entry.id === event.threadId); + if (!thread) { + return; + } + + // When a primary turn is active, only that turn may produce completion checkpoints. + if (thread.session?.activeTurnId && !sameId(thread.session.activeTurnId, turnId)) { + return; + } + + // Only skip if a real (non-placeholder) checkpoint already exists for this turn. + // ProviderRuntimeIngestion may insert placeholder entries with status "missing" + // before this reactor runs; those must not prevent real git capture. + if ( + thread.checkpoints.some( + (checkpoint) => checkpoint.turnId === turnId && checkpoint.status !== "missing", + ) + ) { + return; + } + + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId: thread.id, + thread, + projects: readModel.projects, + preferSessionRuntime: true, + }); + if (!checkpointCwd) { + return; + } + + // If a placeholder checkpoint exists for this turn, reuse its turn count + // instead of incrementing past it. + const existingPlaceholder = thread.checkpoints.find( + (checkpoint) => checkpoint.turnId === turnId && checkpoint.status === "missing", + ); + const currentTurnCount = thread.checkpoints.reduce( + (maxTurnCount, checkpoint) => Math.max(maxTurnCount, checkpoint.checkpointTurnCount), + 0, + ); + const nextTurnCount = existingPlaceholder + ? 
existingPlaceholder.checkpointTurnCount + : currentTurnCount + 1; + + yield* captureAndDispatchCheckpoint({ + threadId: thread.id, + turnId, + thread, + cwd: checkpointCwd, + turnCount: nextTurnCount, + status: checkpointStatusFromRuntime(event.payload.state), + assistantMessageId: undefined, + createdAt: event.createdAt, + }); + }); + + // Captures a real git checkpoint when a placeholder checkpoint (status "missing") + // is detected via a domain event. This replaces the placeholder with a real + // git-ref-based checkpoint. + // + // ProviderRuntimeIngestion creates placeholder checkpoints on turn.diff.updated + // events from the Codex runtime. This handler fires when the corresponding + // domain event arrives, allowing the reactor to capture the actual filesystem + // state into a git ref and dispatch a replacement checkpoint. + const captureCheckpointFromPlaceholder = Effect.fnUntraced(function* ( + event: Extract, + ) { + const { threadId, turnId, checkpointTurnCount, status } = event.payload; + + // Only replace placeholders; skip events from our own real captures. + if (status !== "missing") { + return; + } + + const readModel = yield* orchestrationEngine.getReadModel(); + const thread = readModel.threads.find((entry) => entry.id === threadId); + if (!thread) { + yield* Effect.logWarning("checkpoint capture from placeholder skipped: thread not found", { threadId }); + return; + } + + // If a real checkpoint already exists for this turn, skip. 
+ if ( + thread.checkpoints.some( + (checkpoint) => checkpoint.turnId === turnId && checkpoint.status !== "missing", + ) + ) { + yield* Effect.logDebug("checkpoint capture from placeholder skipped: real checkpoint already exists", { threadId, turnId }); + return; + } + + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId, + thread, + projects: readModel.projects, + preferSessionRuntime: true, + }); + if (!checkpointCwd) { + return; + } + + yield* captureAndDispatchCheckpoint({ + threadId, + turnId, + thread, + cwd: checkpointCwd, + turnCount: checkpointTurnCount, + status: "ready", + assistantMessageId: event.payload.assistantMessageId ?? undefined, + createdAt: event.payload.completedAt, }); }); @@ -313,24 +432,13 @@ const make = Effect.gen(function* () { return; } - const checkpointCwdFromThreadOrProject = resolveThreadWorkspaceCwd({ + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId: thread.id, thread, projects: readModel.projects, + preferSessionRuntime: false, }); - const checkpointCwd = - checkpointCwdFromThreadOrProject ?? - Option.match(yield* resolveSessionRuntimeForThread(thread.id), { - onNone: () => undefined, - onSome: (runtime) => runtime.cwd, - }); if (!checkpointCwd) { - yield* Effect.logWarning("checkpoint pre-turn capture skipped: no workspace cwd", { - threadId: thread.id, - turnId, - }); - return; - } - if (!isGitWorkspace(checkpointCwd)) { return; } @@ -376,23 +484,13 @@ const make = Effect.gen(function* () { return; } - const checkpointCwdFromThreadOrProject = resolveThreadWorkspaceCwd({ + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId, thread, projects: readModel.projects, + preferSessionRuntime: false, }); - const checkpointCwd = - checkpointCwdFromThreadOrProject ?? 
- Option.match(yield* resolveSessionRuntimeForThread(threadId), { - onNone: () => undefined, - onSome: (runtime) => runtime.cwd, - }); if (!checkpointCwd) { - yield* Effect.logWarning("checkpoint pre-turn capture skipped: no workspace cwd", { - threadId, - }); - return; - } - if (!isGitWorkspace(checkpointCwd)) { return; } @@ -556,6 +654,25 @@ const make = Effect.gen(function* () { }), ), ); + return; + } + + // When ProviderRuntimeIngestion creates a placeholder checkpoint (status "missing") + // from a turn.diff.updated runtime event, capture the real git checkpoint to + // replace it. Before streamEvents became a per-consumer getter, the shared PubSub + // subscription starved this reactor of turn.completed runtime events; reacting + // to the domain event keeps capture working independently of that stream. + if (event.type === "thread.turn-diff-completed") { + yield* captureCheckpointFromPlaceholder(event).pipe( + Effect.catch((error) => + appendCaptureFailureActivity({ + threadId: event.payload.threadId, + turnId: event.payload.turnId, + detail: error.message, + createdAt: new Date().toISOString(), + }).pipe(Effect.catch(() => Effect.void)), + ), + ); } }); @@ -613,7 +730,8 @@ const make = Effect.gen(function* () { if ( event.type !== "thread.turn-start-requested" && event.type !== "thread.message-sent" && - event.type !== "thread.checkpoint-revert-requested" + event.type !== "thread.checkpoint-revert-requested" && + event.type !== "thread.turn-diff-completed" ) { return Effect.void; } diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 0895cd99c9..168ab5f013 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -229,14 +229,16 @@ const makeOrchestrationEngine = Effect.gen(function* () { return yield* Deferred.await(result); }); - const streamDomainEvents: OrchestrationEngineShape["streamDomainEvents"] = - 
Stream.fromPubSub(eventPubSub); - return { getReadModel, readEvents, dispatch, - streamDomainEvents, + // Each access creates a fresh PubSub subscription so that multiple + // consumers (wsServer, ProviderRuntimeIngestion, CheckpointReactor, etc.) + // each independently receive all domain events. + get streamDomainEvents(): OrchestrationEngineShape["streamDomainEvents"] { + return Stream.fromPubSub(eventPubSub); + }, } satisfies OrchestrationEngineShape; }); diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index 05a1de1495..08a61f435e 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -537,7 +537,12 @@ const makeProviderService = (options?: ProviderServiceLiveOptions) => listSessions, getCapabilities, rollbackConversation, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + // Each access creates a fresh PubSub subscription so that multiple + // consumers (ProviderRuntimeIngestion, CheckpointReactor, etc.) each + // independently receive all runtime events. + get streamEvents(): ProviderServiceShape["streamEvents"] { + return Stream.fromPubSub(runtimeEventPubSub); + }, } satisfies ProviderServiceShape; }); From 1ef7d20e060c43fde73037bcdba3fac8182797d9 Mon Sep 17 00:00:00 2001 From: eggfriedrice Date: Tue, 10 Mar 2026 23:15:52 +0000 Subject: [PATCH 2/3] style: fix formatting --- .../orchestration/Layers/CheckpointReactor.ts | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index 872cd7adec..ab38c10332 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -165,15 +165,15 @@ const make = Effect.gen(function* () { }); const cwd = input.preferSessionRuntime - ? 
Option.match(fromSession, { + ? (Option.match(fromSession, { onNone: () => undefined, onSome: (runtime) => runtime.cwd, - }) ?? fromThread - : fromThread ?? + }) ?? fromThread) + : (fromThread ?? Option.match(fromSession, { onNone: () => undefined, onSome: (runtime) => runtime.cwd, - }); + })); if (!cwd) { return undefined; @@ -190,7 +190,13 @@ const make = Effect.gen(function* () { const captureAndDispatchCheckpoint = Effect.fnUntraced(function* (input: { readonly threadId: ThreadId; readonly turnId: TurnId; - readonly thread: { readonly messages: ReadonlyArray<{ readonly id: MessageId; readonly role: string; readonly turnId: TurnId | null }> }; + readonly thread: { + readonly messages: ReadonlyArray<{ + readonly id: MessageId; + readonly role: string; + readonly turnId: TurnId | null; + }>; + }; readonly cwd: string; readonly turnCount: number; readonly status: "ready" | "missing" | "error"; @@ -400,7 +406,9 @@ const make = Effect.gen(function* () { const readModel = yield* orchestrationEngine.getReadModel(); const thread = readModel.threads.find((entry) => entry.id === threadId); if (!thread) { - yield* Effect.logWarning("checkpoint capture from placeholder skipped: thread not found", { threadId }); + yield* Effect.logWarning("checkpoint capture from placeholder skipped: thread not found", { + threadId, + }); return; } @@ -410,7 +418,10 @@ const make = Effect.gen(function* () { (checkpoint) => checkpoint.turnId === turnId && checkpoint.status !== "missing", ) ) { - yield* Effect.logDebug("checkpoint capture from placeholder skipped: real checkpoint already exists", { threadId, turnId }); + yield* Effect.logDebug( + "checkpoint capture from placeholder skipped: real checkpoint already exists", + { threadId, turnId }, + ); return; } From 420c3fab729892a6c56d2d7c3b88762cb39a6e70 Mon Sep 17 00:00:00 2001 From: eggfriedrice Date: Tue, 10 Mar 2026 23:31:56 +0000 Subject: [PATCH 3/3] fix: prevent placeholders from overwriting real checkpoints ProviderRuntimeIngestion 
fires multiple turn.diff.updated events per turn. Each dispatches a placeholder checkpoint that the projector would blindly replace by turnId, clobbering the real git capture from CheckpointReactor. Also, using thread.checkpoints.length + 1 for checkpointTurnCount produced unstable values after dedup. Two fixes: 1. Projector: skip incoming "missing" checkpoints when a non-placeholder entry already exists for that turnId 2. ProviderRuntimeIngestion: skip placeholder dispatch entirely if a checkpoint already exists for the turnId, and use max turnCount + 1 instead of length + 1 --- .../Layers/ProviderRuntimeIngestion.ts | 44 ++++++++++++------- apps/server/src/orchestration/projector.ts | 10 +++++ 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 70ce978d54..0dd10dcb7c 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1064,22 +1064,34 @@ const make = Effect.gen(function* () { if (event.type === "turn.diff.updated") { const turnId = toTurnId(event.turnId); if (turnId && (yield* isGitRepoForThread(thread.id))) { - const assistantMessageId = MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); - yield* orchestrationEngine.dispatch({ - type: "thread.turn.diff.complete", - commandId: providerCommandId(event, "thread-turn-diff-complete"), - threadId: thread.id, - turnId, - completedAt: now, - checkpointRef: CheckpointRef.makeUnsafe(`provider-diff:${event.eventId}`), - status: "missing", - files: [], - assistantMessageId, - checkpointTurnCount: thread.checkpoints.length + 1, - createdAt: now, - }); + // Skip if a checkpoint already exists for this turn. 
A real + // (non-placeholder) capture from CheckpointReactor should not + // be clobbered, and dispatching a duplicate placeholder for the + // same turnId would produce an unstable checkpointTurnCount. + if (thread.checkpoints.some((c) => c.turnId === turnId)) { + // Already tracked; no-op. + } else { + const assistantMessageId = MessageId.makeUnsafe( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ); + const maxTurnCount = thread.checkpoints.reduce( + (max, c) => Math.max(max, c.checkpointTurnCount), + 0, + ); + yield* orchestrationEngine.dispatch({ + type: "thread.turn.diff.complete", + commandId: providerCommandId(event, "thread-turn-diff-complete"), + threadId: thread.id, + turnId, + completedAt: now, + checkpointRef: CheckpointRef.makeUnsafe(`provider-diff:${event.eventId}`), + status: "missing", + files: [], + assistantMessageId, + checkpointTurnCount: maxTurnCount + 1, + createdAt: now, + }); + } } } diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 60575d30ae..015f82a677 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -500,6 +500,16 @@ export function projectEvent( "checkpoint", ); + // Do not let a placeholder (status "missing") overwrite a checkpoint + // that is no longer a placeholder (any status other than "missing"). + // ProviderRuntimeIngestion may fire multiple turn.diff.updated events + // per turn; without this guard later placeholders would clobber the + // real capture dispatched by CheckpointReactor. + const existing = thread.checkpoints.find((entry) => entry.turnId === checkpoint.turnId); + if (existing && existing.status !== "missing" && checkpoint.status === "missing") { + return nextBase; + } + const checkpoints = [ ...thread.checkpoints.filter((entry) => entry.turnId !== checkpoint.turnId), checkpoint,