From cc8d098b8de5b0613ea6d560708c431b451c2fd3 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 1 Apr 2026 00:27:45 -0700 Subject: [PATCH 1/2] migrate Effect.fn in apps/server/src/provider/makeManagedServerProvider.ts Co-authored-by: codex --- .../src/provider/makeManagedServerProvider.ts | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/apps/server/src/provider/makeManagedServerProvider.ts b/apps/server/src/provider/makeManagedServerProvider.ts index 4c1d0878fc..c097a571ef 100644 --- a/apps/server/src/provider/makeManagedServerProvider.ts +++ b/apps/server/src/provider/makeManagedServerProvider.ts @@ -12,7 +12,11 @@ export function makeManagedServerProvider(input: { readonly checkProvider: Effect.Effect; readonly refreshInterval?: Duration.Input; }): Effect.Effect { - return Effect.gen(function* () { + return Effect.fn("makeManagedServerProvider")(function* (): Effect.fn.Return< + ServerProviderShape, + ServerSettingsError, + Scope.Scope + > { const refreshSemaphore = yield* Semaphore.make(1); const changesPubSub = yield* Effect.acquireRelease( PubSub.unbounded(), @@ -23,25 +27,27 @@ export function makeManagedServerProvider(input: { const snapshotRef = yield* Ref.make(initialSnapshot); const settingsRef = yield* Ref.make(initialSettings); - const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) => - refreshSemaphore.withPermits(1)( - Effect.gen(function* () { - const forceRefresh = options?.forceRefresh === true; - const previousSettings = yield* Ref.get(settingsRef); - if (!forceRefresh && !input.haveSettingsChanged(previousSettings, nextSettings)) { - yield* Ref.set(settingsRef, nextSettings); - return yield* Ref.get(snapshotRef); - } + const applySnapshotBase = Effect.fn("applySnapshot")(function* ( + nextSettings: Settings, + options?: { readonly forceRefresh?: boolean }, + ) { + const forceRefresh = options?.forceRefresh === true; + const previousSettings = yield* Ref.get(settingsRef); + if (!forceRefresh && !input.haveSettingsChanged(previousSettings, nextSettings)) { + yield* Ref.set(settingsRef, nextSettings); + return yield* Ref.get(snapshotRef); + } - const nextSnapshot = yield* input.checkProvider; - yield* Ref.set(settingsRef, nextSettings); - yield* Ref.set(snapshotRef, nextSnapshot); - yield* PubSub.publish(changesPubSub, nextSnapshot); - return nextSnapshot; - }), - ); + const nextSnapshot = yield* input.checkProvider; + yield* Ref.set(settingsRef, nextSettings); + yield* Ref.set(snapshotRef, nextSnapshot); + yield* PubSub.publish(changesPubSub, nextSnapshot); + return nextSnapshot; + }); + const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) => + refreshSemaphore.withPermits(1)(applySnapshotBase(nextSettings, options)); - const refreshSnapshot = Effect.gen(function* () { + const refreshSnapshot = Effect.fn("refreshSnapshot")(function* () { const nextSettings = yield* input.getSettings; return yield* applySnapshot(nextSettings, { forceRefresh: true }); }); @@ -52,7 +58,7 @@ export function makeManagedServerProvider(input: { yield* Effect.forever( Effect.sleep(input.refreshInterval ?? "60 seconds").pipe( - Effect.flatMap(() => refreshSnapshot), + Effect.flatMap(() => refreshSnapshot()), Effect.ignoreCause({ log: true }), ), ).pipe(Effect.forkScoped); @@ -63,10 +69,10 @@ export function makeManagedServerProvider(input: { Effect.tapError(Effect.logError), Effect.orDie, ), - refresh: refreshSnapshot.pipe(Effect.tapError(Effect.logError), Effect.orDie), + refresh: refreshSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie), get streamChanges() { return Stream.fromPubSub(changesPubSub); }, } satisfies ServerProviderShape; - }); + })(); } From 55d88a65d94ffe75f1e8a17bc14dad580533ce82 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 1 Apr 2026 09:00:08 -0700 Subject: [PATCH 2/2] bad code --- .../src/provider/makeManagedServerProvider.ts | 116 +++++++++--------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/apps/server/src/provider/makeManagedServerProvider.ts b/apps/server/src/provider/makeManagedServerProvider.ts index c097a571ef..59aeac1ab5 100644 --- a/apps/server/src/provider/makeManagedServerProvider.ts +++ b/apps/server/src/provider/makeManagedServerProvider.ts @@ -5,74 +5,70 @@ import * as Semaphore from "effect/Semaphore"; import type { ServerProviderShape } from "./Services/ServerProvider"; import { ServerSettingsError } from "@t3tools/contracts"; -export function makeManagedServerProvider(input: { +export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")(function* < + Settings, +>(input: { readonly getSettings: Effect.Effect; readonly streamSettings: Stream.Stream; readonly haveSettingsChanged: (previous: Settings, next: Settings) => boolean; readonly checkProvider: Effect.Effect; readonly refreshInterval?: Duration.Input; -}): Effect.Effect { - return Effect.fn("makeManagedServerProvider")(function* (): Effect.fn.Return< - ServerProviderShape, - ServerSettingsError, - Scope.Scope - > { - const refreshSemaphore = yield* Semaphore.make(1); - const changesPubSub = yield* Effect.acquireRelease( - PubSub.unbounded(), - PubSub.shutdown, - ); - const initialSettings = yield* input.getSettings; - const initialSnapshot = yield* input.checkProvider; - const snapshotRef = yield* Ref.make(initialSnapshot); - const settingsRef = yield* Ref.make(initialSettings); +}): Effect.fn.Return { + const refreshSemaphore = yield* Semaphore.make(1); + const changesPubSub = yield* Effect.acquireRelease( + PubSub.unbounded(), + PubSub.shutdown, + ); + const initialSettings = yield* input.getSettings; + const initialSnapshot = yield* input.checkProvider; + const snapshotRef = yield* Ref.make(initialSnapshot); + const settingsRef = yield* Ref.make(initialSettings); - const applySnapshotBase = Effect.fn("applySnapshot")(function* ( - nextSettings: Settings, - options?: { readonly forceRefresh?: boolean }, - ) { - const forceRefresh = options?.forceRefresh === true; - const previousSettings = yield* Ref.get(settingsRef); - if (!forceRefresh && !input.haveSettingsChanged(previousSettings, nextSettings)) { - yield* Ref.set(settingsRef, nextSettings); - return yield* Ref.get(snapshotRef); - } - - const nextSnapshot = yield* input.checkProvider; + const applySnapshotBase = Effect.fn("applySnapshot")(function* ( + nextSettings: Settings, + options?: { readonly forceRefresh?: boolean }, + ) { + const forceRefresh = options?.forceRefresh === true; + const previousSettings = yield* Ref.get(settingsRef); + if (!forceRefresh && !input.haveSettingsChanged(previousSettings, nextSettings)) { yield* Ref.set(settingsRef, nextSettings); - yield* Ref.set(snapshotRef, nextSnapshot); - yield* PubSub.publish(changesPubSub, nextSnapshot); - return nextSnapshot; - }); - const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) => - refreshSemaphore.withPermits(1)(applySnapshotBase(nextSettings, options)); + return yield* Ref.get(snapshotRef); + } + + const nextSnapshot = yield* input.checkProvider; + yield* Ref.set(settingsRef, nextSettings); + yield* Ref.set(snapshotRef, nextSnapshot); + yield* PubSub.publish(changesPubSub, nextSnapshot); + return nextSnapshot; + }); + const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) => + refreshSemaphore.withPermits(1)(applySnapshotBase(nextSettings, options)); - const refreshSnapshot = Effect.fn("refreshSnapshot")(function* () { - const nextSettings = yield* input.getSettings; - return yield* applySnapshot(nextSettings, { forceRefresh: true }); - }); + const refreshSnapshot = Effect.fn("refreshSnapshot")(function* () { + const nextSettings = yield* input.getSettings; + return yield* applySnapshot(nextSettings, { forceRefresh: true }); + }); - yield* Stream.runForEach(input.streamSettings, (nextSettings) => - Effect.asVoid(applySnapshot(nextSettings)), - ).pipe(Effect.forkScoped); + yield* Stream.runForEach(input.streamSettings, (nextSettings) => + Effect.asVoid(applySnapshot(nextSettings)), + ).pipe(Effect.forkScoped); - yield* Effect.forever( - Effect.sleep(input.refreshInterval ?? "60 seconds").pipe( - Effect.flatMap(() => refreshSnapshot()), - Effect.ignoreCause({ log: true }), - ), - ).pipe(Effect.forkScoped); + yield* Effect.forever( + Effect.sleep(input.refreshInterval ?? "60 seconds").pipe( + Effect.flatMap(() => refreshSnapshot()), + Effect.ignoreCause({ log: true }), + ), + ).pipe(Effect.forkScoped); - return { - getSnapshot: input.getSettings.pipe( - Effect.flatMap(applySnapshot), - Effect.tapError(Effect.logError), - Effect.orDie, - ), - refresh: refreshSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie), - get streamChanges() { - return Stream.fromPubSub(changesPubSub); - }, - } satisfies ServerProviderShape; - })(); -} + return { + getSnapshot: input.getSettings.pipe( + Effect.flatMap(applySnapshot), + Effect.tapError(Effect.logError), + Effect.orDie, + ), + refresh: refreshSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie), + get streamChanges() { + return Stream.fromPubSub(changesPubSub); + }, + } satisfies ServerProviderShape; +});