Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 152 additions & 156 deletions apps/server/src/provider/Layers/EventNdjsonLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,184 +74,180 @@ function resolveStreamLabel(stream: EventNdjsonStream): string {
}
}

function toLogMessage(event: unknown): Effect.Effect<string | undefined> {
return Effect.gen(function* () {
const serialized = yield* Effect.sync(() => {
try {
return { ok: true as const, value: JSON.stringify(event) };
} catch (error) {
return { ok: false as const, error };
}
});

if (!serialized.ok) {
yield* logWarning("failed to serialize provider event log record", {
error: serialized.error,
});
return undefined;
const toLogMessage = Effect.fn("toLogMessage")(function* (
event: unknown,
): Effect.fn.Return<string | undefined> {
const serialized = yield* Effect.sync(() => {
try {
return { ok: true as const, value: JSON.stringify(event) };
} catch (error) {
return { ok: false as const, error };
}
});

if (typeof serialized.value !== "string") {
return undefined;
}
if (!serialized.ok) {
yield* logWarning("failed to serialize provider event log record", {
error: serialized.error,
});
return undefined;
}

return serialized.value;
});
}
if (typeof serialized.value !== "string") {
return undefined;
}

return serialized.value;
});

function makeThreadWriter(input: {
const makeThreadWriter = Effect.fn("makeThreadWriter")(function* (input: {
readonly filePath: string;
readonly maxBytes: number;
readonly maxFiles: number;
readonly batchWindowMs: number;
readonly streamLabel: string;
}): Effect.Effect<ThreadWriter | undefined> {
return Effect.gen(function* () {
const sinkResult = yield* Effect.sync(() => {
try {
return {
ok: true as const,
sink: new RotatingFileSink({
filePath: input.filePath,
maxBytes: input.maxBytes,
maxFiles: input.maxFiles,
throwOnError: true,
}),
};
} catch (error) {
return { ok: false as const, error };
}
});

if (!sinkResult.ok) {
yield* logWarning("failed to initialize provider thread log file", {
filePath: input.filePath,
error: sinkResult.error,
});
return undefined;
}): Effect.fn.Return<ThreadWriter | undefined> {
const sinkResult = yield* Effect.sync(() => {
try {
return {
ok: true as const,
sink: new RotatingFileSink({
filePath: input.filePath,
maxBytes: input.maxBytes,
maxFiles: input.maxFiles,
throwOnError: true,
}),
};
} catch (error) {
return { ok: false as const, error };
}
});

if (!sinkResult.ok) {
yield* logWarning("failed to initialize provider thread log file", {
filePath: input.filePath,
error: sinkResult.error,
});
return undefined;
}

const sink = sinkResult.sink;
const scope = yield* Scope.make();
const lineLogger = makeLineLogger(input.streamLabel);
const batchedLogger = yield* Logger.batched(lineLogger, {
window: input.batchWindowMs,
flush: (messages) =>
Effect.gen(function* () {
const flushResult = yield* Effect.sync(() => {
try {
for (const message of messages) {
sink.write(message);
}
return { ok: true as const };
} catch (error) {
return { ok: false as const, error };
}
});

if (!flushResult.ok) {
yield* logWarning("provider event log batch flush failed", {
filePath: input.filePath,
error: flushResult.error,
});
const sink = sinkResult.sink;
const scope = yield* Scope.make();
const lineLogger = makeLineLogger(input.streamLabel);
const batchedLogger = yield* Logger.batched(lineLogger, {
window: input.batchWindowMs,
flush: Effect.fn("makeThreadWriter.flush")(function* (messages) {
const flushResult = yield* Effect.sync(() => {
try {
for (const message of messages) {
sink.write(message);
}
}),
}).pipe(Effect.provideService(Scope.Scope, scope));

const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false });

return {
writeMessage(message: string) {
return Effect.log(message).pipe(Effect.provide(loggerLayer));
},
close() {
return Scope.close(scope, Exit.void);
},
} satisfies ThreadWriter;
});
}
return { ok: true as const };
} catch (error) {
return { ok: false as const, error };
}
});

export function makeEventNdjsonLogger(
if (!flushResult.ok) {
yield* logWarning("provider event log batch flush failed", {
filePath: input.filePath,
error: flushResult.error,
});
}
}),
}).pipe(Effect.provideService(Scope.Scope, scope));

const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false });

return {
writeMessage(message: string) {
return Effect.log(message).pipe(Effect.provide(loggerLayer));
},
close() {
return Scope.close(scope, Exit.void);
},
} satisfies ThreadWriter;
});

