Skip to content
10 changes: 7 additions & 3 deletions apps/server/src/provider/Layers/ProviderHealth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type {
ServerProviderStatus,
ServerProviderStatusState,
} from "@t3tools/contracts";
import { Effect, Layer, Option, Result, Stream } from "effect";
import { Array, Effect, Fiber, Layer, Option, Result, Stream } from "effect";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";

import {
Expand Down Expand Up @@ -312,9 +312,13 @@ export const checkCodexProviderStatus: Effect.Effect<
export const ProviderHealthLive = Layer.effect(
ProviderHealth,
Effect.gen(function* () {
const codexStatus = yield* checkCodexProviderStatus;
const codexStatusFiber = yield* checkCodexProviderStatus.pipe(
Effect.map(Array.of),
Effect.forkScoped,
);

return {
getStatuses: Effect.succeed([codexStatus]),
getStatuses: Fiber.join(codexStatusFiber),
} satisfies ProviderHealthShape;
}),
);
24 changes: 20 additions & 4 deletions apps/server/src/wsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
WebSocketRequest,
WsPush,
WsResponse,
ServerProviderStatus,
} from "@t3tools/contracts";
import * as NodeHttpServer from "@effect/platform-node/NodeHttpServer";
import {
Expand Down Expand Up @@ -268,8 +269,6 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
),
);

const providerStatuses = yield* providerHealth.getStatuses;

const clients = yield* Ref.make(new Set<WebSocket>());
const logger = createLogger("ws");

Expand Down Expand Up @@ -617,6 +616,23 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
const subscriptionsScope = yield* Scope.make("sequential");
yield* Effect.addFinalizer(() => Scope.close(subscriptionsScope, Exit.void));

// Push updated provider statuses to connected clients once background health checks finish.
let providers: ReadonlyArray<ServerProviderStatus> = [];
yield* providerHealth.getStatuses.pipe(
Effect.flatMap((statuses) => {
providers = statuses;
return broadcastPush({
type: "push",
channel: WS_CHANNELS.serverConfigUpdated,
data: {
issues: [],
providers: statuses,
},
});
}),
Effect.forkIn(subscriptionsScope),
);

yield* Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) =>
broadcastPush({
type: "push",
Expand All @@ -631,7 +647,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
channel: WS_CHANNELS.serverConfigUpdated,
data: {
issues: event.issues,
providers: providerStatuses,
providers,
},
}),
).pipe(Effect.forkIn(subscriptionsScope));
Expand Down Expand Up @@ -883,7 +899,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
keybindingsConfigPath,
keybindings: keybindingsConfig.keybindings,
issues: keybindingsConfig.issues,
providers: providerStatuses,
providers,
availableEditors,
};

Expand Down
128 changes: 127 additions & 1 deletion apps/web/src/composerDraftStore.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { ProjectId, ThreadId } from "@t3tools/contracts";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { type ComposerImageAttachment, useComposerDraftStore } from "./composerDraftStore";
import {
type ComposerImageAttachment,
createDebouncedStorage,
useComposerDraftStore,
} from "./composerDraftStore";

function makeImage(input: {
id: string;
Expand Down Expand Up @@ -451,3 +455,125 @@ describe("composerDraftStore runtime and interaction settings", () => {
expect(useComposerDraftStore.getState().draftsByThreadId[threadId]).toBeUndefined();
});
});

// ---------------------------------------------------------------------------
// createDebouncedStorage
// ---------------------------------------------------------------------------

function createMockStorage() {
const store = new Map<string, string>();
return {
getItem: vi.fn((name: string) => store.get(name) ?? null),
setItem: vi.fn((name: string, value: string) => {
store.set(name, value);
}),
removeItem: vi.fn((name: string) => {
store.delete(name);
}),
};
}

describe("createDebouncedStorage", () => {
beforeEach(() => {
vi.useFakeTimers();
});

afterEach(() => {
vi.useRealTimers();
});

it("delegates getItem immediately", () => {
const base = createMockStorage();
base.getItem.mockReturnValueOnce("value");
const storage = createDebouncedStorage(base);

expect(storage.getItem("key")).toBe("value");
expect(base.getItem).toHaveBeenCalledWith("key");
});

it("does not write to base storage until the debounce fires", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.setItem("key", "v1");
expect(base.setItem).not.toHaveBeenCalled();

vi.advanceTimersByTime(299);
expect(base.setItem).not.toHaveBeenCalled();

vi.advanceTimersByTime(1);
expect(base.setItem).toHaveBeenCalledWith("key", "v1");
});

it("only writes the last value when setItem is called rapidly", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.setItem("key", "v1");
storage.setItem("key", "v2");
storage.setItem("key", "v3");

vi.advanceTimersByTime(300);
expect(base.setItem).toHaveBeenCalledTimes(1);
expect(base.setItem).toHaveBeenCalledWith("key", "v3");
});

it("removeItem cancels a pending setItem write", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.setItem("key", "v1");
storage.removeItem("key");

vi.advanceTimersByTime(300);
expect(base.setItem).not.toHaveBeenCalled();
expect(base.removeItem).toHaveBeenCalledWith("key");
});

