diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 408ea827c2..152ed1d608 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -289,9 +289,10 @@ export const makeOrchestrationIntegrationHarness = ( ); const checkpointStoreLayer = CheckpointStoreLive.pipe(Layer.provide(GitCoreLive)); + const projectionSnapshotQueryLayer = OrchestrationProjectionSnapshotQueryLive; const runtimeServicesLayer = Layer.mergeAll( - orchestrationLayer, - OrchestrationProjectionSnapshotQueryLive, + projectionSnapshotQueryLayer, + orchestrationLayer.pipe(Layer.provide(projectionSnapshotQueryLayer)), ProjectionCheckpointRepositoryLive, ProjectionPendingApprovalRepositoryLive, checkpointStoreLayer, @@ -333,7 +334,9 @@ export const makeOrchestrationIntegrationHarness = ( Layer.provideMerge(providerCommandReactorLayer), Layer.provideMerge(checkpointReactorLayer), ); - const layer = orchestrationReactorLayer.pipe( + const layer = Layer.empty.pipe( + Layer.provideMerge(runtimeServicesLayer), + Layer.provideMerge(orchestrationReactorLayer), Layer.provide(persistenceLayer), Layer.provideMerge(ServerSettingsService.layerTest()), Layer.provideMerge(ServerConfig.layerTest(workspaceDir, rootDir)), diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index 781a0025e2..ab9f633e02 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -23,6 +23,7 @@ import { GitCoreLive } from "../../git/Layers/GitCore.ts"; import { CheckpointReactorLive } from "./CheckpointReactor.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; +import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { RuntimeReceiptBusLive } from "./RuntimeReceiptBus.ts"; import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts"; @@ -249,6 +250,7 @@ describe("CheckpointReactor", () => { options?.providerName ?? "codex", ); const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(OrchestrationProjectionPipelineLive), Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 6aa889991e..16bbd98d6b 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -8,7 +8,7 @@ import { TurnId, type OrchestrationEvent, } from "@t3tools/contracts"; -import { Effect, Layer, ManagedRuntime, Queue, Stream } from "effect"; +import { Effect, Layer, ManagedRuntime, Option, Queue, Stream } from "effect"; import { describe, expect, it } from "vitest"; import { PersistenceSqlError } from "../../persistence/Errors.ts"; @@ -21,11 +21,13 @@ import { } from "../../persistence/Services/OrchestrationEventStore.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; +import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { OrchestrationProjectionPipeline, type OrchestrationProjectionPipelineShape, } from "../Services/ProjectionPipeline.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { ServerConfig } from "../../config.ts"; import * as NodeServices from "@effect/platform-node/NodeServices"; @@ -39,6 +41,7 @@ async function createOrchestrationSystem() { prefix: "t3-orchestration-engine-test-", }); const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(OrchestrationProjectionPipelineLive), Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), @@ -60,6 +63,105 @@ function now() { } describe("OrchestrationEngine", () => { + it("bootstraps the in-memory read model from persisted projections", async () => { + const failOnHistoricalReplayStore: OrchestrationEventStoreShape = { + append: () => + Effect.fail( + new PersistenceSqlError({ + operation: "test.append", + detail: "append should not be called during bootstrap", + }), + ), + readFromSequence: () => Stream.empty, + readAll: () => + Stream.fail( + new PersistenceSqlError({ + operation: "test.readAll", + detail: "historical replay should not be used during bootstrap", + }), + ), + }; + + const projectionSnapshot = { + snapshotSequence: 7, + updatedAt: "2026-03-03T00:00:04.000Z", + projects: [ + { + id: asProjectId("project-bootstrap"), + title: "Bootstrap Project", + workspaceRoot: "/tmp/project-bootstrap", + defaultModelSelection: { + provider: "codex" as const, + model: "gpt-5-codex", + }, + scripts: [], + createdAt: "2026-03-03T00:00:00.000Z", + updatedAt: "2026-03-03T00:00:01.000Z", + deletedAt: null, + }, + ], + threads: [ + { + id: ThreadId.makeUnsafe("thread-bootstrap"), + projectId: asProjectId("project-bootstrap"), + title: "Bootstrap Thread", + modelSelection: { + provider: "codex" as const, + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access" as const, + branch: null, + worktreePath: null, + latestTurn: null, + createdAt: "2026-03-03T00:00:02.000Z", + updatedAt: "2026-03-03T00:00:03.000Z", + archivedAt: null, + deletedAt: null, + messages: [], + proposedPlans: [], + activities: [], + checkpoints: [], + session: null, + }, + ], + }; + + const layer = OrchestrationEngineLive.pipe( + Layer.provide( + Layer.succeed(ProjectionSnapshotQuery, { + getSnapshot: () => Effect.succeed(projectionSnapshot), + getCounts: () => Effect.succeed({ projectCount: 1, threadCount: 1 }), + getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), + getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), + getThreadCheckpointContext: () => Effect.succeed(Option.none()), + }), + ), + Layer.provide( + Layer.succeed(OrchestrationProjectionPipeline, { + bootstrap: Effect.void, + projectEvent: () => Effect.void, + } satisfies OrchestrationProjectionPipelineShape), + ), + Layer.provide(Layer.succeed(OrchestrationEventStore, failOnHistoricalReplayStore)), + Layer.provide(OrchestrationCommandReceiptRepositoryLive), + Layer.provide(SqlitePersistenceMemory), + ); + + const runtime = ManagedRuntime.make(layer); + + const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const readModel = await runtime.runPromise(engine.getReadModel()); + + expect(readModel.snapshotSequence).toBe(7); + expect(readModel.projects).toHaveLength(1); + expect(readModel.projects[0]?.title).toBe("Bootstrap Project"); + expect(readModel.threads).toHaveLength(1); + expect(readModel.threads[0]?.title).toBe("Bootstrap Thread"); + + await runtime.dispose(); + }); + it("returns deterministic read models for repeated reads", async () => { const createdAt = now(); const system = await createOrchestrationSystem(); @@ -417,6 +519,7 @@ describe("OrchestrationEngine", () => { const runtime = ManagedRuntime.make( OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(OrchestrationProjectionPipelineLive), Layer.provide(Layer.succeed(OrchestrationEventStore, flakyStore)), Layer.provide(OrchestrationCommandReceiptRepositoryLive), @@ -512,6 +615,7 @@ describe("OrchestrationEngine", () => { const runtime = ManagedRuntime.make( OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, flakyProjectionPipeline)), Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), @@ -653,6 +757,7 @@ describe("OrchestrationEngine", () => { const runtime = ManagedRuntime.make( OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, flakyProjectionPipeline)), Layer.provide(Layer.succeed(OrchestrationEventStore, nonTransactionalStore)), Layer.provide(OrchestrationCommandReceiptRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 5c52379f47..bd3581deb6 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -19,6 +19,7 @@ import { import { decideOrchestrationCommand } from "../decider.ts"; import { createEmptyReadModel, projectEvent } from "../projector.ts"; import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; import { OrchestrationEngineService, type OrchestrationEngineShape, @@ -54,6 +55,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { const eventStore = yield* OrchestrationEventStore; const commandReceiptRepository = yield* OrchestrationCommandReceiptRepository; const projectionPipeline = yield* OrchestrationProjectionPipeline; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; let readModel = createEmptyReadModel(new Date().toISOString()); @@ -195,13 +197,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { }; yield* projectionPipeline.bootstrap; - - // bootstrap in-memory read model from event store - yield* Stream.runForEach(eventStore.readAll(), (event) => - Effect.gen(function* () { - readModel = yield* projectEvent(readModel, event); - }), - ); + readModel = yield* projectionSnapshotQuery.getSnapshot(); const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope))); yield* Effect.forkScoped(worker); diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 77b5d4d619..1850745469 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -25,6 +25,7 @@ import { ORCHESTRATION_PROJECTOR_NAMES, OrchestrationProjectionPipelineLive, } from "./ProjectionPipeline.ts"; +import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts"; import { ServerConfig } from "../../config.ts"; @@ -1841,6 +1842,7 @@ it.effect("restores pending turn-start metadata across projection pipeline resta const engineLayer = it.layer( OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(OrchestrationProjectionPipelineLive), Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 506d6d2864..ca3dc04517 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -30,6 +30,7 @@ import { GitCore, type GitCoreShape } from "../../git/Services/GitCore.ts"; import { TextGeneration, type TextGenerationShape } from "../../git/Services/TextGeneration.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; +import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { ProviderCommandReactorLive } from "./ProviderCommandReactor.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; @@ -211,6 +212,7 @@ describe("ProviderCommandReactor", () => { }; const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(OrchestrationProjectionPipelineLive), Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 529eae2444..6c27e1010c 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -31,6 +31,7 @@ import { } from "../../provider/Services/ProviderService.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; +import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts"; import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts"; import { OrchestrationEngineService, @@ -197,6 +198,7 @@ describe("ProviderRuntimeIngestion", () => { fs.mkdirSync(path.join(workspaceRoot, ".git")); const provider = createProviderServiceHarness(); const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionSnapshotQueryLive), Layer.provide(OrchestrationProjectionPipelineLive), Layer.provide(OrchestrationEventStoreLive), Layer.provide(OrchestrationCommandReceiptRepositoryLive), diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 40a8eb09bc..4e296e817a 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -99,12 +99,24 @@ const ReactorLayerLive = Layer.empty.pipe( Layer.provideMerge(RuntimeReceiptBusLive), ); -const OrchestrationLayerLive = Layer.empty.pipe( - Layer.provideMerge(OrchestrationProjectionSnapshotQueryLive), - Layer.provideMerge(OrchestrationEngineLive), - Layer.provideMerge(OrchestrationProjectionPipelineLive), - Layer.provideMerge(OrchestrationEventStoreLive), - Layer.provideMerge(OrchestrationCommandReceiptRepositoryLive), +const OrchestrationEventInfrastructureLayerLive = Layer.mergeAll( + OrchestrationEventStoreLive, + OrchestrationCommandReceiptRepositoryLive, +); + +const OrchestrationProjectionPipelineLayerLive = OrchestrationProjectionPipelineLive.pipe( + Layer.provide(OrchestrationEventStoreLive), +); + +const OrchestrationInfrastructureLayerLive = Layer.mergeAll( + OrchestrationProjectionSnapshotQueryLive, + OrchestrationEventInfrastructureLayerLive, + OrchestrationProjectionPipelineLayerLive, +); + +const OrchestrationLayerLive = Layer.mergeAll( + OrchestrationInfrastructureLayerLive, + OrchestrationEngineLive.pipe(Layer.provide(OrchestrationInfrastructureLayerLive)), ); const CheckpointingLayerLive = Layer.empty.pipe(