diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index 0137152e83..f348829598 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -131,472 +131,470 @@ function readPersistedCwd( return trimmed.length > 0 ? trimmed : undefined; } -const makeProviderService = (options?: ProviderServiceLiveOptions) => - Effect.gen(function* () { - const analytics = yield* Effect.service(AnalyticsService); - const serverSettings = yield* ServerSettingsService; - const canonicalEventLogger = - options?.canonicalEventLogger ?? - (options?.canonicalEventLogPath !== undefined - ? yield* makeEventNdjsonLogger(options.canonicalEventLogPath, { - stream: "canonical", - }) - : undefined); - - const registry = yield* ProviderAdapterRegistry; - const directory = yield* ProviderSessionDirectory; - const runtimeEventQueue = yield* Queue.unbounded(); - const runtimeEventPubSub = yield* PubSub.unbounded(); - - const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => - Effect.succeed(event).pipe( - Effect.tap((canonicalEvent) => - canonicalEventLogger ? canonicalEventLogger.write(canonicalEvent, null) : Effect.void, - ), - Effect.flatMap((canonicalEvent) => PubSub.publish(runtimeEventPubSub, canonicalEvent)), - Effect.asVoid, - ); - - const upsertSessionBinding = ( - session: ProviderSession, - threadId: ThreadId, - extra?: { - readonly modelSelection?: unknown; - readonly lastRuntimeEvent?: string; - readonly lastRuntimeEventAt?: string; - }, - ) => - directory.upsert({ - threadId, - provider: session.provider, - runtimeMode: session.runtimeMode, - status: toRuntimeStatus(session), - ...(session.resumeCursor !== undefined ? { resumeCursor: session.resumeCursor } : {}), - runtimePayload: toRuntimePayloadFromSession(session, extra), - }); - - const providers = yield* registry.listProviders(); - const adapters = yield* Effect.forEach(providers, (provider) => - registry.getByProvider(provider), - ); - const processRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => - publishRuntimeEvent(event); - - const worker = Effect.forever( - Queue.take(runtimeEventQueue).pipe(Effect.flatMap(processRuntimeEvent)), +const makeProviderService = Effect.fn("makeProviderService")(function* ( + options?: ProviderServiceLiveOptions, +) { + const analytics = yield* Effect.service(AnalyticsService); + const serverSettings = yield* ServerSettingsService; + const canonicalEventLogger = + options?.canonicalEventLogger ?? + (options?.canonicalEventLogPath !== undefined + ? yield* makeEventNdjsonLogger(options.canonicalEventLogPath, { + stream: "canonical", + }) + : undefined); + + const registry = yield* ProviderAdapterRegistry; + const directory = yield* ProviderSessionDirectory; + const runtimeEventQueue = yield* Queue.unbounded(); + const runtimeEventPubSub = yield* PubSub.unbounded(); + + const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => + Effect.succeed(event).pipe( + Effect.tap((canonicalEvent) => + canonicalEventLogger ? canonicalEventLogger.write(canonicalEvent, null) : Effect.void, + ), + Effect.flatMap((canonicalEvent) => PubSub.publish(runtimeEventPubSub, canonicalEvent)), + Effect.asVoid, ); - yield* Effect.forkScoped(worker); - - yield* Effect.forEach(adapters, (adapter) => - Stream.runForEach(adapter.streamEvents, (event) => - Queue.offer(runtimeEventQueue, event).pipe(Effect.asVoid), - ).pipe(Effect.forkScoped), - ).pipe(Effect.asVoid); - - const recoverSessionForThread = (input: { - readonly binding: ProviderRuntimeBinding; - readonly operation: string; - }) => - Effect.gen(function* () { - const adapter = yield* registry.getByProvider(input.binding.provider); - const hasResumeCursor = - input.binding.resumeCursor !== null && input.binding.resumeCursor !== undefined; - const hasActiveSession = yield* adapter.hasSession(input.binding.threadId); - if (hasActiveSession) { - const activeSessions = yield* adapter.listSessions(); - const existing = activeSessions.find( - (session) => session.threadId === input.binding.threadId, - ); - if (existing) { - yield* upsertSessionBinding(existing, input.binding.threadId); - yield* analytics.record("provider.session.recovered", { - provider: existing.provider, - strategy: "adopt-existing", - hasResumeCursor: existing.resumeCursor !== undefined, - }); - return { adapter, session: existing } as const; - } - } - if (!hasResumeCursor) { - return yield* toValidationError( - input.operation, - `Cannot recover thread '${input.binding.threadId}' because no provider resume state is persisted.`, - ); - } - - const persistedCwd = readPersistedCwd(input.binding.runtimePayload); - const persistedModelSelection = readPersistedModelSelection(input.binding.runtimePayload); - - const resumed = yield* adapter.startSession({ - threadId: input.binding.threadId, - provider: input.binding.provider, - ...(persistedCwd ? { cwd: persistedCwd } : {}), - ...(persistedModelSelection ? { modelSelection: persistedModelSelection } : {}), - ...(hasResumeCursor ? { resumeCursor: input.binding.resumeCursor } : {}), - runtimeMode: input.binding.runtimeMode ?? "full-access", - }); - if (resumed.provider !== adapter.provider) { - return yield* toValidationError( - input.operation, - `Adapter/provider mismatch while recovering thread '${input.binding.threadId}'. Expected '${adapter.provider}', received '${resumed.provider}'.`, - ); - } - - yield* upsertSessionBinding(resumed, input.binding.threadId); + const upsertSessionBinding = ( + session: ProviderSession, + threadId: ThreadId, + extra?: { + readonly modelSelection?: unknown; + readonly lastRuntimeEvent?: string; + readonly lastRuntimeEventAt?: string; + }, + ) => + directory.upsert({ + threadId, + provider: session.provider, + runtimeMode: session.runtimeMode, + status: toRuntimeStatus(session), + ...(session.resumeCursor !== undefined ? { resumeCursor: session.resumeCursor } : {}), + runtimePayload: toRuntimePayloadFromSession(session, extra), + }); + + const providers = yield* registry.listProviders(); + const adapters = yield* Effect.forEach(providers, (provider) => registry.getByProvider(provider)); + const processRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => + publishRuntimeEvent(event); + + const worker = Effect.forever( + Queue.take(runtimeEventQueue).pipe(Effect.flatMap(processRuntimeEvent)), + ); + yield* Effect.forkScoped(worker); + + yield* Effect.forEach(adapters, (adapter) => + Stream.runForEach(adapter.streamEvents, (event) => + Queue.offer(runtimeEventQueue, event).pipe(Effect.asVoid), + ).pipe(Effect.forkScoped), + ).pipe(Effect.asVoid); + + const recoverSessionForThread = Effect.fn("recoverSessionForThread")(function* (input: { + readonly binding: ProviderRuntimeBinding; + readonly operation: string; + }) { + const adapter = yield* registry.getByProvider(input.binding.provider); + const hasResumeCursor = + input.binding.resumeCursor !== null && input.binding.resumeCursor !== undefined; + const hasActiveSession = yield* adapter.hasSession(input.binding.threadId); + if (hasActiveSession) { + const activeSessions = yield* adapter.listSessions(); + const existing = activeSessions.find( + (session) => session.threadId === input.binding.threadId, + ); + if (existing) { + yield* upsertSessionBinding(existing, input.binding.threadId); yield* analytics.record("provider.session.recovered", { - provider: resumed.provider, - strategy: "resume-thread", - hasResumeCursor: resumed.resumeCursor !== undefined, + provider: existing.provider, + strategy: "adopt-existing", + hasResumeCursor: existing.resumeCursor !== undefined, }); - return { adapter, session: resumed } as const; - }); + return { adapter, session: existing } as const; + } + } + + if (!hasResumeCursor) { + return yield* toValidationError( + input.operation, + `Cannot recover thread '${input.binding.threadId}' because no provider resume state is persisted.`, + ); + } + + const persistedCwd = readPersistedCwd(input.binding.runtimePayload); + const persistedModelSelection = readPersistedModelSelection(input.binding.runtimePayload); + + const resumed = yield* adapter.startSession({ + threadId: input.binding.threadId, + provider: input.binding.provider, + ...(persistedCwd ? { cwd: persistedCwd } : {}), + ...(persistedModelSelection ? { modelSelection: persistedModelSelection } : {}), + ...(hasResumeCursor ? { resumeCursor: input.binding.resumeCursor } : {}), + runtimeMode: input.binding.runtimeMode ?? "full-access", + }); + if (resumed.provider !== adapter.provider) { + return yield* toValidationError( + input.operation, + `Adapter/provider mismatch while recovering thread '${input.binding.threadId}'. Expected '${adapter.provider}', received '${resumed.provider}'.`, + ); + } + + yield* upsertSessionBinding(resumed, input.binding.threadId); + yield* analytics.record("provider.session.recovered", { + provider: resumed.provider, + strategy: "resume-thread", + hasResumeCursor: resumed.resumeCursor !== undefined, + }); + return { adapter, session: resumed } as const; + }); - const resolveRoutableSession = (input: { - readonly threadId: ThreadId; - readonly operation: string; - readonly allowRecovery: boolean; - }) => - Effect.gen(function* () { - const bindingOption = yield* directory.getBinding(input.threadId); - const binding = Option.getOrUndefined(bindingOption); - if (!binding) { - return yield* toValidationError( - input.operation, - `Cannot route thread '${input.threadId}' because no persisted provider binding exists.`, - ); - } - const adapter = yield* registry.getByProvider(binding.provider); + const resolveRoutableSession = Effect.fn("resolveRoutableSession")(function* (input: { + readonly threadId: ThreadId; + readonly operation: string; + readonly allowRecovery: boolean; + }) { + const bindingOption = yield* directory.getBinding(input.threadId); + const binding = Option.getOrUndefined(bindingOption); + if (!binding) { + return yield* toValidationError( + input.operation, + `Cannot route thread '${input.threadId}' because no persisted provider binding exists.`, + ); + } + const adapter = yield* registry.getByProvider(binding.provider); - const hasRequestedSession = yield* adapter.hasSession(input.threadId); - if (hasRequestedSession) { - return { adapter, threadId: input.threadId, isActive: true } as const; - } + const hasRequestedSession = yield* adapter.hasSession(input.threadId); + if (hasRequestedSession) { + return { adapter, threadId: input.threadId, isActive: true } as const; + } - if (!input.allowRecovery) { - return { adapter, threadId: input.threadId, isActive: false } as const; - } + if (!input.allowRecovery) { + return { adapter, threadId: input.threadId, isActive: false } as const; + } - const recovered = yield* recoverSessionForThread({ binding, operation: input.operation }); - return { adapter: recovered.adapter, threadId: input.threadId, isActive: true } as const; - }); + const recovered = yield* recoverSessionForThread({ binding, operation: input.operation }); + return { adapter: recovered.adapter, threadId: input.threadId, isActive: true } as const; + }); - const startSession: ProviderServiceShape["startSession"] = (threadId, rawInput) => - Effect.gen(function* () { - const parsed = yield* decodeInputOrValidationError({ - operation: "ProviderService.startSession", - schema: ProviderSessionStartInput, - payload: rawInput, - }); + const startSession: ProviderServiceShape["startSession"] = Effect.fn("startSession")( + function* (threadId, rawInput) { + const parsed = yield* decodeInputOrValidationError({ + operation: "ProviderService.startSession", + schema: ProviderSessionStartInput, + payload: rawInput, + }); - const input = { - ...parsed, - threadId, - provider: parsed.provider ?? "codex", - }; - const settings = yield* serverSettings.getSettings.pipe( - Effect.mapError((error) => - toValidationError( - "ProviderService.startSession", - `Failed to load provider settings: ${error.message}`, - error, - ), + const input = { + ...parsed, + threadId, + provider: parsed.provider ?? "codex", + }; + const settings = yield* serverSettings.getSettings.pipe( + Effect.mapError((error) => + toValidationError( + "ProviderService.startSession", + `Failed to load provider settings: ${error.message}`, + error, ), + ), + ); + if (!settings.providers[input.provider].enabled) { + return yield* toValidationError( + "ProviderService.startSession", + `Provider '${input.provider}' is disabled in T3 Code settings.`, ); - if (!settings.providers[input.provider].enabled) { - return yield* toValidationError( - "ProviderService.startSession", - `Provider '${input.provider}' is disabled in T3 Code settings.`, - ); - } - const persistedBinding = Option.getOrUndefined(yield* directory.getBinding(threadId)); - const effectiveResumeCursor = - input.resumeCursor ?? - (persistedBinding?.provider === input.provider - ? persistedBinding.resumeCursor - : undefined); - const adapter = yield* registry.getByProvider(input.provider); - const session = yield* adapter.startSession({ - ...input, - ...(effectiveResumeCursor !== undefined ? { resumeCursor: effectiveResumeCursor } : {}), - }); - - if (session.provider !== adapter.provider) { - return yield* toValidationError( - "ProviderService.startSession", - `Adapter/provider mismatch: requested '${adapter.provider}', received '${session.provider}'.`, - ); - } + } + const persistedBinding = Option.getOrUndefined(yield* directory.getBinding(threadId)); + const effectiveResumeCursor = + input.resumeCursor ?? + (persistedBinding?.provider === input.provider ? persistedBinding.resumeCursor : undefined); + const adapter = yield* registry.getByProvider(input.provider); + const session = yield* adapter.startSession({ + ...input, + ...(effectiveResumeCursor !== undefined ? { resumeCursor: effectiveResumeCursor } : {}), + }); - yield* upsertSessionBinding(session, threadId, { - modelSelection: input.modelSelection, - }); - yield* analytics.record("provider.session.started", { - provider: session.provider, - runtimeMode: input.runtimeMode, - hasResumeCursor: session.resumeCursor !== undefined, - hasCwd: typeof input.cwd === "string" && input.cwd.trim().length > 0, - hasModel: - typeof input.modelSelection?.model === "string" && - input.modelSelection.model.trim().length > 0, - }); + if (session.provider !== adapter.provider) { + return yield* toValidationError( + "ProviderService.startSession", + `Adapter/provider mismatch: requested '${adapter.provider}', received '${session.provider}'.`, + ); + } - return session; + yield* upsertSessionBinding(session, threadId, { + modelSelection: input.modelSelection, + }); + yield* analytics.record("provider.session.started", { + provider: session.provider, + runtimeMode: input.runtimeMode, + hasResumeCursor: session.resumeCursor !== undefined, + hasCwd: typeof input.cwd === "string" && input.cwd.trim().length > 0, + hasModel: + typeof input.modelSelection?.model === "string" && + input.modelSelection.model.trim().length > 0, }); - const sendTurn: ProviderServiceShape["sendTurn"] = (rawInput) => - Effect.gen(function* () { - const parsed = yield* decodeInputOrValidationError({ - operation: "ProviderService.sendTurn", - schema: ProviderSendTurnInput, - payload: rawInput, - }); + return session; + }, + ); - const input = { - ...parsed, - attachments: parsed.attachments ?? [], - }; - if (!input.input && input.attachments.length === 0) { - return yield* toValidationError( - "ProviderService.sendTurn", - "Either input text or at least one attachment is required", - ); - } - const routed = yield* resolveRoutableSession({ - threadId: input.threadId, - operation: "ProviderService.sendTurn", - allowRecovery: true, - }); - const turn = yield* routed.adapter.sendTurn(input); - yield* directory.upsert({ - threadId: input.threadId, - provider: routed.adapter.provider, - status: "running", - ...(turn.resumeCursor !== undefined ? { resumeCursor: turn.resumeCursor } : {}), - runtimePayload: { - ...(input.modelSelection !== undefined ? { modelSelection: input.modelSelection } : {}), - activeTurnId: turn.turnId, - lastRuntimeEvent: "provider.sendTurn", - lastRuntimeEventAt: new Date().toISOString(), - }, - }); - yield* analytics.record("provider.turn.sent", { - provider: routed.adapter.provider, - model: input.modelSelection?.model, - interactionMode: input.interactionMode, - attachmentCount: input.attachments.length, - hasInput: typeof input.input === "string" && input.input.trim().length > 0, - }); - return turn; - }); + const sendTurn: ProviderServiceShape["sendTurn"] = Effect.fn("sendTurn")(function* (rawInput) { + const parsed = yield* decodeInputOrValidationError({ + operation: "ProviderService.sendTurn", + schema: ProviderSendTurnInput, + payload: rawInput, + }); + + const input = { + ...parsed, + attachments: parsed.attachments ?? [], + }; + if (!input.input && input.attachments.length === 0) { + return yield* toValidationError( + "ProviderService.sendTurn", + "Either input text or at least one attachment is required", + ); + } + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.sendTurn", + allowRecovery: true, + }); + const turn = yield* routed.adapter.sendTurn(input); + yield* directory.upsert({ + threadId: input.threadId, + provider: routed.adapter.provider, + status: "running", + ...(turn.resumeCursor !== undefined ? { resumeCursor: turn.resumeCursor } : {}), + runtimePayload: { + ...(input.modelSelection !== undefined ? { modelSelection: input.modelSelection } : {}), + activeTurnId: turn.turnId, + lastRuntimeEvent: "provider.sendTurn", + lastRuntimeEventAt: new Date().toISOString(), + }, + }); + yield* analytics.record("provider.turn.sent", { + provider: routed.adapter.provider, + model: input.modelSelection?.model, + interactionMode: input.interactionMode, + attachmentCount: input.attachments.length, + hasInput: typeof input.input === "string" && input.input.trim().length > 0, + }); + return turn; + }); - const interruptTurn: ProviderServiceShape["interruptTurn"] = (rawInput) => - Effect.gen(function* () { - const input = yield* decodeInputOrValidationError({ - operation: "ProviderService.interruptTurn", - schema: ProviderInterruptTurnInput, - payload: rawInput, - }); - const routed = yield* resolveRoutableSession({ - threadId: input.threadId, - operation: "ProviderService.interruptTurn", - allowRecovery: true, - }); - yield* routed.adapter.interruptTurn(routed.threadId, input.turnId); - yield* analytics.record("provider.turn.interrupted", { - provider: routed.adapter.provider, - }); + const interruptTurn: ProviderServiceShape["interruptTurn"] = Effect.fn("interruptTurn")( + function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.interruptTurn", + schema: ProviderInterruptTurnInput, + payload: rawInput, }); - - const respondToRequest: ProviderServiceShape["respondToRequest"] = (rawInput) => - Effect.gen(function* () { - const input = yield* decodeInputOrValidationError({ - operation: "ProviderService.respondToRequest", - schema: ProviderRespondToRequestInput, - payload: rawInput, - }); - const routed = yield* resolveRoutableSession({ - threadId: input.threadId, - operation: "ProviderService.respondToRequest", - allowRecovery: true, - }); - yield* routed.adapter.respondToRequest(routed.threadId, input.requestId, input.decision); - yield* analytics.record("provider.request.responded", { - provider: routed.adapter.provider, - decision: input.decision, - }); + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.interruptTurn", + allowRecovery: true, + }); + yield* routed.adapter.interruptTurn(routed.threadId, input.turnId); + yield* analytics.record("provider.turn.interrupted", { + provider: routed.adapter.provider, }); + }, + ); - const respondToUserInput: ProviderServiceShape["respondToUserInput"] = (rawInput) => - Effect.gen(function* () { - const input = yield* decodeInputOrValidationError({ - operation: "ProviderService.respondToUserInput", - schema: ProviderRespondToUserInputInput, - payload: rawInput, - }); - const routed = yield* resolveRoutableSession({ - threadId: input.threadId, - operation: "ProviderService.respondToUserInput", - allowRecovery: true, - }); - yield* routed.adapter.respondToUserInput(routed.threadId, input.requestId, input.answers); + const respondToRequest: ProviderServiceShape["respondToRequest"] = Effect.fn("respondToRequest")( + function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.respondToRequest", + schema: ProviderRespondToRequestInput, + payload: rawInput, + }); + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.respondToRequest", + allowRecovery: true, + }); + yield* routed.adapter.respondToRequest(routed.threadId, input.requestId, input.decision); + yield* analytics.record("provider.request.responded", { + provider: routed.adapter.provider, + decision: input.decision, }); + }, + ); - const stopSession: ProviderServiceShape["stopSession"] = (rawInput) => - Effect.gen(function* () { - const input = yield* decodeInputOrValidationError({ - operation: "ProviderService.stopSession", - schema: ProviderStopSessionInput, - payload: rawInput, - }); - const routed = yield* resolveRoutableSession({ - threadId: input.threadId, - operation: "ProviderService.stopSession", - allowRecovery: false, - }); - if (routed.isActive) { - yield* routed.adapter.stopSession(routed.threadId); - } - yield* directory.remove(input.threadId); - yield* analytics.record("provider.session.stopped", { - provider: routed.adapter.provider, - }); + const respondToUserInput: ProviderServiceShape["respondToUserInput"] = Effect.fn( + "respondToUserInput", + )(function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.respondToUserInput", + schema: ProviderRespondToUserInputInput, + payload: rawInput, + }); + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.respondToUserInput", + allowRecovery: true, + }); + yield* routed.adapter.respondToUserInput(routed.threadId, input.requestId, input.answers); + }); + + const stopSession: ProviderServiceShape["stopSession"] = Effect.fn("stopSession")( + function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.stopSession", + schema: ProviderStopSessionInput, + payload: rawInput, + }); + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.stopSession", + allowRecovery: false, }); + if (routed.isActive) { + yield* routed.adapter.stopSession(routed.threadId); + } + yield* directory.remove(input.threadId); + yield* analytics.record("provider.session.stopped", { + provider: routed.adapter.provider, + }); + }, + ); - const listSessions: ProviderServiceShape["listSessions"] = () => - Effect.gen(function* () { - const sessionsByProvider = yield* Effect.forEach(adapters, (adapter) => - adapter.listSessions(), - ); - const activeSessions = sessionsByProvider.flatMap((sessions) => sessions); - const persistedBindings = yield* directory.listThreadIds().pipe( - Effect.flatMap((threadIds) => - Effect.forEach( - threadIds, - (threadId) => - directory - .getBinding(threadId) - .pipe(Effect.orElseSucceed(() => Option.none())), - { concurrency: "unbounded" }, - ), + const listSessions: ProviderServiceShape["listSessions"] = Effect.fn("listSessions")( + function* () { + const sessionsByProvider = yield* Effect.forEach(adapters, (adapter) => + adapter.listSessions(), + ); + const activeSessions = sessionsByProvider.flatMap((sessions) => sessions); + const persistedBindings = yield* directory.listThreadIds().pipe( + Effect.flatMap((threadIds) => + Effect.forEach( + threadIds, + (threadId) => + directory + .getBinding(threadId) + .pipe(Effect.orElseSucceed(() => Option.none())), + { concurrency: "unbounded" }, ), - Effect.orElseSucceed(() => [] as Array>), - ); - const bindingsByThreadId = new Map(); - for (const bindingOption of persistedBindings) { - const binding = Option.getOrUndefined(bindingOption); - if (binding) { - bindingsByThreadId.set(binding.threadId, binding); - } + ), + Effect.orElseSucceed(() => [] as Array>), + ); + const bindingsByThreadId = new Map(); + for (const bindingOption of persistedBindings) { + const binding = Option.getOrUndefined(bindingOption); + if (binding) { + bindingsByThreadId.set(binding.threadId, binding); } + } - return activeSessions.map((session) => { - const binding = bindingsByThreadId.get(session.threadId); - if (!binding) { - return session; - } - - const overrides: { - resumeCursor?: ProviderSession["resumeCursor"]; - runtimeMode?: ProviderSession["runtimeMode"]; - } = {}; - if (session.resumeCursor === undefined && binding.resumeCursor !== undefined) { - overrides.resumeCursor = binding.resumeCursor; - } - if (binding.runtimeMode !== undefined) { - overrides.runtimeMode = binding.runtimeMode; - } - return Object.assign({}, session, overrides); - }); - }); - - const getCapabilities: ProviderServiceShape["getCapabilities"] = (provider) => - registry.getByProvider(provider).pipe(Effect.map((adapter) => adapter.capabilities)); + return activeSessions.map((session) => { + const binding = bindingsByThreadId.get(session.threadId); + if (!binding) { + return session; + } - const rollbackConversation: ProviderServiceShape["rollbackConversation"] = (rawInput) => - Effect.gen(function* () { - const input = yield* decodeInputOrValidationError({ - operation: "ProviderService.rollbackConversation", - schema: ProviderRollbackConversationInput, - payload: rawInput, - }); - if (input.numTurns === 0) { - return; + const overrides: { + resumeCursor?: ProviderSession["resumeCursor"]; + runtimeMode?: ProviderSession["runtimeMode"]; + } = {}; + if (session.resumeCursor === undefined && binding.resumeCursor !== undefined) { + overrides.resumeCursor = binding.resumeCursor; } - const routed = yield* resolveRoutableSession({ - threadId: input.threadId, - operation: "ProviderService.rollbackConversation", - allowRecovery: true, - }); - yield* routed.adapter.rollbackThread(routed.threadId, input.numTurns); - yield* analytics.record("provider.conversation.rolled_back", { - provider: routed.adapter.provider, - turns: input.numTurns, - }); + if (binding.runtimeMode !== undefined) { + overrides.runtimeMode = binding.runtimeMode; + } + return Object.assign({}, session, overrides); }); + }, + ); - const runStopAll = () => - Effect.gen(function* () { - const threadIds = yield* directory.listThreadIds(); - const activeSessions = yield* Effect.forEach(adapters, (adapter) => - adapter.listSessions(), - ).pipe( - Effect.map((sessionsByAdapter) => sessionsByAdapter.flatMap((sessions) => sessions)), - ); - yield* Effect.forEach(activeSessions, (session) => - upsertSessionBinding(session, session.threadId, { - lastRuntimeEvent: "provider.stopAll", - lastRuntimeEventAt: new Date().toISOString(), - }), - ).pipe(Effect.asVoid); - yield* Effect.forEach(adapters, (adapter) => adapter.stopAll()).pipe(Effect.asVoid); - yield* Effect.forEach(threadIds, (threadId) => - directory.getProvider(threadId).pipe( - Effect.flatMap((provider) => - directory.upsert({ - threadId, - provider, - status: "stopped", - runtimePayload: { - activeTurnId: null, - lastRuntimeEvent: "provider.stopAll", - lastRuntimeEventAt: new Date().toISOString(), - }, - }), - ), - ), - ).pipe(Effect.asVoid); - yield* analytics.record("provider.sessions.stopped_all", { - sessionCount: threadIds.length, - }); - yield* analytics.flush; - }); + const getCapabilities: ProviderServiceShape["getCapabilities"] = (provider) => + registry.getByProvider(provider).pipe(Effect.map((adapter) => adapter.capabilities)); + + const rollbackConversation: ProviderServiceShape["rollbackConversation"] = Effect.fn( + "rollbackConversation", + )(function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.rollbackConversation", + schema: ProviderRollbackConversationInput, + payload: rawInput, + }); + if (input.numTurns === 0) { + return; + } + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.rollbackConversation", + allowRecovery: true, + }); + yield* routed.adapter.rollbackThread(routed.threadId, input.numTurns); + yield* analytics.record("provider.conversation.rolled_back", { + provider: routed.adapter.provider, + turns: input.numTurns, + }); + }); - yield* Effect.addFinalizer(() => - Effect.catch(runStopAll(), (cause) => - Effect.logWarning("failed to stop provider service", { cause }), + const runStopAll = Effect.fn("runStopAll")(function* () { + const threadIds = yield* directory.listThreadIds(); + const activeSessions = yield* Effect.forEach(adapters, (adapter) => + adapter.listSessions(), + ).pipe(Effect.map((sessionsByAdapter) => sessionsByAdapter.flatMap((sessions) => sessions))); + yield* Effect.forEach(activeSessions, (session) => + upsertSessionBinding(session, session.threadId, { + lastRuntimeEvent: "provider.stopAll", + lastRuntimeEventAt: new Date().toISOString(), + }), + ).pipe(Effect.asVoid); + yield* Effect.forEach(adapters, (adapter) => adapter.stopAll()).pipe(Effect.asVoid); + yield* Effect.forEach(threadIds, (threadId) => + directory.getProvider(threadId).pipe( + Effect.flatMap((provider) => + directory.upsert({ + threadId, + provider, + status: "stopped", + runtimePayload: { + activeTurnId: null, + lastRuntimeEvent: "provider.stopAll", + lastRuntimeEventAt: new Date().toISOString(), + }, + }), + ), ), - ); - - return { - startSession, - sendTurn, - interruptTurn, - respondToRequest, - respondToUserInput, - stopSession, - listSessions, - getCapabilities, - rollbackConversation, - // Each access creates a fresh PubSub subscription so that multiple - // consumers (ProviderRuntimeIngestion, CheckpointReactor, etc.) each - // independently receive all runtime events. - get streamEvents(): ProviderServiceShape["streamEvents"] { - return Stream.fromPubSub(runtimeEventPubSub); - }, - } satisfies ProviderServiceShape; + ).pipe(Effect.asVoid); + yield* analytics.record("provider.sessions.stopped_all", { + sessionCount: threadIds.length, + }); + yield* analytics.flush; }); + yield* Effect.addFinalizer(() => + Effect.catch(runStopAll(), (cause) => + Effect.logWarning("failed to stop provider service", { cause }), + ), + ); + + return { + startSession, + sendTurn, + interruptTurn, + respondToRequest, + respondToUserInput, + stopSession, + listSessions, + getCapabilities, + rollbackConversation, + // Each access creates a fresh PubSub subscription so that multiple + // consumers (ProviderRuntimeIngestion, CheckpointReactor, etc.) each + // independently receive all runtime events. + get streamEvents(): ProviderServiceShape["streamEvents"] { + return Stream.fromPubSub(runtimeEventPubSub); + }, + } satisfies ProviderServiceShape; +}); + export const ProviderServiceLive = Layer.effect(ProviderService, makeProviderService()); export function makeProviderServiceLive(options?: ProviderServiceLiveOptions) {