diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index 199376a980..ab38c10332 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -2,6 +2,7 @@ import { CommandId, EventId, MessageId, + type ProjectId, ThreadId, TurnId, type OrchestrationEvent, @@ -147,88 +148,89 @@ const make = Effect.gen(function* () { const isGitWorkspace = (cwd: string) => isGitRepository(cwd); - const captureCheckpointFromTurnCompletion = Effect.fnUntraced(function* ( - event: Extract, - ) { - const turnId = toTurnId(event.turnId); - if (!turnId) { - return; - } - - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.threadId); - if (!thread) { - return; - } - - // When a primary turn is active, only that turn may produce completion checkpoints. - if (thread.session?.activeTurnId && !sameId(thread.session.activeTurnId, turnId)) { - return; - } - - if (thread.checkpoints.some((checkpoint) => checkpoint.turnId === turnId)) { - return; - } + // Resolves the workspace CWD for checkpoint operations, preferring the + // active provider session CWD and falling back to the thread/project config. + // Returns undefined when no CWD can be determined or the workspace is not + // a git repository. + const resolveCheckpointCwd = Effect.fnUntraced(function* (input: { + readonly threadId: ThreadId; + readonly thread: { readonly projectId: ProjectId; readonly worktreePath: string | null }; + readonly projects: ReadonlyArray<{ readonly id: ProjectId; readonly workspaceRoot: string }>; + readonly preferSessionRuntime: boolean; + }): Effect.fn.Return { + const fromSession = yield* resolveSessionRuntimeForThread(input.threadId); + const fromThread = resolveThreadWorkspaceCwd({ + thread: input.thread, + projects: input.projects, + }); - const sessionRuntime = yield* resolveSessionRuntimeForThread(thread.id); - const checkpointCwd = - Option.match(sessionRuntime, { - onNone: () => undefined, - onSome: (runtime) => runtime.cwd, - }) ?? - resolveThreadWorkspaceCwd({ - thread, - projects: readModel.projects, - }); - if (!checkpointCwd) { - yield* Effect.logWarning("checkpoint capture skipped: no active provider session cwd", { - threadId: thread.id, - turnId, - }); - return; + const cwd = input.preferSessionRuntime + ? (Option.match(fromSession, { + onNone: () => undefined, + onSome: (runtime) => runtime.cwd, + }) ?? fromThread) + : (fromThread ?? + Option.match(fromSession, { + onNone: () => undefined, + onSome: (runtime) => runtime.cwd, + })); + + if (!cwd) { + return undefined; } - if (!isGitWorkspace(checkpointCwd)) { - yield* Effect.logDebug("checkpoint capture skipped for non-git workspace", { - threadId: thread.id, - turnId, - cwd: checkpointCwd, - }); - return; + if (!isGitWorkspace(cwd)) { + return undefined; } + return cwd; + }); - const currentTurnCount = thread.checkpoints.reduce( - (maxTurnCount, checkpoint) => Math.max(maxTurnCount, checkpoint.checkpointTurnCount), - 0, - ); - const nextTurnCount = currentTurnCount + 1; - const fromTurnCount = Math.max(0, nextTurnCount - 1); - const fromCheckpointRef = checkpointRefForThreadTurn(thread.id, fromTurnCount); - const targetCheckpointRef = checkpointRefForThreadTurn(thread.id, nextTurnCount); + // Shared tail for both capture paths: creates the git checkpoint ref, diffs + // it against the previous turn, then dispatches the domain events to update + // the orchestration read model. + const captureAndDispatchCheckpoint = Effect.fnUntraced(function* (input: { + readonly threadId: ThreadId; + readonly turnId: TurnId; + readonly thread: { + readonly messages: ReadonlyArray<{ + readonly id: MessageId; + readonly role: string; + readonly turnId: TurnId | null; + }>; + }; + readonly cwd: string; + readonly turnCount: number; + readonly status: "ready" | "missing" | "error"; + readonly assistantMessageId: MessageId | undefined; + readonly createdAt: string; + }) { + const fromTurnCount = Math.max(0, input.turnCount - 1); + const fromCheckpointRef = checkpointRefForThreadTurn(input.threadId, fromTurnCount); + const targetCheckpointRef = checkpointRefForThreadTurn(input.threadId, input.turnCount); const fromCheckpointExists = yield* checkpointStore.hasCheckpointRef({ - cwd: checkpointCwd, + cwd: input.cwd, checkpointRef: fromCheckpointRef, }); if (!fromCheckpointExists) { - yield* Effect.logWarning("checkpoint completion missing pre-turn baseline", { - threadId: thread.id, - turnId, + yield* Effect.logWarning("checkpoint capture missing pre-turn baseline", { + threadId: input.threadId, + turnId: input.turnId, fromTurnCount, }); } yield* checkpointStore.captureCheckpoint({ - cwd: checkpointCwd, + cwd: input.cwd, checkpointRef: targetCheckpointRef, }); // Invalidate the workspace entry cache so the @-mention file picker // reflects files created or deleted during this turn. - clearWorkspaceIndexCache(checkpointCwd); + clearWorkspaceIndexCache(input.cwd); const files = yield* checkpointStore .diffCheckpoints({ - cwd: checkpointCwd, + cwd: input.cwd, fromCheckpointRef, toCheckpointRef: targetCheckpointRef, fallbackFromToHead: false, @@ -244,81 +246,82 @@ const make = Effect.gen(function* () { ), Effect.tapError((error) => appendCaptureFailureActivity({ - threadId: thread.id, - turnId, + threadId: input.threadId, + turnId: input.turnId, detail: `Checkpoint captured, but turn diff summary is unavailable: ${error.message}`, - createdAt: event.createdAt, + createdAt: input.createdAt, }), ), Effect.catch((error) => Effect.logWarning("failed to derive checkpoint file summary", { - threadId: thread.id, - turnId, - turnCount: nextTurnCount, + threadId: input.threadId, + turnId: input.turnId, + turnCount: input.turnCount, detail: error.message, }).pipe(Effect.as([])), ), ); const assistantMessageId = - thread.messages + input.assistantMessageId ?? + input.thread.messages .toReversed() - .find((entry) => entry.role === "assistant" && entry.turnId === turnId)?.id ?? - MessageId.makeUnsafe(`assistant:${turnId}`); + .find((entry) => entry.role === "assistant" && entry.turnId === input.turnId)?.id ?? + MessageId.makeUnsafe(`assistant:${input.turnId}`); - const now = event.createdAt; yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", commandId: serverCommandId("checkpoint-turn-diff-complete"), - threadId: thread.id, - turnId, - completedAt: now, + threadId: input.threadId, + turnId: input.turnId, + completedAt: input.createdAt, checkpointRef: targetCheckpointRef, - status: checkpointStatusFromRuntime(event.payload.state), + status: input.status, files, assistantMessageId, - checkpointTurnCount: nextTurnCount, - createdAt: now, + checkpointTurnCount: input.turnCount, + createdAt: input.createdAt, }); yield* receiptBus.publish({ type: "checkpoint.diff.finalized", - threadId: thread.id, - turnId, - checkpointTurnCount: nextTurnCount, + threadId: input.threadId, + turnId: input.turnId, + checkpointTurnCount: input.turnCount, checkpointRef: targetCheckpointRef, - status: checkpointStatusFromRuntime(event.payload.state), - createdAt: now, + status: input.status, + createdAt: input.createdAt, }); yield* receiptBus.publish({ type: "turn.processing.quiesced", - threadId: thread.id, - turnId, - checkpointTurnCount: nextTurnCount, - createdAt: now, + threadId: input.threadId, + turnId: input.turnId, + checkpointTurnCount: input.turnCount, + createdAt: input.createdAt, }); yield* orchestrationEngine.dispatch({ type: "thread.activity.append", commandId: serverCommandId("checkpoint-captured-activity"), - threadId: thread.id, + threadId: input.threadId, activity: { id: EventId.makeUnsafe(crypto.randomUUID()), tone: "info", kind: "checkpoint.captured", summary: "Checkpoint captured", payload: { - turnCount: nextTurnCount, - status: event.payload.state, + turnCount: input.turnCount, + status: input.status, }, - turnId, - createdAt: now, + turnId: input.turnId, + createdAt: input.createdAt, }, - createdAt: now, + createdAt: input.createdAt, }); }); - const ensurePreTurnBaselineFromTurnStart = Effect.fnUntraced(function* ( - event: Extract, + // Captures a real git checkpoint when a turn completes via a runtime event. + const captureCheckpointFromTurnCompletion = Effect.fnUntraced(function* ( + event: Extract, ) { const turnId = toTurnId(event.turnId); if (!turnId) { @@ -331,24 +334,140 @@ const make = Effect.gen(function* () { return; } - const checkpointCwdFromThreadOrProject = resolveThreadWorkspaceCwd({ + // When a primary turn is active, only that turn may produce completion checkpoints. + if (thread.session?.activeTurnId && !sameId(thread.session.activeTurnId, turnId)) { + return; + } + + // Only skip if a real (non-placeholder) checkpoint already exists for this turn. + // ProviderRuntimeIngestion may insert placeholder entries with status "missing" + // before this reactor runs; those must not prevent real git capture. + if ( + thread.checkpoints.some( + (checkpoint) => checkpoint.turnId === turnId && checkpoint.status !== "missing", + ) + ) { + return; + } + + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId: thread.id, thread, projects: readModel.projects, + preferSessionRuntime: true, }); - const checkpointCwd = - checkpointCwdFromThreadOrProject ?? - Option.match(yield* resolveSessionRuntimeForThread(thread.id), { - onNone: () => undefined, - onSome: (runtime) => runtime.cwd, - }); if (!checkpointCwd) { - yield* Effect.logWarning("checkpoint pre-turn capture skipped: no workspace cwd", { - threadId: thread.id, - turnId, + return; + } + + // If a placeholder checkpoint exists for this turn, reuse its turn count + // instead of incrementing past it. + const existingPlaceholder = thread.checkpoints.find( + (checkpoint) => checkpoint.turnId === turnId && checkpoint.status === "missing", + ); + const currentTurnCount = thread.checkpoints.reduce( + (maxTurnCount, checkpoint) => Math.max(maxTurnCount, checkpoint.checkpointTurnCount), + 0, + ); + const nextTurnCount = existingPlaceholder + ? existingPlaceholder.checkpointTurnCount + : currentTurnCount + 1; + + yield* captureAndDispatchCheckpoint({ + threadId: thread.id, + turnId, + thread, + cwd: checkpointCwd, + turnCount: nextTurnCount, + status: checkpointStatusFromRuntime(event.payload.state), + assistantMessageId: undefined, + createdAt: event.createdAt, + }); + }); + + // Captures a real git checkpoint when a placeholder checkpoint (status "missing") + // is detected via a domain event. This replaces the placeholder with a real + // git-ref-based checkpoint. + // + // ProviderRuntimeIngestion creates placeholder checkpoints on turn.diff.updated + // events from the Codex runtime. This handler fires when the corresponding + // domain event arrives, allowing the reactor to capture the actual filesystem + // state into a git ref and dispatch a replacement checkpoint. + const captureCheckpointFromPlaceholder = Effect.fnUntraced(function* ( + event: Extract, + ) { + const { threadId, turnId, checkpointTurnCount, status } = event.payload; + + // Only replace placeholders; skip events from our own real captures. + if (status !== "missing") { + return; + } + + const readModel = yield* orchestrationEngine.getReadModel(); + const thread = readModel.threads.find((entry) => entry.id === threadId); + if (!thread) { + yield* Effect.logWarning("checkpoint capture from placeholder skipped: thread not found", { + threadId, }); return; } - if (!isGitWorkspace(checkpointCwd)) { + + // If a real checkpoint already exists for this turn, skip. + if ( + thread.checkpoints.some( + (checkpoint) => checkpoint.turnId === turnId && checkpoint.status !== "missing", + ) + ) { + yield* Effect.logDebug( + "checkpoint capture from placeholder skipped: real checkpoint already exists", + { threadId, turnId }, + ); + return; + } + + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId, + thread, + projects: readModel.projects, + preferSessionRuntime: true, + }); + if (!checkpointCwd) { + return; + } + + yield* captureAndDispatchCheckpoint({ + threadId, + turnId, + thread, + cwd: checkpointCwd, + turnCount: checkpointTurnCount, + status: "ready", + assistantMessageId: event.payload.assistantMessageId ?? undefined, + createdAt: event.payload.completedAt, + }); + }); + + const ensurePreTurnBaselineFromTurnStart = Effect.fnUntraced(function* ( + event: Extract, + ) { + const turnId = toTurnId(event.turnId); + if (!turnId) { + return; + } + + const readModel = yield* orchestrationEngine.getReadModel(); + const thread = readModel.threads.find((entry) => entry.id === event.threadId); + if (!thread) { + return; + } + + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId: thread.id, + thread, + projects: readModel.projects, + preferSessionRuntime: false, + }); + if (!checkpointCwd) { return; } @@ -401,23 +520,13 @@ const make = Effect.gen(function* () { return; } - const checkpointCwdFromThreadOrProject = resolveThreadWorkspaceCwd({ + const checkpointCwd = yield* resolveCheckpointCwd({ + threadId, thread, projects: readModel.projects, + preferSessionRuntime: false, }); - const checkpointCwd = - checkpointCwdFromThreadOrProject ?? - Option.match(yield* resolveSessionRuntimeForThread(threadId), { - onNone: () => undefined, - onSome: (runtime) => runtime.cwd, - }); if (!checkpointCwd) { - yield* Effect.logWarning("checkpoint pre-turn capture skipped: no workspace cwd", { - threadId, - }); - return; - } - if (!isGitWorkspace(checkpointCwd)) { return; } @@ -592,6 +701,25 @@ const make = Effect.gen(function* () { }), ), ); + return; + } + + // When ProviderRuntimeIngestion creates a placeholder checkpoint (status "missing") + // from a turn.diff.updated runtime event, capture the real git checkpoint to + // replace it. The providerService.streamEvents PubSub does not reliably deliver + // turn.completed runtime events to this reactor (shared subscription), so + // reacting to the domain event is the reliable path. + if (event.type === "thread.turn-diff-completed") { + yield* captureCheckpointFromPlaceholder(event).pipe( + Effect.catch((error) => + appendCaptureFailureActivity({ + threadId: event.payload.threadId, + turnId: event.payload.turnId, + detail: error.message, + createdAt: new Date().toISOString(), + }).pipe(Effect.catch(() => Effect.void)), + ), + ); } }); @@ -644,7 +772,8 @@ const make = Effect.gen(function* () { if ( event.type !== "thread.turn-start-requested" && event.type !== "thread.message-sent" && - event.type !== "thread.checkpoint-revert-requested" + event.type !== "thread.checkpoint-revert-requested" && + event.type !== "thread.turn-diff-completed" ) { return Effect.void; } diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 33b49f233f..69b28b9d3c 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -222,14 +222,16 @@ const makeOrchestrationEngine = Effect.gen(function* () { return yield* Deferred.await(result); }); - const streamDomainEvents: OrchestrationEngineShape["streamDomainEvents"] = - Stream.fromPubSub(eventPubSub); - return { getReadModel, readEvents, dispatch, - streamDomainEvents, + // Each access creates a fresh PubSub subscription so that multiple + // consumers (wsServer, ProviderRuntimeIngestion, CheckpointReactor, etc.) + // each independently receive all domain events. + get streamDomainEvents(): OrchestrationEngineShape["streamDomainEvents"] { + return Stream.fromPubSub(eventPubSub); + }, } satisfies OrchestrationEngineShape; }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 70ce978d54..0dd10dcb7c 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1064,22 +1064,34 @@ const make = Effect.gen(function* () { if (event.type === "turn.diff.updated") { const turnId = toTurnId(event.turnId); if (turnId && (yield* isGitRepoForThread(thread.id))) { - const assistantMessageId = MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); - yield* orchestrationEngine.dispatch({ - type: "thread.turn.diff.complete", - commandId: providerCommandId(event, "thread-turn-diff-complete"), - threadId: thread.id, - turnId, - completedAt: now, - checkpointRef: CheckpointRef.makeUnsafe(`provider-diff:${event.eventId}`), - status: "missing", - files: [], - assistantMessageId, - checkpointTurnCount: thread.checkpoints.length + 1, - createdAt: now, - }); + // Skip if a checkpoint already exists for this turn. A real + // (non-placeholder) capture from CheckpointReactor should not + // be clobbered, and dispatching a duplicate placeholder for the + // same turnId would produce an unstable checkpointTurnCount. + if (thread.checkpoints.some((c) => c.turnId === turnId)) { + // Already tracked; no-op. + } else { + const assistantMessageId = MessageId.makeUnsafe( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ); + const maxTurnCount = thread.checkpoints.reduce( + (max, c) => Math.max(max, c.checkpointTurnCount), + 0, + ); + yield* orchestrationEngine.dispatch({ + type: "thread.turn.diff.complete", + commandId: providerCommandId(event, "thread-turn-diff-complete"), + threadId: thread.id, + turnId, + completedAt: now, + checkpointRef: CheckpointRef.makeUnsafe(`provider-diff:${event.eventId}`), + status: "missing", + files: [], + assistantMessageId, + checkpointTurnCount: maxTurnCount + 1, + createdAt: now, + }); + } } } diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 60575d30ae..015f82a677 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -500,6 +500,16 @@ export function projectEvent( "checkpoint", ); + // Do not let a placeholder (status "missing") overwrite a checkpoint + // that has already been captured with a real git ref (status "ready"). + // ProviderRuntimeIngestion may fire multiple turn.diff.updated events + // per turn; without this guard later placeholders would clobber the + // real capture dispatched by CheckpointReactor. + const existing = thread.checkpoints.find((entry) => entry.turnId === checkpoint.turnId); + if (existing && existing.status !== "missing" && checkpoint.status === "missing") { + return nextBase; + } + const checkpoints = [ ...thread.checkpoints.filter((entry) => entry.turnId !== checkpoint.turnId), checkpoint, diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index 4185014b26..8e3bc72041 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -537,7 +537,12 @@ const makeProviderService = (options?: ProviderServiceLiveOptions) => listSessions, getCapabilities, rollbackConversation, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + // Each access creates a fresh PubSub subscription so that multiple + // consumers (ProviderRuntimeIngestion, CheckpointReactor, etc.) each + // independently receive all runtime events. + get streamEvents(): ProviderServiceShape["streamEvents"] { + return Stream.fromPubSub(runtimeEventPubSub); + }, } satisfies ProviderServiceShape; });