Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 32 additions & 73 deletions apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,38 @@
import {
CheckpointRef,
DEFAULT_PROVIDER_INTERACTION_MODE,
ProjectId,
ThreadId,
TurnId,
type OrchestrationReadModel,
} from "@t3tools/contracts";
import { Effect, Layer } from "effect";
import { CheckpointRef, ProjectId, ThreadId, TurnId } from "@t3tools/contracts";
import { Effect, Layer, Option } from "effect";
import { describe, expect, it } from "vitest";

import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts";
import {
ProjectionSnapshotQuery,
type ProjectionThreadCheckpointContext,
} from "../../orchestration/Services/ProjectionSnapshotQuery.ts";
import { checkpointRefForThreadTurn } from "../Utils.ts";
import { CheckpointDiffQueryLive } from "./CheckpointDiffQuery.ts";
import { CheckpointStore, type CheckpointStoreShape } from "../Services/CheckpointStore.ts";
import { CheckpointDiffQuery } from "../Services/CheckpointDiffQuery.ts";

function makeSnapshot(input: {
function makeThreadCheckpointContext(input: {
readonly projectId: ProjectId;
readonly threadId: ThreadId;
readonly workspaceRoot: string;
readonly worktreePath: string | null;
readonly checkpointTurnCount: number;
readonly checkpointRef: CheckpointRef;
}): OrchestrationReadModel {
}): ProjectionThreadCheckpointContext {
return {
snapshotSequence: 0,
updatedAt: "2026-01-01T00:00:00.000Z",
projects: [
{
id: input.projectId,
title: "Project",
workspaceRoot: input.workspaceRoot,
defaultModelSelection: null,
scripts: [],
createdAt: "2026-01-01T00:00:00.000Z",
updatedAt: "2026-01-01T00:00:00.000Z",
deletedAt: null,
},
],
threads: [
threadId: input.threadId,
projectId: input.projectId,
workspaceRoot: input.workspaceRoot,
worktreePath: input.worktreePath,
checkpoints: [
{
id: input.threadId,
projectId: input.projectId,
title: "Thread",
modelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access",
branch: null,
worktreePath: input.worktreePath,
latestTurn: {
turnId: TurnId.makeUnsafe("turn-1"),
state: "completed",
requestedAt: "2026-01-01T00:00:00.000Z",
startedAt: "2026-01-01T00:00:00.000Z",
completedAt: "2026-01-01T00:00:00.000Z",
assistantMessageId: null,
},
createdAt: "2026-01-01T00:00:00.000Z",
updatedAt: "2026-01-01T00:00:00.000Z",
archivedAt: null,
deletedAt: null,
messages: [],
activities: [],
proposedPlans: [],
checkpoints: [
{
turnId: TurnId.makeUnsafe("turn-1"),
checkpointTurnCount: input.checkpointTurnCount,
checkpointRef: input.checkpointRef,
status: "ready",
files: [],
assistantMessageId: null,
completedAt: "2026-01-01T00:00:00.000Z",
},
],
session: null,
turnId: TurnId.makeUnsafe("turn-1"),
checkpointTurnCount: input.checkpointTurnCount,
checkpointRef: input.checkpointRef,
status: "ready",
files: [],
assistantMessageId: null,
completedAt: "2026-01-01T00:00:00.000Z",
},
],
};
Expand All @@ -95,7 +50,7 @@ describe("CheckpointDiffQueryLive", () => {
readonly cwd: string;
}> = [];

const snapshot = makeSnapshot({
const threadCheckpointContext = makeThreadCheckpointContext({
projectId,
threadId,
workspaceRoot: "/tmp/workspace",
Expand Down Expand Up @@ -125,7 +80,12 @@ describe("CheckpointDiffQueryLive", () => {
Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)),
Layer.provideMerge(
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () => Effect.succeed(snapshot),
getSnapshot: () =>
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
getThreadCheckpointContext: () => Effect.succeed(Option.some(threadCheckpointContext)),
}),
),
);
Expand Down Expand Up @@ -175,12 +135,11 @@ describe("CheckpointDiffQueryLive", () => {
Layer.provideMerge(
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () =>
Effect.succeed({
snapshotSequence: 0,
projects: [],
threads: [],
updatedAt: "2026-01-01T00:00:00.000Z",
} satisfies OrchestrationReadModel),
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
getThreadCheckpointContext: () => Effect.succeed(Option.none()),
}),
),
);
Expand Down
22 changes: 10 additions & 12 deletions apps/server/src/checkpointing/Layers/CheckpointDiffQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import {
type OrchestrationGetFullThreadDiffResult,
type OrchestrationGetTurnDiffResult as OrchestrationGetTurnDiffResultType,
} from "@t3tools/contracts";
import { Effect, Layer, Schema } from "effect";
import { Effect, Layer, Option, Schema } from "effect";

import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts";
import { CheckpointInvariantError, CheckpointUnavailableError } from "../Errors.ts";
import { checkpointRefForThreadTurn, resolveThreadWorkspaceCwd } from "../Utils.ts";
import { checkpointRefForThreadTurn } from "../Utils.ts";
import { CheckpointStore } from "../Services/CheckpointStore.ts";
import {
CheckpointDiffQuery,
Expand Down Expand Up @@ -41,16 +41,17 @@ const make = Effect.gen(function* () {
return emptyDiff;
}

const snapshot = yield* projectionSnapshotQuery.getSnapshot();
const thread = snapshot.threads.find((entry) => entry.id === input.threadId);
if (!thread) {
const threadContext = yield* projectionSnapshotQuery.getThreadCheckpointContext(
input.threadId,
);
if (Option.isNone(threadContext)) {
return yield* new CheckpointInvariantError({
operation,
detail: `Thread '${input.threadId}' not found.`,
});
}

const maxTurnCount = thread.checkpoints.reduce(
const maxTurnCount = threadContext.value.checkpoints.reduce(
(max, checkpoint) => Math.max(max, checkpoint.checkpointTurnCount),
0,
);
Expand All @@ -62,10 +63,7 @@ const make = Effect.gen(function* () {
});
}

const workspaceCwd = resolveThreadWorkspaceCwd({
thread,
projects: snapshot.projects,
});
const workspaceCwd = threadContext.value.worktreePath ?? threadContext.value.workspaceRoot;
if (!workspaceCwd) {
return yield* new CheckpointInvariantError({
operation,
Expand All @@ -76,7 +74,7 @@ const make = Effect.gen(function* () {
const fromCheckpointRef =
input.fromTurnCount === 0
? checkpointRefForThreadTurn(input.threadId, 0)
: thread.checkpoints.find(
: threadContext.value.checkpoints.find(
(checkpoint) => checkpoint.checkpointTurnCount === input.fromTurnCount,
)?.checkpointRef;
if (!fromCheckpointRef) {
Expand All @@ -87,7 +85,7 @@ const make = Effect.gen(function* () {
});
}

const toCheckpointRef = thread.checkpoints.find(
const toCheckpointRef = threadContext.value.checkpoints.find(
(checkpoint) => checkpoint.checkpointTurnCount === input.toTurnCount,
)?.checkpointRef;
if (!toCheckpointRef) {
Expand Down
Loading
Loading