diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 1a8f802d73..e66a214fb7 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -360,7 +360,7 @@ export const makeOrchestrationIntegrationHarness = ( const scope = yield* Scope.make("sequential"); yield* tryRuntimePromise("start OrchestrationReactor", () => - runtime.runPromise(reactor.start.pipe(Scope.provide(scope))), + runtime.runPromise(reactor.start().pipe(Scope.provide(scope))), ).pipe(Effect.orDie); const receiptHistory = yield* Ref.make>([]); yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) => diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index 69360ebf6a..075f62f889 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -271,7 +271,7 @@ describe("CheckpointReactor", () => { const reactor = await runtime.runPromise(Effect.service(CheckpointReactor)); const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore)); scope = await Effect.runPromise(Scope.make("sequential")); - await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); const drain = () => Effect.runPromise(reactor.drain); const createdAt = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index e4a673342c..561626b8de 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -766,7 +766,7 @@ const make = Effect.gen(function* () { const worker = yield* makeDrainableWorker(processInputSafely); - const start: CheckpointReactorShape["start"] = Effect.gen(function* () { + const start: CheckpointReactorShape["start"] = Effect.fn("start")(function* () { yield* Effect.forkScoped( Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => { if ( diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts index 1514bef595..d60f0cf722 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts @@ -24,25 +24,28 @@ describe("OrchestrationReactor", () => { Layer.effect(OrchestrationReactor, makeOrchestrationReactor).pipe( Layer.provideMerge( Layer.succeed(ProviderRuntimeIngestionService, { - start: Effect.sync(() => { + start: () => { started.push("provider-runtime-ingestion"); - }), + return Effect.void; + }, drain: Effect.void, }), ), Layer.provideMerge( Layer.succeed(ProviderCommandReactor, { - start: Effect.sync(() => { + start: () => { started.push("provider-command-reactor"); - }), + return Effect.void; + }, drain: Effect.void, }), ), Layer.provideMerge( Layer.succeed(CheckpointReactor, { - start: Effect.sync(() => { + start: () => { started.push("checkpoint-reactor"); - }), + return Effect.void; + }, drain: Effect.void, }), ), @@ -51,7 +54,7 @@ describe("OrchestrationReactor", () => { const reactor = await runtime.runPromise(Effect.service(OrchestrationReactor)); const scope = await Effect.runPromise(Scope.make("sequential")); - await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); expect(started).toEqual([ "provider-runtime-ingestion", diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts index 1e498885a0..99d30c57a2 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts @@ -13,10 +13,10 @@ export const makeOrchestrationReactor = Effect.gen(function* () { const providerCommandReactor = yield* ProviderCommandReactor; const checkpointReactor = yield* CheckpointReactor; - const start: OrchestrationReactorShape["start"] = Effect.gen(function* () { - yield* providerRuntimeIngestion.start; - yield* providerCommandReactor.start; - yield* checkpointReactor.start; + const start: OrchestrationReactorShape["start"] = Effect.fn("start")(function* () { + yield* providerRuntimeIngestion.start(); + yield* providerCommandReactor.start(); + yield* checkpointReactor.start(); }); return { diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 4e87390eb1..ed7037695f 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -235,7 +235,7 @@ describe("ProviderCommandReactor", () => { const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const reactor = await runtime.runPromise(Effect.service(ProviderCommandReactor)); scope = await Effect.runPromise(Scope.make("sequential")); - await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); const drain = () => Effect.runPromise(reactor.drain); await Effect.runPromise( diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index f65137f4b5..d4f13ec727 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -773,22 +773,24 @@ const make = Effect.gen(function* () { const worker = yield* makeDrainableWorker(processDomainEventSafely); - const start: ProviderCommandReactorShape["start"] = Effect.forkScoped( - Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => { + const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () { + const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) { if ( - event.type !== "thread.runtime-mode-set" && - event.type !== "thread.turn-start-requested" && - event.type !== "thread.turn-interrupt-requested" && - event.type !== "thread.approval-response-requested" && - event.type !== "thread.user-input-response-requested" && - event.type !== "thread.session-stop-requested" + event.type === "thread.runtime-mode-set" || + event.type === "thread.turn-start-requested" || + event.type === "thread.turn-interrupt-requested" || + event.type === "thread.approval-response-requested" || + event.type === "thread.user-input-response-requested" || + event.type === "thread.session-stop-requested" ) { - return Effect.void; + return yield* worker.enqueue(event); } + }); - return worker.enqueue(event); - }), - ).pipe(Effect.asVoid); + yield* Effect.forkScoped( + Stream.runForEach(orchestrationEngine.streamDomainEvents, processEvent), + ); + }); return { start, diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 3eaeb2cd1d..529eae2444 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -214,7 +214,7 @@ describe("ProviderRuntimeIngestion", () => { const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService)); scope = await Effect.runPromise(Scope.make("sequential")); - await Effect.runPromise(ingestion.start.pipe(Scope.provide(scope))); + await Effect.runPromise(ingestion.start().pipe(Scope.provide(scope))); const drain = () => Effect.runPromise(ingestion.drain); const createdAt = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index b42e5f1566..b479d1c28a 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -500,7 +500,7 @@ function runtimeEventToActivities( return []; } -const make = Effect.gen(function* () { +const make = Effect.fn("make")(function* () { const orchestrationEngine = yield* OrchestrationEngineService; const providerService = yield* ProviderService; const projectionTurnRepository = yield* ProjectionTurnRepository; @@ -587,8 +587,8 @@ const make = Effect.gen(function* () { const appendBufferedAssistantText = (messageId: MessageId, delta: string) => Cache.getOption(bufferedAssistantTextByMessageId, messageId).pipe( - Effect.flatMap((existingText) => - Effect.gen(function* () { + Effect.flatMap( + Effect.fn("appendBufferedAssistantText")(function* (existingText) { const nextText = Option.match(existingText, { onNone: () => delta, onSome: (text) => `${text}${delta}`, @@ -644,7 +644,7 @@ const make = Effect.gen(function* () { const clearAssistantMessageState = (messageId: MessageId) => clearBufferedAssistantText(messageId); - const finalizeAssistantMessage = (input: { + const finalizeAssistantMessage = Effect.fn("finalizeAssistantMessage")(function* (input: { event: ProviderRuntimeEvent; threadId: ThreadId; messageId: MessageId; @@ -653,40 +653,39 @@ const make = Effect.gen(function* () { commandTag: string; finalDeltaCommandTag: string; fallbackText?: string; - }) => - Effect.gen(function* () { - const bufferedText = yield* takeBufferedAssistantText(input.messageId); - const text = - bufferedText.length > 0 - ? bufferedText - : (input.fallbackText?.trim().length ?? 0) > 0 - ? input.fallbackText! - : ""; - - if (text.length > 0) { - yield* orchestrationEngine.dispatch({ - type: "thread.message.assistant.delta", - commandId: providerCommandId(input.event, input.finalDeltaCommandTag), - threadId: input.threadId, - messageId: input.messageId, - delta: text, - ...(input.turnId ? { turnId: input.turnId } : {}), - createdAt: input.createdAt, - }); - } - + }) { + const bufferedText = yield* takeBufferedAssistantText(input.messageId); + const text = + bufferedText.length > 0 + ? bufferedText + : (input.fallbackText?.trim().length ?? 0) > 0 + ? input.fallbackText! + : ""; + + if (text.length > 0) { yield* orchestrationEngine.dispatch({ - type: "thread.message.assistant.complete", - commandId: providerCommandId(input.event, input.commandTag), + type: "thread.message.assistant.delta", + commandId: providerCommandId(input.event, input.finalDeltaCommandTag), threadId: input.threadId, messageId: input.messageId, + delta: text, ...(input.turnId ? { turnId: input.turnId } : {}), createdAt: input.createdAt, }); - yield* clearAssistantMessageState(input.messageId); + } + + yield* orchestrationEngine.dispatch({ + type: "thread.message.assistant.complete", + commandId: providerCommandId(input.event, input.commandTag), + threadId: input.threadId, + messageId: input.messageId, + ...(input.turnId ? { turnId: input.turnId } : {}), + createdAt: input.createdAt, }); + yield* clearAssistantMessageState(input.messageId); + }); - const upsertProposedPlan = (input: { + const upsertProposedPlan = Effect.fn("upsertProposedPlan")(function* (input: { event: ProviderRuntimeEvent; threadId: ThreadId; threadProposedPlans: ReadonlyArray<{ @@ -700,32 +699,31 @@ const make = Effect.gen(function* () { planMarkdown: string | undefined; createdAt: string; updatedAt: string; - }) => - Effect.gen(function* () { - const planMarkdown = normalizeProposedPlanMarkdown(input.planMarkdown); - if (!planMarkdown) { - return; - } + }) { + const planMarkdown = normalizeProposedPlanMarkdown(input.planMarkdown); + if (!planMarkdown) { + return; + } - const existingPlan = input.threadProposedPlans.find((entry) => entry.id === input.planId); - yield* orchestrationEngine.dispatch({ - type: "thread.proposed-plan.upsert", - commandId: providerCommandId(input.event, "proposed-plan-upsert"), - threadId: input.threadId, - proposedPlan: { - id: input.planId, - turnId: input.turnId ?? null, - planMarkdown, - implementedAt: existingPlan?.implementedAt ?? null, - implementationThreadId: existingPlan?.implementationThreadId ?? null, - createdAt: existingPlan?.createdAt ?? input.createdAt, - updatedAt: input.updatedAt, - }, - createdAt: input.updatedAt, - }); + const existingPlan = input.threadProposedPlans.find((entry) => entry.id === input.planId); + yield* orchestrationEngine.dispatch({ + type: "thread.proposed-plan.upsert", + commandId: providerCommandId(input.event, "proposed-plan-upsert"), + threadId: input.threadId, + proposedPlan: { + id: input.planId, + turnId: input.turnId ?? null, + planMarkdown, + implementedAt: existingPlan?.implementedAt ?? null, + implementationThreadId: existingPlan?.implementationThreadId ?? null, + createdAt: existingPlan?.createdAt ?? input.createdAt, + updatedAt: input.updatedAt, + }, + createdAt: input.updatedAt, }); + }); - const finalizeBufferedProposedPlan = (input: { + const finalizeBufferedProposedPlan = Effect.fn("finalizeBufferedProposedPlan")(function* (input: { event: ProviderRuntimeEvent; threadId: ThreadId; threadProposedPlans: ReadonlyArray<{ @@ -738,66 +736,65 @@ const make = Effect.gen(function* () { turnId?: TurnId; fallbackMarkdown?: string; updatedAt: string; - }) => - Effect.gen(function* () { - const bufferedPlan = yield* takeBufferedProposedPlan(input.planId); - const bufferedMarkdown = normalizeProposedPlanMarkdown(bufferedPlan?.text); - const fallbackMarkdown = normalizeProposedPlanMarkdown(input.fallbackMarkdown); - const planMarkdown = bufferedMarkdown ?? fallbackMarkdown; - if (!planMarkdown) { - return; - } + }) { + const bufferedPlan = yield* takeBufferedProposedPlan(input.planId); + const bufferedMarkdown = normalizeProposedPlanMarkdown(bufferedPlan?.text); + const fallbackMarkdown = normalizeProposedPlanMarkdown(input.fallbackMarkdown); + const planMarkdown = bufferedMarkdown ?? fallbackMarkdown; + if (!planMarkdown) { + return; + } - yield* upsertProposedPlan({ - event: input.event, - threadId: input.threadId, - threadProposedPlans: input.threadProposedPlans, - planId: input.planId, - ...(input.turnId ? { turnId: input.turnId } : {}), - planMarkdown, - createdAt: - bufferedPlan?.createdAt && bufferedPlan.createdAt.length > 0 - ? bufferedPlan.createdAt - : input.updatedAt, - updatedAt: input.updatedAt, - }); - yield* clearBufferedProposedPlan(input.planId); + yield* upsertProposedPlan({ + event: input.event, + threadId: input.threadId, + threadProposedPlans: input.threadProposedPlans, + planId: input.planId, + ...(input.turnId ? { turnId: input.turnId } : {}), + planMarkdown, + createdAt: + bufferedPlan?.createdAt && bufferedPlan.createdAt.length > 0 + ? bufferedPlan.createdAt + : input.updatedAt, + updatedAt: input.updatedAt, }); + yield* clearBufferedProposedPlan(input.planId); + }); - const clearTurnStateForSession = (threadId: ThreadId) => - Effect.gen(function* () { - const prefix = `${threadId}:`; - const proposedPlanPrefix = `plan:${threadId}:`; - const turnKeys = Array.from(yield* Cache.keys(turnMessageIdsByTurnKey)); - const proposedPlanKeys = Array.from(yield* Cache.keys(bufferedProposedPlanById)); - yield* Effect.forEach( - turnKeys, - (key) => - Effect.gen(function* () { - if (!key.startsWith(prefix)) { - return; - } + const clearTurnStateForSession = Effect.fn("clearTurnStateForSession")(function* ( + threadId: ThreadId, + ) { + const prefix = `${threadId}:`; + const proposedPlanPrefix = `plan:${threadId}:`; + const turnKeys = Array.from(yield* Cache.keys(turnMessageIdsByTurnKey)); + const proposedPlanKeys = Array.from(yield* Cache.keys(bufferedProposedPlanById)); + yield* Effect.forEach( + turnKeys, + Effect.fn(function* (key) { + if (!key.startsWith(prefix)) { + return; + } - const messageIds = yield* Cache.getOption(turnMessageIdsByTurnKey, key); - if (Option.isSome(messageIds)) { - yield* Effect.forEach(messageIds.value, clearAssistantMessageState, { - concurrency: 1, - }).pipe(Effect.asVoid); - } + const messageIds = yield* Cache.getOption(turnMessageIdsByTurnKey, key); + if (Option.isSome(messageIds)) { + yield* Effect.forEach(messageIds.value, clearAssistantMessageState, { + concurrency: 1, + }).pipe(Effect.asVoid); + } - yield* Cache.invalidate(turnMessageIdsByTurnKey, key); - }), - { concurrency: 1 }, - ).pipe(Effect.asVoid); - yield* Effect.forEach( - proposedPlanKeys, - (key) => - key.startsWith(proposedPlanPrefix) - ? Cache.invalidate(bufferedProposedPlanById, key) - : Effect.void, - { concurrency: 1 }, - ).pipe(Effect.asVoid); - }); + yield* Cache.invalidate(turnMessageIdsByTurnKey, key); + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + yield* Effect.forEach( + proposedPlanKeys, + (key) => + key.startsWith(proposedPlanPrefix) + ? Cache.invalidate(bufferedProposedPlanById, key) + : Effect.void, + { concurrency: 1 }, + ).pipe(Effect.asVoid); + }); const getSourceProposedPlanReferenceForPendingTurnStart = Effect.fnUntraced(function* ( threadId: ThreadId, @@ -872,359 +869,355 @@ const make = Effect.gen(function* () { }); }); - const processRuntimeEvent = (event: ProviderRuntimeEvent) => - Effect.gen(function* () { - const readModel = yield* orchestrationEngine.getReadModel(); - const thread = readModel.threads.find((entry) => entry.id === event.threadId); - if (!thread) return; + const processRuntimeEvent = Effect.fn("processRuntimeEvent")(function* ( + event: ProviderRuntimeEvent, + ) { + const readModel = yield* orchestrationEngine.getReadModel(); + const thread = readModel.threads.find((entry) => entry.id === event.threadId); + if (!thread) return; - const now = event.createdAt; - const eventTurnId = toTurnId(event.turnId); - const activeTurnId = thread.session?.activeTurnId ?? null; + const now = event.createdAt; + const eventTurnId = toTurnId(event.turnId); + const activeTurnId = thread.session?.activeTurnId ?? null; - const conflictsWithActiveTurn = - activeTurnId !== null && eventTurnId !== undefined && !sameId(activeTurnId, eventTurnId); - const missingTurnForActiveTurn = activeTurnId !== null && eventTurnId === undefined; + const conflictsWithActiveTurn = + activeTurnId !== null && eventTurnId !== undefined && !sameId(activeTurnId, eventTurnId); + const missingTurnForActiveTurn = activeTurnId !== null && eventTurnId === undefined; - const shouldApplyThreadLifecycle = (() => { - if (!STRICT_PROVIDER_LIFECYCLE_GUARD) { + const shouldApplyThreadLifecycle = (() => { + if (!STRICT_PROVIDER_LIFECYCLE_GUARD) { + return true; + } + switch (event.type) { + case "session.exited": return true; - } + case "session.started": + case "thread.started": + return true; + case "turn.started": + return !conflictsWithActiveTurn; + case "turn.completed": + if (conflictsWithActiveTurn || missingTurnForActiveTurn) { + return false; + } + // Only the active turn may close the lifecycle state. + if (activeTurnId !== null && eventTurnId !== undefined) { + return sameId(activeTurnId, eventTurnId); + } + // If no active turn is tracked, accept completion scoped to this thread. + return true; + default: + return true; + } + })(); + const acceptedTurnStartedSourcePlan = + event.type === "turn.started" && shouldApplyThreadLifecycle + ? yield* getSourceProposedPlanReferenceForAcceptedTurnStart(thread.id, eventTurnId) + : null; + + if ( + event.type === "session.started" || + event.type === "session.state.changed" || + event.type === "session.exited" || + event.type === "thread.started" || + event.type === "turn.started" || + event.type === "turn.completed" + ) { + const nextActiveTurnId = + event.type === "turn.started" + ? (eventTurnId ?? null) + : event.type === "turn.completed" || event.type === "session.exited" + ? null + : activeTurnId; + const status = (() => { switch (event.type) { + case "session.state.changed": + return orchestrationSessionStatusFromRuntimeState(event.payload.state); + case "turn.started": + return "running"; case "session.exited": - return true; + return "stopped"; + case "turn.completed": + return normalizeRuntimeTurnState(event.payload.state) === "failed" ? "error" : "ready"; case "session.started": case "thread.started": - return true; - case "turn.started": - return !conflictsWithActiveTurn; - case "turn.completed": - if (conflictsWithActiveTurn || missingTurnForActiveTurn) { - return false; - } - // Only the active turn may close the lifecycle state. - if (activeTurnId !== null && eventTurnId !== undefined) { - return sameId(activeTurnId, eventTurnId); - } - // If no active turn is tracked, accept completion scoped to this thread. - return true; - default: - return true; + // Provider thread/session start notifications can arrive during an + // active turn; preserve turn-running state in that case. + return activeTurnId !== null ? "running" : "ready"; } })(); - const acceptedTurnStartedSourcePlan = - event.type === "turn.started" && shouldApplyThreadLifecycle - ? yield* getSourceProposedPlanReferenceForAcceptedTurnStart(thread.id, eventTurnId) - : null; - - if ( - event.type === "session.started" || - event.type === "session.state.changed" || - event.type === "session.exited" || - event.type === "thread.started" || - event.type === "turn.started" || - event.type === "turn.completed" - ) { - const nextActiveTurnId = - event.type === "turn.started" - ? (eventTurnId ?? null) - : event.type === "turn.completed" || event.type === "session.exited" + const lastError = + event.type === "session.state.changed" && event.payload.state === "error" + ? (event.payload.reason ?? thread.session?.lastError ?? "Provider session error") + : event.type === "turn.completed" && + normalizeRuntimeTurnState(event.payload.state) === "failed" + ? (event.payload.errorMessage ?? thread.session?.lastError ?? "Turn failed") + : status === "ready" ? null - : activeTurnId; - const status = (() => { - switch (event.type) { - case "session.state.changed": - return orchestrationSessionStatusFromRuntimeState(event.payload.state); - case "turn.started": - return "running"; - case "session.exited": - return "stopped"; - case "turn.completed": - return normalizeRuntimeTurnState(event.payload.state) === "failed" - ? "error" - : "ready"; - case "session.started": - case "thread.started": - // Provider thread/session start notifications can arrive during an - // active turn; preserve turn-running state in that case. - return activeTurnId !== null ? "running" : "ready"; - } - })(); - const lastError = - event.type === "session.state.changed" && event.payload.state === "error" - ? (event.payload.reason ?? thread.session?.lastError ?? "Provider session error") - : event.type === "turn.completed" && - normalizeRuntimeTurnState(event.payload.state) === "failed" - ? (event.payload.errorMessage ?? thread.session?.lastError ?? "Turn failed") - : status === "ready" - ? null - : (thread.session?.lastError ?? null); - - if (shouldApplyThreadLifecycle) { - if (event.type === "turn.started" && acceptedTurnStartedSourcePlan !== null) { - yield* markSourceProposedPlanImplemented( - acceptedTurnStartedSourcePlan.sourceThreadId, - acceptedTurnStartedSourcePlan.sourcePlanId, - thread.id, - now, - ).pipe( - Effect.catchCause((cause) => - Effect.logWarning( - "provider runtime ingestion failed to mark source proposed plan", - { - eventId: event.eventId, - eventType: event.type, - cause: Cause.pretty(cause), - }, - ), - ), - ); - } + : (thread.session?.lastError ?? null); + + if (shouldApplyThreadLifecycle) { + if (event.type === "turn.started" && acceptedTurnStartedSourcePlan !== null) { + yield* markSourceProposedPlanImplemented( + acceptedTurnStartedSourcePlan.sourceThreadId, + acceptedTurnStartedSourcePlan.sourcePlanId, + thread.id, + now, + ).pipe( + Effect.catchCause((cause) => + Effect.logWarning("provider runtime ingestion failed to mark source proposed plan", { + eventId: event.eventId, + eventType: event.type, + cause: Cause.pretty(cause), + }), + ), + ); + } - yield* orchestrationEngine.dispatch({ - type: "thread.session.set", - commandId: providerCommandId(event, "thread-session-set"), + yield* orchestrationEngine.dispatch({ + type: "thread.session.set", + commandId: providerCommandId(event, "thread-session-set"), + threadId: thread.id, + session: { threadId: thread.id, - session: { - threadId: thread.id, - status, - providerName: event.provider, - runtimeMode: thread.session?.runtimeMode ?? "full-access", - activeTurnId: nextActiveTurnId, - lastError, - updatedAt: now, - }, - createdAt: now, - }); - } + status, + providerName: event.provider, + runtimeMode: thread.session?.runtimeMode ?? "full-access", + activeTurnId: nextActiveTurnId, + lastError, + updatedAt: now, + }, + createdAt: now, + }); } + } - const assistantDelta = - event.type === "content.delta" && event.payload.streamKind === "assistant_text" - ? event.payload.delta - : undefined; - const proposedPlanDelta = - event.type === "turn.proposed.delta" ? event.payload.delta : undefined; - - if (assistantDelta && assistantDelta.length > 0) { - const assistantMessageId = MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); - const turnId = toTurnId(event.turnId); - if (turnId) { - yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); - } + const assistantDelta = + event.type === "content.delta" && event.payload.streamKind === "assistant_text" + ? event.payload.delta + : undefined; + const proposedPlanDelta = + event.type === "turn.proposed.delta" ? event.payload.delta : undefined; + + if (assistantDelta && assistantDelta.length > 0) { + const assistantMessageId = MessageId.makeUnsafe( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ); + const turnId = toTurnId(event.turnId); + if (turnId) { + yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); + } - const assistantDeliveryMode: AssistantDeliveryMode = yield* Effect.map( - serverSettingsService.getSettings, - (settings) => (settings.enableAssistantStreaming ? "streaming" : "buffered"), - ); - if (assistantDeliveryMode === "buffered") { - const spillChunk = yield* appendBufferedAssistantText(assistantMessageId, assistantDelta); - if (spillChunk.length > 0) { - yield* orchestrationEngine.dispatch({ - type: "thread.message.assistant.delta", - commandId: providerCommandId(event, "assistant-delta-buffer-spill"), - threadId: thread.id, - messageId: assistantMessageId, - delta: spillChunk, - ...(turnId ? { turnId } : {}), - createdAt: now, - }); - } - } else { + const assistantDeliveryMode: AssistantDeliveryMode = yield* Effect.map( + serverSettingsService.getSettings, + (settings) => (settings.enableAssistantStreaming ? "streaming" : "buffered"), + ); + if (assistantDeliveryMode === "buffered") { + const spillChunk = yield* appendBufferedAssistantText(assistantMessageId, assistantDelta); + if (spillChunk.length > 0) { yield* orchestrationEngine.dispatch({ type: "thread.message.assistant.delta", - commandId: providerCommandId(event, "assistant-delta"), + commandId: providerCommandId(event, "assistant-delta-buffer-spill"), threadId: thread.id, messageId: assistantMessageId, - delta: assistantDelta, + delta: spillChunk, ...(turnId ? { turnId } : {}), createdAt: now, }); } - } - - if (proposedPlanDelta && proposedPlanDelta.length > 0) { - const planId = proposedPlanIdFromEvent(event, thread.id); - yield* appendBufferedProposedPlan(planId, proposedPlanDelta, now); - } - - const assistantCompletion = - event.type === "item.completed" && event.payload.itemType === "assistant_message" - ? { - messageId: MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ), - fallbackText: event.payload.detail, - } - : undefined; - const proposedPlanCompletion = - event.type === "turn.proposed.completed" - ? { - planId: proposedPlanIdFromEvent(event, thread.id), - turnId: toTurnId(event.turnId), - planMarkdown: event.payload.planMarkdown, - } - : undefined; - - if (assistantCompletion) { - const assistantMessageId = assistantCompletion.messageId; - const turnId = toTurnId(event.turnId); - const existingAssistantMessage = thread.messages.find( - (entry) => entry.id === assistantMessageId, - ); - const shouldApplyFallbackCompletionText = - !existingAssistantMessage || existingAssistantMessage.text.length === 0; - if (turnId) { - yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); - } - - yield* finalizeAssistantMessage({ - event, + } else { + yield* orchestrationEngine.dispatch({ + type: "thread.message.assistant.delta", + commandId: providerCommandId(event, "assistant-delta"), threadId: thread.id, messageId: assistantMessageId, + delta: assistantDelta, ...(turnId ? { turnId } : {}), createdAt: now, - commandTag: "assistant-complete", - finalDeltaCommandTag: "assistant-delta-finalize", - ...(assistantCompletion.fallbackText !== undefined && shouldApplyFallbackCompletionText - ? { fallbackText: assistantCompletion.fallbackText } - : {}), }); + } + } - if (turnId) { - yield* forgetAssistantMessageId(thread.id, turnId, assistantMessageId); - } + if (proposedPlanDelta && proposedPlanDelta.length > 0) { + const planId = proposedPlanIdFromEvent(event, thread.id); + yield* appendBufferedProposedPlan(planId, proposedPlanDelta, now); + } + + const assistantCompletion = + event.type === "item.completed" && event.payload.itemType === "assistant_message" + ? { + messageId: MessageId.makeUnsafe( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ), + fallbackText: event.payload.detail, + } + : undefined; + const proposedPlanCompletion = + event.type === "turn.proposed.completed" + ? { + planId: proposedPlanIdFromEvent(event, thread.id), + turnId: toTurnId(event.turnId), + planMarkdown: event.payload.planMarkdown, + } + : undefined; + + if (assistantCompletion) { + const assistantMessageId = assistantCompletion.messageId; + const turnId = toTurnId(event.turnId); + const existingAssistantMessage = thread.messages.find( + (entry) => entry.id === assistantMessageId, + ); + const shouldApplyFallbackCompletionText = + !existingAssistantMessage || existingAssistantMessage.text.length === 0; + if (turnId) { + yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); + } + + yield* finalizeAssistantMessage({ + event, + threadId: thread.id, + messageId: assistantMessageId, + ...(turnId ? { turnId } : {}), + createdAt: now, + commandTag: "assistant-complete", + finalDeltaCommandTag: "assistant-delta-finalize", + ...(assistantCompletion.fallbackText !== undefined && shouldApplyFallbackCompletionText + ? { fallbackText: assistantCompletion.fallbackText } + : {}), + }); + + if (turnId) { + yield* forgetAssistantMessageId(thread.id, turnId, assistantMessageId); } + } + + if (proposedPlanCompletion) { + yield* finalizeBufferedProposedPlan({ + event, + threadId: thread.id, + threadProposedPlans: thread.proposedPlans, + planId: proposedPlanCompletion.planId, + ...(proposedPlanCompletion.turnId ? { turnId: proposedPlanCompletion.turnId } : {}), + fallbackMarkdown: proposedPlanCompletion.planMarkdown, + updatedAt: now, + }); + } + + if (event.type === "turn.completed") { + const turnId = toTurnId(event.turnId); + if (turnId) { + const assistantMessageIds = yield* getAssistantMessageIdsForTurn(thread.id, turnId); + yield* Effect.forEach( + assistantMessageIds, + (assistantMessageId) => + finalizeAssistantMessage({ + event, + threadId: thread.id, + messageId: assistantMessageId, + turnId, + createdAt: now, + commandTag: "assistant-complete-finalize", + finalDeltaCommandTag: "assistant-delta-finalize-fallback", + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + yield* clearAssistantMessageIdsForTurn(thread.id, turnId); - if (proposedPlanCompletion) { yield* finalizeBufferedProposedPlan({ event, threadId: thread.id, threadProposedPlans: thread.proposedPlans, - planId: proposedPlanCompletion.planId, - ...(proposedPlanCompletion.turnId ? { turnId: proposedPlanCompletion.turnId } : {}), - fallbackMarkdown: proposedPlanCompletion.planMarkdown, + planId: proposedPlanIdForTurn(thread.id, turnId), + turnId, updatedAt: now, }); } + } - if (event.type === "turn.completed") { - const turnId = toTurnId(event.turnId); - if (turnId) { - const assistantMessageIds = yield* getAssistantMessageIdsForTurn(thread.id, turnId); - yield* Effect.forEach( - assistantMessageIds, - (assistantMessageId) => - finalizeAssistantMessage({ - event, - threadId: thread.id, - messageId: assistantMessageId, - turnId, - createdAt: now, - commandTag: "assistant-complete-finalize", - finalDeltaCommandTag: "assistant-delta-finalize-fallback", - }), - { concurrency: 1 }, - ).pipe(Effect.asVoid); - yield* clearAssistantMessageIdsForTurn(thread.id, turnId); + if (event.type === "session.exited") { + yield* clearTurnStateForSession(thread.id); + } + + if (event.type === "runtime.error") { + const runtimeErrorMessage = event.payload.message; - yield* finalizeBufferedProposedPlan({ - event, + const shouldApplyRuntimeError = !STRICT_PROVIDER_LIFECYCLE_GUARD + ? true + : activeTurnId === null || eventTurnId === undefined || sameId(activeTurnId, eventTurnId); + + if (shouldApplyRuntimeError) { + yield* orchestrationEngine.dispatch({ + type: "thread.session.set", + commandId: providerCommandId(event, "runtime-error-session-set"), + threadId: thread.id, + session: { threadId: thread.id, - threadProposedPlans: thread.proposedPlans, - planId: proposedPlanIdForTurn(thread.id, turnId), - turnId, + status: "error", + providerName: event.provider, + runtimeMode: thread.session?.runtimeMode ?? "full-access", + activeTurnId: eventTurnId ?? null, + lastError: runtimeErrorMessage, updatedAt: now, - }); - } - } - - if (event.type === "session.exited") { - yield* clearTurnStateForSession(thread.id); + }, + createdAt: now, + }); } + } - if (event.type === "runtime.error") { - const runtimeErrorMessage = event.payload.message; - - const shouldApplyRuntimeError = !STRICT_PROVIDER_LIFECYCLE_GUARD - ? true - : activeTurnId === null || eventTurnId === undefined || sameId(activeTurnId, eventTurnId); + if (event.type === "thread.metadata.updated" && event.payload.name) { + yield* orchestrationEngine.dispatch({ + type: "thread.meta.update", + commandId: providerCommandId(event, "thread-meta-update"), + threadId: thread.id, + title: event.payload.name, + }); + } - if (shouldApplyRuntimeError) { + if (event.type === "turn.diff.updated") { + const turnId = toTurnId(event.turnId); + if (turnId && (yield* isGitRepoForThread(thread.id))) { + // Skip if a checkpoint already exists for this turn. A real + // (non-placeholder) capture from CheckpointReactor should not + // be clobbered, and dispatching a duplicate placeholder for the + // same turnId would produce an unstable checkpointTurnCount. + if (thread.checkpoints.some((c) => c.turnId === turnId)) { + // Already tracked; no-op. + } else { + const assistantMessageId = MessageId.makeUnsafe( + `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, + ); + const maxTurnCount = thread.checkpoints.reduce( + (max, c) => Math.max(max, c.checkpointTurnCount), + 0, + ); yield* orchestrationEngine.dispatch({ - type: "thread.session.set", - commandId: providerCommandId(event, "runtime-error-session-set"), + type: "thread.turn.diff.complete", + commandId: providerCommandId(event, "thread-turn-diff-complete"), threadId: thread.id, - session: { - threadId: thread.id, - status: "error", - providerName: event.provider, - runtimeMode: thread.session?.runtimeMode ?? "full-access", - activeTurnId: eventTurnId ?? null, - lastError: runtimeErrorMessage, - updatedAt: now, - }, + turnId, + completedAt: now, + checkpointRef: CheckpointRef.makeUnsafe(`provider-diff:${event.eventId}`), + status: "missing", + files: [], + assistantMessageId, + checkpointTurnCount: maxTurnCount + 1, createdAt: now, }); } } + } - if (event.type === "thread.metadata.updated" && event.payload.name) { - yield* orchestrationEngine.dispatch({ - type: "thread.meta.update", - commandId: providerCommandId(event, "thread-meta-update"), - threadId: thread.id, - title: event.payload.name, - }); - } - - if (event.type === "turn.diff.updated") { - const turnId = toTurnId(event.turnId); - if (turnId && (yield* isGitRepoForThread(thread.id))) { - // Skip if a checkpoint already exists for this turn. A real - // (non-placeholder) capture from CheckpointReactor should not - // be clobbered, and dispatching a duplicate placeholder for the - // same turnId would produce an unstable checkpointTurnCount. - if (thread.checkpoints.some((c) => c.turnId === turnId)) { - // Already tracked; no-op. - } else { - const assistantMessageId = MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); - const maxTurnCount = thread.checkpoints.reduce( - (max, c) => Math.max(max, c.checkpointTurnCount), - 0, - ); - yield* orchestrationEngine.dispatch({ - type: "thread.turn.diff.complete", - commandId: providerCommandId(event, "thread-turn-diff-complete"), - threadId: thread.id, - turnId, - completedAt: now, - checkpointRef: CheckpointRef.makeUnsafe(`provider-diff:${event.eventId}`), - status: "missing", - files: [], - assistantMessageId, - checkpointTurnCount: maxTurnCount + 1, - createdAt: now, - }); - } - } - } - - const activities = runtimeEventToActivities(event); - yield* Effect.forEach(activities, (activity) => - orchestrationEngine.dispatch({ - type: "thread.activity.append", - commandId: providerCommandId(event, "thread-activity-append"), - threadId: thread.id, - activity, - createdAt: activity.createdAt, - }), - ).pipe(Effect.asVoid); - }); + const activities = runtimeEventToActivities(event); + yield* Effect.forEach(activities, (activity) => + orchestrationEngine.dispatch({ + type: "thread.activity.append", + commandId: providerCommandId(event, "thread-activity-append"), + threadId: thread.id, + activity, + createdAt: activity.createdAt, + }), + ).pipe(Effect.asVoid); + }); const processDomainEvent = (_event: TurnStartRequestedDomainEvent) => Effect.void; @@ -1248,7 +1241,7 @@ const make = Effect.gen(function* () { const worker = yield* makeDrainableWorker(processInputSafely); - const start: ProviderRuntimeIngestionShape["start"] = Effect.gen(function* () { + const start: ProviderRuntimeIngestionShape["start"] = Effect.fn("start")(function* () { yield* Effect.forkScoped( Stream.runForEach(providerService.streamEvents, (event) => worker.enqueue({ source: "runtime", event }), @@ -1272,5 +1265,5 @@ const make = Effect.gen(function* () { export const ProviderRuntimeIngestionLive = Layer.effect( ProviderRuntimeIngestionService, - make, + make(), ).pipe(Layer.provide(ProjectionTurnRepositoryLive)); diff --git a/apps/server/src/orchestration/Services/CheckpointReactor.ts b/apps/server/src/orchestration/Services/CheckpointReactor.ts index 612bc22acb..539ac784cd 100644 --- a/apps/server/src/orchestration/Services/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Services/CheckpointReactor.ts @@ -22,7 +22,7 @@ export interface CheckpointReactorShape { * Consumes both orchestration-domain and provider-runtime events via an * internal queue. */ - readonly start: Effect.Effect; + readonly start: () => Effect.Effect; /** * Resolves when the internal processing queue is empty and idle. diff --git a/apps/server/src/orchestration/Services/OrchestrationReactor.ts b/apps/server/src/orchestration/Services/OrchestrationReactor.ts index 258ef2d614..9b628eebda 100644 --- a/apps/server/src/orchestration/Services/OrchestrationReactor.ts +++ b/apps/server/src/orchestration/Services/OrchestrationReactor.ts @@ -19,7 +19,7 @@ export interface OrchestrationReactorShape { * The returned effect must be run in a scope so all worker fibers can be * finalized on shutdown. */ - readonly start: Effect.Effect; + readonly start: () => Effect.Effect; } /** diff --git a/apps/server/src/orchestration/Services/ProviderCommandReactor.ts b/apps/server/src/orchestration/Services/ProviderCommandReactor.ts index 8f20571c15..8f96d8fb99 100644 --- a/apps/server/src/orchestration/Services/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Services/ProviderCommandReactor.ts @@ -22,7 +22,7 @@ export interface ProviderCommandReactorShape { * Filters orchestration domain events to provider-intent types before * processing. */ - readonly start: Effect.Effect; + readonly start: () => Effect.Effect; /** * Resolves when the internal processing queue is empty and idle. diff --git a/apps/server/src/orchestration/Services/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Services/ProviderRuntimeIngestion.ts index 2bbb157f10..f1b95762a2 100644 --- a/apps/server/src/orchestration/Services/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Services/ProviderRuntimeIngestion.ts @@ -22,7 +22,7 @@ export interface ProviderRuntimeIngestionShape { * Uses an internal queue and continues after non-interrupt failures by * logging warnings. */ - readonly start: Effect.Effect; + readonly start: () => Effect.Effect; /** * Resolves when the internal processing queue is empty and idle. diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index 7290660cf4..c04d913d52 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -646,7 +646,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< }), ).pipe(Effect.forkIn(subscriptionsScope)); - yield* Scope.provide(orchestrationReactor.start, subscriptionsScope); + yield* Scope.provide(orchestrationReactor.start(), subscriptionsScope); yield* readiness.markOrchestrationSubscriptionsReady; let welcomeBootstrapProjectId: ProjectId | undefined; diff --git a/docs/effect-fn-checklist.md b/docs/effect-fn-checklist.md index d986594849..1addfdf4dd 100644 --- a/docs/effect-fn-checklist.md +++ b/docs/effect-fn-checklist.md @@ -46,7 +46,7 @@ Effect.fn("name")( - [x] `apps/server/src/provider/Layers/CodexAdapter.ts` - [x] `apps/server/src/git/Layers/GitCore.ts` - [x] `apps/server/src/git/Layers/GitManager.ts` -- [ ] `apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` +- [x] `apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` - [x] `apps/server/src/orchestration/Layers/ProjectionPipeline.ts` - [ ] `apps/server/src/orchestration/Layers/OrchestrationEngine.ts` - [ ] `apps/server/src/provider/Layers/EventNdjsonLogger.ts` @@ -115,12 +115,12 @@ Effect.fn("name")( ### `apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` (`14`) -- [ ] [finalizeAssistantMessage](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L680) -- [ ] [upsertProposedPlan](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L722) -- [ ] [finalizeBufferedProposedPlan](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L761) -- [ ] [clearTurnStateForSession](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L800) -- [ ] [processRuntimeEvent](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L908) -- [ ] Nested callback wrappers in this file +- [x] [finalizeAssistantMessage](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L680) +- [x] [upsertProposedPlan](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L722) +- [x] [finalizeBufferedProposedPlan](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L761) +- [x] [clearTurnStateForSession](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L800) +- [x] [processRuntimeEvent](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts#L908) +- [x] Nested callback wrappers in this file ### `apps/server/src/provider/Layers/CodexAdapter.ts` (`12`)