Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 56 additions & 54 deletions apps/server/src/provider/makeManagedServerProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,70 @@ import * as Semaphore from "effect/Semaphore";
import type { ServerProviderShape } from "./Services/ServerProvider";
import { ServerSettingsError } from "@t3tools/contracts";

export function makeManagedServerProvider<Settings>(input: {
export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")(function* <
Settings,
>(input: {
readonly getSettings: Effect.Effect<Settings>;
readonly streamSettings: Stream.Stream<Settings>;
readonly haveSettingsChanged: (previous: Settings, next: Settings) => boolean;
readonly checkProvider: Effect.Effect<ServerProvider, ServerSettingsError>;
readonly refreshInterval?: Duration.Input;
}): Effect.Effect<ServerProviderShape, ServerSettingsError, Scope.Scope> {
return Effect.gen(function* () {
const refreshSemaphore = yield* Semaphore.make(1);
const changesPubSub = yield* Effect.acquireRelease(
PubSub.unbounded<ServerProvider>(),
PubSub.shutdown,
);
const initialSettings = yield* input.getSettings;
const initialSnapshot = yield* input.checkProvider;
const snapshotRef = yield* Ref.make(initialSnapshot);
const settingsRef = yield* Ref.make(initialSettings);
}): Effect.fn.Return<ServerProviderShape, ServerSettingsError, Scope.Scope> {
const refreshSemaphore = yield* Semaphore.make(1);
const changesPubSub = yield* Effect.acquireRelease(
PubSub.unbounded<ServerProvider>(),
PubSub.shutdown,
);
const initialSettings = yield* input.getSettings;
const initialSnapshot = yield* input.checkProvider;
const snapshotRef = yield* Ref.make(initialSnapshot);
const settingsRef = yield* Ref.make(initialSettings);

const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) =>
refreshSemaphore.withPermits(1)(
Effect.gen(function* () {
const forceRefresh = options?.forceRefresh === true;
const previousSettings = yield* Ref.get(settingsRef);
if (!forceRefresh && !input.haveSettingsChanged(previousSettings, nextSettings)) {
yield* Ref.set(settingsRef, nextSettings);
return yield* Ref.get(snapshotRef);
}
const applySnapshotBase = Effect.fn("applySnapshot")(function* (
nextSettings: Settings,
options?: { readonly forceRefresh?: boolean },
) {
const forceRefresh = options?.forceRefresh === true;
const previousSettings = yield* Ref.get(settingsRef);
if (!forceRefresh && !input.haveSettingsChanged(previousSettings, nextSettings)) {
yield* Ref.set(settingsRef, nextSettings);
return yield* Ref.get(snapshotRef);
}

const nextSnapshot = yield* input.checkProvider;
yield* Ref.set(settingsRef, nextSettings);
yield* Ref.set(snapshotRef, nextSnapshot);
yield* PubSub.publish(changesPubSub, nextSnapshot);
return nextSnapshot;
}),
);
const nextSnapshot = yield* input.checkProvider;
yield* Ref.set(settingsRef, nextSettings);
yield* Ref.set(snapshotRef, nextSnapshot);
yield* PubSub.publish(changesPubSub, nextSnapshot);
return nextSnapshot;
});
const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) =>
refreshSemaphore.withPermits(1)(applySnapshotBase(nextSettings, options));

const refreshSnapshot = Effect.gen(function* () {
const nextSettings = yield* input.getSettings;
return yield* applySnapshot(nextSettings, { forceRefresh: true });
});
const refreshSnapshot = Effect.fn("refreshSnapshot")(function* () {
const nextSettings = yield* input.getSettings;
return yield* applySnapshot(nextSettings, { forceRefresh: true });
});

yield* Stream.runForEach(input.streamSettings, (nextSettings) =>
Effect.asVoid(applySnapshot(nextSettings)),
).pipe(Effect.forkScoped);
yield* Stream.runForEach(input.streamSettings, (nextSettings) =>
Effect.asVoid(applySnapshot(nextSettings)),
).pipe(Effect.forkScoped);

yield* Effect.forever(
Effect.sleep(input.refreshInterval ?? "60 seconds").pipe(
Effect.flatMap(() => refreshSnapshot),
Effect.ignoreCause({ log: true }),
),
).pipe(Effect.forkScoped);
yield* Effect.forever(
Effect.sleep(input.refreshInterval ?? "60 seconds").pipe(
Effect.flatMap(() => refreshSnapshot()),
Effect.ignoreCause({ log: true }),
),
).pipe(Effect.forkScoped);

return {
getSnapshot: input.getSettings.pipe(
Effect.flatMap(applySnapshot),
Effect.tapError(Effect.logError),
Effect.orDie,
),
refresh: refreshSnapshot.pipe(Effect.tapError(Effect.logError), Effect.orDie),
get streamChanges() {
return Stream.fromPubSub(changesPubSub);
},
} satisfies ServerProviderShape;
});
}
return {
getSnapshot: input.getSettings.pipe(
Effect.flatMap(applySnapshot),
Effect.tapError(Effect.logError),
Effect.orDie,
),
refresh: refreshSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie),
get streamChanges() {
return Stream.fromPubSub(changesPubSub);
},
} satisfies ServerProviderShape;
});
Loading