Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions apps/server/integration/OrchestrationEngineHarness.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { CheckpointStoreLive } from "../src/checkpointing/Layers/CheckpointStore
import { CheckpointStore } from "../src/checkpointing/Services/CheckpointStore.ts";
import { GitCoreLive } from "../src/git/Layers/GitCore.ts";
import { GitCore, type GitCoreShape } from "../src/git/Services/GitCore.ts";
import { GitStatusBroadcaster } from "../src/git/Services/GitStatusBroadcaster.ts";
import { TextGeneration, type TextGenerationShape } from "../src/git/Services/TextGeneration.ts";
import { OrchestrationCommandReceiptRepositoryLive } from "../src/persistence/Layers/OrchestrationCommandReceipts.ts";
import { OrchestrationEventStoreLive } from "../src/persistence/Layers/OrchestrationEventStore.ts";
Expand Down Expand Up @@ -321,6 +322,22 @@ export const makeOrchestrationIntegrationHarness = (
);
const checkpointReactorLayer = CheckpointReactorLive.pipe(
Layer.provideMerge(runtimeServicesLayer),
Layer.provideMerge(
Layer.succeed(GitStatusBroadcaster, {
getStatus: () => Effect.die("getStatus should not be called in this test"),
refreshLocalStatus: () =>
Effect.succeed({
isRepo: true,
hasOriginRemote: false,
isDefaultBranch: true,
branch: "main",
hasWorkingTreeChanges: false,
workingTree: { files: [], insertions: 0, deletions: 0 },
}),
refreshStatus: () => Effect.die("refreshStatus should not be called in this test"),
streamStatus: () => Stream.empty,
}),
),
Layer.provideMerge(
WorkspaceEntriesLive.pipe(
Layer.provide(WorkspacePathsLive),
Expand Down
36 changes: 36 additions & 0 deletions apps/server/src/git/Layers/GitStatusBroadcaster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,42 @@ describe("GitStatusBroadcasterLive", () => {
}).pipe(Effect.provide(makeTestLayer(state)));
});

it.effect("refreshes only the cached local snapshot when requested", () => {
const state = {
currentLocalStatus: baseLocalStatus,
currentRemoteStatus: baseRemoteStatus,
localStatusCalls: 0,
remoteStatusCalls: 0,
localInvalidationCalls: 0,
remoteInvalidationCalls: 0,
};

return Effect.gen(function* () {
const broadcaster = yield* GitStatusBroadcaster;
const initial = yield* broadcaster.getStatus({ cwd: "/repo" });

state.currentLocalStatus = {
...baseLocalStatus,
branch: "feature/local-only-refresh",
hasWorkingTreeChanges: true,
};

const refreshedLocal = yield* broadcaster.refreshLocalStatus("/repo");
const cached = yield* broadcaster.getStatus({ cwd: "/repo" });

assert.deepStrictEqual(initial, baseStatus);
assert.deepStrictEqual(refreshedLocal, state.currentLocalStatus);
assert.deepStrictEqual(cached, {
...state.currentLocalStatus,
...baseRemoteStatus,
});
assert.equal(state.localStatusCalls, 2);
assert.equal(state.remoteStatusCalls, 1);
assert.equal(state.localInvalidationCalls, 1);
assert.equal(state.remoteInvalidationCalls, 0);
}).pipe(Effect.provide(makeTestLayer(state)));
});

it.effect("streams a local snapshot first and remote updates later", () => {
const state = {
currentLocalStatus: baseLocalStatus,
Expand Down
12 changes: 8 additions & 4 deletions apps/server/src/git/Layers/GitStatusBroadcaster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,13 @@ export const GitStatusBroadcasterLive = Layer.effect(
return mergeGitStatusParts(local, remote);
});

const refreshLocalStatus = Effect.fn("refreshLocalStatus")(function* (cwd: string) {
yield* gitManager.invalidateLocalStatus(cwd);
const local = yield* gitManager.localStatus({ cwd });
return yield* updateCachedLocalStatus(cwd, local, { publish: true });
const refreshLocalStatus: GitStatusBroadcasterShape["refreshLocalStatus"] = Effect.fn(
"refreshLocalStatus",
)(function* (cwd) {
const normalizedCwd = normalizeCwd(cwd);
yield* gitManager.invalidateLocalStatus(normalizedCwd);
const local = yield* gitManager.localStatus({ cwd: normalizedCwd });
return yield* updateCachedLocalStatus(normalizedCwd, local, { publish: true });
});

const refreshRemoteStatus = Effect.fn("refreshRemoteStatus")(function* (cwd: string) {
Expand Down Expand Up @@ -300,6 +303,7 @@ export const GitStatusBroadcasterLive = Layer.effect(

return {
getStatus,
refreshLocalStatus,
refreshStatus,
streamStatus,
} satisfies GitStatusBroadcasterShape;
Expand Down
4 changes: 4 additions & 0 deletions apps/server/src/git/Services/GitStatusBroadcaster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Effect, Stream } from "effect";
import type {
GitManagerServiceError,
GitStatusInput,
GitStatusLocalResult,
GitStatusResult,
GitStatusStreamEvent,
} from "@t3tools/contracts";
Expand All @@ -11,6 +12,9 @@ export interface GitStatusBroadcasterShape {
readonly getStatus: (
input: GitStatusInput,
) => Effect.Effect<GitStatusResult, GitManagerServiceError>;
readonly refreshLocalStatus: (
cwd: string,
) => Effect.Effect<GitStatusLocalResult, GitManagerServiceError>;
readonly refreshStatus: (cwd: string) => Effect.Effect<GitStatusResult, GitManagerServiceError>;
readonly streamStatus: (
input: GitStatusInput,
Expand Down
43 changes: 43 additions & 0 deletions apps/server/src/orchestration/Layers/CheckpointReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import { CheckpointStoreLive } from "../../checkpointing/Layers/CheckpointStore.ts";
import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts";
import { GitCoreLive } from "../../git/Layers/GitCore.ts";
import { GitStatusBroadcaster } from "../../git/Services/GitStatusBroadcaster.ts";
import { RepositoryIdentityResolverLive } from "../../project/Layers/RepositoryIdentityResolver.ts";
import { CheckpointReactorLive } from "./CheckpointReactor.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
Expand Down Expand Up @@ -243,6 +244,7 @@ describe("CheckpointReactor", () => {
readonly threadWorktreePath?: string | null;
readonly providerSessionCwd?: string;
readonly providerName?: ProviderKind;
readonly gitStatusRefreshCalls?: Array<string>;
}) {
const cwd = createGitRepository();
tempDirs.push(cwd);
Expand All @@ -264,11 +266,30 @@ describe("CheckpointReactor", () => {
const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), {
prefix: "t3-checkpoint-reactor-test-",
});
const gitStatusBroadcasterLayer = Layer.succeed(GitStatusBroadcaster, {
getStatus: () => Effect.die("getStatus should not be called in this test"),
refreshLocalStatus: (cwd: string) =>
Effect.sync(() => {
options?.gitStatusRefreshCalls?.push(cwd);
}).pipe(
Effect.as({
isRepo: true,
hasOriginRemote: false,
isDefaultBranch: true,
branch: "main",
hasWorkingTreeChanges: false,
workingTree: { files: [], insertions: 0, deletions: 0 },
}),
),
refreshStatus: () => Effect.die("refreshStatus should not be called in this test"),
streamStatus: () => Stream.empty,
});

const layer = CheckpointReactorLive.pipe(
Layer.provideMerge(orchestrationLayer),
Layer.provideMerge(RuntimeReceiptBusLive),
Layer.provideMerge(Layer.succeed(ProviderService, provider.service)),
Layer.provideMerge(gitStatusBroadcasterLayer),
Layer.provideMerge(CheckpointStoreLive),
Layer.provideMerge(WorkspaceEntriesLive.pipe(Layer.provide(WorkspacePathsLive))),
Layer.provideMerge(WorkspacePathsLive),
Expand Down Expand Up @@ -426,6 +447,28 @@ describe("CheckpointReactor", () => {
).toBe("v2\n");
});

it("refreshes local git status state on turn completion using the session cwd", async () => {
const gitStatusRefreshCalls: string[] = [];
const harness = await createHarness({
seedFilesystemCheckpoints: false,
gitStatusRefreshCalls,
});

harness.provider.emit({
type: "turn.completed",
eventId: EventId.makeUnsafe("evt-turn-completed-refresh-local-status"),
provider: "codex",
createdAt: new Date().toISOString(),
threadId: ThreadId.makeUnsafe("thread-1"),
turnId: asTurnId("turn-refresh-local-status"),
payload: { state: "completed" },
});

await harness.drain();

expect(gitStatusRefreshCalls).toEqual([harness.cwd]);
});

it("ignores auxiliary thread turn completion while primary turn is active", async () => {
const harness = await createHarness({ seedFilesystemCheckpoints: false });
const createdAt = new Date().toISOString();
Expand Down
23 changes: 23 additions & 0 deletions apps/server/src/orchestration/Layers/CheckpointReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts";
import { CheckpointStoreError } from "../../checkpointing/Errors.ts";
import { OrchestrationDispatchError } from "../Errors.ts";
import { isGitRepository } from "../../git/Utils.ts";
import { GitStatusBroadcaster } from "../../git/Services/GitStatusBroadcaster.ts";
import { WorkspaceEntries } from "../../workspace/Services/WorkspaceEntries.ts";

type ReactorInput =
Expand Down Expand Up @@ -69,6 +70,7 @@ const make = Effect.gen(function* () {
const checkpointStore = yield* CheckpointStore;
const receiptBus = yield* RuntimeReceiptBus;
const workspaceEntries = yield* WorkspaceEntries;
const gitStatusBroadcaster = yield* GitStatusBroadcaster;

const appendRevertFailureActivity = (input: {
readonly threadId: ThreadId;
Expand Down Expand Up @@ -498,6 +500,26 @@ const make = Effect.gen(function* () {
},
);

const refreshLocalGitStatusFromTurnCompletion = Effect.fn(
"refreshLocalGitStatusFromTurnCompletion",
)(function* (event: Extract<ProviderRuntimeEvent, { type: "turn.completed" }>) {
const sessionRuntime = yield* resolveSessionRuntimeForThread(event.threadId);
if (Option.isNone(sessionRuntime)) {
return;
}

yield* gitStatusBroadcaster.refreshLocalStatus(sessionRuntime.value.cwd).pipe(
Effect.catch((error) =>
Effect.logWarning("failed to refresh local git status after turn completion", {
threadId: event.threadId,
turnId: event.turnId ?? null,
cwd: sessionRuntime.value.cwd,
detail: error.message,
}),
),
);
});

const ensurePreTurnBaselineFromDomainTurnStart = Effect.fn(
"ensurePreTurnBaselineFromDomainTurnStart",
)(function* (
Expand Down Expand Up @@ -736,6 +758,7 @@ const make = Effect.gen(function* () {

if (event.type === "turn.completed") {
const turnId = toTurnId(event.turnId);
yield* refreshLocalGitStatusFromTurnCompletion(event);
yield* captureCheckpointFromTurnCompletion(event).pipe(
Effect.catch((error) =>
appendCaptureFailureActivity({
Expand Down
Loading