it("flush writes the pending value immediately", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.setItem("key", "v1");
expect(base.setItem).not.toHaveBeenCalled();

storage.flush();
expect(base.setItem).toHaveBeenCalledWith("key", "v1");

// Timer should be cancelled; no duplicate write.
vi.advanceTimersByTime(300);
expect(base.setItem).toHaveBeenCalledTimes(1);
});

it("flush is a no-op when nothing is pending", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.flush();
expect(base.setItem).not.toHaveBeenCalled();
});

it("flush after removeItem is a no-op", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.setItem("key", "v1");
storage.removeItem("key");
storage.flush();

expect(base.setItem).not.toHaveBeenCalled();
});

it("setItem works normally after removeItem cancels a pending write", () => {
const base = createMockStorage();
const storage = createDebouncedStorage(base);

storage.setItem("key", "v1");
storage.removeItem("key");
storage.setItem("key", "v2");

vi.advanceTimersByTime(300);
expect(base.setItem).toHaveBeenCalledTimes(1);
expect(base.setItem).toHaveBeenCalledWith("key", "v2");
});
});
46 changes: 44 additions & 2 deletions apps/web/src/composerDraftStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,54 @@ import {
DEFAULT_RUNTIME_MODE,
type ChatImageAttachment,
} from "./types";
import { Debouncer } from "@tanstack/react-pacer";
import { create } from "zustand";
import { createJSONStorage, persist } from "zustand/middleware";
import { createJSONStorage, persist, type StateStorage } from "zustand/middleware";

export const COMPOSER_DRAFT_STORAGE_KEY = "t3code:composer-drafts:v1";
export type DraftThreadEnvMode = "local" | "worktree";

const COMPOSER_PERSIST_DEBOUNCE_MS = 300;

interface DebouncedStorage extends StateStorage {
flush: () => void;
}

export function createDebouncedStorage(baseStorage: StateStorage): DebouncedStorage {
const debouncedSetItem = new Debouncer(
(name: string, value: string) => {
baseStorage.setItem(name, value);
},
{ wait: COMPOSER_PERSIST_DEBOUNCE_MS },
);

return {
getItem: (name) => baseStorage.getItem(name),
setItem: (name, value) => {
debouncedSetItem.maybeExecute(name, value);
},
removeItem: (name) => {
debouncedSetItem.cancel();
baseStorage.removeItem(name);
},
flush: () => {
debouncedSetItem.flush();
},
};
}

const composerDebouncedStorage: DebouncedStorage =
typeof localStorage !== "undefined"
? createDebouncedStorage(localStorage)
: { getItem: () => null, setItem: () => {}, removeItem: () => {}, flush: () => {} };

// Flush pending composer draft writes before page unload to prevent data loss.
if (typeof window !== "undefined") {
window.addEventListener("beforeunload", () => {
composerDebouncedStorage.flush();
});
}

export interface PersistedComposerImageAttachment {
id: string;
name: string;
Expand Down Expand Up @@ -1169,7 +1211,7 @@ export const useComposerDraftStore = create<ComposerDraftStoreState>()(
{
name: COMPOSER_DRAFT_STORAGE_KEY,
version: 1,
storage: createJSONStorage(() => localStorage),
storage: createJSONStorage(() => composerDebouncedStorage),
partialize: (state) => {
const persistedDraftsByThreadId: PersistedComposerDraftStoreState["draftsByThreadId"] = {};
for (const [threadId, draft] of Object.entries(state.draftsByThreadId)) {
Expand Down
23 changes: 20 additions & 3 deletions apps/web/src/routes/__root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from "@tanstack/react-router";
import { useEffect, useRef } from "react";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import { Throttler } from "@tanstack/react-pacer";

import { APP_DISPLAY_NAME } from "../branding";
import { Button } from "../components/ui/button";
Expand Down Expand Up @@ -150,6 +151,7 @@ function EventRouter() {
let latestSequence = 0;
let syncing = false;
let pending = false;
let needsProviderInvalidation = false;

const flushSnapshotSync = async (): Promise<void> => {
const snapshot = await api.orchestration.getSnapshot();
Expand Down Expand Up @@ -185,17 +187,30 @@ function EventRouter() {
syncing = false;
};

void syncSnapshot().catch(() => undefined);
const domainEventFlushThrottler = new Throttler(
() => {
if (needsProviderInvalidation) {
needsProviderInvalidation = false;
void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all });
}
void syncSnapshot();
},
{
wait: 100,
leading: false,
trailing: true,
},
);

const unsubDomainEvent = api.orchestration.onDomainEvent((event) => {
if (event.sequence <= latestSequence) {
return;
}
latestSequence = event.sequence;
if (event.type === "thread.turn-diff-completed" || event.type === "thread.reverted") {
void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all });
needsProviderInvalidation = true;
}
void syncSnapshot();
domainEventFlushThrottler.maybeExecute();
});
const unsubTerminalEvent = api.terminal.onEvent((event) => {
const hasRunningSubprocess = terminalRunningSubprocessFromEvent(event);
Expand Down Expand Up @@ -280,6 +295,8 @@ function EventRouter() {
});
return () => {
disposed = true;
needsProviderInvalidation = false;
domainEventFlushThrottler.cancel();
unsubDomainEvent();
unsubTerminalEvent();
unsubWelcome();
Expand Down
Loading