From c61492d62f75388bafe2d7642bd1574488d1b37b Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 28 Mar 2026 22:08:48 -0700 Subject: [PATCH 1/3] Refactor projection pipeline side effects - Wrap projection helpers with named `Effect.fn` - Share attachment directory reads and harden unsafe thread cleanup - Mark the projection pipeline checklist item complete --- .../Layers/ProjectionPipeline.test.ts | 42 ++ .../Layers/ProjectionPipeline.ts | 458 +++++++++--------- docs/effect-fn-checklist.md | 16 +- 3 files changed, 287 insertions(+), 229 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 77b5d4d619..64a19701fd 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -1094,6 +1094,48 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta }, ); +it.layer( + Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-attachments-delete-direct-")), +)("OrchestrationProjectionPipeline", (it) => { + it.effect("ignores unsafe thread ids for incremental attachment cleanup paths", () => + Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const now = new Date().toISOString(); + const { attachmentsDir: attachmentsRootDir, stateDir } = yield* ServerConfig; + const attachmentsSentinelPath = path.join(attachmentsRootDir, "sentinel.txt"); + const stateDirSentinelPath = path.join(stateDir, "state-sentinel.txt"); + yield* fileSystem.makeDirectory(attachmentsRootDir, { recursive: true }); + yield* fileSystem.writeFileString(attachmentsSentinelPath, "keep-attachments-root"); + yield* fileSystem.writeFileString(stateDirSentinelPath, "keep-state-dir"); + + yield* eventStore + .append({ + type: "thread.deleted", + eventId: EventId.makeUnsafe("evt-unsafe-thread-delete-direct"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe(".."), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-unsafe-thread-delete-direct"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-unsafe-thread-delete-direct"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe(".."), + deletedAt: now, + }, + }) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + assert.isTrue(yield* exists(attachmentsRootDir)); + assert.isTrue(yield* exists(attachmentsSentinelPath)); + assert.isTrue(yield* exists(stateDirSentinelPath)); + }), + ); +}); + it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => { it.effect("resumes from projector last_applied_sequence without replaying older events", () => Effect.gen(function* () { diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 7d85d1e38c..22635df8d4 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -77,7 +77,7 @@ interface AttachmentSideEffects { readonly prunedThreadRelativePaths: Map>; } -const materializeAttachmentsForProjection = Effect.fn( +const materializeAttachmentsForProjection = Effect.fn("materializeAttachmentsForProjection")( (input: { readonly attachments: ReadonlyArray }) => Effect.succeed(input.attachments.length === 0 ? [] : input.attachments), ); @@ -238,124 +238,145 @@ function collectThreadAttachmentRelativePaths( return relativePaths; } -const runAttachmentSideEffects = Effect.fn(function* (sideEffects: AttachmentSideEffects) { +const runAttachmentSideEffects = Effect.fn("runAttachmentSideEffects")(function* ( + sideEffects: AttachmentSideEffects, +) { const serverConfig = yield* Effect.service(ServerConfig); const fileSystem = yield* Effect.service(FileSystem.FileSystem); const path = yield* Effect.service(Path.Path); const attachmentsRootDir = serverConfig.attachmentsDir; - - yield* Effect.forEach( - sideEffects.deletedThreadIds, - (threadId) => - Effect.gen(function* () { - const threadSegment = toSafeThreadAttachmentSegment(threadId); - if (!threadSegment) { - yield* Effect.logWarning("skipping attachment cleanup for unsafe thread id", { - threadId, - }); - return; - } - const entries = yield* fileSystem - .readDirectory(attachmentsRootDir, { recursive: false }) - .pipe(Effect.catch(() => Effect.succeed([] as Array))); - yield* Effect.forEach( - entries, - (entry) => - Effect.gen(function* () { - const normalizedEntry = entry.replace(/^[/\\]+/, "").replace(/\\/g, "/"); - if (normalizedEntry.length === 0 || normalizedEntry.includes("/")) { - return; - } - const attachmentId = parseAttachmentIdFromRelativePath(normalizedEntry); - if (!attachmentId) { - return; - } - const attachmentThreadSegment = parseThreadSegmentFromAttachmentId(attachmentId); - if (!attachmentThreadSegment || attachmentThreadSegment !== threadSegment) { - return; - } - yield* fileSystem.remove(path.join(attachmentsRootDir, normalizedEntry), { - force: true, - }); - }), - { concurrency: 1 }, - ); - }), - { concurrency: 1 }, + const readAttachmentRootEntries = fileSystem + .readDirectory(attachmentsRootDir, { recursive: false }) + .pipe(Effect.catch(() => Effect.succeed([] as Array))); + + const removeDeletedThreadAttachmentEntry = Effect.fn("removeDeletedThreadAttachmentEntry")( + function* (threadSegment: string, entry: string) { + const normalizedEntry = entry.replace(/^[/\\]+/, "").replace(/\\/g, "/"); + if (normalizedEntry.length === 0 || normalizedEntry.includes("/")) { + return; + } + const attachmentId = parseAttachmentIdFromRelativePath(normalizedEntry); + if (!attachmentId) { + return; + } + const attachmentThreadSegment = parseThreadSegmentFromAttachmentId(attachmentId); + if (!attachmentThreadSegment || attachmentThreadSegment !== threadSegment) { + return; + } + yield* fileSystem.remove(path.join(attachmentsRootDir, normalizedEntry), { + force: true, + }); + }, ); + const deleteThreadAttachments = Effect.fn("deleteThreadAttachments")(function* ( + threadId: string, + ) { + const threadSegment = toSafeThreadAttachmentSegment(threadId); + if (!threadSegment) { + yield* Effect.logWarning("skipping attachment cleanup for unsafe thread id", { + threadId, + }); + return; + } + + const entries = yield* readAttachmentRootEntries; + yield* Effect.forEach( + entries, + (entry) => removeDeletedThreadAttachmentEntry(threadSegment, entry), + { + concurrency: 1, + }, + ); + }); + + const pruneThreadAttachmentEntry = Effect.fn("pruneThreadAttachmentEntry")(function* ( + threadSegment: string, + keptThreadRelativePaths: Set, + entry: string, + ) { + const relativePath = entry.replace(/^[/\\]+/, "").replace(/\\/g, "/"); + if (relativePath.length === 0 || relativePath.includes("/")) { + return; + } + const attachmentId = parseAttachmentIdFromRelativePath(relativePath); + if (!attachmentId) { + return; + } + const attachmentThreadSegment = parseThreadSegmentFromAttachmentId(attachmentId); + if (!attachmentThreadSegment || attachmentThreadSegment !== threadSegment) { + return; + } + + const absolutePath = path.join(attachmentsRootDir, relativePath); + const fileInfo = yield* fileSystem + .stat(absolutePath) + .pipe(Effect.catch(() => Effect.succeed(null))); + if (!fileInfo || fileInfo.type !== "File") { + return; + } + + if (!keptThreadRelativePaths.has(relativePath)) { + yield* fileSystem.remove(absolutePath, { force: true }); + } + }); + + const pruneThreadAttachments = Effect.fn("pruneThreadAttachments")(function* ( + threadId: string, + keptThreadRelativePaths: Set, + ) { + if (sideEffects.deletedThreadIds.has(threadId)) { + return; + } + + const threadSegment = toSafeThreadAttachmentSegment(threadId); + if (!threadSegment) { + yield* Effect.logWarning("skipping attachment prune for unsafe thread id", { threadId }); + return; + } + + const entries = yield* readAttachmentRootEntries; + yield* Effect.forEach( + entries, + (entry) => pruneThreadAttachmentEntry(threadSegment, keptThreadRelativePaths, entry), + { concurrency: 1 }, + ); + }); + + yield* Effect.forEach(sideEffects.deletedThreadIds, deleteThreadAttachments, { + concurrency: 1, + }); + yield* Effect.forEach( sideEffects.prunedThreadRelativePaths.entries(), - ([threadId, keptThreadRelativePaths]) => { - if (sideEffects.deletedThreadIds.has(threadId)) { - return Effect.void; - } - return Effect.gen(function* () { - const threadSegment = toSafeThreadAttachmentSegment(threadId); - if (!threadSegment) { - yield* Effect.logWarning("skipping attachment prune for unsafe thread id", { threadId }); - return; - } - const entries = yield* fileSystem - .readDirectory(attachmentsRootDir, { recursive: false }) - .pipe(Effect.catch(() => Effect.succeed([] as Array))); - yield* Effect.forEach( - entries, - (entry) => - Effect.gen(function* () { - const relativePath = entry.replace(/^[/\\]+/, "").replace(/\\/g, "/"); - if (relativePath.length === 0 || relativePath.includes("/")) { - return; - } - const attachmentId = parseAttachmentIdFromRelativePath(relativePath); - if (!attachmentId) { - return; - } - const attachmentThreadSegment = parseThreadSegmentFromAttachmentId(attachmentId); - if (!attachmentThreadSegment || attachmentThreadSegment !== threadSegment) { - return; - } - - const absolutePath = path.join(attachmentsRootDir, relativePath); - const fileInfo = yield* fileSystem - .stat(absolutePath) - .pipe(Effect.catch(() => Effect.succeed(null))); - if (!fileInfo || fileInfo.type !== "File") { - return; - } - - if (!keptThreadRelativePaths.has(relativePath)) { - yield* fileSystem.remove(absolutePath, { force: true }); - } - }), - { concurrency: 1 }, - ); - }); - }, + ([threadId, keptThreadRelativePaths]) => + pruneThreadAttachments(threadId, keptThreadRelativePaths), { concurrency: 1 }, ); }); -const makeOrchestrationProjectionPipeline = Effect.gen(function* () { - const sql = yield* SqlClient.SqlClient; - const eventStore = yield* OrchestrationEventStore; - const projectionStateRepository = yield* ProjectionStateRepository; - const projectionProjectRepository = yield* ProjectionProjectRepository; - const projectionThreadRepository = yield* ProjectionThreadRepository; - const projectionThreadMessageRepository = yield* ProjectionThreadMessageRepository; - const projectionThreadProposedPlanRepository = yield* ProjectionThreadProposedPlanRepository; - const projectionThreadActivityRepository = yield* ProjectionThreadActivityRepository; - const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; - const projectionTurnRepository = yield* ProjectionTurnRepository; - const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; - - const fileSystem = yield* FileSystem.FileSystem; - const path = yield* Path.Path; - const serverConfig = yield* ServerConfig; - - const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) => - Effect.gen(function* () { +const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjectionPipeline")( + function* () { + const sql = yield* SqlClient.SqlClient; + const eventStore = yield* OrchestrationEventStore; + const projectionStateRepository = yield* ProjectionStateRepository; + const projectionProjectRepository = yield* ProjectionProjectRepository; + const projectionThreadRepository = yield* ProjectionThreadRepository; + const projectionThreadMessageRepository = yield* ProjectionThreadMessageRepository; + const projectionThreadProposedPlanRepository = yield* ProjectionThreadProposedPlanRepository; + const projectionThreadActivityRepository = yield* ProjectionThreadActivityRepository; + const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; + const projectionTurnRepository = yield* ProjectionTurnRepository; + const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; + + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const serverConfig = yield* ServerConfig; + + const applyProjectsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyProjectsProjection", + )(function* (event, _attachmentSideEffects) { switch (event.type) { case "project.created": yield* projectionProjectRepository.upsert({ @@ -412,8 +433,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const applyThreadsProjection: ProjectorDefinition["apply"] = (event, attachmentSideEffects) => - Effect.gen(function* () { + const applyThreadsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyThreadsProjection", + )(function* (event, attachmentSideEffects) { switch (event.type) { case "thread.created": yield* projectionThreadRepository.upsert({ @@ -597,11 +619,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const applyThreadMessagesProjection: ProjectorDefinition["apply"] = ( - event, - attachmentSideEffects, - ) => - Effect.gen(function* () { + const applyThreadMessagesProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyThreadMessagesProjection", + )(function* (event, attachmentSideEffects) { switch (event.type) { case "thread.message-sent": { const existingRows = yield* projectionThreadMessageRepository.listByThreadId({ @@ -674,11 +694,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const applyThreadProposedPlansProjection: ProjectorDefinition["apply"] = ( - event, - _attachmentSideEffects, - ) => - Effect.gen(function* () { + const applyThreadProposedPlansProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyThreadProposedPlansProjection", + )(function* (event, _attachmentSideEffects) { switch (event.type) { case "thread.proposed-plan-upserted": yield* projectionThreadProposedPlanRepository.upsert({ @@ -727,11 +745,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const applyThreadActivitiesProjection: ProjectorDefinition["apply"] = ( - event, - _attachmentSideEffects, - ) => - Effect.gen(function* () { + const applyThreadActivitiesProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyThreadActivitiesProjection", + )(function* (event, _attachmentSideEffects) { switch (event.type) { case "thread.activity-appended": yield* projectionThreadActivityRepository.upsert({ @@ -781,11 +797,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const applyThreadSessionsProjection: ProjectorDefinition["apply"] = ( - event, - _attachmentSideEffects, - ) => - Effect.gen(function* () { + const applyThreadSessionsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyThreadSessionsProjection", + )(function* (event, _attachmentSideEffects) { if (event.type !== "thread.session-set") { return; } @@ -800,11 +814,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { }); }); - const applyThreadTurnsProjection: ProjectorDefinition["apply"] = ( - event, - _attachmentSideEffects, - ) => - Effect.gen(function* () { + const applyThreadTurnsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyThreadTurnsProjection", + )(function* (event, _attachmentSideEffects) { switch (event.type) { case "thread.turn-start-requested": { yield* projectionTurnRepository.replacePendingTurnStart({ @@ -1058,13 +1070,11 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const applyCheckpointsProjection: ProjectorDefinition["apply"] = () => Effect.void; + const applyCheckpointsProjection: ProjectorDefinition["apply"] = () => Effect.void; - const applyPendingApprovalsProjection: ProjectorDefinition["apply"] = ( - event, - _attachmentSideEffects, - ) => - Effect.gen(function* () { + const applyPendingApprovalsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyPendingApprovalsProjection", + )(function* (event, _attachmentSideEffects) { switch (event.type) { case "thread.activity-appended": { const requestId = @@ -1150,47 +1160,49 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); - const projectors: ReadonlyArray = [ - { - name: ORCHESTRATION_PROJECTOR_NAMES.projects, - apply: applyProjectsProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.threadMessages, - apply: applyThreadMessagesProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans, - apply: applyThreadProposedPlansProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.threadActivities, - apply: applyThreadActivitiesProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.threadSessions, - apply: applyThreadSessionsProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.threadTurns, - apply: applyThreadTurnsProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.checkpoints, - apply: applyCheckpointsProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.pendingApprovals, - apply: applyPendingApprovalsProjection, - }, - { - name: ORCHESTRATION_PROJECTOR_NAMES.threads, - apply: applyThreadsProjection, - }, - ]; - - const runProjectorForEvent = (projector: ProjectorDefinition, event: OrchestrationEvent) => - Effect.gen(function* () { + const projectors: ReadonlyArray = [ + { + name: ORCHESTRATION_PROJECTOR_NAMES.projects, + apply: applyProjectsProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadMessages, + apply: applyThreadMessagesProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans, + apply: applyThreadProposedPlansProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadActivities, + apply: applyThreadActivitiesProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadSessions, + apply: applyThreadSessionsProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadTurns, + apply: applyThreadTurnsProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.checkpoints, + apply: applyCheckpointsProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.pendingApprovals, + apply: applyPendingApprovalsProjection, + }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threads, + apply: applyThreadsProjection, + }, + ]; + + const runProjectorForEvent = Effect.fn("runProjectorForEvent")(function* ( + projector: ProjectorDefinition, + event: OrchestrationEvent, + ) { const attachmentSideEffects: AttachmentSideEffects = { deletedThreadIds: new Set(), prunedThreadRelativePaths: new Map>(), @@ -1209,6 +1221,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { ); yield* runAttachmentSideEffects(attachmentSideEffects).pipe( + Effect.provideService(FileSystem.FileSystem, fileSystem), + Effect.provideService(Path.Path, path), + Effect.provideService(ServerConfig, serverConfig), Effect.catch((cause) => Effect.logWarning("failed to apply projected attachment side-effects", { projector: projector.name, @@ -1220,63 +1235,64 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { ); }); - const bootstrapProjector = (projector: ProjectorDefinition) => - projectionStateRepository - .getByProjector({ - projector: projector.name, - }) - .pipe( - Effect.flatMap((stateRow) => - Stream.runForEach( - eventStore.readFromSequence( - Option.isSome(stateRow) ? stateRow.value.lastAppliedSequence : 0, + const bootstrapProjector = (projector: ProjectorDefinition) => + projectionStateRepository + .getByProjector({ + projector: projector.name, + }) + .pipe( + Effect.flatMap((stateRow) => + Stream.runForEach( + eventStore.readFromSequence( + Option.isSome(stateRow) ? stateRow.value.lastAppliedSequence : 0, + ), + (event) => runProjectorForEvent(projector, event), ), - (event) => runProjectorForEvent(projector, event), ), + ); + + const projectEvent: OrchestrationProjectionPipelineShape["projectEvent"] = (event) => + Effect.forEach(projectors, (projector) => runProjectorForEvent(projector, event), { + concurrency: 1, + }).pipe( + Effect.provideService(FileSystem.FileSystem, fileSystem), + Effect.provideService(Path.Path, path), + Effect.provideService(ServerConfig, serverConfig), + Effect.asVoid, + Effect.catchTag("SqlError", (sqlError) => + Effect.fail(toPersistenceSqlError("ProjectionPipeline.projectEvent:query")(sqlError)), ), ); - const projectEvent: OrchestrationProjectionPipelineShape["projectEvent"] = (event) => - Effect.forEach(projectors, (projector) => runProjectorForEvent(projector, event), { - concurrency: 1, - }).pipe( + const bootstrap: OrchestrationProjectionPipelineShape["bootstrap"] = Effect.forEach( + projectors, + bootstrapProjector, + { concurrency: 1 }, + ).pipe( Effect.provideService(FileSystem.FileSystem, fileSystem), Effect.provideService(Path.Path, path), Effect.provideService(ServerConfig, serverConfig), Effect.asVoid, + Effect.tap(() => + Effect.log("orchestration projection pipeline bootstrapped").pipe( + Effect.annotateLogs({ projectors: projectors.length }), + ), + ), Effect.catchTag("SqlError", (sqlError) => - Effect.fail(toPersistenceSqlError("ProjectionPipeline.projectEvent:query")(sqlError)), + Effect.fail(toPersistenceSqlError("ProjectionPipeline.bootstrap:query")(sqlError)), ), ); - const bootstrap: OrchestrationProjectionPipelineShape["bootstrap"] = Effect.forEach( - projectors, - bootstrapProjector, - { concurrency: 1 }, - ).pipe( - Effect.provideService(FileSystem.FileSystem, fileSystem), - Effect.provideService(Path.Path, path), - Effect.provideService(ServerConfig, serverConfig), - Effect.asVoid, - Effect.tap(() => - Effect.log("orchestration projection pipeline bootstrapped").pipe( - Effect.annotateLogs({ projectors: projectors.length }), - ), - ), - Effect.catchTag("SqlError", (sqlError) => - Effect.fail(toPersistenceSqlError("ProjectionPipeline.bootstrap:query")(sqlError)), - ), - ); - - return { - bootstrap, - projectEvent, - } satisfies OrchestrationProjectionPipelineShape; -}); + return { + bootstrap, + projectEvent, + } satisfies OrchestrationProjectionPipelineShape; + }, +); export const OrchestrationProjectionPipelineLive = Layer.effect( OrchestrationProjectionPipeline, - makeOrchestrationProjectionPipeline, + makeOrchestrationProjectionPipeline(), ).pipe( Layer.provideMerge(NodeServices.layer), Layer.provideMerge(ProjectionProjectRepositoryLive), diff --git a/docs/effect-fn-checklist.md b/docs/effect-fn-checklist.md index 0d28171aa2..045aaf0b16 100644 --- a/docs/effect-fn-checklist.md +++ b/docs/effect-fn-checklist.md @@ -36,7 +36,7 @@ const new = Effect.fn('functionName')(function* () { - [ ] `apps/server/src/git/Layers/GitCore.ts` - [ ] `apps/server/src/git/Layers/GitManager.ts` - [ ] `apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` -- [ ] `apps/server/src/orchestration/Layers/ProjectionPipeline.ts` +- [x] `apps/server/src/orchestration/Layers/ProjectionPipeline.ts` - [ ] `apps/server/src/orchestration/Layers/OrchestrationEngine.ts` - [ ] `apps/server/src/provider/Layers/EventNdjsonLogger.ts` - [ ] `Everything else` @@ -79,13 +79,13 @@ const new = Effect.fn('functionName')(function* () { ### `apps/server/src/orchestration/Layers/ProjectionPipeline.ts` (`25`) -- [ ] [runProjectorForEvent](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProjectionPipeline.ts#L1161) -- [ ] [applyProjectsProjection](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProjectionPipeline.ts#L357) -- [ ] [applyThreadsProjection](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProjectionPipeline.ts#L415) -- [ ] `Effect.forEach(..., threadId => Effect.gen(...))` callbacks around `L250` -- [ ] `Effect.forEach(..., entry => Effect.gen(...))` callbacks around `L264` -- [ ] `Effect.forEach(..., entry => Effect.gen(...))` callbacks around `L305` -- [ ] Remaining apply helpers in this file +- [x] [runProjectorForEvent](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProjectionPipeline.ts#L1161) +- [x] [applyProjectsProjection](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProjectionPipeline.ts#L357) +- [x] [applyThreadsProjection](/Users/julius/Development/Work/codething-mvp/apps/server/src/orchestration/Layers/ProjectionPipeline.ts#L415) +- [x] `Effect.forEach(..., threadId => Effect.gen(...))` callbacks around `L250` +- [x] `Effect.forEach(..., entry => Effect.gen(...))` callbacks around `L264` +- [x] `Effect.forEach(..., entry => Effect.gen(...))` callbacks around `L305` +- [x] Remaining apply helpers in this file ### `apps/server/src/provider/Layers/ProviderService.ts` (`24`) From 9aa023cd9e15f9fac39e7d260f51e6d2bb9347e6 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 28 Mar 2026 22:32:11 -0700 Subject: [PATCH 2/3] not sure why u did that codex --- apps/server/src/orchestration/Layers/ProjectionPipeline.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 22635df8d4..f0f2ccabee 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -1221,9 +1221,6 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti ); yield* runAttachmentSideEffects(attachmentSideEffects).pipe( - Effect.provideService(FileSystem.FileSystem, fileSystem), - Effect.provideService(Path.Path, path), - Effect.provideService(ServerConfig, serverConfig), Effect.catch((cause) => Effect.logWarning("failed to apply projected attachment side-effects", { projector: projector.name, From 684160a15ec6759a87912a3c7f99e9b524dd2c4f Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 28 Mar 2026 22:33:10 -0700 Subject: [PATCH 3/3] rev test --- .../Layers/ProjectionPipeline.test.ts | 42 ------------------- 1 file changed, 42 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 64a19701fd..77b5d4d619 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -1094,48 +1094,6 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta }, ); -it.layer( - Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-attachments-delete-direct-")), -)("OrchestrationProjectionPipeline", (it) => { - it.effect("ignores unsafe thread ids for incremental attachment cleanup paths", () => - Effect.gen(function* () { - const fileSystem = yield* FileSystem.FileSystem; - const path = yield* Path.Path; - const projectionPipeline = yield* OrchestrationProjectionPipeline; - const eventStore = yield* OrchestrationEventStore; - const now = new Date().toISOString(); - const { attachmentsDir: attachmentsRootDir, stateDir } = yield* ServerConfig; - const attachmentsSentinelPath = path.join(attachmentsRootDir, "sentinel.txt"); - const stateDirSentinelPath = path.join(stateDir, "state-sentinel.txt"); - yield* fileSystem.makeDirectory(attachmentsRootDir, { recursive: true }); - yield* fileSystem.writeFileString(attachmentsSentinelPath, "keep-attachments-root"); - yield* fileSystem.writeFileString(stateDirSentinelPath, "keep-state-dir"); - - yield* eventStore - .append({ - type: "thread.deleted", - eventId: EventId.makeUnsafe("evt-unsafe-thread-delete-direct"), - aggregateKind: "thread", - aggregateId: ThreadId.makeUnsafe(".."), - occurredAt: now, - commandId: CommandId.makeUnsafe("cmd-unsafe-thread-delete-direct"), - causationEventId: null, - correlationId: CorrelationId.makeUnsafe("cmd-unsafe-thread-delete-direct"), - metadata: {}, - payload: { - threadId: ThreadId.makeUnsafe(".."), - deletedAt: now, - }, - }) - .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); - - assert.isTrue(yield* exists(attachmentsRootDir)); - assert.isTrue(yield* exists(attachmentsSentinelPath)); - assert.isTrue(yield* exists(stateDirSentinelPath)); - }), - ); -}); - it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => { it.effect("resumes from projector last_applied_sequence without replaying older events", () => Effect.gen(function* () {