Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,10 @@ export const makeOrchestrationIntegrationHarness = (
);

const checkpointStoreLayer = CheckpointStoreLive.pipe(Layer.provide(GitCoreLive));
const projectionSnapshotQueryLayer = OrchestrationProjectionSnapshotQueryLive;
const runtimeServicesLayer = Layer.mergeAll(
orchestrationLayer,
OrchestrationProjectionSnapshotQueryLive,
projectionSnapshotQueryLayer,
orchestrationLayer.pipe(Layer.provide(projectionSnapshotQueryLayer)),
ProjectionCheckpointRepositoryLive,
ProjectionPendingApprovalRepositoryLive,
checkpointStoreLayer,
Expand Down Expand Up @@ -333,7 +334,9 @@ export const makeOrchestrationIntegrationHarness = (
Layer.provideMerge(providerCommandReactorLayer),
Layer.provideMerge(checkpointReactorLayer),
);
const layer = orchestrationReactorLayer.pipe(
const layer = Layer.empty.pipe(
Layer.provideMerge(runtimeServicesLayer),
Layer.provideMerge(orchestrationReactorLayer),
Layer.provide(persistenceLayer),
Layer.provideMerge(ServerSettingsService.layerTest()),
Layer.provideMerge(ServerConfig.layerTest(workspaceDir, rootDir)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { GitCoreLive } from "../../git/Layers/GitCore.ts";
import { CheckpointReactorLive } from "./CheckpointReactor.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
import { RuntimeReceiptBusLive } from "./RuntimeReceiptBus.ts";
import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts";
import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts";
Expand Down Expand Up @@ -249,6 +250,7 @@ describe("CheckpointReactor", () => {
options?.providerName ?? "codex",
);
const orchestrationLayer = OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down
107 changes: 106 additions & 1 deletion apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
TurnId,
type OrchestrationEvent,
} from "@t3tools/contracts";
import { Effect, Layer, ManagedRuntime, Queue, Stream } from "effect";
import { Effect, Layer, ManagedRuntime, Option, Queue, Stream } from "effect";
import { describe, expect, it } from "vitest";

import { PersistenceSqlError } from "../../persistence/Errors.ts";
Expand All @@ -21,11 +21,13 @@ import {
} from "../../persistence/Services/OrchestrationEventStore.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import {
OrchestrationProjectionPipeline,
type OrchestrationProjectionPipelineShape,
} from "../Services/ProjectionPipeline.ts";
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
import { ServerConfig } from "../../config.ts";
import * as NodeServices from "@effect/platform-node/NodeServices";

Expand All @@ -39,6 +41,7 @@ async function createOrchestrationSystem() {
prefix: "t3-orchestration-engine-test-",
});
const orchestrationLayer = OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand All @@ -60,6 +63,105 @@ function now() {
}

describe("OrchestrationEngine", () => {
it("bootstraps the in-memory read model from persisted projections", async () => {
const failOnHistoricalReplayStore: OrchestrationEventStoreShape = {
append: () =>
Effect.fail(
new PersistenceSqlError({
operation: "test.append",
detail: "append should not be called during bootstrap",
}),
),
readFromSequence: () => Stream.empty,
readAll: () =>
Stream.fail(
new PersistenceSqlError({
operation: "test.readAll",
detail: "historical replay should not be used during bootstrap",
}),
),
};

const projectionSnapshot = {
snapshotSequence: 7,
updatedAt: "2026-03-03T00:00:04.000Z",
projects: [
{
id: asProjectId("project-bootstrap"),
title: "Bootstrap Project",
workspaceRoot: "/tmp/project-bootstrap",
defaultModelSelection: {
provider: "codex" as const,
model: "gpt-5-codex",
},
scripts: [],
createdAt: "2026-03-03T00:00:00.000Z",
updatedAt: "2026-03-03T00:00:01.000Z",
deletedAt: null,
},
],
threads: [
{
id: ThreadId.makeUnsafe("thread-bootstrap"),
projectId: asProjectId("project-bootstrap"),
title: "Bootstrap Thread",
modelSelection: {
provider: "codex" as const,
model: "gpt-5-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access" as const,
branch: null,
worktreePath: null,
latestTurn: null,
createdAt: "2026-03-03T00:00:02.000Z",
updatedAt: "2026-03-03T00:00:03.000Z",
archivedAt: null,
deletedAt: null,
messages: [],
proposedPlans: [],
activities: [],
checkpoints: [],
session: null,
},
],
};

const layer = OrchestrationEngineLive.pipe(
Layer.provide(
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () => Effect.succeed(projectionSnapshot),
getCounts: () => Effect.succeed({ projectCount: 1, threadCount: 1 }),
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
getThreadCheckpointContext: () => Effect.succeed(Option.none()),
}),
),
Layer.provide(
Layer.succeed(OrchestrationProjectionPipeline, {
bootstrap: Effect.void,
projectEvent: () => Effect.void,
} satisfies OrchestrationProjectionPipelineShape),
),
Layer.provide(Layer.succeed(OrchestrationEventStore, failOnHistoricalReplayStore)),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Layer.provide(SqlitePersistenceMemory),
);

const runtime = ManagedRuntime.make(layer);

const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const readModel = await runtime.runPromise(engine.getReadModel());

expect(readModel.snapshotSequence).toBe(7);
expect(readModel.projects).toHaveLength(1);
expect(readModel.projects[0]?.title).toBe("Bootstrap Project");
expect(readModel.threads).toHaveLength(1);
expect(readModel.threads[0]?.title).toBe("Bootstrap Thread");

await runtime.dispose();
});

it("returns deterministic read models for repeated reads", async () => {
const createdAt = now();
const system = await createOrchestrationSystem();
Expand Down Expand Up @@ -417,6 +519,7 @@ describe("OrchestrationEngine", () => {

const runtime = ManagedRuntime.make(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(Layer.succeed(OrchestrationEventStore, flakyStore)),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down Expand Up @@ -512,6 +615,7 @@ describe("OrchestrationEngine", () => {

const runtime = ManagedRuntime.make(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, flakyProjectionPipeline)),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down Expand Up @@ -653,6 +757,7 @@ describe("OrchestrationEngine", () => {

const runtime = ManagedRuntime.make(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, flakyProjectionPipeline)),
Layer.provide(Layer.succeed(OrchestrationEventStore, nonTransactionalStore)),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down
10 changes: 3 additions & 7 deletions apps/server/src/orchestration/Layers/OrchestrationEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { decideOrchestrationCommand } from "../decider.ts";
import { createEmptyReadModel, projectEvent } from "../projector.ts";
import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts";
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
import {
OrchestrationEngineService,
type OrchestrationEngineShape,
Expand Down Expand Up @@ -54,6 +55,7 @@ const makeOrchestrationEngine = Effect.gen(function* () {
const eventStore = yield* OrchestrationEventStore;
const commandReceiptRepository = yield* OrchestrationCommandReceiptRepository;
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;

let readModel = createEmptyReadModel(new Date().toISOString());

Expand Down Expand Up @@ -195,13 +197,7 @@ const makeOrchestrationEngine = Effect.gen(function* () {
};

yield* projectionPipeline.bootstrap;

// bootstrap in-memory read model from event store
yield* Stream.runForEach(eventStore.readAll(), (event) =>
Effect.gen(function* () {
readModel = yield* projectEvent(readModel, event);
}),
);
readModel = yield* projectionSnapshotQuery.getSnapshot();

const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope)));
yield* Effect.forkScoped(worker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
ORCHESTRATION_PROJECTOR_NAMES,
OrchestrationProjectionPipelineLive,
} from "./ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { OrchestrationProjectionPipeline } from "../Services/ProjectionPipeline.ts";
import { ServerConfig } from "../../config.ts";
Expand Down Expand Up @@ -1841,6 +1842,7 @@ it.effect("restores pending turn-start metadata across projection pipeline resta

const engineLayer = it.layer(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { GitCore, type GitCoreShape } from "../../git/Services/GitCore.ts";
import { TextGeneration, type TextGenerationShape } from "../../git/Services/TextGeneration.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
import { ProviderCommandReactorLive } from "./ProviderCommandReactor.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
Expand Down Expand Up @@ -211,6 +212,7 @@ describe("ProviderCommandReactor", () => {
};

const orchestrationLayer = OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
} from "../../provider/Services/ProviderService.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "./ProjectionSnapshotQuery.ts";
import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts";
import {
OrchestrationEngineService,
Expand Down Expand Up @@ -197,6 +198,7 @@ describe("ProviderRuntimeIngestion", () => {
fs.mkdirSync(path.join(workspaceRoot, ".git"));
const provider = createProviderServiceHarness();
const orchestrationLayer = OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Expand Down
24 changes: 18 additions & 6 deletions apps/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,24 @@ const ReactorLayerLive = Layer.empty.pipe(
Layer.provideMerge(RuntimeReceiptBusLive),
);

const OrchestrationLayerLive = Layer.empty.pipe(
Layer.provideMerge(OrchestrationProjectionSnapshotQueryLive),
Layer.provideMerge(OrchestrationEngineLive),
Layer.provideMerge(OrchestrationProjectionPipelineLive),
Layer.provideMerge(OrchestrationEventStoreLive),
Layer.provideMerge(OrchestrationCommandReceiptRepositoryLive),
const OrchestrationEventInfrastructureLayerLive = Layer.mergeAll(
OrchestrationEventStoreLive,
OrchestrationCommandReceiptRepositoryLive,
);

const OrchestrationProjectionPipelineLayerLive = OrchestrationProjectionPipelineLive.pipe(
Layer.provide(OrchestrationEventStoreLive),
);

const OrchestrationInfrastructureLayerLive = Layer.mergeAll(
OrchestrationProjectionSnapshotQueryLive,
OrchestrationEventInfrastructureLayerLive,
OrchestrationProjectionPipelineLayerLive,
);

const OrchestrationLayerLive = Layer.mergeAll(
OrchestrationInfrastructureLayerLive,
OrchestrationEngineLive.pipe(Layer.provide(OrchestrationInfrastructureLayerLive)),
);

const CheckpointingLayerLive = Layer.empty.pipe(
Expand Down
Loading