From 0860272cb177cf99c24107f686d75279a33497e9 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 1 Apr 2026 00:27:31 -0700 Subject: [PATCH 1/2] migrate Effect.fn in apps/server/src/provider/Layers/EventNdjsonLogger.ts Co-authored-by: codex --- .../src/provider/Layers/EventNdjsonLogger.ts | 260 +++++++++--------- 1 file changed, 130 insertions(+), 130 deletions(-) diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 28b7ecf611..3f90d6c624 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -74,108 +74,107 @@ function resolveStreamLabel(stream: EventNdjsonStream): string { } } -function toLogMessage(event: unknown): Effect.Effect { - return Effect.gen(function* () { - const serialized = yield* Effect.sync(() => { - try { - return { ok: true as const, value: JSON.stringify(event) }; - } catch (error) { - return { ok: false as const, error }; - } - }); - - if (!serialized.ok) { - yield* logWarning("failed to serialize provider event log record", { - error: serialized.error, - }); - return undefined; +const toLogMessage = Effect.fn("toLogMessage")(function* ( + event: unknown, +): Effect.fn.Return { + const serialized = yield* Effect.sync(() => { + try { + return { ok: true as const, value: JSON.stringify(event) }; + } catch (error) { + return { ok: false as const, error }; } + }); - if (typeof serialized.value !== "string") { - return undefined; - } + if (!serialized.ok) { + yield* logWarning("failed to serialize provider event log record", { + error: serialized.error, + }); + return undefined; + } - return serialized.value; - }); -} + if (typeof serialized.value !== "string") { + return undefined; + } -function makeThreadWriter(input: { + return serialized.value; +}); + +const makeThreadWriter = Effect.fn("makeThreadWriter")(function* (input: { readonly filePath: string; readonly maxBytes: number; readonly maxFiles: number; readonly batchWindowMs: number; readonly streamLabel: string; -}): Effect.Effect { - return Effect.gen(function* () { - const sinkResult = yield* Effect.sync(() => { - try { - return { - ok: true as const, - sink: new RotatingFileSink({ - filePath: input.filePath, - maxBytes: input.maxBytes, - maxFiles: input.maxFiles, - throwOnError: true, - }), - }; - } catch (error) { - return { ok: false as const, error }; - } +}): Effect.fn.Return { + const sinkResult = yield* Effect.sync(() => { + try { + return { + ok: true as const, + sink: new RotatingFileSink({ + filePath: input.filePath, + maxBytes: input.maxBytes, + maxFiles: input.maxFiles, + throwOnError: true, + }), + }; + } catch (error) { + return { ok: false as const, error }; + } + }); + + if (!sinkResult.ok) { + yield* logWarning("failed to initialize provider thread log file", { + filePath: input.filePath, + error: sinkResult.error, }); + return undefined; + } - if (!sinkResult.ok) { - yield* logWarning("failed to initialize provider thread log file", { - filePath: input.filePath, - error: sinkResult.error, + const sink = sinkResult.sink; + const scope = yield* Scope.make(); + const lineLogger = makeLineLogger(input.streamLabel); + const batchedLogger = yield* Logger.batched(lineLogger, { + window: input.batchWindowMs, + flush: Effect.fn("makeThreadWriter.flush")(function* (messages) { + const flushResult = yield* Effect.sync(() => { + try { + for (const message of messages) { + sink.write(message); + } + return { ok: true as const }; + } catch (error) { + return { ok: false as const, error }; + } }); - return undefined; - } - const sink = sinkResult.sink; - const scope = yield* Scope.make(); - const lineLogger = makeLineLogger(input.streamLabel); - const batchedLogger = yield* Logger.batched(lineLogger, { - window: input.batchWindowMs, - flush: (messages) => - Effect.gen(function* () { - const flushResult = yield* Effect.sync(() => { - try { - for (const message of messages) { - sink.write(message); - } - return { ok: true as const }; - } catch (error) { - return { ok: false as const, error }; - } - }); - - if (!flushResult.ok) { - yield* logWarning("provider event log batch flush failed", { - filePath: input.filePath, - error: flushResult.error, - }); - } - }), - }).pipe(Effect.provideService(Scope.Scope, scope)); + if (!flushResult.ok) { + yield* logWarning("provider event log batch flush failed", { + filePath: input.filePath, + error: flushResult.error, + }); + } + }), + }).pipe(Effect.provideService(Scope.Scope, scope)); - const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false }); + const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false }); - return { - writeMessage(message: string) { - return Effect.log(message).pipe(Effect.provide(loggerLayer)); - }, - close() { - return Scope.close(scope, Exit.void); - }, - } satisfies ThreadWriter; - }); -} + return { + writeMessage(message: string) { + return Effect.log(message).pipe(Effect.provide(loggerLayer)); + }, + close() { + return Scope.close(scope, Exit.void); + }, + } satisfies ThreadWriter; +}); export function makeEventNdjsonLogger( filePath: string, options: EventNdjsonLoggerOptions, ): Effect.Effect { - return Effect.gen(function* () { + return Effect.fn("makeEventNdjsonLogger")(function* (): Effect.fn.Return< + EventNdjsonLogger | undefined + > { const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES; const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS; @@ -200,58 +199,59 @@ export function makeEventNdjsonLogger( const threadWriters = new Map(); const failedSegments = new Set(); - const resolveThreadWriter = (threadSegment: string): Effect.Effect => - Effect.gen(function* () { - if (failedSegments.has(threadSegment)) { - return undefined; - } - const existing = threadWriters.get(threadSegment); - if (existing) { - return existing; - } - - const writer = yield* makeThreadWriter({ - filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), - maxBytes, - maxFiles, - batchWindowMs, - streamLabel, - }); - if (!writer) { - failedSegments.add(threadSegment); - return undefined; - } + const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( + threadSegment: string, + ): Effect.fn.Return { + if (failedSegments.has(threadSegment)) { + return undefined; + } + const existing = threadWriters.get(threadSegment); + if (existing) { + return existing; + } - threadWriters.set(threadSegment, writer); - return writer; + const writer = yield* makeThreadWriter({ + filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), + maxBytes, + maxFiles, + batchWindowMs, + streamLabel, }); + if (!writer) { + failedSegments.add(threadSegment); + return undefined; + } - return { - filePath, - write(event: unknown, threadId: ThreadId | null) { - return Effect.gen(function* () { - const threadSegment = resolveThreadSegment(threadId); - const message = yield* toLogMessage(event); - if (!message) { - return; - } + threadWriters.set(threadSegment, writer); + return writer; + }); - const writer = yield* resolveThreadWriter(threadSegment); - if (!writer) { - return; - } + const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { + const threadSegment = resolveThreadSegment(threadId); + const message = yield* toLogMessage(event); + if (!message) { + return; + } - yield* writer.writeMessage(message); - }); - }, - close() { - return Effect.gen(function* () { - for (const writer of threadWriters.values()) { - yield* writer.close(); - } - threadWriters.clear(); - }); - }, + const writer = yield* resolveThreadWriter(threadSegment); + if (!writer) { + return; + } + + yield* writer.writeMessage(message); + }); + + const close = Effect.fn("close")(function* () { + for (const writer of threadWriters.values()) { + yield* writer.close(); + } + threadWriters.clear(); + }); + + return { + filePath, + write, + close, } satisfies EventNdjsonLogger; - }); + })(); } From 55e68ca415f4003702c85cf0949cc635676c35a2 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 1 Apr 2026 09:05:39 -0700 Subject: [PATCH 2/2] kewl --- .../src/provider/Layers/EventNdjsonLogger.ts | 146 +++++++++--------- 1 file changed, 71 insertions(+), 75 deletions(-) diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 3f90d6c624..a4fd6f235d 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -168,90 +168,86 @@ const makeThreadWriter = Effect.fn("makeThreadWriter")(function* (input: { } satisfies ThreadWriter; }); -export function makeEventNdjsonLogger( +export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function* ( filePath: string, options: EventNdjsonLoggerOptions, -): Effect.Effect { - return Effect.fn("makeEventNdjsonLogger")(function* (): Effect.fn.Return< - EventNdjsonLogger | undefined - > { - const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; - const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES; - const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS; - const streamLabel = resolveStreamLabel(options.stream); - - const directoryReady = yield* Effect.sync(() => { - try { - fs.mkdirSync(path.dirname(filePath), { recursive: true }); - return true; - } catch (error) { - return { ok: false as const, error }; - } - }); - if (directoryReady !== true) { - yield* logWarning("failed to create provider event log directory", { - filePath, - error: directoryReady.error, - }); - return undefined; - } +): Effect.fn.Return { + const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; + const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES; + const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS; + const streamLabel = resolveStreamLabel(options.stream); - const threadWriters = new Map(); - const failedSegments = new Set(); + const directoryReady = yield* Effect.sync(() => { + try { + fs.mkdirSync(path.dirname(filePath), { recursive: true }); + return true; + } catch (error) { + return { ok: false as const, error }; + } + }); + if (directoryReady !== true) { + yield* logWarning("failed to create provider event log directory", { + filePath, + error: directoryReady.error, + }); + return undefined; + } - const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( - threadSegment: string, - ): Effect.fn.Return { - if (failedSegments.has(threadSegment)) { - return undefined; - } - const existing = threadWriters.get(threadSegment); - if (existing) { - return existing; - } + const threadWriters = new Map(); + const failedSegments = new Set(); - const writer = yield* makeThreadWriter({ - filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), - maxBytes, - maxFiles, - batchWindowMs, - streamLabel, - }); - if (!writer) { - failedSegments.add(threadSegment); - return undefined; - } + const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( + threadSegment: string, + ): Effect.fn.Return { + if (failedSegments.has(threadSegment)) { + return undefined; + } + const existing = threadWriters.get(threadSegment); + if (existing) { + return existing; + } - threadWriters.set(threadSegment, writer); - return writer; + const writer = yield* makeThreadWriter({ + filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), + maxBytes, + maxFiles, + batchWindowMs, + streamLabel, }); + if (!writer) { + failedSegments.add(threadSegment); + return undefined; + } - const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { - const threadSegment = resolveThreadSegment(threadId); - const message = yield* toLogMessage(event); - if (!message) { - return; - } + threadWriters.set(threadSegment, writer); + return writer; + }); - const writer = yield* resolveThreadWriter(threadSegment); - if (!writer) { - return; - } + const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { + const threadSegment = resolveThreadSegment(threadId); + const message = yield* toLogMessage(event); + if (!message) { + return; + } - yield* writer.writeMessage(message); - }); + const writer = yield* resolveThreadWriter(threadSegment); + if (!writer) { + return; + } - const close = Effect.fn("close")(function* () { - for (const writer of threadWriters.values()) { - yield* writer.close(); - } - threadWriters.clear(); - }); + yield* writer.writeMessage(message); + }); - return { - filePath, - write, - close, - } satisfies EventNdjsonLogger; - })(); -} + const close = Effect.fn("close")(function* () { + for (const writer of threadWriters.values()) { + yield* writer.close(); + } + threadWriters.clear(); + }); + + return { + filePath, + write, + close, + } satisfies EventNdjsonLogger; +});