diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 28b7ecf611..a4fd6f235d 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -74,184 +74,180 @@ function resolveStreamLabel(stream: EventNdjsonStream): string { } } -function toLogMessage(event: unknown): Effect.Effect { - return Effect.gen(function* () { - const serialized = yield* Effect.sync(() => { - try { - return { ok: true as const, value: JSON.stringify(event) }; - } catch (error) { - return { ok: false as const, error }; - } - }); - - if (!serialized.ok) { - yield* logWarning("failed to serialize provider event log record", { - error: serialized.error, - }); - return undefined; +const toLogMessage = Effect.fn("toLogMessage")(function* ( + event: unknown, +): Effect.fn.Return { + const serialized = yield* Effect.sync(() => { + try { + return { ok: true as const, value: JSON.stringify(event) }; + } catch (error) { + return { ok: false as const, error }; } + }); - if (typeof serialized.value !== "string") { - return undefined; - } + if (!serialized.ok) { + yield* logWarning("failed to serialize provider event log record", { + error: serialized.error, + }); + return undefined; + } - return serialized.value; - }); -} + if (typeof serialized.value !== "string") { + return undefined; + } + + return serialized.value; +}); -function makeThreadWriter(input: { +const makeThreadWriter = Effect.fn("makeThreadWriter")(function* (input: { readonly filePath: string; readonly maxBytes: number; readonly maxFiles: number; readonly batchWindowMs: number; readonly streamLabel: string; -}): Effect.Effect { - return Effect.gen(function* () { - const sinkResult = yield* Effect.sync(() => { - try { - return { - ok: true as const, - sink: new RotatingFileSink({ - filePath: input.filePath, - maxBytes: input.maxBytes, - maxFiles: input.maxFiles, - throwOnError: true, - }), - }; - } catch (error) { - return { ok: false as const, error }; - } - }); - - if (!sinkResult.ok) { - yield* logWarning("failed to initialize provider thread log file", { - filePath: input.filePath, - error: sinkResult.error, - }); - return undefined; +}): Effect.fn.Return { + const sinkResult = yield* Effect.sync(() => { + try { + return { + ok: true as const, + sink: new RotatingFileSink({ + filePath: input.filePath, + maxBytes: input.maxBytes, + maxFiles: input.maxFiles, + throwOnError: true, + }), + }; + } catch (error) { + return { ok: false as const, error }; } + }); + + if (!sinkResult.ok) { + yield* logWarning("failed to initialize provider thread log file", { + filePath: input.filePath, + error: sinkResult.error, + }); + return undefined; + } - const sink = sinkResult.sink; - const scope = yield* Scope.make(); - const lineLogger = makeLineLogger(input.streamLabel); - const batchedLogger = yield* Logger.batched(lineLogger, { - window: input.batchWindowMs, - flush: (messages) => - Effect.gen(function* () { - const flushResult = yield* Effect.sync(() => { - try { - for (const message of messages) { - sink.write(message); - } - return { ok: true as const }; - } catch (error) { - return { ok: false as const, error }; - } - }); - - if (!flushResult.ok) { - yield* logWarning("provider event log batch flush failed", { - filePath: input.filePath, - error: flushResult.error, - }); + const sink = sinkResult.sink; + const scope = yield* Scope.make(); + const lineLogger = makeLineLogger(input.streamLabel); + const batchedLogger = yield* Logger.batched(lineLogger, { + window: input.batchWindowMs, + flush: Effect.fn("makeThreadWriter.flush")(function* (messages) { + const flushResult = yield* Effect.sync(() => { + try { + for (const message of messages) { + sink.write(message); } - }), - }).pipe(Effect.provideService(Scope.Scope, scope)); - - const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false }); - - return { - writeMessage(message: string) { - return Effect.log(message).pipe(Effect.provide(loggerLayer)); - }, - close() { - return Scope.close(scope, Exit.void); - }, - } satisfies ThreadWriter; - }); -} + return { ok: true as const }; + } catch (error) { + return { ok: false as const, error }; + } + }); -export function makeEventNdjsonLogger( + if (!flushResult.ok) { + yield* logWarning("provider event log batch flush failed", { + filePath: input.filePath, + error: flushResult.error, + }); + } + }), + }).pipe(Effect.provideService(Scope.Scope, scope)); + + const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false }); + + return { + writeMessage(message: string) { + return Effect.log(message).pipe(Effect.provide(loggerLayer)); + }, + close() { + return Scope.close(scope, Exit.void); + }, + } satisfies ThreadWriter; +}); + +export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function* ( filePath: string, options: EventNdjsonLoggerOptions, -): Effect.Effect { - return Effect.gen(function* () { - const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; - const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES; - const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS; - const streamLabel = resolveStreamLabel(options.stream); - - const directoryReady = yield* Effect.sync(() => { - try { - fs.mkdirSync(path.dirname(filePath), { recursive: true }); - return true; - } catch (error) { - return { ok: false as const, error }; - } +): Effect.fn.Return { + const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; + const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES; + const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS; + const streamLabel = resolveStreamLabel(options.stream); + + const directoryReady = yield* Effect.sync(() => { + try { + fs.mkdirSync(path.dirname(filePath), { recursive: true }); + return true; + } catch (error) { + return { ok: false as const, error }; + } + }); + if (directoryReady !== true) { + yield* logWarning("failed to create provider event log directory", { + filePath, + error: directoryReady.error, }); - if (directoryReady !== true) { - yield* logWarning("failed to create provider event log directory", { - filePath, - error: directoryReady.error, - }); + return undefined; + } + + const threadWriters = new Map(); + const failedSegments = new Set(); + + const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( + threadSegment: string, + ): Effect.fn.Return { + if (failedSegments.has(threadSegment)) { return undefined; } + const existing = threadWriters.get(threadSegment); + if (existing) { + return existing; + } - const threadWriters = new Map(); - const failedSegments = new Set(); - - const resolveThreadWriter = (threadSegment: string): Effect.Effect => - Effect.gen(function* () { - if (failedSegments.has(threadSegment)) { - return undefined; - } - const existing = threadWriters.get(threadSegment); - if (existing) { - return existing; - } + const writer = yield* makeThreadWriter({ + filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), + maxBytes, + maxFiles, + batchWindowMs, + streamLabel, + }); + if (!writer) { + failedSegments.add(threadSegment); + return undefined; + } - const writer = yield* makeThreadWriter({ - filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), - maxBytes, - maxFiles, - batchWindowMs, - streamLabel, - }); - if (!writer) { - failedSegments.add(threadSegment); - return undefined; - } + threadWriters.set(threadSegment, writer); + return writer; + }); - threadWriters.set(threadSegment, writer); - return writer; - }); + const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { + const threadSegment = resolveThreadSegment(threadId); + const message = yield* toLogMessage(event); + if (!message) { + return; + } - return { - filePath, - write(event: unknown, threadId: ThreadId | null) { - return Effect.gen(function* () { - const threadSegment = resolveThreadSegment(threadId); - const message = yield* toLogMessage(event); - if (!message) { - return; - } + const writer = yield* resolveThreadWriter(threadSegment); + if (!writer) { + return; + } - const writer = yield* resolveThreadWriter(threadSegment); - if (!writer) { - return; - } + yield* writer.writeMessage(message); + }); - yield* writer.writeMessage(message); - }); - }, - close() { - return Effect.gen(function* () { - for (const writer of threadWriters.values()) { - yield* writer.close(); - } - threadWriters.clear(); - }); - }, - } satisfies EventNdjsonLogger; + const close = Effect.fn("close")(function* () { + for (const writer of threadWriters.values()) { + yield* writer.close(); + } + threadWriters.clear(); }); -} + + return { + filePath, + write, + close, + } satisfies EventNdjsonLogger; +});