From 85ed747247a9e15acac0005b11fcf3d4584be9ba Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Tue, 7 Apr 2026 12:07:24 -0700 Subject: [PATCH] Refresh local git status on turn completion - Invalidate and recache the session cwd git snapshot when a turn completes - Add broadcaster coverage for local-only refresh and reactor integration --- .../OrchestrationEngineHarness.integration.ts | 17 ++++++++ .../git/Layers/GitStatusBroadcaster.test.ts | 36 ++++++++++++++++ .../src/git/Layers/GitStatusBroadcaster.ts | 12 ++++-- .../src/git/Services/GitStatusBroadcaster.ts | 4 ++ .../Layers/CheckpointReactor.test.ts | 43 +++++++++++++++++++ .../orchestration/Layers/CheckpointReactor.ts | 23 ++++++++++ 6 files changed, 131 insertions(+), 4 deletions(-) diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 87c81f08c8..589dab6537 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -26,6 +26,7 @@ import { CheckpointStoreLive } from "../src/checkpointing/Layers/CheckpointStore import { CheckpointStore } from "../src/checkpointing/Services/CheckpointStore.ts"; import { GitCoreLive } from "../src/git/Layers/GitCore.ts"; import { GitCore, type GitCoreShape } from "../src/git/Services/GitCore.ts"; +import { GitStatusBroadcaster } from "../src/git/Services/GitStatusBroadcaster.ts"; import { TextGeneration, type TextGenerationShape } from "../src/git/Services/TextGeneration.ts"; import { OrchestrationCommandReceiptRepositoryLive } from "../src/persistence/Layers/OrchestrationCommandReceipts.ts"; import { OrchestrationEventStoreLive } from "../src/persistence/Layers/OrchestrationEventStore.ts"; @@ -320,6 +321,22 @@ export const makeOrchestrationIntegrationHarness = ( ); const checkpointReactorLayer = CheckpointReactorLive.pipe( Layer.provideMerge(runtimeServicesLayer), + Layer.provideMerge( + Layer.succeed(GitStatusBroadcaster, { + getStatus: () => Effect.die("getStatus should not be called in this test"), + refreshLocalStatus: () => + Effect.succeed({ + isRepo: true, + hasOriginRemote: false, + isDefaultBranch: true, + branch: "main", + hasWorkingTreeChanges: false, + workingTree: { files: [], insertions: 0, deletions: 0 }, + }), + refreshStatus: () => Effect.die("refreshStatus should not be called in this test"), + streamStatus: () => Stream.empty, + }), + ), Layer.provideMerge( WorkspaceEntriesLive.pipe( Layer.provide(WorkspacePathsLive), diff --git a/apps/server/src/git/Layers/GitStatusBroadcaster.test.ts b/apps/server/src/git/Layers/GitStatusBroadcaster.test.ts index fbe418ab7d..72a0c24e27 100644 --- a/apps/server/src/git/Layers/GitStatusBroadcaster.test.ts +++ b/apps/server/src/git/Layers/GitStatusBroadcaster.test.ts @@ -143,6 +143,42 @@ describe("GitStatusBroadcasterLive", () => { }).pipe(Effect.provide(makeTestLayer(state))); }); + it.effect("refreshes only the cached local snapshot when requested", () => { + const state = { + currentLocalStatus: baseLocalStatus, + currentRemoteStatus: baseRemoteStatus, + localStatusCalls: 0, + remoteStatusCalls: 0, + localInvalidationCalls: 0, + remoteInvalidationCalls: 0, + }; + + return Effect.gen(function* () { + const broadcaster = yield* GitStatusBroadcaster; + const initial = yield* broadcaster.getStatus({ cwd: "/repo" }); + + state.currentLocalStatus = { + ...baseLocalStatus, + branch: "feature/local-only-refresh", + hasWorkingTreeChanges: true, + }; + + const refreshedLocal = yield* broadcaster.refreshLocalStatus("/repo"); + const cached = yield* broadcaster.getStatus({ cwd: "/repo" }); + + assert.deepStrictEqual(initial, baseStatus); + assert.deepStrictEqual(refreshedLocal, state.currentLocalStatus); + assert.deepStrictEqual(cached, { + ...state.currentLocalStatus, + ...baseRemoteStatus, + }); + assert.equal(state.localStatusCalls, 2); + assert.equal(state.remoteStatusCalls, 1); + assert.equal(state.localInvalidationCalls, 1); + assert.equal(state.remoteInvalidationCalls, 0); + }).pipe(Effect.provide(makeTestLayer(state))); + }); + it.effect("streams a local snapshot first and remote updates later", () => { const state = { currentLocalStatus: baseLocalStatus, diff --git a/apps/server/src/git/Layers/GitStatusBroadcaster.ts b/apps/server/src/git/Layers/GitStatusBroadcaster.ts index 78d4abf40d..3ad7d095d8 100644 --- a/apps/server/src/git/Layers/GitStatusBroadcaster.ts +++ b/apps/server/src/git/Layers/GitStatusBroadcaster.ts @@ -179,10 +179,13 @@ export const GitStatusBroadcasterLive = Layer.effect( return mergeGitStatusParts(local, remote); }); - const refreshLocalStatus = Effect.fn("refreshLocalStatus")(function* (cwd: string) { - yield* gitManager.invalidateLocalStatus(cwd); - const local = yield* gitManager.localStatus({ cwd }); - return yield* updateCachedLocalStatus(cwd, local, { publish: true }); + const refreshLocalStatus: GitStatusBroadcasterShape["refreshLocalStatus"] = Effect.fn( + "refreshLocalStatus", + )(function* (cwd) { + const normalizedCwd = normalizeCwd(cwd); + yield* gitManager.invalidateLocalStatus(normalizedCwd); + const local = yield* gitManager.localStatus({ cwd: normalizedCwd }); + return yield* updateCachedLocalStatus(normalizedCwd, local, { publish: true }); }); const refreshRemoteStatus = Effect.fn("refreshRemoteStatus")(function* (cwd: string) { @@ -300,6 +303,7 @@ export const GitStatusBroadcasterLive = Layer.effect( return { getStatus, + refreshLocalStatus, refreshStatus, streamStatus, } satisfies GitStatusBroadcasterShape; diff --git a/apps/server/src/git/Services/GitStatusBroadcaster.ts b/apps/server/src/git/Services/GitStatusBroadcaster.ts index 0f3f622d17..b898b03f56 100644 --- a/apps/server/src/git/Services/GitStatusBroadcaster.ts +++ b/apps/server/src/git/Services/GitStatusBroadcaster.ts @@ -3,6 +3,7 @@ import type { Effect, Stream } from "effect"; import type { GitManagerServiceError, GitStatusInput, + GitStatusLocalResult, GitStatusResult, GitStatusStreamEvent, } from "@t3tools/contracts"; @@ -11,6 +12,9 @@ export interface GitStatusBroadcasterShape { readonly getStatus: ( input: GitStatusInput, ) => Effect.Effect; + readonly refreshLocalStatus: ( + cwd: string, + ) => Effect.Effect; readonly refreshStatus: (cwd: string) => Effect.Effect; readonly streamStatus: ( input: GitStatusInput, diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index 72adb175f9..10195506a2 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -20,6 +20,7 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { CheckpointStoreLive } from "../../checkpointing/Layers/CheckpointStore.ts"; import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts"; import { GitCoreLive } from "../../git/Layers/GitCore.ts"; +import { GitStatusBroadcaster } from "../../git/Services/GitStatusBroadcaster.ts"; import { CheckpointReactorLive } from "./CheckpointReactor.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; @@ -242,6 +243,7 @@ describe("CheckpointReactor", () => { readonly threadWorktreePath?: string | null; readonly providerSessionCwd?: string; readonly providerName?: ProviderKind; + readonly gitStatusRefreshCalls?: Array; }) { const cwd = createGitRepository(); tempDirs.push(cwd); @@ -262,11 +264,30 @@ describe("CheckpointReactor", () => { const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), { prefix: "t3-checkpoint-reactor-test-", }); + const gitStatusBroadcasterLayer = Layer.succeed(GitStatusBroadcaster, { + getStatus: () => Effect.die("getStatus should not be called in this test"), + refreshLocalStatus: (cwd: string) => + Effect.sync(() => { + options?.gitStatusRefreshCalls?.push(cwd); + }).pipe( + Effect.as({ + isRepo: true, + hasOriginRemote: false, + isDefaultBranch: true, + branch: "main", + hasWorkingTreeChanges: false, + workingTree: { files: [], insertions: 0, deletions: 0 }, + }), + ), + refreshStatus: () => Effect.die("refreshStatus should not be called in this test"), + streamStatus: () => Stream.empty, + }); const layer = CheckpointReactorLive.pipe( Layer.provideMerge(orchestrationLayer), Layer.provideMerge(RuntimeReceiptBusLive), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), + Layer.provideMerge(gitStatusBroadcasterLayer), Layer.provideMerge(CheckpointStoreLive), Layer.provideMerge(WorkspaceEntriesLive.pipe(Layer.provide(WorkspacePathsLive))), Layer.provideMerge(WorkspacePathsLive), @@ -424,6 +445,28 @@ describe("CheckpointReactor", () => { ).toBe("v2\n"); }); + it("refreshes local git status state on turn completion using the session cwd", async () => { + const gitStatusRefreshCalls: string[] = []; + const harness = await createHarness({ + seedFilesystemCheckpoints: false, + gitStatusRefreshCalls, + }); + + harness.provider.emit({ + type: "turn.completed", + eventId: EventId.makeUnsafe("evt-turn-completed-refresh-local-status"), + provider: "codex", + createdAt: new Date().toISOString(), + threadId: ThreadId.makeUnsafe("thread-1"), + turnId: asTurnId("turn-refresh-local-status"), + payload: { state: "completed" }, + }); + + await harness.drain(); + + expect(gitStatusRefreshCalls).toEqual([harness.cwd]); + }); + it("ignores auxiliary thread turn completion while primary turn is active", async () => { const harness = await createHarness({ seedFilesystemCheckpoints: false }); const createdAt = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index 03abebaf3a..3ed4dacd2b 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -24,6 +24,7 @@ import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts"; import { CheckpointStoreError } from "../../checkpointing/Errors.ts"; import { OrchestrationDispatchError } from "../Errors.ts"; import { isGitRepository } from "../../git/Utils.ts"; +import { GitStatusBroadcaster } from "../../git/Services/GitStatusBroadcaster.ts"; import { WorkspaceEntries } from "../../workspace/Services/WorkspaceEntries.ts"; type ReactorInput = @@ -69,6 +70,7 @@ const make = Effect.gen(function* () { const checkpointStore = yield* CheckpointStore; const receiptBus = yield* RuntimeReceiptBus; const workspaceEntries = yield* WorkspaceEntries; + const gitStatusBroadcaster = yield* GitStatusBroadcaster; const appendRevertFailureActivity = (input: { readonly threadId: ThreadId; @@ -498,6 +500,26 @@ const make = Effect.gen(function* () { }, ); + const refreshLocalGitStatusFromTurnCompletion = Effect.fn( + "refreshLocalGitStatusFromTurnCompletion", + )(function* (event: Extract) { + const sessionRuntime = yield* resolveSessionRuntimeForThread(event.threadId); + if (Option.isNone(sessionRuntime)) { + return; + } + + yield* gitStatusBroadcaster.refreshLocalStatus(sessionRuntime.value.cwd).pipe( + Effect.catch((error) => + Effect.logWarning("failed to refresh local git status after turn completion", { + threadId: event.threadId, + turnId: event.turnId ?? null, + cwd: sessionRuntime.value.cwd, + detail: error.message, + }), + ), + ); + }); + const ensurePreTurnBaselineFromDomainTurnStart = Effect.fn( "ensurePreTurnBaselineFromDomainTurnStart", )(function* ( @@ -736,6 +758,7 @@ const make = Effect.gen(function* () { if (event.type === "turn.completed") { const turnId = toTurnId(event.turnId); + yield* refreshLocalGitStatusFromTurnCompletion(event); yield* captureCheckpointFromTurnCompletion(event).pipe( Effect.catch((error) => appendCaptureFailureActivity({