diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 4fcc7eddd1..b9ac4bfc4a 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -20,6 +20,7 @@ import { ProviderItemId, ThreadId, TurnId, + ProviderSendTurnInput, } from "@t3tools/contracts"; import { Effect, FileSystem, Layer, Queue, Schema, ServiceMap, Stream } from "effect"; @@ -1327,38 +1328,40 @@ function mapToRuntimeEvents( return []; } -const makeCodexAdapter = (options?: CodexAdapterLiveOptions) => - Effect.gen(function* () { - const fileSystem = yield* FileSystem.FileSystem; - const serverConfig = yield* Effect.service(ServerConfig); - const nativeEventLogger = - options?.nativeEventLogger ?? - (options?.nativeEventLogPath !== undefined - ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { - stream: "native", - }) - : undefined); - - const manager = yield* Effect.acquireRelease( - Effect.gen(function* () { - if (options?.manager) { - return options.manager; - } - const services = yield* Effect.services(); - return options?.makeManager?.(services) ?? new CodexAppServerManager(services); - }), - (manager) => - Effect.sync(() => { - try { - manager.stopAll(); - } catch { - // Finalizers should never fail and block shutdown. - } - }), - ); - const serverSettingsService = yield* ServerSettingsService; +const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( + options?: CodexAdapterLiveOptions, +) { + const fileSystem = yield* FileSystem.FileSystem; + const serverConfig = yield* Effect.service(ServerConfig); + const nativeEventLogger = + options?.nativeEventLogger ?? + (options?.nativeEventLogPath !== undefined + ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { + stream: "native", + }) + : undefined); - const startSession: CodexAdapterShape["startSession"] = Effect.fn(function* (input) { + const acquireManager = Effect.fn("acquireManager")(function* () { + if (options?.manager) { + return options.manager; + } + const services = yield* Effect.services(); + return options?.makeManager?.(services) ?? new CodexAppServerManager(services); + }); + + const manager = yield* Effect.acquireRelease(acquireManager(), (manager) => + Effect.sync(() => { + try { + manager.stopAll(); + } catch { + // Finalizers should never fail and block shutdown. + } + }), + ); + const serverSettingsService = yield* ServerSettingsService; + + const startSession: CodexAdapterShape["startSession"] = Effect.fn("startSession")( + function* (input) { if (input.provider !== undefined && input.provider !== PROVIDER) { return yield* new ProviderAdapterValidationError({ provider: PROVIDER, @@ -1407,211 +1410,212 @@ const makeCodexAdapter = (options?: CodexAdapterLiveOptions) => cause, }), }); + }, + ); + + const resolveAttachment = Effect.fn("resolveAttachment")(function* ( + input: ProviderSendTurnInput, + attachment: NonNullable[number], + ) { + const attachmentPath = resolveAttachmentPath({ + attachmentsDir: serverConfig.attachmentsDir, + attachment, }); + if (!attachmentPath) { + return yield* toRequestError( + input.threadId, + "turn/start", + new Error(`Invalid attachment id '${attachment.id}'.`), + ); + } + const bytes = yield* fileSystem.readFile(attachmentPath).pipe( + Effect.mapError( + (cause) => + new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "turn/start", + detail: toMessage(cause, "Failed to read attachment file."), + cause, + }), + ), + ); + return { + type: "image" as const, + url: `data:${attachment.mimeType};base64,${Buffer.from(bytes).toString("base64")}`, + }; + }); - const sendTurn: CodexAdapterShape["sendTurn"] = (input) => - Effect.gen(function* () { - const codexAttachments = yield* Effect.forEach( - input.attachments ?? [], - (attachment) => - Effect.gen(function* () { - const attachmentPath = resolveAttachmentPath({ - attachmentsDir: serverConfig.attachmentsDir, - attachment, - }); - if (!attachmentPath) { - return yield* toRequestError( - input.threadId, - "turn/start", - new Error(`Invalid attachment id '${attachment.id}'.`), - ); - } - const bytes = yield* fileSystem.readFile(attachmentPath).pipe( - Effect.mapError( - (cause) => - new ProviderAdapterRequestError({ - provider: PROVIDER, - method: "turn/start", - detail: toMessage(cause, "Failed to read attachment file."), - cause, - }), - ), - ); - return { - type: "image" as const, - url: `data:${attachment.mimeType};base64,${Buffer.from(bytes).toString("base64")}`, - }; - }), - { concurrency: 1 }, - ); + const sendTurn: CodexAdapterShape["sendTurn"] = Effect.fn("sendTurn")(function* (input) { + const codexAttachments = yield* Effect.forEach( + input.attachments ?? [], + (attachment) => resolveAttachment(input, attachment), + { concurrency: 1 }, + ); - return yield* Effect.tryPromise({ - try: () => { - const managerInput = { - threadId: input.threadId, - ...(input.input !== undefined ? { input: input.input } : {}), - ...(input.modelSelection?.provider === "codex" - ? { model: input.modelSelection.model } - : {}), - ...(input.modelSelection?.provider === "codex" && - input.modelSelection.options?.reasoningEffort !== undefined - ? { effort: input.modelSelection.options.reasoningEffort } - : {}), - ...(input.modelSelection?.provider === "codex" && - input.modelSelection.options?.fastMode - ? { serviceTier: "fast" } - : {}), - ...(input.interactionMode !== undefined - ? { interactionMode: input.interactionMode } - : {}), - ...(codexAttachments.length > 0 ? { attachments: codexAttachments } : {}), - }; - return manager.sendTurn(managerInput); - }, - catch: (cause) => toRequestError(input.threadId, "turn/start", cause), - }).pipe( - Effect.map((result) => ({ - ...result, - threadId: input.threadId, - })), - ); - }); + return yield* Effect.tryPromise({ + try: () => { + const managerInput = { + threadId: input.threadId, + ...(input.input !== undefined ? { input: input.input } : {}), + ...(input.modelSelection?.provider === "codex" + ? { model: input.modelSelection.model } + : {}), + ...(input.modelSelection?.provider === "codex" && + input.modelSelection.options?.reasoningEffort !== undefined + ? { effort: input.modelSelection.options.reasoningEffort } + : {}), + ...(input.modelSelection?.provider === "codex" && input.modelSelection.options?.fastMode + ? { serviceTier: "fast" } + : {}), + ...(input.interactionMode !== undefined + ? { interactionMode: input.interactionMode } + : {}), + ...(codexAttachments.length > 0 ? { attachments: codexAttachments } : {}), + }; + return manager.sendTurn(managerInput); + }, + catch: (cause) => toRequestError(input.threadId, "turn/start", cause), + }).pipe( + Effect.map((result) => ({ + ...result, + threadId: input.threadId, + })), + ); + }); - const interruptTurn: CodexAdapterShape["interruptTurn"] = (threadId, turnId) => - Effect.tryPromise({ - try: () => manager.interruptTurn(threadId, turnId), - catch: (cause) => toRequestError(threadId, "turn/interrupt", cause), - }); + const interruptTurn: CodexAdapterShape["interruptTurn"] = (threadId, turnId) => + Effect.tryPromise({ + try: () => manager.interruptTurn(threadId, turnId), + catch: (cause) => toRequestError(threadId, "turn/interrupt", cause), + }); - const readThread: CodexAdapterShape["readThread"] = (threadId) => - Effect.tryPromise({ - try: () => manager.readThread(threadId), - catch: (cause) => toRequestError(threadId, "thread/read", cause), - }).pipe( - Effect.map((snapshot) => ({ - threadId, - turns: snapshot.turns, - })), + const readThread: CodexAdapterShape["readThread"] = (threadId) => + Effect.tryPromise({ + try: () => manager.readThread(threadId), + catch: (cause) => toRequestError(threadId, "thread/read", cause), + }).pipe( + Effect.map((snapshot) => ({ + threadId, + turns: snapshot.turns, + })), + ); + + const rollbackThread: CodexAdapterShape["rollbackThread"] = (threadId, numTurns) => { + if (!Number.isInteger(numTurns) || numTurns < 1) { + return Effect.fail( + new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "rollbackThread", + issue: "numTurns must be an integer >= 1.", + }), ); + } - const rollbackThread: CodexAdapterShape["rollbackThread"] = (threadId, numTurns) => { - if (!Number.isInteger(numTurns) || numTurns < 1) { - return Effect.fail( - new ProviderAdapterValidationError({ - provider: PROVIDER, - operation: "rollbackThread", - issue: "numTurns must be an integer >= 1.", - }), - ); - } + return Effect.tryPromise({ + try: () => manager.rollbackThread(threadId, numTurns), + catch: (cause) => toRequestError(threadId, "thread/rollback", cause), + }).pipe( + Effect.map((snapshot) => ({ + threadId, + turns: snapshot.turns, + })), + ); + }; - return Effect.tryPromise({ - try: () => manager.rollbackThread(threadId, numTurns), - catch: (cause) => toRequestError(threadId, "thread/rollback", cause), - }).pipe( - Effect.map((snapshot) => ({ - threadId, - turns: snapshot.turns, - })), - ); - }; + const respondToRequest: CodexAdapterShape["respondToRequest"] = (threadId, requestId, decision) => + Effect.tryPromise({ + try: () => manager.respondToRequest(threadId, requestId, decision), + catch: (cause) => toRequestError(threadId, "item/requestApproval/decision", cause), + }); - const respondToRequest: CodexAdapterShape["respondToRequest"] = ( - threadId, - requestId, - decision, - ) => - Effect.tryPromise({ - try: () => manager.respondToRequest(threadId, requestId, decision), - catch: (cause) => toRequestError(threadId, "item/requestApproval/decision", cause), - }); + const respondToUserInput: CodexAdapterShape["respondToUserInput"] = ( + threadId, + requestId, + answers, + ) => + Effect.tryPromise({ + try: () => manager.respondToUserInput(threadId, requestId, answers), + catch: (cause) => toRequestError(threadId, "item/tool/requestUserInput", cause), + }); - const respondToUserInput: CodexAdapterShape["respondToUserInput"] = ( - threadId, - requestId, - answers, - ) => - Effect.tryPromise({ - try: () => manager.respondToUserInput(threadId, requestId, answers), - catch: (cause) => toRequestError(threadId, "item/tool/requestUserInput", cause), - }); + const stopSession: CodexAdapterShape["stopSession"] = (threadId) => + Effect.sync(() => { + manager.stopSession(threadId); + }); - const stopSession: CodexAdapterShape["stopSession"] = (threadId) => - Effect.sync(() => { - manager.stopSession(threadId); - }); + const listSessions: CodexAdapterShape["listSessions"] = () => + Effect.sync(() => manager.listSessions()); - const listSessions: CodexAdapterShape["listSessions"] = () => - Effect.sync(() => manager.listSessions()); + const hasSession: CodexAdapterShape["hasSession"] = (threadId) => + Effect.sync(() => manager.hasSession(threadId)); - const hasSession: CodexAdapterShape["hasSession"] = (threadId) => - Effect.sync(() => manager.hasSession(threadId)); + const stopAll: CodexAdapterShape["stopAll"] = () => + Effect.sync(() => { + manager.stopAll(); + }); - const stopAll: CodexAdapterShape["stopAll"] = () => - Effect.sync(() => { - manager.stopAll(); - }); + const runtimeEventQueue = yield* Queue.unbounded(); - const runtimeEventQueue = yield* Queue.unbounded(); - - yield* Effect.acquireRelease( - Effect.gen(function* () { - const writeNativeEvent = (event: ProviderEvent) => - Effect.gen(function* () { - if (!nativeEventLogger) { - return; - } - yield* nativeEventLogger.write(event, event.threadId); - }); - - const services = yield* Effect.services(); - const listener = (event: ProviderEvent) => - Effect.gen(function* () { - yield* writeNativeEvent(event); - const runtimeEvents = mapToRuntimeEvents(event, event.threadId); - if (runtimeEvents.length === 0) { - yield* Effect.logDebug("ignoring unhandled Codex provider event", { - method: event.method, - threadId: event.threadId, - turnId: event.turnId, - itemId: event.itemId, - }); - return; - } - yield* Queue.offerAll(runtimeEventQueue, runtimeEvents); - }).pipe(Effect.runPromiseWith(services)); - manager.on("event", listener); - return listener; - }), - (listener) => - Effect.gen(function* () { - yield* Effect.sync(() => { - manager.off("event", listener); - }); - yield* Queue.shutdown(runtimeEventQueue); - }), - ); + const writeNativeEvent = Effect.fn("writeNativeEvent")(function* (event: ProviderEvent) { + if (!nativeEventLogger) { + return; + } + yield* nativeEventLogger.write(event, event.threadId); + }); - return { - provider: PROVIDER, - capabilities: { - sessionModelSwitch: "in-session", - }, - startSession, - sendTurn, - interruptTurn, - readThread, - rollbackThread, - respondToRequest, - respondToUserInput, - stopSession, - listSessions, - hasSession, - stopAll, - streamEvents: Stream.fromQueue(runtimeEventQueue), - } satisfies CodexAdapterShape; + const registerListener = Effect.fn("registerListener")(function* () { + const services = yield* Effect.services(); + const listenerEffect = Effect.fn("listener")(function* (event: ProviderEvent) { + yield* writeNativeEvent(event); + const runtimeEvents = mapToRuntimeEvents(event, event.threadId); + if (runtimeEvents.length === 0) { + yield* Effect.logDebug("ignoring unhandled Codex provider event", { + method: event.method, + threadId: event.threadId, + turnId: event.turnId, + itemId: event.itemId, + }); + return; + } + yield* Queue.offerAll(runtimeEventQueue, runtimeEvents); + }); + const listener = (event: ProviderEvent) => + listenerEffect(event).pipe(Effect.runPromiseWith(services)); + manager.on("event", listener); + return listener; + }); + + const unregisterListener = Effect.fn("unregisterListener")(function* ( + listener: (event: ProviderEvent) => Promise, + ) { + yield* Effect.sync(() => { + manager.off("event", listener); + }); + yield* Queue.shutdown(runtimeEventQueue); }); + yield* Effect.acquireRelease(registerListener(), unregisterListener); + + return { + provider: PROVIDER, + capabilities: { + sessionModelSwitch: "in-session", + }, + startSession, + sendTurn, + interruptTurn, + readThread, + rollbackThread, + respondToRequest, + respondToUserInput, + stopSession, + listSessions, + hasSession, + stopAll, + streamEvents: Stream.fromQueue(runtimeEventQueue), + } satisfies CodexAdapterShape; +}); + export const CodexAdapterLive = Layer.effect(CodexAdapter, makeCodexAdapter()); export function makeCodexAdapterLive(options?: CodexAdapterLiveOptions) {