Skip to content

Commit afb58bb

Browse files
committed
fix: guard resolveThreadWriter with semaphore to prevent TOCTOU race
Multiple adapter fibers can concurrently call resolveThreadWriter for the same thread segment (e.g. _global). The check-then-create pattern yields between get() and set(), allowing duplicate writers to be created. The second writer overwrites the first in the map, orphaning its file handle and batched logger. Wrap the resolve logic with a Semaphore(1) so only one fiber can create a writer for a given segment at a time.
1 parent f5a208a commit afb58bb

File tree

1 file changed

+27
-25
lines changed

1 file changed

+27
-25
lines changed

apps/server/src/provider/Layers/EventNdjsonLogger.ts

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import path from "node:path";
1010

1111
import type { ThreadId } from "@t3tools/contracts";
1212
import { RotatingFileSink } from "@t3tools/shared/logging";
13-
import { Effect, Exit, Logger, Scope } from "effect";
13+
import { Effect, Exit, Logger, Scope, Semaphore } from "effect";
1414

1515
import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts";
1616

@@ -195,33 +195,35 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function
195195

196196
const threadWriters = new Map<string, ThreadWriter>();
197197
const failedSegments = new Set<string>();
198+
const writerMutex = yield* Semaphore.make(1);
198199

199-
const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* (
200-
threadSegment: string,
201-
): Effect.fn.Return<ThreadWriter | undefined> {
202-
if (failedSegments.has(threadSegment)) {
203-
return undefined;
204-
}
205-
const existing = threadWriters.get(threadSegment);
206-
if (existing) {
207-
return existing;
208-
}
200+
const resolveThreadWriter = (threadSegment: string) =>
201+
writerMutex.withPermits(1)(
202+
Effect.gen(function* () {
203+
if (failedSegments.has(threadSegment)) {
204+
return undefined;
205+
}
206+
const existing = threadWriters.get(threadSegment);
207+
if (existing) {
208+
return existing;
209+
}
209210

210-
const writer = yield* makeThreadWriter({
211-
filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
212-
maxBytes,
213-
maxFiles,
214-
batchWindowMs,
215-
streamLabel,
216-
});
217-
if (!writer) {
218-
failedSegments.add(threadSegment);
219-
return undefined;
220-
}
211+
const writer = yield* makeThreadWriter({
212+
filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
213+
maxBytes,
214+
maxFiles,
215+
batchWindowMs,
216+
streamLabel,
217+
});
218+
if (!writer) {
219+
failedSegments.add(threadSegment);
220+
return undefined;
221+
}
221222

222-
threadWriters.set(threadSegment, writer);
223-
return writer;
224-
});
223+
threadWriters.set(threadSegment, writer);
224+
return writer;
225+
}).pipe(Effect.withSpan("resolveThreadWriter")),
226+
);
225227

226228
const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) {
227229
const threadSegment = resolveThreadSegment(threadId);

0 commit comments

Comments
 (0)