export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function* (
filePath: string,
options: EventNdjsonLoggerOptions,
): Effect.Effect<EventNdjsonLogger | undefined> {
return Effect.gen(function* () {
const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES;
const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES;
const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS;
const streamLabel = resolveStreamLabel(options.stream);

const directoryReady = yield* Effect.sync(() => {
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
return true;
} catch (error) {
return { ok: false as const, error };
}
): Effect.fn.Return<EventNdjsonLogger | undefined> {
const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES;
const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES;
const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS;
const streamLabel = resolveStreamLabel(options.stream);

const directoryReady = yield* Effect.sync(() => {
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
return true;
} catch (error) {
return { ok: false as const, error };
}
});
if (directoryReady !== true) {
yield* logWarning("failed to create provider event log directory", {
filePath,
error: directoryReady.error,
});
if (directoryReady !== true) {
yield* logWarning("failed to create provider event log directory", {
filePath,
error: directoryReady.error,
});
return undefined;
}

const threadWriters = new Map<string, ThreadWriter>();
const failedSegments = new Set<string>();

const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* (
threadSegment: string,
): Effect.fn.Return<ThreadWriter | undefined> {
if (failedSegments.has(threadSegment)) {
return undefined;
}
const existing = threadWriters.get(threadSegment);
if (existing) {
return existing;
}

const threadWriters = new Map<string, ThreadWriter>();
const failedSegments = new Set<string>();

const resolveThreadWriter = (threadSegment: string): Effect.Effect<ThreadWriter | undefined> =>
Effect.gen(function* () {
if (failedSegments.has(threadSegment)) {
return undefined;
}
const existing = threadWriters.get(threadSegment);
if (existing) {
return existing;
}
const writer = yield* makeThreadWriter({
filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
maxBytes,
maxFiles,
batchWindowMs,
streamLabel,
});
if (!writer) {
failedSegments.add(threadSegment);
return undefined;
}

const writer = yield* makeThreadWriter({
filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
maxBytes,
maxFiles,
batchWindowMs,
streamLabel,
});
if (!writer) {
failedSegments.add(threadSegment);
return undefined;
}
threadWriters.set(threadSegment, writer);
return writer;
});

threadWriters.set(threadSegment, writer);
return writer;
});
const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) {
const threadSegment = resolveThreadSegment(threadId);
const message = yield* toLogMessage(event);
if (!message) {
return;
}

return {
filePath,
write(event: unknown, threadId: ThreadId | null) {
return Effect.gen(function* () {
const threadSegment = resolveThreadSegment(threadId);
const message = yield* toLogMessage(event);
if (!message) {
return;
}
const writer = yield* resolveThreadWriter(threadSegment);
if (!writer) {
return;
}

const writer = yield* resolveThreadWriter(threadSegment);
if (!writer) {
return;
}
yield* writer.writeMessage(message);
});

yield* writer.writeMessage(message);
});
},
close() {
return Effect.gen(function* () {
for (const writer of threadWriters.values()) {
yield* writer.close();
}
threadWriters.clear();
});
},
} satisfies EventNdjsonLogger;
const close = Effect.fn("close")(function* () {
for (const writer of threadWriters.values()) {
yield* writer.close();
}
threadWriters.clear();
});
}

return {
filePath,
write,
close,
} satisfies EventNdjsonLogger;
});
Loading