diff --git a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts index 9fb2500ce4..c66c529b9a 100644 --- a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts +++ b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts @@ -1,83 +1,38 @@ -import { - CheckpointRef, - DEFAULT_PROVIDER_INTERACTION_MODE, - ProjectId, - ThreadId, - TurnId, - type OrchestrationReadModel, -} from "@t3tools/contracts"; -import { Effect, Layer } from "effect"; +import { CheckpointRef, ProjectId, ThreadId, TurnId } from "@t3tools/contracts"; +import { Effect, Layer, Option } from "effect"; import { describe, expect, it } from "vitest"; -import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; +import { + ProjectionSnapshotQuery, + type ProjectionThreadCheckpointContext, +} from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { checkpointRefForThreadTurn } from "../Utils.ts"; import { CheckpointDiffQueryLive } from "./CheckpointDiffQuery.ts"; import { CheckpointStore, type CheckpointStoreShape } from "../Services/CheckpointStore.ts"; import { CheckpointDiffQuery } from "../Services/CheckpointDiffQuery.ts"; -function makeSnapshot(input: { +function makeThreadCheckpointContext(input: { readonly projectId: ProjectId; readonly threadId: ThreadId; readonly workspaceRoot: string; readonly worktreePath: string | null; readonly checkpointTurnCount: number; readonly checkpointRef: CheckpointRef; -}): OrchestrationReadModel { +}): ProjectionThreadCheckpointContext { return { - snapshotSequence: 0, - updatedAt: "2026-01-01T00:00:00.000Z", - projects: [ - { - id: input.projectId, - title: "Project", - workspaceRoot: input.workspaceRoot, - defaultModelSelection: null, - scripts: [], - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", - deletedAt: null, - }, - ], - threads: [ + threadId: input.threadId, + projectId: input.projectId, + workspaceRoot: input.workspaceRoot, + worktreePath: input.worktreePath, + checkpoints: [ { - id: input.threadId, - projectId: input.projectId, - title: "Thread", - modelSelection: { - provider: "codex", - model: "gpt-5-codex", - }, - interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, - runtimeMode: "full-access", - branch: null, - worktreePath: input.worktreePath, - latestTurn: { - turnId: TurnId.makeUnsafe("turn-1"), - state: "completed", - requestedAt: "2026-01-01T00:00:00.000Z", - startedAt: "2026-01-01T00:00:00.000Z", - completedAt: "2026-01-01T00:00:00.000Z", - assistantMessageId: null, - }, - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", - archivedAt: null, - deletedAt: null, - messages: [], - activities: [], - proposedPlans: [], - checkpoints: [ - { - turnId: TurnId.makeUnsafe("turn-1"), - checkpointTurnCount: input.checkpointTurnCount, - checkpointRef: input.checkpointRef, - status: "ready", - files: [], - assistantMessageId: null, - completedAt: "2026-01-01T00:00:00.000Z", - }, - ], - session: null, + turnId: TurnId.makeUnsafe("turn-1"), + checkpointTurnCount: input.checkpointTurnCount, + checkpointRef: input.checkpointRef, + status: "ready", + files: [], + assistantMessageId: null, + completedAt: "2026-01-01T00:00:00.000Z", }, ], }; @@ -95,7 +50,7 @@ describe("CheckpointDiffQueryLive", () => { readonly cwd: string; }> = []; - const snapshot = makeSnapshot({ + const threadCheckpointContext = makeThreadCheckpointContext({ projectId, threadId, workspaceRoot: "/tmp/workspace", @@ -125,7 +80,12 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { - getSnapshot: () => Effect.succeed(snapshot), + getSnapshot: () => + Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), + getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), + getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), + getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), + getThreadCheckpointContext: () => Effect.succeed(Option.some(threadCheckpointContext)), }), ), ); @@ -175,12 +135,11 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { getSnapshot: () => - Effect.succeed({ - snapshotSequence: 0, - projects: [], - threads: [], - updatedAt: "2026-01-01T00:00:00.000Z", - } satisfies OrchestrationReadModel), + Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), + getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), + getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), + getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), + getThreadCheckpointContext: () => Effect.succeed(Option.none()), }), ), ); diff --git a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.ts b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.ts index bbb5a42931..1c2edee469 100644 --- a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.ts +++ b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.ts @@ -4,11 +4,11 @@ import { type OrchestrationGetFullThreadDiffResult, type OrchestrationGetTurnDiffResult as OrchestrationGetTurnDiffResultType, } from "@t3tools/contracts"; -import { Effect, Layer, Schema } from "effect"; +import { Effect, Layer, Option, Schema } from "effect"; import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { CheckpointInvariantError, CheckpointUnavailableError } from "../Errors.ts"; -import { checkpointRefForThreadTurn, resolveThreadWorkspaceCwd } from "../Utils.ts"; +import { checkpointRefForThreadTurn } from "../Utils.ts"; import { CheckpointStore } from "../Services/CheckpointStore.ts"; import { CheckpointDiffQuery, @@ -41,16 +41,17 @@ const make = Effect.gen(function* () { return emptyDiff; } - const snapshot = yield* projectionSnapshotQuery.getSnapshot(); - const thread = snapshot.threads.find((entry) => entry.id === input.threadId); - if (!thread) { + const threadContext = yield* projectionSnapshotQuery.getThreadCheckpointContext( + input.threadId, + ); + if (Option.isNone(threadContext)) { return yield* new CheckpointInvariantError({ operation, detail: `Thread '${input.threadId}' not found.`, }); } - const maxTurnCount = thread.checkpoints.reduce( + const maxTurnCount = threadContext.value.checkpoints.reduce( (max, checkpoint) => Math.max(max, checkpoint.checkpointTurnCount), 0, ); @@ -62,10 +63,7 @@ const make = Effect.gen(function* () { }); } - const workspaceCwd = resolveThreadWorkspaceCwd({ - thread, - projects: snapshot.projects, - }); + const workspaceCwd = threadContext.value.worktreePath ?? threadContext.value.workspaceRoot; if (!workspaceCwd) { return yield* new CheckpointInvariantError({ operation, @@ -76,7 +74,7 @@ const make = Effect.gen(function* () { const fromCheckpointRef = input.fromTurnCount === 0 ? checkpointRefForThreadTurn(input.threadId, 0) - : thread.checkpoints.find( + : threadContext.value.checkpoints.find( (checkpoint) => checkpoint.checkpointTurnCount === input.fromTurnCount, )?.checkpointRef; if (!fromCheckpointRef) { @@ -87,7 +85,7 @@ const make = Effect.gen(function* () { }); } - const toCheckpointRef = thread.checkpoints.find( + const toCheckpointRef = threadContext.value.checkpoints.find( (checkpoint) => checkpoint.checkpointTurnCount === input.toTurnCount, )?.checkpointRef; if (!toCheckpointRef) { diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index 32143d751f..c038bc9d2c 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -338,4 +338,290 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { ]); }), ); + + it.effect( + "reads targeted project, thread, and count queries without hydrating the full snapshot", + () => + Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_turns`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES + ( + 'project-active', + 'Active Project', + '/tmp/workspace', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-03-01T00:00:00.000Z', + '2026-03-01T00:00:01.000Z', + NULL + ), + ( + 'project-deleted', + 'Deleted Project', + '/tmp/deleted', + NULL, + '[]', + '2026-03-01T00:00:02.000Z', + '2026-03-01T00:00:03.000Z', + '2026-03-01T00:00:04.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + created_at, + updated_at, + archived_at, + deleted_at + ) + VALUES + ( + 'thread-first', + 'project-active', + 'First Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + NULL, + '2026-03-01T00:00:05.000Z', + '2026-03-01T00:00:06.000Z', + NULL, + NULL + ), + ( + 'thread-second', + 'project-active', + 'Second Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + NULL, + '2026-03-01T00:00:07.000Z', + '2026-03-01T00:00:08.000Z', + NULL, + NULL + ), + ( + 'thread-deleted', + 'project-active', + 'Deleted Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + NULL, + '2026-03-01T00:00:09.000Z', + '2026-03-01T00:00:10.000Z', + NULL, + '2026-03-01T00:00:11.000Z' + ) + `; + + const counts = yield* snapshotQuery.getCounts(); + assert.deepEqual(counts, { + projectCount: 2, + threadCount: 3, + }); + + const project = yield* snapshotQuery.getActiveProjectByWorkspaceRoot("/tmp/workspace"); + assert.equal(project._tag, "Some"); + if (project._tag === "Some") { + assert.equal(project.value.id, asProjectId("project-active")); + } + + const missingProject = yield* snapshotQuery.getActiveProjectByWorkspaceRoot("/tmp/missing"); + assert.equal(missingProject._tag, "None"); + + const firstThreadId = yield* snapshotQuery.getFirstActiveThreadIdByProjectId( + asProjectId("project-active"), + ); + assert.equal(firstThreadId._tag, "Some"); + if (firstThreadId._tag === "Some") { + assert.equal(firstThreadId.value, ThreadId.makeUnsafe("thread-first")); + } + }), + ); + + it.effect("reads single-thread checkpoint context without hydrating unrelated threads", () => + Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_turns`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES ( + 'project-context', + 'Context Project', + '/tmp/context-workspace', + NULL, + '[]', + '2026-03-02T00:00:00.000Z', + '2026-03-02T00:00:01.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + created_at, + updated_at, + archived_at, + deleted_at + ) + VALUES ( + 'thread-context', + 'project-context', + 'Context Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + 'feature/perf', + '/tmp/context-worktree', + NULL, + '2026-03-02T00:00:02.000Z', + '2026-03-02T00:00:03.000Z', + NULL, + NULL + ) + `; + + yield* sql` + INSERT INTO projection_turns ( + thread_id, + turn_id, + pending_message_id, + source_proposed_plan_thread_id, + source_proposed_plan_id, + assistant_message_id, + state, + requested_at, + started_at, + completed_at, + checkpoint_turn_count, + checkpoint_ref, + checkpoint_status, + checkpoint_files_json + ) + VALUES + ( + 'thread-context', + 'turn-1', + NULL, + NULL, + NULL, + NULL, + 'completed', + '2026-03-02T00:00:04.000Z', + '2026-03-02T00:00:04.000Z', + '2026-03-02T00:00:04.000Z', + 1, + 'checkpoint-a', + 'ready', + '[]' + ), + ( + 'thread-context', + 'turn-2', + NULL, + NULL, + NULL, + NULL, + 'completed', + '2026-03-02T00:00:05.000Z', + '2026-03-02T00:00:05.000Z', + '2026-03-02T00:00:05.000Z', + 2, + 'checkpoint-b', + 'ready', + '[]' + ) + `; + + const context = yield* snapshotQuery.getThreadCheckpointContext( + ThreadId.makeUnsafe("thread-context"), + ); + assert.equal(context._tag, "Some"); + if (context._tag === "Some") { + assert.deepEqual(context.value, { + threadId: ThreadId.makeUnsafe("thread-context"), + projectId: asProjectId("project-context"), + workspaceRoot: "/tmp/context-workspace", + worktreePath: "/tmp/context-worktree", + checkpoints: [ + { + turnId: asTurnId("turn-1"), + checkpointTurnCount: 1, + checkpointRef: asCheckpointRef("checkpoint-a"), + status: "ready", + files: [], + assistantMessageId: null, + completedAt: "2026-03-02T00:00:04.000Z", + }, + { + turnId: asTurnId("turn-2"), + checkpointTurnCount: 2, + checkpointRef: asCheckpointRef("checkpoint-b"), + status: "ready", + files: [], + assistantMessageId: null, + completedAt: "2026-03-02T00:00:05.000Z", + }, + ], + }); + } + }), + ); }); diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index f951c54b5b..da7c695674 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -7,7 +7,6 @@ import { OrchestrationProposedPlanId, OrchestrationReadModel, ProjectScript, - ThreadId, TurnId, type OrchestrationCheckpointSummary, type OrchestrationLatestTurn, @@ -18,8 +17,10 @@ import { type OrchestrationThread, type OrchestrationThreadActivity, ModelSelection, + ProjectId, + ThreadId, } from "@t3tools/contracts"; -import { Effect, Layer, Schema, Struct } from "effect"; +import { Effect, Layer, Option, Schema, Struct } from "effect"; import * as SqlClient from "effect/unstable/sql/SqlClient"; import * as SqlSchema from "effect/unstable/sql/SqlSchema"; @@ -40,6 +41,8 @@ import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.t import { ORCHESTRATION_PROJECTOR_NAMES } from "./ProjectionPipeline.ts"; import { ProjectionSnapshotQuery, + type ProjectionSnapshotCounts, + type ProjectionThreadCheckpointContext, type ProjectionSnapshotQueryShape, } from "../Services/ProjectionSnapshotQuery.ts"; @@ -86,6 +89,29 @@ const ProjectionLatestTurnDbRowSchema = Schema.Struct({ sourceProposedPlanId: Schema.NullOr(OrchestrationProposedPlanId), }); const ProjectionStateDbRowSchema = ProjectionState; +const ProjectionCountsRowSchema = Schema.Struct({ + projectCount: Schema.Number, + threadCount: Schema.Number, +}); +const WorkspaceRootLookupInput = Schema.Struct({ + workspaceRoot: Schema.String, +}); +const ProjectIdLookupInput = Schema.Struct({ + projectId: ProjectId, +}); +const ThreadIdLookupInput = Schema.Struct({ + threadId: ThreadId, +}); +const ProjectionProjectLookupRowSchema = ProjectionProjectDbRowSchema; +const ProjectionThreadIdLookupRowSchema = Schema.Struct({ + threadId: ThreadId, +}); +const ProjectionThreadCheckpointContextThreadRowSchema = Schema.Struct({ + threadId: ThreadId, + projectId: ProjectId, + workspaceRoot: Schema.String, + worktreePath: Schema.NullOr(Schema.String), +}); const REQUIRED_SNAPSHOT_PROJECTORS = [ ORCHESTRATION_PROJECTOR_NAMES.projects, @@ -319,6 +345,94 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const readProjectionCounts = SqlSchema.findOne({ + Request: Schema.Void, + Result: ProjectionCountsRowSchema, + execute: () => + sql` + SELECT + (SELECT COUNT(*) FROM projection_projects) AS "projectCount", + (SELECT COUNT(*) FROM projection_threads) AS "threadCount" + `, + }); + + const getActiveProjectRowByWorkspaceRoot = SqlSchema.findOneOption({ + Request: WorkspaceRootLookupInput, + Result: ProjectionProjectLookupRowSchema, + execute: ({ workspaceRoot }) => + sql` + SELECT + project_id AS "projectId", + title, + workspace_root AS "workspaceRoot", + default_model_selection_json AS "defaultModelSelection", + scripts_json AS "scripts", + created_at AS "createdAt", + updated_at AS "updatedAt", + deleted_at AS "deletedAt" + FROM projection_projects + WHERE workspace_root = ${workspaceRoot} + AND deleted_at IS NULL + ORDER BY created_at ASC, project_id ASC + LIMIT 1 + `, + }); + + const getFirstActiveThreadIdByProject = SqlSchema.findOneOption({ + Request: ProjectIdLookupInput, + Result: ProjectionThreadIdLookupRowSchema, + execute: ({ projectId }) => + sql` + SELECT + thread_id AS "threadId" + FROM projection_threads + WHERE project_id = ${projectId} + AND deleted_at IS NULL + ORDER BY created_at ASC, thread_id ASC + LIMIT 1 + `, + }); + + const getThreadCheckpointContextThreadRow = SqlSchema.findOneOption({ + Request: ThreadIdLookupInput, + Result: ProjectionThreadCheckpointContextThreadRowSchema, + execute: ({ threadId }) => + sql` + SELECT + threads.thread_id AS "threadId", + threads.project_id AS "projectId", + projects.workspace_root AS "workspaceRoot", + threads.worktree_path AS "worktreePath" + FROM projection_threads AS threads + INNER JOIN projection_projects AS projects + ON projects.project_id = threads.project_id + WHERE threads.thread_id = ${threadId} + AND threads.deleted_at IS NULL + LIMIT 1 + `, + }); + + const listCheckpointRowsByThread = SqlSchema.findAll({ + Request: ThreadIdLookupInput, + Result: ProjectionCheckpointDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + thread_id AS "threadId", + turn_id AS "turnId", + checkpoint_turn_count AS "checkpointTurnCount", + checkpoint_ref AS "checkpointRef", + checkpoint_status AS "status", + checkpoint_files_json AS "files", + assistant_message_id AS "assistantMessageId", + completed_at AS "completedAt" + FROM projection_turns + WHERE thread_id = ${threadId} + AND checkpoint_turn_count IS NOT NULL + ORDER BY checkpoint_turn_count ASC + `, + }); + const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => sql .withTransaction( @@ -593,8 +707,109 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { }), ); + const getCounts: ProjectionSnapshotQueryShape["getCounts"] = () => + readProjectionCounts(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCounts:query", + "ProjectionSnapshotQuery.getCounts:decodeRow", + ), + ), + Effect.map( + (row): ProjectionSnapshotCounts => ({ + projectCount: row.projectCount, + threadCount: row.threadCount, + }), + ), + ); + + const getActiveProjectByWorkspaceRoot: ProjectionSnapshotQueryShape["getActiveProjectByWorkspaceRoot"] = + (workspaceRoot) => + getActiveProjectRowByWorkspaceRoot({ workspaceRoot }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveProjectByWorkspaceRoot:query", + "ProjectionSnapshotQuery.getActiveProjectByWorkspaceRoot:decodeRow", + ), + ), + Effect.map( + Option.map( + (row): OrchestrationProject => ({ + id: row.projectId, + title: row.title, + workspaceRoot: row.workspaceRoot, + defaultModelSelection: row.defaultModelSelection, + scripts: row.scripts, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + }), + ), + ), + ); + + const getFirstActiveThreadIdByProjectId: ProjectionSnapshotQueryShape["getFirstActiveThreadIdByProjectId"] = + (projectId) => + getFirstActiveThreadIdByProject({ projectId }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getFirstActiveThreadIdByProjectId:query", + "ProjectionSnapshotQuery.getFirstActiveThreadIdByProjectId:decodeRow", + ), + ), + Effect.map(Option.map((row) => row.threadId)), + ); + + const getThreadCheckpointContext: ProjectionSnapshotQueryShape["getThreadCheckpointContext"] = ( + threadId, + ) => + Effect.gen(function* () { + const threadRow = yield* getThreadCheckpointContextThreadRow({ threadId }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getThreadCheckpointContext:getThread:query", + "ProjectionSnapshotQuery.getThreadCheckpointContext:getThread:decodeRow", + ), + ), + ); + if (Option.isNone(threadRow)) { + return Option.none(); + } + + const checkpointRows = yield* listCheckpointRowsByThread({ threadId }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getThreadCheckpointContext:listCheckpoints:query", + "ProjectionSnapshotQuery.getThreadCheckpointContext:listCheckpoints:decodeRows", + ), + ), + ); + + return Option.some({ + threadId: threadRow.value.threadId, + projectId: threadRow.value.projectId, + workspaceRoot: threadRow.value.workspaceRoot, + worktreePath: threadRow.value.worktreePath, + checkpoints: checkpointRows.map( + (row): OrchestrationCheckpointSummary => ({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status: row.status, + files: row.files, + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + }), + ), + }); + }); + return { getSnapshot, + getCounts, + getActiveProjectByWorkspaceRoot, + getFirstActiveThreadIdByProjectId, + getThreadCheckpointContext, } satisfies ProjectionSnapshotQueryShape; }); diff --git a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts index 91e42f02ff..a7673dc32e 100644 --- a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts @@ -6,12 +6,32 @@ * * @module ProjectionSnapshotQuery */ -import type { OrchestrationReadModel } from "@t3tools/contracts"; +import type { + OrchestrationCheckpointSummary, + OrchestrationProject, + OrchestrationReadModel, + ProjectId, + ThreadId, +} from "@t3tools/contracts"; import { ServiceMap } from "effect"; +import type { Option } from "effect"; import type { Effect } from "effect"; import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; +export interface ProjectionSnapshotCounts { + readonly projectCount: number; + readonly threadCount: number; +} + +export interface ProjectionThreadCheckpointContext { + readonly threadId: ThreadId; + readonly projectId: ProjectId; + readonly workspaceRoot: string; + readonly worktreePath: string | null; + readonly checkpoints: ReadonlyArray; +} + /** * ProjectionSnapshotQueryShape - Service API for read-model snapshots. */ @@ -23,6 +43,32 @@ export interface ProjectionSnapshotQueryShape { * projector cursor state. */ readonly getSnapshot: () => Effect.Effect; + + /** + * Read aggregate projection counts without hydrating the full read model. + */ + readonly getCounts: () => Effect.Effect; + + /** + * Read the active project for an exact workspace root match. + */ + readonly getActiveProjectByWorkspaceRoot: ( + workspaceRoot: string, + ) => Effect.Effect, ProjectionRepositoryError>; + + /** + * Read the earliest active thread for a project. + */ + readonly getFirstActiveThreadIdByProjectId: ( + projectId: ProjectId, + ) => Effect.Effect, ProjectionRepositoryError>; + + /** + * Read the checkpoint context needed to resolve a single thread diff. + */ + readonly getThreadCheckpointContext: ( + threadId: ThreadId, + ) => Effect.Effect, ProjectionRepositoryError>; } /** diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 47102471ac..a03c3c2d18 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -31,6 +31,7 @@ import Migration0015 from "./Migrations/015_ProjectionTurnsSourceProposedPlan.ts import Migration0016 from "./Migrations/016_CanonicalizeModelSelections.ts"; import Migration0017 from "./Migrations/017_ProjectionThreadsArchivedAt.ts"; import Migration0018 from "./Migrations/018_ProjectionThreadsArchivedAtIndex.ts"; +import Migration0019 from "./Migrations/019_ProjectionSnapshotLookupIndexes.ts"; /** * Migration loader with all migrations defined inline. @@ -61,6 +62,7 @@ export const migrationEntries = [ [16, "CanonicalizeModelSelections", Migration0016], [17, "ProjectionThreadsArchivedAt", Migration0017], [18, "ProjectionThreadsArchivedAtIndex", Migration0018], + [19, "ProjectionSnapshotLookupIndexes", Migration0019], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/019_ProjectionSnapshotLookupIndexes.test.ts b/apps/server/src/persistence/Migrations/019_ProjectionSnapshotLookupIndexes.test.ts new file mode 100644 index 0000000000..6207a9bcb6 --- /dev/null +++ b/apps/server/src/persistence/Migrations/019_ProjectionSnapshotLookupIndexes.test.ts @@ -0,0 +1,73 @@ +import { assert, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +import { runMigrations } from "../Migrations.ts"; +import * as NodeSqliteClient from "../NodeSqliteClient.ts"; + +const layer = it.layer(Layer.mergeAll(NodeSqliteClient.layerMemory())); + +layer("019_ProjectionSnapshotLookupIndexes", (it) => { + it.effect("creates indexes for targeted projection lookup filters", () => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* runMigrations({ toMigrationInclusive: 18 }); + yield* runMigrations({ toMigrationInclusive: 19 }); + + const projectIndexes = yield* sql<{ + readonly seq: number; + readonly name: string; + readonly unique: number; + readonly origin: string; + readonly partial: number; + }>` + PRAGMA index_list(projection_projects) + `; + assert.ok( + projectIndexes.some( + (index) => index.name === "idx_projection_projects_workspace_root_deleted_at", + ), + ); + + const projectIndexColumns = yield* sql<{ + readonly seqno: number; + readonly cid: number; + readonly name: string; + }>` + PRAGMA index_info('idx_projection_projects_workspace_root_deleted_at') + `; + assert.deepStrictEqual( + projectIndexColumns.map((column) => column.name), + ["workspace_root", "deleted_at"], + ); + + const threadIndexes = yield* sql<{ + readonly seq: number; + readonly name: string; + readonly unique: number; + readonly origin: string; + readonly partial: number; + }>` + PRAGMA index_list(projection_threads) + `; + assert.ok( + threadIndexes.some( + (index) => index.name === "idx_projection_threads_project_deleted_created", + ), + ); + + const threadIndexColumns = yield* sql<{ + readonly seqno: number; + readonly cid: number; + readonly name: string; + }>` + PRAGMA index_info('idx_projection_threads_project_deleted_created') + `; + assert.deepStrictEqual( + threadIndexColumns.map((column) => column.name), + ["project_id", "deleted_at", "created_at"], + ); + }), + ); +}); diff --git a/apps/server/src/persistence/Migrations/019_ProjectionSnapshotLookupIndexes.ts b/apps/server/src/persistence/Migrations/019_ProjectionSnapshotLookupIndexes.ts new file mode 100644 index 0000000000..bf74a5147d --- /dev/null +++ b/apps/server/src/persistence/Migrations/019_ProjectionSnapshotLookupIndexes.ts @@ -0,0 +1,16 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_projects_workspace_root_deleted_at + ON projection_projects(workspace_root, deleted_at) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_threads_project_deleted_created + ON projection_threads(project_id, deleted_at, created_at) + `; +}); diff --git a/apps/server/src/serverRuntimeStartup.test.ts b/apps/server/src/serverRuntimeStartup.test.ts index 55700e3482..fc06d77566 100644 --- a/apps/server/src/serverRuntimeStartup.test.ts +++ b/apps/server/src/serverRuntimeStartup.test.ts @@ -1,8 +1,13 @@ import { assert, it } from "@effect/vitest"; -import { Deferred, Effect, Fiber, Ref } from "effect"; -import { TestClock } from "effect/testing"; +import { Deferred, Effect, Fiber, Option, Ref } from "effect"; -import { makeCommandGate, ServerRuntimeStartupError } from "./serverRuntimeStartup.ts"; +import { AnalyticsService } from "./telemetry/Services/AnalyticsService.ts"; +import { ProjectionSnapshotQuery } from "./orchestration/Services/ProjectionSnapshotQuery.ts"; +import { + launchStartupHeartbeat, + makeCommandGate, + ServerRuntimeStartupError, +} from "./serverRuntimeStartup.ts"; it.effect("enqueueCommand waits for readiness and then drains queued work", () => Effect.scoped( @@ -14,7 +19,7 @@ it.effect("enqueueCommand waits for readiness and then drains queued work", () = .enqueueCommand(Ref.updateAndGet(executionCount, (count) => count + 1)) .pipe(Effect.forkScoped); - yield* TestClock.adjust("50 millis"); + yield* Effect.yieldNow; assert.equal(yield* Ref.get(executionCount), 0); yield* commandGate.signalCommandReady; @@ -47,3 +52,31 @@ it.effect("enqueueCommand fails queued work when readiness fails", () => }), ), ); + +it.effect("launchStartupHeartbeat does not block the caller while counts are loading", () => + Effect.scoped( + Effect.gen(function* () { + const releaseCounts = yield* Deferred.make(); + + yield* launchStartupHeartbeat.pipe( + Effect.provideService(ProjectionSnapshotQuery, { + getSnapshot: () => Effect.die("unused"), + getCounts: () => + Deferred.await(releaseCounts).pipe( + Effect.as({ + projectCount: 2, + threadCount: 3, + }), + ), + getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), + getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), + getThreadCheckpointContext: () => Effect.succeed(Option.none()), + }), + Effect.provideService(AnalyticsService, { + record: () => Effect.void, + flush: Effect.void, + }), + ); + }), + ), +); diff --git a/apps/server/src/serverRuntimeStartup.ts b/apps/server/src/serverRuntimeStartup.ts index 2457f6ffe8..026145eca5 100644 --- a/apps/server/src/serverRuntimeStartup.ts +++ b/apps/server/src/serverRuntimeStartup.ts @@ -5,7 +5,19 @@ import { ProjectId, ThreadId, } from "@t3tools/contracts"; -import { Data, Deferred, Effect, Exit, Layer, Path, Queue, Ref, Scope, ServiceMap } from "effect"; +import { + Data, + Deferred, + Effect, + Exit, + Layer, + Option, + Path, + Queue, + Ref, + Scope, + ServiceMap, +} from "effect"; import { ServerConfig } from "./config"; import { Keybindings } from "./keybindings"; @@ -105,17 +117,15 @@ export const makeCommandGate = Effect.gen(function* () { } satisfies CommandGate; }); -const recordStartupHeartbeat = Effect.gen(function* () { +export const recordStartupHeartbeat = Effect.gen(function* () { const analytics = yield* AnalyticsService; const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; - const { threadCount, projectCount } = yield* projectionSnapshotQuery.getSnapshot().pipe( - Effect.map((snapshot) => ({ - threadCount: snapshot.threads.length, - projectCount: snapshot.projects.length, - })), + const { threadCount, projectCount } = yield* projectionSnapshotQuery.getCounts().pipe( Effect.catch((cause) => - Effect.logWarning("failed to gather startup snapshot for telemetry", { cause }).pipe( + Effect.logWarning("failed to gather startup projection counts for telemetry", { + cause, + }).pipe( Effect.as({ threadCount: 0, projectCount: 0, @@ -130,6 +140,12 @@ const recordStartupHeartbeat = Effect.gen(function* () { }); }); +export const launchStartupHeartbeat = recordStartupHeartbeat.pipe( + Effect.ignoreCause({ log: true }), + Effect.forkScoped, + Effect.asVoid, +); + const autoBootstrapWelcome = Effect.gen(function* () { const serverConfig = yield* ServerConfig; const projectionReadModelQuery = yield* ProjectionSnapshotQuery; @@ -141,14 +157,13 @@ const autoBootstrapWelcome = Effect.gen(function* () { if (serverConfig.autoBootstrapProjectFromCwd) { yield* Effect.gen(function* () { - const snapshot = yield* projectionReadModelQuery.getSnapshot(); - const existingProject = snapshot.projects.find( - (project) => project.workspaceRoot === serverConfig.cwd && project.deletedAt === null, + const existingProject = yield* projectionReadModelQuery.getActiveProjectByWorkspaceRoot( + serverConfig.cwd, ); let nextProjectId: ProjectId; let nextProjectDefaultModelSelection: ModelSelection; - if (!existingProject) { + if (Option.isNone(existingProject)) { const createdAt = new Date().toISOString(); nextProjectId = ProjectId.makeUnsafe(crypto.randomUUID()); const bootstrapProjectTitle = path.basename(serverConfig.cwd) || "project"; @@ -166,17 +181,16 @@ const autoBootstrapWelcome = Effect.gen(function* () { createdAt, }); } else { - nextProjectId = existingProject.id; - nextProjectDefaultModelSelection = existingProject.defaultModelSelection ?? { + nextProjectId = existingProject.value.id; + nextProjectDefaultModelSelection = existingProject.value.defaultModelSelection ?? { provider: "codex", model: "gpt-5-codex", }; } - const existingThread = snapshot.threads.find( - (thread) => thread.projectId === nextProjectId && thread.deletedAt === null, - ); - if (!existingThread) { + const existingThreadId = + yield* projectionReadModelQuery.getFirstActiveThreadIdByProjectId(nextProjectId); + if (Option.isNone(existingThreadId)) { const createdAt = new Date().toISOString(); const createdThreadId = ThreadId.makeUnsafe(crypto.randomUUID()); yield* orchestrationEngine.dispatch({ @@ -196,7 +210,7 @@ const autoBootstrapWelcome = Effect.gen(function* () { bootstrapThreadId = createdThreadId; } else { bootstrapProjectId = nextProjectId; - bootstrapThreadId = existingThread.id; + bootstrapThreadId = existingThreadId.value; } }); } @@ -314,7 +328,7 @@ const makeServerRuntimeStartup = Effect.gen(function* () { }); yield* Effect.logDebug("startup phase: recording startup heartbeat"); - yield* recordStartupHeartbeat; + yield* launchStartupHeartbeat; yield* Effect.logDebug("startup phase: browser open check"); yield* maybeOpenBrowser; yield* Effect.logDebug("startup phase: complete");